Remote Events Subscriptions and Push Notifications via WebSockets

Contains information on remote events subscription and notification via WebSockets.

Overview

This document describes how client applications subscribe for remote events and receive remote event notifications via WebSockets. It specifies the general structure of the remote events pushed through the WebSocket and the syntax for encoding the event subscription criteria used to create event subscriptions on the server.

A fully functional WebSocket server is in use. It implements the RFC 6455 WebSocket specification and allows the use of subprotocols and bi-directional communication. There are two ways to subscribe for remote events over WebSockets. The first is the legacy subscription via URI, in which communication flows in one direction only. The second uses the "osgi.events.v1-text" subprotocol and supports bi-directional communication.

Information on both methods is given below.

Legacy Subscription Mechanism

Subscribing for Remote Events and Receiving Event Notifications

The event subscription is created at the moment the client application opens a WebSocket connection. The event filtering criteria must be encoded in the target URI. Any events generated on the server that match the subscription are pushed through the WebSocket for as long as the connection is open. The format of the events conforms to the general event format defined in Remote Events Subscriptions and Pull Notifications via Long Polling. Event representations are encoded in JSON.

Accepted URI Formats for Event Subscriptions via WebSockets

URI

Description

ws://<host>:<port>/m2m/[<gatewayID>]/re/subscriptions?topics=<topic1>,<topic2>,...&filter=<filter>

Creates an event subscription with simple subscription criteria consisting of a single criterion defined via query parameters. The criterion specifies the topics of interest <topic1>, <topic2>, ..., <topicN> and an optional LDAP filter <filter>, which is evaluated against the properties of the events with the specified topics. The subscription is created when the connection is opened and removed when the connection is closed. Any events generated on the server that match the subscription are pushed through the WebSocket for as long as the connection is open. If the subscription is to be scoped to a single OSGi service gateway accessed remotely via Remote Management, the gateway identifier must be provided explicitly in the URI.

ws://<host>:<port>/m2m/[<gatewayID>]/re/subscriptions/[{"topics":[<topic11>, <topic12>, ...], "filter":<filter1>}, {"topics":[<topic21>, <topic22>, ...], "filter":<filter2>}, ...]

Creates an event subscription with subscription criteria consisting of multiple criterion entries defined within a path parameter. Each criterion specifies topics of interest and a corresponding optional LDAP filter, which is evaluated against the properties of the relevant events. The subscription is created when the connection is opened and removed when the connection is closed. Any events generated on the server that match the subscription are pushed through the WebSocket for as long as the connection is open. If the subscription is to be scoped to a single OSGi service gateway accessed remotely via Remote Management, the gateway identifier must be provided explicitly in the URI.
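As a rough illustration of the two URI formats in the table above, the following JavaScript sketch assembles subscription URIs. The function names, the example filter property, and the decision to percent-encode the filter and the JSON path parameter are assumptions made for this sketch and are not part of the documented API; verify the encoding your server expects.

```javascript
// Hypothetical helpers; names and encoding choices are assumptions for illustration.

// Builds ws://<host>:<port>/m2m/[<gatewayID>/]re/subscriptions
function subscriptionBase(host, port, gatewayId) {
  const gateway = gatewayId ? encodeURIComponent(gatewayId) + '/' : '';
  return 'ws://' + host + ':' + port + '/m2m/' + gateway + 're/subscriptions';
}

// Format 1: a single criterion passed as query parameters.
function buildQuerySubscriptionUri(host, port, topics, filter, gatewayId) {
  let uri = subscriptionBase(host, port, gatewayId) + '?topics=' + topics.join(',');
  if (filter) {
    // LDAP filters contain characters such as '=' and '&' that would
    // otherwise be read as query-string delimiters.
    uri += '&filter=' + encodeURIComponent(filter);
  }
  return uri;
}

// Format 2: several criteria passed as a JSON array in the path.
function buildPathSubscriptionUri(host, port, criteria, gatewayId) {
  return subscriptionBase(host, port, gatewayId) + '/' +
    encodeURIComponent(JSON.stringify(criteria));
}

// Example usage; "UID" is a made-up event property name.
const prefix = 'com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/';
console.log(buildQuerySubscriptionUri('localhost', 8080, [prefix + 'REGISTERED'], '(UID=item1)'));
console.log(buildPathSubscriptionUri('localhost', 8080, [
  { topics: [prefix + 'REGISTERED'], filter: '(UID=item1)' },
  { topics: [prefix + 'PROPERTY_CHANGED'] }
]));
```

Passing a gateway identifier as the last argument places it between /m2m/ and /re/subscriptions, which scopes the subscription to that gateway.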

Possible subscription topics:

Type of Topic

Value

TOPIC_ALL

com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/*

TOPIC_ITEM_REGISTERED

com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/REGISTERED

TOPIC_ITEM_UNREGISTERED

com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/UNREGISTERED

TOPIC_ITEM_PROPERTY_CHANGED

com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/PROPERTY_CHANGED

TOPIC_ITEM_OPERATION_EXECUTED

com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/OPERATION_EXECUTED

Example Web Socket subscription:

  1. TOPIC_ALL
    ws://<host>:<port>/m2m/re/subscriptions?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/*
  2. TOPIC_ITEM_REGISTERED
    ws://<host>:<port>/m2m/re/subscriptions?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/REGISTERED
  3. TOPIC_ITEM_UNREGISTERED
    ws://<host>:<port>/m2m/re/subscriptions?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/UNREGISTERED
  4. TOPIC_ITEM_PROPERTY_CHANGED
    ws://<host>:<port>/m2m/re/subscriptions?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/PROPERTY_CHANGED
  5. TOPIC_ITEM_OPERATION_EXECUTED
    ws://<host>:<port>/m2m/re/subscriptions?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/OPERATION_EXECUTED

This example covers all Functional Item events and their respective topics as received by the Remote Manager. To subscribe for a specific gateway, add <gatewayID> to the URI as shown in the Accepted URI Formats for Event Subscriptions via WebSockets table.
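The relationship between TOPIC_ALL and the four specific topics can be sketched as follows. The topic values are copied from the table above. The matching function is only an assumption: it presumes OSGi Event Admin style wildcard semantics, in which a trailing /* covers every topic below the prefix, and it is not taken from the server implementation.

```javascript
// Topic values as listed in the table of possible subscription topics.
const TOPIC_PREFIX = 'com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/';
const ItemEventTopics = Object.freeze({
  TOPIC_ALL: TOPIC_PREFIX + '*',
  TOPIC_ITEM_REGISTERED: TOPIC_PREFIX + 'REGISTERED',
  TOPIC_ITEM_UNREGISTERED: TOPIC_PREFIX + 'UNREGISTERED',
  TOPIC_ITEM_PROPERTY_CHANGED: TOPIC_PREFIX + 'PROPERTY_CHANGED',
  TOPIC_ITEM_OPERATION_EXECUTED: TOPIC_PREFIX + 'OPERATION_EXECUTED'
});

// Hypothetical helper: checks whether a subscription topic covers an event topic,
// assuming Event Admin style wildcards (an assumption, see the lead-in).
function topicMatches(subscriptionTopic, eventTopic) {
  if (subscriptionTopic === '*') {
    return true;
  }
  if (subscriptionTopic.endsWith('/*')) {
    // Keep the trailing slash so that only topics below the prefix match.
    return eventTopic.startsWith(subscriptionTopic.slice(0, -1));
  }
  return subscriptionTopic === eventTopic;
}

// List the specific topics covered by TOPIC_ALL.
Object.keys(ItemEventTopics)
  .filter((name) => name !== 'TOPIC_ALL')
  .filter((name) => topicMatches(ItemEventTopics.TOPIC_ALL, ItemEventTopics[name]))
  .forEach((name) => console.log(name));
```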

Using Legacy Subscription Mechanism

Examples for subscribing for remote events and receiving push notifications via WebSockets are provided below.

The communication is one-directional and the subscription is defined entirely by the URI. The client receives all matching events over the WebSocket without sending anything further. For bi-directional communication, see the New Subscription Subprotocol section.

The following JavaScript code snippet creates a WebSocket subscription with the topic specified in the URI.

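var webSocketConfig = {
    // Use wss when the page itself was loaded over https.
    baseWebSocketURL: (window.location.protocol === 'https:' ? 'wss' : 'ws') + '://' + window.location.host + '/m2m/re/subscriptions',
    openWS: null
};

// Logs every pushed event; event representations are JSON-encoded.
function logMessage(eo) {
    console.log(JSON.parse(eo.data));
}

// Opens the connection; the subscription exists for as long as the connection stays open.
function openWebSocket() {
    // baseWebSocketURL contains the /m2m/re/subscriptions path, so only the query string is appended.
    webSocketConfig.openWS = new WebSocket(webSocketConfig.baseWebSocketURL + '?topics=com/prosyst/mbs/services/m2m/rest/fim/ItemRemoteEvent/REGISTERED');
    webSocketConfig.openWS.onmessage = logMessage;
}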

.....

Deleting the Subscription

The following JavaScript code snippet deletes a WebSocket subscription by closing the WebSocket connection.

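function closeWebSocket() {
    // Closing the connection removes the subscription on the server.
    if (webSocketConfig.openWS !== null) {
        webSocketConfig.openWS.close();
        webSocketConfig.openWS = null;
    }
}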

....

New Subscription Subprotocol

Subscribing with Message

If you connect to the OSGi Events WebSocket Application with the application URI only, you can subscribe for additional events by sending text messages that contain a JSON object. The JSON object contains:

Every property name must be in lower case.
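Judging by the message examples in this section and by the Java example later in this document, a command message carries a "command" property, a "params" property, and an optional "userkey" property. The following sketch builds such messages with lower-case property names. The helper name is hypothetical, and the commented-out connection lines assume that the subprotocol is requested on the /m2m/re/subscriptions URI, which this document does not state explicitly.

```javascript
// Hypothetical helper: serialises a command message with lower-case property names.
// params may be a single string or an array of strings, as in the examples.
function buildCommand(command, params, userKey) {
  const message = { command: command, params: params };
  if (userKey !== undefined) {
    message.userkey = userKey; // optional; echoed back in responses per the examples
  }
  return JSON.stringify(message);
}

console.log(buildCommand('subscribe', 'testTopic'));
console.log(buildCommand('unsubscribe', ['idOne', 'idTwo'], 'someUserKey'));

// Browser usage (assumed URI; the second constructor argument is the subprotocol):
// const socket = new WebSocket('ws://<host>:<port>/m2m/re/subscriptions', 'osgi.events.v1-text');
// socket.onopen = function () { socket.send(buildCommand('subscribe', 'testTopic')); };
// socket.onmessage = function (event) { console.log(event.data); };
```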

The options supported by the OSGi Events WebSocket Application are:

1.   Subscribe option with the following format:

2.   Unsubscribe option with the following format:

     Examples

     Subscription cancelling

     //Single subscription cancelling
     {"command":"unsubscribe", "params":"someId"}

     //Single subscription cancelling with userkey
     {"command":"unsubscribe", "params":"someId", "userkey":"someUserKey"}

     //Multiple subscriptions cancelling
     {"command":"unsubscribe", "params":["idOne", "idTwo", "idThree", "idFour", "idFive"]}

     //Multiple subscriptions cancelling with userkey
     {"command":"unsubscribe", "params":["idOne", "idTwo", "idThree", "idFour", "idFive"], "userkey":"someUserKey"}

     Response after cancelling subscription

     //Here subscriptions with idOne, idThree and idFive have been cancelled successfully while subscriptions with idTwo and idFour have failed

     //Without userkey
     {"successful":["idOne", "idThree", "idFive"],"unsuccessful":["idTwo", "idFour"]}

     //With userkey
     {"successful":["idOne", "idThree", "idFive"],"unsuccessful":["idTwo", "idFour"], "userkey":"someUserKey"}
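A client may want to separate the cancelled subscriptions from the failed ones when the response arrives. The following sketch parses a response of the shape shown in the examples above. The function name and the shape of the returned object are assumptions chosen for illustration.

```javascript
// Hypothetical helper: summarises an unsubscribe response.
function parseUnsubscribeResponse(text) {
  const response = JSON.parse(text);
  return {
    cancelled: response.successful || [],  // IDs whose subscriptions were removed
    failed: response.unsuccessful || [],   // IDs that could not be cancelled
    userKey: response.userkey              // undefined when no userkey is present
  };
}

const result = parseUnsubscribeResponse(
  '{"successful":["idOne", "idThree", "idFive"],"unsuccessful":["idTwo", "idFour"], "userkey":"someUserKey"}'
);
if (result.failed.length > 0) {
  console.log('Could not cancel: ' + result.failed.join(', '));
}
```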

Subscription Cancelling

There are three ways for subscription cancelling:

When a subscription is removed, the client is notified via a message in the following format: {"subexp":"subId"}, where subId is the ID of the subscription that has expired.
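One possible way for a client to react to this notification is sketched below. It assumes the client keeps its own set of active subscription IDs; that registry and the function name are hypothetical and not part of the subprotocol.

```javascript
// Hypothetical client-side registry of active subscription IDs.
const activeSubscriptions = new Set(['subId', 'otherId']);

// Returns the expired subscription ID, or null when the message
// is not an expiry notification (for example, a pushed event).
function handleExpiry(text, subscriptions) {
  let message;
  try {
    message = JSON.parse(text);
  } catch (e) {
    return null; // not valid JSON, so not an expiry notification
  }
  if (message === null || typeof message !== 'object' || typeof message.subexp !== 'string') {
    return null;
  }
  subscriptions.delete(message.subexp);
  return message.subexp;
}

const expired = handleExpiry('{"subexp":"subId"}', activeSubscriptions);
console.log('Expired subscription: ' + expired);
```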

Java Example

The three classes of the Java example for the REST WebSocket Push functionality are described below.

To resolve, the example requires the org.eclipse.jetty.websocket.client bundle from the SDK's bundles/ directory and the gson JSON serialization bundle.

The dependencies for compilation in the Maven POM are the following:

Push WS Example Class

This class provides user-friendly testing of the Events Push functionality in REST. On bundle startup, you are prompted for the host and the port of the WebSocket server.

When events are sent via the Event Admin, the sample client receives a JSON notification.

Code for unsubscribing is also included, but it is not used by the example.

The URI for subscription is /m2m/re/subscriptions.

The topic of the test events is "testTopic".

The Web Console Event page can be used to send sample events to test the example.

package demo.test.education.samplewsclient;

import java.io.IOException;

import org.osgi.framework.BundleContext;

import com.google.gson.JsonObject;

/**
 * REST Push Example.
 */
public class PushWSExample {

  BundleContext bc;

  /**
   * The active web socket.
   */
  private JettyWebSocket activeClient = null;

  /**
   * Constructor for the push example.
   * @param bundleContext The bundle context.
   */
  public PushWSExample(BundleContext bundleContext) {
    this.bc = bundleContext;
  }

  /**
   * Closes the active client, if there is one.
   */
  public void close() {
    if (activeClient == null) {
      return;
    }
    try {
      activeClient.destroy();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Subscription method for demonstration purposes.
   * @param host The web socket server host address, such as ws://172.0.0.1
   * @param port The port of the web socket server, such as 8082
   */
  public void subscribe(String host, String port) {
    int portNumber = Integer.parseInt(port, 10);
    String alias = "/m2m/re/subscriptions";
    try {
      startNewClient(host, portNumber, alias);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Unsubscription command for demonstration purposes.
   * @param id The ID of the subscription to cancel.
   * @throws IOException Exception in case of input/output problems.
   */
  public void unsubscribe(String id) throws IOException {
    JsonObject json = new JsonObject();
    json.addProperty("command", "unsubscribe");
    json.addProperty("params", id);
    activeClient.sendMessage(json.toString());
  }

  /**
   * Starts a new client and subscribes it for the test topic.
   * @param host The host of the client, such as ws://172.22.172.149
   * @param port The port of the client.
   * @param alias The alias, such as /m2m/re/subscriptions .
   * @throws Exception General, all-purpose exception.
   */
  private void startNewClient(String host, int port, String alias)
      throws Exception {
    final JettyWebSocket socket = new JettyWebSocket(host, port, alias);
    socket.start();
    activeClient = socket;
    // Subscribe by sending a command message over the open connection.
    JsonObject json = new JsonObject();
    json.addProperty("command", "subscribe");
    json.addProperty("params", "testTopic");
    socket.sendMessage(json.toString());
  }
}

Activator Class

The following class acts as a BundleActivator for the example:

package demo.test.education.samplewsclient;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;

/**
 * Activator for the REST Events Push Example.
 */
public class Activator implements BundleActivator {

  /**
   * The example implementation we will use.
   */
  private PushWSExample demoImpl;

  /**
   * Starts the bundle.
   * @param context The bundle context.
   * @throws Exception General, all-purpose exception.
   */
  public void start(BundleContext context) throws Exception {
    demoImpl = new PushWSExample(context);
    System.out.println("Enter host:");
    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
    String host = reader.readLine();
    System.out.println("Enter port:");
    String port = reader.readLine();
    demoImpl.subscribe(host, port);
  }

  /**
   * Stops the bundle.
   * @param context The bundle context.
   * @throws Exception General, all-purpose exception.
   */
  public void stop(BundleContext context) throws Exception {
    if (demoImpl != null) {
      demoImpl.close();
    }
  }
}

Jetty Web Socket Class

A Jetty Web Socket implementation for use by the example:

package demo.test.education.samplewsclient;

import java.io.IOException;
import java.net.URI;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

/**
 * Jetty web socket implementation class for the REST Jetty Events Push example.
 */
@WebSocket()
public class JettyWebSocket {

  /**
   * Timeout for socket closure.
   */
  private static final int SOCKET_CLOSURE_TIMEOUT_IN_MILLISECONDS = 1000;

  /**
   * The session of the web socket. Volatile because it is written by the
   * Jetty callback threads and read by other threads.
   */
  private volatile Session session;

  /**
   * If set to true, there is a pending session for a connection.
   */
  private volatile boolean isPendingSession = true;

  /**
   * The client related to the Web Socket.
   */
  WebSocketClient client;

  /**
   * The host name.
   */
  private final String host;

  /**
   * The port number.
   */
  private final int port;

  /**
   * The alias of the web socket connection.
   */
  private final String alias;

  /**
   * Constructor for the JettyWebSocket class.
   * @param host The host name.
   * @param port The port number.
   * @param alias The alias of the web socket connection.
   */
  public JettyWebSocket(String host, int port, String alias) {
    this.host = host;
    this.port = port;
    this.alias = alias;
  }

  /**
   * Get method for the host.
   * @return the host
   */
  public String getHost() {
    return host;
  }

  /**
   * Get method for the port.
   * @return the port
   */
  public int getPort() {
    return port;
  }

  /**
   * Starts the web socket.
   * @throws Exception General, all-purpose exception
   */
  public void start() throws Exception {
    if (client != null) {
      return;
    }
    URI uri = new URI(host + ":" + port + alias);
    client = new WebSocketClient();
    client.start();
    ClientUpgradeRequest request = new ClientUpgradeRequest();
    client.connect(this, uri, request);
    // connect() is asynchronous: give the handshake time to complete before the
    // watcher thread starts, otherwise the watcher would see isConnected() == false
    // and destroy the client immediately.
    Thread.sleep(3000);
    final JettyWebSocket currentSocket = this;
    // Watcher thread: polls the connection state and cleans up once the socket is closed.
    Thread clientThread = new Thread(new Runnable() {
      public void run() {
        try {
          synchronized (currentSocket) {
            while (currentSocket.isConnected()) {
              currentSocket.wait(SOCKET_CLOSURE_TIMEOUT_IN_MILLISECONDS);
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          try {
            currentSocket.destroy();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    });
    clientThread.start();
  }

  /**
   * Reacts to the closing of the web socket.
   * @param statusCode The status code of the closing.
   * @param reason The reason for the closing.
   */
  @OnWebSocketClose
  public void onClose(int statusCode, String reason) {
    try {
      destroy();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Reacts to a successful connection to a web socket.
   * @param session The newly created session.
   */
  @OnWebSocketConnect
  public void onConnect(Session session) {
    this.session = session;
    isPendingSession = false;
  }

  /**
   * Reacts to a message received via the web socket.
   * @param message The received message.
   */
  @OnWebSocketMessage
  public void onMessage(String message) {
    System.out.println("Message received:" + message);
  }

  /**
   * Checks if the client is connected.
   * @return True if connected, false otherwise.
   */
  public boolean isConnected() {
    return !isPendingSession && session != null && session.isOpen();
  }

  /**
   * Destroys a web socket client.
   * @throws IOException Exception in case of input/output problems.
   */
  public void destroy() throws IOException {
    if (session != null) {
      session.close();
      session.disconnect();
      session = null;
    }
    if (client != null) {
      client.destroy();
      client = null;
    }
  }

  /**
   * Sends a message via the web socket.
   * @param message The message to send.
   * @throws IOException Exception in case of input/output problems.
   */
  public void sendMessage(String message) throws IOException {
    if (session == null || session.getRemote() == null) {
      return;
    }
    System.out.println("Sending message");
    session.getRemote().sendString(message);
  }
}