Browse Source

Added framework support for multiple devices

update-deps
Alex Mikhalev 7 years ago
parent
commit
ad6306ad6e
  1. 47
      app/sprinklers/websocket.ts
  2. 16
      common/sprinklers/mqtt/index.ts
  3. 21
      common/sprinklers/websocketData.ts
  4. 3
      server/models/User.ts
  5. 136
      server/websocket/index.ts

47
app/sprinklers/websocket.ts

@ -17,20 +17,36 @@ const RECONNECT_TIMEOUT_MS = 5000; @@ -17,20 +17,36 @@ const RECONNECT_TIMEOUT_MS = 5000;
export class WSSprinklersDevice extends s.SprinklersDevice {
readonly api: WebSocketApiClient;
constructor(api: WebSocketApiClient) {
private _id: string;
constructor(api: WebSocketApiClient, id: string) {
super();
this.api = api;
this._id = id;
autorun(() => {
this.connectionState.serverToBroker = api.connectionState.serverToBroker;
this.connectionState.clientToServer = api.connectionState.clientToServer;
if (!api.connectionState.isConnected) {
this.connectionState.brokerToDevice = null;
} else {
this.subscribe();
}
});
}
get id() {
return "grinklers";
return this._id;
}
subscribe() {
if (!this.api.socket) {
throw new Error("WebSocket not connected");
}
const subscribeRequest: ws.IDeviceSubscribeRequest = {
type: "deviceSubscribeRequest",
deviceId: this.id,
};
this.api.socket.send(JSON.stringify(subscribeRequest));
}
makeRequest(request: requests.Request): Promise<requests.Response> {
@ -40,14 +56,15 @@ export class WSSprinklersDevice extends s.SprinklersDevice { @@ -40,14 +56,15 @@ export class WSSprinklersDevice extends s.SprinklersDevice {
export class WebSocketApiClient implements s.ISprinklersApi {
readonly webSocketUrl: string;
device: WSSprinklersDevice;
devices: Map<string, WSSprinklersDevice> = new Map();
nextDeviceRequestId = Math.round(Math.random() * 1000000);
deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {};
@observable connectionState: s.ConnectionState = new s.ConnectionState();
private socket: WebSocket | null = null;
socket: WebSocket | null = null;
private reconnectTimer: number | null = null;
get connected(): boolean {
@ -56,7 +73,6 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -56,7 +73,6 @@ export class WebSocketApiClient implements s.ISprinklersApi {
constructor(webSocketUrl: string) {
this.webSocketUrl = webSocketUrl;
this.device = new WSSprinklersDevice(this);
this.connectionState.clientToServer = false;
this.connectionState.serverToBroker = false;
}
@ -77,19 +93,21 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -77,19 +93,21 @@ export class WebSocketApiClient implements s.ISprinklersApi {
}
}
getDevice(name: string): s.SprinklersDevice {
if (name !== "grinklers") {
throw new Error("Devices which are not grinklers are not supported yet");
getDevice(id: string): s.SprinklersDevice {
let device = this.devices.get(id);
if (!device) {
device = new WSSprinklersDevice(this, id);
this.devices.set(id, device);
}
return this.device;
return device;
}
removeDevice(name: string) {
removeDevice(id: string) {
// NOT IMPLEMENTED
}
// args must all be JSON serializable
makeDeviceCall(deviceName: string, request: requests.Request): Promise<requests.Response> {
makeDeviceCall(deviceId: string, request: requests.Request): Promise<requests.Response> {
if (this.socket == null) {
const res: requests.Response = {
type: request.type,
@ -103,7 +121,7 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -103,7 +121,7 @@ export class WebSocketApiClient implements s.ISprinklersApi {
const id = this.nextDeviceRequestId++;
const data: ws.IDeviceCallRequest = {
type: "deviceCallRequest",
id, deviceName, data: requestData,
id, deviceId, data: requestData,
};
const promise = new Promise<requests.Response>((resolve, reject) => {
let timeoutHandle: number;
@ -194,10 +212,11 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -194,10 +212,11 @@ export class WebSocketApiClient implements s.ISprinklersApi {
}
private onDeviceUpdate(data: ws.IDeviceUpdate) {
if (data.name !== "grinklers") {
const device = this.devices.get(data.deviceId);
if (!device) {
return log.warn({ data }, "invalid deviceUpdate received");
}
update(schema.sprinklersDevice, this.device, data.data);
update(schema.sprinklersDevice, device, data.data);
}
private onDeviceCallResponse(data: ws.IDeviceCallResponse) {

16
common/sprinklers/mqtt/index.ts

@ -53,13 +53,13 @@ export class MqttApiClient implements s.ISprinklersApi { @@ -53,13 +53,13 @@ export class MqttApiClient implements s.ISprinklersApi {
});
}
getDevice(prefix: string): s.SprinklersDevice {
if (/\//.test(prefix)) {
throw new Error("Prefix cannot contain a /");
getDevice(id: string): s.SprinklersDevice {
if (/\//.test(id)) {
throw new Error("Device id cannot contain a /");
}
let device = this.devices.get(prefix);
let device = this.devices.get(id);
if (!device) {
this.devices.set(prefix, device = new MqttSprinklersDevice(this, prefix));
this.devices.set(id, device = new MqttSprinklersDevice(this, id));
if (this.connected) {
device.doSubscribe();
}
@ -67,13 +67,13 @@ export class MqttApiClient implements s.ISprinklersApi { @@ -67,13 +67,13 @@ export class MqttApiClient implements s.ISprinklersApi {
return device;
}
removeDevice(prefix: string) {
const device = this.devices.get(prefix);
removeDevice(id: string) {
const device = this.devices.get(id);
if (!device) {
return;
}
device.doUnsubscribe();
this.devices.delete(prefix);
this.devices.delete(id);
}
private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) {

21
common/sprinklers/websocketData.ts

@ -1,8 +1,14 @@ @@ -1,8 +1,14 @@
import { Response as ResponseData } from "@common/sprinklers/requests";
export interface IError {
type: "error";
message: string;
data: any;
}
export interface IDeviceUpdate {
type: "deviceUpdate";
name: string;
deviceId: string;
data: any;
}
@ -17,13 +23,20 @@ export interface IBrokerConnectionUpdate { @@ -17,13 +23,20 @@ export interface IBrokerConnectionUpdate {
brokerConnected: boolean;
}
export type IServerMessage = IDeviceUpdate | IDeviceCallResponse | IBrokerConnectionUpdate;
export type IServerMessage = IError | IDeviceUpdate | IDeviceCallResponse | IBrokerConnectionUpdate;
export type SubscriptionType = "deviceUpdate" | "brokerConnectionUpdate";
export interface IDeviceSubscribeRequest {
type: "deviceSubscribeRequest";
deviceId: string;
}
export interface IDeviceCallRequest {
type: "deviceCallRequest";
id: number;
deviceName: string;
deviceId: string;
data: any;
}
export type IClientMessage = IDeviceCallRequest;
export type IClientMessage = IDeviceSubscribeRequest | IDeviceCallRequest;

3
server/models/User.ts

@ -1,7 +1,8 @@ @@ -1,7 +1,8 @@
import * as bcrypt from "bcrypt";
import * as r from "rethinkdb";
import { createModelSchema, deserialize, primitive, serialize, update } from "serializr";
import { Database } from "./Database";
import * as bcrypt from "bcrypt";
export interface IUser {
id: string | undefined;

136
server/websocket/index.ts

@ -1,66 +1,104 @@ @@ -1,66 +1,104 @@
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import log from "@common/logger";
import * as requests from "@common/sprinklers/requests";
import * as schema from "@common/sprinklers/schema";
import * as ws from "@common/sprinklers/websocketData";
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import { ServerState } from "../state";
export class WebSocketApi {
state: ServerState;
export class WebSocketClient {
api: WebSocketApi;
socket: WebSocket;
constructor(state: ServerState) {
this.state = state;
disposers: Array<() => void> = [];
deviceSubscriptions: string[] = [];
/// This shall be the user id if the client has been authenticated, null otherwise
userId: string | null = null;
get state() {
return this.api.state;
}
listen(webSocketServer: WebSocket.Server) {
webSocketServer.on("connection", this.handleConnection);
constructor(api: WebSocketApi, socket: WebSocket) {
this.api = api;
this.socket = socket;
}
handleConnection = (socket: WebSocket) => {
const disposers = [
autorun(() => {
const json = serialize(schema.sprinklersDevice, this.state.device);
log.trace({ device: json });
const data: ws.IDeviceUpdate = { type: "deviceUpdate", name: "grinklers", data: json };
socket.send(JSON.stringify(data));
}, { delay: 100 }),
autorun(() => {
const data: ws.IBrokerConnectionUpdate = {
start() {
this.disposers.push(autorun(() => {
const updateData: ws.IBrokerConnectionUpdate = {
type: "brokerConnectionUpdate",
brokerConnected: this.state.mqttClient.connected,
};
socket.send(JSON.stringify(data));
}),
];
const stop = () => {
disposers.forEach((disposer) => disposer());
};
socket.on("message", (data) => this.handleSocketMessage(socket, data));
socket.on("close", () => stop());
this.socket.send(JSON.stringify(updateData));
}));
this.socket.on("message", this.handleSocketMessage);
this.socket.on("close", this.stop);
}
private handleSocketMessage(socket: WebSocket, socketData: WebSocket.Data) {
stop = () => {
this.disposers.forEach((disposer) => disposer());
this.api.removeClient(this);
}
private handleSocketMessage = (socketData: WebSocket.Data) => {
this.doHandleSocketMessage(socketData)
.catch((err) => {
this.onError({ err }, "unhandled error on handling socket message");
});
}
private async doHandleSocketMessage(socketData: WebSocket.Data) {
if (typeof socketData !== "string") {
return log.error({ type: typeof socketData }, "received invalid socket data type from client");
return this.onError({ type: typeof socketData }, "received invalid socket data type from client");
}
let data: ws.IClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return log.error({ event, err }, "received invalid websocket message from client");
return this.onError({ event, err }, "received invalid websocket message from client");
}
switch (data.type) {
case "deviceSubscribeRequest":
this.deviceSubscribeRequest(data);
break;
case "deviceCallRequest":
this.deviceCallRequest(socket, data);
await this.deviceCallRequest(data);
break;
default:
return log.warn({ data }, "received invalid client message type");
return this.onError({ data }, "received invalid client message type");
}
}
private onError(data: any, message: string) {
log.error(data, message);
const errorData: ws.IError = {
type: "error", message, data,
};
this.socket.send(JSON.stringify(errorData));
}
private deviceSubscribeRequest(data: ws.IDeviceSubscribeRequest) {
// TODO: somehow validate this device id?
const deviceId = data.deviceId;
if (this.deviceSubscriptions.indexOf(deviceId) !== -1) {
return;
}
this.deviceSubscriptions.push(deviceId);
const device = this.state.mqttClient.getDevice(deviceId);
log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device");
this.disposers.push(autorun(() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { type: "deviceUpdate", deviceId, data: json };
this.socket.send(JSON.stringify(updateData));
}, { delay: 100 }));
}
private async deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise<void> {
private async deviceCallRequest(data: ws.IDeviceCallRequest): Promise<void> {
let response: requests.Response | false;
try {
response = await this.doDeviceCallRequest(data);
@ -73,13 +111,13 @@ export class WebSocketApi { @@ -73,13 +111,13 @@ export class WebSocketApi {
id: data.id,
data: response,
};
socket.send(JSON.stringify(resData));
this.socket.send(JSON.stringify(resData));
}
}
private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<requests.Response | false> {
const { deviceName, data } = requestData;
if (deviceName !== "grinklers") {
const { deviceId, data } = requestData;
if (deviceId !== "grinklers") {
// error handling? or just get the right device
return false;
}
@ -87,3 +125,29 @@ export class WebSocketApi { @@ -87,3 +125,29 @@ export class WebSocketApi {
return this.state.device.makeRequest(request);
}
}
export class WebSocketApi {
state: ServerState;
clients: WebSocketClient[] = [];
constructor(state: ServerState) {
this.state = state;
}
listen(webSocketServer: WebSocket.Server) {
webSocketServer.on("connection", this.handleConnection);
}
handleConnection = (socket: WebSocket) => {
const client = new WebSocketClient(this, socket);
client.start();
this.clients.push(client);
}
removeClient(client: WebSocketClient) {
const idx = this.clients.indexOf(client);
if (idx !== -1) {
this.clients.splice(idx, 1);
}
}
}

Loading…
Cancel
Save