Monday, 13 July 2015

Using WebSockets to manage remote devices

The advent of WebSockets opened up full-duplex communication channels between browsers and web servers. Because the connection is established over port 80, it also has the benefit of usually passing through firewall policies that allow web traffic. Ideal applications for this technology are real-time messaging services, such as multiplayer games and chats.
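WebSocket traffic can share port 80 with ordinary web traffic because every connection starts as a plain HTTP request that is then upgraded. As a small illustration (it is not part of the server discussed in this post), the following Python sketch computes the Sec-WebSocket-Accept header a server returns during that upgrade, as defined in RFC 6455. The function name accept_key is only illustrative.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def accept_key(client_key):
    """Return the Sec-WebSocket-Accept value for a Sec-WebSocket-Key header."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


# Sample key taken from the example handshake in RFC 6455
print(accept_key("dGhlIHNhbXBsZSBub25jZQ=="))
```

Libraries such as the ones used below perform this step internally, so application code never has to deal with it.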

Here we discuss an example of a client and a server that use WebSockets to remotely control IoT devices. The server uses JDBC to connect to an internal MySQL database for authentication and authorization, the json-simple Java library to format messages, and the Java-WebSocket library (version 1.3.0) for its networking aspects.

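import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

class Server extends WebSocketServer {
    private java.sql.Connection dbConn;
    // authenticated connections, each mapped to the key it presented
    private Map<WebSocket, String> authenticatedConnections;
    // per authentication key: connections subscribed to control and status events
    private Map<String, ArrayList<WebSocket>> subscriptionControllers;
    private Map<String, ArrayList<WebSocket>> subscriptionDashboards;

    public Server(int port) throws UnknownHostException {
        super(new InetSocketAddress(port));
        authenticatedConnections = new HashMap<WebSocket, String>();
        subscriptionControllers = new HashMap<String, ArrayList<WebSocket>>();
        subscriptionDashboards = new HashMap<String, ArrayList<WebSocket>>();
    }
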
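    public void setDBMSCredentials(String host, int port, String user, String passwd, String db) {
        try {
            // build the JDBC URL from host, port and database name
            dbConn = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/" + db, user, passwd);
            System.out.println("Connected to database " + db + " at " + host);
        } catch (SQLException e) {
            System.out.println("Unable to connect to database " + db + " at " + host + ": " + e.getMessage());
        }
    }
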
    @Override
    public void onOpen(WebSocket conn, ClientHandshake arg1) {
        System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " connected!");
    }

    @Override
    public void onClose(WebSocket conn, int arg1, String arg2, boolean arg3) {
        System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress() + " has disconnected!");
        // remove the connection from both the authenticated and the subscribed ones if available
        String authkey = authenticatedConnections.remove(conn);
        if (authkey != null) {
            // remove also from the subscriptions, avoiding a get on a missing key
            if (subscriptionDashboards.containsKey(authkey)) {
                subscriptionDashboards.get(authkey).remove(conn);
            }
            if (subscriptionControllers.containsKey(authkey)) {
                subscriptionControllers.get(authkey).remove(conn);
            }
        }
    }

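    @Override
    public void onError(WebSocket conn, Exception arg1) {
        // conn may be null when the error is not tied to a specific connection
        String remote = (conn == null) ? "server" : conn.getRemoteSocketAddress().getAddress().getHostAddress();
        System.out.println("Error while interacting with " + remote + ":\n\t" + arg1.getMessage());
    }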

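    @Override
    public void onMessage(WebSocket conn, String s) {
        String remote = conn.getRemoteSocketAddress().getAddress().getHostAddress();
        JSONParser parser = new JSONParser();
        try {
            JSONObject obj = (JSONObject) parser.parse(s);
            // check authentication key
            String authkey = (String) obj.get("authkey");
            String type = (String) obj.get("type");
            String action = (String) obj.get("action");
            if (authkey == null || type == null || action == null) {
                System.out.println(remote + ": out of protocol format!");
                return;
            }
            // authenticate the user first, unless the connection was already authenticated
            if (authenticatedConnections.containsKey(conn) || authenticateUser(authkey)) {
                authenticatedConnections.put(conn, authkey);
                // check the operation type
                switch (type) {
                case "device_status":
                    System.out.println(authkey + ": received " + action + " device_status from " + remote);
                    if (action.equals("subscribe")) {
                        // check if an entry exists for the given key (avoid get on missing key)
                        if (!subscriptionDashboards.containsKey(authkey)) {
                            subscriptionDashboards.put(authkey, new ArrayList<WebSocket>());
                        }
                        // avoid multiple entries for the same connection
                        if (!subscriptionDashboards.get(authkey).contains(conn)) {
                            subscriptionDashboards.get(authkey).add(conn);
                            System.out.println("\tAdding " + authkey + " to subDash");
                        }
                    } else if (subscriptionDashboards.containsKey(authkey)) { // publish
                        this.sendAll(s, subscriptionDashboards.get(authkey));
                    }
                    break;
                case "device_control":
                    System.out.println(authkey + ": received " + action + " device_control from " + remote);
                    if (action.equals("subscribe")) {
                        // check if an entry exists for the given key (avoid get on missing key)
                        if (!subscriptionControllers.containsKey(authkey)) {
                            subscriptionControllers.put(authkey, new ArrayList<WebSocket>());
                        }
                        // avoid multiple entries for the same connection
                        if (!subscriptionControllers.get(authkey).contains(conn)) {
                            subscriptionControllers.get(authkey).add(conn);
                            System.out.println("\tAdding " + authkey + " to subContr");
                        }
                    } else if (subscriptionControllers.containsKey(authkey)) { // publish
                        this.sendAll(s, subscriptionControllers.get(authkey));
                    }
                    break;
                }
            }
        } catch (ParseException e) {
            // out of protocol format
            System.out.println(remote + ": out of protocol format!");
        }
    }
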
    public void sendAll(String message, ArrayList<WebSocket> conns) {
        Iterator<WebSocket> i = conns.iterator();
        while (i.hasNext()) {
            WebSocket s = i.next();
            s.send(message);
            System.out.println("\tRouting to " + s.getRemoteSocketAddress().getAddress().getHostAddress());
        }
    }

    public boolean authenticateUser(String authkey) {
        boolean auth = false;
        if (dbConn != null) {
            // prevent sql injection by using a prepared statement
            try (java.sql.PreparedStatement ps = dbConn.prepareStatement(
                    "SELECT * FROM advisor.user WHERE user.authkey = ?")) {
                ps.setString(1, authkey);
                try (ResultSet rs = ps.executeQuery()) {
                    // the key is valid if at least one user owns it
                    auth = rs.next();
                }
            } catch (SQLException e) {
                System.out.println("Unable to check the authentication key: " + e.getMessage());
            }
        }
        return auth;
    }
}
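The authentication check relies on a parameterized query, so the key supplied by the client is never concatenated into the SQL text. The same idea can be tried out without a MySQL server using Python's built-in sqlite3 module. This is only a sketch: the in-memory users table, its columns and the function name authenticate_user are stand-ins for the advisor.user table, not the actual schema.

```python
import sqlite3


def authenticate_user(conn, authkey):
    """Return True if at least one row owns the given authentication key."""
    # The key is passed as a bound parameter, never concatenated into the SQL text
    row = conn.execute(
        "SELECT 1 FROM users WHERE authkey = ? LIMIT 1", (authkey,)
    ).fetchone()
    return row is not None


# Hypothetical in-memory table standing in for advisor.user
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, authkey TEXT)")
conn.execute("INSERT INTO users VALUES (?, ?)", ("alice", "authkey-for-user"))

print(authenticate_user(conn, "authkey-for-user"))  # a key that exists
print(authenticate_user(conn, "' OR '1'='1"))       # a classic injection attempt
```

Because the injection string is compared as a literal value, it simply matches no row.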

The server's main function might then look like this:

public static void main(String[] args) throws IOException, TimeoutException,
                    ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    DeviceManager dm = new DeviceManager();
    Server srv = new Server(8887);

    srv.setDBMSCredentials("localhost", 3306, "root", "", "advisor");
    srv.start(); // start accepting WebSocket connections in a background thread
    System.out.println("Websocket server started");
}
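Before testing against a live server, it may help to see the routing rule in isolation: for each event type the server keeps, per authentication key, a list of subscribed connections, and anything that is not a subscribe is forwarded to that list. The following Python sketch models this in memory. The Router class and the string "connections" are hypothetical and only stand in for the Java maps and WebSocket objects.

```python
from collections import defaultdict


class Router(object):
    """In-memory model of the per-key subscription lists kept by the server."""

    def __init__(self):
        # event type -> authkey -> list of subscribed connections
        self.subscriptions = defaultdict(lambda: defaultdict(list))

    def handle(self, conn, message):
        """Subscribe conn, or return the connections a publish is routed to."""
        subscribers = self.subscriptions[message["type"]][message["authkey"]]
        if message["action"] == "subscribe":
            if conn not in subscribers:  # avoid multiple entries for the same connection
                subscribers.append(conn)
            return []
        return list(subscribers)  # publish: every subscriber for this key and type


router = Router()
sub = {"authkey": "k1", "type": "device_control", "action": "subscribe"}
pub = {"authkey": "k1", "type": "device_control", "action": "publish"}
router.handle("gateway", sub)
router.handle("gateway", sub)  # the duplicate subscription is ignored
print(router.handle("dashboard", pub))
```

Note that events published under one key never reach subscribers registered under another, which is what keeps users isolated from each other.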

We can now test the server by subscribing to control events on one side and publishing control events on the other. Let's do this in Python for simplicity.

import threading
import sys, os
from threading import Timer
from websocket import create_connection, WebSocketConnectionClosedException
import json
from plugwise import Stick, Circle, TimeoutException
import serial

# inside the gateway class; the attribute name self.ws is assumed here
self.ws = create_connection(self._control_interface)
print "connection with remote server established!"
# subscribe to control events for the given authentication key
self.ws.send(json.dumps({'authkey': 'authkey-for-user',
                         'action': 'subscribe',
                         'type': 'device_control'}))
print "successfully subscribed to device control events!"
# start a listener for control events
threading.Thread(target=self.listen_device_control_events).start()

This basically creates a connection to the server, sends a subscribe request for 'device_control' events and spawns a thread listening for incoming events, which is implemented as follows:

def listen_device_control_events(self):
    while not self._exit:  # listen for incoming commands until the script is voluntarily terminated
        try:
            # self.ws is the connection opened above (attribute name assumed)
            received = json.loads(self.ws.recv())
            if 'type' in received.keys() and received['type'] == 'device_control':  # '==' compares values, 'is' would compare objects
                print 'received control event to ', received['payload']['state'], 'for', received['payload']['device_id']
                self.set_circle(received['payload']['device_id'], int(received['payload']['state']))
        except ValueError as e:
            print e
        except WebSocketConnectionClosedException:
            print "the server closed the socket"
            break  # stop listening once the connection is gone
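The parsing and dispatching part of the listener can also be written as a small function that does not need a socket, which makes it easier to test. The following is a sketch under a few assumptions: the function name handle_control_message is hypothetical, and set_circle is passed in as a callback rather than being a method of the gateway.

```python
import json


def handle_control_message(raw, set_circle):
    """Parse one message and switch the addressed device; return True if handled."""
    try:
        received = json.loads(raw)
    except ValueError:
        return False  # not valid JSON
    if not isinstance(received, dict) or received.get("type") != "device_control":
        return False  # not a control event
    payload = received.get("payload") or {}
    try:
        device_id, state = payload["device_id"], int(payload["state"])
    except (KeyError, TypeError, ValueError):
        return False  # missing or malformed payload fields
    set_circle(device_id, state)
    return True


calls = []
message = json.dumps({"type": "device_control",
                      "payload": {"device_id": "c1", "state": "1"}})
print(handle_control_message(message, lambda d, s: calls.append((d, s))))
print(handle_control_message("not json", lambda d, s: calls.append((d, s))))
print(calls)
```

Returning a boolean instead of raising keeps the listening loop alive when a malformed message arrives.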

Upon reception of a control event, the code simply calls the set_circle method (defined in the Python plugwise library) to control the switch status of the connected ZigBee wireless nodes.

Now a device_status event can be published each time a device becomes available in the network, by simply calling the following function:

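def update_circle_status(self, circle_id, status):
    # the payload fields are assumed to mirror those of the device_control events
    self.ws.send(json.dumps({'authkey': self._settings['apikey'],
                             'action': 'publish',
                             'type': 'device_status',
                             'payload': {'device_id': circle_id, 'state': status}}))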
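Every message exchanged with the server carries the same envelope: an authkey, a type (device_status or device_control) and an action, plus an optional payload. A small helper can build it in one place, as in the sketch below. The name build_event is hypothetical, and the payload layout (device_id and state) is assumed from the fields the listener reads.

```python
import json


def build_event(authkey, event_type, action, payload=None):
    """Serialize one protocol message with the fields the server reads."""
    if event_type not in ("device_status", "device_control"):
        raise ValueError("unknown event type: %s" % event_type)
    message = {"authkey": authkey, "type": event_type, "action": action}
    if payload is not None:
        message["payload"] = payload
    return json.dumps(message, sort_keys=True)


# A status update for a hypothetical device "c1" that became available
print(build_event("authkey-for-user", "device_status", "publish",
                  {"device_id": "c1", "state": 1}))
```

Rejecting unknown types on the client side avoids sending events that the server would silently ignore.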

For instance, an event can be sent when initializing all ZigBee circles as follows:

self.circles = {}
for c in self._settings['circles']:
    self.circles[c] = Circle(c, self.stick)
    self.update_circle_status(c, 1) # send all connected circles as available

Here is a simple implementation of the controlling client, which simply sends control events based on the user's input:

from websocket import create_connection, WebSocketConnectionClosedException
import json
import threading

ws = create_connection("ws://")

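while True:
    state = raw_input("Insert status of device")
    # 'device-id' is a placeholder: use the identifier of the circle to switch
    ws.send(json.dumps({'authkey': 'authkey-for-user',
                        'action': 'publish',
                        'type': 'device_control',
                        'payload': {'device_id': 'device-id', 'state': state}}))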
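Since the gateway converts the received state with int(), it can be worth validating what the user typed before publishing it. The sketch below assumes that 1 means on and 0 means off, which matches the value 1 used above for available circles. The function name parse_state and the accepted words are hypothetical.

```python
def parse_state(text):
    """Map user input such as 'on', 'off', '1' or '0' to 1 or 0 (None if unrecognized)."""
    value = text.strip().lower()
    if value in ("1", "on"):
        return 1
    if value in ("0", "off"):
        return 0
    return None


for typed in ("on", " 0 ", "maybe"):
    print(repr(typed), "->", parse_state(typed))
```

The client loop could then skip the send whenever parse_state returns None, so that malformed input never reaches the gateway.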

The code was integrated into the Mjölnir gateway to provide an event mechanism for remote device control.

