Browse Source

Vastly cleaned up websocket data format, made it a lot more like json rpc

update-deps
Alex Mikhalev 7 years ago
parent
commit
a179d69241
  1. 2
      app/components/RunSectionForm.tsx
  2. 171
      app/sprinklers/websocket.ts
  3. 155
      common/jsonRpc/index.ts
  4. 3
      common/sprinklers/ErrorCode.ts
  5. 18
      common/sprinklers/SprinklersDevice.ts
  6. 0
      common/sprinklers/deviceRequests.ts
  7. 2
      common/sprinklers/mqtt/index.ts
  8. 2
      common/sprinklers/schema/requests.ts
  9. 80
      common/sprinklers/websocketData.ts
  10. 2
      package.json
  11. 5
      server/express/authentication.ts
  12. 8
      server/express/index.ts
  13. 2
      server/models/Database.ts
  14. 8
      server/models/User.ts
  15. 3
      server/state.ts
  16. 195
      server/websocket/index.ts
  17. 2
      yarn.lock

2
app/components/RunSectionForm.tsx

@ -7,7 +7,7 @@ import { UiStore } from "@app/state"; @@ -7,7 +7,7 @@ import { UiStore } from "@app/state";
import { Duration } from "@common/Duration";
import log from "@common/logger";
import { Section, SprinklersDevice } from "@common/sprinklers";
import { RunSectionResponse } from "@common/sprinklers/requests";
import { RunSectionResponse } from "@common/sprinklers/deviceRequests";
import DurationInput from "./DurationInput";
@observer

171
app/sprinklers/websocket.ts

@ -1,10 +1,11 @@ @@ -1,10 +1,11 @@
import { action, observable, when } from "mobx";
import { update } from "serializr";
import * as rpc from "@common/jsonRpc";
import logger from "@common/logger";
import * as deviceRequests from "@common/sprinklers/deviceRequests";
import { ErrorCode } from "@common/sprinklers/ErrorCode";
import * as s from "@common/sprinklers/index";
import * as requests from "@common/sprinklers/requests";
import * as schema from "@common/sprinklers/schema/index";
import { seralizeRequest } from "@common/sprinklers/schema/requests";
import * as ws from "@common/sprinklers/websocketData";
@ -14,6 +15,8 @@ const log = logger.child({ source: "websocket" }); @@ -14,6 +15,8 @@ const log = logger.child({ source: "websocket" });
const TIMEOUT_MS = 5000;
const RECONNECT_TIMEOUT_MS = 5000;
// tslint:disable:member-ordering
export class WSSprinklersDevice extends s.SprinklersDevice {
readonly api: WebSocketApiClient;
@ -32,29 +35,27 @@ export class WSSprinklersDevice extends s.SprinklersDevice { @@ -32,29 +35,27 @@ export class WSSprinklersDevice extends s.SprinklersDevice {
return this._id;
}
subscribe() {
async subscribe() {
if (!this.api.socket) {
throw new Error("WebSocket not connected");
}
const subscribeRequest: ws.IDeviceSubscribeRequest = {
type: "deviceSubscribeRequest",
deviceId: this.id,
};
this.api.socket.send(JSON.stringify(subscribeRequest));
}
onSubscribeResponse(data: ws.IDeviceSubscribeResponse) {
this.connectionState.serverToBroker = true;
this.connectionState.clientToServer = true;
if (data.result === "success") {
this.connectionState.hasPermission = true;
this.connectionState.brokerToDevice = false;
} else if (data.result === "noPermission") {
this.connectionState.hasPermission = false;
try {
await this.api.makeRequest("deviceSubscribe", subscribeRequest);
this.connectionState.serverToBroker = true;
this.connectionState.clientToServer = true;
} catch (err) {
if ((err as ws.Error).code === ErrorCode.NoPermission) {
this.connectionState.hasPermission = false;
} else {
log.error({ err });
}
}
}
makeRequest(request: requests.Request): Promise<requests.Response> {
makeRequest(request: deviceRequests.Request): Promise<deviceRequests.Response> {
return this.api.makeDeviceCall(this.id, request);
}
}
@ -63,13 +64,11 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -63,13 +64,11 @@ export class WebSocketApiClient implements s.ISprinklersApi {
readonly webSocketUrl: string;
devices: Map<string, WSSprinklersDevice> = new Map();
nextDeviceRequestId = Math.round(Math.random() * 1000000);
deviceResponseCallbacks: { [id: number]: (res: ws.IDeviceCallResponse) => void | undefined; } = {};
@observable connectionState: s.ConnectionState = new s.ConnectionState();
socket: WebSocket | null = null;
private nextRequestId = Math.round(Math.random() * 1000000);
private responseCallbacks: ws.ServerResponseHandlers = {};
private reconnectTimer: number | null = null;
get connected(): boolean {
@ -111,47 +110,72 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -111,47 +110,72 @@ export class WebSocketApiClient implements s.ISprinklersApi {
// NOT IMPLEMENTED
}
async authenticate(accessToken: string): Promise<ws.IAuthenticateResponse> {
return this.makeRequest("authenticate", { accessToken });
}
// args must all be JSON serializable
makeDeviceCall(deviceId: string, request: requests.Request): Promise<requests.Response> {
async makeDeviceCall(deviceId: string, request: deviceRequests.Request): Promise<deviceRequests.Response> {
if (this.socket == null) {
const res: requests.Response = {
type: request.type,
result: "error",
const error: ws.Error = {
code: ErrorCode.ServerDisconnected,
message: "the server is not connected",
};
throw res;
throw error;
}
const requestData = seralizeRequest(request);
const id = this.nextDeviceRequestId++;
const data: ws.IDeviceCallRequest = {
type: "deviceCallRequest",
requestId: id, deviceId, data: requestData,
};
const promise = new Promise<requests.Response>((resolve, reject) => {
const data: ws.IDeviceCallRequest = { deviceId, data: requestData };
const resData = await this.makeRequest("deviceCall", data);
if (resData.data.result === "error") {
throw {
code: resData.data.code,
message: resData.data.message,
data: resData.data,
};
} else {
return resData.data;
}
}
makeRequest<Method extends ws.ClientRequestMethods>(method: Method, params: ws.IClientRequestTypes[Method]):
Promise<ws.IServerResponseTypes[Method]> {
const id = this.nextRequestId++;
return new Promise<ws.IServerResponseTypes[Method]>((resolve, reject) => {
let timeoutHandle: number;
this.deviceResponseCallbacks[id] = (resData) => {
this.responseCallbacks[id] = (response) => {
clearTimeout(timeoutHandle);
delete this.deviceResponseCallbacks[id];
if (resData.data.result === "success") {
resolve(resData.data);
delete this.responseCallbacks[id];
if (response.result === "success") {
resolve(response.data);
} else {
reject(resData.data);
reject(response.error);
}
};
timeoutHandle = window.setTimeout(() => {
delete this.deviceResponseCallbacks[id];
const res: requests.Response = {
type: request.type,
result: "error",
code: ErrorCode.Timeout,
message: "the request timed out",
delete this.responseCallbacks[id];
const res: ws.ErrorData = {
result: "error", error: {
code: ErrorCode.Timeout,
message: "the request timed out",
},
};
reject(res);
}, TIMEOUT_MS);
this.sendRequest(id, method, params);
});
}
private sendMessage(data: ws.ClientMessage) {
if (!this.socket) {
throw new Error("WebSocketApiClient is not connected");
}
this.socket.send(JSON.stringify(data));
return promise;
}
private sendRequest<Method extends ws.ClientRequestMethods>(
id: number, method: Method, params: ws.IClientRequestTypes[Method],
) {
this.sendMessage({ type: "request", id, method, params });
}
private _reconnect = () => {
@ -194,7 +218,7 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -194,7 +218,7 @@ export class WebSocketApiClient implements s.ISprinklersApi {
}
private onMessage(event: MessageEvent) {
let data: ws.IServerMessage;
let data: ws.ServerMessage;
try {
data = JSON.parse(event.data);
} catch (err) {
@ -202,47 +226,46 @@ export class WebSocketApiClient implements s.ISprinklersApi { @@ -202,47 +226,46 @@ export class WebSocketApiClient implements s.ISprinklersApi {
}
log.trace({ data }, "websocket message");
switch (data.type) {
case "deviceSubscribeResponse":
this.onDeviceSubscribeResponse(data);
case "notification":
this.onNotification(data);
break;
case "deviceUpdate":
this.onDeviceUpdate(data);
break;
case "deviceCallResponse":
this.onDeviceCallResponse(data);
break;
case "brokerConnectionUpdate":
this.onBrokerConnectionUpdate(data);
case "response":
this.onResponse(data);
break;
default:
log.warn({ data }, "unsupported event type received");
}
}
private onDeviceSubscribeResponse(data: ws.IDeviceSubscribeResponse) {
const device = this.devices.get(data.deviceId);
if (!device) {
return log.warn({ data }, "invalid deviceSubscribeResponse received");
}
device.onSubscribeResponse(data);
}
private onDeviceUpdate(data: ws.IDeviceUpdate) {
const device = this.devices.get(data.deviceId);
if (!device) {
return log.warn({ data }, "invalid deviceUpdate received");
private onNotification(data: ws.ServerNotification) {
try {
rpc.handleNotification(this.notificationHandlers, data);
} catch (err) {
logger.error({ err }, "error handling server notification");
}
update(schema.sprinklersDevice, device, data.data);
}
private onDeviceCallResponse(data: ws.IDeviceCallResponse) {
const cb = this.deviceResponseCallbacks[data.requestId];
if (typeof cb === "function") {
cb(data);
private onResponse(data: ws.ServerResponse) {
try {
rpc.handleResponse(this.responseCallbacks, data);
} catch (err) {
log.error({ err }, "error handling server response");
}
}
private onBrokerConnectionUpdate(data: ws.IBrokerConnectionUpdate) {
this.connectionState.serverToBroker = data.brokerConnected;
}
private notificationHandlers: ws.ServerNotificationHandlers = {
brokerConnectionUpdate: (data: ws.IBrokerConnectionUpdate) => {
this.connectionState.serverToBroker = data.brokerConnected;
},
deviceUpdate: (data: ws.IDeviceUpdate) => {
const device = this.devices.get(data.deviceId);
if (!device) {
return log.warn({ data }, "invalid deviceUpdate received");
}
update(schema.sprinklersDevice, device, data.data);
},
error: (data: ws.Error) => {
log.warn({ err: data }, "server error");
},
};
}

155
common/jsonRpc/index.ts

@ -0,0 +1,155 @@ @@ -0,0 +1,155 @@
// tslint:disable:interface-over-type-literal
export type DefaultRequestTypes = {};
export type DefaultResponseTypes = {};
export type DefaultErrorType = {
code: number;
message: string;
data?: any;
};
export type DefaultNotificationTypes = {};
// tslint:enable:interface-over-type-literal
// export interface RpcTypes {
// RequestTypes: DefaultRequestTypes;
// ResponseTypes: DefaultResponseTypes;
// NotificationTypes: DefaultNotificationTypes;
// ErrorType: DefaultErrorType;
// }
export interface Request<RequestTypes = DefaultRequestTypes,
Method extends keyof RequestTypes = keyof RequestTypes> {
type: "request";
id: number;
method: Method;
params: RequestTypes[Method];
}
export interface ResponseBase<Method> {
type: "response";
id: number;
method: Method;
}
export interface SuccessData<ResponseType> {
result: "success";
data: ResponseType;
}
export interface ErrorData<ErrorType> {
result: "error";
error: ErrorType;
}
export type ResponseData<ResponseTypes, ErrorType,
Method extends keyof ResponseTypes = keyof ResponseTypes> =
SuccessData<ResponseTypes[Method]> | ErrorData<ErrorType>;
export type Response<ResponseTypes,
ErrorType = DefaultErrorType,
Method extends keyof ResponseTypes = keyof ResponseTypes> =
ResponseBase<Method> & ResponseData<ResponseTypes, ErrorType, Method>;
export interface Notification<NotificationTypes = DefaultNotificationTypes,
Method extends keyof NotificationTypes = keyof NotificationTypes> {
type: "notification";
method: Method;
data: NotificationTypes[Method];
}
export type Message<RequestTypes = DefaultRequestTypes,
ResponseTypes = DefaultResponseTypes,
ErrorType = DefaultErrorType,
NotificationTypes = DefaultNotificationTypes> =
Request<RequestTypes> |
Response<ResponseTypes, ErrorType> |
Notification<NotificationTypes>;
// export type TypesMessage<Types extends RpcTypes = RpcTypes> =
// Message<Types["RequestTypes"], Types["ResponseTypes"], Types["ErrorType"], Types["NotificationTypes"]>;
export function isRequestMethod<Method extends keyof RequestTypes, RequestTypes>(
message: Request<RequestTypes>, method: Method,
): message is Request<RequestTypes, Method> {
return message.method === method;
}
export function isResponseMethod<Method extends keyof ResponseTypes, ErrorType, ResponseTypes>(
message: Response<ResponseTypes, ErrorType>, method: Method,
): message is Response<ResponseTypes, ErrorType, Method> {
return message.method === method;
}
export function isNotificationMethod<Method extends keyof NotificationTypes, NotificationTypes = any>(
message: Notification<NotificationTypes>, method: Method,
): message is Notification<NotificationTypes, Method> {
return message.method === method;
}
export type IRequestHandler<RequestTypes, ResponseTypes extends { [M in Method]: any }, ErrorType,
Method extends keyof RequestTypes> =
(request: RequestTypes[Method]) => Promise<ResponseData<ResponseTypes, ErrorType, Method>>;
export type RequestHandlers<RequestTypes, ResponseTypes extends { [M in keyof RequestTypes]: any }, ErrorType> = {
[Method in keyof RequestTypes]:
IRequestHandler<RequestTypes, ResponseTypes, ErrorType, Method>;
};
export type IResponseHandler<ResponseTypes, ErrorType,
Method extends keyof ResponseTypes = keyof ResponseTypes> =
(response: ResponseData<ResponseTypes, ErrorType, Method>) => void;
export interface ResponseHandlers<ResponseTypes = DefaultResponseTypes, ErrorType = DefaultErrorType> {
[id: number]: IResponseHandler<ResponseTypes, ErrorType>;
}
export type NotificationHandler<NotificationTypes, Method extends keyof NotificationTypes> =
(notification: NotificationTypes[Method]) => void;
export type NotificationHandlers<NotificationTypes> = {
[Method in keyof NotificationTypes]: NotificationHandler<NotificationTypes, Method>;
};
export function listRequestHandlerMethods<RequestTypes,
ResponseTypes extends { [Method in keyof RequestTypes]: any }, ErrorType>(
handlers: RequestHandlers<RequestTypes, ResponseTypes, ErrorType>,
): Array<keyof RequestTypes> {
return Object.keys(handlers) as any;
}
export function listNotificationHandlerMethods<NotificationTypes>(
handlers: NotificationHandlers<NotificationTypes>,
): Array<keyof NotificationTypes> {
return Object.keys(handlers) as any;
}
export async function handleRequest<RequestTypes,
ResponseTypes extends { [Method in keyof RequestTypes]: any }, ErrorType>(
handlers: RequestHandlers<RequestTypes, ResponseTypes, ErrorType>,
message: Request<RequestTypes>,
): Promise<ResponseData<ResponseTypes, ErrorType>> {
const handler = handlers[message.method];
if (!handler) {
throw new Error("No handler for request method " + message.method);
}
return handler(message.params);
}
export function handleResponse<ResponseTypes, ErrorType>(
handlers: ResponseHandlers<ResponseTypes, ErrorType>,
message: Response<ResponseTypes, ErrorType>) {
const handler = handlers[message.id];
if (!handler) {
return;
}
return handler(message);
}
export function handleNotification<NotificationTypes>(
handlers: NotificationHandlers<NotificationTypes>,
message: Notification<NotificationTypes>) {
const handler = handlers[message.method];
if (!handler) {
throw new Error("No handler for notification method " + message.method);
}
return handler(message.data);
}

3
common/sprinklers/ErrorCode.ts

@ -4,6 +4,9 @@ export enum ErrorCode { @@ -4,6 +4,9 @@ export enum ErrorCode {
Parse = 102,
Range = 103,
InvalidData = 104,
BadToken = 105,
Unauthorized = 106,
NoPermission = 107,
Internal = 200,
Timeout = 300,
ServerDisconnected = 301,

18
common/sprinklers/SprinklersDevice.ts

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
import { computed, observable } from "mobx";
import { ConnectionState } from "./ConnectionState";
import * as req from "./deviceRequests";
import { Program } from "./Program";
import * as requests from "./requests";
import { Section } from "./Section";
import { SectionRunner } from "./SectionRunner";
@ -25,33 +25,33 @@ export abstract class SprinklersDevice { @@ -25,33 +25,33 @@ export abstract class SprinklersDevice {
abstract get id(): string;
abstract makeRequest(request: requests.Request): Promise<requests.Response>;
abstract makeRequest(request: req.Request): Promise<req.Response>;
runProgram(opts: requests.WithProgram) {
runProgram(opts: req.WithProgram) {
return this.makeRequest({ ...opts, type: "runProgram" });
}
cancelProgram(opts: requests.WithProgram) {
cancelProgram(opts: req.WithProgram) {
return this.makeRequest({ ...opts, type: "cancelProgram" });
}
updateProgram(opts: requests.UpdateProgramData): Promise<requests.UpdateProgramResponse> {
updateProgram(opts: req.UpdateProgramData): Promise<req.UpdateProgramResponse> {
return this.makeRequest({ ...opts, type: "updateProgram" }) as Promise<any>;
}
runSection(opts: requests.RunSectionData): Promise<requests.RunSectionResponse> {
runSection(opts: req.RunSectionData): Promise<req.RunSectionResponse> {
return this.makeRequest({ ...opts, type: "runSection" }) as Promise<any>;
}
cancelSection(opts: requests.WithSection) {
cancelSection(opts: req.WithSection) {
return this.makeRequest({ ...opts, type: "cancelSection" });
}
cancelSectionRunId(opts: requests.CancelSectionRunIdData) {
cancelSectionRunId(opts: req.CancelSectionRunIdData) {
return this.makeRequest({ ...opts, type: "cancelSectionRunId" });
}
pauseSectionRunner(opts: requests.PauseSectionRunnerData) {
pauseSectionRunner(opts: req.PauseSectionRunnerData) {
return this.makeRequest({ ...opts, type: "pauseSectionRunner" });
}

0
common/sprinklers/requests.ts → common/sprinklers/deviceRequests.ts

2
common/sprinklers/mqtt/index.ts

@ -4,7 +4,7 @@ import { update } from "serializr"; @@ -4,7 +4,7 @@ import { update } from "serializr";
import logger from "@common/logger";
import * as s from "@common/sprinklers";
import * as requests from "@common/sprinklers/requests";
import * as requests from "@common/sprinklers/deviceRequests";
import * as schema from "@common/sprinklers/schema";
import { seralizeRequest } from "@common/sprinklers/schema/requests";

2
common/sprinklers/schema/requests.ts

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
import { createSimpleSchema, deserialize, ModelSchema, primitive, serialize } from "serializr";
import * as requests from "../requests";
import * as requests from "../deviceRequests";
import * as common from "./common";
export const withType: ModelSchema<requests.WithType> = createSimpleSchema({

80
common/sprinklers/websocketData.ts

@ -1,52 +1,76 @@ @@ -1,52 +1,76 @@
import { Response as ResponseData } from "@common/sprinklers/requests";
import * as rpc from "../jsonRpc/index";
export interface IError {
type: "error";
message: string;
data: any;
import { Response as ResponseData } from "@common/sprinklers/deviceRequests";
export interface IAuthenticateRequest {
accessToken: string;
}
export interface IDeviceSubscribeResponse {
type: "deviceSubscribeResponse";
export interface IDeviceSubscribeRequest {
deviceId: string;
result: "success" | "noPermission";
}
export interface IDeviceUpdate {
type: "deviceUpdate";
export interface IDeviceCallRequest {
deviceId: string;
data: any;
}
export interface IClientRequestTypes {
"authenticate": IAuthenticateRequest;
"deviceSubscribe": IDeviceSubscribeRequest;
"deviceCall": IDeviceCallRequest;
}
export interface IAuthenticateResponse {
authenticated: boolean;
message: string;
data?: any;
}
export interface IDeviceSubscribeResponse {
deviceId: string;
}
export interface IDeviceCallResponse {
type: "deviceCallResponse";
requestId: number;
data: ResponseData;
}
export interface IBrokerConnectionUpdate {
type: "brokerConnectionUpdate";
brokerConnected: boolean;
export interface IServerResponseTypes {
"authenticate": IAuthenticateResponse;
"deviceSubscribe": IDeviceSubscribeResponse;
"deviceCall": IDeviceCallResponse;
}
export type IServerMessage = IError | IDeviceSubscribeResponse | IDeviceUpdate | IDeviceCallResponse |
IBrokerConnectionUpdate;
export type ClientRequestMethods = keyof IClientRequestTypes;
export interface IAuthenticateRequest {
type: "authenticateRequest";
accessToken: string;
export interface IBrokerConnectionUpdate {
brokerConnected: boolean;
}
export interface IDeviceSubscribeRequest {
type: "deviceSubscribeRequest";
export interface IDeviceUpdate {
deviceId: string;
data: any;
}
export interface IDeviceCallRequest {
type: "deviceCallRequest";
requestId: number;
deviceId: string;
data: any;
export interface IServerNotificationTypes {
"brokerConnectionUpdate": IBrokerConnectionUpdate;
"deviceUpdate": IDeviceUpdate;
"error": Error;
}
export type ServerNotificationMethod = keyof IServerNotificationTypes;
export type Error = rpc.DefaultErrorType;
export type ErrorData = rpc.ErrorData<Error>;
export type ServerMessage = rpc.Message<{}, IServerResponseTypes, Error, IServerNotificationTypes>;
export type ServerNotification = rpc.Notification<IServerNotificationTypes>;
export type ServerResponse = rpc.Response<IServerResponseTypes, Error>;
export type ServerResponseData<Method extends keyof IServerResponseTypes = keyof IServerResponseTypes> =
rpc.ResponseData<IServerResponseTypes, Error, Method>;
export type ServerResponseHandlers = rpc.ResponseHandlers<IServerResponseTypes, Error>;
export type ServerNotificationHandlers = rpc.NotificationHandlers<IServerNotificationTypes>;
export type IClientMessage = IDeviceSubscribeRequest | IDeviceCallRequest;
export type ClientRequest<Method extends keyof IClientRequestTypes = keyof IClientRequestTypes> =
rpc.Request<IClientRequestTypes, Method>;
export type ClientMessage = rpc.Message<IClientRequestTypes, {}, Error, {}>;
export type ClientRequestHandlers = rpc.RequestHandlers<IClientRequestTypes, IServerResponseTypes, Error>;

2
package.json

@ -60,7 +60,6 @@ @@ -60,7 +60,6 @@
"@types/core-js": "^2.5.0",
"@types/express": "^4.16.0",
"@types/jsonwebtoken": "^7.2.7",
"@types/lodash": "^4.14.110",
"@types/lodash-es": "^4.17.0",
"@types/node": "^10.3.5",
"@types/object-assign": "^4.0.30",
@ -83,7 +82,6 @@ @@ -83,7 +82,6 @@
"font-awesome": "^4.7.0",
"happypack": "^5.0.0",
"html-webpack-plugin": "^3.2.0",
"lodash": "^4.17.10",
"lodash-es": "^4.17.10",
"mini-css-extract-plugin": "^0.4.0",
"mobx-react": "^5.2.3",

5
server/express/authentication.ts

@ -1,4 +1,3 @@ @@ -1,4 +1,3 @@
import log from "@common/logger";
import * as Express from "express";
import Router from "express-promise-router";
import * as jwt from "jsonwebtoken";
@ -29,7 +28,7 @@ function getExpTime(lifetime: number) { @@ -29,7 +28,7 @@ function getExpTime(lifetime: number) {
return Math.floor(Date.now() / 1000) + lifetime;
}
interface TokenClaims {
export interface TokenClaims {
iss: string;
type: "access" | "refresh";
aud: string;
@ -49,7 +48,7 @@ function signToken(claims: TokenClaims): Promise<string> { @@ -49,7 +48,7 @@ function signToken(claims: TokenClaims): Promise<string> {
});
}
function verifyToken(token: string): Promise<TokenClaims> {
export function verifyToken(token: string): Promise<TokenClaims> {
return new Promise((resolve, reject) => {
jwt.verify(token, JWT_SECRET, (err, decoded) => {
if (err) {

8
server/express/index.ts

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
import * as bodyParser from "body-parser";
import * as express from "express";
import { serialize, serializeAll } from "serializr";
import { serialize} from "serializr";
import * as schema from "@common/sprinklers/schema";
import { ServerState } from "../state";
@ -16,8 +16,10 @@ export function createApp(state: ServerState) { @@ -16,8 +16,10 @@ export function createApp(state: ServerState) {
app.use(logger);
app.use(bodyParser.json());
app.get("/api/grinklers", (req, res) => {
const j = serialize(schema.sprinklersDevice, state.device);
app.get("/api/devices/:deviceId", (req, res) => {
// TODO: authorize device
const device = state.mqttClient.getDevice(req.params.deviceId);
const j = serialize(schema.sprinklersDevice, device);
res.send(j);
});

2
server/models/Database.ts

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
import * as r from "rethinkdb";
import { User } from "./User";
import logger from "@common/logger";
import { User } from "./User";
export class Database {
static readonly databaseName = "sprinklers3";

8
server/models/User.ts

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
import * as bcrypt from "bcrypt";
import * as r from "rethinkdb";
import { createModelSchema, deserialize, primitive, serialize, update } from "serializr";
import { createModelSchema, primitive, serialize, update } from "serializr";
import { Database } from "./Database";
@ -23,10 +23,6 @@ export class User implements IUser { @@ -23,10 +23,6 @@ export class User implements IUser {
private db: Database;
private get _db() {
return this.db.db;
}
private get table() {
return this.db.db.table(User.tableName);
}
@ -76,7 +72,7 @@ export class User implements IUser { @@ -76,7 +72,7 @@ export class User implements IUser {
async create() {
const data = serialize(this);
delete data.id;
const a = this.table
await this.table
.insert(data)
.run(this.db.conn);
}

3
server/state.ts

@ -1,11 +1,9 @@ @@ -1,11 +1,9 @@
import logger from "@common/logger";
import {SprinklersDevice} from "@common/sprinklers";
import * as mqtt from "@common/sprinklers/mqtt";
import { Database } from "./models/Database";
export class ServerState {
mqttClient: mqtt.MqttApiClient;
device: SprinklersDevice;
database: Database;
constructor() {
@ -14,7 +12,6 @@ export class ServerState { @@ -14,7 +12,6 @@ export class ServerState {
throw new Error("Must specify a MQTT_URL to connect to");
}
this.mqttClient = new mqtt.MqttApiClient(mqttUrl);
this.device = this.mqttClient.getDevice("grinklers");
this.database = new Database();
}

195
server/websocket/index.ts

@ -2,12 +2,17 @@ import { autorun } from "mobx"; @@ -2,12 +2,17 @@ import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import * as rpc from "@common/jsonRpc";
import log from "@common/logger";
import * as requests from "@common/sprinklers/requests";
import * as deviceRequests from "@common/sprinklers/deviceRequests";
import { ErrorCode } from "@common/sprinklers/ErrorCode";
import * as schema from "@common/sprinklers/schema";
import * as ws from "@common/sprinklers/websocketData";
import { TokenClaims, verifyToken } from "../express/authentication";
import { ServerState } from "../state";
// tslint:disable:member-ordering
export class WebSocketClient {
api: WebSocketApi;
socket: WebSocket;
@ -30,10 +35,9 @@ export class WebSocketClient { @@ -30,10 +35,9 @@ export class WebSocketClient {
start() {
this.disposers.push(autorun(() => {
const updateData: ws.IBrokerConnectionUpdate = {
type: "brokerConnectionUpdate",
brokerConnected: this.state.mqttClient.connected,
};
this.socket.send(JSON.stringify(updateData));
this.sendNotification("brokerConnectionUpdate", updateData);
}));
this.socket.on("message", this.handleSocketMessage);
this.socket.on("close", this.stop);
@ -44,6 +48,94 @@ export class WebSocketClient { @@ -44,6 +48,94 @@ export class WebSocketClient {
this.api.removeClient(this);
}
private requestHandlers: ws.ClientRequestHandlers = {
authenticate: async (data: ws.IAuthenticateRequest) => {
if (!data.accessToken) {
return {
result: "error", error: {
code: ErrorCode.BadRequest, message: "no token specified",
},
};
}
let decoded: TokenClaims;
try {
decoded = await verifyToken(data.accessToken);
} catch (e) {
return {
result: "error",
error: { code: ErrorCode.BadToken, message: "invalid token", data: e },
};
}
this.userId = decoded.aud;
return {
result: "success",
data: { authenticated: true, message: "authenticated" },
};
},
deviceSubscribe: async (data: ws.IDeviceSubscribeRequest) => {
const deviceId = data.deviceId;
if (deviceId !== "grinklers") { // TODO: somehow validate this device id?
return {
result: "error", error: {
code: ErrorCode.NoPermission,
message: "you do not have permission to subscribe to this device",
},
};
}
if (this.deviceSubscriptions.indexOf(deviceId) === -1) {
this.deviceSubscriptions.push(deviceId);
const device = this.state.mqttClient.getDevice(deviceId);
log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device");
this.disposers.push(autorun(() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { deviceId, data: json };
this.sendNotification("deviceUpdate", updateData);
}, { delay: 100 }));
}
const response: ws.IDeviceSubscribeResponse = {
deviceId,
};
return { result: "success", data: response };
},
deviceCall: async (data: ws.IDeviceCallRequest) => {
try {
const response = await this.doDeviceCallRequest(data);
const resData: ws.IDeviceCallResponse = {
data: response,
};
return { result: "success", data: resData };
} catch (err) {
const e: deviceRequests.ErrorResponseData = err;
return {
result: "error", error: {
code: e.code,
message: e.message,
data: e,
},
};
}
},
};
private sendMessage(data: ws.ServerMessage) {
this.socket.send(JSON.stringify(data));
}
private sendNotification<Method extends ws.ServerNotificationMethod>(
method: Method,
data: ws.IServerNotificationTypes[Method]) {
this.sendMessage({ type: "notification", method, data });
}
private sendResponse<Method extends ws.ClientRequestMethods>(
method: Method,
id: number,
data: ws.ServerResponseData<Method>) {
this.sendMessage({ type: "response", method, id, ...data });
}
private handleSocketMessage = (socketData: WebSocket.Data) => {
this.doHandleSocketMessage(socketData)
.catch((err) => {
@ -53,85 +145,64 @@ export class WebSocketClient { @@ -53,85 +145,64 @@ export class WebSocketClient {
private async doHandleSocketMessage(socketData: WebSocket.Data) {
if (typeof socketData !== "string") {
return this.onError({ type: typeof socketData }, "received invalid socket data type from client");
return this.onError({ type: typeof socketData },
"received invalid socket data type from client", ErrorCode.Parse);
}
let data: ws.IClientMessage;
let data: ws.ClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return this.onError({ event, err }, "received invalid websocket message from client");
return this.onError({ socketData, err }, "received invalid websocket message from client",
ErrorCode.Parse);
}
log.debug({ data }, "client message");
switch (data.type) {
case "deviceSubscribeRequest":
this.deviceSubscribeRequest(data);
break;
case "deviceCallRequest":
await this.deviceCallRequest(data);
case "request":
await this.handleRequest(data);
break;
default:
return this.onError({ data }, "received invalid client message type");
return this.onError({ data }, "received invalid message type from client",
ErrorCode.BadRequest);
}
}
private onError(data: any, message: string) {
log.error(data, message);
const errorData: ws.IError = {
type: "error", message, data,
};
this.socket.send(JSON.stringify(errorData));
}
private deviceSubscribeRequest(data: ws.IDeviceSubscribeRequest) {
const deviceId = data.deviceId;
let result: ws.IDeviceSubscribeResponse["result"];
if (deviceId !== "grinklers") { // TODO: somehow validate this device id?
result = "noPermission";
private async handleRequest(request: ws.ClientRequest) {
let response: ws.ServerResponseData;
if (!this.requestHandlers[request.method]) {
log.warn({ method: request.method }, "received invalid client request method");
response = {
result: "error", error: {
code: ErrorCode.BadRequest, message: "received invalid client request method",
},
};
} else {
if (this.deviceSubscriptions.indexOf(deviceId) !== -1) {
return;
try {
response = await rpc.handleRequest(this.requestHandlers, request);
} catch (err) {
log.error({ method: request.method, err }, "error during processing of client request");
response = {
result: "error", error: {
code: ErrorCode.Internal, message: "error during processing of client request",
data: err.toString(),
},
};
}
this.deviceSubscriptions.push(deviceId);
const device = this.state.mqttClient.getDevice(deviceId);
log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device");
this.disposers.push(autorun(() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { type: "deviceUpdate", deviceId, data: json };
this.socket.send(JSON.stringify(updateData));
}, { delay: 100 }));
result = "success";
}
const response: ws.IDeviceSubscribeResponse = {
type: "deviceSubscribeResponse", deviceId, result,
};
this.socket.send(JSON.stringify(response));
this.sendResponse(request.method, request.id, response);
}
private async deviceCallRequest(data: ws.IDeviceCallRequest): Promise<void> {
let response: requests.Response | false;
try {
response = await this.doDeviceCallRequest(data);
} catch (err) {
response = err;
}
if (response) {
const resData: ws.IDeviceCallResponse = {
type: "deviceCallResponse",
requestId: data.requestId,
data: response,
};
this.socket.send(JSON.stringify(resData));
}
private onError(data: any, message: string, code: number = ErrorCode.Internal) {
log.error(data, message);
const errorData: ws.Error = { code, message, data };
this.sendNotification("error", errorData);
}
private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<requests.Response | false> {
private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<deviceRequests.Response> {
const { deviceId, data } = requestData;
if (deviceId !== "grinklers") {
// error handling? or just get the right device
return false;
}
const device = this.state.mqttClient.getDevice(deviceId);
// TODO: authorize the requests
const request = schema.requests.deserializeRequest(data);
return this.state.device.makeRequest(request);
return device.makeRequest(request);
}
}

2
yarn.lock

@ -88,7 +88,7 @@ @@ -88,7 +88,7 @@
dependencies:
"@types/lodash" "*"
"@types/lodash@*", "@types/lodash@^4.14.110":
"@types/lodash@*":
version "4.14.110"
resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.110.tgz#fb07498f84152947f30ea09d89207ca07123461e"

Loading…
Cancel
Save