From a179d69241bdadd524b9f92508452570a293e831 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sat, 30 Jun 2018 21:18:41 -0600 Subject: [PATCH] Vastly cleaned up websocket data format, made it a lot more like json rpc --- app/components/RunSectionForm.tsx | 2 +- app/sprinklers/websocket.ts | 171 ++++++++------- common/jsonRpc/index.ts | 155 ++++++++++++++ common/sprinklers/ErrorCode.ts | 3 + common/sprinklers/SprinklersDevice.ts | 18 +- .../{requests.ts => deviceRequests.ts} | 0 common/sprinklers/mqtt/index.ts | 2 +- common/sprinklers/schema/requests.ts | 2 +- common/sprinklers/websocketData.ts | 80 ++++--- package.json | 2 - server/express/authentication.ts | 5 +- server/express/index.ts | 8 +- server/models/Database.ts | 4 +- server/models/User.ts | 8 +- server/state.ts | 3 - server/websocket/index.ts | 195 ++++++++++++------ yarn.lock | 2 +- 17 files changed, 464 insertions(+), 196 deletions(-) create mode 100644 common/jsonRpc/index.ts rename common/sprinklers/{requests.ts => deviceRequests.ts} (100%) diff --git a/app/components/RunSectionForm.tsx b/app/components/RunSectionForm.tsx index f7231f1..4bc0a3c 100644 --- a/app/components/RunSectionForm.tsx +++ b/app/components/RunSectionForm.tsx @@ -7,7 +7,7 @@ import { UiStore } from "@app/state"; import { Duration } from "@common/Duration"; import log from "@common/logger"; import { Section, SprinklersDevice } from "@common/sprinklers"; -import { RunSectionResponse } from "@common/sprinklers/requests"; +import { RunSectionResponse } from "@common/sprinklers/deviceRequests"; import DurationInput from "./DurationInput"; @observer diff --git a/app/sprinklers/websocket.ts b/app/sprinklers/websocket.ts index c5d17da..97fdcdc 100644 --- a/app/sprinklers/websocket.ts +++ b/app/sprinklers/websocket.ts @@ -1,10 +1,11 @@ import { action, observable, when } from "mobx"; import { update } from "serializr"; +import * as rpc from "@common/jsonRpc"; import logger from "@common/logger"; +import * as deviceRequests from "@common/sprinklers/deviceRequests"; import { ErrorCode } from "@common/sprinklers/ErrorCode"; import * as s from "@common/sprinklers/index"; -import * as requests from "@common/sprinklers/requests"; import * as schema from "@common/sprinklers/schema/index"; import { seralizeRequest } from "@common/sprinklers/schema/requests"; import * as ws from "@common/sprinklers/websocketData"; @@ -14,6 +15,8 @@ const log = logger.child({ source: "websocket" }); const TIMEOUT_MS = 5000; const RECONNECT_TIMEOUT_MS = 5000; +// tslint:disable:member-ordering + export class WSSprinklersDevice extends s.SprinklersDevice { readonly api: WebSocketApiClient; @@ -32,29 +35,27 @@ export class WSSprinklersDevice extends s.SprinklersDevice { return this._id; } - subscribe() { + async subscribe() { if (!this.api.socket) { throw new Error("WebSocket not connected"); } const subscribeRequest: ws.IDeviceSubscribeRequest = { - type: "deviceSubscribeRequest", deviceId: this.id, }; - this.api.socket.send(JSON.stringify(subscribeRequest)); - } - - onSubscribeResponse(data: ws.IDeviceSubscribeResponse) { - this.connectionState.serverToBroker = true; - this.connectionState.clientToServer = true; - if (data.result === "success") { - this.connectionState.hasPermission = true; - this.connectionState.brokerToDevice = false; - } else if (data.result === "noPermission") { - this.connectionState.hasPermission = false; + try { + await this.api.makeRequest("deviceSubscribe", subscribeRequest); + this.connectionState.serverToBroker = true; + this.connectionState.clientToServer = true; + } catch (err) { + if ((err as ws.Error).code === ErrorCode.NoPermission) { + this.connectionState.hasPermission = false; + } else { + log.error({ err }); + } } } - makeRequest(request: requests.Request): Promise { + makeRequest(request: deviceRequests.Request): Promise { return this.api.makeDeviceCall(this.id, request); } } @@ -63,13 +64,11 @@ export class WebSocketApiClient implements s.ISprinklersApi { readonly webSocketUrl: string; devices: Map = new Map(); - - nextDeviceRequestId = Math.round(Math.random() * 1000000); - deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {}; - @observable connectionState: s.ConnectionState = new s.ConnectionState(); - socket: WebSocket | null = null; + + private nextRequestId = Math.round(Math.random() * 1000000); + private responseCallbacks: ws.ServerResponseHandlers = {}; private reconnectTimer: number | null = null; get connected(): boolean { @@ -111,47 +110,72 @@ export class WebSocketApiClient implements s.ISprinklersApi { // NOT IMPLEMENTED } + async authenticate(accessToken: string): Promise { + return this.makeRequest("authenticate", { accessToken }); + } + // args must all be JSON serializable - makeDeviceCall(deviceId: string, request: requests.Request): Promise { + async makeDeviceCall(deviceId: string, request: deviceRequests.Request): Promise { if (this.socket == null) { - const res: requests.Response = { - type: request.type, - result: "error", + const error: ws.Error = { code: ErrorCode.ServerDisconnected, message: "the server is not connected", }; - throw res; + throw error; } const requestData = seralizeRequest(request); - const id = this.nextDeviceRequestId++; - const data: ws.IDeviceCallRequest = { - type: "deviceCallRequest", - requestId: id, deviceId, data: requestData, - }; - const promise = new Promise((resolve, reject) => { + const data: ws.IDeviceCallRequest = { deviceId, data: requestData }; + const resData = await this.makeRequest("deviceCall", data); + if (resData.data.result === "error") { + throw { + code: resData.data.code, + message: resData.data.message, + data: resData.data, + }; + } else { + return resData.data; + } + } + + makeRequest(method: Method, params: ws.IClientRequestTypes[Method]): + Promise { + const id = this.nextRequestId++; + return new Promise((resolve, reject) => { let timeoutHandle: number; - this.deviceResponseCallbacks[id] = (resData) => { + this.responseCallbacks[id] = (response) => { clearTimeout(timeoutHandle); - delete this.deviceResponseCallbacks[id]; - if (resData.data.result === "success") { - resolve(resData.data); + delete this.responseCallbacks[id]; + if (response.result === "success") { + resolve(response.data); } else { - reject(resData.data); + reject(response.error); } }; timeoutHandle = window.setTimeout(() => { - delete this.deviceResponseCallbacks[id]; - const res: requests.Response = { - type: request.type, - result: "error", - code: ErrorCode.Timeout, - message: "the request timed out", + delete this.responseCallbacks[id]; + const res: ws.ErrorData = { + result: "error", error: { + code: ErrorCode.Timeout, + message: "the request timed out", + }, }; reject(res); }, TIMEOUT_MS); + this.sendRequest(id, method, params); }); + } + + private sendMessage(data: ws.ClientMessage) { + if (!this.socket) { + throw new Error("WebSocketApiClient is not connected"); + } this.socket.send(JSON.stringify(data)); - return promise; + } + + private sendRequest( + id: number, method: Method, params: ws.IClientRequestTypes[Method], + ) { + this.sendMessage({ type: "request", id, method, params }); } private _reconnect = () => { @@ -194,7 +218,7 @@ export class WebSocketApiClient implements s.ISprinklersApi { } private onMessage(event: MessageEvent) { - let data: ws.IServerMessage; + let data: ws.ServerMessage; try { data = JSON.parse(event.data); } catch (err) { @@ -202,47 +226,46 @@ export class WebSocketApiClient implements s.ISprinklersApi { } log.trace({ data }, "websocket message"); switch (data.type) { - case "deviceSubscribeResponse": - this.onDeviceSubscribeResponse(data); + case "notification": + this.onNotification(data); break; - case "deviceUpdate": - this.onDeviceUpdate(data); - break; - case "deviceCallResponse": - this.onDeviceCallResponse(data); - break; - case "brokerConnectionUpdate": - this.onBrokerConnectionUpdate(data); + case "response": + this.onResponse(data); break; default: log.warn({ data }, "unsupported event type received"); } } - private onDeviceSubscribeResponse(data: ws.IDeviceSubscribeResponse) { - const device = this.devices.get(data.deviceId); - if (!device) { - return log.warn({ data }, "invalid deviceSubscribeResponse received"); - } - device.onSubscribeResponse(data); - } - - private onDeviceUpdate(data: ws.IDeviceUpdate) { - const device = this.devices.get(data.deviceId); - if (!device) { - return log.warn({ data }, "invalid deviceUpdate received"); + private onNotification(data: ws.ServerNotification) { + try { + rpc.handleNotification(this.notificationHandlers, data); + } catch (err) { + logger.error({ err }, "error handling server notification"); } - update(schema.sprinklersDevice, device, data.data); } - private onDeviceCallResponse(data: ws.IDeviceCallResponse) { - const cb = this.deviceResponseCallbacks[data.requestId]; - if (typeof cb === "function") { - cb(data); + private onResponse(data: ws.ServerResponse) { + try { + rpc.handleResponse(this.responseCallbacks, data); + } catch (err) { + log.error({ err }, "error handling server response"); } } - private onBrokerConnectionUpdate(data: ws.IBrokerConnectionUpdate) { - this.connectionState.serverToBroker = data.brokerConnected; - } + private notificationHandlers: ws.ServerNotificationHandlers = { + brokerConnectionUpdate: (data: ws.IBrokerConnectionUpdate) => { + this.connectionState.serverToBroker = data.brokerConnected; + }, + deviceUpdate: (data: ws.IDeviceUpdate) => { + const device = this.devices.get(data.deviceId); + if (!device) { + return log.warn({ data }, "invalid deviceUpdate received"); + } + update(schema.sprinklersDevice, device, data.data); + }, + error: (data: ws.Error) => { + log.warn({ err: data }, "server error"); + }, + }; } diff --git a/common/jsonRpc/index.ts b/common/jsonRpc/index.ts new file mode 100644 index 0000000..694c369 --- /dev/null +++ b/common/jsonRpc/index.ts @@ -0,0 +1,155 @@ +// tslint:disable:interface-over-type-literal +export type DefaultRequestTypes = {}; +export type DefaultResponseTypes = {}; +export type DefaultErrorType = { + code: number; + message: string; + data?: any; +}; +export type DefaultNotificationTypes = {}; +// tslint:enable:interface-over-type-literal + +// export interface RpcTypes { +// RequestTypes: DefaultRequestTypes; +// ResponseTypes: DefaultResponseTypes; +// NotificationTypes: DefaultNotificationTypes; +// ErrorType: DefaultErrorType; +// } + +export interface Request { + type: "request"; + id: number; + method: Method; + params: RequestTypes[Method]; +} + +export interface ResponseBase { + type: "response"; + id: number; + method: Method; +} + +export interface SuccessData { + result: "success"; + data: ResponseType; +} + +export interface ErrorData { + result: "error"; + error: ErrorType; +} + +export type ResponseData = + SuccessData | ErrorData; + +export type Response = + ResponseBase & ResponseData; + +export interface Notification { + type: "notification"; + method: Method; + data: NotificationTypes[Method]; +} + +export type Message = + Request | + Response | + Notification; + +// export type TypesMessage = +// Message; + +export function isRequestMethod( + message: Request, method: Method, +): message is Request { + return message.method === method; +} + +export function isResponseMethod( + message: Response, method: Method, +): message is Response { + return message.method === method; +} + +export function isNotificationMethod( + message: Notification, method: Method, +): message is Notification { + return message.method === method; +} + +export type IRequestHandler = + (request: RequestTypes[Method]) => Promise>; + +export type RequestHandlers = { + [Method in keyof RequestTypes]: + IRequestHandler; +}; + +export type IResponseHandler = + (response: ResponseData) => void; + +export interface ResponseHandlers { + [id: number]: IResponseHandler; +} + +export type NotificationHandler = + (notification: NotificationTypes[Method]) => void; + +export type NotificationHandlers = { + [Method in keyof NotificationTypes]: NotificationHandler; +}; + +export function listRequestHandlerMethods( + handlers: RequestHandlers, +): Array { + return Object.keys(handlers) as any; +} + +export function listNotificationHandlerMethods( + handlers: NotificationHandlers, +): Array { + return Object.keys(handlers) as any; +} + +export async function handleRequest( + handlers: RequestHandlers, + message: Request, +): Promise> { + const handler = handlers[message.method]; + if (!handler) { + throw new Error("No handler for request method " + message.method); + } + return handler(message.params); +} + +export function handleResponse( + handlers: ResponseHandlers, + message: Response) { + const handler = handlers[message.id]; + if (!handler) { + return; + } + return handler(message); +} + +export function handleNotification( + handlers: NotificationHandlers, + message: Notification) { + const handler = handlers[message.method]; + if (!handler) { + throw new Error("No handler for notification method " + message.method); + } + return handler(message.data); +} diff --git a/common/sprinklers/ErrorCode.ts b/common/sprinklers/ErrorCode.ts index eb85221..2cc2c3d 100644 --- a/common/sprinklers/ErrorCode.ts +++ b/common/sprinklers/ErrorCode.ts @@ -4,6 +4,9 @@ export enum ErrorCode { Parse = 102, Range = 103, InvalidData = 104, + BadToken = 105, + Unauthorized = 106, + NoPermission = 107, Internal = 200, Timeout = 300, ServerDisconnected = 301, diff --git a/common/sprinklers/SprinklersDevice.ts b/common/sprinklers/SprinklersDevice.ts index 4be42a4..bdb64a6 100644 --- a/common/sprinklers/SprinklersDevice.ts +++ b/common/sprinklers/SprinklersDevice.ts @@ -1,7 +1,7 @@ import { computed, observable } from "mobx"; import { ConnectionState } from "./ConnectionState"; +import * as req from "./deviceRequests"; import { Program } from "./Program"; -import * as requests from "./requests"; import { Section } from "./Section"; import { SectionRunner } from "./SectionRunner"; @@ -25,33 +25,33 @@ export abstract class SprinklersDevice { abstract get id(): string; - abstract makeRequest(request: requests.Request): Promise; + abstract makeRequest(request: req.Request): Promise; - runProgram(opts: requests.WithProgram) { + runProgram(opts: req.WithProgram) { return this.makeRequest({ ...opts, type: "runProgram" }); } - cancelProgram(opts: requests.WithProgram) { + cancelProgram(opts: req.WithProgram) { return this.makeRequest({ ...opts, type: "cancelProgram" }); } - updateProgram(opts: requests.UpdateProgramData): Promise { + updateProgram(opts: req.UpdateProgramData): Promise { return this.makeRequest({ ...opts, type: "updateProgram" }) as Promise; } - runSection(opts: requests.RunSectionData): Promise { + runSection(opts: req.RunSectionData): Promise { return this.makeRequest({ ...opts, type: "runSection" }) as Promise; } - cancelSection(opts: requests.WithSection) { + cancelSection(opts: req.WithSection) { return this.makeRequest({ ...opts, type: "cancelSection" }); } - cancelSectionRunId(opts: requests.CancelSectionRunIdData) { + cancelSectionRunId(opts: req.CancelSectionRunIdData) { return this.makeRequest({ ...opts, type: "cancelSectionRunId" }); } - pauseSectionRunner(opts: requests.PauseSectionRunnerData) { + pauseSectionRunner(opts: req.PauseSectionRunnerData) { return this.makeRequest({ ...opts, type: "pauseSectionRunner" }); } diff --git a/common/sprinklers/requests.ts b/common/sprinklers/deviceRequests.ts similarity index 100% rename from common/sprinklers/requests.ts rename to common/sprinklers/deviceRequests.ts diff --git a/common/sprinklers/mqtt/index.ts b/common/sprinklers/mqtt/index.ts index 3a914e9..5956071 100644 --- a/common/sprinklers/mqtt/index.ts +++ b/common/sprinklers/mqtt/index.ts @@ -4,7 +4,7 @@ import { update } from "serializr"; import logger from "@common/logger"; import * as s from "@common/sprinklers"; -import * as requests from "@common/sprinklers/requests"; +import * as requests from "@common/sprinklers/deviceRequests"; import * as schema from "@common/sprinklers/schema"; import { seralizeRequest } from "@common/sprinklers/schema/requests"; diff --git a/common/sprinklers/schema/requests.ts b/common/sprinklers/schema/requests.ts index f44949e..12543bb 100644 --- a/common/sprinklers/schema/requests.ts +++ b/common/sprinklers/schema/requests.ts @@ -1,5 +1,5 @@ import { createSimpleSchema, deserialize, ModelSchema, primitive, serialize } from "serializr"; -import * as requests from "../requests"; +import * as requests from "../deviceRequests"; import * as common from "./common"; export const withType: ModelSchema = createSimpleSchema({ diff --git a/common/sprinklers/websocketData.ts b/common/sprinklers/websocketData.ts index d223ca0..48b644b 100644 --- a/common/sprinklers/websocketData.ts +++ b/common/sprinklers/websocketData.ts @@ -1,52 +1,76 @@ -import { Response as ResponseData } from "@common/sprinklers/requests"; +import * as rpc from "../jsonRpc/index"; -export interface IError { - type: "error"; - message: string; - data: any; +import { Response as ResponseData } from "@common/sprinklers/deviceRequests"; + +export interface IAuthenticateRequest { + accessToken: string; } -export interface IDeviceSubscribeResponse { - type: "deviceSubscribeResponse"; +export interface IDeviceSubscribeRequest { deviceId: string; - result: "success" | "noPermission"; } -export interface IDeviceUpdate { - type: "deviceUpdate"; +export interface IDeviceCallRequest { deviceId: string; data: any; } +export interface IClientRequestTypes { + "authenticate": IAuthenticateRequest; + "deviceSubscribe": IDeviceSubscribeRequest; + "deviceCall": IDeviceCallRequest; +} + +export interface IAuthenticateResponse { + authenticated: boolean; + message: string; + data?: any; +} + +export interface IDeviceSubscribeResponse { + deviceId: string; +} + export interface IDeviceCallResponse { - type: "deviceCallResponse"; - requestId: number; data: ResponseData; } -export interface IBrokerConnectionUpdate { - type: "brokerConnectionUpdate"; - brokerConnected: boolean; +export interface IServerResponseTypes { + "authenticate": IAuthenticateResponse; + "deviceSubscribe": IDeviceSubscribeResponse; + "deviceCall": IDeviceCallResponse; } -export type IServerMessage = IError | IDeviceSubscribeResponse | IDeviceUpdate | IDeviceCallResponse | - IBrokerConnectionUpdate; +export type ClientRequestMethods = keyof IClientRequestTypes; -export interface IAuthenticateRequest { - type: "authenticateRequest"; - accessToken: string; +export interface IBrokerConnectionUpdate { + brokerConnected: boolean; } -export interface IDeviceSubscribeRequest { - type: "deviceSubscribeRequest"; +export interface IDeviceUpdate { deviceId: string; + data: any; } -export interface IDeviceCallRequest { - type: "deviceCallRequest"; - requestId: number; - deviceId: string; - data: any; +export interface IServerNotificationTypes { + "brokerConnectionUpdate": IBrokerConnectionUpdate; + "deviceUpdate": IDeviceUpdate; + "error": Error; } +export type ServerNotificationMethod = keyof IServerNotificationTypes; + +export type Error = rpc.DefaultErrorType; +export type ErrorData = rpc.ErrorData; + +export type ServerMessage = rpc.Message<{}, IServerResponseTypes, Error, IServerNotificationTypes>; +export type ServerNotification = rpc.Notification; +export type ServerResponse = rpc.Response; +export type ServerResponseData = + rpc.ResponseData; +export type ServerResponseHandlers = rpc.ResponseHandlers; +export type ServerNotificationHandlers = rpc.NotificationHandlers; -export type IClientMessage = IDeviceSubscribeRequest | IDeviceCallRequest; +export type ClientRequest = + rpc.Request; +export type ClientMessage = rpc.Message; +export type ClientRequestHandlers = rpc.RequestHandlers; diff --git a/package.json b/package.json index dccdbcd..e582eff 100644 --- a/package.json +++ b/package.json @@ -60,7 +60,6 @@ "@types/core-js": "^2.5.0", "@types/express": "^4.16.0", "@types/jsonwebtoken": "^7.2.7", - "@types/lodash": "^4.14.110", "@types/lodash-es": "^4.17.0", "@types/node": "^10.3.5", "@types/object-assign": "^4.0.30", @@ -83,7 +82,6 @@ "font-awesome": "^4.7.0", "happypack": "^5.0.0", "html-webpack-plugin": "^3.2.0", - "lodash": "^4.17.10", "lodash-es": "^4.17.10", "mini-css-extract-plugin": "^0.4.0", "mobx-react": "^5.2.3", diff --git a/server/express/authentication.ts b/server/express/authentication.ts index 1b0d84e..aab3f2c 100644 --- a/server/express/authentication.ts +++ b/server/express/authentication.ts @@ -1,4 +1,3 @@ -import log from "@common/logger"; import * as Express from "express"; import Router from "express-promise-router"; import * as jwt from "jsonwebtoken"; @@ -29,7 +28,7 @@ function getExpTime(lifetime: number) { return Math.floor(Date.now() / 1000) + lifetime; } -interface TokenClaims { +export interface TokenClaims { iss: string; type: "access" | "refresh"; aud: string; @@ -49,7 +48,7 @@ function signToken(claims: TokenClaims): Promise { }); } -function verifyToken(token: string): Promise { +export function verifyToken(token: string): Promise { return new Promise((resolve, reject) => { jwt.verify(token, JWT_SECRET, (err, decoded) => { if (err) { diff --git a/server/express/index.ts b/server/express/index.ts index 132e4a3..6d0db7b 100644 --- a/server/express/index.ts +++ b/server/express/index.ts @@ -1,6 +1,6 @@ import * as bodyParser from "body-parser"; import * as express from "express"; -import { serialize, serializeAll } from "serializr"; +import { serialize} from "serializr"; import * as schema from "@common/sprinklers/schema"; import { ServerState } from "../state"; @@ -16,8 +16,10 @@ export function createApp(state: ServerState) { app.use(logger); app.use(bodyParser.json()); - app.get("/api/grinklers", (req, res) => { - const j = serialize(schema.sprinklersDevice, state.device); + app.get("/api/devices/:deviceId", (req, res) => { + // TODO: authorize device + const device = state.mqttClient.getDevice(req.params.deviceId); + const j = serialize(schema.sprinklersDevice, device); res.send(j); }); diff --git a/server/models/Database.ts b/server/models/Database.ts index fa5520a..ec2efb4 100644 --- a/server/models/Database.ts +++ b/server/models/Database.ts @@ -1,7 +1,7 @@ import * as r from "rethinkdb"; -import { User } from "./User"; import logger from "@common/logger"; +import { User } from "./User"; export class Database { static readonly databaseName = "sprinklers3"; @@ -54,4 +54,4 @@ export class Database { const alex2 = await User.loadByUsername(this, "alex"); logger.info("password valid: " + await alex2!.comparePassword("kakashka")); } -} \ No newline at end of file +} diff --git a/server/models/User.ts b/server/models/User.ts index 26bb300..4c8fe44 100644 --- a/server/models/User.ts +++ b/server/models/User.ts @@ -1,6 +1,6 @@ import * as bcrypt from "bcrypt"; import * as r from "rethinkdb"; -import { createModelSchema, deserialize, primitive, serialize, update } from "serializr"; +import { createModelSchema, primitive, serialize, update } from "serializr"; import { Database } from "./Database"; @@ -23,10 +23,6 @@ export class User implements IUser { private db: Database; - private get _db() { - return this.db.db; - } - private get table() { return this.db.db.table(User.tableName); } @@ -76,7 +72,7 @@ export class User implements IUser { async create() { const data = serialize(this); delete data.id; - const a = this.table + await this.table .insert(data) .run(this.db.conn); } diff --git a/server/state.ts b/server/state.ts index 82a0f4a..ea3be22 100644 --- a/server/state.ts +++ b/server/state.ts @@ -1,11 +1,9 @@ import logger from "@common/logger"; -import {SprinklersDevice} from "@common/sprinklers"; import * as mqtt from "@common/sprinklers/mqtt"; import { Database } from "./models/Database"; export class ServerState { mqttClient: mqtt.MqttApiClient; - device: SprinklersDevice; database: Database; constructor() { @@ -14,7 +12,6 @@ export class ServerState { throw new Error("Must specify a MQTT_URL to connect to"); } this.mqttClient = new mqtt.MqttApiClient(mqttUrl); - this.device = this.mqttClient.getDevice("grinklers"); this.database = new Database(); } diff --git a/server/websocket/index.ts b/server/websocket/index.ts index e6d31de..1586ae2 100644 --- a/server/websocket/index.ts +++ b/server/websocket/index.ts @@ -2,12 +2,17 @@ import { autorun } from "mobx"; import { serialize } from "serializr"; import * as WebSocket from "ws"; +import * as rpc from "@common/jsonRpc"; import log from "@common/logger"; -import * as requests from "@common/sprinklers/requests"; +import * as deviceRequests from "@common/sprinklers/deviceRequests"; +import { ErrorCode } from "@common/sprinklers/ErrorCode"; import * as schema from "@common/sprinklers/schema"; import * as ws from "@common/sprinklers/websocketData"; +import { TokenClaims, verifyToken } from "../express/authentication"; import { ServerState } from "../state"; +// tslint:disable:member-ordering + export class WebSocketClient { api: WebSocketApi; socket: WebSocket; @@ -30,10 +35,9 @@ export class WebSocketClient { start() { this.disposers.push(autorun(() => { const updateData: ws.IBrokerConnectionUpdate = { - type: "brokerConnectionUpdate", brokerConnected: this.state.mqttClient.connected, }; - this.socket.send(JSON.stringify(updateData)); + this.sendNotification("brokerConnectionUpdate", updateData); })); this.socket.on("message", this.handleSocketMessage); this.socket.on("close", this.stop); @@ -44,6 +48,94 @@ export class WebSocketClient { this.api.removeClient(this); } + private requestHandlers: ws.ClientRequestHandlers = { + authenticate: async (data: ws.IAuthenticateRequest) => { + if (!data.accessToken) { + return { + result: "error", error: { + code: ErrorCode.BadRequest, message: "no token specified", + }, + }; + } + let decoded: TokenClaims; + try { + decoded = await verifyToken(data.accessToken); + } catch (e) { + return { + result: "error", + error: { code: ErrorCode.BadToken, message: "invalid token", data: e }, + }; + } + this.userId = decoded.aud; + return { + result: "success", + data: { authenticated: true, message: "authenticated" }, + }; + }, + deviceSubscribe: async (data: ws.IDeviceSubscribeRequest) => { + const deviceId = data.deviceId; + if (deviceId !== "grinklers") { // TODO: somehow validate this device id? + return { + result: "error", error: { + code: ErrorCode.NoPermission, + message: "you do not have permission to subscribe to this device", + }, + }; + } + if (this.deviceSubscriptions.indexOf(deviceId) === -1) { + this.deviceSubscriptions.push(deviceId); + const device = this.state.mqttClient.getDevice(deviceId); + log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device"); + this.disposers.push(autorun(() => { + const json = serialize(schema.sprinklersDevice, device); + log.trace({ device: json }); + const updateData: ws.IDeviceUpdate = { deviceId, data: json }; + this.sendNotification("deviceUpdate", updateData); + }, { delay: 100 })); + } + + const response: ws.IDeviceSubscribeResponse = { + deviceId, + }; + return { result: "success", data: response }; + }, + deviceCall: async (data: ws.IDeviceCallRequest) => { + try { + const response = await this.doDeviceCallRequest(data); + const resData: ws.IDeviceCallResponse = { + data: response, + }; + return { result: "success", data: resData }; + } catch (err) { + const e: deviceRequests.ErrorResponseData = err; + return { + result: "error", error: { + code: e.code, + message: e.message, + data: e, + }, + }; + } + }, + }; + + private sendMessage(data: ws.ServerMessage) { + this.socket.send(JSON.stringify(data)); + } + + private sendNotification( + method: Method, + data: ws.IServerNotificationTypes[Method]) { + this.sendMessage({ type: "notification", method, data }); + } + + private sendResponse( + method: Method, + id: number, + data: ws.ServerResponseData) { + this.sendMessage({ type: "response", method, id, ...data }); + } + private handleSocketMessage = (socketData: WebSocket.Data) => { this.doHandleSocketMessage(socketData) .catch((err) => { @@ -53,85 +145,64 @@ export class WebSocketClient { private async doHandleSocketMessage(socketData: WebSocket.Data) { if (typeof socketData !== "string") { - return this.onError({ type: typeof socketData }, "received invalid socket data type from client"); + return this.onError({ type: typeof socketData }, + "received invalid socket data type from client", ErrorCode.Parse); } - let data: ws.IClientMessage; + let data: ws.ClientMessage; try { data = JSON.parse(socketData); } catch (err) { - return this.onError({ event, err }, "received invalid websocket message from client"); + return this.onError({ socketData, err }, "received invalid websocket message from client", + ErrorCode.Parse); } + log.debug({ data }, "client message"); switch (data.type) { - case "deviceSubscribeRequest": - this.deviceSubscribeRequest(data); - break; - case "deviceCallRequest": - await this.deviceCallRequest(data); + case "request": + await this.handleRequest(data); break; default: - return this.onError({ data }, "received invalid client message type"); + return this.onError({ data }, "received invalid message type from client", + ErrorCode.BadRequest); } } - private onError(data: any, message: string) { - log.error(data, message); - const errorData: ws.IError = { - type: "error", message, data, - }; - this.socket.send(JSON.stringify(errorData)); - } - - private deviceSubscribeRequest(data: ws.IDeviceSubscribeRequest) { - const deviceId = data.deviceId; - let result: ws.IDeviceSubscribeResponse["result"]; - if (deviceId !== "grinklers") { // TODO: somehow validate this device id? - result = "noPermission"; + private async handleRequest(request: ws.ClientRequest) { + let response: ws.ServerResponseData; + if (!this.requestHandlers[request.method]) { + log.warn({ method: request.method }, "received invalid client request method"); + response = { + result: "error", error: { + code: ErrorCode.BadRequest, message: "received invalid client request method", + }, + }; } else { - if (this.deviceSubscriptions.indexOf(deviceId) !== -1) { - return; + try { + response = await rpc.handleRequest(this.requestHandlers, request); + } catch (err) { + log.error({ method: request.method, err }, "error during processing of client request"); + response = { + result: "error", error: { + code: ErrorCode.Internal, message: "error during processing of client request", + data: err.toString(), + }, + }; } - this.deviceSubscriptions.push(deviceId); - const device = this.state.mqttClient.getDevice(deviceId); - log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device"); - this.disposers.push(autorun(() => { - const json = serialize(schema.sprinklersDevice, device); - log.trace({ device: json }); - const updateData: ws.IDeviceUpdate = { type: "deviceUpdate", deviceId, data: json }; - this.socket.send(JSON.stringify(updateData)); - }, { delay: 100 })); - result = "success"; } - const response: ws.IDeviceSubscribeResponse = { - type: "deviceSubscribeResponse", deviceId, result, - }; - this.socket.send(JSON.stringify(response)); + this.sendResponse(request.method, request.id, response); } - private async deviceCallRequest(data: ws.IDeviceCallRequest): Promise { - let response: requests.Response | false; - try { - response = await this.doDeviceCallRequest(data); - } catch (err) { - response = err; - } - if (response) { - const resData: ws.IDeviceCallResponse = { - type: "deviceCallResponse", - requestId: data.requestId, - data: response, - }; - this.socket.send(JSON.stringify(resData)); - } + private onError(data: any, message: string, code: number = ErrorCode.Internal) { + log.error(data, message); + const errorData: ws.Error = { code, message, data }; + this.sendNotification("error", errorData); } - private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise { + private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise { const { deviceId, data } = requestData; - if (deviceId !== "grinklers") { - // error handling? or just get the right device - return false; - } + const device = this.state.mqttClient.getDevice(deviceId); + // TODO: authorize the requests const request = schema.requests.deserializeRequest(data); - return this.state.device.makeRequest(request); + return device.makeRequest(request); } } diff --git a/yarn.lock b/yarn.lock index 9cd880e..0835524 100644 --- a/yarn.lock +++ b/yarn.lock @@ -88,7 +88,7 @@ dependencies: "@types/lodash" "*" -"@types/lodash@*", "@types/lodash@^4.14.110": +"@types/lodash@*": version "4.14.110" resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.110.tgz#fb07498f84152947f30ea09d89207ca07123461e"