diff --git a/app/sprinklers/websocket.ts b/app/sprinklers/websocket.ts index b93a252..50d6c21 100644 --- a/app/sprinklers/websocket.ts +++ b/app/sprinklers/websocket.ts @@ -17,20 +17,36 @@ const RECONNECT_TIMEOUT_MS = 5000; export class WSSprinklersDevice extends s.SprinklersDevice { readonly api: WebSocketApiClient; - constructor(api: WebSocketApiClient) { + private _id: string; + + constructor(api: WebSocketApiClient, id: string) { super(); this.api = api; + this._id = id; autorun(() => { this.connectionState.serverToBroker = api.connectionState.serverToBroker; this.connectionState.clientToServer = api.connectionState.clientToServer; if (!api.connectionState.isConnected) { this.connectionState.brokerToDevice = null; + } else { + this.subscribe(); } }); } get id() { - return "grinklers"; + return this._id; + } + + subscribe() { + if (!this.api.socket) { + throw new Error("WebSocket not connected"); + } + const subscribeRequest: ws.IDeviceSubscribeRequest = { + type: "deviceSubscribeRequest", + deviceId: this.id, + }; + this.api.socket.send(JSON.stringify(subscribeRequest)); } makeRequest(request: requests.Request): Promise { @@ -40,14 +56,15 @@ export class WSSprinklersDevice extends s.SprinklersDevice { export class WebSocketApiClient implements s.ISprinklersApi { readonly webSocketUrl: string; - device: WSSprinklersDevice; + + devices: Map = new Map(); nextDeviceRequestId = Math.round(Math.random() * 1000000); deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {}; @observable connectionState: s.ConnectionState = new s.ConnectionState(); - private socket: WebSocket | null = null; + socket: WebSocket | null = null; private reconnectTimer: number | null = null; get connected(): boolean { @@ -56,7 +73,6 @@ export class WebSocketApiClient implements s.ISprinklersApi { constructor(webSocketUrl: string) { this.webSocketUrl = webSocketUrl; - this.device = new WSSprinklersDevice(this); this.connectionState.clientToServer = false; this.connectionState.serverToBroker = false; } @@ -77,19 +93,21 @@ export class WebSocketApiClient implements s.ISprinklersApi { } } - getDevice(name: string): s.SprinklersDevice { - if (name !== "grinklers") { - throw new Error("Devices which are not grinklers are not supported yet"); + getDevice(id: string): s.SprinklersDevice { + let device = this.devices.get(id); + if (!device) { + device = new WSSprinklersDevice(this, id); + this.devices.set(id, device); } - return this.device; + return device; } - removeDevice(name: string) { + removeDevice(id: string) { // NOT IMPLEMENTED } // args must all be JSON serializable - makeDeviceCall(deviceName: string, request: requests.Request): Promise { + makeDeviceCall(deviceId: string, request: requests.Request): Promise { if (this.socket == null) { const res: requests.Response = { type: request.type, @@ -103,7 +121,7 @@ export class WebSocketApiClient implements s.ISprinklersApi { const id = this.nextDeviceRequestId++; const data: ws.IDeviceCallRequest = { type: "deviceCallRequest", - id, deviceName, data: requestData, + id, deviceId, data: requestData, }; const promise = new Promise((resolve, reject) => { let timeoutHandle: number; @@ -194,10 +212,11 @@ export class WebSocketApiClient implements s.ISprinklersApi { } private onDeviceUpdate(data: ws.IDeviceUpdate) { - if (data.name !== "grinklers") { + const device = this.devices.get(data.deviceId); + if (!device) { return log.warn({ data }, "invalid deviceUpdate received"); } - update(schema.sprinklersDevice, this.device, data.data); + update(schema.sprinklersDevice, device, data.data); } private onDeviceCallResponse(data: ws.IDeviceCallResponse) { diff --git a/common/sprinklers/mqtt/index.ts b/common/sprinklers/mqtt/index.ts index 3054ab0..98ce8ea 100644 --- a/common/sprinklers/mqtt/index.ts +++ b/common/sprinklers/mqtt/index.ts @@ -53,13 +53,13 @@ export class MqttApiClient implements s.ISprinklersApi { }); } - getDevice(prefix: string): s.SprinklersDevice { - if (/\//.test(prefix)) { - throw new Error("Prefix cannot contain a /"); + getDevice(id: string): s.SprinklersDevice { + if (/\//.test(id)) { + throw new Error("Device id cannot contain a /"); } - let device = this.devices.get(prefix); + let device = this.devices.get(id); if (!device) { - this.devices.set(prefix, device = new MqttSprinklersDevice(this, prefix)); + this.devices.set(id, device = new MqttSprinklersDevice(this, id)); if (this.connected) { device.doSubscribe(); } @@ -67,13 +67,13 @@ export class MqttApiClient implements s.ISprinklersApi { return device; } - removeDevice(prefix: string) { - const device = this.devices.get(prefix); + removeDevice(id: string) { + const device = this.devices.get(id); if (!device) { return; } device.doUnsubscribe(); - this.devices.delete(prefix); + this.devices.delete(id); } private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) { diff --git a/common/sprinklers/websocketData.ts b/common/sprinklers/websocketData.ts index 43f39f0..0b67f79 100644 --- a/common/sprinklers/websocketData.ts +++ b/common/sprinklers/websocketData.ts @@ -1,8 +1,14 @@ import { Response as ResponseData } from "@common/sprinklers/requests"; +export interface IError { + type: "error"; + message: string; + data: any; +} + export interface IDeviceUpdate { type: "deviceUpdate"; - name: string; + deviceId: string; data: any; } @@ -17,13 +23,20 @@ export interface IBrokerConnectionUpdate { brokerConnected: boolean; } -export type IServerMessage = IDeviceUpdate | IDeviceCallResponse | IBrokerConnectionUpdate; +export type IServerMessage = IError | IDeviceUpdate | IDeviceCallResponse | IBrokerConnectionUpdate; + +export type SubscriptionType = "deviceUpdate" | "brokerConnectionUpdate"; + +export interface IDeviceSubscribeRequest { + type: "deviceSubscribeRequest"; + deviceId: string; +} export interface IDeviceCallRequest { type: "deviceCallRequest"; id: number; - deviceName: string; + deviceId: string; data: any; } -export type IClientMessage = IDeviceCallRequest; +export type IClientMessage = IDeviceSubscribeRequest | IDeviceCallRequest; diff --git a/server/models/User.ts b/server/models/User.ts index 00d6ce3..26bb300 100644 --- a/server/models/User.ts +++ b/server/models/User.ts @@ -1,7 +1,8 @@ +import * as bcrypt from "bcrypt"; import * as r from "rethinkdb"; import { createModelSchema, deserialize, primitive, serialize, update } from "serializr"; + import { Database } from "./Database"; -import * as bcrypt from "bcrypt"; export interface IUser { id: string | undefined; diff --git a/server/websocket/index.ts b/server/websocket/index.ts index d9b6d7a..c695eb9 100644 --- a/server/websocket/index.ts +++ b/server/websocket/index.ts @@ -1,66 +1,104 @@ +import { autorun } from "mobx"; +import { serialize } from "serializr"; +import * as WebSocket from "ws"; + import log from "@common/logger"; import * as requests from "@common/sprinklers/requests"; import * as schema from "@common/sprinklers/schema"; import * as ws from "@common/sprinklers/websocketData"; -import { autorun } from "mobx"; -import { serialize } from "serializr"; -import * as WebSocket from "ws"; import { ServerState } from "../state"; -export class WebSocketApi { - state: ServerState; +export class WebSocketClient { + api: WebSocketApi; + socket: WebSocket; - constructor(state: ServerState) { - this.state = state; + disposers: Array<() => void> = []; + deviceSubscriptions: string[] = []; + + /// This shall be the user id if the client has been authenticated, null otherwise + userId: string | null = null; + + get state() { + return this.api.state; } - listen(webSocketServer: WebSocket.Server) { - webSocketServer.on("connection", this.handleConnection); + constructor(api: WebSocketApi, socket: WebSocket) { + this.api = api; + this.socket = socket; } - handleConnection = (socket: WebSocket) => { - const disposers = [ - autorun(() => { - const json = serialize(schema.sprinklersDevice, this.state.device); - log.trace({ device: json }); - const data: ws.IDeviceUpdate = { type: "deviceUpdate", name: "grinklers", data: json }; - socket.send(JSON.stringify(data)); - }, { delay: 100 }), - autorun(() => { - const data: ws.IBrokerConnectionUpdate = { - type: "brokerConnectionUpdate", - brokerConnected: this.state.mqttClient.connected, - }; - socket.send(JSON.stringify(data)); - }), - ]; - const stop = () => { - disposers.forEach((disposer) => disposer()); - }; - socket.on("message", (data) => this.handleSocketMessage(socket, data)); - socket.on("close", () => stop()); + start() { + this.disposers.push(autorun(() => { + const updateData: ws.IBrokerConnectionUpdate = { + type: "brokerConnectionUpdate", + brokerConnected: this.state.mqttClient.connected, + }; + this.socket.send(JSON.stringify(updateData)); + })); + this.socket.on("message", this.handleSocketMessage); + this.socket.on("close", this.stop); } - private handleSocketMessage(socket: WebSocket, socketData: WebSocket.Data) { + stop = () => { + this.disposers.forEach((disposer) => disposer()); + this.api.removeClient(this); + } + + private handleSocketMessage = (socketData: WebSocket.Data) => { + this.doHandleSocketMessage(socketData) + .catch((err) => { + this.onError({ err }, "unhandled error on handling socket message"); + }); + } + + private async doHandleSocketMessage(socketData: WebSocket.Data) { if (typeof socketData !== "string") { - return log.error({ type: typeof socketData }, "received invalid socket data type from client"); + return this.onError({ type: typeof socketData }, "received invalid socket data type from client"); } let data: ws.IClientMessage; try { data = JSON.parse(socketData); } catch (err) { - return log.error({ event, err }, "received invalid websocket message from client"); + return this.onError({ event, err }, "received invalid websocket message from client"); } switch (data.type) { + case "deviceSubscribeRequest": + this.deviceSubscribeRequest(data); + break; case "deviceCallRequest": - this.deviceCallRequest(socket, data); + await this.deviceCallRequest(data); break; default: - return log.warn({ data }, "received invalid client message type"); + return this.onError({ data }, "received invalid client message type"); } } - private async deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise { + private onError(data: any, message: string) { + log.error(data, message); + const errorData: ws.IError = { + type: "error", message, data, + }; + this.socket.send(JSON.stringify(errorData)); + } + + private deviceSubscribeRequest(data: ws.IDeviceSubscribeRequest) { + // TODO: somehow validate this device id? + const deviceId = data.deviceId; + if (this.deviceSubscriptions.indexOf(deviceId) !== -1) { + return; + } + this.deviceSubscriptions.push(deviceId); + const device = this.state.mqttClient.getDevice(deviceId); + log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device"); + this.disposers.push(autorun(() => { + const json = serialize(schema.sprinklersDevice, device); + log.trace({ device: json }); + const updateData: ws.IDeviceUpdate = { type: "deviceUpdate", deviceId, data: json }; + this.socket.send(JSON.stringify(updateData)); + }, { delay: 100 })); + } + + private async deviceCallRequest(data: ws.IDeviceCallRequest): Promise { let response: requests.Response | false; try { response = await this.doDeviceCallRequest(data); @@ -73,13 +111,13 @@ export class WebSocketApi { id: data.id, data: response, }; - socket.send(JSON.stringify(resData)); + this.socket.send(JSON.stringify(resData)); } } private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise { - const { deviceName, data } = requestData; - if (deviceName !== "grinklers") { + const { deviceId, data } = requestData; + if (deviceId !== "grinklers") { // error handling? or just get the right device return false; } @@ -87,3 +125,29 @@ export class WebSocketApi { return this.state.device.makeRequest(request); } } + +export class WebSocketApi { + state: ServerState; + clients: WebSocketClient[] = []; + + constructor(state: ServerState) { + this.state = state; + } + + listen(webSocketServer: WebSocket.Server) { + webSocketServer.on("connection", this.handleConnection); + } + + handleConnection = (socket: WebSocket) => { + const client = new WebSocketClient(this, socket); + client.start(); + this.clients.push(client); + } + + removeClient(client: WebSocketClient) { + const idx = this.clients.indexOf(client); + if (idx !== -1) { + this.clients.splice(idx, 1); + } + } +}