StreamLink providers
From Platform 8, you can register a StreamLink session as an active data provider to Liberator.
StreamLink providers offer a subset of the functionality of DataSource providers. They are simple to write and are an ideal introduction to publishing your data via Liberator.
Since: StreamLink 8.0.2, Liberator 8.0.2
Overview
To write a StreamLink provider, there are three things you need to do:
-
Configure Liberator permissioning to allow your provider to publish data to a subject and other StreamLink clients to read data from the subject (see Requirements below).
-
Implement a JsonProvider or RecordProvider for your subject. StreamLink providers use Caplin’s active subscription model: Liberator tells them when to start and stop publishing data for a subject.
  -
  Implement the provider’s onRequest handler. The Request event is triggered when Liberator asks your provider to start publishing data for a subject. The event indicates that Liberator has received a subscription request for a subject that it is not currently hosting, that Liberator has created a data object to host the subject, and that Liberator now needs your provider to start sending data for the subject.
    -
    As part of your implementation of onRequest, start a process to publish regular updates to Liberator. This process will be timer-based if you are polling or generating data, or event-based if you can subscribe to an activity feed.
    -
    As part of your implementation of onRequest, register a discard handler. The Discard event is triggered when Liberator asks your provider to stop sending data. The event indicates the subject has no subscribers and Liberator has discarded the data object for the subject. Use this event to clean up any resources such as timers and subscriptions to backend activity feeds. See JsonPublisher.setDiscardHandler and RecordPublisher.setDiscardHandler.
-
Register your provider. Call StreamLink.registerJsonProvider or StreamLink.registerRecordProvider to register your JsonProvider or RecordProvider as a provider of a subject pattern. For example, a StreamLink provider of FX pricing data might register a provider for subject pattern /FX/*.
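The request and discard steps above can be sketched without a running Liberator. The provider object below uses the same method names as the examples later on this page (onRequest, getSubject, setDiscardHandler, onDiscard, send). FakePublisher, makeQuote and the timers map are hypothetical names introduced only for this illustration; FakePublisher stands in for the publisher object that StreamLink would normally pass to onRequest.

```javascript
// Sketch only: exercises the request/discard lifecycle with a stand-in publisher.
const timers = new Map(); // subject -> interval handle

const commandListener = {
  onCommandError(subject, commandErrorEvent) {
    console.log("onCommandError", subject, commandErrorEvent);
  }
};

// Hypothetical helper: build bid/ask fields around a mid price.
function makeQuote(mid, jitter) {
  return {
    Bid: (mid + jitter - 0.0002).toFixed(5),
    Ask: (mid + jitter + 0.0002).toFixed(5)
  };
}

const provider = {
  onRequest(publisher) {
    const subject = publisher.getSubject();

    // Register clean-up before starting to publish.
    publisher.setDiscardHandler({
      onDiscard(discarded) {
        const key = discarded.getSubject();
        clearInterval(timers.get(key));
        timers.delete(key);
      }
    });

    // Timer-based publishing: send a new quote every second.
    const mid = 1 + Math.random();
    const timer = setInterval(() => {
      publisher.send(makeQuote(mid, Math.random() / 100), false, commandListener);
    }, 1000);
    timers.set(subject, timer);
  }
};

// Hypothetical stand-in for the publisher supplied by StreamLink.
class FakePublisher {
  constructor(subject) {
    this.subject = subject;
    this.sent = [];
    this.discardHandler = null;
  }
  getSubject() { return this.subject; }
  setDiscardHandler(handler) { this.discardHandler = handler; }
  send(fields) { this.sent.push(fields); }
  // Simulates Liberator raising the Discard event.
  discard() { this.discardHandler.onDiscard(this); }
}

const publisher = new FakePublisher("/FX/EURUSD");
provider.onRequest(publisher);
console.log(timers.has("/FX/EURUSD")); // true
publisher.discard();
console.log(timers.has("/FX/EURUSD")); // false
```

Registering the discard handler before starting the timer means the clean-up path exists as soon as publishing begins. With a real StreamLink session, the same provider object would be passed to StreamLink.registerRecordProvider.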
If you’re already familiar with DataSource providers, you’ll notice two immediate differences:
-
StreamLink clients authenticate as a Liberator user. The ability of a StreamLink provider to publish data is governed by Liberator’s auth module.
-
StreamLink clients become providers via a simple StreamLink API call.
There are other subtle differences to DataSource providers. For a detailed comparison, see Comparison with DataSource providers.
Requirements
When creating a Caplin Platform deployment that uses StreamLink data provision, ensure the following requirements are met:
-
Liberator permissioning:
  -
  A StreamLink provider must authenticate as a user who is permitted to contribute (write) to the subjects the provider provides.
  -
  StreamLink users that subscribe to subjects provided by StreamLink providers must be permitted to subscribe to (read) those subjects, just as they must be for any other subject they subscribe to.
-
Liberator data services:
  -
  No data service may provide a subject that is also provided by a StreamLink session. If both a data service and a StreamLink session can service a subscription request, then Liberator ignores the StreamLink provider in favour of the data service.
  -
  At least one data service must be configured in Liberator in order to disable Liberator’s default data service, which matches all subjects and thus has precedence over all StreamLink providers.
Comparison with DataSource providers
Subjects provided by StreamLink sessions differ from subjects provided by DataSource applications in the following ways:
- Declaration
-
StreamLink: Declared at runtime by a StreamLink API call, after the StreamLink client has connected to Liberator. See StreamLink.registerRecordProvider and StreamLink.registerJsonProvider.
DataSource: Declared in advance by data service configuration, either in Liberator (a static data service) or in the providing DataSource applications (a dynamic data service).
- Supported data types
-
StreamLink: Records (Type 1), JSON objects
DataSource: All DataSource data types
- Permissioning
-
StreamLink: The providing StreamLink session requires permission from the Liberator auth module to contribute (write) to the subject in order to both register as a provider of the subject and contribute data to it.
DataSource: DataSource applications do not require permission from the Liberator auth module to provide data.
- StreamLink contributions
-
StreamLink: Once one StreamLink session has subscribed to a StreamLink provided subject, any StreamLink session permitted to contribute to the subject may write to it and trigger an update to all subscribers to the subject. When using a StreamLink session to publish backend data, you will likely want to grant only the backend user permission to contribute (write) to the subject.
DataSource: Contributions sent to a subject provided by a data service do not update the subject’s data object in Liberator; instead, the contributions are forwarded to the providing data service peer.
- Precedence
-
All StreamLink provided subjects have a lower precedence than data service provided subjects. A StreamLink provided subject can never take the place of a subject provided by a data service.
Because data services have precedence over StreamLink providers, Liberator’s default data service must be disabled in order for StreamLink providers to work. Liberator’s default data service matches all subjects and thus has precedence over all StreamLink providers.
RTTP traffic
StreamLink and Liberator communicate using Caplin’s Real-time Text Protocol (RTTP). A new RTTP action, PROVIDE, has been added to support StreamLink data provision, and you will see this action in your StreamLink logs.
The diagram below illustrates example RTTP traffic when a StreamLink client requests a subject provided by another StreamLink client:
When Liberator receives a subscription request for a StreamLink provided subject, it first checks whether a data service provides data for the subject. If no matching data service is found, then Liberator checks if any StreamLink sessions provide data for the subject. If a matching StreamLink session (or sessions) is found, then Liberator creates a local data object and sends a request to the providing StreamLink session(s) to begin contributing data to the object.
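The lookup order described above can be illustrated with a toy model. This is not Liberator’s implementation: the function names matches and resolve are hypothetical, and both data service patterns and StreamLink provider patterns are simplified here to strings in which a trailing * matches any remainder of the subject (an assumption made for the sketch).

```javascript
// Toy model of the lookup order: data services first, StreamLink providers second.

// Assumption: a trailing "*" matches any remainder of the subject.
function matches(pattern, subject) {
  return pattern.endsWith("*")
    ? subject.startsWith(pattern.slice(0, -1))
    : subject === pattern;
}

function resolve(subject, dataServicePatterns, streamLinkPatterns) {
  if (dataServicePatterns.some((p) => matches(p, subject))) {
    return "data service";
  }
  if (streamLinkPatterns.some((p) => matches(p, subject))) {
    return "StreamLink provider";
  }
  return "not found";
}

// A catch-all data service (like the default data service) hides every StreamLink provider.
console.log(resolve("/FX/EURUSD", ["/*"], ["/FX/*"]));      // data service

// With a narrower data service configured, the StreamLink provider is reached.
console.log(resolve("/FX/EURUSD", ["/DEMO/*"], ["/FX/*"])); // StreamLink provider

// Neither a data service nor a StreamLink provider matches.
console.log(resolve("/EQ/VOD", ["/DEMO/*"], ["/FX/*"]));    // not found
```

The first call shows why a data service that matches all subjects prevents StreamLink providers from ever being asked for data.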
The data object created by Liberator is not managed by a data service and has more in common with a local object created by the Liberator config item add-object.
Access to the data object is permissioned. StreamLink users permitted to subscribe to the object can read it. StreamLink users permitted to contribute to the object can change its data, triggering updates to all StreamLink sessions subscribed to the object. When using a StreamLink application to publish backend data, you will likely want to grant only the backend user permission to contribute data to the object.
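To make the read and contribute permissions concrete, here is a small conceptual model. It does not use any Liberator or StreamLink API: the DataObject class, the user names backend and trader, and the permission map are all hypothetical, introduced only to show why write access is best limited to the publishing user.

```javascript
// Conceptual model of a permissioned data object (not a Liberator API).
class DataObject {
  constructor(subject, permissions) {
    this.subject = subject;
    this.permissions = permissions; // user -> { read: boolean, write: boolean }
    this.fields = {};
    this.subscribers = [];
  }

  // Users with read permission may subscribe.
  subscribe(user, onUpdate) {
    if (!this.permissions[user]?.read) return false;
    this.subscribers.push(onUpdate);
    return true;
  }

  // Users with write permission may contribute; every subscriber is updated.
  contribute(user, fields) {
    if (!this.permissions[user]?.write) return false;
    Object.assign(this.fields, fields);
    for (const notify of this.subscribers) notify({ ...this.fields });
    return true;
  }
}

const eurusd = new DataObject("/FX/EURUSD", {
  backend: { read: true, write: true },
  trader: { read: true, write: false }
});

const seen = [];
eurusd.subscribe("trader", (fields) => seen.push(fields));

console.log(eurusd.contribute("trader", { Bid: "9.99999" }));  // false: rejected
console.log(eurusd.contribute("backend", { Bid: "1.08410" })); // true: accepted
console.log(seen.length); // 1, only the backend contribution reached the subscriber
```

If trader were also granted write permission, the first contribution would have been accepted and pushed to every subscriber.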
StreamLink JavaScript examples
Requirements:
-
StreamLink JS
-
Liberator
-
Users permissioned to request the example subject patterns /FX/* and /JSON/FX/*.
-
At least one data service defined.
-
No data service may provide /FX/* or /JSON/FX/*. If one does, Liberator ignores the StreamLink providers for these subjects.
The quickest way to run a Liberator for this example is in a Docker container, with the OpenPermissioning and LiberatorDemoDataSource blades activated:
$ docker login docker-release.caplin.com
$ docker run --rm -p 18080:18080 -it docker-release.caplin.com/platform/core:8.0.4 /bin/bash
bash-5.1$ cd DeploymentFramework
bash-5.1$ ./dfw deactivate PermissioningService
bash-5.1$ ./dfw activate OpenPermissioning LiberatorDemoDataSource
bash-5.1$ ./dfw start Liberator
To shut down Liberator and exit the container, run the commands below:
bash-5.1$ ./dfw stop
bash-5.1$ exit
Node.js example
The following example illustrates how to register StreamLink providers in a Node.js application.
This example requires Node.js 20+.
Follow the steps below:
-
Start Liberator in a container (see instructions above).
-
Download StreamLinkTS 8.0.2 or later from Caplin Downloads.
-
Create a new directory, and unzip the StreamLinkTS 8.0.2 ZIP file to it.
-
In the directory StreamLinkTS-8.0.2-1193-39b2496/npm-module/@caplin/streamlink (the path imported by the example file below), run the command below to install StreamLink dependencies:
npm install
-
Create a file node-provider-example.mjs in the new directory, alongside the unzipped StreamLinkTS directory, with the following content:
import { LogLevel, StreamLinkFactory } from "./StreamLinkTS-8.0.2-1193-39b2496/npm-module/@caplin/streamlink/index-node.js"

// JSON handler passed to StreamLink. parse and patch are empty stubs in this example.
var jsonHandler = {
    parse: function (jsonString) { },
    patch: function (existingObject, jsonPatchString) { },
    format: function (obj) { return JSON.stringify(obj, null, "\t"); }
};

const streamlink = StreamLinkFactory.create({
    liberator_urls: ["rttp://localhost:18080"],
    username: "admin",
    password: "admin",
    application_id: "NodeProvider",
    // @ts-ignore
    json_handler: jsonHandler
});

// Active publishing timers, keyed by subject
const requests = new Map();

const commandListener = {
    onCommandError(subject, commandErrorEvent) {
        console.log("onCommandError ", subject, commandErrorEvent);
    }
};

// Write all StreamLink log messages to the console
streamlink.getLogger().addListener({
    onLog: function (logInfo) {
        console.log(logInfo.getDate(), logInfo.getLevel().toString(), logInfo.getMessage());
    }
}, LogLevel.FINEST);

streamlink.connect();

// Record provider for /FX/*
streamlink.registerRecordProvider("/FX/*", {
    onRequest(publisher) {
        const subject = publisher.getSubject();
        console.log("Provider: onRequest " + subject);

        // Stop the timer for this subject when Liberator discards it
        publisher.setDiscardHandler({
            onDiscard: (publisher) => {
                const subject = publisher.getSubject();
                console.log("Provider: onDiscard " + subject);
                const timer = requests.get(subject);
                // Map.get returns undefined (not null) for a missing key
                if (timer !== undefined) {
                    clearInterval(timer);
                    requests.delete(subject);
                }
            }
        });

        // Publish a randomly generated quote once a second
        const quotation = 1 + Math.random();
        const timer = setInterval(() => {
            let jitter = (Math.random() / 100);
            publisher.send({
                "TIME": String(Date.now()),
                "Bid": (quotation + jitter - 0.0002).toFixed(5),
                "Ask": (quotation + jitter + 0.0002).toFixed(5)
            }, false, commandListener);
        }, 1000);
        requests.set(subject, timer);
    }
}, commandListener);

// JSON provider for /JSON/FX/*
streamlink.registerJsonProvider("/JSON/FX/*", {
    onRequest(publisher) {
        const subject = publisher.getSubject();
        console.log("Provider: onRequest " + subject);

        // Stop the timer for this subject when Liberator discards it
        publisher.setDiscardHandler({
            onDiscard: (publisher) => {
                const subject = publisher.getSubject();
                console.log("Provider: onDiscard " + subject);
                const timer = requests.get(subject);
                if (timer !== undefined) {
                    clearInterval(timer);
                    requests.delete(subject);
                }
            }
        });

        // Publish a randomly generated quote once a second
        const quotation = 1 + Math.random();
        const timer = setInterval(() => {
            let jitter = (Math.random() / 100);
            publisher.send({
                "TIME": String(Date.now()),
                "Bid": (quotation + jitter - 0.0002).toFixed(5),
                "Ask": (quotation + jitter + 0.0002).toFixed(5)
            }, commandListener);
        }, 1000);
        requests.set(subject, timer);
    }
}, commandListener);
-
Run the command below to start the StreamLink client:
node node-provider-example.mjs
-
Open Liberator Explorer: http://localhost:18080/diagnostics/liberatorexplorer_react/index.html (user=admin, password=admin)
-
In Liberator Explorer, subscribe to /FX/EURUSD
-
In Liberator Explorer, subscribe to /JSON/FX/EURUSD
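The jsonHandler in the example above leaves parse and patch empty, which is enough for an application that only publishes. For an application that also subscribes to JSON subjects, a handler might look like the sketch below. It rests on assumptions not confirmed on this page: that the patch string is an RFC 6902 JSON Patch document (the browser example later on this page loads the fast-json-patch library, which suggests this format), and that patch should return the patched object. The applyPatch helper is hypothetical and handles only add, replace and remove on object members; a full JSON Patch library would be the safer choice in practice.

```javascript
// Sketch of a JSON handler with working parse and patch functions.
// Assumption: jsonPatchString is an RFC 6902 JSON Patch document.

// Hypothetical helper: apply a small subset of JSON Patch to a plain object.
function applyPatch(doc, operations) {
  const result = JSON.parse(JSON.stringify(doc)); // work on a deep copy
  for (const { op, path, value } of operations) {
    // Decode the JSON Pointer: "/a/b" -> ["a", "b"], unescaping ~1 and ~0.
    const keys = path.split("/").slice(1)
      .map((k) => k.replace(/~1/g, "/").replace(/~0/g, "~"));
    const last = keys.pop();
    let target = result;
    for (const key of keys) target = target[key];
    if (op === "add" || op === "replace") {
      target[last] = value;
    } else if (op === "remove") {
      delete target[last];
    } else {
      throw new Error("Unsupported operation: " + op);
    }
  }
  return result;
}

const jsonHandler = {
  parse(jsonString) {
    return JSON.parse(jsonString);
  },
  patch(existingObject, jsonPatchString) {
    return applyPatch(existingObject, JSON.parse(jsonPatchString));
  },
  format(obj) {
    return JSON.stringify(obj, null, "\t");
  }
};

const quote = jsonHandler.parse('{"Bid":"1.08410","Ask":"1.08450"}');
const patched = jsonHandler.patch(quote, '[{"op":"replace","path":"/Bid","value":"1.08420"}]');
console.log(patched.Bid); // 1.08420
console.log(quote.Bid);   // 1.08410 (the original object is not modified)
```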
Web browser example
The following example illustrates how to register StreamLink providers from a web page.
Follow the steps below:
-
Start Liberator in a container (see instructions above).
-
Create an HTML file with the contents below:
<html>
<head>
<script src="http://localhost:18080/sljs/streamlink.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/fast-json-patch/2.0.7/fast-json-patch.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/immer@1.12.1/dist/immer.umd.js"></script>
<script>
    // JSON handler passed to StreamLink. parse and patch are empty stubs in this example.
    var jsonHandler = {
        parse: function (jsonString) { },
        patch: function (existingObject, jsonPatchString) { },
        format: function (obj) { return JSON.stringify(obj, null, "\t"); }
    };

    const streamlink = caplin.StreamLinkFactory.create({
        liberator_urls: ["rttp://localhost:18080"],
        username: "admin",
        password: "admin",
        application_id: "WebProvider",
        json_handler: jsonHandler
    });

    // Active publishing timers, keyed by subject
    const requests = new Map();

    const commandListener = {
        onCommandError(subject, commandErrorEvent) {
            console.log("onCommandError ", subject, commandErrorEvent);
        }
    };

    // Write all StreamLink log messages to the browser console
    streamlink.getLogger().addListener({
        onLog: function (logInfo) {
            console.log(logInfo.getDate(), logInfo.getLevel().toString(), logInfo.getMessage());
        }
    }, caplin.LogLevel.FINEST);

    streamlink.connect();

    // Record provider for /FX/*
    streamlink.registerRecordProvider("/FX/*", {
        onRequest(publisher) {
            const subject = publisher.getSubject();
            console.log("Provider: onRequest " + subject);

            // Stop the timer for this subject when Liberator discards it
            publisher.setDiscardHandler({
                onDiscard: (publisher) => {
                    const subject = publisher.getSubject();
                    console.log("Provider: onDiscard " + subject);
                    const timer = requests.get(subject);
                    // Map.get returns undefined (not null) for a missing key
                    if (timer !== undefined) {
                        clearInterval(timer);
                        requests.delete(subject);
                    }
                }
            });

            // Publish a randomly generated quote once a second
            const quotation = 1 + Math.random();
            const timer = setInterval(() => {
                let jitter = (Math.random() / 100);
                publisher.send({
                    "TIME": String(Date.now()),
                    "Bid": (quotation + jitter - 0.0002).toFixed(5),
                    "Ask": (quotation + jitter + 0.0002).toFixed(5)
                }, false, commandListener);
            }, 1000);
            requests.set(subject, timer);
        }
    }, commandListener);

    // JSON provider for /JSON/FX/*
    streamlink.registerJsonProvider("/JSON/FX/*", {
        onRequest(publisher) {
            const subject = publisher.getSubject();
            console.log("Provider: onRequest " + subject);

            // Stop the timer for this subject when Liberator discards it
            publisher.setDiscardHandler({
                onDiscard: (publisher) => {
                    const subject = publisher.getSubject();
                    console.log("Provider: onDiscard " + subject);
                    const timer = requests.get(subject);
                    if (timer !== undefined) {
                        clearInterval(timer);
                        requests.delete(subject);
                    }
                }
            });

            // Publish a randomly generated quote once a second
            const quotation = 1 + Math.random();
            const timer = setInterval(() => {
                let jitter = (Math.random() / 100);
                publisher.send({
                    "TIME": String(Date.now()),
                    "Bid": (quotation + jitter - 0.0002).toFixed(5),
                    "Ask": (quotation + jitter + 0.0002).toFixed(5)
                }, commandListener);
            }, 1000);
            requests.set(subject, timer);
        }
    }, commandListener);
</script>
</head>
<body onunload="streamlink.disconnect();">
</body>
</html>
-
Open the example file in a web browser, and press F12 to open the JavaScript console and view StreamLink’s logging output.
-
Open Liberator Explorer: http://localhost:18080/diagnostics/liberatorexplorer_react/index.html (user=admin, password=admin)
-
In Liberator Explorer, subscribe to /FX/EURUSD
-
In Liberator Explorer, subscribe to /JSON/FX/EURUSD