Kefir.js 3.8.0 (changelog)

Kefir is a reactive programming library for JavaScript inspired by Bacon.js and RxJS, with a focus on high performance and low memory usage.

Kefir has a GitHub repository, where you can send pull requests, report bugs, and have fun reading source code.

See also Deprecated API docs.

Installation

Kefir is available as an NPM and a Bower package, as well as a simple file download.

NPM

npm install kefir

Bower

bower install kefir

Downloads (3.8.0)

For Development    kefir.js           ~ 100 KB
For Production     kefir.min.js       ~ 10 KB (when gzipped)
All files          kefir-3.8.0.zip    ... including documentation, tests, source maps, etc.

Kefir is also available on jsDelivr.

Examples

Let's start with a quick little example to give you a feel for what it is like to program with Kefir. First we create a stream of events that will produce three numbers at intervals of 100 milliseconds:

var numbers = Kefir.sequentially(100, [1, 2, 3]);

Now let's create another stream based on the first one. As you might guess, it will produce 2, 4, and 6.

var numbers2 = numbers.map(x => x * 2);

Suppose we don't want the number 4 to be in the sequence. No problem, we can filter it out:

var numbers3 = numbers2.filter(x => x !== 4);

OK, the numbers3 stream is what we want. It's time to subscribe to it and get the values:

numbers3.onValue(x => {
  logger.log(x);
});


Nice, here is another one. This time, let's begin with streams based on user actions instead of a timer. First we create a stream that will contain button click events:

var btnClicks = Kefir.fromEvents(document.querySelector('#ex-2-btn'), 'click');

Let's also create a stream of the text input's value changes:

var inputValue = Kefir.fromEvents(document.querySelector('#ex-2-input'), 'keyup')
  .map(event => event.target.value);

That's a good beginning: we have streams representing user actions, and now we can transform and combine them to create our desired result stream. First we want a property representing how many times the user has clicked the button. We will use the scan method to create it:

var clicksCount = btnClicks.scan(sum => sum + 1, 0);

Now we have two values: the first is the click count, and the second is the text field content, which is actually a string. Let's fix that:

var inputNumber = inputValue.map(text => parseInt(text, 10));

Done, but now it can produce NaN if the user types "banana" or something else instead of a number. Let's fix this too, using the error handling that Kefir provides:

var fixedInputNumber = inputNumber.flatMap(
  x => isNaN(x)
    ? Kefir.constantError('banana?')
    : Kefir.constant(x)
);

Almost done. The final step is to combine our two dynamic number values. Suppose we want to multiply them:

var theResult = Kefir.combine([fixedInputNumber, clicksCount], (a, b) => a * b);

Good, let's display the result:

var outputElement = document.querySelector('#ex-2-output');

theResult
  .onValue(x => {
    outputElement.innerHTML = x;
  })
  .onError(error => {
    outputElement.innerHTML = '<span style="color:red">' + error + '</span>';
  });



More examples

Also, almost any code snippet below can be run in the browser console, on this page. So you can play with Kefir right now, just open up the browser console.

Intro to Streams and Properties

Kefir supports two types of observables — streams and properties. Streams represent sequences of events made available over time. And properties represent values that change over time. The value of a property changes in response to events, which means that any stream may be easily converted to a property.

In practice, the only difference between the two types of observables is that properties may have a current value. The process of subscribing to both types of observables is the same: you call the onValue method, passing a callback function to it. But when you subscribe to a property which has a current value, the callback is called immediately (synchronously) with the current value of the property.
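To make the difference concrete, here is a minimal sketch of the two subscription behaviours. The factories createToyStream and createToyProperty are hypothetical names invented for this illustration; they are a toy model and not how Kefir is implemented.

```javascript
// Toy model, for illustration only. These factories are hypothetical
// and are not part of Kefir.
function createToyStream() {
  const subscribers = [];
  return {
    onValue(fn) { subscribers.push(fn); },
    emit(x) { subscribers.forEach(fn => fn(x)); }
  };
}

function createToyProperty() {
  const stream = createToyStream();
  let hasCurrent = false;
  let current;
  return {
    onValue(fn) {
      // A property with a current value calls the callback synchronously.
      if (hasCurrent) fn(current);
      stream.onValue(fn);
    },
    emit(x) {
      hasCurrent = true;
      current = x;
      stream.emit(x);
    }
  };
}

function demoStreamVsProperty() {
  const seen = { stream: [], property: [] };
  const s = createToyStream();
  const p = createToyProperty();
  s.emit(1);                              // nobody is subscribed yet
  p.emit(1);                              // remembered as the current value
  s.onValue(x => seen.stream.push(x));
  p.onValue(x => seen.property.push(x));  // receives 1 immediately
  s.emit(2);
  p.emit(2);
  return seen;
}

// The stream subscriber sees only 2; the property subscriber sees 1, then 2.
console.log(demoStreamVsProperty());
```

In this model the late subscriber to the stream misses the first value, while the late subscriber to the property still receives it as the current value.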

Create a stream

never
Kefir.never()
Creates a stream that has already ended and will never produce any events.

var stream = Kefir.never();
stream.log();
> [never] <end:current>
stream:  X

later
Kefir.later(wait, value)
Creates a stream that produces a single value after wait milliseconds, then ends.

var stream = Kefir.later(1000, 1);
stream.log();
> [later] <value> 1
> [later] <end>
stream:  ----1X

interval
Kefir.interval(interval, value)
Creates a stream that produces the same value every interval milliseconds. Never ends.

var stream = Kefir.interval(1000, 1);
stream.log();
> [interval] <value> 1
> [interval] <value> 1
> [interval] <value> 1
...
stream:  ----1----1----1----1---

sequentially
Kefir.sequentially(interval, values)
Creates a stream containing the given values (array), delivered with the given interval in milliseconds. Ends after all values are delivered.

var stream = Kefir.sequentially(1000, [1, 2, 3]);
stream.log();
> [sequentially] <value> 1
> [sequentially] <value> 2
> [sequentially] <value> 3
> [sequentially] <end>
stream:  ----1----2----3X

fromPoll
Kefir.fromPoll(interval, fn)
Creates a stream that polls the given fn function, with the given interval in milliseconds, and emits the values returned by fn. Never ends.

var start = new Date();
var stream = Kefir.fromPoll(1000, () => new Date() - start);
stream.log();
> [fromPoll] <value> 1001
> [fromPoll] <value> 2002
> [fromPoll] <value> 3004
> [fromPoll] <value> 4006
> [fromPoll] <value> 5007
> [fromPoll] <value> 6007
...
stream:  ----•----•----•----•---
          1001 2002 3004 4006

withInterval
Kefir.withInterval(interval, handler)
General method to create an interval-based stream. Creates a stream that calls the given handler function with the given interval in milliseconds. The handler function is called with one argument, an emitter.

var start = new Date();
var stream = Kefir.withInterval(1000, emitter => {
  var time = new Date() - start;
  if (time < 4000) {
    emitter.emit(time);   // emit a value
  } else {
    emitter.end();        // end the stream
  }
});
stream.log();
> [withInterval] <value> 1002
> [withInterval] <value> 2003
> [withInterval] <value> 3005
> [withInterval] <end>
stream:  ----•----•----•----X
          1002 2003 3005

You may call emitter methods several times on each interval tick, or not call them at all.

fromCallback
Kefir.fromCallback(fn)
Converts a function that accepts a callback as the first argument to a stream. Emits at most one value when the callback is called, then ends. The fn function will be called at most once, when the first subscriber is added to the stream.

var stream = Kefir.fromCallback(callback => {
  // we use setTimeout here just to simulate some asynchronous activity
  setTimeout(() => callback(1), 1000);
});
stream.log();
> [fromCallback] <value> 1
> [fromCallback] <end>
stream:  ----1X

fromNodeCallback
Kefir.fromNodeCallback(fn)
Similar to fromCallback, but the callback passed to fn is a Node.JS style callback — callback(error, result). If the error argument of the callback is truthy, an error will be emitted from the result stream, otherwise a value is emitted. The stream will end after the first value or on error.

var stream = Kefir.fromNodeCallback(callback => {
  // we use setTimeout here just to simulate some asynchronous activity
  setTimeout(() => callback(null, 1), 1000);
});
stream.log();
> [fromNodeCallback] <value> 1
> [fromNodeCallback] <end>
stream:  ----1X
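The rule for choosing between an error and a value can be sketched as a plain function. The helper nodeCallbackToEvent is a hypothetical name used only for this illustration, and the returned object shape simply mirrors the event objects described for onAny; it is not Kefir's internal code.

```javascript
// Hypothetical helper, not part of Kefir: maps the arguments of a
// Node.JS style callback to the kind of event the stream would emit.
function nodeCallbackToEvent(error, result) {
  return error
    ? { type: 'error', value: error }   // truthy error: emit an error
    : { type: 'value', value: result }; // otherwise: emit the result
}

console.log(nodeCallbackToEvent(null, 1).type);       // value
console.log(nodeCallbackToEvent('boom').type);        // error
console.log(nodeCallbackToEvent(0, 'fine').type);     // value
```

Because the check is for truthiness, a falsy error argument such as 0 or an empty string is treated as "no error" in this sketch.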

fromEvents
Kefir.fromEvents(target, eventName, [transform])
Creates a stream from events on a DOM EventTarget or a Node.JS EventEmitter object, or an object that supports event listeners using on/off methods (e.g. a jQuery object).

If a transform function is provided, it will be called on each event with the same arguments and context (this) as the event listener callback. And the value returned by transform will be emitted from the stream. If no transform function is provided, the first argument of the callback is emitted by default, i.e. the function x => x is used as transform.

var stream = Kefir.fromEvents(document.body, 'click');
stream.log()
> [fromEvents] <value> MouseEvent {y: 474, x: 551 ...}
> [fromEvents] <value> MouseEvent {y: 361, x: 751 ...}
> [fromEvents] <value> MouseEvent {y: 444, x: 1120 ...}
stream:  ----•-----------•----•---
    MouseEvent   MouseEvent   MouseEvent

stream
Kefir.stream(subscribe)
Creates a general purpose stream. The subscribe callback is called on each activation, and if a function is returned from subscribe, it will be called on the following deactivation. The subscribe function is called with emitter as an argument, which can be used to emit events from the result stream.

var stream = Kefir.stream(emitter => {

  var count = 0;
  emitter.emit(count);

  var intervalId = setInterval(() => {
    count++;
    if (count < 4) {
      emitter.emit(count);
    } else {
      emitter.end();
    }
  }, 1000);

  return () => {
    clearInterval(intervalId);
  }

});
stream.log()
> [stream] <value:current> 0
> [stream] <value> 1
> [stream] <value> 2
> [stream] <value> 3
> [stream] <end>
stream:  0----1----2----3----X

Create a property

constant
Kefir.constant(value)
Creates an ended property, with the specified current value.

var property = Kefir.constant(1);
property.log();
> [constant] <value:current> 1
> [constant] <end:current>
property: 1X

constantError
Kefir.constantError(error)
Creates an ended property, with the specified current error.

var property = Kefir.constantError(1);
property.log();
> [constantError] <error:current> 1
> [constantError] <end:current>
property: eX

Convert observables

toProperty
stream.toProperty([getCurrent])
Converts a stream to a property. Accepts an optional getCurrent callback, which will be called on each activation to get the current value at that moment.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.toProperty(() => 0);
result.log();
> [sequentially.toProperty] <value:current> 0
> [sequentially.toProperty] <value> 1
> [sequentially.toProperty] <value> 2
> [sequentially.toProperty] <value> 3
> [sequentially.toProperty] <end>
source:  ----1----2----3X
result: 0----1----2----3X

changes
property.changes()
Converts a property to a stream. If the property has a current value (or error), it will be ignored (subscribers of the stream won't get it).

If you call changes on a stream, it will return a new stream with current values/errors removed.

var source = Kefir.sequentially(100, [1, 2, 3]);
var property = source.toProperty(() => 0);
var result = property.changes();
result.log();
> [sequentially.toProperty.changes] <value> 1
> [sequentially.toProperty.changes] <value> 2
> [sequentially.toProperty.changes] <value> 3
> [sequentially.toProperty.changes] <end>
property: 0----1----2----3X
result:    ----1----2----3X

Subscribe / add side effects

observe
obs.observe(observer)
obs.observe([onValue], [onError], [onEnd])
Subscribes the provided observer to obs. Observer is an object with 3 optional methods: value, error, and end, called when obs emits a value, emits an error, or ends, respectively.

Returns a Subscription object, which has an unsubscribe method and a read-only closed property. closed indicates whether the unsubscribe method has been called or the observable has ended.

var stream = Kefir.sequentially(1000, [1, 2]);
var subscription = stream.observe({
  value(value) {
    console.log('value:', value);
  },
  error(error) {
    console.log('error:', error);
  },
  end() {
    console.log('end');
  },
});

...

// later
subscription.unsubscribe()
> value: 1
> value: 2
> end

In addition to passing in an Observer, observe can take callbacks individually:

var stream = Kefir.sequentially(1000, [1, 2]);
stream.observe(
  function onValue(value) {
    console.log('value', value);
  },
  function onError(error) {
    console.log('error', error);
  },
  function onEnd() {
    console.log('end');
  }
);

This method is designed to replace all other methods for subscribing (onValue, offValue, onError, etc). It's recommended to use observe instead of the other methods, which will be removed eventually.

onValue
obs.onValue(callback)
Subscribes callback to values on an observable.

If called on a property, which has a current value, callback will be called immediately (synchronously) with that value.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onValue(x => {
  console.log('value:', x);
});
> value: 1
> value: 2

offValue
obs.offValue(callback)
Unsubscribes callback from values on an observable.

onError
obs.onError(callback)
Subscribes callback to errors on an observable.

If called on a property, which has a current error, callback will be called immediately (synchronously) with that error.

var property = Kefir.constantError(1);
property.onError(x => {
  console.log('error:', x);
});
> error: 1

offError
obs.offError(callback)
Unsubscribes callback from errors on an observable.

onEnd
obs.onEnd(callback)
Subscribes callback to the ending of an observable.

If the observable has already ended, callback will be called immediately (synchronously).

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onEnd(() => {
  console.log('stream ended');
});
> stream ended

offEnd
obs.offEnd(callback)
Unsubscribes callback from the ending of an observable.

onAny
obs.onAny(callback)
Subscribes callback to all three types of events. Callback is called with an event object as argument. Each event object shown in the example below contains two attributes: type and value.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onAny(event => {
  console.log('event:', event);
});
> event: Object {type: "value", value: 1}
> event: Object {type: "value", value: 2}
> event: Object {type: "end", value: undefined}

offAny
obs.offAny(callback)
Unsubscribes an onAny subscriber.

log
obs.log([name])
Turns on logging of any event to the browser console. Accepts an optional name argument that will be shown in the log if provided.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.log('my stream');
> my stream <value> 1
> my stream <value> 2
> my stream <end>

offLog
obs.offLog([name])
Turns off logging. If .log was called with a name argument, offLog must be called with the same name argument.

spy
obs.spy([name])
Turns on spying of any event to the browser console. Similar to .log, however .spy will not cause the stream to activate by itself. Accepts an optional name argument that will be shown in the log if provided.

var stream = Kefir.sequentially(250, [1, 2, 3]);
stream.spy('spied');      // stream is *not* activated here.
stream.observe(() => {});
> spied <value> 1
> spied <value> 2
> spied <value> 3
> spied <end>

offSpy
obs.offSpy([name])
Turns off spying. If .spy was called with a name argument, offSpy must be called with the same name argument.

Modify an observable

Most methods in this section create a new observable of the same type* from an original one. The new observable applies some transformation to each event from the original one and emits the result of the transformation. In most cases a transformation is applied only to value events; end and error events just pass through untouched.

* For example if the original observable was a stream, then the new one will also be a stream. Same for properties.

map
obs.map(fn)
Applies the given fn function to each value from the original observable and emits the value returned by fn.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.map(x => x + 1);
result.log();
> [sequentially.map] <value> 2
> [sequentially.map] <value> 3
> [sequentially.map] <value> 4
> [sequentially.map] <end>
source: ---1---2---3X
result: ---2---3---4X

mapErrors
obs.mapErrors(fn)
Same as map but for errors.

var source = Kefir.sequentially(100, [1, 2, 3]).flatMap(Kefir.constantError);
var result = source.mapErrors(x => x * 2);
result.log();
> [sequentially.flatMap.mapErrors] <error> 2
> [sequentially.flatMap.mapErrors] <error> 4
> [sequentially.flatMap.mapErrors] <error> 6
> [sequentially.flatMap.mapErrors] <end>
source:  ---e---e---eX
            1   2   3
result:  ---e---e---eX
            2   4   6

filter
obs.filter([predicate])
Filters values from the original observable using the given predicate function.

If no predicate is provided, the function x => x will be used.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.filter(x => x > 1);
result.log();
> [sequentially.filter] <value> 2
> [sequentially.filter] <value> 3
> [sequentially.filter] <end>
source: ---1---2---3X
result: -------2---3X

See also filterBy.
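With the default predicate x => x, only truthy values pass. The plain-array sketch below models that rule; filterModel is a hypothetical helper written for this illustration and works on an array of values rather than on a real observable.

```javascript
// Plain-array model of filter; hypothetical helper, not part of Kefir.
function filterModel(values, predicate) {
  const test = predicate || (x => x); // the documented default predicate
  return values.filter(x => test(x));
}

console.log(filterModel([0, 1, '', 'a', null, 2])); // [ 1, 'a', 2 ]
console.log(filterModel([1, 2, 3], x => x > 1));    // [ 2, 3 ]
```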

filterErrors
obs.filterErrors([predicate])
Same as filter but for errors.

var source = Kefir.sequentially(100, [0, 1, 2, 3]).flatMap(Kefir.constantError);
var result = source.filterErrors(x => (x % 2) === 0);
result.log();
> [sequentially.flatMap.filterErrors] <error> 0
> [sequentially.flatMap.filterErrors] <error> 2
> [sequentially.flatMap.filterErrors] <end>
source:  ---e---e---e---eX
            0   1   2   3
result:  ---e-------e----X
            0       2

take
obs.take(n)
Emits the first n values from the original observable, then ends.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.take(2);
result.log();
> [sequentially.take] <value> 1
> [sequentially.take] <value> 2
> [sequentially.take] <end>
source: ---1---2---3X
result: ---1---2X

takeErrors
obs.takeErrors(n)
Emits the first n errors from the original observable, then ends. Values just flow through.

var source = Kefir.sequentially(100, [1, 2, 3])
  .flatMap(x => Kefir.constantError(x));
var result = source.takeErrors(2);
result.log();
> [sequentially.flatMap.takeErrors] <error> 1
> [sequentially.flatMap.takeErrors] <error> 2
> [sequentially.flatMap.takeErrors] <end>
source: ---e---e---eX
           1   2   3
result: ---e---eX
           1   2

takeWhile
obs.takeWhile([predicate])
Emits values from the original observable until the given predicate function applied to a value returns false. Ends when the predicate returns false.

If no predicate is provided, the function x => x will be used.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.takeWhile(x => x < 3);
result.log();
> [sequentially.takeWhile] <value> 1
> [sequentially.takeWhile] <value> 2
> [sequentially.takeWhile] <end>
source: ---1---2---3X
result: ---1---2---X

See also takeWhileBy.
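As a rough illustration of the "ends when the predicate returns false" rule, here is a plain-array sketch. takeWhileModel is a hypothetical helper for this example only; it ignores timing and errors.

```javascript
// Plain-array model of takeWhile; hypothetical helper, not part of Kefir.
function takeWhileModel(values, predicate) {
  const test = predicate || (x => x);
  const out = [];
  for (const x of values) {
    if (!test(x)) break; // the result ends here; later values are never seen
    out.push(x);
  }
  return out;
}

console.log(takeWhileModel([1, 2, 3, 1], x => x < 3)); // [ 1, 2 ]
```

Note that the trailing 1 is not emitted even though it satisfies the predicate, because the result has already ended by then.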

last
obs.last()
Emits only the last value from the original observable.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.last();
result.log();
> [sequentially.last] <value> 3
> [sequentially.last] <end>
source: ---1---2---3X
result: -----------3X

skip
obs.skip(n)
Skips the first n values from the original observable, then emits all the rest.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.skip(2);
result.log();
> [sequentially.skip] <value> 3
> [sequentially.skip] <end>
source: ---1---2---3X
result: -----------3X

skipWhile
obs.skipWhile([predicate])
Skips values from the original observable until the given predicate function applied to a value returns false, then stops applying the predicate to values and emits all of them.

If no predicate is provided, the function x => x will be used.

var source = Kefir.sequentially(100, [1, 3, 2]);
var result = source.skipWhile(x => x < 3);
result.log();
> [sequentially.skipWhile] <value> 3
> [sequentially.skipWhile] <value> 2
> [sequentially.skipWhile] <end>
source: ---1---3---2X
result: -------3---2X
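The detail that the predicate is no longer applied after its first false result is easy to miss, so here is a plain-array sketch of it. skipWhileModel is a hypothetical helper for this illustration and ignores timing and errors.

```javascript
// Plain-array model of skipWhile; hypothetical helper, not part of Kefir.
function skipWhileModel(values, predicate) {
  const test = predicate || (x => x);
  const out = [];
  let skipping = true;
  for (const x of values) {
    // Once the predicate has returned false, it is never consulted again.
    if (skipping && !test(x)) skipping = false;
    if (!skipping) out.push(x);
  }
  return out;
}

console.log(skipWhileModel([1, 3, 2], x => x < 3)); // [ 3, 2 ]
```

The final 2 passes through even though it is less than 3, which is the difference between skipWhile and a plain filter.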

skipDuplicates
obs.skipDuplicates([comparator])
Skips duplicate values using === for comparison. Accepts an optional comparator function which is then used instead of ===.

var source = Kefir.sequentially(100, [1, 2, 2, 3, 1]);
var result = source.skipDuplicates();
result.log();
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <value> 2
> [sequentially.skipDuplicates] <value> 3
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <end>
source: ---1---2---2---3---1X
result: ---1---2-------3---1X

With custom comparator function:

var source = Kefir.sequentially(100, [1, 2, 2.1, 3, 1]);
var result = source.skipDuplicates(
  (a, b) => Math.round(a) === Math.round(b)
);
result.log();
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <value> 2
> [sequentially.skipDuplicates] <value> 3
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <end>
source: ---1---2---•---3---1X
                 2.1
result: ---1---2-------3---1X
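Both examples can be reproduced with a plain-array sketch. skipDuplicatesModel is a hypothetical helper; it assumes each value is compared with the most recently emitted one, which gives the same results as comparing with the previous source value whenever the comparator behaves like an equivalence relation. This section does not say which of the two Kefir does.

```javascript
// Plain-array model of skipDuplicates; hypothetical helper, not part of Kefir.
// Assumption: values are compared with the last emitted value.
function skipDuplicatesModel(values, comparator) {
  const same = comparator || ((a, b) => a === b);
  const out = [];
  for (const x of values) {
    const isFirst = out.length === 0;
    if (isFirst || !same(out[out.length - 1], x)) out.push(x);
  }
  return out;
}

console.log(skipDuplicatesModel([1, 2, 2, 3, 1])); // [ 1, 2, 3, 1 ]
console.log(skipDuplicatesModel(
  [1, 2, 2.1, 3, 1],
  (a, b) => Math.round(a) === Math.round(b)
)); // [ 1, 2, 3, 1 ]
```

Only consecutive duplicates are skipped, so the final 1 is emitted again in both cases.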

diff
obs.diff([fn], [seed])
On each value from the original observable, calls the fn function with the previous and current values as arguments. The first time, it calls fn with seed and the current value. Emits whatever fn returns.

If no seed is provided, the first value will be used as a seed, and the result observable won't emit on first value.

If no fn function is provided, (a, b) => [a, b] will be used. If you want to omit fn but provide seed, pass null as fn.

var source = Kefir.sequentially(100, [1, 2, 2, 3]);
var result = source.diff((prev, next) => next - prev, 0);
result.log();
> [sequentially.diff] <value> 1
> [sequentially.diff] <value> 1
> [sequentially.diff] <value> 0
> [sequentially.diff] <value> 1
> [sequentially.diff] <end>
source: ---1---2---2---3X
result: ---1---1---0---1X
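The three cases described above (seed given, seed omitted, fn omitted) can be compared side by side with a plain-array sketch. diffModel is a hypothetical helper for this illustration and ignores timing and errors.

```javascript
// Plain-array model of diff; hypothetical helper, not part of Kefir.
function diffModel(values, fn, seed) {
  const combine = fn || ((a, b) => [a, b]); // documented default fn
  const out = [];
  let hasPrev = arguments.length >= 3;      // was a seed provided?
  let prev = seed;
  for (const x of values) {
    if (hasPrev) out.push(combine(prev, x));
    prev = x;                               // without a seed, the first value becomes the seed
    hasPrev = true;
  }
  return out;
}

console.log(diffModel([1, 2, 2, 3], (prev, next) => next - prev, 0)); // [ 1, 1, 0, 1 ]
console.log(diffModel([1, 2, 2, 3], (prev, next) => next - prev));    // [ 1, 0, 1 ]
console.log(diffModel([1, 2, 3], null));                              // [ [ 1, 2 ], [ 2, 3 ] ]
```

Without a seed the result has one value fewer than the source, because nothing is emitted for the first value.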

scan
obs.scan(fn, [seed])
On each value from the original observable, calls the fn function with the previous result returned by fn and the current value emitted by the original observable. The first time, it calls fn with seed as the previous result. Emits whatever fn returns. Always creates a property.

If no seed is provided, the first value will be used as a seed.

var source = Kefir.sequentially(100, [1, 2, 2, 3]);
var result = source.scan((prev, next) => next + prev, 0);
result.log();
> [sequentially.scan] <value:current> 0
> [sequentially.scan] <value> 1
> [sequentially.scan] <value> 3
> [sequentially.scan] <value> 5
> [sequentially.scan] <value> 8
> [sequentially.scan] <end>
source:  ---1---2---2---3X
result: 0---1---3---5---8X
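A plain-array sketch of the accumulation may help. scanModel is a hypothetical helper; it assumes that when no seed is given the first value is emitted unchanged and then used as the accumulator, which goes slightly beyond what this section states.

```javascript
// Plain-array model of scan; hypothetical helper, not part of Kefir.
// Assumption: without a seed, the first value is emitted as-is.
function scanModel(values, fn, seed) {
  const out = [];
  let hasAcc = arguments.length >= 3;
  let acc = seed;
  if (hasAcc) out.push(acc);        // the seed is the property's current value
  for (const x of values) {
    acc = hasAcc ? fn(acc, x) : x;  // first value doubles as the seed
    hasAcc = true;
    out.push(acc);
  }
  return out;
}

console.log(scanModel([1, 2, 2, 3], (prev, next) => next + prev, 0)); // [ 0, 1, 3, 5, 8 ]
console.log(scanModel([1, 2, 2, 3], (prev, next) => next + prev));    // [ 1, 3, 5, 8 ]
```

The first call matches the log output above, including the leading 0 that appears as the current value.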

flatten
obs.flatten([transformer])
For this method it's expected that the source observable emits arrays. The result stream will then emit each element of these arrays.

Always returns a stream.

var source = Kefir.sequentially(100, [[1], [], [2,3]]);
var result = source.flatten();
result.log();
> [sequentially.flatten] <value> 1
> [sequentially.flatten] <value> 2
> [sequentially.flatten] <value> 3
> [sequentially.flatten] <end>
source:  --------•--------•-------- •X
               [1]       []     [2,3]
result:  --------1-----------------23X

You can also provide the transformer function which will be applied to each value from obs observable, and which is supposed to return an array. This makes flatten a pretty powerful transformation method. It allows you to do three kinds of transformations on each value: change value (like map), skip value (like filter), and respond with several values to a single value. If you want to skip a value, return an empty array, to change the value — return an array with a single new value, to emit several values — return them in an array.

var source = Kefir.sequentially(100, [1, 2, 3, 4]);
var result = source.flatten(x => {
  if (x % 2 === 0) {
    return [x * 10];
  } else {
    return [];
  }
});
result.log();
> [sequentially.flatten] <value> 20
> [sequentially.flatten] <value> 40
> [sequentially.flatten] <end>
source:  ---1---2---3---4X
result:  -------•-------•X
               20      40

See also flatMap
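The three uses of a transformer (change, skip, respond with several values) can be sketched on plain arrays. flattenModel is a hypothetical helper for this illustration; it ignores timing and errors.

```javascript
// Plain-array model of flatten; hypothetical helper, not part of Kefir.
function flattenModel(values, transformer) {
  const toArray = transformer || (x => x); // without a transformer, values must be arrays
  const out = [];
  for (const x of values) out.push(...toArray(x));
  return out;
}

console.log(flattenModel([[1], [], [2, 3]]));                    // [ 1, 2, 3 ]
console.log(flattenModel([1, 2, 3, 4],
  x => (x % 2 === 0 ? [x * 10] : [])));                          // [ 20, 40 ]
console.log(flattenModel([1, 2], x => [x, x]));                  // [ 1, 1, 2, 2 ]
```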

delay
obs.delay(wait)
Delays all events by wait milliseconds, with an exception for the current value of a property, or the end of an already ended observable. Doesn't delay errors.

var source = Kefir.sequentially(200, [1, 2, 3]);
var result = source.delay(100);
result.log();

> [sequentially.delay] <value> 1
> [sequentially.delay] <value> 2
> [sequentially.delay] <value> 3
> [sequentially.delay] <end>
source:  -----1-----2-----3X
result:  --------1-----2-----3X

throttle
obs.throttle(wait, [options])
Returns a new throttled version of the original observable, which will emit values at most once every wait milliseconds. If used on a property, the current value will always pass without any delay.

Accepts an optional options object similar to underscore.throttle. By default, it will emit an event as soon as it comes for the first time, and, if any new event comes during the wait period, it will emit the last of them as soon as that period is over. If you'd like to disable the leading-edge emit, pass {leading: false}. And if you'd like to disable the emit on the trailing-edge, pass {trailing: false}.

var source = Kefir.sequentially(750, [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
var result = source.throttle(2500);
result.log();
> [sequentially.throttle] <value> 1
> [sequentially.throttle] <value> 4
> [sequentially.throttle] <value> 7
> [sequentially.throttle] <value> 0
> [sequentially.throttle] <end>
source:  --1--2--3--4--5--6--7--8--9--0X
result:  --1---------4---------7---------0X
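To see where the values 1, 4, 7, 0 come from, the default behaviour (leading and trailing emits both enabled) can be simulated on a list of timestamped values. throttleTimeline is a hypothetical helper written for this illustration; it uses no timers and is a simplified model, not Kefir's implementation.

```javascript
// Simulates default throttle on timestamped values ({t: ms, v: value}).
// Hypothetical helper, not part of Kefir.
function throttleTimeline(events, wait) {
  const out = [];
  let windowEnd = -Infinity; // end of the current wait period
  let pending = null;        // last event seen during the wait period
  for (const e of events) {
    // A trailing emit that came due before this event fires first
    // and starts a new wait period.
    if (pending !== null && windowEnd <= e.t) {
      out.push({ t: windowEnd, v: pending.v });
      pending = null;
      windowEnd += wait;
    }
    if (e.t >= windowEnd) {
      out.push({ t: e.t, v: e.v }); // leading edge: emit immediately
      windowEnd = e.t + wait;
    } else {
      pending = e;                  // remembered for the trailing edge
    }
  }
  if (pending !== null) out.push({ t: windowEnd, v: pending.v });
  return out;
}

// Same input as the example above: one value every 750 ms.
const throttleInput = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]
  .map((v, i) => ({ t: 750 * (i + 1), v }));
console.log(throttleTimeline(throttleInput, 2500).map(e => e.v)); // [ 1, 4, 7, 0 ]
```

In this model the emits happen at 750, 3250, 5750, and 8250 ms, so the last value arrives after the source has already delivered everything.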

debounce
obs.debounce(wait, [options])
Creates a new debounced version of the original observable. It emits a value only after a period of wait milliseconds with no events. Pass {immediate: true} as an options object to cause the observable to emit a value on the leading instead of the trailing edge of the wait interval. If used on a property, the current value will always pass without any delay.

var source = Kefir.sequentially(100, [1, 2, 3, 0, 0, 0, 4, 5, 6]);
source = source.filter(x => x > 0);
var result = source.debounce(250);
result.log();
> [sequentially.filter.debounce] <value> 3
> [sequentially.filter.debounce] <value> 6
> [sequentially.filter.debounce] <end>
source:  ---1---2---3---------------4---5---6X
result:  ----------------------3---------------------6X
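The default (trailing-edge) behaviour can be simulated on timestamped values without timers. debounceTimeline is a hypothetical helper for this illustration; it is a simplified model that ignores the immediate option and treats a gap of exactly wait milliseconds as quiet enough to emit.

```javascript
// Simulates trailing-edge debounce on timestamped values ({t: ms, v: value}).
// Hypothetical helper, not part of Kefir.
function debounceTimeline(events, wait) {
  const out = [];
  events.forEach((e, i) => {
    const next = events[i + 1];
    // Emit only if no further event arrives within the wait period.
    if (!next || next.t - e.t >= wait) out.push({ t: e.t + wait, v: e.v });
  });
  return out;
}

// Same input as the example above, after the zeros have been filtered out.
const debounceInput = [
  { t: 100, v: 1 }, { t: 200, v: 2 }, { t: 300, v: 3 },
  { t: 700, v: 4 }, { t: 800, v: 5 }, { t: 900, v: 6 }
];
console.log(debounceTimeline(debounceInput, 250));
// emits 3 at 550 ms and 6 at 1150 ms
```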

ignoreValues
obs.ignoreValues()
Ignores all values from the original observable, emitting only errors and end.

var source = Kefir.sequentially(100, [0, -1, 2, -3])
  .flatMap(x => x < 0 ? Kefir.constantError(x) : Kefir.constant(x));
var result = source.ignoreValues()
result.log();
> [sequentially.flatMap.ignoreValues] <error> -1
> [sequentially.flatMap.ignoreValues] <error> -3
> [sequentially.flatMap.ignoreValues] <end>
source:  ---•---e---•---eX
            0  -1   2  -3
result:  -------e-------eX
               -1      -3

ignoreErrors
obs.ignoreErrors()
Ignores all errors from the original observable, emitting only values and end.

var source = Kefir.sequentially(100, [0, -1, 2, -3])
  .flatMap(x => x < 0 ? Kefir.constantError(x) : Kefir.constant(x));
var result = source.ignoreErrors()
result.log();
> [sequentially.flatMap.ignoreErrors] <value> 0
> [sequentially.flatMap.ignoreErrors] <value> 2
> [sequentially.flatMap.ignoreErrors] <end>
source:  ---•---e---•---eX
            0  -1   2  -3
result:  ---•-------•----X
            0       2

ignoreEnd
obs.ignoreEnd()
Ignores the end of the source observable.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.ignoreEnd();
result.log();
> [sequentially.ignoreEnd] <value> 1
> [sequentially.ignoreEnd] <value> 2
> [sequentially.ignoreEnd] <value> 3
source:  ---1---2---3X
result:  ---1---2---3---

beforeEnd
obs.beforeEnd(fn)
Allows you to insert an additional value just before the observable ends. fn will be called on obs' end with no arguments, and whatever it returns will be emitted in the result stream before end.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.beforeEnd(() => 0);
result.log();
> [sequentially.beforeEnd] <value> 1
> [sequentially.beforeEnd] <value> 2
> [sequentially.beforeEnd] <value> 3
> [sequentially.beforeEnd] <value> 0
> [sequentially.beforeEnd] <end>
source:  ---1---2---3 X
result:  ---1---2---30X

slidingWindow
obs.slidingWindow(max, [min])
Will emit arrays containing the last n values from the obs observable, where n is between the min and max arguments. By default min equals 0.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]);
var result = source.slidingWindow(3, 2)
result.log();
> [sequentially.slidingWindow] <value> [1, 2]
> [sequentially.slidingWindow] <value> [1, 2, 3]
> [sequentially.slidingWindow] <value> [2, 3, 4]
> [sequentially.slidingWindow] <value> [3, 4, 5]
> [sequentially.slidingWindow] <end>
source:  --------1--------2--------3--------4--------5X
result:  -----------------•--------•--------•--------•X
                      [1,2]  [1,2,3]  [2,3,4]  [3,4,5]
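The roles of max and min can be sketched on plain arrays. slidingWindowModel is a hypothetical helper for this illustration; it assumes max is at least 1 and ignores timing and errors.

```javascript
// Plain-array model of slidingWindow; hypothetical helper, not part of Kefir.
function slidingWindowModel(values, max, min = 0) {
  const out = [];
  let buff = [];
  for (const x of values) {
    buff = buff.concat([x]).slice(-max);    // keep at most the last max values
    if (buff.length >= min) out.push(buff); // stay silent until min values are collected
  }
  return out;
}

console.log(slidingWindowModel([1, 2, 3, 4, 5], 3, 2));
// [ [ 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ], [ 3, 4, 5 ] ]
console.log(slidingWindowModel([1, 2, 3], 2));
// [ [ 1 ], [ 1, 2 ], [ 2, 3 ] ]
```

With min set to 2 nothing is emitted for the first value, which is why the result in the example above starts at [1, 2].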

bufferWhile
obs.bufferWhile([predicate], [options])
Passes every value from the source observable to the predicate function. If it returns true, adds the value to the buffer, otherwise flushes the buffer. Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

The default predicate is x => x. If you want to omit predicate but pass options, pass null as predicate.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]);
var result = source.bufferWhile(x => x !== 3);
result.log();
> [sequentially.bufferWhile] <value> [1, 2, 3]
> [sequentially.bufferWhile] <value> [4, 5]
> [sequentially.bufferWhile] <end>
source:  ---1---2---3---4---5 X
result:  -----------•--------•X
              [1,2,3]    [4,5]
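As the example shows, the value that makes the predicate return false is itself included in the flushed buffer. A plain-array sketch of that rule follows; bufferWhileModel is a hypothetical helper for this illustration and ignores timing and errors.

```javascript
// Plain-array model of bufferWhile; hypothetical helper, not part of Kefir.
function bufferWhileModel(values, predicate, options) {
  const test = predicate || (x => x);
  const flushOnEnd = !options || options.flushOnEnd !== false;
  const out = [];
  let buff = [];
  for (const x of values) {
    buff.push(x);      // the value is buffered first ...
    if (!test(x)) {    // ... then a false predicate flushes the buffer
      out.push(buff);
      buff = [];
    }
  }
  if (flushOnEnd && buff.length > 0) out.push(buff);
  return out;
}

console.log(bufferWhileModel([1, 2, 3, 4, 5], x => x !== 3));
// [ [ 1, 2, 3 ], [ 4, 5 ] ]
console.log(bufferWhileModel([1, 2, 3, 4, 5], x => x !== 3, { flushOnEnd: false }));
// [ [ 1, 2, 3 ] ]
```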

bufferWithCount
obs.bufferWithCount(count, [options])
Buffers all values from obs observable, and flushes the buffer every time count values have been passed through. Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]);
var result = source.bufferWithCount(2);
result.log();
> [sequentially.bufferWithCount] <value> [1, 2]
> [sequentially.bufferWithCount] <value> [3, 4]
> [sequentially.bufferWithCount] <value> [5]
> [sequentially.bufferWithCount] <end>
source:  --------1--------2--------3--------4--------5 X
result:  -----------------•-----------------•---------•X
                      [1,2]             [3,4]       [5]
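The effect of the flushOnEnd option is easiest to see with a value count that does not divide evenly. bufferWithCountModel below is a hypothetical plain-array helper for this illustration; it ignores timing and errors.

```javascript
// Plain-array model of bufferWithCount; hypothetical helper, not part of Kefir.
function bufferWithCountModel(values, count, options) {
  const flushOnEnd = !options || options.flushOnEnd !== false;
  const out = [];
  let buff = [];
  for (const x of values) {
    buff.push(x);
    if (buff.length >= count) { // buffer is full: flush it
      out.push(buff);
      buff = [];
    }
  }
  if (flushOnEnd && buff.length > 0) out.push(buff); // leftover values
  return out;
}

console.log(bufferWithCountModel([1, 2, 3, 4, 5], 2));
// [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
console.log(bufferWithCountModel([1, 2, 3, 4, 5], 2, { flushOnEnd: false }));
// [ [ 1, 2 ], [ 3, 4 ] ]
```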

bufferWithTimeOrCount
obs.bufferWithTimeOrCount(interval, count, [options])
Continuously buffers values from the source observable, flushing every interval milliseconds, or immediately once count values have been stored. Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

Limited by time:

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var result = source.bufferWithTimeOrCount(330, 10);
result.log();
> [sequentially.bufferWithTimeOrCount] <value> [1, 2, 3]
> [sequentially.bufferWithTimeOrCount] <value> [4, 5, 6]
> [sequentially.bufferWithTimeOrCount] <value> [7, 8]
> [sequentially.bufferWithTimeOrCount] <end>
source:  ---1---2---3---4---5---6---7---8 X
result:  ------------•------------•------•X
              [1,2,3]       [4,5,6]  [7,8]

Limited by count:

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var result = source.bufferWithTimeOrCount(330, 2);
result.log();
> [sequentially.bufferWithTimeOrCount] <value> [1, 2]
> [sequentially.bufferWithTimeOrCount] <value> [3, 4]
> [sequentially.bufferWithTimeOrCount] <value> [5, 6]
> [sequentially.bufferWithTimeOrCount] <value> [7, 8]
> [sequentially.bufferWithTimeOrCount] <end>
source:  ---1---2---3---4---5---6---7---8X
result:  -------•-------•-------•-------•X
            [1,2]   [3,4]   [5,6]   [7,8]

transduce
obs.transduce(transducer)
This method allows you to use transducers in Kefir. It supports any transducer implementation that follows the transducer protocol, for example cognitect-labs/transducers-js or jlongster/transducers.js. To learn more about transducers, please visit these libraries' pages.

In the example the cognitect-labs/transducers-js library is used.

var t = transducers;
var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6]);
var myTransducer = t.comp(
  t.map(x => x + 10),
  t.filter(x => x % 2 === 0),
  t.take(2)
);
var result = source.transduce(myTransducer);
result.log();
> [sequentially.transduce] <value> 12
> [sequentially.transduce] <value> 14
> [sequentially.transduce] <end>
source:  ---1---2---3---4---5---6X
result:  -------•-------•X
               12      14

thru
obs.thru(transformer)
Calls transformer with obs as argument and returns whatever transformer returned. Put another way, o.thru(fn) is the same as fn(o). This allows you to integrate your helper functions into chains of Kefir method calls without adding them to the Observable prototype.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6]);
var transformer = obs => obs.filter(x => x > 2);
var result = source.thru(transformer).map(x => x - 1); // is the same as transformer(source).map(...)
result.log();
source:  ---1---2---3---4---5---6X
result:  -----------2---3---4---5X
> [sequentially.filter.map] <value> 2
> [sequentially.filter.map] <value> 3
> [sequentially.filter.map] <value> 4
> [sequentially.filter.map] <value> 5
> [sequentially.filter.map] <end>
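
To see why o.thru(fn) and fn(o) are interchangeable, here is a small sketch that doesn't use Kefir at all. Chain is a made-up stand-in for an observable and onlyBig is a hypothetical helper; the point is only that thru lets the helper sit inside a method chain.

```javascript
// A tiny stand-in for an observable: just enough to show chaining.
// `Chain` is a made-up class for this sketch, not part of Kefir.
class Chain {
  constructor(values) { this.values = values; }
  map(fn) { return new Chain(this.values.map(fn)); }
  filter(fn) { return new Chain(this.values.filter(fn)); }
  // thru simply hands the current object to the given function
  thru(transformer) { return transformer(this); }
}

// A helper we want to reuse without touching Chain.prototype
var onlyBig = chain => chain.filter(x => x > 2);

var source = new Chain([1, 2, 3, 4, 5, 6]);
var viaThru = source.thru(onlyBig).map(x => x - 1);
var viaCall = onlyBig(source).map(x => x - 1);

console.log(viaThru.values); // [2, 3, 4, 5]
console.log(viaCall.values); // [2, 3, 4, 5]
```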

withHandlerobs.withHandler(handler)
The most general transformation method. All other transformation methods above can be implemented via withHandler. Will call the handler function on each event from obs observable, passing to it two arguments: an emitter, and an event object (with same format as in onAny callback).

By default, it will not emit any values or errors, and it will not end when obs observable ends. Instead you should implement the desired behaviour in the handler function, i.e. analyse event object and call emitter methods if necessary. You can call the emitter methods several times in each handler execution, and you can also call them any time later, for example to implement delay.

var source = Kefir.sequentially(100, [0, 1, 2, 3]);
var result = source.withHandler((emitter, event) => {
  if (event.type === 'end') {
    emitter.emit('bye');
    emitter.end();
  }
  if (event.type === 'value') {
    for (var i = 0; i < event.value; i++) {
      emitter.emit(event.value);
    }
  }
});
result.log();
> [sequentially.withHandler] <value> 1
> [sequentially.withHandler] <value> 2
> [sequentially.withHandler] <value> 2
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> bye
> [sequentially.withHandler] <end>
source:  ---0---1--- 2---  3 X
result:  -------•---••---••••X
                1   22   333bye
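
The handler contract can also be tried out without any timers. The sketch below is a plain-JavaScript model, not Kefir's implementation: withHandlerModel is a hypothetical function that runs a handler over a list of {type, value} events and records what the handler emits. Notice that the output only ends because the handler calls emitter.end() itself.

```javascript
// Hypothetical model: runs `handler` over a list of events and records
// everything the handler emits. Events use the {type, value} shape.
function withHandlerModel(events, handler) {
  var out = [];
  var emitter = {
    emit: x => { out.push({type: 'value', value: x}); },
    error: x => { out.push({type: 'error', value: x}); },
    end: () => { out.push({type: 'end'}); }
  };
  events.forEach(event => handler(emitter, event));
  return out;
}

var sourceEvents = [
  {type: 'value', value: 0},
  {type: 'value', value: 1},
  {type: 'value', value: 2},
  {type: 'value', value: 3},
  {type: 'end'}
];

var handled = withHandlerModel(sourceEvents, (emitter, event) => {
  if (event.type === 'end') {
    emitter.emit('bye');
    emitter.end();
  }
  if (event.type === 'value') {
    for (var i = 0; i < event.value; i++) {
      emitter.emit(event.value);
    }
  }
});

var handledSummary = handled.map(e => (e.type === 'value' ? e.value : '<' + e.type + '>'));
console.log(handledSummary); // [1, 2, 2, 3, 3, 3, 'bye', '<end>']
```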

Combine observables

combineKefir.combine(obss, [passiveObss], [combinator]) obs.combine(otherObs, [combinator])
Returns a stream. Combines two or more observables together. On each value from any source observable (obss array), emits a combined value, generated by the combinator function from the latest values from each source observable. The combinator function is called with the latest values as arguments. If no combinator is provided, it emits an array containing the latest values.

var a = Kefir.sequentially(100, [1, 3]);
var b = Kefir.sequentially(100, [2, 4]).delay(40);

var result = Kefir.combine([a, b], (a, b) => a + b);
result.log();
> [combine] <value> 3
> [combine] <value> 5
> [combine] <value> 7
> [combine] <end>
a:       ----1----3X
b:       ------2----4X

result:  ------3--5-7X

You can also pass some of the source observables as passiveObss in a second array. The result stream won't emit on values from passiveObss, but their latest values will still be available in the combinator function.

var a = Kefir.sequentially(100, [1, 3]);
var b = Kefir.sequentially(100, [2, 4]).delay(40);
var c = Kefir.sequentially(60, [5, 6, 7]);

var result = Kefir.combine([a, b], [c], (a, b, c) => a + b + c);
result.log();
> [combine] <value> 9
> [combine] <value> 12
> [combine] <value> 14
> [combine] <end>
a:       ----1----3X
b:       ------2----4X
c:       --5--6--7X

result:  ------•--•-•X
               9 12 14
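
The role of passive sources may be easier to follow in a timer-free model. The following is a sketch under assumptions, not Kefir's code: timelines are plain arrays of {t, v} events (time and value), and combineModel is a hypothetical function that replays them in time order.

```javascript
// Hypothetical model of combine on timestamped events ({t, v} pairs).
// `active` and `passive` are arrays of timelines; only events from
// active timelines trigger an emission.
function combineModel(active, passive, combinator) {
  var all = active.concat(passive);
  var events = [];
  all.forEach((timeline, index) => {
    timeline.forEach(e => events.push({t: e.t, v: e.v, index: index}));
  });
  events.sort((x, y) => x.t - y.t);

  var latest = new Array(all.length);
  var seen = new Array(all.length).fill(false);
  var out = [];
  events.forEach(e => {
    latest[e.index] = e.v;
    seen[e.index] = true;
    var isActive = e.index < active.length;
    // emit only once every source (active or passive) has a value
    if (isActive && seen.every(Boolean)) {
      out.push(combinator.apply(null, latest));
    }
  });
  return out;
}

var a = [{t: 100, v: 1}, {t: 200, v: 3}];
var b = [{t: 140, v: 2}, {t: 240, v: 4}];
var c = [{t: 60, v: 5}, {t: 120, v: 6}, {t: 180, v: 7}];

var sums = combineModel([a, b], [c], (a, b, c) => a + b + c);
console.log(sums); // [9, 12, 14]
```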

Also, combine supports passing objects as both obss and passiveObss. The combinator function will then be called with a single argument, a new object with the latest value from each observable. If no combinator is provided, it emits the object containing latest values.

var aStream = Kefir.sequentially(100, [1, 3]);
var bStream = Kefir.sequentially(100, [2, 4]).delay(40);

var result = Kefir.combine({ a: aStream, b: bStream });
result.log();
> [combine] <value> { a: 1, b: 2 }
> [combine] <value> { a: 3, b: 2 }
> [combine] <value> { a: 3, b: 4 }
> [combine] <end>
a:       ----1----3X
b:       ------2----4X

result:  ------•--•-•X

If there are duplicate keys in both obss and passiveObss, only the latest values from obss will appear in the combined object for the duplicated keys.

The result stream emits a value only when it has at least one value from each of source observables. Ends when all the active source observables (obss array) end.

You can also combine two observables by calling a.combine(b, combinator) if you like.

zipKefir.zip(sources, [combinator]) obs.zip(otherObs, [combinator])
Creates a stream with values from sources lined up with each other. For example if you have two sources with values [1, 2, 3] and [4, 5, 6, 7], the result stream will emit [1, 4], [2, 5], and [3, 6]. The result stream will emit the next value only when it has at least one value from each source.

You can also provide a combinator function. In this case, instead of emitting an array of values, they will be passed to combinator as arguments, and the returned value will be emitted (same as in combine).

Also in zip you can pass ordinary arrays along with observables in the sources, e.g. Kefir.zip([obs, [1, 2, 3]], fn). In other words, sources is an array of observables and arrays, or only observables of course.

The result stream ends when all sources end.

var a = Kefir.sequentially(100, [0, 1, 2, 3]);
var b = Kefir.sequentially(160, [4, 5, 6]);
var c = Kefir.sequentially(100, [8, 9]).delay(260).toProperty(() => 7);
var result = Kefir.zip([a, b, c]);
result.log();
> [zip] <value> [0, 4, 7]
> [zip] <value> [1, 5, 8]
> [zip] <value> [2, 6, 9]
> [zip] <end>
a:    ----0----1----2----3X
b:    -------4-------5-------6X
c:   7-----------------8----9X

abc:  -------•---------•-----•X
       [0,4,7]   [1,5,8]     [2,6,9]

This method is sometimes used incorrectly instead of combine. Please make sure you understand the difference and are making the right choice.
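
One way to see the difference is to replay the same two timelines through both strategies. This is a simplified model, not Kefir's implementation: timelines are arrays of {t, v} events, and mergeByTime, combineModel and zipModel are hypothetical helpers.

```javascript
// Hypothetical helper: flattens timelines into one time-ordered list.
function mergeByTime(timelines) {
  var events = [];
  timelines.forEach((timeline, index) => {
    timeline.forEach(e => events.push({t: e.t, v: e.v, index: index}));
  });
  return events.sort((x, y) => x.t - y.t);
}

// combine: emit the latest value of every source on each new event
function combineModel(timelines) {
  var latest = [];
  var seen = timelines.map(() => false);
  var out = [];
  mergeByTime(timelines).forEach(e => {
    latest[e.index] = e.v;
    seen[e.index] = true;
    if (seen.every(Boolean)) out.push(latest.slice());
  });
  return out;
}

// zip: queue values per source and emit only complete "rows"
function zipModel(timelines) {
  var queues = timelines.map(() => []);
  var out = [];
  mergeByTime(timelines).forEach(e => {
    queues[e.index].push(e.v);
    if (queues.every(q => q.length > 0)) {
      out.push(queues.map(q => q.shift()));
    }
  });
  return out;
}

var a = [{t: 100, v: 1}, {t: 200, v: 3}];
var b = [{t: 140, v: 2}, {t: 240, v: 4}];

var combined = combineModel([a, b]);
var zipped = zipModel([a, b]);
console.log(combined); // [[1, 2], [3, 2], [3, 4]]
console.log(zipped);   // [[1, 2], [3, 4]]
```

In this model combine reuses the latest value of a source as often as needed, while zip consumes each value exactly once.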

mergeKefir.merge(obss) obs.merge(otherObs)
Merges several obss observables into a single stream, i.e., it simply repeats values from each source observable. Ends when all obss observables end.

You can also merge two observables by calling a.merge(b), if you like.

var a = Kefir.sequentially(100, [0, 1, 2]);
var b = Kefir.sequentially(100, [0, 1, 2]).delay(30);
var c = Kefir.sequentially(100, [0, 1, 2]).delay(60);
var abc = Kefir.merge([a, b, c]);
abc.log();
> [merge] <value> 0
> [merge] <value> 0
> [merge] <value> 0
> [merge] <value> 1
> [merge] <value> 1
> [merge] <value> 1
> [merge] <value> 2
> [merge] <value> 2
> [merge] <value> 2
> [merge] <end>
a:    ----------0---------1---------2X
b:    ------------0---------1---------2X
c:    --------------0---------1---------2X

abc:  ----------0-0-0-----1-1-1-----2-2-2X

concatKefir.concat(obss) obs.concat(otherObs)
Concatenates several obss observables into one stream. Like merge, but switches to the next source only after the previous one ends.

var a = Kefir.sequentially(100, [0, 1, 2]);
var b = Kefir.sequentially(100, [3, 4, 5]);

var abc = Kefir.concat([a, b]);
abc.log();
> [concat] <value> 0
> [concat] <value> 1
> [concat] <value> 2
> [concat] <value> 3
> [concat] <value> 4
> [concat] <value> 5
> [concat] <end>
a:    ---0---1---2X
b:                ---3---4---5X

abc:  ---0---1---2---3---4---5X

This method is sometimes used incorrectly instead of merge. Please make sure you understand the difference and are making the right choice.
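
The difference can be sketched with a small timer-free model. It assumes, as the diagrams in these docs suggest, that a source's clock starts when it is subscribed to. The source objects and the mergeModel and concatModel functions are hypothetical, not part of Kefir.

```javascript
// Hypothetical model: a source is described by events relative to the
// moment it is subscribed to, plus the relative time at which it ends.
var a = {events: [{t: 100, v: 0}, {t: 200, v: 1}, {t: 300, v: 2}], end: 300};
var b = {events: [{t: 100, v: 3}, {t: 200, v: 4}, {t: 300, v: 5}], end: 300};

// merge subscribes to every source at time 0
function mergeModel(sources) {
  var out = [];
  sources.forEach(s => s.events.forEach(e => out.push({t: e.t, v: e.v})));
  return out.sort((x, y) => x.t - y.t);
}

// concat subscribes to the next source only when the previous one ends
function concatModel(sources) {
  var out = [];
  var start = 0;
  sources.forEach(s => {
    s.events.forEach(e => out.push({t: start + e.t, v: e.v}));
    start += s.end;
  });
  return out;
}

var merged = mergeModel([a, b]);
var concatenated = concatModel([a, b]);

console.log(merged.map(e => e.v));       // [0, 3, 1, 4, 2, 5]
console.log(concatenated.map(e => e.v)); // [0, 1, 2, 3, 4, 5]
console.log(concatenated.map(e => e.t)); // [100, 200, 300, 400, 500, 600]
```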

poolKefir.pool()
Pool is like merge to which you can dynamically add and remove sources. When you create a new pool it has no sources. Then you can add observables to it using the plug method, and remove them using unplug. Pool never ends.

var a = Kefir.sequentially(100, [0, 1, 2]);
var b = Kefir.sequentially(100, [0, 1, 2]).delay(30);
var c = Kefir.sequentially(100, [0, 1, 2]).delay(60);
var pool = Kefir.pool();
pool.plug(a);
pool.plug(b);
pool.plug(c);
pool.log();
> [pool] <value> 0
> [pool] <value> 0
> [pool] <value> 0
> [pool] <value> 1
> [pool] <value> 1
> [pool] <value> 1
> [pool] <value> 2
> [pool] <value> 2
> [pool] <value> 2
a:    ----------0---------1---------2X
b:    ------------0---------1---------2X
c:    --------------0---------1---------2X

pool: ----------0-0-0-----1-1-1-----2-2-2

repeatKefir.repeat(generator)
Calls the generator function which is supposed to return an observable. Emits values and errors from the spawned observable; when it ends, calls generator again to get a new one and so on.

The generator function is called with one argument — iteration number starting from 0. If a falsy value is returned from the generator, the stream ends.

var result = Kefir.repeat(i => {
  if (i < 3) {
    return Kefir.sequentially(100, [i, i]);
  } else {
    return false;
  }
});
result.log();
> [repeat] <value> 0
> [repeat] <value> 0
> [repeat] <value> 1
> [repeat] <value> 1
> [repeat] <value> 2
> [repeat] <value> 2
> [repeat] <end>
spawned 1:  ---0---0X
spawned 2:          ---1---1X
spawned 3:                  ---2---2X

result:     ---0---0---1---1---2---2X

Note that with this method it is possible to create an infinite loop. Consider this example:

var result = Kefir.repeat(() => Kefir.constant(1));

// When we subscribe to it (directly or via .log)
// we already are in an infinite loop.
result.log();

// But if we limit it with .take or something it'll work just fine.
// So the `result` stream defined like this
// may still make sense, depending on how we use it.
result.take(10).log();

It is even more dangerous if the generator constantly returns an ended observable with no values (e.g. never). In this case, .take won't help, because you'll never get a single value from it, but the generator will be called over and over. The only way out is to define an escape condition in the generator:

var result = Kefir.repeat(i => {

  // Defining that a new observable will be spawned at most 10 times
  if (i >= 10) {
    return false;
  }

  return Kefir.never();
});

So just be careful when using repeat, it's a little dangerous but it is still a great method.
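
The generator loop can be modelled synchronously to make the escape condition visible. In this sketch (an illustration only, not how Kefir implements repeat) a "spawned observable" is just an array of values that ends right after its last value, and repeatModel is a hypothetical function.

```javascript
// Hypothetical model: a "spawned observable" is a plain array of values
// that ends right after its last value; a falsy return stops the loop.
function repeatModel(generator) {
  var out = [];
  var calls = 0;
  for (var i = 0; ; i++) {
    calls++;
    var spawned = generator(i);
    if (!spawned) break;
    spawned.forEach(x => out.push(x));
  }
  return {values: out, generatorCalls: calls};
}

var normal = repeatModel(i => (i < 3 ? [i, i] : false));
console.log(normal.values); // [0, 0, 1, 1, 2, 2]

// Sources that end without values: only the escape condition stops the loop
var guarded = repeatModel(i => (i >= 10 ? false : []));
console.log(guarded.values);         // []
console.log(guarded.generatorCalls); // 11
```

Without the i >= 10 check the second call would never return, which is the infinite loop described above.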

flatMapobs.flatMap([transform])
Works similarly to flatten, but instead of arrays, it handles observables. As in flatten, you can either provide a transform function that returns observables, or you can use a source obs observable that already emits observables.

Always returns a stream.

flatMap ends when obs and all spawned observables end.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMap(x => Kefir.interval(40, x).take(4));
result.log();
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1---1-2-1-2---2-3-2-3---3---3X

flatMapLatestobs.flatMapLatest([fn])
Like flatMap, but repeats events only from the latest added observable i.e., switching from one observable to another.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapLatest(x => Kefir.interval(40, x).take(4));
result.log();
> [sequentially.flatMapLatest] <value> 1
> [sequentially.flatMapLatest] <value> 1
> [sequentially.flatMapLatest] <value> 2
> [sequentially.flatMapLatest] <value> 2
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1-----2---2-----3---3---3---3X

flatMapFirstobs.flatMapFirst([fn])
Like flatMap, but adds a new observable only if the previous one ended. Otherwise, it just ignores the new observable.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapFirst(x => Kefir.interval(40, x).take(4));
result.log();
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1---1---1-------3---3---3---3X
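
The three strategies shown so far differ only in what happens to a spawned observable when the next source value arrives. Here is a timer-free sketch that reproduces the logs above; it is a simplified model with hypothetical helper names, in which every source value x spawns a timeline shaped like Kefir.interval(40, x).take(4).

```javascript
// Hypothetical model. Source events are {t, v}; every source value x
// spawns an inner timeline like Kefir.interval(40, x).take(4).
var source = [{t: 100, v: 1}, {t: 200, v: 2}, {t: 300, v: 3}];
var TICK = 40;
var COUNT = 4;

function spawn(start, x) {
  var events = [];
  for (var k = 1; k <= COUNT; k++) events.push({t: start + k * TICK, v: x});
  return {events: events, end: start + COUNT * TICK};
}

// Sorts a copy of the events by time and keeps only the values
var values = events => events.slice().sort((x, y) => x.t - y.t).map(e => e.v);

// flatMap: every spawned timeline runs to completion
function flatMapModel(src) {
  var out = [];
  src.forEach(e => { out = out.concat(spawn(e.t, e.v).events); });
  return values(out);
}

// flatMapLatest: a spawned timeline is cut off when the next source value arrives
function flatMapLatestModel(src) {
  var out = [];
  src.forEach((e, i) => {
    var cutoff = i + 1 < src.length ? src[i + 1].t : Infinity;
    out = out.concat(spawn(e.t, e.v).events.filter(inner => inner.t < cutoff));
  });
  return values(out);
}

// flatMapFirst: source values are ignored while a spawned timeline is alive
function flatMapFirstModel(src) {
  var out = [];
  var busyUntil = 0;
  src.forEach(e => {
    if (e.t < busyUntil) return;
    var inner = spawn(e.t, e.v);
    busyUntil = inner.end;
    out = out.concat(inner.events);
  });
  return values(out);
}

var all = flatMapModel(source);
var latest = flatMapLatestModel(source);
var first = flatMapFirstModel(source);
console.log(all);    // [1, 1, 1, 2, 1, 2, 2, 3, 2, 3, 3, 3]
console.log(latest); // [1, 1, 2, 2, 3, 3, 3, 3]
console.log(first);  // [1, 1, 1, 1, 3, 3, 3, 3]
```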

flatMapConcatobs.flatMapConcat([fn])
Like flatMapFirst, but instead of ignoring new observables (if the previous one is still alive), it adds them to the queue. Then, when the current source ends, it takes the oldest observable from the queue, and switches to it.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapConcat(x => Kefir.interval(40, x).take(4));
result.log();
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                             ---2---2---2---2X
spawned 3:                                             ---3---3---3---3X

result:      -------------1---1---1---1---2---2---2---2---3---3---3---3X

flatMapConcurLimitobs.flatMapConcurLimit([fn], limit)
Like flatMapConcat, but with a configurable number of concurrent sources. In other words, flatMapConcat is flatMapConcurLimit(fn, 1).

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapConcurLimit(x => Kefir.interval(40, x).take(6), 2);
result.log();
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1---1---1X
spawned 2:                       ---2---2---2---2---2---2X
spawned 3:                                     ---3---3---3---3---3---3X

result:      -------------1---1---1-2-1-2-1-2-1-2-3-2-3-2-3---3---3---3X
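
Queueing with a concurrency limit can be modelled as a fixed number of slots. The sketch below is an illustration under assumptions, not Kefir's code: concurLimitModel is a hypothetical function in which each source value waits for the slot that frees up first and then runs a timeline shaped like Kefir.interval(40, x).take(ticks). With limit 1 it behaves like the flatMapConcat example.

```javascript
// Hypothetical model with `limit` slots; each slot remembers the time
// at which it becomes free again.
function concurLimitModel(src, ticks, limit) {
  var TICK = 40;
  var freeAt = new Array(limit).fill(0);
  var out = [];
  src.forEach(e => {
    // pick the slot that frees up first
    var slot = freeAt.indexOf(Math.min.apply(null, freeAt));
    // start right away if the slot is free, otherwise wait in the queue
    var start = Math.max(e.t, freeAt[slot]);
    for (var k = 1; k <= ticks; k++) out.push({t: start + k * TICK, v: e.v});
    freeAt[slot] = start + ticks * TICK;
  });
  return out.sort((x, y) => x.t - y.t).map(e => e.v);
}

var source = [{t: 100, v: 1}, {t: 200, v: 2}, {t: 300, v: 3}];

var oneAtATime = concurLimitModel(source, 4, 1);
var twoAtATime = concurLimitModel(source, 6, 2);
console.log(oneAtATime); // [1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3]
console.log(twoAtATime); // [1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 3, 2, 3, 2, 3, 3, 3, 3]
```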

flatMapErrorsobs.flatMapErrors([transform])
Same as flatMap, but operates on errors while values just flow through.

var source = Kefir.sequentially(100, [1, 2]).flatMap(Kefir.constantError);
var result = source.flatMapErrors(x => Kefir.interval(40, x).take(2));
result.log();
> [sequentially.flatMap.flatMapErrors] <value> 1
> [sequentially.flatMap.flatMapErrors] <value> 1
> [sequentially.flatMap.flatMapErrors] <value> 2
> [sequentially.flatMap.flatMapErrors] <value> 2
> [sequentially.flatMap.flatMapErrors] <end>
source:      ----------e---------eX
                       1         2

spawned 1:             ---1---1X
spawned 2:                       ---2---2X

result:      -------------1---1-----2---2X

Combine two observables

Just like in the "Modify an observable" section, most of the methods in this section will return an observable of the same type as the original observable (on which the method was called).

filterByobs.filterBy(otherObs)
Works like filter, but instead of calling a predicate on each value from obs observable, it checks the last value from otherObs.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [false, true, false]).delay(40).toProperty(() => true);
var result = foo.filterBy(bar);
result.log();
> [sequentially.filterBy] <value> 1
> [sequentially.filterBy] <value> 2
> [sequentially.filterBy] <value> 5
> [sequentially.filterBy] <value> 6
> [sequentially.filterBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:    t-----------f---------t---------fX

result:  ----1----2--------------5----6----------X

sampledByobs.sampledBy(otherObs, [combinator])
Returns a stream that emits the latest value from obs observable on each value from otherObs. Ends when otherObs ends.

You can also provide a combinator function which will be used to form the value emitted by the result stream. It is called with the latest values from obs and otherObs as arguments. The default combinator function is (a, b) => a.

var a = Kefir.sequentially(200, [2, 3]).toProperty(() => 1);
var b = Kefir.interval(100, 0).delay(40).take(5);
var result = a.sampledBy(b);
result.log();
> [sequentially.toProperty.sampledBy] <value> 1
> [sequentially.toProperty.sampledBy] <value> 2
> [sequentially.toProperty.sampledBy] <value> 2
> [sequentially.toProperty.sampledBy] <value> 3
> [sequentially.toProperty.sampledBy] <value> 3
> [sequentially.toProperty.sampledBy] <end>
a:      1---------2---------3X
b:       ------0----0----0----0----0X

result:  ------1----2----2----3----3X
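
Sampling boils down to looking up the latest known value at each tick. The following timer-free sketch reproduces the log above; sampledByModel is a hypothetical function, timelines are arrays of {t, v} events, and the property's current value is modelled as an event at t = 0. It assumes that a tick is skipped while obs has no value yet.

```javascript
// Hypothetical model: `a` is the sampled timeline, `b` holds the ticks.
// Both are arrays of {t, v} events sorted by time.
function sampledByModel(a, b, combinator) {
  combinator = combinator || ((x, y) => x);
  var out = [];
  b.forEach(tick => {
    var known = a.filter(e => e.t <= tick.t);
    if (known.length === 0) return; // nothing to sample yet
    out.push(combinator(known[known.length - 1].v, tick.v));
  });
  return out;
}

var a = [{t: 0, v: 1}, {t: 200, v: 2}, {t: 400, v: 3}];
var b = [140, 240, 340, 440, 540].map(t => ({t: t, v: 0}));

var sampled = sampledByModel(a, b);
var paired = sampledByModel(a, b, (x, y) => x + ':' + y);
console.log(sampled); // [1, 2, 2, 3, 3]
console.log(paired);  // ['1:0', '2:0', '2:0', '3:0', '3:0']
```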

skipUntilByobs.skipUntilBy(otherObs)
Skips values from obs until the first value from otherObs.

var foo = Kefir.sequentially(100, [1, 2, 3, 4]);
var bar = Kefir.later(250, 0);
var result = foo.skipUntilBy(bar);
result.log();
> [sequentially.skipUntilBy] <value> 3
> [sequentially.skipUntilBy] <value> 4
> [sequentially.skipUntilBy] <end>
foo:     ----1----2----3----4X
bar:     -----------0X

result:  --------------3----4X

takeUntilByobs.takeUntilBy(otherObs)
Takes values from obs until the first value from otherObs i.e., ends on the first value from otherObs.

var foo = Kefir.sequentially(100, [1, 2, 3, 4]);
var bar = Kefir.later(250, 0);
var result = foo.takeUntilBy(bar);
result.log();
> [sequentially.takeUntilBy] <value> 1
> [sequentially.takeUntilBy] <value> 2
> [sequentially.takeUntilBy] <end>
foo:     ----1----2----3----4X
bar:     -----------0X

result:  ----1----2-X

bufferByobs.bufferBy(otherObs, [options])
Buffers all values from obs observable, and flushes the buffer on each value from otherObs. Also flushes the buffer before end.

If options.flushOnEnd is false, the buffer won't be flushed when the main observable ends.

The result observable will emit [] in cases when the buffer is supposed to be flushed but is empty.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]).delay(40);
var bar = Kefir.sequentially(300, [1, 2]);
var result = foo.bufferBy(bar);
result.log();
> [sequentially.delay.bufferBy] <value> [1, 2]
> [sequentially.delay.bufferBy] <value> [3, 4, 5]
> [sequentially.delay.bufferBy] <value> [6, 7, 8]
> [sequentially.delay.bufferBy] <end>
foo:     ------1----2----3----4----5----6----7----8 X
bar:     --------------1--------------2X

result:  --------------•--------------•------------•X
                  [1, 2]      [3, 4, 5]    [6, 7, 8]
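
Here is the same example as a timer-free model, including the flushOnEnd option. It is only a sketch: bufferByModel is a hypothetical function, timelines are arrays of {t, v} events, and what happens when a value and a flush share the same timestamp is not modelled faithfully.

```javascript
// Hypothetical model: `values` and `flushes` are timelines of {t, v}
// events; `endAt` is the moment the main observable ends.
function bufferByModel(values, flushes, endAt, options) {
  var flushOnEnd = !options || options.flushOnEnd !== false;
  var out = [];
  var buffer = [];
  var i = 0;
  flushes.forEach(f => {
    if (f.t > endAt) return; // the main observable has already ended
    while (i < values.length && values[i].t <= f.t) buffer.push(values[i++].v);
    out.push(buffer); // may be [] when nothing arrived since the last flush
    buffer = [];
  });
  // whatever is left is flushed when the main observable ends
  while (i < values.length) buffer.push(values[i++].v);
  if (flushOnEnd) out.push(buffer);
  return out;
}

var foo = [1, 2, 3, 4, 5, 6, 7, 8].map((v, index) => ({t: 140 + 100 * index, v: v}));
var bar = [{t: 300, v: 1}, {t: 600, v: 2}];

var flushed = bufferByModel(foo, bar, 840);
var noEndFlush = bufferByModel(foo, bar, 840, {flushOnEnd: false});
console.log(flushed);    // [[1, 2], [3, 4, 5], [6, 7, 8]]
console.log(noEndFlush); // [[1, 2], [3, 4, 5]]
```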

bufferWhileByobs.bufferWhileBy(otherObs, [options])
Similar to bufferWhile, but instead of using a predicate function it uses another observable. On each value from obs observable: if the last value from otherObs was truthy, adds the new value to the buffer, otherwise flushes the buffer (with the new value included).

If options.flushOnEnd is false, the buffer won't be flushed when the main observable ends.

If options.flushOnChange is true, the buffer will be also flushed each time the controlling observable emits false.

The result observable will emit [] in cases when the buffer is supposed to be flushed but is empty.

The default options are {flushOnEnd: true, flushOnChange: false}.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [false, true, false]).delay(40);
var result = foo.bufferWhileBy(bar);
result.log();
> [sequentially.bufferWhileBy] <value> [1, 2, 3]
> [sequentially.bufferWhileBy] <value> [4]
> [sequentially.bufferWhileBy] <value> [5, 6, 7]
> [sequentially.bufferWhileBy] <value> [8]
> [sequentially.bufferWhileBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:     -----------f---------t---------fX

result:  --------------•----•--------------•----•X
               [1, 2, 3]  [4]      [5, 6, 7]  [8]

Interoperation with other async abstractions

fromPromiseKefir.fromPromise(promise)
Converts a Promise to a Kefir Property. Uses the promise.then(onFulfilled, onRejected) interface to subscribe to the promise. Also calls promise.done() (if there is such a method) to prevent libraries like Q or when from swallowing exceptions.

var result = Kefir.fromPromise(new Promise(fulfill => fulfill(1)));
result.log();
> [fromPromise] <value> 1
> [fromPromise] <end>
result:  ----1X

toPromiseobs.toPromise([PromiseConstructor])
Converts a Kefir Observable to a Promise. If called without arguments, the default global.Promise constructor is used; alternatively, you can pass a promise constructor that supports the following interface: new Promise((resolve, reject) => { ... }). The promise will be fulfilled or rejected at the moment the source observable ends, with the latest value or error. If the observable ends without any value or error, the promise will never be fulfilled or rejected.

var promise = Kefir.sequentially(1000, [1, 2]).toPromise();
promise.then(x => {
  console.log('fulfilled with:', x);
});
> fulfilled with: 2

fromESObservableKefir.fromESObservable(observable)
Converts an ECMAScript Observable to a Kefir Stream.

var result = Kefir.fromESObservable(new Observable(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
}));
result.log();
> [fromESObservable] <value> 1
> [fromESObservable] <value> 2
> [fromESObservable] <end>
result:  12X

toESObservableobs.toESObservable() obs[Symbol.observable]()
Converts a Kefir Observable to an ECMAScript Observable.

Also available as obs[Symbol.observable], so you can use ES Observable's from method with Kefir Observables, e.g., Observable.from(Kefir.sequentially(1000, [1, 2])).

var observable = Kefir.sequentially(1000, [1, 2]).toESObservable();
observable.subscribe({
  next(x) {
    console.log('value:', x);
  },
  complete() {
    console.log('completed');
  }
});
> value: 1
> value: 2
> completed

Static LandKefir.staticLand.Observable
Provides Static Land compatibility. The Observable type object supports the following algebras: Semigroup, Monoid, Functor, Bifunctor, Apply, Applicative, Chain, Monad.

var Observable = Kefir.staticLand.Observable;
var obs = Observable.map(x => x * 3, Observable.of(2));
obs.log();
> [constant.map] <value:current> 6
> [constant.map] <end:current>

Activation and deactivation of observables

At the moment an observable is created, it is not yet subscribed to its source. Observables subscribe to their sources only when they themselves get a first subscriber. In these docs this process is called activation of an observable. Also, when the last subscriber is removed from an observable, the observable deactivates and unsubscribes from its source. Later it can be activated again, and so on.

The source to which an observable subscribes on activation may be another observable (for example in .map), several other observables (.combine), or some external source (.fromEvents).

For example, stream = Kefir.fromEvents(el, 'click') won't immediately subscribe to the 'click' event on el; it will subscribe only when the first listener is added to the stream. And it will automatically unsubscribe when the last listener is removed from the stream.

var stream = Kefir.fromEvents(el, 'click');
// at this moment the event listener is not yet added to _el_

stream.onValue(someFn);
// now 'click' listener is added to _el_

stream.offValue(someFn);
// and now it is removed again

As you might already guess, activation and deactivation propagate up the observable chain. For instance, if one creates a long chain like Kefir.fromEvents(...).map(...).filter(...).take(...), the whole chain will be inactive until the first subscriber is added, and then it will activate up to .fromEvents. The same goes for deactivation.
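
To illustrate the idea, here is a minimal plain-JavaScript sketch of a lazily activated stream. It is not Kefir's implementation: lazyStream and the fake el object are made up for this example, and only values (no errors or end) are modelled.

```javascript
// A fake event target that keeps track of its listeners (made up for this sketch).
var el = {
  listeners: [],
  addEventListener(type, fn) { this.listeners.push(fn); },
  removeEventListener(type, fn) {
    this.listeners = this.listeners.filter(l => l !== fn);
  }
};

// Hypothetical minimal observable: `subscribe` is called on activation
// and must return an unsubscribe function, which is used on deactivation.
function lazyStream(subscribe) {
  var subscribers = [];
  var unsubscribe = null;
  var emit = x => subscribers.slice().forEach(fn => fn(x));
  return {
    onValue(fn) {
      subscribers.push(fn);
      if (subscribers.length === 1) unsubscribe = subscribe(emit); // activation
    },
    offValue(fn) {
      var index = subscribers.indexOf(fn);
      if (index === -1) return;
      subscribers.splice(index, 1);
      if (subscribers.length === 0) { // deactivation
        unsubscribe();
        unsubscribe = null;
      }
    }
  };
}

var stream = lazyStream(emit => {
  el.addEventListener('click', emit);
  return () => el.removeEventListener('click', emit);
});

var counts = [el.listeners.length]; // not activated yet
var someFn = () => {};
stream.onValue(someFn);
counts.push(el.listeners.length);   // activated
stream.offValue(someFn);
counts.push(el.listeners.length);   // deactivated again

console.log(counts); // [0, 1, 0]
```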

Emitter

Emitter is an object that has four methods for emitting events. It is used in several places in Kefir as a proxy to emit events from some observable.

emitter.value(123);
emitter.error('Oh, snap!');
emitter.end();

All emitter methods are bound to their context, and can be passed as callbacks safely without binding:

// instead of this
el.addEventListener('click', emitter.value.bind(emitter));

// you can do just this
el.addEventListener('click', emitter.value);
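
One way to get such "pre-bound" methods is to build the emitter from closures instead of relying on this. The sketch below only contrasts the two approaches; UnboundEmitter and makeEmitter are hypothetical and say nothing about how Kefir builds its emitters internally.

```javascript
// A class-based emitter relies on `this`, so detached methods break.
class UnboundEmitter {
  constructor() { this.log = []; }
  value(x) { this.log.push(x); }
}

// A closure-based emitter (hypothetical sketch) never touches `this`.
function makeEmitter() {
  var log = [];
  return {
    log: log,
    value: x => { log.push(x); }
  };
}

var unbound = new UnboundEmitter();
var detachedUnbound = unbound.value;
var failed = false;
try {
  detachedUnbound(1); // `this` is undefined here, so this throws
} catch (e) {
  failed = true;
}

var bound = makeEmitter();
var detachedBound = bound.value;
detachedBound(1); // works without .bind(bound)

console.log(failed);    // true
console.log(bound.log); // [1]
```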

There also exist legacy aliases to emitter methods: emitter.emit is an alias of emitter.value, and emitter.emitEvent is an alias of emitter.event.

Errors

Kefir supports an additional channel to pass data through observables — errors. Unlike values, errors normally just flow through the observable chain without any transformation. Consider this example:

var foo = Kefir.stream(emitter => {
  emitter.emit(0);
  emitter.emit(2);
  emitter.error(-1);
  emitter.emit(3);
  emitter.end();
});

var bar = foo.map(x => x + 2).filter(x => x > 3);
bar.log();

> [stream.map.filter] <value> 4
> [stream.map.filter] <error> -1
> [stream.map.filter] <value> 5
> [stream.map.filter] <end>
foo: ---0---2---e---3---X
                -1

bar: -------4---e---5---X
                -1

As you can see, values are being mapped and filtered, but errors just flow through unchanged. Also notice that an observable doesn't end on an error by default, but you can use the takeErrors method to make that happen. Consider a slight change to the above example:

var foo = Kefir.stream(emitter => {
  emitter.emit(0);
  emitter.emit(2);
  emitter.error(-1);
  emitter.emit(3);
  emitter.end();
});

var bar = foo.map(x => x + 2).filter(x => x > 3);
bar.takeErrors(1).log();
> [stream.map.filter.takeErrors] <value:current> 4
> [stream.map.filter.takeErrors] <error:current> -1
> [stream.map.filter.takeErrors] <end:current>
foo: ---0---2---e---3---X
                -1

bar: -------4---eX
                -1
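
The two channels can be modelled with plain arrays of events. This sketch is an illustration, not Kefir's implementation: mapModel, filterModel and takeErrorsModel are hypothetical helpers that treat an observable as a list of {type, value} events.

```javascript
// Hypothetical model: an observable is a list of {type, value} events.
var foo = [
  {type: 'value', value: 0},
  {type: 'value', value: 2},
  {type: 'error', value: -1},
  {type: 'value', value: 3},
  {type: 'end'}
];

// map and filter only look at values; errors and end pass through untouched
var mapModel = (events, fn) =>
  events.map(e => (e.type === 'value' ? {type: 'value', value: fn(e.value)} : e));
var filterModel = (events, pred) =>
  events.filter(e => e.type !== 'value' || pred(e.value));

// takeErrors(n): end right after the n-th error
function takeErrorsModel(events, n) {
  var out = [];
  var errors = 0;
  for (var i = 0; i < events.length; i++) {
    out.push(events[i]);
    if (events[i].type === 'end') break;
    if (events[i].type === 'error' && ++errors === n) {
      out.push({type: 'end'});
      break;
    }
  }
  return out;
}

var bar = filterModel(mapModel(foo, x => x + 2), x => x > 3);
var show = events => events.map(e => (e.type === 'end' ? 'end' : e.type + ' ' + e.value));

var plainRun = show(bar);
var limitedRun = show(takeErrorsModel(bar, 1));
console.log(plainRun);   // ['value 4', 'error -1', 'value 5', 'end']
console.log(limitedRun); // ['value 4', 'error -1', 'end']
```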

Current values/errors in streams

Normally in Kefir only Properties have current values, but depending on how we define "current value" we might say that Streams also may have them.

Let's see how we get current values from Properties first. There is no direct access to the current value of a property, we can't do something like prop.getCurrent(). Instead we subscribe to the property using onValue(callback) for example, and our callback gets called immediately with the current value.

Suppose we define "current value" through that technical detail of getting it: the current value is the value that we get in the callback immediately after subscribing. Then we can say that Streams may sometimes have current values too.

Let's see some examples.

// This is the most straightforward way for creating such a stream
var s1 = Kefir.stream(emitter => {
  emitter.emit(1);
});

// But it can be created accidentally or intentionally in some other cases, such as
var s2 = Kefir.merge([Kefir.constant(1), Kefir.never()]);
var s3 = Kefir.combine([Kefir.constant(1), Kefir.constant(1)]);

This feature of Streams has its cons and pros.

Cons

Only first subscriber gets that value. Even if it was an onEnd or onError subscriber, it'll still "consume" the "current value". Let's see it in an example:

var stream = Kefir.stream(emitter => {
  emitter.emit(1);
});
stream.onValue(x => console.log('first', x)); // logs "first 1"
stream.onValue(x => console.log('second', x)); // won't log


// Even with onError it will be consumed
var stream2 = ...
stream2.onError(fn);
stream2.onValue(x => console.log('second', x)); // won't log

This is fixed in Properties as they remember the last value, and call any new subscriber with it.
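
The contrast can be reproduced with two tiny models. They are sketches only (no errors, no end, no unsubscribing), and streamModel and propertyModel are hypothetical names, not Kefir APIs.

```javascript
// Hypothetical minimal stream: runs `subscribe` when the first
// subscriber arrives and never replays anything.
function streamModel(subscribe) {
  var subscribers = [];
  var active = false;
  return {
    onValue(fn) {
      subscribers.push(fn);
      if (!active) {
        active = true;
        subscribe(x => subscribers.forEach(s => s(x)));
      }
    }
  };
}

// Hypothetical property: remembers the last value for late subscribers.
function propertyModel(stream) {
  var subscribers = [];
  var active = false;
  var hasCurrent = false;
  var current;
  return {
    onValue(fn) {
      subscribers.push(fn);
      if (!active) {
        active = true;
        stream.onValue(x => {
          hasCurrent = true;
          current = x;
          subscribers.forEach(s => s(x));
        });
      } else if (hasCurrent) {
        fn(current); // late subscribers get the remembered value
      }
    }
  };
}

var log = [];

var stream = streamModel(emit => { emit(1); });
stream.onValue(x => log.push('first ' + x));
stream.onValue(x => log.push('second ' + x)); // never called

var property = propertyModel(streamModel(emit => { emit(1); }));
property.onValue(x => log.push('prop first ' + x));
property.onValue(x => log.push('prop second ' + x));

console.log(log); // ['first 1', 'prop first 1', 'prop second 1']
```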

Another issue is that it's not very good from a semantic point of view. The moment when an event happens in a stream depends on the moment when the stream gets its first subscriber. That makes subscribing an impure operation, and the whole system becomes less declarative and functional.

Pros

This feature allows you to define current value in a stream. Well it sounds just like the definition of the feature, but let's just look at an example, hopefully it'll help you understand what I mean:

var scrollTopStream = Kefir.stream(emitter => {
  emitter.emit(window.scrollY); // here we are emitting the current value!
  window.addEventListener('scroll', () => {
    emitter.emit(window.scrollY);
  });
});

// Let's now convert it to a property like good citizens
var scrollTopProperty = scrollTopStream.toProperty();

Another benefit is that it makes it possible not to lose current values when converting properties to streams and then back to properties. For example, combine always returns a stream, but it'll still emit the current value. So one can do Kefir.combine([p1, p2], fn).toProperty(), and get a property combined from two other properties with the correct current value.

P.S.

Note that all this applies to errors as well.

Also it's a good practice to convert all streams that might emit current values to properties by using the toProperty method. That should make your code more reliable as all subscribers will get current values. And it's just better semantically as current values should live in properties.