Source

A Source is a function that can be subscribed to with a Sink. This construct is the basis of all reactive programming done with this library. By default, a Source is essentially a lazy push-style stream/observable that produces its values anew for every subscription. The "lazy" part can be thought of as follows:

function mySource(): Source<mySourceType> {
    return (sink: Sink<mySourceType>) => {
        const producer = Producer();
        // ...add subscriber to producer.
        producer.produce();
    }
}

Compared to a less lazy and more eager implementation:

function mySource(): Source<mySourceType> {
    const producer = Producer();
    producer.produce();
    return (sink: Sink<mySourceType>) => {
        // ...add subscriber to producer.
    }
}

A Source has the shape of a function that takes a Sink. That Sink is passed to the "produce" function given when the Source was created, whose job is to fill the Sink with values. When the Source is subscribed to, the produce function is called with the subscribed sink. The produce function should stop trying to emit values to that sink once the sink is disposed, and should stop and clean up any ongoing side processes.

If the subscribed sink is already disposed (meaning it will not take any more values), the produce function is never called and the sink is simply ignored. If the sink is active, the produce function is called with the sink as its only parameter.
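The subscription rule above can be sketched with a small self-contained model. Everything here is an assumption made for illustration: `SinkEvent`, the `dispose` method, and the reduced `Sink` and `Source` stand-ins (which use plain object events and omit error handling) are hypothetical and are not the library's real code.

```typescript
// Hypothetical stand-ins that only model the behavior described above.
type SinkEvent<T> =
    | { type: 'Push'; value: T }
    | { type: 'Throw'; error: unknown }
    | { type: 'End' };

interface Sink<T> {
    (event: SinkEvent<T>): void;
    active: boolean;
    dispose(): void; // assumed name for disposing a sink
}

function Sink<T>(onEvent: (event: SinkEvent<T>) => void): Sink<T> {
    const sink: Sink<T> = function (event: SinkEvent<T>): void {
        if (!sink.active) return; // a disposed sink ignores every event
        if (event.type !== 'Push') sink.active = false; // assumed: Throw and End finish the sink
        onEvent(event);
    } as Sink<T>;
    sink.active = true;
    sink.dispose = () => { sink.active = false; };
    return sink;
}

type Source<T> = (sink: Sink<T>) => void;

// Reduced model: error handling is left out to keep the focus on subscription.
function Source<T>(produce: (sink: Sink<T>) => void): Source<T> {
    return (sink: Sink<T>): void => {
        if (sink.active) produce(sink); // a disposed sink never reaches produce
    };
}

let produceCalls = 0;
const counting = Source<number>(sink => {
    produceCalls++;
    sink({ type: 'Push', value: produceCalls });
    sink({ type: 'End' });
});

// Subscribing with an already disposed sink: produce is skipped.
const disposedSink = Sink<number>(() => {});
disposedSink.dispose();
counting(disposedSink);
const callsAfterDisposed = produceCalls; // still 0

// Subscribing with active sinks: produce runs once per subscription.
const pushed: number[] = [];
const collect = (e: SinkEvent<number>): void => {
    if (e.type === 'Push') pushed.push(e.value);
};
counting(Sink(collect)); // pushes 1
counting(Sink(collect)); // pushes 2
```

In this model the disposed sink leaves `produceCalls` at 0, while each active subscription runs produce again, which is also what makes the Source lazy.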

However, if the produce function throws an error during its initial execution, the error is passed to the sink as a Throw event, provided the sink is still active at the time of throwing. (The sink might not be active if it was disposed inside the produce function before the throw.) Otherwise, the error is reported asynchronously through a setTimeout with delay zero, similar to how Promises don't synchronously throw errors during construction. Because this handling only covers the initial execution of the produce function, it is good practice to wrap any functions called asynchronously after subscription in a try/catch, and to pass the caught error on to the subscribed sink in a Throw event, which the sink can then handle.
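The advice about asynchronous callbacks can be illustrated with a minimal sketch. It rests on several assumptions: `SinkEvent` and the reduced `Sink` and `Source` stand-ins are hypothetical models of the behavior described above, `later` and `flush` are a hand-rolled task queue standing in for a real timer so the example runs deterministically, and `parsePositive` is an invented function that may throw.

```typescript
// Hypothetical stand-ins that only model the behavior described above.
type SinkEvent<T> =
    | { type: 'Push'; value: T }
    | { type: 'Throw'; error: unknown }
    | { type: 'End' };

interface Sink<T> {
    (event: SinkEvent<T>): void;
    active: boolean;
}

function Sink<T>(onEvent: (event: SinkEvent<T>) => void): Sink<T> {
    const sink: Sink<T> = function (event: SinkEvent<T>): void {
        if (!sink.active) return; // a disposed sink ignores every event
        if (event.type !== 'Push') sink.active = false; // assumed: Throw and End finish the sink
        onEvent(event);
    } as Sink<T>;
    sink.active = true;
    return sink;
}

type Source<T> = (sink: Sink<T>) => void;

function Source<T>(produce: (sink: Sink<T>) => void): Source<T> {
    return (sink: Sink<T>): void => {
        if (!sink.active) return;
        try {
            produce(sink); // only errors thrown synchronously here are caught
        } catch (error) {
            if (sink.active) sink({ type: 'Throw', error });
            else setTimeout(() => { throw error; }, 0);
        }
    };
}

// Hypothetical task queue standing in for setTimeout.
const tasks: Array<() => void> = [];
function later(task: () => void): void { tasks.push(task); }
function flush(): void {
    while (tasks.length > 0) {
        const task = tasks.shift();
        if (task) task();
    }
}

// Hypothetical function that may throw when it is called later.
function parsePositive(text: string): number {
    const parsed = Number(text);
    if (!(parsed > 0)) throw new Error('not a positive number: ' + text);
    return parsed;
}

const parsing = Source<number>(sink => {
    later(() => {
        // produce has already returned by the time this runs, so the
        // try/catch inside Source cannot see errors thrown here.
        try {
            sink({ type: 'Push', value: parsePositive('-1') });
            sink({ type: 'End' });
        } catch (error) {
            sink({ type: 'Throw', error });
        }
    });
});

const received: SinkEvent<number>[] = [];
parsing(Sink<number>(e => { received.push(e); }));
flush(); // runs the queued callback; received now holds a single Throw event
```

Without the inner try/catch, the error in this sketch would escape from `flush` instead of reaching the sink, so the subscriber would never learn that the source failed.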

The implementation for Source is very basic, and can roughly be thought of as follows:

const Source = produce => sink => {
    if (sink.active) {
        try { produce(sink) }
        catch (error) {
            if (sink.active) sink(Throw(error));
            else setTimeout(() => { throw error })
        }
    }
}
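As a rough check of the sketch above, the following typed variant can be run on its own. It is a model under stated assumptions, not the library's implementation: `SinkEvent` and the reduced `Sink` stand-in are hypothetical, and events are written as plain objects rather than with the library's `Push`, `Throw` and `End` helpers.

```typescript
// Hypothetical stand-ins that only model the behavior described above.
type SinkEvent<T> =
    | { type: 'Push'; value: T }
    | { type: 'Throw'; error: unknown }
    | { type: 'End' };

interface Sink<T> {
    (event: SinkEvent<T>): void;
    active: boolean;
}

function Sink<T>(onEvent: (event: SinkEvent<T>) => void): Sink<T> {
    const sink: Sink<T> = function (event: SinkEvent<T>): void {
        if (!sink.active) return; // a disposed sink ignores every event
        if (event.type !== 'Push') sink.active = false; // assumed: Throw and End finish the sink
        onEvent(event);
    } as Sink<T>;
    sink.active = true;
    return sink;
}

type Source<T> = (sink: Sink<T>) => void;

// Typed version of the sketch above.
function Source<T>(produce: (sink: Sink<T>) => void): Source<T> {
    return (sink: Sink<T>): void => {
        if (sink.active) {
            try {
                produce(sink);
            } catch (error) {
                // Still active: deliver the error as a Throw event.
                if (sink.active) sink({ type: 'Throw', error });
                // Already disposed: report the error asynchronously instead.
                else setTimeout(() => { throw error; }, 0);
            }
        }
    };
}

const failing = Source<number>(sink => {
    sink({ type: 'Push', value: 1 });
    throw new Error('boom'); // thrown during the initial execution of produce
});

const seenEvents: string[] = [];
failing(Sink<number>(e => {
    seenEvents.push(e.type === 'Throw' ? 'Throw: ' + (e.error as Error).message : e.type);
}));
// seenEvents is now ['Push', 'Throw: boom']
```

The subscriber sees the value pushed before the failure and then the error as an ordinary event, so nothing is thrown out of the subscription call itself.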

Source - Function

Signature - source.ts#L506

function Source<T>(produce: (sink: Sink<T>) => void): Source<T>

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| produce | (sink: Sink<T>) => void | This will be called with the given sink each subscription. When the sink is disposed this function should stop trying to emit values, and should stop/cleanup any ongoing side processes. |

Returns

| Type | Description |
| --- | --- |
| Source<T> | The created Source. |

Example Usage

// Creating a Source which synchronously produces values 0..50
const source = Source(sink => {
    // Note: It is guaranteed (at the start of execution of this function
    // at least) that the sink here is active.
    for (let i = 0; i <= 50 && sink.active; i++) {
        sink(Push(i));
    }
    // Even if the above loop breaks, and the sink is no longer active, it
    // will just ignore this End event, meaning there is no need to check
    // whether the sink is active for distributing singular events like this
    // at the end of execution.
    sink(End);
});
source(Sink(console.log));
source(Sink(console.log));
// Logs:
// Push(0)
// Push(1)
// ...
// Push(50)
// End
// Push(0)
// Push(1)
// ...
// Push(50)
// End

Example Usage

// Creating a Source that maps an external api into a reactive one.
import { MyExternalSubscriptionToken, myExternalApi } from './myExternalApi';
const source = Source(sink => {
    // Wrapped in an object so the check below only tests whether a token
    // has been assigned.
    let subscriptionToken: { v: MyExternalSubscriptionToken } | undefined;
    sink.add(() => {
        if (subscriptionToken) {
            myExternalApi.cancel(subscriptionToken.v);
        }
    });
    // In this example myExternalApi may throw.
    try {
        subscriptionToken = {
            v: myExternalApi.request((value, error) => {
                if (error) {
                    sink(Throw(error));
                    return;
                }
                sink(Push(value));
                sink(End);
            }),
        };
    } catch (error) {
        sink(Throw(error));
    }
});

Source - Interface

Signature - source.ts#L427

interface Source<T> {
    (sink: Sink<T>): void
}