The advent of web sockets opened up full-duplex communication channels between browsers and web servers. Being based on port 80, this also has the benefit of bypassing firewall policies. Ideal applications for this technology are real-time messaging scenarios, such as multiplayer games and chats.
Here we discuss an example of a client and a server, based on web sockets, for remotely controlling IoT devices. The server uses JDBC to connect to an internal MySQL database for authentication/authorization, the simplejson Java library to format messages, and the org.java-websocket 1.3.0 library for its networking aspects.
class Server extends WebSocketServer{
private java.sql.Connection dbConn;
private Map< WebSocket, String> authenticatedConnections;
private Map<String, ArrayList<WebSocket>> subscriptionControllers;
private Map<String, ArrayList<WebSocket>> subscriptionDashboards;
public Server(int port) throws UnknownHostException {
super(new InetSocketAddress(port));
authenticatedConnections = new HashMap< WebSocket, String>();
subscriptionControllers = new HashMap<String, ArrayList<WebSocket>>();
subscriptionDashboards = new HashMap<String, ArrayList<WebSocket>>();
}
public void setDBMSCredentials(String host, int port, String user, String passwd, String db){
try {
dbConn = DriverManager.getConnection("jdbc:" + "mysql" + "://" + host + "/" + db , user, passwd);
System.out.println("Connected to database "+db+" at "+host);
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public void onOpen(WebSocket conn, ClientHandshake arg1) {
System.out.println( conn.getRemoteSocketAddress().getAddress().getHostAddress() + " connected!" );
}
@Override
public void onClose(WebSocket conn, int arg1, String arg2, boolean arg3) {
System.out.println( conn.getRemoteSocketAddress().getAddress().getHostAddress() + " has disconnected!" );
// remove the connection from both the authenticated and the subscribed ones if available
if(authenticatedConnections.containsKey(conn)){
String authkey = authenticatedConnections.get(conn);
// remove also from the subscriptions
if(subscriptionDashboards.containsKey(authkey)){
subscriptionDashboards.get(authkey).remove(conn); // remove the subscription if available
}
if(subscriptionControllers.containsKey(authkey)){
subscriptionControllers.get(authkey).remove(conn); // remove the subscription if available
}
}
}
@Override
public void onError(WebSocket conn, Exception arg1) {
//arg1.printStackTrace();
System.out.println("Error while interacting with "+conn.getRemoteSocketAddress().getAddress().getHostAddress()+":\n\t"+arg1.getMessage());
}
@Override
public void onMessage(WebSocket conn, String s) {
JSONParser parser = new JSONParser();
try {
JSONObject obj = (JSONObject) parser.parse(s);
// check authentication key
String authkey = (String) obj.get("authkey");
String type = (String) obj.get("type");
String action = (String) obj.get("action");
// authenticate the user first
if(authenticatedConnections.containsKey(conn) ||
this.authenticateUser(authkey)){
// check if the connection was already authenticated
if(!authenticatedConnections.containsKey(conn))
authenticatedConnections.put(conn, authkey);
authenticatedConnections.put(conn, authkey);
// check the operation type
switch(type){
case "device_status":
System.out.println(authkey+": received "+action+" device_status from "
+conn.getRemoteSocketAddress().getAddress().getHostAddress());
+conn.getRemoteSocketAddress().getAddress().getHostAddress());
if(action.equals("subscribe")){
// check if an entry exists for the given key
if(!subscriptionDashboards.containsKey(authkey))
subscriptionDashboards.put(authkey, new ArrayList<WebSocket>());
// avoid get on missing key
subscriptionDashboards.put(authkey, new ArrayList<WebSocket>());
// avoid get on missing key
if(!subscriptionDashboards.get(authkey).contains(conn))
subscriptionDashboards.get(authkey).add(conn);
// avoid multiple entries for the same connection
System.out.println("\tAdding "+authkey+" to subDash");
}else{ // publish
if(subscriptionDashboards.containsKey(authkey))
this.sendAll(s, subscriptionDashboards.get(authkey));
this.sendAll(s, subscriptionDashboards.get(authkey));
}
break;
case "device_control":
System.out.println(authkey+": received "+action+" device_control from "
+conn.getRemoteSocketAddress().getAddress().getHostAddress());
+conn.getRemoteSocketAddress().getAddress().getHostAddress());
if(action.equals("subscribe")){
// check if an entry exists for the given key
if(!subscriptionControllers.containsKey(authkey))
subscriptionControllers.put(authkey, new ArrayList<WebSocket>());
// avoid get on missing key
subscriptionControllers.put(authkey, new ArrayList<WebSocket>());
// avoid get on missing key
if(!subscriptionControllers.get(authkey).contains(conn))
subscriptionControllers.get(authkey).add(conn);
// avoid multiple entries for the same connection
System.out.println("\tAdding "+authkey+" to subContr");
}else{ // publish
if(subscriptionControllers.containsKey(authkey))
this.sendAll(s, subscriptionControllers.get(authkey));
this.sendAll(s, subscriptionControllers.get(authkey));
}
break;
}
}
} catch (ParseException e) {
// out of protocol format
System.out.println(conn.getRemoteSocketAddress().getAddress().getHostAddress()
+": out of protocol format!");
+": out of protocol format!");
}
}
public void sendAll(String message, ArrayList<WebSocket> conns){
Iterator<WebSocket> i = conns.iterator();
while(i.hasNext()){
WebSocket s = i.next();
if(s.isOpen()){
s.send( message );
System.out.println("\tRouting to "
+s.getRemoteSocketAddress().getAddress().getHostAddress());
+s.getRemoteSocketAddress().getAddress().getHostAddress());
}
}
}
public boolean authenticateUser(String authkey){
boolean auth = false;
if(dbConn != null){
try {
Statement stmt = dbConn.createStatement();
// prevent sql injection by using a prepared statement
java.sql.PreparedStatement ps = dbConn.prepareStatement("SELECT * FROM advisor.user WHERE user.authkey = ?");
ps.setString(1, authkey);
ResultSet rs = ps.executeQuery();
if(rs.next()){
auth = true;
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return auth;
}
}
The server's main function might then look like this:
/**
 * Entry point: creates the websocket server on port 8887, connects it to the
 * local "advisor" database and starts it.
 */
public static void main(String[] args) throws IOException, TimeoutException,
        ShutdownSignalException, ConsumerCancelledException, InterruptedException {
    // Server is an inner class, so it is created through a DeviceManager instance.
    DeviceManager manager = new DeviceManager();
    Server relay = manager.new Server(8887);

    // Connect to the database before accepting clients.
    relay.setDBMSCredentials("localhost", 3306, "root", "", "advisor");
    relay.start();

    System.out.println("Websocket server started");
}
import threading
import sys, os
from threading import Timer
from websocket import create_connection, WebSocketConnectionClosedException
import json
from plugwise import Stick, Circle, TimeoutException
import serial
# Open the websocket towards the configured control interface.
self.ws = create_connection(self._control_interface)
print("connection with remote server established!")

# Ask the server to forward 'device_control' events published under this
# authentication key.
subscription = {'authkey': 'authkey-for-user',
                'action': 'subscribe',
                'type': 'device_control'}
self.ws.send(json.dumps(subscription))
print("successfully subscribed to device control events!")

# Process incoming control events on a separate thread.
listener = threading.Thread(target=self.listen_device_control_events)
listener.start()
This basically creates a connection to the server, sends a subscribe for 'device_control' events and spawns a thread listening for incoming events, which is implemented as follows:
def listen_device_control_events(self):
    """Receive messages from the websocket and apply 'device_control' events.

    Loops until self._exit is set. Each received frame is decoded as JSON;
    for a 'device_control' event, set_circle() is called with the payload's
    'device_id' and its 'state' converted to int. A frame that cannot be
    decoded or lacks the expected fields is reported and skipped, so a single
    bad message does not end the listener.

    The socket is closed when the loop ends normally. If the server closes
    the connection first, that is reported and the method returns.
    """
    try:
        while not self._exit:  # listen until the script is voluntarily terminated
            try:
                received = json.loads(self.ws.recv())
                # Compare by value with '==', not by identity with 'is'.
                if isinstance(received, dict) and received.get('type') == 'device_control':
                    payload = received['payload']
                    device_id = payload['device_id']
                    state = payload['state']
                    print("received control event to  %s for %s" % (state, device_id))
                    self.set_circle(device_id, int(state))
            except ValueError as e:
                # Raised for invalid JSON and for a non-numeric state.
                print(e)
            except (KeyError, TypeError) as e:
                # Missing or wrongly typed payload fields.
                print("ignoring malformed control event: %r" % (e,))
    except WebSocketConnectionClosedException:
        print("the server closed the socket")
    else:
        self.ws.close()
Now a device_status event can be published each time a device becomes available in the network, by simply calling the following function:
def update_circle_status(self, circle_id, status):
    """Publish a 'device_status' event carrying the given status.

    The message is sent over self.ws as JSON, authenticated with the
    'apikey' entry of self._settings.

    NOTE(review): circle_id is not part of the published message; only
    status is sent as the payload. Confirm whether subscribers need the
    device id.
    """
    event = {'authkey': self._settings['apikey'],
             'action': 'publish',
             'type': 'device_status',
             'payload': status}
    self.ws.send(json.dumps(event))
For instance, an event can be sent when initializing all ZigBee circles, as follows:
# Create a Circle for every configured circle id and publish a status
# event for each of them.
self.circles = {}
for c in self._settings['circles']:
    self.circles[c] = Circle(c, self.stick)
    self.update_circle_status(c, 1)  # send all connected circles as available
Here is a simple implementation for the controlling client, which simply sends control events based on user's input:
from websocket import create_connection, WebSocketConnectionClosedException
import json
import threading
# Controlling client: reads a device state from the user and publishes it as a
# 'device_control' event for a fixed device id, until interrupted.
ws = create_connection("ws://143.205.116.250:8887")
try:
    while True:
        # 'state' rather than 'input', which would shadow the builtin.
        state = raw_input("Insert status of device")
        ws.send(json.dumps({'authkey': 'authkey-for-user',
                            'action': 'publish',
                            'type': 'device_control',
                            'payload': {'device_id': '000D6F00035589CF', 'state': state}}))
except (KeyboardInterrupt, EOFError):
    # Ctrl-C or end of input is the only way out of the loop; leave quietly.
    pass
finally:
    # Close the socket however the loop ends.
    ws.close()
No comments:
Post a Comment