Browse Source

Refactored out websocket api and state stuff

update-deps
Alex Mikhalev 7 years ago
parent
commit
d5b65d7a99
  1. 2
      common/sprinklers/schema/list.ts
  2. 20
      server/app/index.ts
  3. 13
      server/index.ts
  4. 6
      server/state.ts
  5. 108
      server/websocket/index.ts

2
common/sprinklers/schema/list.ts

@ -2,7 +2,7 @@ import { primitive, PropSchema } from "serializr";
function invariant(cond: boolean, message?: string) { function invariant(cond: boolean, message?: string) {
if (!cond) { if (!cond) {
throw new Error("[serializr] " + (message || "Illegal State")); throw new Error("[serializr] " + (message || "Illegal ServerState"));
} }
} }

20
server/app/index.ts

@ -2,19 +2,21 @@ import * as express from "express";
import * as schema from "@common/sprinklers/schema"; import * as schema from "@common/sprinklers/schema";
import {serialize} from "serializr"; import {serialize} from "serializr";
import {state} from "../state"; import { ServerState } from "../state";
import logger from "./logger"; import logger from "./logger";
import serveApp from "./serveApp"; import serveApp from "./serveApp";
const app = express(); export function createApp(state: ServerState) {
const app = express();
app.use(logger); app.use(logger);
app.get("/api/grinklers", (req, res) => { app.get("/api/grinklers", (req, res) => {
const j = serialize(schema.sprinklersDevice, state.device); const j = serialize(schema.sprinklersDevice, state.device);
res.send(j); res.send(j);
}); });
serveApp(app); serveApp(app);
export default app; return app;
}

13
server/index.ts

@ -7,17 +7,20 @@ import log from "@common/logger";
import {Server} from "http"; import {Server} from "http";
import * as WebSocket from "ws"; import * as WebSocket from "ws";
import app from "./app"; import { ServerState } from "./state";
import {state} from "./state"; import { createApp } from "./app";
import {handler as webSocketHandler} from "./websocket"; import { WebSocketApi } from "./websocket";
const state = new ServerState();
const app = createApp(state);
const webSocketApi = new WebSocketApi(state);
const port = +(process.env.PORT || 8080); const port = +(process.env.PORT || 8080);
const host = process.env.HOST || "0.0.0.0"; const host = process.env.HOST || "0.0.0.0";
const server = new Server(app); const server = new Server(app);
const webSocketServer = new WebSocket.Server({server}); const webSocketServer = new WebSocket.Server({server});
webSocketServer.on("connection", webSocketApi.handleConnection);
webSocketServer.on("connection", webSocketHandler);
state.start(); state.start();
server.listen(port, host, () => { server.listen(port, host, () => {

6
server/state.ts

@ -1,7 +1,7 @@
import {SprinklersDevice} from "@common/sprinklers"; import {SprinklersDevice} from "@common/sprinklers";
import * as mqtt from "@common/sprinklers/mqtt"; import * as mqtt from "@common/sprinklers/mqtt";
export class State { export class ServerState {
mqttClient!: mqtt.MqttApiClient; mqttClient!: mqtt.MqttApiClient;
device!: SprinklersDevice; device!: SprinklersDevice;
@ -15,6 +15,4 @@ export class State {
this.mqttClient.start(); this.mqttClient.start();
} }
} }
export const state: State = new State();

108
server/websocket/index.ts

@ -1,61 +1,43 @@
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import log from "@common/logger"; import log from "@common/logger";
import * as requests from "@common/sprinklers/requests"; import * as requests from "@common/sprinklers/requests";
import * as schema from "@common/sprinklers/schema"; import * as schema from "@common/sprinklers/schema";
import * as ws from "@common/sprinklers/websocketData"; import * as ws from "@common/sprinklers/websocketData";
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import { ServerState } from "../state";
import { state } from "../state"; export class WebSocketApi {
state: ServerState;
async function doDeviceCallRequest(requestData: ws.IDeviceCallRequest) { constructor(state: ServerState) {
const { deviceName, data } = requestData; this.state = state;
if (deviceName !== "grinklers") {
// error handling? or just get the right device
return false;
} }
const request = schema.requests.deserializeRequest(data);
return state.device.makeRequest(request);
}
async function deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise<void> { handleConnection = (socket: WebSocket) => {
let response: requests.Response | false; const disposers = [
try { autorun(() => {
response = await doDeviceCallRequest(data); const json = serialize(schema.sprinklersDevice, this.state.device);
} catch (err) { log.trace({ device: json });
response = err; const data: ws.IDeviceUpdate = { type: "deviceUpdate", name: "grinklers", data: json };
} socket.send(JSON.stringify(data));
if (response) { }, { delay: 100 }),
const resData: ws.IDeviceCallResponse = { autorun(() => {
type: "deviceCallResponse", const data: ws.IBrokerConnectionUpdate = {
id: data.id, type: "brokerConnectionUpdate",
data: response, brokerConnected: this.state.mqttClient.connected,
};
socket.send(JSON.stringify(data));
}),
];
const stop = () => {
disposers.forEach((disposer) => disposer());
}; };
socket.send(JSON.stringify(resData)); socket.on("message", this.handleSocketMessage);
socket.on("close", () => stop());
} }
}
export function handler(socket: WebSocket) { private handleSocketMessage = (socket: WebSocket, socketData: WebSocket.Data) => {
const disposers = [
autorun(() => {
const json = serialize(schema.sprinklersDevice, state.device);
log.trace({ device: json });
const data: ws.IDeviceUpdate = { type: "deviceUpdate", name: "grinklers", data: json };
socket.send(JSON.stringify(data));
}, { delay: 100 }),
autorun(() => {
const data: ws.IBrokerConnectionUpdate = {
type: "brokerConnectionUpdate",
brokerConnected: state.mqttClient.connected,
};
socket.send(JSON.stringify(data));
}),
];
const stop = () => {
disposers.forEach((disposer) => disposer());
};
socket.on("message", (socketData: WebSocket.Data) => {
if (typeof socketData !== "string") { if (typeof socketData !== "string") {
return log.error({ type: typeof socketData }, "received invalid socket data type from client"); return log.error({ type: typeof socketData }, "received invalid socket data type from client");
} }
@ -67,11 +49,37 @@ export function handler(socket: WebSocket) {
} }
switch (data.type) { switch (data.type) {
case "deviceCallRequest": case "deviceCallRequest":
deviceCallRequest(socket, data); this.deviceCallRequest(socket, data);
break; break;
default: default:
return log.warn({ data }, "received invalid client message type"); return log.warn({ data }, "received invalid client message type");
} }
}); }
socket.on("close", () => stop());
private async deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise<void> {
let response: requests.Response | false;
try {
response = await this.doDeviceCallRequest(data);
} catch (err) {
response = err;
}
if (response) {
const resData: ws.IDeviceCallResponse = {
type: "deviceCallResponse",
id: data.id,
data: response,
};
socket.send(JSON.stringify(resData));
}
}
private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<requests.Response | false> {
const { deviceName, data } = requestData;
if (deviceName !== "grinklers") {
// error handling? or just get the right device
return false;
}
const request = schema.requests.deserializeRequest(data);
return this.state.device.makeRequest(request);
}
} }

Loading…
Cancel
Save