Browse Source

Good work on websocket device comms

update-deps
Alex Mikhalev 7 years ago
parent
commit
dab7a9e19e
  1. 5
      .vscode/tasks.json
  2. 3
      app/components/RunSectionForm.tsx
  3. 6
      app/index.tsx
  4. 12
      app/state/index.ts
  5. 46
      app/state/web.ts
  6. 146
      app/state/websocket.ts
  7. 6
      common/logger.ts
  8. 2
      common/sprinklers/json/index.ts
  9. 24
      common/sprinklers/websocketData.ts
  10. 11
      common/utils.ts
  11. 4
      package.json
  12. 6
      server/configureLogger.ts
  13. 78
      server/index.ts
  14. 8
      yarn.lock

5
.vscode/tasks.json vendored

@ -43,6 +43,11 @@ @@ -43,6 +43,11 @@
"type": "npm",
"script": "start:watch",
"problemMatcher": []
},
{
"type": "npm",
"script": "start:dev-server",
"problemMatcher": []
}
]
}

3
app/components/RunSectionForm.tsx

@ -67,9 +67,8 @@ export default class RunSectionForm extends React.Component<{ @@ -67,9 +67,8 @@ export default class RunSectionForm extends React.Component<{
}
const section: Section = this.props.sections[this.state.section];
const { duration } = this.state;
log.debug({ section, duration }, "running section");
section.run(duration)
.then((a) => log.debug("ran section", a))
.then((result) => log.debug({ result }, "requested section run"))
.catch((err) => log.error(err, "error running section"));
}

6
app/index.tsx

@ -3,10 +3,10 @@ import * as ReactDOM from "react-dom"; @@ -3,10 +3,10 @@ import * as ReactDOM from "react-dom";
import { AppContainer } from "react-hot-loader";
import App from "@app/components/App";
import { ProvideState, MqttApiState, WebApiState } from "@app/state";
import log, { setLogger } from "@common/logger";
import { MqttApiState, ProvideState, WebApiState } from "@app/state";
import log from "@common/logger";
setLogger(log.child({ name: "sprinklers3/app" }));
Object.assign(log, { name: "sprinklers3/app", level: "debug" });
// const state = new MqttApiState();
const state = new WebApiState();

12
app/state/index.ts

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
import { ISprinklersApi } from "@common/sprinklers";
import { MqttApiClient } from "@common/sprinklers/mqtt";
import { WebApiClient } from "./web";
import { WebApiClient } from "./websocket";
import { UiMessage, UiStore } from "./ui";
export { UiMessage, UiStore };
@ -19,14 +19,14 @@ export abstract class StateBase { @@ -19,14 +19,14 @@ export abstract class StateBase {
}
}
const isDev = process.env.NODE_ENV === "development";
export class MqttApiState extends StateBase {
sprinklersApi = new MqttApiClient(`ws://${location.hostname}:1884`);
}
export class WebApiState extends StateBase {
sprinklersApi = new WebApiClient();
sprinklersApi = new WebApiClient(isDev ?
`ws://${location.hostname}:8080` :
`ws://${location.host}`);
}
// const state = new State();
// export default state;

46
app/state/web.ts

@ -1,46 +0,0 @@ @@ -1,46 +0,0 @@
import { update } from "serializr";
import * as s from "@common/sprinklers";
import * as schema from "@common/sprinklers/json";
export class WebSprinklersDevice extends s.SprinklersDevice {
get id() {
return "grinklers";
}
async runSection(section: number | s.Section, duration: s.Duration): Promise<{}> {
return {};
}
async runProgram(program: number | s.Program): Promise<{}> {
return {};
}
async cancelSectionRunById(id: number): Promise<{}> {
return {};
}
async pauseSectionRunner(): Promise<{}> {
return {};
}
async unpauseSectionRunner(): Promise<{}> {
return {};
}
}
export class WebApiClient implements s.ISprinklersApi {
start() {
// NOT IMPLEMENTED
}
getDevice(name: string): s.SprinklersDevice {
const device = new WebSprinklersDevice();
fetch("/api/grinklers")
.then((res) => res.json())
.then((json) => {
update(schema.sprinklersDeviceSchema, device, json);
})
.catch((e) => alert(e));
return device;
}
removeDevice(name: string) {
// NOT IMPLEMENTED
}
}

146
app/state/websocket.ts

@ -0,0 +1,146 @@ @@ -0,0 +1,146 @@
import { update } from "serializr";
import logger from "@common/logger";
import * as s from "@common/sprinklers";
import * as schema from "@common/sprinklers/json";
import * as ws from "@common/sprinklers/websocketData";
import { checkedIndexOf } from "@common/utils";
const log = logger.child({ source: "websocket" });
export class WebSprinklersDevice extends s.SprinklersDevice {
readonly api: WebApiClient;
constructor(api: WebApiClient) {
super();
this.api = api;
}
get id() {
return "grinklers";
}
runSection(section: number | s.Section, duration: s.Duration): Promise<{}> {
const secNum = checkedIndexOf(section, this.sections, "Section");
const dur = duration.toSeconds();
return this.makeCall("runSection", secNum, dur);
}
async runProgram(program: number | s.Program): Promise<{}> {
return {};
}
async cancelSectionRunById(id: number): Promise<{}> {
return {};
}
async pauseSectionRunner(): Promise<{}> {
return {};
}
async unpauseSectionRunner(): Promise<{}> {
return {};
}
private makeCall(method: string, ...args: any[]) {
return this.api.makeDeviceCall(this.id, method, ...args);
}
}
export class WebApiClient implements s.ISprinklersApi {
readonly webSocketUrl: string;
socket: WebSocket;
device: WebSprinklersDevice;
nextDeviceRequestId = Math.round(Math.random() * 1000000);
deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {};
constructor(webSocketUrl: string) {
this.webSocketUrl = webSocketUrl;
this.device = new WebSprinklersDevice(this);
}
start() {
log.debug({ url: this.webSocketUrl }, "connecting to websocket");
this.socket = new WebSocket(this.webSocketUrl);
this.socket.onopen = this.onOpen.bind(this);
this.socket.onclose = this.onClose.bind(this);
this.socket.onerror = this.onError.bind(this);
this.socket.onmessage = this.onMessage.bind(this);
}
getDevice(name: string): s.SprinklersDevice {
if (name !== "grinklers") {
throw new Error("Devices which are not grinklers are not supported yet");
}
return this.device;
}
removeDevice(name: string) {
// NOT IMPLEMENTED
}
// args must all be JSON serializable
makeDeviceCall(deviceName: string, method: string, ...args: any[]): Promise<any> {
const id = this.nextDeviceRequestId++;
const data: ws.IDeviceCallRequest = {
type: "deviceCallRequest",
id, deviceName, method, args,
};
const promise = new Promise((resolve, reject) => {
this.deviceResponseCallbacks[id] = (resData) => {
if (resData.result === "success") {
resolve(resData.data);
} else {
reject(resData.data);
}
delete this.deviceResponseCallbacks[id];
};
});
this.socket.send(JSON.stringify(data));
return promise;
}
private onOpen() {
log.info("established websocket connection");
}
private onClose(event: CloseEvent) {
log.info({ reason: event.reason, wasClean: event.wasClean },
"disconnected from websocket");
}
private onError(event: Event) {
log.error(event, "websocket error");
}
private onMessage(event: MessageEvent) {
log.trace({ event }, "websocket message");
let data: ws.IServerMessage;
try {
data = JSON.parse(event.data);
} catch (err) {
return log.error({ event, err }, "received invalid websocket message");
}
switch (data.type) {
case "deviceUpdate":
this.onDeviceUpdate(data);
break;
case "deviceCallResponse":
this.onDeviceCallResponse(data);
break;
default:
log.warn({ data }, "unsupported event type received");
}
}
private onDeviceUpdate(data: ws.IDeviceUpdate) {
if (data.name !== "grinklers") {
return log.warn({ data }, "invalid deviceUpdate received");
}
update(schema.sprinklersDeviceSchema, this.device, data.data);
}
private onDeviceCallResponse(data: ws.IDeviceCallResponse) {
const cb = this.deviceResponseCallbacks[data.id];
if (typeof cb === "function") {
cb(data);
}
}
}

6
common/logger.ts

@ -108,13 +108,9 @@ function formatLevel(value: any): ColoredString { @@ -108,13 +108,9 @@ function formatLevel(value: any): ColoredString {
}
}
let logger: pino.Logger = pino({
const logger: pino.Logger = pino({
browser: { write },
level: "trace",
});
export function setLogger(newLogger: pino.Logger) {
exports.default = logger = newLogger;
}
export default logger;

2
common/sprinklers/json/index.ts

@ -22,7 +22,7 @@ export const dateSchema: PropSchema = { @@ -22,7 +22,7 @@ export const dateSchema: PropSchema = {
jsDate.toISOString() : null,
deserializer: (json: any, done) => {
if (json === null) {
done(null, null);
return done(null, null);
}
try {
done(null, new Date(json));

24
common/sprinklers/websocketData.ts

@ -0,0 +1,24 @@ @@ -0,0 +1,24 @@
export interface IDeviceUpdate {
type: "deviceUpdate";
name: string;
data: any;
}
export interface IDeviceCallResponse {
type: "deviceCallResponse";
id: number;
result: "success" | "error";
data: any;
}
export type IServerMessage = IDeviceUpdate | IDeviceCallResponse;
export interface IDeviceCallRequest {
type: "deviceCallRequest";
id: number;
deviceName: string;
method: string;
args: any[];
}
export type IClientMessage = IDeviceCallRequest;

11
common/utils.ts

@ -1,7 +1,12 @@ @@ -1,7 +1,12 @@
export function checkedIndexOf<T>(o: T | number, arr: T[], type: string = "object"): number {
const idx = (typeof o === "number")
? o
: arr.indexOf(o);
let idx: number;
if (typeof o === "number") {
idx = o;
} else if (typeof (o as any).id === "number") {
idx = (o as any).id;
} else {
idx = arr.indexOf(o);
}
if (idx < 0 || idx > arr.length) {
throw new Error(`Invalid ${type} specified: ${o}`);
}

4
package.json

@ -46,6 +46,7 @@ @@ -46,6 +46,7 @@
"@types/react-dom": "^15.5.0",
"@types/react-fontawesome": "^1.5.0",
"@types/react-hot-loader": "^3.0.4",
"@types/ws": "^3.2.0",
"async": "^2.5.0",
"autoprefixer": "^7.1.4",
"case-sensitive-paths-webpack-plugin": "^2.1.1",
@ -75,7 +76,8 @@ @@ -75,7 +76,8 @@
"serializr": "^1.1.13",
"tslint-loader": "^3.5.3",
"uglifyjs-webpack-plugin": "^0.4.6",
"url-loader": "^0.5.9"
"url-loader": "^0.5.9",
"ws": "^3.2.0"
},
"devDependencies": {
"@types/webpack-env": "^1.13.0",

6
server/configureLogger.ts

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
import log, { setLogger } from "@common/logger";
setLogger(log.child({
import log from "@common/logger";
Object.assign(log, {
name: "sprinklers3/server",
level: "debug",
}));
});

78
server/index.ts

@ -6,32 +6,96 @@ import "./configureLogger"; @@ -6,32 +6,96 @@ import "./configureLogger";
import log from "@common/logger";
import * as mqtt from "@common/sprinklers/mqtt";
import { Server } from "http";
import * as WebSocket from "ws";
import app from "./app";
const mqttClient = new mqtt.MqttApiClient("mqtt://localhost:1883");
mqttClient.start();
import * as s from "@common/sprinklers";
import { sprinklersDeviceSchema } from "@common/sprinklers/json";
import * as ws from "@common/sprinklers/websocketData";
import { autorunAsync } from "mobx";
import { serialize } from "serializr";
const device = mqttClient.getDevice("grinklers");
autorunAsync(() => {
const j = serialize(sprinklersDeviceSchema, device);
log.info({ device: j });
}, 0);
app.get("/api/grinklers", (req, res) => {
const j = serialize(sprinklersDeviceSchema, device);
res.send(j);
});
const server = new Server(app);
async function doDeviceCallRequest(data: ws.IDeviceCallRequest): Promise<any> {
const { deviceName, method, args } = data;
if (deviceName !== "grinklers") {
// error handling? or just get the right device
return;
}
switch (method) {
case "runSection":
return device.runSection(args[0], s.Duration.fromSeconds(args[1]));
default:
// new Error(`unsupported device call: ${data.method}`) // TODO: error handling?
return;
}
}
async function deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise<void> {
let resData: ws.IDeviceCallResponse;
try {
const result = await doDeviceCallRequest(data);
resData = {
type: "deviceCallResponse",
id: data.id,
result: "success",
data: result,
};
} catch (err) {
resData = {
type: "deviceCallResponse",
id: data.id,
result: "error",
data: err,
};
}
socket.send(JSON.stringify(resData));
}
function webSocketHandler(socket: WebSocket) {
const stop = autorunAsync(() => {
const json = serialize(sprinklersDeviceSchema, device);
log.info({ device: json });
const data = { type: "deviceUpdate", name: "grinklers", data: json };
socket.send(JSON.stringify(data));
}, 100);
socket.on("message", (socketData: WebSocket.Data) => {
if (typeof socketData !== "string") {
return log.error({ type: typeof socketData }, "received invalid socket data type from client");
}
let data: ws.IClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return log.error({ event, err }, "received invalid websocket message from client");
}
switch (data.type) {
case "deviceCallRequest":
deviceCallRequest(socket, data);
break;
default:
return log.warn({ data }, "received invalid client message type");
}
});
socket.on("close", () => stop());
}
const port = +(process.env.PORT || 8080);
const host = process.env.HOST || "0.0.0.0";
const server = new Server(app);
const webSocketServer = new WebSocket.Server({ server });
webSocketServer.on("connection", webSocketHandler);
server.listen(port, host, () => {
log.info(`listening at ${host}:${port}`);
});

8
yarn.lock

@ -100,6 +100,12 @@ @@ -100,6 +100,12 @@
version "1.13.1"
resolved "https://registry.yarnpkg.com/@types/webpack-env/-/webpack-env-1.13.1.tgz#b45c222e24301bd006e3edfc762cc6b51bda236a"
"@types/ws@^3.2.0":
version "3.2.0"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-3.2.0.tgz#988ff690e6ed10068a86aa0e9f842d0a03c09e21"
dependencies:
"@types/node" "*"
abbrev@1:
version "1.1.0"
resolved "https://registry.yarnpkg.com/abbrev/-/abbrev-1.1.0.tgz#d0554c2256636e2f56e7c2e5ad183f859428d81f"
@ -5759,7 +5765,7 @@ write-file-atomic@^2.0.0: @@ -5759,7 +5765,7 @@ write-file-atomic@^2.0.0:
imurmurhash "^0.1.4"
signal-exit "^3.0.2"
ws@^3.0.0:
ws@^3.0.0, ws@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-3.2.0.tgz#d5d3d6b11aff71e73f808f40cc69d52bb6d4a185"
dependencies:

Loading…
Cancel
Save