Stream

@mvuijs/core / rx / Stream

Stream<T> #

A potentially asynchronous series of values which can be subscribed to for reactive programming.

Type parameters #

  • T

Implements #

  • Subscribable

Constructors #

constructor() #

Signature #

new Stream<T>(definition: Function): Stream<T>;

Type parameters #

  • T

Parameters #

Name Type
definition (observer: Observer<T>) => TeardownLogic

Returns #

Stream<T>

Defined in: packages/core/src/rx/stream.ts:23
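To illustrate the contract between the constructor's `definition` and a subscriber, here is a minimal sketch. It assumes that `Observer<T>` exposes `next`, `error` and `complete`, and that `TeardownLogic` is either a cleanup function or nothing; both shapes are modelled on common reactive libraries and are not confirmed by this page. `SketchStream` is a hypothetical stand-in, not the mvui class.

```typescript
// Assumed shapes; the real Observer<T> and TeardownLogic may differ.
interface Observer<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
type TeardownLogic = (() => void) | void;

// Hypothetical stand-in that mirrors the documented constructor signature.
class SketchStream<T> {
  private definition: (observer: Observer<T>) => TeardownLogic;

  constructor(definition: (observer: Observer<T>) => TeardownLogic) {
    this.definition = definition;
  }

  // Runs the definition for this subscriber and returns an unsubscribe function.
  subscribe(next: (value: T) => void): () => void {
    let active = true;
    const teardown = this.definition({
      next: (value) => { if (active) next(value); },
      error: () => { active = false; },
      complete: () => { active = false; },
    });
    return () => {
      active = false;
      if (typeof teardown === "function") teardown();
    };
  }
}

const log: string[] = [];

// A stream that emits 1, 2, 3 and then completes.
const numbers = new SketchStream<number>((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // ignored by the sketch: the stream has already completed
  return () => { log.push("teardown"); };
});

const stop = numbers.subscribe((n) => { log.push(`value ${n}`); });
stop();
// log is now ["value 1", "value 2", "value 3", "teardown"]
```

In this sketch the definition runs once per subscription, and the returned teardown runs when the subscriber unsubscribes.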

Methods #

observable #

Signature #

observable: object;

Returns #

object

Member Type
subscribe (observer: ObserverDefinition<T>) => { unsubscribe: () => void; }

Defined in: packages/core/src/rx/stream.ts:191

filter() #

Shorthand for .pipe(rx.filter(...))

Signature #

filter(filter: Function): Stream<T>;

Parameters #

Name Type
filter (value: T) => boolean

Returns #

Stream<T>

Defined in: packages/core/src/rx/stream.ts:187

if() #

Shorthand for .pipe(rx.if(...))

Signature #

if<TrueT>(this: Stream<boolean>, def: TrueT): Stream<undefined | TrueT>;

Type parameters #

  • TrueT

Parameters #

Name Type
this Stream<boolean>
def TrueT

Returns #

Stream<undefined | TrueT>

Defined in: packages/core/src/rx/stream.ts:182

ifelse() #

Shorthand for .pipe(rx.ifelse(...))

Signature #

ifelse<TrueT, FalseT>(this: Stream<boolean>, def: object): Stream<TrueT | FalseT>;

Type parameters #

  • TrueT
  • FalseT

Parameters #

Name Type
this Stream<boolean>
def object
def.else FalseT
def.if TrueT

Returns #

Stream<TrueT | FalseT>

Defined in: packages/core/src/rx/stream.ts:174
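Judging only from the signatures, `if` appears to turn `true` into `def` and `false` into `undefined`, while `ifelse` picks between `def.if` and `def.else`. The sketch below shows that mapping on plain values. It is an inference from the types, not the library's code, and `ifValue` and `ifElseValue` are hypothetical helper names.

```typescript
// Value-level sketch of what the if() signature suggests.
const ifValue = <TrueT>(def: TrueT) =>
  (condition: boolean): TrueT | undefined => (condition ? def : undefined);

// Value-level sketch of what the ifelse() signature suggests.
const ifElseValue = <TrueT, FalseT>(def: { if: TrueT; else: FalseT }) =>
  (condition: boolean): TrueT | FalseT => (condition ? def.if : def.else);

const conditions = [true, false, true];

const labels = conditions.map(ifValue("visible"));
// labels is ["visible", undefined, "visible"]

const states = conditions.map(ifElseValue({ if: "on", else: "off" }));
// states is ["on", "off", "on"]
```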

map() #

Shorthand for .pipe(rx.map(...))

Signature #

map<ReturnT>(mapper: Function): Stream<ReturnT>;

Type parameters #

  • ReturnT

Parameters #

Name Type
mapper (value: T) => ReturnT

Returns #

Stream<ReturnT>

Defined in: packages/core/src/rx/stream.ts:169
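A minimal sketch of what "shorthand for `.pipe(...)`" can mean in practice: the method forwards to `pipe` with the matching operator. `TinyStream` is a hypothetical array-backed stand-in (a real Stream may be asynchronous), and `mapOperator` stands in for `rx.map`; neither is the library's implementation.

```typescript
// Hypothetical operator shape: takes a stream, returns a new stream.
type Op<In, Out> = (source: TinyStream<In>) => TinyStream<Out>;

// Hypothetical stand-in; only pipe() and the map() shorthand are sketched.
class TinyStream<T> {
  private values: T[];

  constructor(values: T[]) {
    this.values = values;
  }

  toArray(): T[] {
    return [...this.values];
  }

  pipe<A>(op: Op<T, A>): TinyStream<A> {
    return op(this);
  }

  // The shorthand forwards to pipe() with the matching operator.
  map<ReturnT>(mapper: (value: T) => ReturnT): TinyStream<ReturnT> {
    return this.pipe(mapOperator(mapper));
  }
}

// Local stand-in for rx.map.
function mapOperator<In, Out>(mapper: (value: In) => Out): Op<In, Out> {
  return (source) => new TinyStream(source.toArray().map((value) => mapper(value)));
}

const viaShorthand = new TinyStream([1, 2, 3]).map((n) => n * 2).toArray();
const viaPipe = new TinyStream([1, 2, 3]).pipe(mapOperator((n: number) => n * 2)).toArray();
// both are [2, 4, 6]
```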

pipe() #

Pipe this Stream through a series of operators. Operator functions can be looked up in the rx namespace.

Example #

rx.of([0, 1, 2, 3]).pipe(
  rx.map(n => n + 1),
  rx.filter(n => n !== 2),
).subscribe(console.log);
// prints 1, 3, 4

Signature #

pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Stream<B>;

Type parameters #

  • A
  • B

Parameters #

Name Type
op1 OperatorFunction<T, A>
op2 OperatorFunction<A, B>

Returns #

Stream<B>

Defined in: packages/core/src/rx/stream.ts:109
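The two-operator overload can be read as plain function composition. The sketch below assumes an operator is a function from one stream to another and reduces a "stream" to a hypothetical `Source<T>` that calls a listener for every value. `fromArray`, `mapOp`, `filterOp` and `pipe2` are local stand-ins, not the `rx` operators.

```typescript
// Hypothetical stand-in: a stream reduced to "call the listener for every value".
type Source<T> = (listener: (value: T) => void) => void;

// Assumed operator shape: takes a source, returns a new source.
type OperatorFunction<In, Out> = (source: Source<In>) => Source<Out>;

const fromArray = <T>(values: T[]): Source<T> =>
  (listener) => values.forEach((value) => listener(value));

const mapOp = <In, Out>(mapper: (value: In) => Out): OperatorFunction<In, Out> =>
  (source) => (listener) => source((value) => listener(mapper(value)));

const filterOp = <T>(predicate: (value: T) => boolean): OperatorFunction<T, T> =>
  (source) => (listener) => source((value) => {
    if (predicate(value)) listener(value);
  });

// Mirrors the documented pipe<A, B>(op1, op2) overload: op2 runs on the output of op1.
const pipe2 = <T, A, B>(
  source: Source<T>,
  op1: OperatorFunction<T, A>,
  op2: OperatorFunction<A, B>,
): Source<B> => op2(op1(source));

const received: number[] = [];
const piped = pipe2(
  fromArray([0, 1, 2, 3]),
  mapOp((n: number) => n + 1),
  filterOp((n: number) => n !== 2),
);
piped((n) => { received.push(n); });
// received is now [1, 3, 4]
```

The type parameters `A` and `B` do the same job as in the documented signature: the output type of the first operator must match the input type of the second.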

subscribe() #

Subscribe to this Stream with a function. Whenever a new value is emitted (that is, whenever the next function of the observer handed to the constructor's definition is called), observer is called with the new value. Returns an ‘unsubscribe’ function that you may want to store so that you can unsubscribe later. Note that if a Stream never completes, failing to unsubscribe causes a memory leak.

Signature #

subscribe(observer: ObserverDefinition<T>): Function;

Parameters #

Name Type
observer ObserverDefinition<T>

Returns #

Function

Signature #

(): void;

Returns #

void

Implementation of: Subscribable.subscribe

Defined in: packages/core/src/rx/stream.ts:49
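The memory-leak note can be made concrete with a small sketch. `Ticker` is a hypothetical never-completing source that only mirrors the documented shape (take an observer function, return an unsubscribe function); it is not the mvui Stream.

```typescript
// Hypothetical never-completing source.
class Ticker {
  private listeners = new Set<(tick: number) => void>();
  private count = 0;

  // Takes an observer function and returns an unsubscribe function.
  subscribe(observer: (tick: number) => void): () => void {
    this.listeners.add(observer);
    return () => {
      this.listeners.delete(observer);
    };
  }

  // Emits the next tick to everyone still subscribed.
  tick(): void {
    this.count += 1;
    this.listeners.forEach((listener) => listener(this.count));
  }

  get subscriberCount(): number {
    return this.listeners.size;
  }
}

const ticker = new Ticker();
const seen: number[] = [];

const unsubscribe = ticker.subscribe((tick) => { seen.push(tick); });
ticker.tick(); // delivered
ticker.tick(); // delivered
const before = ticker.subscriberCount; // 1: the source still references the observer

unsubscribe();
ticker.tick(); // not delivered
const after = ticker.subscriberCount; // 0: nothing references the observer any more
// seen is [1, 2]
```

As long as the source holds the observer, everything the observer closes over stays reachable, which is why keeping and calling the returned function matters for long-lived streams.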

then() #

A Stream can be awaited, which yields its last value. This requires the Stream to complete. If it completes without emitting a value, an EmptyError is thrown.

This is useful for consuming a Stream-based API in a Promise-based environment. For example, mvui’s HTTP client can be used like this:

const response = await http.get<number>('/my-route');
// response will be of type number

Signature #

then(callback: Function): void;

Parameters #

Name Type
callback (value: T) => any

Returns #

void

Defined in: packages/core/src/rx/stream.ts:217
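`await` works on any object that has a `then` method, which is what makes a Stream awaitable. The sketch below shows the mechanism with a hypothetical `CompletedValues` class that stands in for a Stream which has already completed. The local `EmptyError` class and the second `onError` parameter are assumptions made for the sketch; the documented signature lists only a single callback.

```typescript
// Hypothetical error type standing in for the EmptyError mentioned above.
class EmptyError extends Error {}

// Hypothetical thenable over a fixed list of values.
class CompletedValues<T> {
  private values: T[];

  constructor(values: T[]) {
    this.values = values;
  }

  // `await` calls then(resolve, reject). The onError parameter is an assumption.
  then(callback: (value: T) => any, onError?: (err: unknown) => any): void {
    if (this.values.length === 0) {
      onError?.(new EmptyError("no value was emitted"));
      return;
    }
    // Hand over the last value only.
    callback(this.values[this.values.length - 1]);
  }
}

// Calling then() directly hands the last value to the callback.
const captured: number[] = [];
new CompletedValues([1, 2, 3]).then((value) => { captured.push(value); });
// captured is now [3]

// Inside an async function the same object can simply be awaited.
async function lastOfThree(): Promise<number> {
  return await new CompletedValues([1, 2, 3]);
}
```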