Contains information on remote events subscription and notification via WebSockets.
Overview
This document describes how client applications subscribe for remote events and receive remote event notifications via WebSockets. It specifies the general structure of the remote events pushed through the WebSocket and the syntax for encoding event subscription criteria in order to create event subscriptions on the server.
A fully functional WebSocket server is in use. It implements the RFC 6455 WebSocket specification and allows the use of subprotocols and bi-directional communication. There are two methods of subscribing for remote events using the WebSockets functionality. The first is the legacy subscription via URI, which communicates in one direction only. The second uses the "osgi.events.v1-text" subprotocol and supports bi-directional communication.
Information on both methods is given below.
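As a rough illustration of the second method, the sketch below requests the "osgi.events.v1-text" subprotocol when opening the connection. The function name openEventsSocket and the injectable WebSocketImpl parameter are hypothetical and only there to keep the sketch self-contained; the endpoint URL is whatever your server exposes.

```javascript
// Subprotocol name as given in the text above.
var OSGI_EVENTS_SUBPROTOCOL = 'osgi.events.v1-text';

// Opens a WebSocket and requests the bi-directional events subprotocol.
// url: full ws:// or wss:// address of the events endpoint.
// WebSocketImpl: optional WebSocket constructor; defaults to the global one.
function openEventsSocket(url, WebSocketImpl) {
    var Impl = WebSocketImpl || WebSocket;
    // The second constructor argument is the requested subprotocol.
    var socket = new Impl(url, OSGI_EVENTS_SUBPROTOCOL);
    socket.onopen = function () {
        // socket.protocol holds the subprotocol selected by the server.
        console.log('Connected, subprotocol: ' + socket.protocol);
    };
    return socket;
}
```

In a browser this could be called as openEventsSocket('ws://' + window.location.host + '/m2m/re/subscriptions'), assuming the events application is reachable under that path.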
Legacy Subscription Mechanism
Subscribing for Remote Events and Receiving Event Notifications
The event subscription is created at the moment the client application opens a connection via WebSocket. The event filtering criteria need to be encoded in the target URI. Any events generated on the server that match the subscription are pushed through the WebSocket as long as the connection is open. The format of the events conforms to the general event format defined in Remote Events Subscriptions and Pull Notifications via Long Polling. Event representations are encoded in JSON.
Accepted URI Formats for Event Subscriptions via WebSockets
| URI | Description |
|---|---|
| ws://<host>:<port>/m2m/[<gatewayID>]/re/subscriptions?topics=<topic1>,<topic2>,...&filter=<filter> | Creates an event subscription with simple subscription criteria consisting of a single subscription criterion defined via query parameters. The subscription criterion specifies the topics of interest <topic1>, <topic2>, ..., <topicN> and an optional LDAP filter <filter>, which should be defined over the properties of the events with the specified topics. The subscription is created at the moment the connection is opened and removed when the connection is closed. Any events generated on the server that match the subscription are pushed through the WebSocket as long as the connection is open. If the subscription is to be scoped to a single OSGi service gateway accessed remotely via Remote Management, the gateway identifier must be provided explicitly in the URI. |
| ws://<host>:<port>/m2m/[<gatewayID>]/re/subscriptions/[{"topics":[<topic11>, <topic12>, ...], "filter":<filter1>}, {"topics":[<topic21>, <topic22>, ...], "filter":<filter2>}, ...] | Creates an event subscription with subscription criteria consisting of multiple subscription criterion entries defined within a path parameter. Each subscription criterion specifies the topics of interest and a corresponding optional LDAP filter, which should be defined over the properties of the relevant events. The subscription is created at the moment the connection is opened and removed when the connection is closed. Any events generated on the server that match the subscription are pushed through the WebSocket as long as the connection is open. If the subscription is to be scoped to a single OSGi service gateway accessed remotely via Remote Management, the gateway identifier must be provided explicitly in the URI. |
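The two URI formats above can be assembled programmatically. The following is a minimal sketch, not part of the product API: the helper names are hypothetical, and percent-encoding the filter and the JSON path segment is an assumption about what the server accepts.

```javascript
// Builds the part of the URI shared by both formats; gatewayId is optional.
function subscriptionBase(host, port, gatewayId) {
    var base = 'ws://' + host + ':' + port + '/m2m/';
    if (gatewayId) {
        base += encodeURIComponent(gatewayId) + '/';
    }
    return base + 're/subscriptions';
}

// Format 1: a single criterion passed as query parameters.
// topics is an array of topic strings; filter is an optional LDAP filter.
function simpleSubscriptionUri(host, port, topics, filter, gatewayId) {
    var uri = subscriptionBase(host, port, gatewayId) + '?topics=' + topics.join(',');
    if (filter) {
        // Assumption: the server decodes percent-encoded query values.
        uri += '&filter=' + encodeURIComponent(filter);
    }
    return uri;
}

// Format 2: several criteria passed as a JSON array in the path.
// criteria is an array of objects such as {topics: [...], filter: '...'}.
function multiSubscriptionUri(host, port, criteria, gatewayId) {
    // Assumption: the server decodes the percent-encoded JSON path segment.
    return subscriptionBase(host, port, gatewayId) + '/' + encodeURI(JSON.stringify(criteria));
}
```

For example, simpleSubscriptionUri('localhost', 8080, ['a/b/REGISTERED']) yields a URI in the first format without a filter or gateway identifier.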
Possible subscription topics:
| Type of Topic | Value |
|---|---|
| TOPIC_ALL | com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/* |
| TOPIC_ITEM_REGISTERED | com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/REGISTERED |
| TOPIC_ITEM_UNREGISTERED | com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/UNREGISTERED |
| TOPIC_ITEM_PROPERTY_CHANGED | com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/PROPERTY_CHANGED |
| TOPIC_ITEM_OPERATION_EXECUTED | com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/OPERATION_EXECUTED |
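On the client side, the topics from the table can be kept as constants. The sketch below also includes a small helper for checking whether a received event topic is covered by a subscription topic. The names ItemTopics and topicMatches are hypothetical, and the matching rule assumes OSGi Event Admin wildcard semantics, where a trailing "/*" matches any topic below the prefix; the server may apply further rules.

```javascript
var TOPIC_PREFIX = 'com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/';

// Topic values copied from the table above.
var ItemTopics = {
    ALL: TOPIC_PREFIX + '*',
    REGISTERED: TOPIC_PREFIX + 'REGISTERED',
    UNREGISTERED: TOPIC_PREFIX + 'UNREGISTERED',
    PROPERTY_CHANGED: TOPIC_PREFIX + 'PROPERTY_CHANGED',
    OPERATION_EXECUTED: TOPIC_PREFIX + 'OPERATION_EXECUTED'
};

// Returns true if eventTopic is covered by subscriptionTopic.
// Assumes Event Admin style wildcards: "*" alone matches everything,
// "a/b/*" matches any topic that starts with "a/b/".
function topicMatches(subscriptionTopic, eventTopic) {
    if (subscriptionTopic === '*') {
        return true;
    }
    if (subscriptionTopic.slice(-2) === '/*') {
        var prefix = subscriptionTopic.slice(0, -1); // keeps the trailing '/'
        return eventTopic.indexOf(prefix) === 0 && eventTopic.length > prefix.length;
    }
    return subscriptionTopic === eventTopic;
}
```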
Example Web Socket subscription:
This example covers ALL Functional Item Events and their respective topics as received by the Remote Manager. If you want a subscription for a specific gateway you need to add [<gatewayID>] as shown in the Accepted URI Formats for Event Subscriptions via Web Sockets table.
Using Legacy Subscription Mechanism
Examples of subscribing for remote events and receiving event notifications via WebSockets are provided below.
The communication is one-directional and all the subscription functionality is done with the URI. The user receives everything via the websockets without sending anything further. For more information see the last section New Subscription Subprotocol.
The following JavaScript code snippet demonstrates creating a websocket subscription with specified topic in the URI.
var webSocketConfig = {
    // Use wss when the page itself was loaded over https.
    baseWebSocketURL: (window.location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + '/m2m/re/subscriptions',
    openWS: null
};

// Logs every event notification; event representations are encoded in JSON.
function logMessage(eo) {
    console.log(JSON.parse(eo.data));
}

function openWebSocket() {
    // The base URL already contains the subscription path, so only the query is appended.
    webSocketConfig.openWS = new WebSocket(webSocketConfig.baseWebSocketURL + '?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/REGISTERED');
    webSocketConfig.openWS.onmessage = logMessage;
}
.....
Deleting the Subscription
The following JavaScript code snippet demonstrates deletion of a websocket subscription via closing the websocket connection.
function closeWebSocket() {
    // Closing the connection removes the subscription on the server.
    if (webSocketConfig.openWS) {
        webSocketConfig.openWS.close();
        webSocketConfig.openWS = null;
    }
}
....
New Subscription Subprotocol
Subscribing with Message
If you connect to the OSGi Events WebSocket Application using only the application URI, you can subscribe for additional events by sending text messages that carry a JSON object. As the examples below show, the JSON object contains a command, its params and an optional userkey.
Every property must be in lower-case.
The options supported by the OSGi Events WebSocket Application are:
1. Subscribe option with the following format:
Examples
Single topic subscription without a filter
//Without userkey
{"command":"subscribe", "params":"someTopic"}
//With userkey
{"command":"subscribe", "userkey":"someKey", "params":"someTopic"}
Multiple topic subscription without a filter
//Without userkey
{"command":"subscribe", "params":["topicOne", "topicTwo", "topicThree"]}
//With userkey
{"command":"subscribe", "userkey":"someKey", "params":["topicOne", "topicTwo", "topicThree"]}
One or more topic(s) with or without a filter
//Every JSON object in the params array contains information for one subscription. It can have one or more topic(s) and one optional filter
//Without userkey
{"command":"subscribe", "params":[{"filter":"oneFilter", "topic":"oneTopic"}, {"filter":"oneFilter", "topic":["one", "two", "three"]}, {"topic": ["four", "five"]}]}
//With userkey
{"command":"subscribe", "userkey":"someKey", "params":[{"filter":"oneFilter", "topic":"oneTopic"}, {"filter":"oneFilter", "topic":["one", "two", "three"]}, {"topic": ["four", "five"]}]}
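The subscribe messages shown above can be produced with a small helper instead of hand-written strings. This is a minimal sketch; the function name buildSubscribeMessage is hypothetical, and the property names are taken from the examples above.

```javascript
// Builds a subscribe message as a JSON string.
// params: a topic string, an array of topic strings, or an array of
//         criterion objects such as {filter: '...', topic: [...]}.
// userkey: optional; omitted from the message when undefined.
function buildSubscribeMessage(params, userkey) {
    var message = { command: 'subscribe' };
    if (userkey !== undefined) {
        message.userkey = userkey;
    }
    message.params = params;
    return JSON.stringify(message);
}
```

With an open socket, a call such as socket.send(buildSubscribeMessage(['topicOne', 'topicTwo'], 'someKey')) would send the multiple-topic variant with a userkey.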
2. Unsubscribe option with the following format:
Examples
Subscription cancelling
//Single subscription cancelling
{"command":"unsubscribe", "params":"someId"}
//Single subscription cancelling with userkey
{"command":"unsubscribe", "params":"someId", "userkey":"someUserKey"}
//Multiple subscriptions cancelling
{"command":"unsubscribe", "params":["idOne", "idTwo", "idThree", "idFour", "idFive"]}
//Multiple subscriptions cancelling with userkey
{"command":"unsubscribe", "params":["idOne", "idTwo", "idThree", "idFour", "idFive"], "userkey":"someUserKey"}
Response after cancelling subscription
//Here the subscriptions with idOne, idThree and idFive have been cancelled successfully, while cancelling the subscriptions with idTwo and idFour has failed
//Without userkey
{"successful":["idOne", "idThree", "idFive"],"unsuccessful":["idTwo", "idFour"]}
//With userkey
{"successful":["idOne", "idThree", "idFive"],"unsuccessful":["idTwo", "idFour"], "userkey":"someUserKey"}
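A client might build the unsubscribe message and evaluate the response along the following lines. This is a sketch based only on the message formats shown above; the function names and the shape of the returned summary object are hypothetical.

```javascript
// Builds an unsubscribe message as a JSON string.
// ids: a single subscription id or an array of ids.
// userkey: optional; omitted from the message when undefined.
function buildUnsubscribeMessage(ids, userkey) {
    var message = { command: 'unsubscribe', params: ids };
    if (userkey !== undefined) {
        message.userkey = userkey;
    }
    return JSON.stringify(message);
}

// Parses the response to an unsubscribe message into a small summary.
function parseUnsubscribeResponse(text) {
    var response = JSON.parse(text);
    return {
        cancelled: response.successful || [],
        failed: response.unsuccessful || [],
        userkey: response.userkey // undefined when no userkey was sent
    };
}
```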
Subscription Cancelling
There are three ways for subscription cancelling:
When a subscription is removed, the client is notified via a message in the following format: {"subexp":"subId"}, where subId is the ID of the subscription that has expired.
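Since expiry notifications arrive on the same socket as other messages, a client typically needs to tell them apart. The sketch below shows one way to do this, assuming every incoming text message is valid JSON. The function name classifyMessage and the returned kind values are hypothetical.

```javascript
// Classifies an incoming text message from the events socket.
// Returns {kind: 'expired', subscriptionId} for {"subexp": "..."} messages,
// {kind: 'unsubscribeResult', data} for unsubscribe responses,
// and {kind: 'other', data} for everything else (for example events).
function classifyMessage(text) {
    var data = JSON.parse(text);
    if (data !== null && typeof data === 'object') {
        if (typeof data.subexp === 'string') {
            return { kind: 'expired', subscriptionId: data.subexp };
        }
        if (Array.isArray(data.successful) || Array.isArray(data.unsuccessful)) {
            return { kind: 'unsubscribeResult', data: data };
        }
    }
    return { kind: 'other', data: data };
}
```

It could be wired up as socket.onmessage = function (eo) { var msg = classifyMessage(eo.data); ... }, with the client dropping its local record of a subscription when msg.kind is 'expired'.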
Java Example
The three classes of the Java example for the REST WebSocket Push functionality are described below.
To resolve, the example requires the org.eclipse.jetty.websocket.client bundle from the SDK's bundles/ directory and the gson JSON serialization bundle.
The Maven POM dependencies needed for compilation are the following:
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
<version>9.3.0.M2</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
Push WS Example Class
This class provides user-friendly testing of the Events Push functionality in REST. On bundle startup you will be prompted for the host and the port of the WebSocket server.
When events are sent via the Event Admin, the sample client receives a JSON notification.
Code for unsubscribing has also been included, but is not used by the example.
The URI for subscription is /m2m/re/subscriptions .
The topic of the test events is "testTopic".
The Web Console Event page can be used to send sample events to test the example.
package demo.test.education.samplewsclient;
import java.io.IOException;
import org.osgi.framework.BundleContext;
import com.google.gson.JsonObject;
/**
* REST Push Example.
*
*/
public class PushWSExample {
BundleContext bc;
/**
* The active web socket.
*/
private JettyWebSocket activeClient = null;
/**
* Constructor for the push example.
* @param bundleContext The bundle context.
*/
public PushWSExample(BundleContext bundleContext) {
this.bc = bundleContext;
}
/**
* Closes the active client, if any.
*/
public void close() {
// No client exists if subscribe() was never called or failed early.
if (activeClient == null) {
return;
}
try {
activeClient.destroy();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Subscription method for demonstration purposes.
* @param host The web socket server host address, such as ws://172.0.0.1
* @param port The port of the web socket server, such as 8082
*/
public void subscribe( String host, String port) {
int portNumber = Integer.parseInt(port, 10);
String alias = "/m2m/re/subscriptions";
try {
startNewClient(host, portNumber, alias);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* Unsubscription command for demonstration purposes.
* @param id
* @throws IOException
*/
public void unsubscribe(String id) throws IOException {
JsonObject json = new JsonObject();
json.addProperty("command", "unsubscribe");
json.addProperty("params", id);
activeClient.sendMessage(json.toString());
}
/**
* Starts a new client.
* @param host The host of the client, such as ws://172.22.172.149
* @param port The port of the client.
* @param alias The alias, such as /m2m/re/subscriptions .
* @throws Exception General, all-purpose exception.
*/
private void startNewClient(String host, int port, String alias)
throws Exception {
final JettyWebSocket socket = new JettyWebSocket(host, port, alias);
socket.start();
activeClient = socket;
JsonObject json = new JsonObject();
json.addProperty("command", "subscribe");
json.addProperty("params", "testTopic");
socket.sendMessage(json.toString());
}
}
Activator Class
The following class acts as a BundleActivator for the example:
package demo.test.education.samplewsclient;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
/**
* Activator for the REST Events Push Example.
*
*/
public class Activator implements BundleActivator {
/**
* The example implementation we will use.
*/
private PushWSExample demoImpl;
/**
* Starts the bundle.
* @param context The bundle context.
* @throws Exception General, all-purpose exception.
*/
public void start(BundleContext context) throws Exception {
demoImpl = new PushWSExample(context);
System.out.println("Enter host:");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String host = reader.readLine();
System.out.println("Enter port:");
String port = reader.readLine();
demoImpl.subscribe(host, port);
}
/**
* Stops the bundle.
* @param context The bundle context.
* @throws Exception General, all-purpose exception.
*/
public void stop(BundleContext context) throws Exception {
if (demoImpl != null) {
demoImpl.close();
}
}
}
Jetty Web Socket Class
A Jetty Web Socket implementation for use by the example:
package demo.test.education.samplewsclient;
import java.io.IOException;
import java.net.URI;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
/**
* Jetty web socket implementation class for the REST Jetty Events Push example.
*
*/
@WebSocket()
public class JettyWebSocket {
/**
* Timeout for socket closure.
*/
private static final int SOCKET_CLOSURE_TIMEOUT_IN_MILLISECONDS = 1000;
/**
* The session of the web socket.
*/
private Session session;
/**
* If set to true, there is a pending session for a connection.
*/
private boolean isPendingSession = true;
/**
* The client related to the Web Socket.
*/
WebSocketClient client;
/**
* The host name.
*/
private final String host;
/**
* The port number.
*/
private final int port;
/**
* The alias of the web socket connection.
*/
private final String alias;
/**
* Constructor for the JettyWebSocket class.
* @param host The host name.
* @param port The port number.
* @param alias The alias of the web socket connection.
*/
public JettyWebSocket(String host, int port, String alias) {
this.host = host;
this.port = port;
this.alias = alias;
}
/**
* Get method for the host.
* @return the host
*/
public String getHost() {
return host;
}
/**
* Get method for the port.
* @return the port
*/
public int getPort() {
return port;
}
/**
* Starts the web socket.
* @throws Exception General, all-purpose exception
*/
public void start() throws Exception {
if (client != null) {
return;
}
URI uri = new URI(host + ":" + port + alias);
client = new WebSocketClient();
client.start();
ClientUpgradeRequest request = new ClientUpgradeRequest();
// connect() is asynchronous. Wait for the handshake to complete before the
// monitor thread starts; otherwise it would see a pending session and
// destroy the client straight away.
session = client.connect(this, uri, request).get(3, java.util.concurrent.TimeUnit.SECONDS);
isPendingSession = false;
final JettyWebSocket currentSocket = this;
// Monitor thread: polls the connection and cleans up once it is closed.
Thread clientThread = new Thread(new Runnable() {
public void run() {
try {
synchronized (currentSocket) {
while (currentSocket.isConnected()) {
currentSocket.wait(SOCKET_CLOSURE_TIMEOUT_IN_MILLISECONDS);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
currentSocket.destroy();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
clientThread.start();
}
/**
* Reacts to the closing of the web socket.
* @param statusCode The status code of the closing.
* @param reason The reason for the closing.
*/
@OnWebSocketClose
public void onClose(int statusCode, String reason) {
try {
destroy();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Reacts to a successful connection to a web socket.
* @param session The newly created session.
*/
@OnWebSocketConnect
public void onConnect(Session session) {
//System.out.printf("Got connected: %s%n", session);
this.session = session;
isPendingSession = false;
}
/**
* Reacts on a received message /via the web socket/.
* @param message The received message.
*/
@OnWebSocketMessage
public void onMessage(String message) {
System.out.println("Message received:" + message);
}
/**
* Checks if the client is connected.
* @return True if connected, false otherwise.
*/
public boolean isConnected() {
return !isPendingSession && session != null && session.isOpen();
}
/**
* Destroys a web socket client.
* @throws IOException Exception in case of input/output problems.
*/
public void destroy() throws IOException {
if (session != null) {
session.close();
session.disconnect();
session = null;
}
if (client != null) {
client.destroy();
client = null;
}
}
/**
* Sends a message via the web socket.
* @param message The message to send.
* @throws IOException Exception in case of input/output problems.
*/
public void sendMessage(String message) throws IOException {
if (session == null || session.getRemote() == null) {
return;
}
System.out.println("Sending message");
session.getRemote().sendString(message);
}
}