Home Manual Reference Source Repository

src/stream.js

/** Requires Immutable */
import Immutable from 'immutable';

/**
 * A Stream holds the canonical state of a part of an application, and pushes changes to that state
 * to any observers that are subscribed to the Stream.
 *
 */
class Stream {

  /**
   * constructor - When creating a new stream, an initial state is not required, but it is
   * encouraged. If an initial state is passed in, it should be an Immutable data structure.
   *
   * @param  {Immutable} initialState this is the state that the stream will start with.
   * @return {Stream} Instance of a Stream.
   */
  constructor (initialState) {
    if(initialState && !Immutable.Iterable.isIterable(initialState)) {
      throw new TypeError('initialState must be immutable');
    }

    /**
     * @access private
     */
    this._state = initialState;

    /**
     * @access private
     */
    this.observers = Immutable.List();

    /**
     * @access private
     */
    this.type = 'SOURCE';
  }


  /**
   * logStack - Adds a stack to automatically add state changes.
   *
   * @param  {string} name  The name of this stream to be viewed in stack logs.
   * @param  {Stack} stack The stack to add state to.
   */
  logStack (name, stack) {

    /**
     * @access private
     */
    this.name = name;

    /**
     * @access private
     */
    this.stack = stack;
  }
  /**
   * subscribe - takes a callback function that is executed any time the stream received _new_
   * state. The callback should accept `state` as an argument. If the stream currently has state,
   * it will be passed to the callback immediately upon subscription.
   *
   * An observer gets added to an `observers` array. State changes will propagate to any observers
   * in this array.
   *
   * It will return an object (we'll call a _subscriber_) that has an `unsubscribe` method. This
   * method will remove the observer from the `observers` List and should be called anytime the
   * observer is done listening to the stream. Failing to do so could cause memory leaks.
   *
   * @access public
   * @param  {function} observer - the callback function to be executed whenever state updates
   * @return {object}            - a _subscriber_ object with an `unsubscribe` method
   */
  subscribe (observer) {
    this.observers = this.observers.push(observer);
    if(this._state) { observer(this._state); }
    return {
      unsubscribe: () => {
        const idx = this.observers.indexOf(observer);
        if (idx >= 0) {
          this.observers = this.observers.delete(this.observers.indexOf(observer));
        } else {
          throw new RangeError('Observer is not subscribed to this stream.');
        }
      }
    };
  }

  /**
   * sendState - Gets called whenever the Stream receives _new_ state. `sendState` first updates
   * the Streams internal `_state` and then each observer in the `observers` List will be called
   * with the new state as its argument.
   *
   * @access private
   * @param  {Immutable} newState the Stream's new state.
   */
  sendState (newState) {
      this._state = newState;
      if (this.stack) { this.stack.pushState(this.name, this._state, this.type); }
      this.observers.forEach(observer => observer(this._state));
  }

  /**
   * next - Receives new state (externally, ideally through a transformer) and verifies that the
   * new state is different from the Stream's current state before passing on to `sendState` (and,
   * all the observers, by extension).
   *
   * @access public
   * @param  {Immutable} newState
   */
  next (newState) {
    if(!Immutable.is(this._state, newState)){
      this.sendState(newState);
    }
  }

  /**
   * forceState - called when hydrating state from another source.
   *
   * @access public
   * @param  {Immutable.Map} state new state.
   */
  forceState (state) {
    this._state = state;
  }

  /**
   * get state - a getter for returning the current internal state of the Stream.
   *
   * @access public
   * @return {Immutable} - current state contained by the Stream
   */
  get state(){
    return this._state;
  }

  /**
   * set state - a setter that's probably more clever than it needs to be. Instead of setting
   * the internal state, it calls `next`.
   *
   * ```javascript
   * stream.next(newState)
   * // is functionally the same as
   * stream.state = newState
   * ```
   *
   * Like I said, maybe too clever, but I thinks allow for a more delarative style when
   * writing transformers.
   *
   * @access public
   * @param  {type} newState description
   * @return {type}          description
   */
  set state(newState){
    this.next(newState);
  }
}

/** @ignore export the Stream class */
export default Stream;