import { action, autorun, observable, runInAction, when } from "mobx"; import { update } from "serializr"; import { TokenStore } from "@client/state/TokenStore"; import { ErrorCode } from "@common/ErrorCode"; import { IUser } from "@common/httpApi"; import * as rpc from "@common/jsonRpc"; import logger from "@common/logger"; import * as deviceRequests from "@common/sprinklersRpc/deviceRequests"; import * as s from "@common/sprinklersRpc/index"; import * as schema from "@common/sprinklersRpc/schema/index"; import { seralizeRequest } from "@common/sprinklersRpc/schema/requests"; import * as ws from "@common/sprinklersRpc/websocketData"; import { DefaultEvents, TypedEventEmitter } from "@common/TypedEventEmitter"; const log = logger.child({ source: "websocket" }); const TIMEOUT_MS = 5000; const RECONNECT_TIMEOUT_MS = 5000; const isDev = process.env.NODE_ENV === "development"; const websocketProtocol = (location.protocol === "https:") ? "wss:" : "ws:"; const websocketPort = isDev ? 8080 : location.port; const DEFAULT_URL = `${websocketProtocol}//${location.hostname}:${websocketPort}`; // tslint:disable:member-ordering export class WSSprinklersDevice extends s.SprinklersDevice { readonly api: WebSocketRpcClient; private _id: string; constructor(api: WebSocketRpcClient, id: string) { super(); this.api = api; this._id = id; autorun(this.updateConnectionState); this.waitSubscribe(); } get id() { return this._id; } private updateConnectionState = () => { const { clientToServer, serverToBroker } = this.api.connectionState; runInAction("updateConnectionState", () => { Object.assign(this.connectionState, { clientToServer, serverToBroker }); }); } async subscribe() { const subscribeRequest: ws.IDeviceSubscribeRequest = { deviceId: this.id, }; try { await this.api.makeRequest("deviceSubscribe", subscribeRequest); runInAction("deviceSubscribeSuccess", () => { this.connectionState.brokerToDevice = true; }); } catch (err) { runInAction("deviceSubscribeError", () => { this.connectionState.brokerToDevice = false; if ((err as ws.IError).code === ErrorCode.NoPermission) { this.connectionState.hasPermission = false; } else { log.error({ err }); } }); } } makeRequest(request: deviceRequests.Request): Promise { return this.api.makeDeviceCall(this.id, request); } waitSubscribe = () => { when(() => this.api.authenticated, () => { this.subscribe(); when(() => !this.api.authenticated, this.waitSubscribe); }); } } export interface WebSocketRpcClientEvents extends DefaultEvents { newUserData(userData: IUser): void; rpcError(error: ws.RpcError): void; tokenError(error: ws.RpcError): void; } export class WebSocketRpcClient extends TypedEventEmitter implements s.SprinklersRPC { readonly webSocketUrl: string; devices: Map = new Map(); @observable connectionState: s.ConnectionState = new s.ConnectionState(); socket: WebSocket | null = null; @observable authenticated: boolean = false; tokenStore: TokenStore; private nextRequestId = Math.round(Math.random() * 1000000); private responseCallbacks: ws.ServerResponseHandlers = {}; private reconnectTimer: number | null = null; get connected(): boolean { return this.connectionState.isServerConnected || false; } constructor(tokenStore: TokenStore, webSocketUrl: string = DEFAULT_URL) { super(); this.webSocketUrl = webSocketUrl; this.tokenStore = tokenStore; this.connectionState.clientToServer = false; this.connectionState.serverToBroker = false; this.on("rpcError", (err: ws.RpcError) => { if (err.code === ErrorCode.BadToken) { this.emit("tokenError", err); } }); } start() { this._connect(); } stop() { if (this.reconnectTimer != null) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } if (this.socket != null) { this.socket.close(); this.socket = null; } } getDevice(id: string): s.SprinklersDevice { let device = this.devices.get(id); if (!device) { device = new WSSprinklersDevice(this, id); this.devices.set(id, device); } return device; } removeDevice(id: string) { // NOT IMPLEMENTED } async authenticate(accessToken: string): Promise { return this.makeRequest("authenticate", { accessToken }); } async tryAuthenticate() { when(() => this.connectionState.clientToServer === true && this.tokenStore.accessToken.isValid, async () => { try { const res = await this.authenticate(this.tokenStore.accessToken.token!); runInAction("authenticateSuccess", () => { this.authenticated = res.authenticated; }); logger.info({ user: res.user }, "authenticated websocket connection"); this.emit("newUserData", res.user); } catch (err) { logger.error({ err }, "error authenticating websocket connection"); // TODO message? runInAction("authenticateSuccess", () => { this.authenticated = false; }); } }); } // args must all be JSON serializable async makeDeviceCall(deviceId: string, request: deviceRequests.Request): Promise { if (this.socket == null) { const error: ws.IError = { code: ErrorCode.ServerDisconnected, message: "the server is not connected", }; throw new ws.RpcError("the server is not connected", ErrorCode.ServerDisconnected); } const requestData = seralizeRequest(request); const data: ws.IDeviceCallRequest = { deviceId, data: requestData }; const resData = await this.makeRequest("deviceCall", data); if (resData.data.result === "error") { throw new ws.RpcError(resData.data.message, resData.data.code, resData.data); } else { return resData.data; } } makeRequest(method: Method, params: ws.IClientRequestTypes[Method]): Promise { const id = this.nextRequestId++; return new Promise((resolve, reject) => { let timeoutHandle: number; this.responseCallbacks[id] = (response) => { clearTimeout(timeoutHandle); delete this.responseCallbacks[id]; if (response.result === "success") { resolve(response.data); } else { const { error } = response; reject(new ws.RpcError(error.message, error.code, error.data)); } }; timeoutHandle = window.setTimeout(() => { delete this.responseCallbacks[id]; reject(new ws.RpcError("the request timed out", ErrorCode.Timeout)); }, TIMEOUT_MS); this.sendRequest(id, method, params); }) .catch((err) => { if (err instanceof ws.RpcError) { this.emit("rpcError", err); } throw err; }); } private sendMessage(data: ws.ClientMessage) { if (!this.socket) { throw new Error("WebSocketApiClient is not connected"); } this.socket.send(JSON.stringify(data)); } private sendRequest( id: number, method: Method, params: ws.IClientRequestTypes[Method], ) { this.sendMessage({ type: "request", id, method, params }); } private _reconnect = () => { this._connect(); } private _connect() { if (this.socket != null && (this.socket.readyState === WebSocket.OPEN)) { this.tryAuthenticate(); return; } log.debug({ url: this.webSocketUrl }, "connecting to websocket"); this.socket = new WebSocket(this.webSocketUrl); this.socket.onopen = this.onOpen.bind(this); this.socket.onclose = this.onClose.bind(this); this.socket.onerror = this.onError.bind(this); this.socket.onmessage = this.onMessage.bind(this); } @action private onOpen() { log.info("established websocket connection"); this.connectionState.clientToServer = true; this.authenticated = false; this.tryAuthenticate(); } /* tslint:disable-next-line:member-ordering */ @action private onDisconnect = action(() => { this.connectionState.serverToBroker = null; this.connectionState.clientToServer = false; this.authenticated = false; }); private onClose(event: CloseEvent) { log.info({ event }, "disconnected from websocket"); this.onDisconnect(); this.reconnectTimer = window.setTimeout(this._reconnect, RECONNECT_TIMEOUT_MS); } @action private onError(event: Event) { log.error({ event }, "websocket error"); this.connectionState.serverToBroker = null; this.connectionState.clientToServer = false; this.onDisconnect(); } private onMessage(event: MessageEvent) { let data: ws.ServerMessage; try { data = JSON.parse(event.data); } catch (err) { return log.error({ event, err }, "received invalid websocket message"); } log.trace({ data }, "websocket message"); switch (data.type) { case "notification": this.onNotification(data); break; case "response": this.onResponse(data); break; default: log.warn({ data }, "unsupported event type received"); } } private onNotification(data: ws.ServerNotification) { try { rpc.handleNotification(this.notificationHandlers, data); } catch (err) { logger.error(err, "error handling server notification"); } } private onResponse(data: ws.ServerResponse) { try { rpc.handleResponse(this.responseCallbacks, data); } catch (err) { log.error({ err }, "error handling server response"); } } private notificationHandlers = new WSClientNotificationHandlers(this); } class WSClientNotificationHandlers implements ws.ServerNotificationHandlers { client: WebSocketRpcClient; constructor(client: WebSocketRpcClient) { this.client = client; } @action.bound brokerConnectionUpdate(data: ws.IBrokerConnectionUpdate) { this.client.connectionState.serverToBroker = data.brokerConnected; } @action.bound deviceUpdate(data: ws.IDeviceUpdate) { const device = this.client.devices.get(data.deviceId); if (!device) { return log.warn({ data }, "invalid deviceUpdate received"); } update(schema.sprinklersDevice, device, data.data); } error(data: ws.IError) { log.warn({ err: data }, "server error"); } }