From dab7a9e19e10c36db64ba4625702d4ac8d8fdb58 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Mon, 9 Oct 2017 08:09:08 -0600 Subject: [PATCH] Good work on websocket device comms --- .vscode/tasks.json | 5 + app/components/RunSectionForm.tsx | 3 +- app/index.tsx | 6 +- app/state/index.ts | 12 +-- app/state/web.ts | 46 --------- app/state/websocket.ts | 146 +++++++++++++++++++++++++++++ common/logger.ts | 6 +- common/sprinklers/json/index.ts | 2 +- common/sprinklers/websocketData.ts | 24 +++++ common/utils.ts | 11 ++- package.json | 4 +- server/configureLogger.ts | 6 +- server/index.ts | 78 +++++++++++++-- yarn.lock | 8 +- 14 files changed, 279 insertions(+), 78 deletions(-) delete mode 100644 app/state/web.ts create mode 100644 app/state/websocket.ts create mode 100644 common/sprinklers/websocketData.ts diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 320e680..f461b63 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -43,6 +43,11 @@ "type": "npm", "script": "start:watch", "problemMatcher": [] + }, + { + "type": "npm", + "script": "start:dev-server", + "problemMatcher": [] } ] } \ No newline at end of file diff --git a/app/components/RunSectionForm.tsx b/app/components/RunSectionForm.tsx index bff6880..9db0e48 100644 --- a/app/components/RunSectionForm.tsx +++ b/app/components/RunSectionForm.tsx @@ -67,9 +67,8 @@ export default class RunSectionForm extends React.Component<{ } const section: Section = this.props.sections[this.state.section]; const { duration } = this.state; - log.debug({ section, duration }, "running section"); section.run(duration) - .then((a) => log.debug("ran section", a)) + .then((result) => log.debug({ result }, "requested section run")) .catch((err) => log.error(err, "error running section")); } diff --git a/app/index.tsx b/app/index.tsx index 385b7d2..e1cb5b5 100644 --- a/app/index.tsx +++ b/app/index.tsx @@ -3,10 +3,10 @@ import * as ReactDOM from "react-dom"; import { AppContainer } from "react-hot-loader"; import App from "@app/components/App"; -import { ProvideState, MqttApiState, WebApiState } from "@app/state"; -import log, { setLogger } from "@common/logger"; +import { MqttApiState, ProvideState, WebApiState } from "@app/state"; +import log from "@common/logger"; -setLogger(log.child({ name: "sprinklers3/app" })); +Object.assign(log, { name: "sprinklers3/app", level: "debug" }); // const state = new MqttApiState(); const state = new WebApiState(); diff --git a/app/state/index.ts b/app/state/index.ts index d25dd00..c9b8328 100644 --- a/app/state/index.ts +++ b/app/state/index.ts @@ -1,6 +1,6 @@ import { ISprinklersApi } from "@common/sprinklers"; import { MqttApiClient } from "@common/sprinklers/mqtt"; -import { WebApiClient } from "./web"; +import { WebApiClient } from "./websocket"; import { UiMessage, UiStore } from "./ui"; export { UiMessage, UiStore }; @@ -19,14 +19,14 @@ export abstract class StateBase { } } +const isDev = process.env.NODE_ENV === "development"; + export class MqttApiState extends StateBase { sprinklersApi = new MqttApiClient(`ws://${location.hostname}:1884`); } export class WebApiState extends StateBase { - sprinklersApi = new WebApiClient(); + sprinklersApi = new WebApiClient(isDev ? + `ws://${location.hostname}:8080` : + `ws://${location.host}`); } - -// const state = new State(); - -// export default state; diff --git a/app/state/web.ts b/app/state/web.ts deleted file mode 100644 index cd547e4..0000000 --- a/app/state/web.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { update } from "serializr"; - -import * as s from "@common/sprinklers"; -import * as schema from "@common/sprinklers/json"; - -export class WebSprinklersDevice extends s.SprinklersDevice { - get id() { - return "grinklers"; - } - async runSection(section: number | s.Section, duration: s.Duration): Promise<{}> { - return {}; - } - async runProgram(program: number | s.Program): Promise<{}> { - return {}; - } - async cancelSectionRunById(id: number): Promise<{}> { - return {}; - } - async pauseSectionRunner(): Promise<{}> { - return {}; - } - async unpauseSectionRunner(): Promise<{}> { - return {}; - } -} - -export class WebApiClient implements s.ISprinklersApi { - start() { - // NOT IMPLEMENTED - } - - getDevice(name: string): s.SprinklersDevice { - const device = new WebSprinklersDevice(); - fetch("/api/grinklers") - .then((res) => res.json()) - .then((json) => { - update(schema.sprinklersDeviceSchema, device, json); - }) - .catch((e) => alert(e)); - return device; - } - - removeDevice(name: string) { - // NOT IMPLEMENTED - } -} diff --git a/app/state/websocket.ts b/app/state/websocket.ts new file mode 100644 index 0000000..8d8d3c8 --- /dev/null +++ b/app/state/websocket.ts @@ -0,0 +1,146 @@ +import { update } from "serializr"; + +import logger from "@common/logger"; +import * as s from "@common/sprinklers"; +import * as schema from "@common/sprinklers/json"; +import * as ws from "@common/sprinklers/websocketData"; +import { checkedIndexOf } from "@common/utils"; + +const log = logger.child({ source: "websocket" }); + +export class WebSprinklersDevice extends s.SprinklersDevice { + readonly api: WebApiClient; + + constructor(api: WebApiClient) { + super(); + this.api = api; + } + + get id() { + return "grinklers"; + } + + runSection(section: number | s.Section, duration: s.Duration): Promise<{}> { + const secNum = checkedIndexOf(section, this.sections, "Section"); + const dur = duration.toSeconds(); + return this.makeCall("runSection", secNum, dur); + } + async runProgram(program: number | s.Program): Promise<{}> { + return {}; + } + async cancelSectionRunById(id: number): Promise<{}> { + return {}; + } + async pauseSectionRunner(): Promise<{}> { + return {}; + } + async unpauseSectionRunner(): Promise<{}> { + return {}; + } + + private makeCall(method: string, ...args: any[]) { + return this.api.makeDeviceCall(this.id, method, ...args); + } +} + +export class WebApiClient implements s.ISprinklersApi { + readonly webSocketUrl: string; + socket: WebSocket; + device: WebSprinklersDevice; + + nextDeviceRequestId = Math.round(Math.random() * 1000000); + deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {}; + + constructor(webSocketUrl: string) { + this.webSocketUrl = webSocketUrl; + this.device = new WebSprinklersDevice(this); + } + + start() { + log.debug({ url: this.webSocketUrl }, "connecting to websocket"); + this.socket = new WebSocket(this.webSocketUrl); + this.socket.onopen = this.onOpen.bind(this); + this.socket.onclose = this.onClose.bind(this); + this.socket.onerror = this.onError.bind(this); + this.socket.onmessage = this.onMessage.bind(this); + } + + getDevice(name: string): s.SprinklersDevice { + if (name !== "grinklers") { + throw new Error("Devices which are not grinklers are not supported yet"); + } + return this.device; + } + + removeDevice(name: string) { + // NOT IMPLEMENTED + } + + // args must all be JSON serializable + makeDeviceCall(deviceName: string, method: string, ...args: any[]): Promise { + const id = this.nextDeviceRequestId++; + const data: ws.IDeviceCallRequest = { + type: "deviceCallRequest", + id, deviceName, method, args, + }; + const promise = new Promise((resolve, reject) => { + this.deviceResponseCallbacks[id] = (resData) => { + if (resData.result === "success") { + resolve(resData.data); + } else { + reject(resData.data); + } + delete this.deviceResponseCallbacks[id]; + }; + }); + this.socket.send(JSON.stringify(data)); + return promise; + } + + private onOpen() { + log.info("established websocket connection"); + } + + private onClose(event: CloseEvent) { + log.info({ reason: event.reason, wasClean: event.wasClean }, + "disconnected from websocket"); + } + + private onError(event: Event) { + log.error(event, "websocket error"); + } + + private onMessage(event: MessageEvent) { + log.trace({ event }, "websocket message"); + let data: ws.IServerMessage; + try { + data = JSON.parse(event.data); + } catch (err) { + return log.error({ event, err }, "received invalid websocket message"); + } + switch (data.type) { + case "deviceUpdate": + this.onDeviceUpdate(data); + break; + case "deviceCallResponse": + this.onDeviceCallResponse(data); + break; + default: + log.warn({ data }, "unsupported event type received"); + } + } + + private onDeviceUpdate(data: ws.IDeviceUpdate) { + if (data.name !== "grinklers") { + return log.warn({ data }, "invalid deviceUpdate received"); + } + update(schema.sprinklersDeviceSchema, this.device, data.data); + } + + private onDeviceCallResponse(data: ws.IDeviceCallResponse) { + const cb = this.deviceResponseCallbacks[data.id]; + if (typeof cb === "function") { + cb(data); + } + } +} diff --git a/common/logger.ts b/common/logger.ts index 7f0ccc3..6e625b0 100644 --- a/common/logger.ts +++ b/common/logger.ts @@ -108,13 +108,9 @@ function formatLevel(value: any): ColoredString { } } -let logger: pino.Logger = pino({ +const logger: pino.Logger = pino({ browser: { write }, level: "trace", }); -export function setLogger(newLogger: pino.Logger) { - exports.default = logger = newLogger; -} - export default logger; diff --git a/common/sprinklers/json/index.ts b/common/sprinklers/json/index.ts index a5d195e..07efb55 100644 --- a/common/sprinklers/json/index.ts +++ b/common/sprinklers/json/index.ts @@ -22,7 +22,7 @@ export const dateSchema: PropSchema = { jsDate.toISOString() : null, deserializer: (json: any, done) => { if (json === null) { - done(null, null); + return done(null, null); } try { done(null, new Date(json)); diff --git a/common/sprinklers/websocketData.ts b/common/sprinklers/websocketData.ts new file mode 100644 index 0000000..31ac43b --- /dev/null +++ b/common/sprinklers/websocketData.ts @@ -0,0 +1,24 @@ +export interface IDeviceUpdate { + type: "deviceUpdate"; + name: string; + data: any; +} + +export interface IDeviceCallResponse { + type: "deviceCallResponse"; + id: number; + result: "success" | "error"; + data: any; +} + +export type IServerMessage = IDeviceUpdate | IDeviceCallResponse; + +export interface IDeviceCallRequest { + type: "deviceCallRequest"; + id: number; + deviceName: string; + method: string; + args: any[]; +} + +export type IClientMessage = IDeviceCallRequest; diff --git a/common/utils.ts b/common/utils.ts index 5bbeb65..bfd1896 100644 --- a/common/utils.ts +++ b/common/utils.ts @@ -1,7 +1,12 @@ export function checkedIndexOf(o: T | number, arr: T[], type: string = "object"): number { - const idx = (typeof o === "number") - ? o - : arr.indexOf(o); + let idx: number; + if (typeof o === "number") { + idx = o; + } else if (typeof (o as any).id === "number") { + idx = (o as any).id; + } else { + idx = arr.indexOf(o); + } if (idx < 0 || idx > arr.length) { throw new Error(`Invalid ${type} specified: ${o}`); } diff --git a/package.json b/package.json index 1c2839b..5539401 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "@types/react-dom": "^15.5.0", "@types/react-fontawesome": "^1.5.0", "@types/react-hot-loader": "^3.0.4", + "@types/ws": "^3.2.0", "async": "^2.5.0", "autoprefixer": "^7.1.4", "case-sensitive-paths-webpack-plugin": "^2.1.1", @@ -75,7 +76,8 @@ "serializr": "^1.1.13", "tslint-loader": "^3.5.3", "uglifyjs-webpack-plugin": "^0.4.6", - "url-loader": "^0.5.9" + "url-loader": "^0.5.9", + "ws": "^3.2.0" }, "devDependencies": { "@types/webpack-env": "^1.13.0", diff --git a/server/configureLogger.ts b/server/configureLogger.ts index 7da8d5d..bda4ecb 100644 --- a/server/configureLogger.ts +++ b/server/configureLogger.ts @@ -1,5 +1,5 @@ -import log, { setLogger } from "@common/logger"; -setLogger(log.child({ +import log from "@common/logger"; +Object.assign(log, { name: "sprinklers3/server", level: "debug", -})); +}); diff --git a/server/index.ts b/server/index.ts index 4255f69..fb803df 100644 --- a/server/index.ts +++ b/server/index.ts @@ -6,32 +6,96 @@ import "./configureLogger"; import log from "@common/logger"; import * as mqtt from "@common/sprinklers/mqtt"; import { Server } from "http"; +import * as WebSocket from "ws"; import app from "./app"; const mqttClient = new mqtt.MqttApiClient("mqtt://localhost:1883"); - mqttClient.start(); +import * as s from "@common/sprinklers"; import { sprinklersDeviceSchema } from "@common/sprinklers/json"; +import * as ws from "@common/sprinklers/websocketData"; import { autorunAsync } from "mobx"; import { serialize } from "serializr"; const device = mqttClient.getDevice("grinklers"); -autorunAsync(() => { - const j = serialize(sprinklersDeviceSchema, device); - log.info({ device: j }); -}, 0); - app.get("/api/grinklers", (req, res) => { const j = serialize(sprinklersDeviceSchema, device); res.send(j); }); -const server = new Server(app); +async function doDeviceCallRequest(data: ws.IDeviceCallRequest): Promise { + const { deviceName, method, args } = data; + if (deviceName !== "grinklers") { + // error handling? or just get the right device + return; + } + switch (method) { + case "runSection": + return device.runSection(args[0], s.Duration.fromSeconds(args[1])); + default: + // new Error(`unsupported device call: ${data.method}`) // TODO: error handling? + return; + } +} + +async function deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise { + let resData: ws.IDeviceCallResponse; + try { + const result = await doDeviceCallRequest(data); + resData = { + type: "deviceCallResponse", + id: data.id, + result: "success", + data: result, + }; + } catch (err) { + resData = { + type: "deviceCallResponse", + id: data.id, + result: "error", + data: err, + }; + } + socket.send(JSON.stringify(resData)); +} + +function webSocketHandler(socket: WebSocket) { + const stop = autorunAsync(() => { + const json = serialize(sprinklersDeviceSchema, device); + log.info({ device: json }); + const data = { type: "deviceUpdate", name: "grinklers", data: json }; + socket.send(JSON.stringify(data)); + }, 100); + socket.on("message", (socketData: WebSocket.Data) => { + if (typeof socketData !== "string") { + return log.error({ type: typeof socketData }, "received invalid socket data type from client"); + } + let data: ws.IClientMessage; + try { + data = JSON.parse(socketData); + } catch (err) { + return log.error({ event, err }, "received invalid websocket message from client"); + } + switch (data.type) { + case "deviceCallRequest": + deviceCallRequest(socket, data); + break; + default: + return log.warn({ data }, "received invalid client message type"); + } + }); + socket.on("close", () => stop()); +} const port = +(process.env.PORT || 8080); const host = process.env.HOST || "0.0.0.0"; +const server = new Server(app); +const webSocketServer = new WebSocket.Server({ server }); + +webSocketServer.on("connection", webSocketHandler); + server.listen(port, host, () => { log.info(`listening at ${host}:${port}`); }); diff --git a/yarn.lock b/yarn.lock index a985177..4cec07f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -100,6 +100,12 @@ version "1.13.1" resolved "https://registry.yarnpkg.com/@types/webpack-env/-/webpack-env-1.13.1.tgz#b45c222e24301bd006e3edfc762cc6b51bda236a" +"@types/ws@^3.2.0": + version "3.2.0" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-3.2.0.tgz#988ff690e6ed10068a86aa0e9f842d0a03c09e21" + dependencies: + "@types/node" "*" + abbrev@1: version "1.1.0" resolved "https://registry.yarnpkg.com/abbrev/-/abbrev-1.1.0.tgz#d0554c2256636e2f56e7c2e5ad183f859428d81f" @@ -5759,7 +5765,7 @@ write-file-atomic@^2.0.0: imurmurhash "^0.1.4" signal-exit "^3.0.2" -ws@^3.0.0: +ws@^3.0.0, ws@^3.2.0: version "3.2.0" resolved "https://registry.yarnpkg.com/ws/-/ws-3.2.0.tgz#d5d3d6b11aff71e73f808f40cc69d52bb6d4a185" dependencies: