@mvuijs/core / rx / MulticastStream
MulticastStream<T> #
A
Stream
that is multicast. A normal Stream is unicast in the sense that
each time you call .subscribe, a new stream is created and torn down upon completion
or after calling .unsubscribe. Additionally, a MulticastStream also allows you to
.next a new value into the stream from “outside”.
Example #
// multicast: simple
// ----------------------------------------------------------------------
const mc = new MulticastStream<number>();
mc.subscribe(v => console.log(`Observer A: ${v}`));
mc.subscribe(v => console.log(`Observer B: ${v}`));
mc.next(1); mc.next(2);
// Logs:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
// Each new value is seen by *both observers*, that is why it is
// called multicast.
// multicast: using a MulticastStream as an Observer
// ----------------------------------------------------------------------
const obs = new Stream<number>(observer => {
observer.next(1); observer.next(2);
});
const mc = new MulticastStream<number>();
mc.subscribe(v => console.log(`Observer A: ${v}`));
mc.subscribe(v => console.log(`Observer B: ${v}`));
obs.subscribe(mc);
// Same log output as above
Type parameters #
T
Hierarchy #
-
Stream<T>.MulticastStream
Implements #
-
Observer<T>
Constructors #
constructor() #
Signature #
new MulticastStream<T>(): MulticastStream<T>;
Type parameters #
T
Returns #
MulticastStream
<T>
Overrides: Stream . constructor
Defined in: packages/core/src/rx/multicast-stream.ts:53
Properties #
completed #
Protectedboolean=false
Defined in: packages/core/src/rx/multicast-stream.ts:50
observers #
ProtectedObserver<T>[] =[]
Defined in: packages/core/src/rx/multicast-stream.ts:51
Methods #
observable #
Signature #
observable: object;
Returns #
object
| Member | Type |
|---|---|
subscribe |
(observer:
ObserverDefinition
<T>) => { unsubscribe: () => void; } |
Inherited from: Stream . [observable]
Defined in: packages/core/src/rx/stream.ts:191
complete() #
Completing a MulticastStream just means clearing all its subscriptions.
Signature #
complete(): void;
Returns #
void
Implementation of: Observer.complete
Defined in: packages/core/src/rx/multicast-stream.ts:68
error() #
Signature #
error(err: any): void;
Parameters #
| Name | Type |
|---|---|
err |
any |
Returns #
void
Implementation of: Observer.error
Defined in: packages/core/src/rx/multicast-stream.ts:75
filter() #
Shorthand for .pipe(rx.filter(...))
Signature #
filter(filter: Function): Stream<T>;
Parameters #
| Name | Type |
|---|---|
filter |
(value: T) => boolean |
Returns #
Stream
<T>
Inherited from: Stream . filter
Defined in: packages/core/src/rx/stream.ts:187
if() #
Shorthand for .pipe(rx.if(...))
Signature #
if<TrueT>(this: Stream<boolean>, def: TrueT): Stream<undefined | TrueT>;
Type parameters #
TrueT
Parameters #
| Name | Type |
|---|---|
this |
Stream
<boolean> |
def |
TrueT |
Returns #
Stream
<undefined | TrueT>
Defined in: packages/core/src/rx/stream.ts:182
ifelse() #
Shorthand for .pipe(rx.ifelse(...))
Signature #
ifelse<TrueT, FalseT>(this: Stream<boolean>, def: object): Stream<TrueT | FalseT>;
Type parameters #
TrueTFalseT
Parameters #
| Name | Type |
|---|---|
this |
Stream
<boolean> |
def |
object |
def.else |
FalseT |
def.if |
TrueT |
Returns #
Stream
<TrueT | FalseT>
Inherited from: Stream . ifelse
Defined in: packages/core/src/rx/stream.ts:174
map() #
Shorthand for .pipe(rx.map(...))
Signature #
map<ReturnT>(mapper: Function): Stream<ReturnT>;
Type parameters #
ReturnT
Parameters #
| Name | Type |
|---|---|
mapper |
(value: T) => ReturnT |
Returns #
Stream
<ReturnT>
Defined in: packages/core/src/rx/stream.ts:169
next() #
Trigger all subscriptions with the given value
Signature #
next(value: T): void;
Parameters #
| Name | Type |
|---|---|
value |
T |
Returns #
void
Implementation of: Observer.next
Defined in: packages/core/src/rx/multicast-stream.ts:81
pipe() #
Pipe this Stream through a series of operators. Operator functions can be looked up in the rx namespace.
Example #
rx.of([0, 1, 2, 3]).pipe(
rx.map(n => n + 1),
rx.filter(n => n !== 2),
).subscribe(console.log);
// prints 1, 3, 4
Signature #
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Stream<B>;
Type parameters #
AB
Parameters #
| Name | Type |
|---|---|
op1 |
OperatorFunction
<T, A> |
op2 |
OperatorFunction
<A, B> |
Returns #
Stream
<B>
Defined in: packages/core/src/rx/stream.ts:109
subscribe() #
‘Subscribe’ to this Stream with a function. Whenever a new value is emitted (that
is, the next function passed to the subscriber in the constructor is called),
observer will be called with the new value.
Returns a ‘unsubscribe’ function that you may want to store to later be able to
unsubscribe. Note that if a Stream does not complete, not unsubscribing is a
memory leak.
Signature #
subscribe(observer: ObserverDefinition<T>): Function;
Parameters #
| Name | Type |
|---|---|
observer |
ObserverDefinition
<T> |
Returns #
Function
‘Subscribe’ to this Stream with a function. Whenever a new value is emitted (that
is, the next function passed to the subscriber in the constructor is called),
observer will be called with the new value.
Returns a ‘unsubscribe’ function that you may want to store to later be able to
unsubscribe. Note that if a Stream does not complete, not unsubscribing is a
memory leak.
Signature #
(): void;
Returns #
void
Inherited from: Stream . subscribe
Defined in: packages/core/src/rx/stream.ts:49
then() #
You can await the last value of a Stream. This of course requires the Stream to
complete. If no value was emitted, an
EmptyError
will be thrown.
This is useful for consuming a Stream based API in a Promise based environment. For example, mvui’s HTTP client can be used like this:
const response = await http.get<number>('/my-route');
// response will be of type number
Signature #
then(callback: Function): void;
Parameters #
| Name | Type |
|---|---|
callback |
(value: T) => any |
Returns #
void
Defined in: packages/core/src/rx/stream.ts:217