import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";

import { ErrorCode } from "@common/ErrorCode";
import * as rpc from "@common/jsonRpc";
import log from "@common/logger";
import { RpcError } from "@common/sprinklersRpc";
import * as deviceRequests from "@common/sprinklersRpc/deviceRequests";
import * as schema from "@common/sprinklersRpc/schema";
import * as ws from "@common/sprinklersRpc/websocketData";
import { AccessToken } from "@common/TokenClaims";
import { verifyToken } from "@server/authentication";
import { User } from "@server/entities";

import { WebSocketApi } from "./WebSocketApi";

type Disposer = () => void;

/**
 * One client WebSocket session: parses incoming messages, dispatches client
 * requests to `WebSocketRequestHandlers`, and pushes notifications (broker
 * connection state, device updates, errors) back over the socket.
 *
 * NOTE(review): the generic type arguments in this file (`Map<...>`, the
 * `Method` type parameters and the handlers' `Promise<...>` return types) were
 * reconstructed from how the types are used here -- confirm the exact
 * spellings against `@common/sprinklersRpc/websocketData`.
 */
export class WebSocketConnection {
  api: WebSocketApi;
  socket: WebSocket;

  /** Disposers that are run once when the socket closes. */
  disposers: Array<() => void> = [];
  // map of device id to disposer function
  deviceSubscriptions: Map<string, Disposer> = new Map();

  /// This shall be the user id if the client has been authenticated, null otherwise
  userId: number | null = null;
  user: User | null = null;

  private requestHandlers: ws.ClientRequestHandlers = new WebSocketRequestHandlers();
  /** True once the broker-connection autorun has been registered. */
  private brokerSubscribed = false;

  get state() {
    return this.api.state;
  }

  constructor(api: WebSocketApi, socket: WebSocket) {
    this.api = api;
    this.socket = socket;

    this.socket.on("message", this.handleSocketMessage);
    this.socket.on("close", this.onClose);
  }

  /** Closes the underlying socket; cleanup happens in `onClose`. */
  stop = () => {
    this.socket.close();
  };

  /** Socket "close" handler: runs every disposer and unregisters from the api. */
  onClose = (code: number, reason: string) => {
    log.debug({ code, reason }, "WebSocketConnection closing");
    this.disposers.forEach(disposer => disposer());
    this.disposeDeviceSubscriptions();
    this.api.removeClient(this);
  };

  /**
   * Runs every device subscription disposer. Each disposer removes its own
   * entry from `deviceSubscriptions`, so a snapshot of the values is iterated.
   */
  disposeDeviceSubscriptions() {
    Array.from(this.deviceSubscriptions.values()).forEach(disposer => disposer());
  }

  /**
   * Starts notifying the client of the broker connection state.
   *
   * The autorun is registered only once per connection; later calls just send
   * the current state once, so repeated authentication neither leaks autoruns
   * nor multiplies the notifications sent on every change.
   */
  subscribeBrokerConnection() {
    const sendUpdate = () => {
      const updateData: ws.IBrokerConnectionUpdate = {
        brokerConnected: this.state.mqttClient.connected
      };
      this.sendNotification("brokerConnectionUpdate", updateData);
    };
    if (this.brokerSubscribed) {
      sendUpdate();
      return;
    }
    this.brokerSubscribed = true;
    this.disposers.push(autorun(sendUpdate));
  }

  /**
   * @throws RpcError (ErrorCode.Unauthorized) when the session has no
   *   authenticated user.
   */
  checkAuthorization() {
    if (!this.userId || !this.user) {
      throw new RpcError(
        "this WebSocket session has not been authenticated",
        ErrorCode.Unauthorized
      );
    }
  }

  /**
   * Looks up `devId` among the authenticated user's devices.
   * Callers must have passed `checkAuthorization()` first: `this.user` and its
   * `devices` are dereferenced without a null check.
   *
   * @returns the matching user device entry
   * @throws RpcError (ErrorCode.NoPermission) when the user has no such device
   * @throws RpcError (ErrorCode.Internal) when the entry has a falsy deviceId
   */
  checkDevice(devId: string) {
    const userDevice = this.user!.devices!.find(dev => dev.deviceId === devId);
    if (userDevice == null) {
      throw new RpcError(
        "you do not have permission to subscribe to device",
        ErrorCode.NoPermission,
        { id: devId }
      );
    }
    const deviceId = userDevice.deviceId;
    if (!deviceId) {
      throw new RpcError(
        "device has no associated device prefix",
        ErrorCode.Internal
      );
    }
    return userDevice;
  }

  /**
   * Serializes `data` as JSON and sends it to the client.
   *
   * Messages are dropped when the socket is not open: notifications can be
   * produced after the peer has gone away (the device autorun is delayed and
   * error reporting runs from a promise rejection handler), and sending on a
   * socket that is not OPEN would surface as an error in those callers.
   */
  sendMessage(data: ws.ServerMessage) {
    if (this.socket.readyState !== WebSocket.OPEN) {
      log.debug(
        { readyState: this.socket.readyState },
        "dropping message for websocket that is not open"
      );
      return;
    }
    this.socket.send(JSON.stringify(data));
  }

  /** Sends a server-initiated notification for `method` carrying `data`. */
  sendNotification<Method extends keyof ws.IServerNotificationTypes>(
    method: Method,
    data: ws.IServerNotificationTypes[Method]
  ) {
    this.sendMessage({ type: "notification", method, data });
  }

  /** Sends the response to the client request identified by `id`. */
  sendResponse<Method extends keyof ws.ClientRequestHandlers>(
    method: Method,
    id: number,
    data: ws.ServerResponseData<Method>
  ) {
    this.sendMessage({ type: "response", method, id, ...data });
  }

  /** Socket "message" handler; failures are reported through `onError`. */
  handleSocketMessage = (socketData: WebSocket.Data) => {
    this.doHandleSocketMessage(socketData).catch(err => {
      this.onError({ err }, "unhandled error on handling socket message");
    });
  };

  /**
   * Forwards a device call to the device after checking that the user owns it.
   * The acquired device is always released, whether or not the request fails.
   * The return type is inferred from `device.makeRequest`.
   */
  async doDeviceCallRequest(requestData: ws.IDeviceCallRequest) {
    const userDevice = this.checkDevice(requestData.deviceId);
    const deviceId = userDevice.deviceId!;
    const device = this.state.mqttClient.acquireDevice(deviceId);
    try {
      const request = schema.requests.deserializeRequest(requestData.data);
      return await device.makeRequest(request);
    } finally {
      device.release();
    }
  }

  /** Parses one raw socket message and dispatches it by its `type`. */
  private async doHandleSocketMessage(socketData: WebSocket.Data) {
    if (typeof socketData !== "string") {
      return this.onError(
        { type: typeof socketData },
        "received invalid socket data type from client",
        ErrorCode.Parse
      );
    }
    let data: ws.ClientMessage;
    try {
      data = JSON.parse(socketData);
    } catch (err) {
      return this.onError(
        { socketData, err },
        "received invalid websocket message from client",
        ErrorCode.Parse
      );
    }
    // JSON.parse("null") succeeds; reading `.type` from it would throw and be
    // misreported as an internal error instead of a bad request.
    if (data == null) {
      return this.onError(
        { data },
        "received invalid message type from client",
        ErrorCode.BadRequest
      );
    }
    switch (data.type) {
      case "request":
        await this.handleRequest(data);
        break;
      default:
        return this.onError(
          { data },
          "received invalid message type from client",
          ErrorCode.BadRequest
        );
    }
  }

  /**
   * Whether `method` names a handler function defined by the handlers object
   * itself or its class. Names inherited from Object.prototype (`toString`,
   * `__defineGetter__`, ...) and `constructor` are rejected so a client cannot
   * make the dispatcher call them.
   */
  private hasRequestHandler(method: string): boolean {
    if (method === "constructor") {
      return false;
    }
    const handlers: any = this.requestHandlers;
    const hasOwn = Object.prototype.hasOwnProperty;
    const isOwn =
      hasOwn.call(handlers, method) ||
      hasOwn.call(Object.getPrototypeOf(handlers), method);
    return isOwn && typeof handlers[method] === "function";
  }

  /**
   * Runs the handler for a client request and always sends exactly one
   * response: the handler's result, the RpcError it threw, or a generic
   * Internal error for anything else.
   */
  private async handleRequest(request: ws.ClientRequest) {
    let response: ws.ServerResponseData;
    try {
      if (!this.hasRequestHandler(request.method)) {
        // noinspection ExceptionCaughtLocallyJS
        throw new RpcError("received invalid client request method");
      }
      response = await rpc.handleRequest(this.requestHandlers, request, this);
    } catch (err) {
      if (err instanceof RpcError) {
        log.debug({ err }, "rpc error");
        response = { result: "error", error: err.toJSON() };
      } else {
        log.error(
          { method: request.method, err },
          "unhandled error during processing of client request"
        );
        response = {
          result: "error",
          error: {
            code: ErrorCode.Internal,
            message: "unhandled error during processing of client request",
            // String() also copes with a thrown null/undefined
            data: String(err)
          }
        };
      }
    }
    this.sendResponse(request.method, request.id, response);
  }

  /** Logs an error and reports it to the client as an "error" notification. */
  private onError(
    data: any,
    message: string,
    code: number = ErrorCode.Internal
  ) {
    log.error(data, message);
    const errorData: ws.IError = { code, message, data };
    this.sendNotification("error", errorData);
  }
}

/**
 * Handlers for client requests. Every method declares
 * `this: WebSocketConnection`; `handleRequest` passes the connection to
 * `rpc.handleRequest` alongside this handlers object.
 */
class WebSocketRequestHandlers implements ws.ClientRequestHandlers {
  /**
   * Verifies the access token and binds the session to its user.
   *
   * Session state is only changed after both the token and the user lookup
   * succeed, so a failed attempt leaves `userId` and `user` untouched and
   * consistent with each other.
   *
   * @throws RpcError BadRequest when no token is given; BadToken when the
   *   token is invalid or its user no longer exists.
   */
  async authenticate(
    this: WebSocketConnection,
    data: ws.IAuthenticateRequest
  ): Promise<ws.ServerResponseData<"authenticate">> {
    if (!data.accessToken) {
      throw new RpcError("no token specified", ErrorCode.BadRequest);
    }
    let claims: AccessToken;
    try {
      claims = await verifyToken(data.accessToken, "access");
    } catch (e) {
      throw new RpcError("invalid token", ErrorCode.BadToken, e);
    }
    const user: User | null =
      (await this.state.database.users.findById(claims.aud, {
        devices: true
      })) || null;
    if (!user) {
      throw new RpcError("user no longer exists", ErrorCode.BadToken);
    }
    // Switching users on a live connection must not keep the previous user's
    // device subscriptions alive.
    if (this.userId !== null && this.userId !== claims.aud) {
      this.disposeDeviceSubscriptions();
    }
    this.userId = claims.aud;
    this.user = user;
    log.debug(
      { userId: claims.aud, name: claims.name },
      "authenticated websocket client"
    );
    this.subscribeBrokerConnection();
    return {
      result: "success",
      data: {
        authenticated: true,
        message: "authenticated",
        user: this.user.toJSON()
      }
    };
  }

  /**
   * Subscribes the client to updates of one of its devices. Subscribing to a
   * device that is already subscribed is a no-op that still reports success.
   */
  async deviceSubscribe(
    this: WebSocketConnection,
    data: ws.IDeviceSubscribeRequest
  ): Promise<ws.ServerResponseData<"deviceSubscribe">> {
    this.checkAuthorization();
    const userDevice = this.checkDevice(data.deviceId);
    const deviceId = userDevice.deviceId!;
    if (!this.deviceSubscriptions.has(deviceId)) {
      const device = this.state.mqttClient.acquireDevice(deviceId);
      log.debug(
        { deviceId, userId: this.userId },
        "websocket client subscribed to device"
      );

      const autorunDisposer = autorun(
        () => {
          const json = serialize(schema.sprinklersDevice, device);
          log.trace({ device: json });
          const updateData: ws.IDeviceUpdate = { deviceId, data: json };
          this.sendNotification("deviceUpdate", updateData);
        },
        { delay: 100 }
      );

      // The stored disposer stops the updates, releases the device and
      // removes its own map entry.
      this.deviceSubscriptions.set(deviceId, () => {
        autorunDisposer();
        device.release();
        this.deviceSubscriptions.delete(deviceId);
      });
    }

    const response: ws.IDeviceSubscribeResponse = {
      deviceId
    };
    return { result: "success", data: response };
  }

  /**
   * Cancels a device subscription if one exists; reports success either way.
   */
  async deviceUnsubscribe(
    this: WebSocketConnection,
    data: ws.IDeviceSubscribeRequest
  ): Promise<ws.ServerResponseData<"deviceUnsubscribe">> {
    this.checkAuthorization();
    const userDevice = this.checkDevice(data.deviceId);
    const deviceId = userDevice.deviceId!;
    const disposer = this.deviceSubscriptions.get(deviceId);

    if (disposer) {
      disposer();
    }

    const response: ws.IDeviceSubscribeResponse = {
      deviceId
    };
    return { result: "success", data: response };
  }

  /**
   * Performs a request against one of the user's devices. Any failure of the
   * call is rethrown as an RpcError built from the error's message and code.
   */
  async deviceCall(
    this: WebSocketConnection,
    data: ws.IDeviceCallRequest
  ): Promise<ws.ServerResponseData<"deviceCall">> {
    this.checkAuthorization();
    try {
      const response = await this.doDeviceCallRequest(data);
      const resData: ws.IDeviceCallResponse = {
        data: response
      };
      return { result: "success", data: resData };
    } catch (err) {
      const e: deviceRequests.ErrorResponseData = err;
      throw new RpcError(e.message, e.code, e);
    }
  }
}