Monday, July 13, 2015

Using web sockets to manage remote devices


The advent of web sockets opened the way to full-duplex communication channels between browsers and web servers. Since web sockets typically run over port 80, they also have the benefit of passing through most firewall policies. Ideal applications for this technology are real-time messaging systems, such as multiplayer games and chats.

Here we discuss an example of a client and a server that remotely control IoT devices over web sockets. The server uses JDBC to connect to an internal MySQL database for authentication/authorization, the json-simple Java library to parse messages, and the org.java-websocket 1.3.0 library for its networking aspects.


class Server extends WebSocketServer{
private java.sql.Connection dbConn;
  private Map< WebSocket, String> authenticatedConnections;
  private Map<String, ArrayList<WebSocket>> subscriptionControllers;
  private Map<String, ArrayList<WebSocket>> subscriptionDashboards;
  public Server(int port) throws UnknownHostException {
        super(new InetSocketAddress(port));
  authenticatedConnections = new HashMap< WebSocket, String>();
  subscriptionControllers = new HashMap<String, ArrayList<WebSocket>>();
  subscriptionDashboards = new HashMap<String, ArrayList<WebSocket>>();
  }
  public void setDBMSCredentials(String host, int port, String user, String passwd, String db){
  try {
  dbConn = DriverManager.getConnection("jdbc:" + "mysql" + "://" + host + "/" + db , user, passwd);
  System.out.println("Connected to database "+db+" at "+host);
  } catch (SQLException e) {
  e.printStackTrace();
  }
  }
  @Override
  public void onOpen(WebSocket conn, ClientHandshake arg1) {
  System.out.println( conn.getRemoteSocketAddress().getAddress().getHostAddress() + " connected!" );
  }
  @Override
  public void onClose(WebSocket conn, int arg1, String arg2, boolean arg3) {
  System.out.println( conn.getRemoteSocketAddress().getAddress().getHostAddress() + " has disconnected!" );
  // remove the connection from both the authenticated and the subscribed ones if available
  if(authenticatedConnections.containsKey(conn)){
  String authkey = authenticatedConnections.get(conn);
  // remove also from the subscriptions
  if(subscriptionDashboards.containsKey(authkey)){
  subscriptionDashboards.get(authkey).remove(conn); // remove the subscription if available
  }
 
  if(subscriptionControllers.containsKey(authkey)){
                subscriptionControllers.get(authkey).remove(conn); // remove the subscription if available
}
}
}

  @Override
  public void onError(WebSocket conn, Exception arg1) {
  //arg1.printStackTrace();
  System.out.println("Error while interacting with "+conn.getRemoteSocketAddress().getAddress().getHostAddress()+":\n\t"+arg1.getMessage());
}

  @Override
  public void onMessage(WebSocket conn, String s) {
  JSONParser parser = new JSONParser();
  try {
  JSONObject obj = (JSONObject) parser.parse(s);
  // check authentication key
  String authkey = (String) obj.get("authkey");
  String type = (String) obj.get("type");
  String action = (String) obj.get("action");
  // authenticate the user first
  if(authenticatedConnections.containsKey(conn) ||
               this.authenticateUser(authkey)){
  // check if the connection was already authenticated
if(!authenticatedConnections.containsKey(conn))
                    authenticatedConnections.put(conn, authkey);
  // check the operation type
  switch(type){
  case "device_status":
  System.out.println(authkey+": received "+action+" device_status from "
                                  +conn.getRemoteSocketAddress().getAddress().getHostAddress());
if(action.equals("subscribe")){
// check if an entry exists for the given key
if(!subscriptionDashboards.containsKey(authkey)) 
                                subscriptionDashboards.put(authkey, new ArrayList<WebSocket>());
                                // avoid get on missing key
if(!subscriptionDashboards.get(authkey).contains(conn))     
                                subscriptionDashboards.get(authkey).add(conn);
  // avoid multiple entries for the same connection
System.out.println("\tAdding "+authkey+" to subDash");
}else{ // publish
  if(subscriptionDashboards.containsKey(authkey))
                                this.sendAll(s, subscriptionDashboards.get(authkey));
}
break;
case "device_control":
System.out.println(authkey+": received "+action+" device_control from "
                                           +conn.getRemoteSocketAddress().getAddress().getHostAddress());
if(action.equals("subscribe")){
// check if an entry exists for the given key
if(!subscriptionControllers.containsKey(authkey)) 
                                subscriptionControllers.put(authkey, new ArrayList<WebSocket>());
  // avoid get on missing key
if(!subscriptionControllers.get(authkey).contains(conn))   
                                subscriptionControllers.get(authkey).add(conn);
  // avoid multiple entries for the same connection
System.out.println("\tAdding "+authkey+" to subContr");
}else{ // publish
                            if(subscriptionControllers.containsKey(authkey))
                                this.sendAll(s, subscriptionControllers.get(authkey));
}
break;
}
}
} catch (ParseException e) {
// out of protocol format
System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress()
                               +": out of protocol format!");
}
}
public void sendAll(String message, ArrayList<WebSocket> conns){
Iterator<WebSocket> i = conns.iterator();
while(i.hasNext()){
WebSocket s = i.next();
if(s.isOpen()){
s.send( message );
System.out.println("\tRouting to "
                      +s.getRemoteSocketAddress().getAddress().getHostAddress());
}
}
}

public boolean authenticateUser(String authkey){
boolean auth = false;
if(dbConn != null){
try {
Statement stmt = dbConn.createStatement();
// prevent sql injection by using a prepared statement
java.sql.PreparedStatement ps = dbConn.prepareStatement("SELECT * FROM advisor.user WHERE user.authkey = ?");
                ps.setString(1, authkey);
ResultSet rs = ps.executeQuery();
if(rs.next()){
auth = true;
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return auth;
  }
}

The server's main function might then look like this:

public static void main(String[] args) throws IOException, TimeoutException,     
                    ShutdownSignalException, ConsumerCancelledException, InterruptedException  {
    DeviceManager dm = new DeviceManager();
    Server srv = dm.new Server(8887);

    srv.setDBMSCredentials("localhost", 3306, "root", "", "advisor");
    srv.start();
    System.out.println("Websocket server started");
}

We can now test the server by subscribing to control events on one side and publishing control events on the other. Let's do this in Python for simplicity.

import threading
import sys, os
from threading import Timer
from websocket import create_connection, WebSocketConnectionClosedException
import json
from plugwise import Stick, Circle, TimeoutException
import serial

self.ws = create_connection(self._control_interface)
print "connection with remote server established!"
# subscribe to control events for the given authentication key
self.ws.send(json.dumps({'authkey':'authkey-for-user',
                         'action': 'subscribe',
                         'type':'device_control'}))
print "successfully subscribed to device control events!"
# start a listener for control events
threading.Thread(target=self.listen_device_control_events).start()

This basically creates a connection to the server, sends a subscription for 'device_control' events and spawns a thread that listens for incoming events, which is implemented as follows:

def listen_device_control_events(self):
    """Receive messages from ``self.ws`` and apply 'device_control' events.

    Runs until ``self._exit`` becomes true or the server closes the socket.
    Each 'device_control' message is forwarded to ``self.set_circle`` with the
    device id and the integer state taken from its payload. Messages that are
    not valid JSON (or whose state is not an integer) are printed and skipped.
    The socket is closed exactly once, when the loop ends for any reason.
    """
    try:
        while not self._exit: # listen for incoming commands until the script is voluntarily terminated
            try:
                received = json.loads( self.ws.recv() )
                if isinstance(received, dict) and received.get('type') == 'device_control':
                    payload = received['payload']
                    # same text the former multi-argument print statement produced
                    print('received control event to  %s for %s' % (payload['state'], payload['device_id']))
                    self.set_circle(payload['device_id'], int(payload['state']))
            except ValueError as e:
                print(e)
            except WebSocketConnectionClosedException:
                print("the server closed the socket")
                # nothing more can arrive on a closed socket; looping on would spin forever
                break
    finally:
        # closing here, not after every message, keeps the socket usable while listening
        self.ws.close()

Upon reception of a control event, the code simply calls the set_circle method (defined in the Python plugwise library) to control the switch status of the connected ZigBee wireless nodes.

Now a device_status event can be published each time a device becomes available in the network, by simply calling the following function:

def update_circle_status(self, circle_id, status):
    """Publish a 'device_status' event carrying ``status`` over ``self.ws``.

    The event is sent under the key stored in ``self._settings['apikey']``.
    ``circle_id`` is accepted but not included in the message.
    """
    self.ws.send(json.dumps({'authkey':self._settings['apikey'],
                             'action':'publish',
                             'type':'device_status',
                             'payload':status}))

For instance, an event can be sent while initializing all ZigBee circles as follows:

# build a Circle for every configured circle id, keyed by that id
self.circles = {}
for c in self._settings['circles']:
    self.circles[c] = Circle(c, self.stick)
    self.update_circle_status(c, 1) # send all connected circles as available

Here is a simple implementation of the controlling client, which simply sends control events based on the user's input:

from websocket import create_connection, WebSocketConnectionClosedException
import json
import threading

ws = create_connection("ws://143.205.116.250:8887")

try:
    while True:
        # every value typed by the user is published as the requested device state
        state = raw_input("Insert status of device")

        ws.send(json.dumps({'authkey':'authkey-for-user',
                            'action': 'publish',
                            'type':'device_control',
                            'payload':{'device_id':'000D6F00035589CF','state':state}}))
except (EOFError, KeyboardInterrupt):
    # Ctrl-D / Ctrl-C is the only way out of the loop; treat it as a normal exit
    pass
finally:
    # previously unreachable after the endless loop, so the socket was never closed
    ws.close()

The code was integrated into the Mjölnir gateway to provide an event mechanism for remote device control.

No comments:

Post a Comment