MulticastStream

MulticastStream<T> #

A Stream that is multicast. A normal Stream is unicast: each call to .subscribe creates a new stream, which is torn down upon completion or when the subscriber unsubscribes. A MulticastStream instead shares a single stream among all of its subscribers. It also allows you to push a new value into the stream from “outside” by calling .next.

Example #

// multicast: simple
// ----------------------------------------------------------------------

const mc = new MulticastStream<number>();

mc.subscribe(v => console.log(`Observer A: ${v}`));
mc.subscribe(v => console.log(`Observer B: ${v}`));

mc.next(1); mc.next(2);

// Logs:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2

// Each new value is seen by *both observers*; that is why it is
// called multicast.

// multicast: using a MulticastStream as an Observer
// ----------------------------------------------------------------------

const obs = new Stream<number>(observer => {
  observer.next(1); observer.next(2);
});

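// A separate instance, since `mc` is already declared above.
const mc2 = new MulticastStream<number>();

mc2.subscribe(v => console.log(`Observer A: ${v}`));
mc2.subscribe(v => console.log(`Observer B: ${v}`));

// Subscribing starts `obs`, which forwards its values to mc2.
obs.subscribe(mc2);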

// Same log output as above
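
The difference between unicast and multicast can be sketched without the library. SketchStream and SketchMulticast below are hypothetical stand-ins written for this illustration, not the library's classes; they only model who receives which emission.

```typescript
// Hypothetical stand-ins for illustration; not the library's classes.
type Next<T> = (value: T) => void;

// Unicast: the producer function runs again for every subscriber.
class SketchStream<T> {
  constructor(private producer: (next: Next<T>) => void) {}

  subscribe(next: Next<T>): void {
    this.producer(next);
  }
}

// Multicast: subscribers are stored; one next() call reaches all of them.
class SketchMulticast<T> {
  private observers: Next<T>[] = [];

  subscribe(next: Next<T>): void {
    this.observers.push(next);
  }

  next(value: T): void {
    for (const observer of this.observers) observer(value);
  }
}

let producerRuns = 0;
const unicast = new SketchStream<number>(next => {
  producerRuns += 1;
  next(producerRuns);
});

const unicastSeen: number[] = [];
unicast.subscribe(v => unicastSeen.push(v));
unicast.subscribe(v => unicastSeen.push(v));
// producerRuns is now 2: each subscriber triggered its own producer run.

const multicast = new SketchMulticast<number>();
const multicastSeen: string[] = [];
multicast.subscribe(v => multicastSeen.push(`A:${v}`));
multicast.subscribe(v => multicastSeen.push(`B:${v}`));
multicast.next(1);
// multicastSeen is now ["A:1", "B:1"]: one emission, two deliveries.
```

In the sketch, the two unicast subscribers each get their own value (1 and 2), while the two multicast subscribers share the same emission.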

Type parameters #

  • T

Hierarchy #

  • Stream <T>
    ↳ MulticastStream

Implements #

  • Observer <T>

Constructors #

constructor() #

Signature #

new MulticastStream<T>(): MulticastStream<T>;

Type parameters #

  • T

Returns #

MulticastStream <T>

Overrides: Stream . constructor

Defined in: packages/core/src/rx/multicast-stream.ts:53

Properties #

completed #

Protected boolean = false

Defined in: packages/core/src/rx/multicast-stream.ts:50

observers #

Protected Observer <T>[] = []

Defined in: packages/core/src/rx/multicast-stream.ts:51

Methods #

observable #

Signature #

observable: object;

Returns #

object

Member Type
subscribe (observer: ObserverDefinition <T>) => { unsubscribe: () => void; }

Inherited from: Stream . [observable]

Defined in: packages/core/src/rx/stream.ts:191

complete() #

Completing a MulticastStream just means clearing all its subscriptions.

Signature #

complete(): void;

Returns #

void

Implementation of: Observer.complete

Defined in: packages/core/src/rx/multicast-stream.ts:68
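
As a rough illustration of "clearing all its subscriptions", the following sketch uses a hypothetical stand-in class, SketchMulticast, whose two protected fields mirror the documented completed and observers properties. What the real class does with the completed flag is not stated here, so the sketch only records it.

```typescript
// Hypothetical stand-in; it only models "complete clears subscriptions".
class SketchMulticast<T> {
  // Recorded on complete(), but not otherwise used in this sketch.
  protected completed = false;
  protected observers: Array<(value: T) => void> = [];

  subscribe(observer: (value: T) => void): void {
    this.observers.push(observer);
  }

  next(value: T): void {
    for (const observer of this.observers) observer(value);
  }

  complete(): void {
    this.completed = true;
    this.observers = [];
  }
}

const received: number[] = [];
const stream = new SketchMulticast<number>();
stream.subscribe(v => received.push(v));

stream.next(1);
stream.complete();
stream.next(2); // no observers are left, so nothing is delivered
```

In this sketch, received ends up holding only the value emitted before complete() was called.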

error() #

Signature #

error(err: any): void;

Parameters #

Name Type
err any

Returns #

void

Implementation of: Observer.error

Defined in: packages/core/src/rx/multicast-stream.ts:75

filter() #

Shorthand for .pipe(rx.filter(...))

Signature #

filter(filter: (value: T) => boolean): Stream<T>;

Parameters #

Name Type
filter (value: T) => boolean

Returns #

Stream <T>

Inherited from: Stream . filter

Defined in: packages/core/src/rx/stream.ts:187

if() #

Shorthand for .pipe(rx.if(...))

Signature #

if<TrueT>(this: Stream<boolean>, def: TrueT): Stream<undefined | TrueT>;

Type parameters #

  • TrueT

Parameters #

Name Type
this Stream <boolean>
def TrueT

Returns #

Stream <undefined | TrueT>

Inherited from: Stream . if

Defined in: packages/core/src/rx/stream.ts:182

ifelse() #

Shorthand for .pipe(rx.ifelse(...))

Signature #

ifelse<TrueT, FalseT>(this: Stream<boolean>, def: { if: TrueT; else: FalseT }): Stream<TrueT | FalseT>;

Type parameters #

  • TrueT
  • FalseT

Parameters #

Name Type
this Stream <boolean>
def object
def.else FalseT
def.if TrueT

Returns #

Stream <TrueT | FalseT>

Inherited from: Stream . ifelse

Defined in: packages/core/src/rx/stream.ts:174
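
The signature suggests that ifelse maps each boolean to one of two fixed values. The sketch below assumes exactly that (true yields def.if, false yields def.else); this reading is inferred from the documented types rather than confirmed, and SketchStream is a hypothetical stand-in, not the library's Stream.

```typescript
// Hypothetical stand-in. The mapping true -> def.if, false -> def.else is an
// assumption read off the documented types.
class SketchStream<T> {
  constructor(private run: (next: (value: T) => void) => void) {}

  subscribe(next: (value: T) => void): void {
    this.run(next);
  }

  map<R>(mapper: (value: T) => R): SketchStream<R> {
    return new SketchStream<R>(next => this.run(v => next(mapper(v))));
  }

  // The `this` parameter restricts the method to boolean streams.
  ifelse<TrueT, FalseT>(
    this: SketchStream<boolean>,
    def: { if: TrueT; else: FalseT },
  ): SketchStream<TrueT | FalseT> {
    return this.map(v => (v ? def.if : def.else));
  }
}

const loggedIn = new SketchStream<boolean>(next => {
  next(true);
  next(false);
});

const labels: string[] = [];
loggedIn
  .ifelse({ if: "Log out", else: "Log in" })
  .subscribe(label => labels.push(label));
// labels is now ["Log out", "Log in"].
```

Typing `this` as a boolean stream means that calling ifelse on, say, a stream of numbers is rejected at compile time, which matches the `this: Stream<boolean>` parameter in the signature above.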

map() #

Shorthand for .pipe(rx.map(...))

Signature #

map<ReturnT>(mapper: (value: T) => ReturnT): Stream<ReturnT>;

Type parameters #

  • ReturnT

Parameters #

Name Type
mapper (value: T) => ReturnT

Returns #

Stream <ReturnT>

Inherited from: Stream . map

Defined in: packages/core/src/rx/stream.ts:169
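
To illustrate what "shorthand for .pipe(...)" means, here is a minimal sketch in which a method simply delegates to pipe with a standalone operator. SketchStream and mapOp are hypothetical stand-ins (mapOp plays the role of rx.map), and the single-operator pipe is a simplification.

```typescript
// Hypothetical stand-ins for illustration; not the library's implementation.
type Operator<A, B> = (source: SketchStream<A>) => SketchStream<B>;

class SketchStream<T> {
  constructor(private run: (next: (value: T) => void) => void) {}

  subscribe(next: (value: T) => void): void {
    this.run(next);
  }

  pipe<A>(op: Operator<T, A>): SketchStream<A> {
    return op(this);
  }

  // Shorthand: delegates to pipe with the standalone operator.
  map<R>(mapper: (value: T) => R): SketchStream<R> {
    return this.pipe(mapOp(mapper));
  }
}

// Standalone operator, playing the role of rx.map in this sketch.
function mapOp<A, B>(mapper: (value: A) => B): Operator<A, B> {
  return source =>
    new SketchStream<B>(next => source.subscribe(v => next(mapper(v))));
}

const source = new SketchStream<number>(next => {
  next(1);
  next(2);
  next(3);
});

const viaShorthand: number[] = [];
source.map(n => n * 2).subscribe(v => viaShorthand.push(v));

const viaPipe: number[] = [];
source.pipe(mapOp((n: number) => n * 2)).subscribe(v => viaPipe.push(v));
// Both arrays now hold 2, 4, 6.
```

The same delegation pattern would apply to filter, if and ifelse, each with its own operator.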

next() #

Calls every subscribed observer with the given value.

Signature #

next(value: T): void;

Parameters #

Name Type
value T

Returns #

void

Implementation of: Observer.next

Defined in: packages/core/src/rx/multicast-stream.ts:81

pipe() #

Pipe this Stream through a series of operators. Operator functions can be looked up in the rx namespace.

Example #

rx.of([0, 1, 2, 3]).pipe(
  rx.map(n => n + 1),
  rx.filter(n => n !== 2),
).subscribe(console.log);
// prints 1, 3, 4

Signature #

pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Stream<B>;

Type parameters #

  • A
  • B

Parameters #

Name Type
op1 OperatorFunction <T, A>
op2 OperatorFunction <A, B>

Returns #

Stream <B>

Inherited from: Stream . pipe

Defined in: packages/core/src/rx/stream.ts:109

subscribe() #

‘Subscribe’ to this Stream with a function. Whenever a new value is emitted (that is, the next function passed to the subscriber in the constructor is called), observer will be called with the new value. Returns an ‘unsubscribe’ function that you may want to store so that you can unsubscribe later. Note that if a Stream never completes, failing to unsubscribe causes a memory leak.

Signature #

subscribe(observer: ObserverDefinition<T>): () => void;

Parameters #

Name Type
observer ObserverDefinition <T>

Returns #

Function

The returned ‘unsubscribe’ function takes no arguments and returns void:

(): void;

Inherited from: Stream . subscribe

Defined in: packages/core/src/rx/stream.ts:49
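
The advice to store the returned function can be sketched as follows. SketchMulticast is a hypothetical stand-in that mirrors only the documented shape of subscribe (it returns a function that removes the observer again); observerCount is a helper added for this illustration.

```typescript
// Hypothetical stand-in mirroring the documented shape of subscribe():
// it returns a function which removes the observer again.
class SketchMulticast<T> {
  private observers: Array<(value: T) => void> = [];

  subscribe(observer: (value: T) => void): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value: T): void {
    for (const observer of this.observers) observer(value);
  }

  // Illustration-only helper to show that the observer is released.
  observerCount(): number {
    return this.observers.length;
  }
}

const clicks = new SketchMulticast<string>();
const seen: string[] = [];

// Keep the returned function so the subscription can be ended later.
const unsubscribe = clicks.subscribe(v => seen.push(v));

clicks.next("first");
unsubscribe();
clicks.next("second"); // not delivered: the observer was removed
```

After unsubscribe() the stand-in no longer holds a reference to the observer, which is the reference that would otherwise leak on a stream that never completes.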

then() #

You can await the last value of a Stream. This requires the Stream to complete. If no value was emitted, an EmptyError is thrown.

This is useful for consuming a Stream-based API in a Promise-based environment. For example, mvui’s HTTP client can be used like this:

const response = await http.get<number>('/my-route');
// response will be of type number

Signature #

then(callback: (value: T) => any): void;

Parameters #

Name Type
callback (value: T) => any

Returns #

void

Inherited from: Stream . then

Defined in: packages/core/src/rx/stream.ts:217
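
Awaiting works because JavaScript's await accepts any object that has a then method. The sketch below shows the mechanism with a hypothetical stand-in, SketchStream; it is not the library's implementation, and it leaves out the empty-stream case (the EmptyError mentioned above).

```typescript
// Hypothetical stand-in; the real Stream's then() may differ in detail.
class SketchStream<T> {
  constructor(
    private run: (next: (value: T) => void, complete: () => void) => void,
  ) {}

  // Calls back with the last emitted value once the stream completes.
  // The empty-stream case is not modelled in this sketch.
  then(callback: (value: T) => any): void {
    let last: T | undefined;
    let hasValue = false;
    this.run(
      value => {
        last = value;
        hasValue = true;
      },
      () => {
        if (hasValue) callback(last as T);
      },
    );
  }
}

const numbers = new SketchStream<number>((next, complete) => {
  next(1);
  next(2);
  next(3);
  complete();
});

// Because SketchStream has a then() method, it can be awaited directly.
async function lastValue(): Promise<number> {
  return await numbers;
}
```

Here lastValue() resolves to 3, the last value emitted before the stand-in stream completed.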