StreamLink providers

From Platform 8, you can register a StreamLink session as an active data provider to Liberator.

StreamLink providers offer a subset of the functionality of DataSource providers. They are simple to write and are an ideal introduction to publishing your data via Liberator.

Since: StreamLink 8.0.2, Liberator 8.0.2

Overview

To write a StreamLink provider, there are three things you need to do:

  1. Configure Liberator permissioning to allow your provider to publish data to a subject and other StreamLink clients to read data from the subject (see Requirements below).

  2. Implement a JsonProvider or RecordProvider for your subject. StreamLink providers use Caplin’s active subscription model: Liberator tells them when to start and stop publishing data for a subject.

    1. Implement the provider’s onRequest handler. The Request event is triggered when Liberator asks your provider to start publishing data for a subject. The event indicates that Liberator has received a subscription request for a subject that it is not currently hosting, that Liberator has created a data object to host the subject, and that Liberator now needs your provider to start sending data for the subject.

    2. As part of your implementation of onRequest, start a process that publishes regular updates to Liberator. Publication is timer-based if you are polling or generating data, or event-based if you can subscribe to an activity feed.

    3. As part of your implementation of onRequest, register a discard handler. The Discard event is triggered when Liberator asks your provider to stop sending data. The event indicates the subject has no subscribers and Liberator has discarded the data object for the subject. Use this event to clean up any resources such as timers and subscriptions to backend activity feeds. See JsonPublisher.setDiscardHandler and RecordPublisher.setDiscardHandler.

  3. Register your provider. Call StreamLink.registerJsonProvider or StreamLink.registerRecordProvider to register your JsonProvider or RecordProvider as a provider of a subject pattern. For example, a StreamLink provider of FX pricing data might register a provider for subject pattern /FX/*.
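The request and discard steps above can be sketched without a Liberator connection. This is a minimal illustration, not the StreamLink API itself: createTimerProvider and FakePublisher are hypothetical names, and the fake publisher models only the getSubject, setDiscardHandler, and send calls that a provider uses on the publisher it receives in onRequest.

```javascript
// Hypothetical helper: builds a provider object whose onRequest starts one
// timer per subject and whose discard handler stops that timer.
function createTimerProvider(makeUpdate, intervalMs) {
    const timers = new Map(); // subject -> interval handle
    return {
        timers,
        onRequest(publisher) {
            const subject = publisher.getSubject();
            // Register the clean-up handler before publishing starts.
            publisher.setDiscardHandler({
                onDiscard(discarded) {
                    const key = discarded.getSubject();
                    clearInterval(timers.get(key));
                    timers.delete(key);
                }
            });
            const timer = setInterval(() => publisher.send(makeUpdate(subject)), intervalMs);
            timers.set(subject, timer);
        }
    };
}

// Stand-in for the publisher passed to onRequest (assumption: only the
// three methods used above are modelled).
class FakePublisher {
    constructor(subject) {
        this.subject = subject;
        this.sent = [];
        this.discardHandler = null;
    }
    getSubject() { return this.subject; }
    setDiscardHandler(handler) { this.discardHandler = handler; }
    send(fields) { this.sent.push(fields); }
}

// Usage: simulate a Request event followed by a Discard event.
const provider = createTimerProvider((subject) => ({ Subject: subject }), 1000);
const publisher = new FakePublisher("/FX/EURUSD");
provider.onRequest(publisher);
publisher.discardHandler.onDiscard(publisher);
console.log(provider.timers.size); // 0: the timer was cleaned up
```

Keeping the timers in a map keyed by subject means one provider object can serve every subject that matches its registered pattern.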

If you’re already familiar with DataSource providers, you’ll notice two immediate differences:

  • StreamLink clients authenticate as a Liberator user. The ability of a StreamLink provider to publish data is governed by Liberator’s auth module.

  • StreamLink clients become providers via a simple StreamLink API call.

There are other subtle differences to DataSource providers. For a detailed comparison, see Comparison with DataSource providers.

Requirements

When creating a Caplin Platform deployment that uses StreamLink data provision, ensure the following requirements are met:

  • Liberator permissioning:

    • A StreamLink provider must authenticate as a user who is permitted to contribute (write) to the subjects the provider provides.

    • StreamLink users that subscribe to subjects provided by StreamLink providers must be permitted to subscribe to (read) those subjects, just as they must with any other subject they subscribe to.

  • Liberator data services:

    • No data service may provide a subject that is also provided by a StreamLink session. If both a data service and a StreamLink session can service a subscription request, then Liberator ignores the StreamLink provider in favour of the data service.

    • At least one data service must be configured in Liberator in order to disable Liberator’s default data service, which matches all subjects and thus has precedence over all StreamLink providers.

Comparison with DataSource providers

Subjects provided by StreamLink sessions differ from subjects provided by DataSource applications in the following ways:

Declaration

StreamLink: Declared at runtime by StreamLink API call, after the StreamLink client has connected to Liberator. See StreamLink.registerRecordProvider and StreamLink.registerJsonProvider.

DataSource: Declared in advance by data service configuration, either in Liberator (a static data service) or in the providing DataSource applications (a dynamic data service).

Supported data types

StreamLink: Records (Type 1), JSON objects

DataSource: All DataSource data types

Permissioning

StreamLink: The providing StreamLink session requires permission from the Liberator auth module to contribute (write) to the subject in order to both register as a provider of the subject and contribute data to it.

DataSource: DataSource applications do not require permission from the Liberator auth module to provide data.

StreamLink contributions

StreamLink: Once one StreamLink session has subscribed to a StreamLink-provided subject, any StreamLink session permitted to contribute to the subject may write to it and trigger an update to all subscribers to the subject. When using a StreamLink session to publish backend data, you will likely want to grant only the backend user permission to contribute (write) to the subject.

DataSource: Contributions sent to a subject provided by a data service do not update the subject’s data object in Liberator; instead, the contributions are forwarded to the providing data service peer.

Precedence

All StreamLink-provided subjects have a lower precedence than subjects provided by a data service. A StreamLink-provided subject can never take the place of a subject provided by a data service.

Because data services have precedence over StreamLink providers, Liberator’s default data service must be disabled in order for StreamLink providers to work. Liberator’s default data service matches all subjects and thus has precedence over all StreamLink providers.

RTTP traffic

StreamLink and Liberator communicate using Caplin’s Real-time Text Protocol (RTTP). A new RTTP action, PROVIDE, has been added to support StreamLink data provision, and you will see this action in your StreamLink logs.

The diagram below illustrates example RTTP traffic when a StreamLink client requests a subject provided by another StreamLink client:

[Sequence diagram: Caplin Platform. Participants: StreamLink Client A (the subscriber), Liberator, StreamLink Client B (the provider).]

  1. Client B registers as a provider of subjects matching the glob /FX/* by calling streamLink.registerRecordProvider("/FX/*", recordProvider). Client B sends PROVIDE /FX/* to Liberator.

  2. Client A subscribes to subject /FX/EURUSD by calling streamLink.subscribe("/FX/EURUSD", ... ). Client A sends REQUEST /FX/EURUSD to Liberator, and Liberator sends REQUEST /FX/EURUSD to Client B.

  3. Client B’s recordProvider.onRequest(recordPublisher) handler calls recordPublisher.setDiscardHandler(recordDiscardHandler) and recordPublisher.send( ... ). Client B sends CONTRIB /FX/EURUSD {parameters} to Liberator, and Liberator sends UPDATE /FX/EURUSD to Client A.

  4. Periodic updates: each call to recordPublisher.send( ... ) sends CONTRIB /FX/EURUSD {parameters} to Liberator, and Liberator sends UPDATE /FX/EURUSD to Client A.

  5. Client A discards subject /FX/EURUSD by calling streamLinkSubscription.unsubscribe(). Client A sends DISCARD /FX/EURUSD to Liberator. Liberator waits for the discard timeout, then sends DISCARD /FX/EURUSD to Client B, which triggers recordDiscardHandler.onDiscard( ... ).

RTTP traffic for a subject served by a StreamLink provider

When Liberator receives a subscription request for a StreamLink-provided subject, it first checks whether a data service provides data for the subject. If no matching data service is found, then Liberator checks whether any StreamLink sessions provide data for the subject. If one or more matching StreamLink sessions are found, then Liberator creates a local data object and sends a request to the providing StreamLink session(s) to begin contributing data to the object.
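The lookup order described above can be illustrated with a small sketch. It is a simplified model, not Liberator’s implementation: resolveProvider, matchesAny, and patternToRegExp are hypothetical names, and the assumption that "*" matches any run of characters may not reflect Liberator’s actual pattern syntax.

```javascript
// Convert a simple subject pattern such as "/FX/*" into a RegExp.
// Assumption: "*" matches any run of characters.
function patternToRegExp(pattern) {
    // Escape regex metacharacters except "*", then expand "*" to ".*".
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, "\\$&");
    return new RegExp("^" + escaped.replace(/\*/g, ".*") + "$");
}

function matchesAny(subject, patterns) {
    return patterns.some((p) => patternToRegExp(p).test(subject));
}

// Data services are checked first; StreamLink providers are consulted only
// when no data service matches the subject.
function resolveProvider(subject, dataServicePatterns, streamLinkPatterns) {
    if (matchesAny(subject, dataServicePatterns)) return "data-service";
    if (matchesAny(subject, streamLinkPatterns)) return "streamlink";
    return "none";
}

// A data service for unrelated subjects leaves /FX/* to the StreamLink provider.
console.log(resolveProvider("/FX/EURUSD", ["/DEMO/*"], ["/FX/*"])); // streamlink
// A catch-all data service (modelled here as "/*") shadows the StreamLink provider.
console.log(resolveProvider("/FX/EURUSD", ["/*"], ["/FX/*"]));      // data-service
```

The second call shows why a catch-all data service prevents StreamLink providers from ever being consulted.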

The data object created by Liberator is not managed by a data service and has more in common with a local object created by the Liberator config item add-object.

Access to the data object is permissioned. StreamLink users permitted to subscribe to the object can read it. StreamLink users permitted to contribute to the object can change its data, triggering updates to all StreamLink sessions subscribed to the object. When using a StreamLink application to publish backend data, you will likely want to grant only the backend user permission to contribute data to the object.
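To make the read and contribute rules concrete, here is a toy in-memory model of a permissioned data object. It is only a sketch under stated assumptions: the DataObject class, its permissions map, and the user names backend and trader are hypothetical and do not correspond to any Liberator or StreamLink API.

```javascript
// Toy model of a Liberator-hosted data object (hypothetical class).
class DataObject {
    constructor(subject, permissions) {
        this.subject = subject;
        this.permissions = permissions; // user -> { read: boolean, write: boolean }
        this.fields = {};
        this.subscribers = new Map(); // user -> update callback
    }

    // Only users with read permission may subscribe.
    subscribe(user, onUpdate) {
        const p = this.permissions[user];
        if (!p || !p.read) return false;
        this.subscribers.set(user, onUpdate);
        return true;
    }

    // Only users with write permission may contribute; an accepted
    // contribution is pushed to every subscriber.
    contribute(user, fields) {
        const p = this.permissions[user];
        if (!p || !p.write) return false;
        Object.assign(this.fields, fields);
        for (const onUpdate of this.subscribers.values()) {
            onUpdate({ ...this.fields });
        }
        return true;
    }
}

// Usage: only the backend user may write; the trader may only read.
const eurusd = new DataObject("/FX/EURUSD", {
    backend: { read: true, write: true },
    trader: { read: true, write: false }
});
eurusd.subscribe("trader", (fields) => console.log("update", fields));
console.log(eurusd.contribute("trader", { Bid: "1.10000" }));  // false: no write permission
console.log(eurusd.contribute("backend", { Bid: "1.10000" })); // logs the update, then true
```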

Requirements:

  • StreamLink JS

  • Liberator

    • Users permissioned to request the example subject patterns /FX/* and /JSON/FX/*.

    • At least one data service defined.

    • No data service may provide /FX/* or /JSON/FX/*. If one does, Liberator ignores StreamLink providers for these subjects.

The quickest way to run a Liberator for this example is in a Docker container, with the OpenPermissioning and LiberatorDemoDataSource blades activated:

Starting a Liberator for this example
$ docker login docker-release.caplin.com

$ docker run --rm -p 18080:18080 -it docker-release.caplin.com/platform/core:8.0.4 /bin/bash

bash-5.1$ cd DeploymentFramework

bash-5.1$ ./dfw deactivate PermissioningService

bash-5.1$ ./dfw activate OpenPermissioning LiberatorDemoDataSource

bash-5.1$ ./dfw start Liberator

To shut down Liberator and exit the container, run the commands below:

bash-5.1$ ./dfw stop

bash-5.1$ exit

Node.js example

The following example illustrates how to register StreamLink providers in a Node.js application.

This example requires Node.js 20+.

Follow the steps below:

  1. Start Liberator in a container (see instructions above).

  2. Download StreamLinkTS 8.0.2 or later from Caplin Downloads.

  3. Create a new directory, and unzip the StreamLinkTS 8.0.2 ZIP file to it:

    node-provider-example/
        StreamLinkTS-8.0.2-1193-39b2496/
        StreamLinkTS-8.0.2-1193-39b2496.zip
  4. In the directory StreamLinkTS-8.0.2-1193-39b2496/npm-module/@caplin/streamlink, run the command below to install StreamLink dependencies:

    npm install
  5. Create a file node-provider-example.mjs in the location below and with the following content:

    node-provider-example/
        StreamLinkTS-8.0.2-1193-39b2496/
        StreamLinkTS-8.0.2-1193-39b2496.zip
        node-provider-example.mjs

    node-provider-example.mjs
    import {
        LogLevel,
        StreamLinkFactory
    } from "./StreamLinkTS-8.0.2-1193-39b2496/npm-module/@caplin/streamlink/index-node.js"
    
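    // JSON handler passed to StreamLink. In this example, parse and patch are
    // left as empty stubs; only format is implemented.
    var jsonHandler = {
        parse: function (jsonString) {
        },
        patch: function (existingObject, jsonPatchString) {
        },
        format: function (obj) {
            return JSON.stringify(obj, null, "\t");
        }
    };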
    const streamlink = StreamLinkFactory.create({
        liberator_urls: ["rttp://localhost:18080"],
        username: "admin",
        password: "admin",
        application_id: "NodeProvider",
        // @ts-ignore
        json_handler: jsonHandler
    });
    const requests = new Map();
    const commandListener = {
        onCommandError(subject, commandErrorEvent) {
            console.log("onCommandError ", subject, commandErrorEvent);
        }
    };
    streamlink.getLogger().addListener({
        onLog: function (logInfo) {
            console.log(logInfo.getDate(), logInfo.getLevel().toString(), logInfo.getMessage());
        }
    }, LogLevel.FINEST);
    streamlink.connect();
    
    streamlink.registerRecordProvider("/FX/*", {
        onRequest(publisher) {
            const subject = publisher.getSubject();
            console.log("Provider: onRequest " + subject);
            publisher.setDiscardHandler({
                onDiscard: (publisher) => {
                    const subject = publisher.getSubject();
                    console.log("Provider: onDiscard " + subject);
                    const timer = requests.get(subject);
                    if (timer !== undefined) { // Map.get returns undefined for a missing key
                        clearInterval(timer);
                        requests.delete(subject);
                    }
                }
            });
            const quotation = 1 + Math.random();
            const timer = setInterval(() => {
                let jitter = (Math.random() / 100);
                publisher.send({
                        "TIME": String(Date.now()),
                        "Bid": (quotation + jitter - 0.0002).toFixed(5),
                        "Ask": (quotation + jitter + 0.0002).toFixed(5)
                    }, false, commandListener);
            }, 1000);
            requests.set(subject, timer);
        }
    }, commandListener);
    
    streamlink.registerJsonProvider("/JSON/FX/*", {
        onRequest(publisher) {
            const subject = publisher.getSubject();
            console.log("Provider: onRequest " + subject);
            publisher.setDiscardHandler({
                onDiscard: (publisher) => {
                    const subject = publisher.getSubject();
                    console.log("Provider: onDiscard " + subject);
                    const timer = requests.get(subject);
                    if (timer !== undefined) { // Map.get returns undefined for a missing key
                        clearInterval(timer);
                        requests.delete(subject);
                    }
                }
            });
            const quotation = 1 + Math.random();
            const timer = setInterval(() => {
                let jitter = (Math.random() / 100);
                publisher.send({
                        "TIME": String(Date.now()),
                        "Bid": (quotation + jitter - 0.0002).toFixed(5),
                        "Ask": (quotation + jitter + 0.0002).toFixed(5)
                    }, commandListener);
            }, 1000);
            requests.set(subject, timer);
        }
    }, commandListener);
  6. Run the command below to start the StreamLink client:

    node node-provider-example.mjs
  7. Open Liberator Explorer: http://localhost:18080/diagnostics/liberatorexplorer_react/index.html (user=admin, password=admin)

  8. In Liberator Explorer, subscribe to /FX/EURUSD

  9. In Liberator Explorer, subscribe to /JSON/FX/EURUSD


Web browser example

The following example illustrates how to register StreamLink providers from a web page.

Follow the steps below:

  1. Start Liberator in a container (see instructions above).

  2. Create an HTML file with the contents below:

    <html>
        <head>
            <script src="http://localhost:18080/sljs/streamlink.js"></script>
            <script src="https://cdnjs.cloudflare.com/ajax/libs/fast-json-patch/2.0.7/fast-json-patch.min.js"></script>
            <script src="https://cdn.jsdelivr.net/npm/immer@1.12.1/dist/immer.umd.js"></script>
    
            <script>
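                // JSON handler passed to StreamLink. In this example, parse and patch
                // are left as empty stubs; only format is implemented.
                var jsonHandler = {
                    parse: function (jsonString) {
                    },
                    patch: function (existingObject, jsonPatchString) {
                    },
                    format: function (obj) {
                        return JSON.stringify(obj, null, "\t");
                    }
                };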
    
                const streamlink = caplin.StreamLinkFactory.create({
                    liberator_urls: ["rttp://localhost:18080"],
                    username: "admin",
                    password: "admin",
                    application_id: "WebProvider",
                    json_handler: jsonHandler
                });
    
                const requests = new Map();
                const commandListener = {
                    onCommandError(subject, commandErrorEvent) {
                        console.log("onCommandError ", subject, commandErrorEvent);
                    }
                };
    
                streamlink.getLogger().addListener({
                    onLog: function (logInfo) {
                        console.log(logInfo.getDate(), logInfo.getLevel().toString(), logInfo.getMessage());
                    }
                }, caplin.LogLevel.FINEST);
    
                streamlink.connect();
    
                streamlink.registerRecordProvider("/FX/*", {
                    onRequest(publisher) {
                        const subject = publisher.getSubject();
                        console.log("Provider: onRequest " + subject);
                        publisher.setDiscardHandler({
                            onDiscard: (publisher) => {
                                const subject = publisher.getSubject();
                                console.log("Provider: onDiscard " + subject);
                                const timer = requests.get(subject);
                                if (timer !== undefined) { // Map.get returns undefined for a missing key
                                    clearInterval(timer);
                                    requests.delete(subject);
                                }
                            }
                        });
                        const quotation = 1 + Math.random();
                        const timer = setInterval(() => {
                            let jitter = (Math.random() / 100);
                            publisher.send({
                                    "TIME": String(Date.now()),
                                    "Bid": (quotation + jitter - 0.0002).toFixed(5),
                                    "Ask": (quotation + jitter + 0.0002).toFixed(5)
                                }, false, commandListener);
                        }, 1000);
                        requests.set(subject, timer);
                    }
                }, commandListener);
    
                streamlink.registerJsonProvider("/JSON/FX/*", {
                    onRequest(publisher) {
                        const subject = publisher.getSubject();
                        console.log("Provider: onRequest " + subject);
                        publisher.setDiscardHandler({
                            onDiscard: (publisher) => {
                                const subject = publisher.getSubject();
                                console.log("Provider: onDiscard " + subject);
                                const timer = requests.get(subject);
                                if (timer !== undefined) { // Map.get returns undefined for a missing key
                                    clearInterval(timer);
                                    requests.delete(subject);
                                }
                            }
                        });
                        const quotation = 1 + Math.random();
                        const timer = setInterval(() => {
                            let jitter = (Math.random() / 100);
                            publisher.send({
                                    "TIME": String(Date.now()),
                                    "Bid": (quotation + jitter - 0.0002).toFixed(5),
                                    "Ask": (quotation + jitter + 0.0002).toFixed(5)
                                }, commandListener);
                        }, 1000);
                        requests.set(subject, timer);
                    }
                }, commandListener);
            </script>
        </head>
    
        <body onunload="streamlink.disconnect();">
    
        </body>
    </html>
  3. Open the example file in a web browser, and press F12 to open the JavaScript console and view StreamLink’s logging output.

  4. Open Liberator Explorer: http://localhost:18080/diagnostics/liberatorexplorer_react/index.html (user=admin, password=admin)

  5. In Liberator Explorer, subscribe to /FX/EURUSD

  6. In Liberator Explorer, subscribe to /JSON/FX/EURUSD
