Browse Source

Properly subscribe to and unsubscribe from devices

update-deps
Alex Mikhalev 6 years ago
parent
commit
9296226036
  1. 1
      Dockerfile
  2. 61
      client/components/DeviceView.tsx
  3. 2
      client/pages/LogoutPage.tsx
  4. 92
      client/pages/ProgramPage.tsx
  5. 26
      client/sprinklersRpc/WSSprinklersDevice.ts
  6. 50
      client/sprinklersRpc/WebSocketRpcClient.ts
  7. 16
      common/TypedEventEmitter.ts
  8. 15
      common/jsonRpc/index.ts
  9. 21
      common/sprinklersRpc/RpcError.ts
  10. 33
      common/sprinklersRpc/SprinklersDevice.ts
  11. 30
      common/sprinklersRpc/SprinklersRPC.ts
  12. 2
      common/sprinklersRpc/index.ts
  13. 83
      common/sprinklersRpc/mqtt/index.ts
  14. 22
      common/sprinklersRpc/websocketData.ts
  15. 8
      common/utils.ts
  16. 3
      server/express/api/devices.ts
  17. 22
      server/express/authentication.ts
  18. 2
      server/index.ts
  19. 26
      server/sprinklersRpc/WebSocketApi.ts
  20. 262
      server/sprinklersRpc/WebSocketConnection.ts
  21. 247
      server/sprinklersRpc/websocketServer.ts

1
Dockerfile

@ -24,5 +24,4 @@ COPY --from=builder /app/dist ./dist
COPY --from=builder /app/public ./public COPY --from=builder /app/public ./public
EXPOSE 8080 EXPOSE 8080
EXPOSE 8081
ENTRYPOINT [ "node", "." ] ENTRYPOINT [ "node", "." ]

61
client/components/DeviceView.tsx

@ -52,10 +52,22 @@ interface DeviceViewProps {
inList?: boolean; inList?: boolean;
} }
class DeviceView extends React.Component<DeviceViewProps & RouteComponentProps<any>> { class DeviceView extends React.Component<DeviceViewProps> {
renderBody(iDevice: ISprinklersDevice, device: SprinklersDevice) { deviceInfo: ISprinklersDevice | null = null;
device: SprinklersDevice | null = null;
componentWillUnmount() {
if (this.device) {
this.device.release();
}
}
renderBody() {
const { inList, appState: { uiStore, routerStore } } = this.props; const { inList, appState: { uiStore, routerStore } } = this.props;
const { connectionState, sectionRunner, sections } = device; if (!this.deviceInfo || !this.device) {
return null;
}
const { connectionState, sectionRunner, sections } = this.device;
if (!connectionState.isAvailable || inList) { if (!connectionState.isAvailable || inList) {
return null; return null;
} }
@ -69,30 +81,51 @@ class DeviceView extends React.Component<DeviceViewProps & RouteComponentProps<a
<SectionTable sections={sections} /> <SectionTable sections={sections} />
</Grid.Column> </Grid.Column>
<Grid.Column mobile="16" tablet="7" computer="7" largeScreen="4"> <Grid.Column mobile="16" tablet="7" computer="7" largeScreen="4">
<RunSectionForm device={device} uiStore={uiStore} /> <RunSectionForm device={this.device} uiStore={uiStore} />
</Grid.Column> </Grid.Column>
</Grid> </Grid>
<ProgramTable iDevice={iDevice} device={device} routerStore={routerStore} /> <ProgramTable iDevice={this.deviceInfo} device={this.device} routerStore={routerStore} />
<Route path={route.program(":deviceId", ":programId")} component={p.ProgramPage} /> <Route path={route.program(":deviceId", ":programId")} component={p.ProgramPage} />
</React.Fragment> </React.Fragment>
); );
} }
updateDevice() {
const { userStore, sprinklersRpc } = this.props.appState;
const id = this.props.deviceId;
// tslint:disable-next-line:prefer-conditional-expression
if (this.deviceInfo == null || this.deviceInfo.id !== id) {
this.deviceInfo = userStore.findDevice(id);
}
if (!this.deviceInfo || !this.deviceInfo.deviceId) {
if (this.device) {
this.device.release();
this.device = null;
}
} else {
if (this.device == null || this.device.id !== this.deviceInfo.deviceId) {
if (this.device) {
this.device.release();
}
this.device = sprinklersRpc.acquireDevice(this.deviceInfo.deviceId);
}
}
}
render() { render() {
const { deviceId, inList, appState: { sprinklersRpc, userStore } } = this.props; this.updateDevice();
const iDevice = userStore.findDevice(deviceId); const { inList } = this.props;
let itemContent: React.ReactNode; let itemContent: React.ReactNode;
if (!iDevice || !iDevice.deviceId) { if (!this.deviceInfo || !this.device) {
// TODO: better and link back to devices list // TODO: better and link back to devices list
itemContent = <span>You do not have access to this device</span>; itemContent = <span>You do not have access to this device</span>;
} else { } else {
const device = sprinklersRpc.getDevice(iDevice.deviceId); const { connectionState } = this.device;
const { connectionState } = device;
let header: React.ReactNode; let header: React.ReactNode;
if (inList) { // tslint:disable-line:prefer-conditional-expression if (inList) { // tslint:disable-line:prefer-conditional-expression
header = <Link to={route.device(iDevice.id)}>Device <kbd>{iDevice.name}</kbd></Link>; header = <Link to={route.device(this.deviceInfo.id)}>Device <kbd>{this.deviceInfo.name}</kbd></Link>;
} else { } else {
header = <span>Device <kbd>{iDevice.name}</kbd></span>; header = <span>Device <kbd>{this.deviceInfo.name}</kbd></span>;
} }
itemContent = ( itemContent = (
<React.Fragment> <React.Fragment>
@ -105,7 +138,7 @@ class DeviceView extends React.Component<DeviceViewProps & RouteComponentProps<a
<Item.Meta> <Item.Meta>
Raspberry Pi Grinklers Device Raspberry Pi Grinklers Device
</Item.Meta> </Item.Meta>
{this.renderBody(iDevice, device)} {this.renderBody()}
</Item.Content> </Item.Content>
</React.Fragment> </React.Fragment>
); );
@ -114,4 +147,4 @@ class DeviceView extends React.Component<DeviceViewProps & RouteComponentProps<a
} }
} }
export default injectState(withRouter(observer(DeviceView))); export default injectState(observer(DeviceView));

2
client/pages/LogoutPage.tsx

@ -5,7 +5,7 @@ import { AppState, ConsumeState } from "@client/state";
export default function LogoutPage() { export default function LogoutPage() {
function consumeState(appState: AppState) { function consumeState(appState: AppState) {
appState.tokenStore.clear(); appState.tokenStore.clearAll();
return ( return (
<Redirect to="/login" /> <Redirect to="/login" />
); );

92
client/pages/ProgramPage.tsx

@ -1,4 +1,4 @@
import { assign, merge } from "lodash"; import { assign } from "lodash";
import { observer } from "mobx-react"; import { observer } from "mobx-react";
import * as qs from "query-string"; import * as qs from "query-string";
import * as React from "react"; import * as React from "react";
@ -23,11 +23,60 @@ class ProgramPage extends React.Component<ProgramPageProps> {
return qs.parse(this.props.location.search).editing != null; return qs.parse(this.props.location.search).editing != null;
} }
iDevice!: ISprinklersDevice; deviceInfo: ISprinklersDevice | null = null;
device!: SprinklersDevice; device: SprinklersDevice | null = null;
program!: Program; program: Program | null = null;
programView: Program | null = null; programView: Program | null = null;
componentWillUnmount() {
if (this.device) {
this.device.release();
}
}
updateProgram() {
const { userStore, sprinklersRpc } = this.props.appState;
const devId = Number(this.props.match.params.deviceId);
const programId = Number(this.props.match.params.programId);
// tslint:disable-next-line:prefer-conditional-expression
if (this.deviceInfo == null || this.deviceInfo.id !== devId) {
this.deviceInfo = userStore.findDevice(devId);
}
if (!this.deviceInfo || !this.deviceInfo.deviceId) {
if (this.device) {
this.device.release();
this.device = null;
}
return;
} else {
if (this.device == null || this.device.id !== this.deviceInfo.deviceId) {
if (this.device) {
this.device.release();
}
this.device = sprinklersRpc.acquireDevice(this.deviceInfo.deviceId);
}
}
if (!this.program || this.program.id !== programId) {
if (this.device.programs.length > programId && programId >= 0) {
this.program = this.device.programs[programId];
} else {
return;
}
}
if (this.isEditing) {
if (this.programView == null && this.program) {
// this.programView = createViewModel(this.program);
// this.programView = observable(toJS(this.program));
this.programView = this.program.clone();
}
} else {
if (this.programView != null) {
// this.programView.reset();
this.programView = null;
}
}
}
renderName(program: Program) { renderName(program: Program) {
const { name } = program; const { name } = program;
if (this.isEditing) { if (this.isEditing) {
@ -98,39 +147,12 @@ class ProgramPage extends React.Component<ProgramPageProps> {
} }
render() { render() {
const { deviceId: did, programId: pid } = this.props.match.params; this.updateProgram();
const { userStore, sprinklersRpc } = this.props.appState;
const deviceId = Number(did);
const programId = Number(pid);
// tslint:disable-next-line:prefer-conditional-expression
if (!this.iDevice || this.iDevice.id !== deviceId) {
this.iDevice = userStore.findDevice(deviceId)!;
}
if (this.iDevice && this.iDevice.deviceId && (!this.device || this.device.id !== this.iDevice.deviceId)) {
this.device = sprinklersRpc.getDevice(this.iDevice.deviceId);
}
// tslint:disable-next-line:prefer-conditional-expression
if (!this.program || this.program.id !== programId) {
if (this.device.programs.length > programId && programId >= 0) {
this.program = this.device.programs[programId];
} else {
return null;
}
}
if (this.isEditing) {
if (this.programView == null && this.program) {
// this.programView = createViewModel(this.program);
// this.programView = observable(toJS(this.program));
this.programView = this.program.clone();
}
} else {
if (this.programView != null) {
// this.programView.reset();
this.programView = null;
}
}
const program = this.programView || this.program; const program = this.programView || this.program;
if (!this.device || !program) {
return null;
}
const editing = this.isEditing; const editing = this.isEditing;
const { running, enabled, schedule, sequence } = program; const { running, enabled, schedule, sequence } = program;

26
client/sprinklersRpc/WSSprinklersDevice.ts

@ -9,17 +9,15 @@ import { log, WebSocketRpcClient } from "./WebSocketRpcClient";
// tslint:disable:member-ordering // tslint:disable:member-ordering
export class WSSprinklersDevice extends s.SprinklersDevice { export class WSSprinklersDevice extends s.SprinklersDevice {
readonly api: WebSocketRpcClient; readonly api: WebSocketRpcClient;
private _id: string;
constructor(api: WebSocketRpcClient, id: string) { constructor(api: WebSocketRpcClient, id: string) {
super(); super(api, id);
this.api = api; this.api = api;
this._id = id;
autorun(this.updateConnectionState); autorun(this.updateConnectionState);
this.waitSubscribe(); this.waitSubscribe();
} }
get id() {
return this._id;
}
private updateConnectionState = () => { private updateConnectionState = () => {
const { clientToServer, serverToBroker } = this.api.connectionState; const { clientToServer, serverToBroker } = this.api.connectionState;
runInAction("updateConnectionState", () => { runInAction("updateConnectionState", () => {
@ -34,7 +32,7 @@ export class WSSprinklersDevice extends s.SprinklersDevice {
try { try {
await this.api.makeRequest("deviceSubscribe", subscribeRequest); await this.api.makeRequest("deviceSubscribe", subscribeRequest);
runInAction("deviceSubscribeSuccess", () => { runInAction("deviceSubscribeSuccess", () => {
this.connectionState.brokerToDevice = true; this.connectionState.hasPermission = true;
}); });
} catch (err) { } catch (err) {
runInAction("deviceSubscribeError", () => { runInAction("deviceSubscribeError", () => {
@ -48,6 +46,20 @@ export class WSSprinklersDevice extends s.SprinklersDevice {
} }
} }
async unsubscribe() {
const unsubscribeRequest: ws.IDeviceSubscribeRequest = {
deviceId: this.id,
};
try {
await this.api.makeRequest("deviceUnsubscribe", unsubscribeRequest);
runInAction("deviceUnsubscribeSuccess", () => {
this.connectionState.brokerToDevice = false;
});
} catch (err) {
log.error({ err }, "error unsubscribing from device");
}
}
makeRequest(request: deviceRequests.Request): Promise<deviceRequests.Response> { makeRequest(request: deviceRequests.Request): Promise<deviceRequests.Response> {
return this.api.makeDeviceCall(this.id, request); return this.api.makeDeviceCall(this.id, request);
} }

50
client/sprinklersRpc/WebSocketRpcClient.ts

@ -1,4 +1,4 @@
import { action, observable, runInAction, when } from "mobx"; import { action, computed, observable, runInAction, when } from "mobx";
import { update } from "serializr"; import { update } from "serializr";
import { TokenStore } from "@client/state/TokenStore"; import { TokenStore } from "@client/state/TokenStore";
@ -6,12 +6,12 @@ import { ErrorCode } from "@common/ErrorCode";
import { IUser } from "@common/httpApi"; import { IUser } from "@common/httpApi";
import * as rpc from "@common/jsonRpc"; import * as rpc from "@common/jsonRpc";
import logger from "@common/logger"; import logger from "@common/logger";
import * as s from "@common/sprinklersRpc";
import * as deviceRequests from "@common/sprinklersRpc/deviceRequests"; import * as deviceRequests from "@common/sprinklersRpc/deviceRequests";
import * as s from "@common/sprinklersRpc/index"; import * as schema from "@common/sprinklersRpc/schema/";
import * as schema from "@common/sprinklersRpc/schema/index";
import { seralizeRequest } from "@common/sprinklersRpc/schema/requests"; import { seralizeRequest } from "@common/sprinklersRpc/schema/requests";
import * as ws from "@common/sprinklersRpc/websocketData"; import * as ws from "@common/sprinklersRpc/websocketData";
import { DefaultEvents, TypedEventEmitter } from "@common/TypedEventEmitter"; import { DefaultEvents, TypedEventEmitter, typedEventEmitter } from "@common/TypedEventEmitter";
import { WSSprinklersDevice } from "./WSSprinklersDevice"; import { WSSprinklersDevice } from "./WSSprinklersDevice";
export const log = logger.child({ source: "websocket" }); export const log = logger.child({ source: "websocket" });
@ -27,15 +27,22 @@ const DEFAULT_URL = `${websocketProtocol}//${location.hostname}:${websocketPort}
export interface WebSocketRpcClientEvents extends DefaultEvents { export interface WebSocketRpcClientEvents extends DefaultEvents {
newUserData(userData: IUser): void; newUserData(userData: IUser): void;
rpcError(error: ws.RpcError): void; rpcError(error: s.RpcError): void;
tokenError(error: ws.RpcError): void; tokenError(error: s.RpcError): void;
} }
export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEvents> implements s.SprinklersRPC { // tslint:disable:member-ordering
export interface WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEvents> {
}
@typedEventEmitter
export class WebSocketRpcClient extends s.SprinklersRPC {
@computed
get connected(): boolean { get connected(): boolean {
return this.connectionState.isServerConnected || false; return this.connectionState.isServerConnected || false;
} }
readonly webSocketUrl: string; readonly webSocketUrl: string;
devices: Map<string, WSSprinklersDevice> = new Map(); devices: Map<string, WSSprinklersDevice> = new Map();
@ -51,7 +58,6 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
private responseCallbacks: ws.ServerResponseHandlers = {}; private responseCallbacks: ws.ServerResponseHandlers = {};
private reconnectTimer: number | null = null; private reconnectTimer: number | null = null;
/* tslint:disable-next-line:member-ordering */
@action @action
private onDisconnect = action(() => { private onDisconnect = action(() => {
this.connectionState.serverToBroker = null; this.connectionState.serverToBroker = null;
@ -68,7 +74,7 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
this.connectionState.clientToServer = false; this.connectionState.clientToServer = false;
this.connectionState.serverToBroker = false; this.connectionState.serverToBroker = false;
this.on("rpcError", (err: ws.RpcError) => { this.on("rpcError", (err: s.RpcError) => {
if (err.code === ErrorCode.BadToken) { if (err.code === ErrorCode.BadToken) {
this.emit("tokenError", err); this.emit("tokenError", err);
} }
@ -90,7 +96,9 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
} }
} }
getDevice(id: string): s.SprinklersDevice { acquireDevice = s.SprinklersRPC.prototype.acquireDevice;
protected getDevice(id: string): s.SprinklersDevice {
let device = this.devices.get(id); let device = this.devices.get(id);
if (!device) { if (!device) {
device = new WSSprinklersDevice(this, id); device = new WSSprinklersDevice(this, id);
@ -99,8 +107,14 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
return device; return device;
} }
removeDevice(id: string) { releaseDevice(id: string): void {
// NOT IMPLEMENTED const device = this.devices.get(id);
if (!device) return;
device.unsubscribe()
.then(() => {
log.debug({ id }, "released device");
this.devices.delete(id);
});
} }
async authenticate(accessToken: string): Promise<ws.IAuthenticateResponse> { async authenticate(accessToken: string): Promise<ws.IAuthenticateResponse> {
@ -120,7 +134,7 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
} catch (err) { } catch (err) {
logger.error({ err }, "error authenticating websocket connection"); logger.error({ err }, "error authenticating websocket connection");
// TODO message? // TODO message?
runInAction("authenticateSuccess", () => { runInAction("authenticateError", () => {
this.authenticated = false; this.authenticated = false;
}); });
} }
@ -134,13 +148,13 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
code: ErrorCode.ServerDisconnected, code: ErrorCode.ServerDisconnected,
message: "the server is not connected", message: "the server is not connected",
}; };
throw new ws.RpcError("the server is not connected", ErrorCode.ServerDisconnected); throw new s.RpcError("the server is not connected", ErrorCode.ServerDisconnected);
} }
const requestData = seralizeRequest(request); const requestData = seralizeRequest(request);
const data: ws.IDeviceCallRequest = { deviceId, data: requestData }; const data: ws.IDeviceCallRequest = { deviceId, data: requestData };
const resData = await this.makeRequest("deviceCall", data); const resData = await this.makeRequest("deviceCall", data);
if (resData.data.result === "error") { if (resData.data.result === "error") {
throw new ws.RpcError(resData.data.message, resData.data.code, resData.data); throw new s.RpcError(resData.data.message, resData.data.code, resData.data);
} else { } else {
return resData.data; return resData.data;
} }
@ -158,17 +172,17 @@ export class WebSocketRpcClient extends TypedEventEmitter<WebSocketRpcClientEven
resolve(response.data); resolve(response.data);
} else { } else {
const { error } = response; const { error } = response;
reject(new ws.RpcError(error.message, error.code, error.data)); reject(new s.RpcError(error.message, error.code, error.data));
} }
}; };
timeoutHandle = window.setTimeout(() => { timeoutHandle = window.setTimeout(() => {
delete this.responseCallbacks[id]; delete this.responseCallbacks[id];
reject(new ws.RpcError("the request timed out", ErrorCode.Timeout)); reject(new s.RpcError("the request timed out", ErrorCode.Timeout));
}, TIMEOUT_MS); }, TIMEOUT_MS);
this.sendRequest(id, method, params); this.sendRequest(id, method, params);
}) })
.catch((err) => { .catch((err) => {
if (err instanceof ws.RpcError) { if (err instanceof s.RpcError) {
this.emit("rpcError", err); this.emit("rpcError", err);
} }
throw err; throw err;

16
common/TypedEventEmitter.ts

@ -45,4 +45,20 @@ const TypedEventEmitter = EventEmitter as {
}; };
type TypedEventEmitter<TEvents extends DefaultEvents = AnyEvents> = ITypedEventEmitter<TEvents>; type TypedEventEmitter<TEvents extends DefaultEvents = AnyEvents> = ITypedEventEmitter<TEvents>;
type Constructable = new (...args: any[]) => any;
export function typedEventEmitter<TBase extends Constructable, TEvents extends DefaultEvents = AnyEvents>(Base: TBase):
TBase & TypedEventEmitter<TEvents> {
const NewClass = class extends Base {
constructor(...args: any[]) {
super(...args);
EventEmitter.call(this);
}
};
Object.getOwnPropertyNames(EventEmitter.prototype).forEach((name) => {
NewClass.prototype[name] = (EventEmitter.prototype as any)[name];
});
return NewClass as any;
}
export { TypedEventEmitter }; export { TypedEventEmitter };

15
common/jsonRpc/index.ts

@ -126,30 +126,35 @@ export async function handleRequest<RequestTypes,
ResponseTypes extends { [Method in keyof RequestTypes]: any }, ErrorType>( ResponseTypes extends { [Method in keyof RequestTypes]: any }, ErrorType>(
handlers: RequestHandlers<RequestTypes, ResponseTypes, ErrorType>, handlers: RequestHandlers<RequestTypes, ResponseTypes, ErrorType>,
message: Request<RequestTypes>, message: Request<RequestTypes>,
thisParam?: any,
): Promise<ResponseData<ResponseTypes, ErrorType>> { ): Promise<ResponseData<ResponseTypes, ErrorType>> {
const handler = handlers[message.method]; const handler = handlers[message.method];
if (!handler) { if (!handler) {
throw new Error("No handler for request method " + message.method); throw new Error("No handler for request method " + message.method);
} }
return handler(message.params); return handler.call(thisParam, message.params);
} }
export function handleResponse<ResponseTypes, ErrorType>( export function handleResponse<ResponseTypes, ErrorType>(
handlers: ResponseHandlers<ResponseTypes, ErrorType>, handlers: ResponseHandlers<ResponseTypes, ErrorType>,
message: Response<ResponseTypes, ErrorType>) { message: Response<ResponseTypes, ErrorType>,
thisParam?: any,
) {
const handler = handlers[message.id]; const handler = handlers[message.id];
if (!handler) { if (!handler) {
return; return;
} }
return handler(message); return handler.call(thisParam, message);
} }
export function handleNotification<NotificationTypes>( export function handleNotification<NotificationTypes>(
handlers: NotificationHandlers<NotificationTypes>, handlers: NotificationHandlers<NotificationTypes>,
message: Notification<NotificationTypes>) { message: Notification<NotificationTypes>,
thisParam?: any,
) {
const handler = handlers[message.method]; const handler = handlers[message.method];
if (!handler) { if (!handler) {
throw new Error("No handler for notification method " + message.method); throw new Error("No handler for notification method " + message.method);
} }
return handler(message.data); return handler.call(thisParam, message.data);
} }

21
common/sprinklersRpc/RpcError.ts

@ -0,0 +1,21 @@
import { ErrorCode } from "@common/ErrorCode";
import { IError } from "./websocketData";
export class RpcError extends Error implements IError {
name = "RpcError";
code: number;
data: any;
constructor(message: string, code: number = ErrorCode.BadRequest, data: any = {}) {
super(message);
this.code = code;
if (data instanceof Error) {
this.data = data.toString();
}
this.data = data;
}
toJSON(): IError {
return { code: this.code, message: this.message, data: this.data };
}
}

33
common/sprinklersRpc/SprinklersDevice.ts

@ -4,8 +4,12 @@ import * as req from "./deviceRequests";
import { Program } from "./Program"; import { Program } from "./Program";
import { Section } from "./Section"; import { Section } from "./Section";
import { SectionRunner } from "./SectionRunner"; import { SectionRunner } from "./SectionRunner";
import { SprinklersRPC } from "./SprinklersRPC";
export abstract class SprinklersDevice { export abstract class SprinklersDevice {
readonly rpc: SprinklersRPC;
readonly id: string;
@observable connectionState: ConnectionState = new ConnectionState(); @observable connectionState: ConnectionState = new ConnectionState();
@observable sections: Section[] = []; @observable sections: Section[] = [];
@observable programs: Program[] = []; @observable programs: Program[] = [];
@ -19,14 +23,37 @@ export abstract class SprinklersDevice {
sectionRunnerConstructor: typeof SectionRunner = SectionRunner; sectionRunnerConstructor: typeof SectionRunner = SectionRunner;
programConstructor: typeof Program = Program; programConstructor: typeof Program = Program;
protected constructor() { private references: number = 0;
protected constructor(rpc: SprinklersRPC, id: string) {
this.rpc = rpc;
this.id = id;
this.sectionRunner = new (this.sectionRunnerConstructor)(this); this.sectionRunner = new (this.sectionRunnerConstructor)(this);
} }
abstract get id(): string;
abstract makeRequest(request: req.Request): Promise<req.Response>; abstract makeRequest(request: req.Request): Promise<req.Response>;
/**
* Increase the reference count for this sprinklers device
* @returns The new reference count
*/
acquire(): number {
return ++this.references;
}
/**
* Releases one reference to this device. When the reference count reaches 0, the device
* will be released and no longer updated.
* @returns The reference count after being updated
*/
release(): number {
this.references--;
if (this.references <= 0) {
this.rpc.releaseDevice(this.id);
}
return this.references;
}
runProgram(opts: req.WithProgram) { runProgram(opts: req.WithProgram) {
return this.makeRequest({ ...opts, type: "runProgram" }); return this.makeRequest({ ...opts, type: "runProgram" });
} }

30
common/sprinklersRpc/SprinklersRPC.ts

@ -1,13 +1,31 @@
import { ConnectionState } from "./ConnectionState"; import { ConnectionState } from "./ConnectionState";
import { SprinklersDevice } from "./SprinklersDevice"; import { SprinklersDevice } from "./SprinklersDevice";
export interface SprinklersRPC { export abstract class SprinklersRPC {
readonly connectionState: ConnectionState; abstract readonly connectionState: ConnectionState;
readonly connected: boolean; abstract readonly connected: boolean;
start(): void; abstract start(): void;
getDevice(id: string): SprinklersDevice; /**
* Acquires a reference to a device. This reference must be released by calling
* SprinklersDevice#release for every time this method was called
* @param id The id of the device
*/
acquireDevice(id: string): SprinklersDevice {
const device = this.getDevice(id);
device.acquire();
return device;
}
removeDevice(id: string): void; /**
* Forces a device to be released. The device will no longer be updated.
*
* This should not be used normally, instead SprinklersDevice#release should be called to manage
* each reference to a device.
* @param id The id of the device to remove
*/
abstract releaseDevice(id: string): void;
protected abstract getDevice(id: string): SprinklersDevice;
} }

2
common/sprinklersRpc/index.ts

@ -1,4 +1,3 @@
// export * from "./Duration";
export * from "./SprinklersRPC"; export * from "./SprinklersRPC";
export * from "./Program"; export * from "./Program";
export * from "./schedule"; export * from "./schedule";
@ -6,3 +5,4 @@ export * from "./Section";
export * from "./SectionRunner"; export * from "./SectionRunner";
export * from "./SprinklersDevice"; export * from "./SprinklersDevice";
export * from "./ConnectionState"; export * from "./ConnectionState";
export * from "./RpcError";

83
common/sprinklersRpc/mqtt/index.ts

@ -1,9 +1,11 @@
import { autorun, observable } from "mobx"; import { autorun, observable } from "mobx";
import * as mqtt from "mqtt"; import * as mqtt from "mqtt";
import { ErrorCode } from "@common/ErrorCode";
import logger from "@common/logger"; import logger from "@common/logger";
import * as s from "@common/sprinklersRpc"; import * as s from "@common/sprinklersRpc";
import * as requests from "@common/sprinklersRpc/deviceRequests"; import * as requests from "@common/sprinklersRpc/deviceRequests";
import { RpcError } from "@common/sprinklersRpc/RpcError";
import { seralizeRequest } from "@common/sprinklersRpc/schema/requests"; import { seralizeRequest } from "@common/sprinklersRpc/schema/requests";
import { getRandomId } from "@common/utils"; import { getRandomId } from "@common/utils";
@ -18,6 +20,7 @@ interface WithRid {
} }
export const DEVICE_PREFIX = "devices"; export const DEVICE_PREFIX = "devices";
const REQUEST_TIMEOUT = 5000;
export interface MqttRpcClientOptions { export interface MqttRpcClientOptions {
mqttUri: string; mqttUri: string;
@ -25,7 +28,7 @@ export interface MqttRpcClientOptions {
password?: string; password?: string;
} }
export class MqttRpcClient implements s.SprinklersRPC, MqttRpcClientOptions { export class MqttRpcClient extends s.SprinklersRPC implements MqttRpcClientOptions {
get connected(): boolean { get connected(): boolean {
return this.connectionState.isServerConnected || false; return this.connectionState.isServerConnected || false;
} }
@ -43,6 +46,7 @@ export class MqttRpcClient implements s.SprinklersRPC, MqttRpcClientOptions {
devices: Map<string, MqttSprinklersDevice> = new Map(); devices: Map<string, MqttSprinklersDevice> = new Map();
constructor(opts: MqttRpcClientOptions) { constructor(opts: MqttRpcClientOptions) {
super();
Object.assign(this, opts); Object.assign(this, opts);
this.connectionState.serverToBroker = false; this.connectionState.serverToBroker = false;
} }
@ -69,7 +73,16 @@ export class MqttRpcClient implements s.SprinklersRPC, MqttRpcClientOptions {
}); });
} }
getDevice(id: string): s.SprinklersDevice { releaseDevice(id: string) {
const device = this.devices.get(id);
if (!device) {
return;
}
device.doUnsubscribe();
this.devices.delete(id);
}
protected getDevice(id: string): s.SprinklersDevice {
if (/\//.test(id)) { if (/\//.test(id)) {
throw new Error("Device id cannot contain a /"); throw new Error("Device id cannot contain a /");
} }
@ -83,15 +96,6 @@ export class MqttRpcClient implements s.SprinklersRPC, MqttRpcClientOptions {
return device; return device;
} }
removeDevice(id: string) {
const device = this.devices.get(id);
if (!device) {
return;
}
device.doUnsubscribe();
this.devices.delete(id);
}
private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) { private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) {
try { try {
this.processMessage(topic, payload, packet); this.processMessage(topic, payload, packet);
@ -119,6 +123,8 @@ export class MqttRpcClient implements s.SprinklersRPC, MqttRpcClientOptions {
} }
} }
type ResponseCallback = (response: requests.Response) => void;
const subscriptions = [ const subscriptions = [
"/connected", "/connected",
"/sections", "/sections",
@ -148,7 +154,6 @@ const handler = (test: RegExp) =>
class MqttSprinklersDevice extends s.SprinklersDevice { class MqttSprinklersDevice extends s.SprinklersDevice {
readonly apiClient: MqttRpcClient; readonly apiClient: MqttRpcClient;
readonly id: string;
handlers!: IHandlerEntry[]; handlers!: IHandlerEntry[];
private subscriptions: string[]; private subscriptions: string[];
@ -156,12 +161,11 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
private responseCallbacks: Map<number, ResponseCallback> = new Map(); private responseCallbacks: Map<number, ResponseCallback> = new Map();
constructor(apiClient: MqttRpcClient, id: string) { constructor(apiClient: MqttRpcClient, id: string) {
super(); super(apiClient, id);
this.sectionConstructor = MqttSection; this.sectionConstructor = MqttSection;
this.sectionRunnerConstructor = MqttSectionRunner; this.sectionRunnerConstructor = MqttSectionRunner;
this.programConstructor = MqttProgram; this.programConstructor = MqttProgram;
this.apiClient = apiClient; this.apiClient = apiClient;
this.id = id;
this.sectionRunner = new MqttSectionRunner(this); this.sectionRunner = new MqttSectionRunner(this);
this.subscriptions = subscriptions.map((filter) => this.prefix + filter); this.subscriptions = subscriptions.map((filter) => this.prefix + filter);
@ -183,27 +187,23 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
return DEVICE_PREFIX + "/" + this.id; return DEVICE_PREFIX + "/" + this.id;
} }
doSubscribe(): Promise<void> { doSubscribe() {
return new Promise((resolve, reject) => { this.apiClient.client.subscribe(this.subscriptions, { qos: 1 }, (err) => {
this.apiClient.client.subscribe(this.subscriptions, { qos: 1 }, (err) => { if (err) {
if (err) { log.error({ err, id: this.id }, "error subscribing to device");
reject(err); } else {
} else { log.debug({ id: this.id }, "subscribed to device");
resolve(); }
}
});
}); });
} }
doUnsubscribe(): Promise<void> { doUnsubscribe() {
return new Promise((resolve, reject) => { this.apiClient.client.unsubscribe(this.subscriptions, (err) => {
this.apiClient.client.unsubscribe(this.subscriptions, (err) => { if (err) {
if (err) { log.error({ err, id: this.id }, "error unsubscribing to device");
reject(err); } else {
} else { log.debug({ id: this.id }, "unsubscribed to device");
resolve(); }
}
});
}); });
} }
@ -226,14 +226,25 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
const json = seralizeRequest(request); const json = seralizeRequest(request);
const requestId = json.rid = this.getRequestId(); const requestId = json.rid = this.getRequestId();
const payloadStr = JSON.stringify(json); const payloadStr = JSON.stringify(json);
this.responseCallbacks.set(requestId, (data) => {
let timeoutHandle: any;
const callback: ResponseCallback = (data) => {
if (data.result === "error") { if (data.result === "error") {
reject(data); reject(new RpcError(data.message, data.code, data));
} else { } else {
resolve(data); resolve(data);
} }
this.responseCallbacks.delete(requestId); this.responseCallbacks.delete(requestId);
}); clearTimeout(timeoutHandle);
};
timeoutHandle = setTimeout(() => {
reject(new RpcError("the request has timed out", ErrorCode.Timeout));
this.responseCallbacks.delete(requestId);
clearTimeout(timeoutHandle);
}, REQUEST_TIMEOUT);
this.responseCallbacks.set(requestId, callback);
this.apiClient.client.publish(topic, payloadStr, { qos: 1 }); this.apiClient.client.publish(topic, payloadStr, { qos: 1 });
}); });
} }
@ -298,5 +309,3 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
/* tslint:enable:no-unused-variable */ /* tslint:enable:no-unused-variable */
} }
type ResponseCallback = (response: requests.Response) => void;

22
common/sprinklersRpc/websocketData.ts

@ -1,6 +1,5 @@
import * as rpc from "../jsonRpc/index"; import * as rpc from "../jsonRpc/index";
import { ErrorCode } from "@common/ErrorCode";
import { IUser } from "@common/httpApi"; import { IUser } from "@common/httpApi";
import { Response as ResponseData } from "@common/sprinklersRpc/deviceRequests"; import { Response as ResponseData } from "@common/sprinklersRpc/deviceRequests";
@ -20,6 +19,7 @@ export interface IDeviceCallRequest {
export interface IClientRequestTypes { export interface IClientRequestTypes {
"authenticate": IAuthenticateRequest; "authenticate": IAuthenticateRequest;
"deviceSubscribe": IDeviceSubscribeRequest; "deviceSubscribe": IDeviceSubscribeRequest;
"deviceUnsubscribe": IDeviceSubscribeRequest;
"deviceCall": IDeviceCallRequest; "deviceCall": IDeviceCallRequest;
} }
@ -40,6 +40,7 @@ export interface IDeviceCallResponse {
export interface IServerResponseTypes { export interface IServerResponseTypes {
"authenticate": IAuthenticateResponse; "authenticate": IAuthenticateResponse;
"deviceSubscribe": IDeviceSubscribeResponse; "deviceSubscribe": IDeviceSubscribeResponse;
"deviceUnsubscribe": IDeviceSubscribeResponse;
"deviceCall": IDeviceCallResponse; "deviceCall": IDeviceCallResponse;
} }
@ -64,25 +65,6 @@ export type ServerNotificationMethod = keyof IServerNotificationTypes;
export type IError = rpc.DefaultErrorType; export type IError = rpc.DefaultErrorType;
export type ErrorData = rpc.ErrorData<IError>; export type ErrorData = rpc.ErrorData<IError>;
export class RpcError extends Error implements IError {
name = "RpcError";
code: number;
data: any;
constructor(message: string, code: number = ErrorCode.BadRequest, data: any = {}) {
super(message);
this.code = code;
if (data instanceof Error) {
this.data = data.toString();
}
this.data = data;
}
toJSON(): IError {
return { code: this.code, message: this.message, data: this.data };
}
}
export type ServerMessage = rpc.Message<{}, IServerResponseTypes, IError, IServerNotificationTypes>; export type ServerMessage = rpc.Message<{}, IServerResponseTypes, IError, IServerNotificationTypes>;
export type ServerNotification = rpc.Notification<IServerNotificationTypes>; export type ServerNotification = rpc.Notification<IServerNotificationTypes>;
export type ServerResponse = rpc.Response<IServerResponseTypes, IError>; export type ServerResponse = rpc.Response<IServerResponseTypes, IError>;

8
common/utils.ts

@ -16,3 +16,11 @@ export function checkedIndexOf<T>(o: T | number, arr: T[], type: string = "objec
export function getRandomId() { export function getRandomId() {
return Math.floor(Math.random() * 1000000000); return Math.floor(Math.random() * 1000000000);
} }
export function applyMixins(derivedCtor: any, baseCtors: any[]) {
baseCtors.forEach((baseCtor) => {
Object.getOwnPropertyNames(baseCtor.prototype).forEach((name) => {
derivedCtor.prototype[name] = baseCtor.prototype[name];
});
});
}

3
server/express/api/devices.ts

@ -36,9 +36,10 @@ export function devices(state: ServerState) {
if (!userDevice) { if (!userDevice) {
throw new ApiError("User does not have access to the specified device", ErrorCode.NoPermission); throw new ApiError("User does not have access to the specified device", ErrorCode.NoPermission);
} }
const device = state.mqttClient.getDevice(req.params.deviceId); const device = state.mqttClient.acquireDevice(req.params.deviceId);
const j = serialize(schema.sprinklersDevice, device); const j = serialize(schema.sprinklersDevice, device);
res.send(j); res.send(j);
device.release();
}); });
router.post("/register", verifyAuthorization({ router.post("/register", verifyAuthorization({

22
server/express/authentication.ts

@ -10,14 +10,14 @@ import {
TokenGrantRequest, TokenGrantRequest,
TokenGrantResponse, TokenGrantResponse,
} from "@common/httpApi"; } from "@common/httpApi";
import { AccessToken, DeviceRegistrationToken, DeviceToken, RefreshToken, TokenClaims, SuperuserToken } from "@common/TokenClaims"; import * as tok from "@common/TokenClaims";
import { User } from "../entities"; import { User } from "../entities";
import { ServerState } from "../state"; import { ServerState } from "../state";
declare global { declare global {
namespace Express { namespace Express {
interface Request { interface Request {
token?: AccessToken; token?: tok.AccessToken;
} }
} }
} }
@ -39,7 +39,7 @@ function getExpTime(lifetime: number) {
return Math.floor(Date.now() / 1000) + lifetime; return Math.floor(Date.now() / 1000) + lifetime;
} }
function signToken(claims: TokenClaims): Promise<string> { function signToken(claims: tok.TokenClaims): Promise<string> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
jwt.sign(claims, JWT_SECRET, (err: Error, encoded: string) => { jwt.sign(claims, JWT_SECRET, (err: Error, encoded: string) => {
if (err) { if (err) {
@ -51,7 +51,7 @@ function signToken(claims: TokenClaims): Promise<string> {
}); });
} }
export function verifyToken<TClaims extends TokenClaims = TokenClaims>( export function verifyToken<TClaims extends tok.TokenClaims = tok.TokenClaims>(
token: string, type?: TClaims["type"], token: string, type?: TClaims["type"],
): Promise<TClaims> { ): Promise<TClaims> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
@ -67,7 +67,7 @@ export function verifyToken<TClaims extends TokenClaims = TokenClaims>(
reject(err); reject(err);
} }
} else { } else {
const claims: TokenClaims = decoded as any; const claims: tok.TokenClaims = decoded as any;
if (type != null && claims.type !== type) { if (type != null && claims.type !== type) {
reject(new ApiError(`Expected a "${type}" token, received a "${claims.type}" token`, reject(new ApiError(`Expected a "${type}" token, received a "${claims.type}" token`,
ErrorCode.BadToken)); ErrorCode.BadToken));
@ -79,7 +79,7 @@ export function verifyToken<TClaims extends TokenClaims = TokenClaims>(
} }
function generateAccessToken(user: User, secret: string): Promise<string> { function generateAccessToken(user: User, secret: string): Promise<string> {
const access_token_claims: AccessToken = { const access_token_claims: tok.AccessToken = {
iss: ISSUER, iss: ISSUER,
aud: user.id, aud: user.id,
name: user.name, name: user.name,
@ -91,7 +91,7 @@ function generateAccessToken(user: User, secret: string): Promise<string> {
} }
function generateRefreshToken(user: User, secret: string): Promise<string> { function generateRefreshToken(user: User, secret: string): Promise<string> {
const refresh_token_claims: RefreshToken = { const refresh_token_claims: tok.RefreshToken = {
iss: ISSUER, iss: ISSUER,
aud: user.id, aud: user.id,
name: user.name, name: user.name,
@ -103,7 +103,7 @@ function generateRefreshToken(user: User, secret: string): Promise<string> {
} }
function generateDeviceRegistrationToken(secret: string): Promise<string> { function generateDeviceRegistrationToken(secret: string): Promise<string> {
const device_reg_token_claims: DeviceRegistrationToken = { const device_reg_token_claims: tok.DeviceRegistrationToken = {
iss: ISSUER, iss: ISSUER,
type: "device_reg", type: "device_reg",
}; };
@ -111,7 +111,7 @@ function generateDeviceRegistrationToken(secret: string): Promise<string> {
} }
export function generateDeviceToken(id: number, deviceId: string): Promise<string> { export function generateDeviceToken(id: number, deviceId: string): Promise<string> {
const device_token_claims: DeviceToken = { const device_token_claims: tok.DeviceToken = {
iss: ISSUER, iss: ISSUER,
type: "device", type: "device",
aud: deviceId, aud: deviceId,
@ -121,7 +121,7 @@ export function generateDeviceToken(id: number, deviceId: string): Promise<strin
} }
export function generateSuperuserToken(): Promise<string> { export function generateSuperuserToken(): Promise<string> {
const superuser_claims: SuperuserToken = { const superuser_claims: tok.SuperuserToken = {
iss: ISSUER, iss: ISSUER,
type: "superuser", type: "superuser",
}; };
@ -200,7 +200,7 @@ export function authentication(state: ServerState) {
} }
export interface VerifyAuthorizationOpts { export interface VerifyAuthorizationOpts {
type: TokenClaims["type"]; type: tok.TokenClaims["type"];
} }
export function verifyAuthorization(options?: Partial<VerifyAuthorizationOpts>): Express.RequestHandler { export function verifyAuthorization(options?: Partial<VerifyAuthorizationOpts>): Express.RequestHandler {

2
server/index.ts

@ -10,7 +10,7 @@ import * as WebSocket from "ws";
import { ServerState } from "./state"; import { ServerState } from "./state";
import { createApp } from "./express"; import { createApp } from "./express";
import { WebSocketApi } from "./sprinklersRpc/websocketServer"; import { WebSocketApi } from "./sprinklersRpc/WebSocketApi";
const state = new ServerState(); const state = new ServerState();
const app = createApp(state); const app = createApp(state);

26
server/sprinklersRpc/WebSocketApi.ts

@ -0,0 +1,26 @@
import * as WebSocket from "ws";
import { ServerState } from "@server/state";
import { WebSocketConnection } from "./WebSocketConnection";
export class WebSocketApi {
state: ServerState;
clients: Set<WebSocketConnection> = new Set();
constructor(state: ServerState) {
this.state = state;
}
listen(webSocketServer: WebSocket.Server) {
webSocketServer.on("connection", this.handleConnection);
}
handleConnection = (socket: WebSocket) => {
const client = new WebSocketConnection(this, socket);
this.clients.add(client);
}
removeClient(client: WebSocketConnection) {
return this.clients.delete(client);
}
}

262
server/sprinklersRpc/WebSocketConnection.ts

@ -0,0 +1,262 @@
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import { ErrorCode } from "@common/ErrorCode";
import * as rpc from "@common/jsonRpc";
import log from "@common/logger";
import { RpcError } from "@common/sprinklersRpc";
import * as deviceRequests from "@common/sprinklersRpc/deviceRequests";
import * as schema from "@common/sprinklersRpc/schema";
import * as ws from "@common/sprinklersRpc/websocketData";
import { AccessToken } from "@common/TokenClaims";
import { User } from "@server/entities";
import { verifyToken } from "@server/express/authentication";
import { WebSocketApi } from "./WebSocketApi";
type Disposer = () => void;
export class WebSocketConnection {
api: WebSocketApi;
socket: WebSocket;
disposers: Array<() => void> = [];
// map of device id to disposer function
deviceSubscriptions: Map<string, Disposer> = new Map();
/// This shall be the user id if the client has been authenticated, null otherwise
userId: number | null = null;
user: User | null = null;
private requestHandlers: ws.ClientRequestHandlers = new WebSocketRequestHandlers();
get state() {
return this.api.state;
}
constructor(api: WebSocketApi, socket: WebSocket) {
this.api = api;
this.socket = socket;
this.socket.on("message", this.handleSocketMessage);
this.socket.on("close", this.onClose);
}
stop = () => {
this.socket.close();
}
onClose = (code: number, reason: string) => {
log.debug({ code, reason }, "WebSocketConnection closing");
this.disposers.forEach((disposer) => disposer());
this.deviceSubscriptions.forEach((disposer) => disposer());
this.api.removeClient(this);
}
subscribeBrokerConnection() {
this.disposers.push(autorun(() => {
const updateData: ws.IBrokerConnectionUpdate = {
brokerConnected: this.state.mqttClient.connected,
};
this.sendNotification("brokerConnectionUpdate", updateData);
}));
}
checkAuthorization() {
if (!this.userId || !this.user) {
throw new RpcError("this WebSocket session has not been authenticated",
ErrorCode.Unauthorized);
}
}
checkDevice(devId: string) {
const userDevice = this.user!.devices!.find((dev) => dev.deviceId === devId);
if (userDevice == null) {
throw new RpcError("you do not have permission to subscribe to device",
ErrorCode.NoPermission, { id: devId });
}
const deviceId = userDevice.deviceId;
if (!deviceId) {
throw new RpcError("device has no associated device prefix", ErrorCode.Internal);
}
return userDevice;
}
sendMessage(data: ws.ServerMessage) {
this.socket.send(JSON.stringify(data));
}
sendNotification<Method extends ws.ServerNotificationMethod>(
method: Method,
data: ws.IServerNotificationTypes[Method]) {
this.sendMessage({ type: "notification", method, data });
}
sendResponse<Method extends ws.ClientRequestMethods>(
method: Method,
id: number,
data: ws.ServerResponseData<Method>) {
this.sendMessage({ type: "response", method, id, ...data });
}
handleSocketMessage = (socketData: WebSocket.Data) => {
this.doHandleSocketMessage(socketData)
.catch((err) => {
this.onError({ err }, "unhandled error on handling socket message");
});
}
async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<deviceRequests.Response> {
const userDevice = this.checkDevice(requestData.deviceId);
const deviceId = userDevice.deviceId!;
const device = this.state.mqttClient.acquireDevice(deviceId);
try {
const request = schema.requests.deserializeRequest(requestData.data);
return await device.makeRequest(request);
} finally {
device.release();
}
}
private async doHandleSocketMessage(socketData: WebSocket.Data) {
if (typeof socketData !== "string") {
return this.onError({ type: typeof socketData },
"received invalid socket data type from client", ErrorCode.Parse);
}
let data: ws.ClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return this.onError({ socketData, err }, "received invalid websocket message from client",
ErrorCode.Parse);
}
switch (data.type) {
case "request":
await this.handleRequest(data);
break;
default:
return this.onError({ data }, "received invalid message type from client",
ErrorCode.BadRequest);
}
}
private async handleRequest(request: ws.ClientRequest) {
let response: ws.ServerResponseData;
try {
if (!this.requestHandlers[request.method]) {
// noinspection ExceptionCaughtLocallyJS
throw new RpcError("received invalid client request method");
}
response = await rpc.handleRequest(this.requestHandlers, request, this);
} catch (err) {
if (err instanceof RpcError) {
log.debug({ err }, "rpc error");
response = { result: "error", error: err.toJSON() };
} else {
log.error({ method: request.method, err }, "unhandled error during processing of client request");
response = {
result: "error", error: {
code: ErrorCode.Internal, message: "unhandled error during processing of client request",
data: err.toString(),
},
};
}
}
this.sendResponse(request.method, request.id, response);
}
private onError(data: any, message: string, code: number = ErrorCode.Internal) {
log.error(data, message);
const errorData: ws.IError = { code, message, data };
this.sendNotification("error", errorData);
}
}
class WebSocketRequestHandlers implements ws.ClientRequestHandlers {
async authenticate(this: WebSocketConnection, data: ws.IAuthenticateRequest):
Promise<ws.ServerResponseData<"authenticate">> {
if (!data.accessToken) {
throw new RpcError("no token specified", ErrorCode.BadRequest);
}
let claims: AccessToken;
try {
claims = await verifyToken<AccessToken>(data.accessToken, "access");
} catch (e) {
throw new RpcError("invalid token", ErrorCode.BadToken, e);
}
this.userId = claims.aud;
this.user = await this.state.database.users.
findById(this.userId, { devices: true }) || null;
if (!this.user) {
throw new RpcError("user no longer exists", ErrorCode.BadToken);
}
log.debug({ userId: claims.aud, name: claims.name }, "authenticated websocket client");
this.subscribeBrokerConnection();
return {
result: "success",
data: { authenticated: true, message: "authenticated", user: this.user.toJSON() },
};
}
async deviceSubscribe(this: WebSocketConnection, data: ws.IDeviceSubscribeRequest):
Promise<ws.ServerResponseData<"deviceSubscribe">> {
this.checkAuthorization();
const userDevice = this.checkDevice(data.deviceId);
const deviceId = userDevice.deviceId!;
if (!this.deviceSubscriptions.has(deviceId)) {
const device = this.state.mqttClient.acquireDevice(deviceId);
log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device");
const autorunDisposer = autorun(() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { deviceId, data: json };
this.sendNotification("deviceUpdate", updateData);
}, { delay: 100 });
this.deviceSubscriptions.set(deviceId, () => {
autorunDisposer();
device.release();
this.deviceSubscriptions.delete(deviceId);
});
}
const response: ws.IDeviceSubscribeResponse = {
deviceId,
};
return { result: "success", data: response };
}
async deviceUnsubscribe(this: WebSocketConnection, data: ws.IDeviceSubscribeRequest):
Promise<ws.ServerResponseData<"deviceUnsubscribe">> {
this.checkAuthorization();
const userDevice = this.checkDevice(data.deviceId);
const deviceId = userDevice.deviceId!;
const disposer = this.deviceSubscriptions.get(deviceId);
if (disposer) {
disposer();
}
const response: ws.IDeviceSubscribeResponse = {
deviceId,
};
return { result: "success", data: response };
}
async deviceCall(this: WebSocketConnection, data: ws.IDeviceCallRequest):
Promise<ws.ServerResponseData<"deviceCall">> {
this.checkAuthorization();
try {
const response = await this.doDeviceCallRequest(data);
const resData: ws.IDeviceCallResponse = {
data: response,
};
return { result: "success", data: resData };
} catch (err) {
const e: deviceRequests.ErrorResponseData = err;
throw new RpcError(e.message, e.code, e);
}
}
}

247
server/sprinklersRpc/websocketServer.ts

@ -1,247 +0,0 @@
import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import { ErrorCode } from "@common/ErrorCode";
import * as rpc from "@common/jsonRpc";
import log from "@common/logger";
import * as deviceRequests from "@common/sprinklersRpc/deviceRequests";
import * as schema from "@common/sprinklersRpc/schema";
import * as ws from "@common/sprinklersRpc/websocketData";
import { AccessToken } from "@common/TokenClaims";
import { User } from "@server/entities";
import { verifyToken } from "@server/express/authentication";
import { ServerState } from "@server/state";
// tslint:disable:member-ordering
export class WebSocketClient {
api: WebSocketApi;
socket: WebSocket;
disposers: Array<() => void> = [];
deviceSubscriptions: string[] = [];
/// This shall be the user id if the client has been authenticated, null otherwise
userId: number | null = null;
user: User | null = null;
get state() {
return this.api.state;
}
constructor(api: WebSocketApi, socket: WebSocket) {
this.api = api;
this.socket = socket;
}
start() {
this.socket.on("message", this.handleSocketMessage);
this.socket.on("close", this.stop);
}
stop = () => {
this.disposers.forEach((disposer) => disposer());
this.api.removeClient(this);
}
private subscribeBrokerConnection() {
this.disposers.push(autorun(() => {
const updateData: ws.IBrokerConnectionUpdate = {
brokerConnected: this.state.mqttClient.connected,
};
this.sendNotification("brokerConnectionUpdate", updateData);
}));
}
private checkAuthorization() {
if (!this.userId || !this.user) {
throw new ws.RpcError("this WebSocket session has not been authenticated",
ErrorCode.Unauthorized);
}
}
private checkDevice(devId: string) {
const userDevice = this.user!.devices!.find((dev) => dev.deviceId === devId);
if (userDevice == null) {
throw new ws.RpcError("you do not have permission to subscribe to this device",
ErrorCode.NoPermission);
}
const deviceId = userDevice.deviceId;
if (!deviceId) {
throw new ws.RpcError("device has no associated device prefix", ErrorCode.BadRequest);
}
return userDevice;
}
private requestHandlers: ws.ClientRequestHandlers = {
authenticate: async (data: ws.IAuthenticateRequest) => {
if (!data.accessToken) {
throw new ws.RpcError("no token specified", ErrorCode.BadRequest);
}
let claims: AccessToken;
try {
claims = await verifyToken<AccessToken>(data.accessToken, "access");
} catch (e) {
throw new ws.RpcError("invalid token", ErrorCode.BadToken, e);
}
this.userId = claims.aud;
this.user = await this.state.database.users.
findById(this.userId, { devices: true }) || null;
if (!this.user) {
throw new ws.RpcError("user no longer exists", ErrorCode.BadToken);
}
log.info({ userId: claims.aud, name: claims.name }, "authenticated websocket client");
this.subscribeBrokerConnection();
return {
result: "success",
data: { authenticated: true, message: "authenticated", user: this.user.toJSON() },
};
},
deviceSubscribe: async (data: ws.IDeviceSubscribeRequest) => {
this.checkAuthorization();
const userDevice = this.checkDevice(data.deviceId);
const deviceId = userDevice.deviceId!;
if (this.deviceSubscriptions.indexOf(deviceId) === -1) {
this.deviceSubscriptions.push(deviceId);
const device = this.state.mqttClient.getDevice(deviceId);
log.debug({ deviceId, userId: this.userId }, "websocket client subscribed to device");
this.disposers.push(autorun(() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { deviceId, data: json };
this.sendNotification("deviceUpdate", updateData);
}, { delay: 100 }));
}
const response: ws.IDeviceSubscribeResponse = {
deviceId,
};
return { result: "success", data: response };
},
deviceCall: async (data: ws.IDeviceCallRequest) => {
this.checkAuthorization();
try {
const response = await this.doDeviceCallRequest(data);
const resData: ws.IDeviceCallResponse = {
data: response,
};
return { result: "success", data: resData };
} catch (err) {
const e: deviceRequests.ErrorResponseData = err;
throw new ws.RpcError(e.message, e.code, e);
}
},
};
private sendMessage(data: ws.ServerMessage) {
this.socket.send(JSON.stringify(data));
}
private sendNotification<Method extends ws.ServerNotificationMethod>(
method: Method,
data: ws.IServerNotificationTypes[Method]) {
this.sendMessage({ type: "notification", method, data });
}
private sendResponse<Method extends ws.ClientRequestMethods>(
method: Method,
id: number,
data: ws.ServerResponseData<Method>) {
this.sendMessage({ type: "response", method, id, ...data });
}
private handleSocketMessage = (socketData: WebSocket.Data) => {
this.doHandleSocketMessage(socketData)
.catch((err) => {
this.onError({ err }, "unhandled error on handling socket message");
});
}
private async doHandleSocketMessage(socketData: WebSocket.Data) {
if (typeof socketData !== "string") {
return this.onError({ type: typeof socketData },
"received invalid socket data type from client", ErrorCode.Parse);
}
let data: ws.ClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return this.onError({ socketData, err }, "received invalid websocket message from client",
ErrorCode.Parse);
}
switch (data.type) {
case "request":
await this.handleRequest(data);
break;
default:
return this.onError({ data }, "received invalid message type from client",
ErrorCode.BadRequest);
}
}
private async handleRequest(request: ws.ClientRequest) {
let response: ws.ServerResponseData;
try {
if (!this.requestHandlers[request.method]) {
// noinspection ExceptionCaughtLocallyJS
throw new ws.RpcError("received invalid client request method");
}
response = await rpc.handleRequest(this.requestHandlers, request);
} catch (err) {
if (err instanceof ws.RpcError) {
log.debug({ err }, "rpc error");
response = { result: "error", error: err.toJSON() };
} else {
log.error({ method: request.method, err }, "unhandled error during processing of client request");
response = {
result: "error", error: {
code: ErrorCode.Internal, message: "unhandled error during processing of client request",
data: err.toString(),
},
};
}
}
this.sendResponse(request.method, request.id, response);
}
private onError(data: any, message: string, code: number = ErrorCode.Internal) {
log.error(data, message);
const errorData: ws.IError = { code, message, data };
this.sendNotification("error", errorData);
}
private async doDeviceCallRequest(requestData: ws.IDeviceCallRequest): Promise<deviceRequests.Response> {
const userDevice = this.checkDevice(requestData.deviceId);
const deviceId = userDevice.deviceId!;
const device = this.state.mqttClient.getDevice(deviceId);
const request = schema.requests.deserializeRequest(requestData.data);
return device.makeRequest(request);
}
}
export class WebSocketApi {
state: ServerState;
clients: WebSocketClient[] = [];
constructor(state: ServerState) {
this.state = state;
}
listen(webSocketServer: WebSocket.Server) {
webSocketServer.on("connection", this.handleConnection);
}
handleConnection = (socket: WebSocket) => {
const client = new WebSocketClient(this, socket);
client.start();
this.clients.push(client);
}
removeClient(client: WebSocketClient) {
const idx = this.clients.indexOf(client);
if (idx !== -1) {
this.clients.splice(idx, 1);
}
}
}
Loading…
Cancel
Save