Sink

A Sink is what a Source subscribes to. Every event the source emits is passed to the sink that the source was given.

A Sink is a function that takes an Event and passes it to the onEvent function described below. A Sink also implements Disposable: it has all of the methods of the Disposable type and can be treated as one. This matters because the way to stop a sink (and to unsubscribe the sources subscribed to it) is to dispose the sink itself. It also means that the way to clean up or cancel some process when a Sink stops taking values (for example, when creating a custom Source) is to add a child to the Sink, as with any Disposable; the child is then called when the Sink is disposed. One use of this is cancelling an HTTP request.
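The cleanup pattern described above can be sketched with a small stand-in. This is only an illustration under stated assumptions: `SketchSink`, `createSketchSink`, `FakeRequest`, `pendingRequest` and `requestSource` are hypothetical names invented here, the child is modelled as a plain cleanup function, and the `add` and `dispose` methods only approximate what a Disposable offers. It is not the library's implementation.

```typescript
// Hypothetical, simplified stand-in for a Sink: a callable that also carries
// Disposable-style methods. This is NOT the library's implementation.
type Cleanup = () => void;

interface SketchSink<E> {
    (event: E): void;
    add(child: Cleanup): void;
    dispose(): void;
}

function createSketchSink<E>(onEvent: (event: E) => void): SketchSink<E> {
    let active = true;
    const children: Cleanup[] = [];
    // The sink itself is a function; the methods are attached afterwards.
    const sink = ((event: E): void => {
        if (active) onEvent(event);
    }) as SketchSink<E>;
    sink.add = (child: Cleanup): void => {
        if (active) children.push(child);
        else child(); // assumption: a child added after disposal runs right away
    };
    sink.dispose = (): void => {
        if (!active) return;
        active = false;
        for (const child of children.splice(0)) child();
    };
    return sink;
}

// Hypothetical stand-in for an in-flight HTTP request.
interface FakeRequest {
    cancelled: boolean;
    cancel(): void;
}
const pendingRequest: FakeRequest = {
    cancelled: false,
    cancel() {
        pendingRequest.cancelled = true;
    },
};

// A custom source: subscribing "starts" the request and registers its
// cancellation as a child of the sink.
const requestSource = (sink: SketchSink<string>): void => {
    sink.add(() => pendingRequest.cancel());
};

const responseSink = createSketchSink<string>(body => console.log(body));
requestSource(responseSink);
responseSink.dispose(); // stops the sink and runs the registered cleanup
console.log(pendingRequest.cancelled); // true
```

The point of the design is that the source never needs its own unsubscribe handle: whoever holds the sink can stop it, and every cleanup registered on it runs as a consequence.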

The Sink constructor takes an onEvent function, which is called every time an event is received. Once a sink is disposed, it stops taking values and ignores all subsequent events. In addition, when an active sink receives a ThrowEvent or an EndEvent, the sink is disposed first and only then is the onEvent function called with that event. If the onEvent function throws upon receiving an event, the sink is immediately disposed and the error is thrown asynchronously in a setTimeout with a delay of zero.
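The ordering rules in the paragraph above can be modelled in a few lines. The sketch below is an assumption-laden approximation, not the library's code: `SketchEvent`, `LifecycleSink`, `createLifecycleSink` and the `isActive` helper are hypothetical names used only so the behaviour can be observed, and events are modelled as plain tagged objects.

```typescript
// Hypothetical event model; the library's real Event type may differ.
type SketchEvent<T> =
    | { type: 'push'; value: T }
    | { type: 'throw'; error: unknown }
    | { type: 'end' };

interface LifecycleSink<T> {
    (event: SketchEvent<T>): void;
    isActive(): boolean; // hypothetical helper, used here only for observation
    dispose(): void;
}

function createLifecycleSink<T>(
    onEvent: (event: SketchEvent<T>) => void,
): LifecycleSink<T> {
    let active = true;
    const sink = ((event: SketchEvent<T>): void => {
        if (!active) return; // a disposed sink ignores every event
        if (event.type !== 'push') active = false; // throw/end: dispose first...
        try {
            onEvent(event); // ...then deliver the event
        } catch (error) {
            active = false; // onEvent threw: dispose immediately
            // Rethrow on a later tick so the sender's call stack is not interrupted.
            setTimeout(() => {
                throw error;
            }, 0);
        }
    }) as LifecycleSink<T>;
    sink.isActive = () => active;
    sink.dispose = () => {
        active = false;
    };
    return sink;
}

const seen: string[] = [];
const lifecycleSink: LifecycleSink<number> = createLifecycleSink<number>(event => {
    seen.push(event.type + ' (active: ' + lifecycleSink.isActive() + ')');
});
lifecycleSink({ type: 'push', value: 1 });
lifecycleSink({ type: 'end' });
lifecycleSink({ type: 'push', value: 2 }); // ignored: the sink is disposed
console.log(seen);
// [ 'push (active: true)', 'end (active: false)' ]
```

Note that the end event is still delivered, but by the time onEvent sees it the sink already reports itself as inactive.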

Sink - Function

Signature - source.ts#L293

function Sink<T>(onEvent: (event: Event<T>) => void): Sink<T>

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| onEvent | (event: Event<T>) => void | The callback for when an event is received. |

Returns

| Type | Description |
| --- | --- |
| Sink<T> | The created Sink. |

Example Usage

// In this example the source emits values 0..49.
const source = range(50);
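const sink = Sink(event => {
    console.log(event);
    // Only Push events carry a value, so check the event type first.
    if (event.type === PushType && event.value === 3) {
        // Disposing the sink stops it from receiving any further values.
        sink.dispose();
    }
});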
// Subscribe the source to the sink so that it produces values and sends them
// to the sink.
source(sink);
// Logs:
// { type: PushType, value: 0 }
// { type: PushType, value: 1 }
// { type: PushType, value: 2 }
// { type: PushType, value: 3 }

Example Usage

// Passing a sink a throw event.
const sink = Sink<number>(event => {
    console.log(event);
});
sink(Push(1));
sink(Throw(new Error('some error was caught')));
sink(Push(4)); // Ignored.
sink(End); // Ignored.
sink(Throw(...)); // Ignored.
// Logs:
// { type: PushType, value: 1 }
// { type: ThrowType, error: Error }

Sink - Interface

Signature - source.ts#L243

interface Sink<T> extends Disposable {
    (event: Event<T>): void
}