MulticastStream<T> #
A Stream that is multicast. A normal Stream is unicast in the sense that each time you call .subscribe, a new stream is created, which is torn down upon completion or after calling .unsubscribe. A MulticastStream instead delivers each value to all of its observers. Additionally, a MulticastStream allows you to .next a new value into the stream from “outside”.
Example #
// multicast: simple
// ----------------------------------------------------------------------
const mc = new MulticastStream<number>();
mc.subscribe(v => console.log(`Observer A: ${v}`));
mc.subscribe(v => console.log(`Observer B: ${v}`));
mc.next(1); mc.next(2);
// Logs:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
// Each new value is seen by *both observers*, that is why it is
// called multicast.
// multicast: using a MulticastStream as an Observer
// ----------------------------------------------------------------------
const obs = new Stream<number>(observer => {
  observer.next(1); observer.next(2);
});
const mc = new MulticastStream<number>();
mc.subscribe(v => console.log(`Observer A: ${v}`));
mc.subscribe(v => console.log(`Observer B: ${v}`));
obs.subscribe(mc);
// Same log output as above
Type parameters #
T
Hierarchy #
- Stream<T>.MulticastStream
Implements #
- Observer<T>
Constructors #
constructor() #
Signature #
new MulticastStream<T>(): MulticastStream<T>;
Type parameters #
T
Returns #
MulticastStream<T>
Overrides: Stream . constructor
Defined in: packages/core/src/rx/multicast-stream.ts:53
Properties #
completed #
Protected boolean = false
Defined in: packages/core/src/rx/multicast-stream.ts:50
observers #
Protected Observer<T>[] = []
Defined in: packages/core/src/rx/multicast-stream.ts:51
Methods #
observable #
Signature #
observable: object;
Returns #
object
Member | Type |
---|---|
subscribe | (observer: ObserverDefinition<T>) => { unsubscribe: () => void; } |
Inherited from: Stream . [observable]
Defined in: packages/core/src/rx/stream.ts:191
complete() #
Completing a MulticastStream just means clearing all its subscriptions.
Signature #
complete(): void;
Returns #
void
Implementation of: Observer.complete
Defined in: packages/core/src/rx/multicast-stream.ts:68
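As a rough illustration of what “clearing all its subscriptions” implies, the sketch below uses a tiny stand-in class. MiniMulticast is hypothetical and is not the library's source; it only mirrors the observers and completed properties listed above. It assumes that observers removed by complete() receive no further values. How the real class treats .next calls or new subscriptions after completion is not stated here.

```typescript
// Hypothetical stand-in, NOT the @mvuijs/core implementation.
type NextFn<T> = (value: T) => void;

class MiniMulticast<T> {
  protected completed = false;
  protected observers: NextFn<T>[] = [];

  subscribe(observer: NextFn<T>): () => void {
    this.observers.push(observer);
    // The returned function removes just this observer again.
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value: T): void {
    // Iterate over a copy so that unsubscribing during delivery is safe.
    for (const observer of [...this.observers]) observer(value);
  }

  complete(): void {
    // Assumption: completing marks the stream and drops every subscription.
    this.completed = true;
    this.observers = [];
  }
}

const seen: number[] = [];
const mini = new MiniMulticast<number>();
mini.subscribe(v => seen.push(v));
mini.next(1);
mini.complete();
mini.next(2); // no observer is subscribed any more
// seen: [1]
```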
error() #
Signature #
error(err: any): void;
Parameters #
Name | Type |
---|---|
err | any |
Returns #
void
Implementation of: Observer.error
Defined in: packages/core/src/rx/multicast-stream.ts:75
filter() #
Shorthand for .pipe(rx.filter(...))
Signature #
filter(filter: Function): Stream<T>;
Parameters #
Name | Type |
---|---|
filter | (value: T) => boolean |
Returns #
Stream<T>
Inherited from: Stream . filter
Defined in: packages/core/src/rx/stream.ts:187
if() #
Shorthand for .pipe(rx.if(...))
Signature #
if<TrueT>(this: Stream<boolean>, def: TrueT): Stream<undefined | TrueT>;
Type parameters #
TrueT
Parameters #
Name | Type |
---|---|
this | Stream<boolean> |
def | TrueT |
Returns #
Stream<undefined | TrueT>
Defined in: packages/core/src/rx/stream.ts:182
ifelse() #
Shorthand for .pipe(rx.ifelse(...))
Signature #
ifelse<TrueT, FalseT>(this: Stream<boolean>, def: object): Stream<TrueT | FalseT>;
Type parameters #
TrueT
FalseT
Parameters #
Name | Type |
---|---|
this | Stream<boolean> |
def | object |
def.else | FalseT |
def.if | TrueT |
Returns #
Stream<TrueT | FalseT>
Inherited from: Stream . ifelse
Defined in: packages/core/src/rx/stream.ts:174
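The signatures of if and ifelse suggest a per-value selection: a true value is replaced by the “true” alternative, and a false value by either undefined (if) or the “false” alternative (ifelse). The sketch below shows only that selection logic on plain booleans. pickIf and pickIfElse are hypothetical helper names, and the mapping is an assumption read off the documented return types, not confirmed library behaviour.

```typescript
// Hypothetical helpers mirroring the documented type signatures only.
function pickIf<TrueT>(flag: boolean, def: TrueT): TrueT | undefined {
  return flag ? def : undefined;
}

function pickIfElse<TrueT, FalseT>(
  flag: boolean,
  def: { if: TrueT; else: FalseT },
): TrueT | FalseT {
  return flag ? def.if : def.else;
}

const flags = [true, false];
const labels = flags.map(f => pickIfElse(f, { if: "on", else: "off" }));
const maybeVisible = flags.map(f => pickIf(f, "visible"));
// labels: ["on", "off"]
// maybeVisible: ["visible", undefined]
```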
map() #
Shorthand for .pipe(rx.map(...))
Signature #
map<ReturnT>(mapper: Function): Stream<ReturnT>;
Type parameters #
ReturnT
Parameters #
Name | Type |
---|---|
mapper | (value: T) => ReturnT |
Returns #
Stream<ReturnT>
Defined in: packages/core/src/rx/stream.ts:169
next() #
Trigger all subscriptions with the given value
Signature #
next(value: T): void;
Parameters #
Name | Type |
---|---|
value | T |
Returns #
void
Implementation of: Observer.next
Defined in: packages/core/src/rx/multicast-stream.ts:81
pipe() #
Pipe this Stream through a series of operators. Operator functions can be looked up in the rx namespace.
Example #
rx.of([0, 1, 2, 3]).pipe(
  rx.map(n => n + 1),
  rx.filter(n => n !== 2),
).subscribe(console.log);
// prints 1, 3, 4
Signature #
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Stream<B>;
Type parameters #
A
B
Parameters #
Name | Type |
---|---|
op1 | OperatorFunction<T, A> |
op2 | OperatorFunction<A, B> |
Returns #
Stream<B>
Defined in: packages/core/src/rx/stream.ts:109
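To make the composition in pipe more concrete, here is a minimal sketch that treats an operator as a function from one stream to another. MiniStream, Op, mapOp and filterOp are hypothetical stand-ins; the actual definition of OperatorFunction is not shown on this page, so its shape here is an assumption.

```typescript
// Hypothetical stand-ins, NOT the @mvuijs/core types.
type Emit<T> = (value: T) => void;
type Op<A, B> = (source: MiniStream<A>) => MiniStream<B>;

class MiniStream<T> {
  private readonly producer: (emit: Emit<T>) => void;

  constructor(producer: (emit: Emit<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: Emit<T>): void {
    // Unicast: the producer runs once for every subscription.
    this.producer(observer);
  }

  pipe<A, B>(op1: Op<T, A>, op2: Op<A, B>): MiniStream<B> {
    // Operators are applied left to right.
    return op2(op1(this));
  }
}

const mapOp = <A, B>(fn: (a: A) => B): Op<A, B> =>
  source => new MiniStream<B>(emit => source.subscribe(a => emit(fn(a))));

const filterOp = <A>(keep: (a: A) => boolean): Op<A, A> =>
  source =>
    new MiniStream<A>(emit =>
      source.subscribe(a => {
        if (keep(a)) emit(a);
      }),
    );

const piped: number[] = [];
new MiniStream<number>(emit => [0, 1, 2, 3].forEach(n => emit(n)))
  .pipe(mapOp((n: number) => n + 1), filterOp((n: number) => n !== 2))
  .subscribe(n => piped.push(n));
// piped: [1, 3, 4]
```

Modelling operators as plain functions keeps pipe itself trivial: it only threads the stream through each operator in order.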
subscribe() #
‘Subscribe’ to this Stream with a function. Whenever a new value is emitted (that is, the next function passed to the subscriber in the constructor is called), observer will be called with the new value.
Returns an ‘unsubscribe’ function that you may want to store so that you can unsubscribe later. Note that if a Stream does not complete, failing to unsubscribe causes a memory leak.
Signature #
subscribe(observer: ObserverDefinition<T>): Function;
Parameters #
Name | Type |
---|---|
observer | ObserverDefinition<T> |
Returns #
Function
Signature #
(): void;
Returns #
void
Inherited from: Stream . subscribe
Defined in: packages/core/src/rx/stream.ts:49
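The unsubscribe pattern described for subscribe can be sketched as follows. TickSource is a hypothetical stand-in for a source that never completes, not a library class; it only illustrates storing the returned function and calling it so that the source no longer holds on to the listener.

```typescript
// Hypothetical stand-in for a never-completing source.
class TickSource {
  private listeners: Array<(n: number) => void> = [];

  subscribe(listener: (n: number) => void): () => void {
    this.listeners.push(listener);
    // Hand back a function that detaches exactly this listener.
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  emit(n: number): void {
    for (const listener of [...this.listeners]) listener(n);
  }

  get listenerCount(): number {
    return this.listeners.length;
  }
}

const ticks = new TickSource();
const received: number[] = [];

// Keep the returned function; it is the only handle for detaching later.
const unsubscribe = ticks.subscribe(n => received.push(n));
ticks.emit(1);
unsubscribe();
ticks.emit(2); // not delivered, the listener was removed
// received: [1], ticks.listenerCount: 0
```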
then() #
You can await the last value of a Stream. This of course requires the Stream to complete. If no value was emitted, an EmptyError will be thrown.
This is useful for consuming a Stream based API in a Promise based environment. For example, mvui’s HTTP client can be used like this:
const response = await http.get<number>('/my-route');
// response will be of type number
Signature #
then(callback: Function): void;
Parameters #
Name | Type |
---|---|
callback | (value : T ) => any |
Returns #
void
Defined in: packages/core/src/rx/stream.ts:217
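The following sketch illustrates how an object with a then method can hand over its last value once it completes. AwaitableSource and SketchEmptyError are hypothetical stand-ins; the real Stream and EmptyError may differ in detail. In particular, the optional reject parameter is an assumption, since the documented signature lists only callback.

```typescript
// Hypothetical stand-ins, NOT the @mvuijs/core implementation.
class SketchEmptyError extends Error {}

class AwaitableSource<T> {
  private last: { value: T } | undefined = undefined;
  private done = false;
  private pending: Array<() => void> = [];

  next(value: T): void {
    // Only the most recent value is remembered.
    if (!this.done) this.last = { value };
  }

  complete(): void {
    this.done = true;
    const callbacks = this.pending;
    this.pending = [];
    for (const run of callbacks) run();
  }

  then(resolve: (value: T) => void, reject?: (err: unknown) => void): void {
    const settle = (): void => {
      if (this.last !== undefined) resolve(this.last.value);
      else if (reject !== undefined) reject(new SketchEmptyError("no value emitted"));
    };
    // Settle right away if already completed, otherwise wait for complete().
    if (this.done) settle();
    else this.pending.push(settle);
  }
}

const results: number[] = [];
const source = new AwaitableSource<number>();
source.then(v => results.push(v));
source.next(1);
source.next(2);
source.complete();
// results: [2]

const failures: unknown[] = [];
const empty = new AwaitableSource<number>();
empty.complete();
empty.then(v => results.push(v), err => failures.push(err));
// failures now holds one error, results is unchanged
```

Because await only requires a then method on the awaited object, an object of this shape can also be awaited directly inside an async function.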