The core implementation of Reactive Streams (with ES Observable API).




Light implementation of reactive streams (with the ES Observable API).

API: es-observable (at stage 1 among the active ECMAScript proposals). It is a very general API, so it can be used to implement cold streams, hot streams, and the always-shared streams that are often used for UI (for more on the different stream APIs, see why-we-built-xstream).
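To illustrate the difference between cold and hot (shared) streams, here is a minimal sketch. `TinyStream` is a hypothetical stand-in written for this example only; it is not the rsjs source, and it assumes the same constructor shape (a subscriber function that receives a control object and may return an unsubscriber).

```javascript
// Hypothetical stand-in for an es-observable style class (NOT the rsjs source).
function TinyStream(subscriber) {
  this._subscriber = subscriber;
}

TinyStream.prototype.subscribe = function (observer) {
  var closed = false;
  var control = {
    next: function (value) {
      if (!closed && observer.next) observer.next(value);
    }
  };
  var cleanup = this._subscriber(control);
  return {
    unsubscribe: function () {
      if (closed) return;
      closed = true;
      if (typeof cleanup === 'function') cleanup();
    }
  };
};

// Cold: the producer code runs again for every subscription.
var runs = 0;
var cold = new TinyStream(function (control) {
  runs += 1;
  control.next('run ' + runs);
});
var coldLog = [];
cold.subscribe({ next: function (v) { coldLog.push('a:' + v); } });
cold.subscribe({ next: function (v) { coldLog.push('b:' + v); } });

// Hot (shared): one producer lives outside the stream; subscribers only listen.
var listeners = [];
function emit(value) {
  listeners.slice().forEach(function (listener) { listener(value); });
}
var hot = new TinyStream(function (control) {
  var listener = function (value) { control.next(value); };
  listeners.push(listener);
  return function unsubscriber() {
    listeners.splice(listeners.indexOf(listener), 1);
  };
});
var hotLog = [];
var first = hot.subscribe({ next: function (v) { hotLog.push('a:' + v); } });
emit(1); // only the first subscriber is listening
hot.subscribe({ next: function (v) { hotLog.push('b:' + v); } });
emit(2); // both subscribers receive the same value
first.unsubscribe();
emit(3); // only the second subscriber is left
```

In the cold case each subscriber gets its own run of the producer; in the hot case a late subscriber misses earlier values, because the single producer does not restart.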

In the sources, the class names RS (reactive stream) and Control are used instead of Observable and SubscriptionObserver, to avoid confusion between observer and Observable, and between Subscription and SubscriptionObserver. This does not matter in practice, because only the RS class is public, and it can be exported under any name.

Only ES5 syntax is used, but the forEach method requires Promise (or a polyfill).
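The dependency on Promise comes from what forEach returns. As a rough sketch (an assumption about the general es-observable approach, not the actual rsjs code), a forEach helper can be layered on top of subscribe as shown here. The helper name `forEachValue` and the `numbers` object are hypothetical and exist only for this example.

```javascript
// Hypothetical helper: turns any object with subscribe(observer) into a Promise
// that resolves on complete and rejects on error.
function forEachValue(stream, fn) {
  return new Promise(function (resolve, reject) {
    stream.subscribe({
      next: function (value) {
        // A throwing callback rejects the promise instead of escaping.
        try { fn(value); } catch (err) { reject(err); }
      },
      error: reject,
      complete: resolve
    });
  });
}

// Hypothetical synchronous source used only to exercise the helper.
var numbers = {
  subscribe: function (observer) {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
    return { unsubscribe: function () {} };
  }
};

var seen = [];
var done = forEachValue(numbers, function (value) { seen.push(value); });
done.then(function () { console.log('stream completed'); });
```

In an environment without a native Promise, a polyfill would need to be loaded before a helper like this is called.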


var RS = require('rsjs');

/** A single function defines the stream, much like a generator. */
function subscriber(control) {
  /* this === undefined */
  /* control sends data to the stream. */
  control.next(value); /* yield value; */
  control.error(value); /* throw value; */
  control.complete(value); /* return value; */

  return function unsubscriber() {
    /* this === undefined */
  };
}

var stream = new RS(subscriber);

/** @return Promise */
stream.forEach(function(value) {/* this === observer */});

/* observer receives data from the stream. */
var observer = {
  next: function(value) {/* this === observer */},
  error: function(value) {/* this === observer */},
  complete: function(value) {/* this === observer */}
};

var subscription = stream.subscribe(observer);

subscription.isUnsubscribed === false;

subscription.unsubscribe();

subscription.isUnsubscribed === true;

Methods stream.of and stream.from are not yet implemented.
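Until those methods exist, similar helpers can be written in user code on top of the constructor. The sketch below is only an illustration: `streamOf`, `streamFrom`, and `FakeStream` are hypothetical names, `streamFrom` handles array-like input only, and `FakeStream` is a stand-in so the example runs without rsjs installed.

```javascript
// Hypothetical helpers; StreamClass is any constructor that takes a
// subscriber(control) function, which is the shape the RS class is described as having.
function streamFrom(StreamClass, list) {
  return new StreamClass(function (control) {
    for (var i = 0; i < list.length; i += 1) {
      control.next(list[i]);
    }
    control.complete();
  });
}

function streamOf(StreamClass /*, value1, value2, ... */) {
  return streamFrom(StreamClass, Array.prototype.slice.call(arguments, 1));
}

// Minimal stand-in class for the demo (NOT the rsjs source).
function FakeStream(subscriber) {
  this._subscriber = subscriber;
}
FakeStream.prototype.subscribe = function (observer) {
  this._subscriber({
    next: function (value) { observer.next(value); },
    complete: function (value) { observer.complete(value); }
  });
};

var received = [];
streamOf(FakeStream, 'a', 'b').subscribe({
  next: function (value) { received.push(value); },
  complete: function () { received.push('done'); }
});
```

With rsjs itself, the same helpers would presumably be called as `streamOf(RS, 'a', 'b')`, assuming RS delivers values as the usage above describes.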


The tests run from the standalone page test/test.html (es-observable-tests).


