Publishing a JSON subject from a Java DataSource
This page provides an overview of how to publish a subject of type JSON from a Java DataSource.
The instructions on this page assume you already know how to create a Java DataSource project. For more information on creating a Java DataSource, see the Caplin Platform Developer Tutorial 1.
Requirements
Minimum version of Caplin DataSource for Java: 7.1.9
Third-party dependencies
DataSource for Java does not include a JSON object mapper. Instead, it abstracts JSON operations to an interface, JsonHandler, and includes two reference implementations that use third-party JSON object mappers: JacksonJsonHandler and GsonJsonHandler.
Choose an implementation of JsonHandler and configure your dependencies accordingly:
Project dependencies for JacksonJsonHandler
Example Gradle configuration for a project using FasterXML’s Jackson Databind for JSON object mapping.
// Option A: resolve DataSource for Java from Caplin's Maven repository
repositories {
    mavenCentral()
    maven {
        url "https://repository.caplin.com"
        credentials {
            username "<username>"
            password "<password>"
        }
    }
}
dependencies {
    implementation 'com.caplin.platform.integration.java:datasource:7.1.21'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
    implementation 'com.github.java-json-tools:json-patch:1.13'
}

// Option B: load the DataSource for Java JAR from a local lib directory
repositories {
    mavenCentral()
}
dependencies {
    implementation fileTree(dir: 'lib', include: '*.jar') // (1)
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
    implementation 'com.github.java-json-tools:json-patch:1.13'
}
1 | Copy Caplin’s datasource-7.1.n-jar-with-dependencies.jar to the lib directory in your Gradle project. |
Project dependencies for GsonJsonHandler
Example Gradle configuration for a project using Google’s GSON for JSON object mapping.
// Option A: resolve DataSource for Java from Caplin's Maven repository
repositories {
    mavenCentral()
    maven {
        url "https://repository.caplin.com"
        credentials {
            username "<username>"
            password "<password>"
        }
    }
}
dependencies {
    implementation 'com.caplin.platform.integration.java:datasource:7.1.21'
    implementation 'com.google.code.gson:gson:2.10.1'
    implementation 'com.tananaev:json-patch:1.2'
}

// Option B: load the DataSource for Java JAR from a local lib directory
repositories {
    mavenCentral()
}
dependencies {
    implementation fileTree(dir: 'lib', include: '*.jar') // (1)
    implementation 'com.google.code.gson:gson:2.10.1'
    implementation 'com.tananaev:json-patch:1.2'
}
1 | Copy Caplin’s datasource-7.1.n-jar-with-dependencies.jar to the lib directory in your Gradle project. |
Configuration
DataSource for Java is not configured to use a specific JsonHandler implementation by default. Before you can publish JSON messages, configure DataSource for Java to use your preferred JsonHandler implementation:
Configuration for JacksonJsonHandler
To use JacksonJsonHandler as the JSON object mapper, choose one of the methods below:
- Set the JsonHandler implementation in your adapter’s datasource.conf configuration file:
    json-handler-class com.caplin.datasource.messaging.json.JacksonJsonHandler
- Set the JsonHandler implementation in your adapter’s code:
    dataSource.getExtraConfiguration().setJsonHandler(new com.caplin.datasource.messaging.json.JacksonJsonHandler());
Configuration for GsonJsonHandler
To use GsonJsonHandler as the JSON object mapper, choose one of the methods below:
- Set the JsonHandler implementation in your adapter’s datasource.conf configuration file:
    json-handler-class com.caplin.datasource.messaging.json.GsonJsonHandler
- Set the JsonHandler implementation in your adapter’s code:
    dataSource.getExtraConfiguration().setJsonHandler(new com.caplin.datasource.messaging.json.GsonJsonHandler());
Overview
JSON messages are published via a CachingPublisher, which caches the last published value for each subject it serves. The CachingPublisher uses its cache to serve new requests for subjects it has already served, and, in the case of JSON data, to determine if it is more efficient to publish an update as a JSON patch or in full.
You provide data for JSON subjects by writing a CachingDataProvider, which you register with a DataSource instance using DataSource.createCachingPublisher(Namespace, CachingDataProvider). This call returns a CachingPublisher, which you must make available to your CachingDataProvider in order for it to publish JsonMessage objects.
When a DataSource receives a JSON subject request, if the CachingPublisher for the subject namespace does not have a cached object for the subject, then the CachingPublisher routes the request to its associated CachingDataProvider. The CachingDataProvider subscribes to data in the backend and publishes a JsonMessage.
When a DataSource receives a JSON subject request, if the CachingPublisher for the subject namespace has a cached object for the subject, then the CachingPublisher serves the request directly from its cache.
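The two request paths described above (cache miss routed to the provider, cache hit served from the cache) can be illustrated with a small, self-contained simulation. This is a sketch only: CacheRoutingSketch and its request, publish, and served members are hypothetical names invented for illustration and are not part of the DataSource for Java API. It is single-threaded and ignores discards.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in used only to illustrate routing; NOT the Caplin CachingPublisher API.
final class CacheRoutingSketch {
    private final Map<String, String> cache = new HashMap<>();
    private final BiConsumer<String, CacheRoutingSketch> provider; // invoked only on a cache miss
    final List<String> served = new ArrayList<>();                 // records how each value was served

    CacheRoutingSketch(BiConsumer<String, CacheRoutingSketch> provider) {
        this.provider = provider;
    }

    // A peer requests a subject.
    void request(String subject) {
        String cached = cache.get(subject);
        if (cached != null) {
            served.add("cache:" + cached);      // cache hit: the provider is not called
        } else {
            provider.accept(subject, this);     // cache miss: route the request to the provider
        }
    }

    // The provider publishes a value; the publisher caches it for later requests.
    void publish(String subject, String json) {
        cache.put(subject, json);
        served.add("provider:" + json);
    }

    public static void main(String[] args) {
        List<String> providerCalls = new ArrayList<>();
        CacheRoutingSketch publisher = new CacheRoutingSketch((subject, pub) -> {
            providerCalls.add(subject);
            pub.publish(subject, "{\"bid\":\"1.1000\"}");
        });
        publisher.request("/CCYS/EURUSD"); // miss: the provider is called
        publisher.request("/CCYS/EURUSD"); // hit: served from the cache
        System.out.println("Provider calls: " + providerCalls.size());
        System.out.println("Served: " + publisher.served);
    }
}
```

In the simulation the provider is called once for two requests to the same subject, which mirrors the behaviour the real CachingPublisher is described as having.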
The CachingDataProvider implementation should also publish updates resulting from its subscription to backend data events. When the CachingDataProvider publishes an update to a subject that the CachingPublisher has already cached, the CachingPublisher chooses the most efficient format in which to publish the update: as a JSON patch or as the full image.
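To make the patch-versus-full trade-off concrete, the sketch below compares the size of an RFC 6902 style patch with the size of the full image for a flat JSON object and picks the shorter one. The source does not describe how the CachingPublisher actually makes this decision, so treat this as an assumption-laden illustration: PatchOrFullSketch and its methods are hypothetical, the comparison by character count is a guess at one plausible criterion, and the serialiser handles only flat objects whose keys and values need no escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration; not the algorithm used by DataSource for Java.
final class PatchOrFullSketch {

    // Serialises a flat map of strings as a JSON object (no escaping is performed).
    static String toJson(Map<String, String> object) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        object.forEach((k, v) -> json.add("\"" + k + "\":\"" + v + "\""));
        return json.toString();
    }

    // Builds an RFC 6902 style patch that turns 'previous' into 'next' (flat objects, non-null values).
    static String toPatch(Map<String, String> previous, Map<String, String> next) {
        StringJoiner ops = new StringJoiner(",", "[", "]");
        next.forEach((k, v) -> {
            if (!previous.containsKey(k)) {
                ops.add("{\"op\":\"add\",\"path\":\"/" + k + "\",\"value\":\"" + v + "\"}");
            } else if (!previous.get(k).equals(v)) {
                ops.add("{\"op\":\"replace\",\"path\":\"/" + k + "\",\"value\":\"" + v + "\"}");
            }
        });
        previous.keySet().forEach(k -> {
            if (!next.containsKey(k)) {
                ops.add("{\"op\":\"remove\",\"path\":\"/" + k + "\"}");
            }
        });
        return ops.toString();
    }

    // Returns whichever representation of the update is shorter.
    static String chooseUpdate(Map<String, String> previous, Map<String, String> next) {
        String full = toJson(next);
        String patch = toPatch(previous, next);
        return patch.length() < full.length() ? patch : full;
    }

    public static void main(String[] args) {
        Map<String, String> previous = new LinkedHashMap<>();
        previous.put("id", "1");
        previous.put("bid", "1.1000");
        previous.put("ask", "1.1002");
        previous.put("currencyPair", "EURUSD");

        Map<String, String> next = new LinkedHashMap<>(previous);
        next.put("bid", "1.1001"); // only one field changes

        System.out.println(chooseUpdate(previous, next));
    }
}
```

When a single field changes, the patch is the shorter form; when every field changes, the patch carries per-operation overhead and the full image wins.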
Example
In this example, we’ll create a CachingDataProvider that generates random pricing data for currency pair subjects (/CCYS/<currency_pair>).
import com.caplin.datasource.DataSource;
import com.caplin.datasource.messaging.json.JacksonJsonHandler;
import com.caplin.datasource.namespace.PrefixNamespace;
import com.caplin.datasource.publisher.CachingPublisher;

public class ExampleJsonAdapter
{
    public static void main(final String[] args)
    {
        final DataSource dataSource = DataSource.fromArgs(args);
        dataSource.getExtraConfiguration().setJsonHandler(new JacksonJsonHandler()); // (1)

        PricingCachingDataProvider pricingCachingDataProvider =
            new PricingCachingDataProvider(); // (2)

        CachingPublisher pricingCachingPublisher = dataSource.createCachingPublisher(
            new PrefixNamespace("/CCYS/"), pricingCachingDataProvider); // (3)

        pricingCachingDataProvider.setCachingPublisher(pricingCachingPublisher); // (4)

        dataSource.start();
    }
}
1 | Set the implementation of JsonHandler that the adapter’s DataSource instance should use to serialise Java objects to JSON. In this example, we use DataSource for Java’s JacksonJsonHandler implementation. |
2 | Instantiate a CachingDataProvider. In this example we instantiate a PricingCachingDataProvider (see source code below). |
3 | Register the PricingCachingDataProvider with the adapter’s DataSource instance to create a CachingPublisher. |
4 | Initialise the PricingCachingDataProvider with its associated CachingPublisher. |
PricingCachingDataProvider.java
The CachingDataProvider that responds to subject requests and discards.
import com.caplin.datasource.messaging.json.JsonMessage;
import com.caplin.datasource.publisher.CachingDataProvider;
import com.caplin.datasource.publisher.CachingPublisher;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

public class PricingCachingDataProvider implements CachingDataProvider
{
    private final ScheduledExecutorService executorService =
        Executors.newSingleThreadScheduledExecutor();
    private final Random random = new Random();
    private final Map<String, ScheduledFuture<?>> subscriptions =
        new ConcurrentHashMap<>();
    private CachingPublisher cachingPublisher = null;

    @Override
    public void onRequest(String subject) { // (1)
        subscriptions.put(
            subject,
            executorService.scheduleAtFixedRate(() -> {
                Price randomPrice = createRandomPrice(subject.split("/")[2]);
                JsonMessage jsonMessage = cachingPublisher.getCachedMessageFactory()
                    .createJsonMessage(subject, randomPrice);
                cachingPublisher.publish(jsonMessage);
            }, 0L, 1L, TimeUnit.SECONDS)
        );
    }

    @Override
    public void onDiscard(String subject) { // (2)
        ScheduledFuture<?> scheduledFuture = subscriptions.remove(subject);
        if (scheduledFuture != null) scheduledFuture.cancel(true);
    }

    public void setCachingPublisher(CachingPublisher cachingPublisher) { // (3)
        this.cachingPublisher = cachingPublisher;
    }

    private Price createRandomPrice(String currencyPair) {
        return new Price(
            String.valueOf(random.nextInt()),
            String.valueOf(random.nextDouble()),
            String.valueOf(random.nextDouble()),
            currencyPair
        );
    }
}
1 | On subject request, parse the subject and subscribe to a backend process to provide updates. This method is called by the CachingPublisher when it does not have cached data for a subject; thereafter, requests for the same subject are served from the CachingPublisher cache. In this example, we start a simulation that publishes a random price for the subject every second. |
2 | On subject discard, cancel the backend process that generates updates for the subject. In this example, we cancel the ScheduledFuture that generates a random price every second. |
3 | The setCachingPublisher method is not part of the CachingDataProvider interface. We use this method to initialise the PricingCachingDataProvider with its associated CachingPublisher. |
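The example extracts the currency pair with subject.split("/")[2], which works for well-formed subjects such as /CCYS/EURUSD but throws ArrayIndexOutOfBoundsException for a subject such as /CCYS/ (Java's split drops the trailing empty string). A more defensive parser might look like the sketch below. SubjectParserSketch and currencyPair are hypothetical names, and whether such malformed subjects can actually reach onRequest depends on namespace matching, which the source does not specify.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the DataSource for Java API.
final class SubjectParserSketch {
    private static final String PREFIX = "/CCYS/";

    // Returns the currency pair for subjects of the form /CCYS/<currency_pair>, otherwise empty.
    static Optional<String> currencyPair(String subject) {
        if (subject == null || !subject.startsWith(PREFIX)) {
            return Optional.empty();
        }
        String remainder = subject.substring(PREFIX.length());
        // Reject a missing pair ("/CCYS/") and deeper paths ("/CCYS/EURUSD/extra").
        if (remainder.isEmpty() || remainder.contains("/")) {
            return Optional.empty();
        }
        return Optional.of(remainder);
    }

    public static void main(String[] args) {
        System.out.println(currencyPair("/CCYS/EURUSD")); // prints "Optional[EURUSD]"
        System.out.println(currencyPair("/CCYS/"));       // prints "Optional.empty"
    }
}
```

A provider using a helper like this could ignore, or log, requests for which the result is empty instead of scheduling a task that fails on its first run.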
Price.java
A POJO that represents the data to send as a JSON message. Instances of this class are serialised to JSON by DataSource for Java’s JsonHandler.
public class Price {
    private final String id;
    private final String bid;
    private final String ask;
    private final String currencyPair;

    public Price(String id, String bid, String ask, String currencyPair) {
        this.id = id;
        this.bid = bid;
        this.ask = ask;
        this.currencyPair = currencyPair;
    }

    public String getId() {
        return id;
    }

    public String getBid() {
        return bid;
    }

    public String getAsk() {
        return ask;
    }

    public String getCurrencyPair() {
        return currencyPair;
    }
}
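For orientation, the sketch below builds by hand the kind of JSON object that an object mapper would be expected to derive from a Price instance: four string properties named after the class's fields and getters. It does not call Jackson or Gson. PriceJsonShapeSketch is a hypothetical name, and the property order and compact formatting shown here are assumptions, since both depend on the mapper and its configuration.

```java
// Hypothetical illustration of the expected payload shape; not produced by a real JsonHandler.
final class PriceJsonShapeSketch {

    // Escapes only backslashes and double quotes, which is enough for this example's values.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds a JSON object with the same property names as the Price class.
    static String toJson(String id, String bid, String ask, String currencyPair) {
        return "{"
            + "\"id\":\"" + escape(id) + "\","
            + "\"bid\":\"" + escape(bid) + "\","
            + "\"ask\":\"" + escape(ask) + "\","
            + "\"currencyPair\":\"" + escape(currencyPair) + "\""
            + "}";
    }

    public static void main(String[] args) {
        // prints {"id":"42","bid":"1.1000","ask":"1.1002","currencyPair":"EURUSD"}
        System.out.println(toJson("42", "1.1000", "1.1002", "EURUSD"));
    }
}
```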