// src/funnel.js
import Immutable from 'immutable';
import Stream from './stream';
/**
 * A Funnel is a special type of Stream that combines the states from feeder
 * streams into a single keyed state object.
 * @extends {Stream}
 */
class Funnel extends Stream {
  /**
   * A Funnel is a special type of Stream that combines the states from feeder
   * streams. Once all of the feeder streams have sent a defined state, the Funnel
   * will send the combined state to any observers. When creating the Funnel, the
   * sourceMap describes the shape of the combined state. The key for each stream
   * will be the key used to access the state of that stream on the combined
   * state object.
   *
   * @access public
   * @param {Immutable.Map} sourceMap - a Map of the feeder streams.
   * @return {Funnel} returns an instance of the Funnel stream.
   * @throws {Error} if `sourceMap` is missing, is not a keyed Immutable
   * collection, or contains a feeder whose `type` is `'VIEW'`.
   */
  constructor (sourceMap) {
    if (!sourceMap) {
      throw new Error('You must supply a source map to a funnel');
    }
    if (!Immutable.Iterable.isKeyed(sourceMap)) {
      throw new Error('You must supply an Immutable.Map as a source map to a funnel');
    }
    // Validate every feeder BEFORE subscribing to any of them. Checking inside
    // the subscribing loop would leave earlier feeders subscribed to a
    // half-constructed funnel when a later entry turns out to be a View.
    sourceMap.forEach((sourceStream, key) => {
      if (sourceStream.type === 'VIEW') {
        throw new Error('Funnels cannot subscribe to Views. At key ' + key);
      }
    });

    super(Immutable.Map());

    // NOTE: `primed`, `sourceMap` and `sourceKeys` are assigned before the
    // feeder subscriptions are created because `sendState` reads them, and a
    // feeder may presumably invoke its observer synchronously on subscribe
    // (as this class's own `subscribe` does once primed).
    /**
     * @access private
     */
    this.primed = false;
    /**
     * @access private
     */
    this.sourceMap = sourceMap;
    /**
     * @access private
     */
    this.sourceKeys = sourceMap.keySeq();
    /**
     * @access private
     */
    this.subscribers = sourceMap.map((sourceStream, key) => {
      return sourceStream.subscribe(newState => {
        // Store the feeder's latest state under its key on the combined state.
        /**
         * @ignore
         */
        this.state = this.state.set(key, newState);
      });
    });
    /**
     * @access private
     * @override
     */
    this.type = 'FUNNEL';
  }

  /**
   * sendState - overrides Stream's `sendState` by enforcing that all feeder streams have
   * sent state before sending state on to observers. Once a Funnel is `primed` it will pass
   * state on, regardless if one stream sends `undefined`.
   *
   * A feeder counts as having sent state when its entry on the combined state is
   * not `undefined`; falsy values such as `0`, `''`, `false` or `null` are valid
   * state and do prime the Funnel.
   *
   * @access private
   * @param {Immutable} newState - new state (received internally from Stream's `next` method)
   * @override
   */
  sendState (newState) {
    if (!this.primed) {
      const everyFeederDefined = this.sourceKeys.every(
        key => newState.get(key) !== undefined
      );
      if (!everyFeederDefined) {
        // Not primed yet: record the partial combined state without handing it
        // to the base implementation.
        this._state = newState;
        return;
      }
      this.primed = true;
    }
    super.sendState(newState);
  }

  /**
   * subscribe - Overwrites base Stream class's subscribe method.
   * Funnels should not emit state to subscribers until all source
   * streams have defined state. If the Funnel is already primed, the
   * observer is called immediately with the current combined state.
   *
   * @param {function} observer the observer function to listen to state changes.
   * @return {object} a subscriber object exposing an `unsubscribe` method;
   * `unsubscribe` throws a RangeError if the observer is no longer subscribed.
   */
  subscribe (observer) {
    /**
     * @ignore
     */
    this.observers = this.observers.push(observer);
    if (this.primed) {
      observer(this._state);
    }
    return {
      unsubscribe: () => {
        const idx = this.observers.indexOf(observer);
        if (idx < 0) {
          throw new RangeError('Observer is not subscribed to this stream.');
        }
        this.observers = this.observers.delete(idx);
      }
    };
  }

  /**
   * forceState - Funnels can accept new state and send state back up
   * the subscription chain. This should only be used in cases of
   * restoring state. Each feeder stream receives the value stored under
   * its own key on the supplied state.
   *
   * @access public
   * @param {Immutable} state The new state to hydrate the funnel.
   * @override
   */
  forceState (state) {
    this.sourceMap.forEach((sourceStream, key) => {
      sourceStream.forceState(state.get(key));
    });
  }

  /**
   * destroy - cleans up a Funnel's feeder subscribers.
   *
   * @access public
   */
  destroy () {
    // Block-bodied callback on purpose: Immutable's `forEach` stops iterating
    // as soon as a callback returns `false`, so the return value of
    // `unsubscribe` must not leak out of the callback.
    this.subscribers.forEach(subscriber => {
      subscriber.unsubscribe();
    });
  }
}
/** @ignore Export the Funnel class. */
export default Funnel;