You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

317 lines
9.0 KiB

import { autorun } from "mobx";
import { serialize } from "serializr";
import * as WebSocket from "ws";
import { ErrorCode } from "@common/ErrorCode";
import * as rpc from "@common/jsonRpc";
import log from "@common/logger";
import { RpcError } from "@common/sprinklersRpc";
import * as deviceRequests from "@common/sprinklersRpc/deviceRequests";
import * as schema from "@common/sprinklersRpc/schema";
import * as ws from "@common/sprinklersRpc/websocketData";
import { AccessToken } from "@common/TokenClaims";
import { verifyToken } from "@server/authentication";
import { User } from "@server/entities";
import { WebSocketApi } from "./WebSocketApi";
type Disposer = () => void;
export class WebSocketConnection {
api: WebSocketApi;
socket: WebSocket;
disposers: Array<() => void> = [];
// map of device id to disposer function
deviceSubscriptions: Map<string, Disposer> = new Map();
/// This shall be the user id if the client has been authenticated, null otherwise
userId: number | null = null;
user: User | null = null;
private requestHandlers: ws.ClientRequestHandlers = new WebSocketRequestHandlers();
get state() {
return this.api.state;
}
constructor(api: WebSocketApi, socket: WebSocket) {
this.api = api;
this.socket = socket;
this.socket.on("message", this.handleSocketMessage);
this.socket.on("close", this.onClose);
}
stop = () => {
this.socket.close();
};
onClose = (code: number, reason: string) => {
log.debug({ code, reason }, "WebSocketConnection closing");
this.disposers.forEach(disposer => disposer());
this.deviceSubscriptions.forEach(disposer => disposer());
this.api.removeClient(this);
};
subscribeBrokerConnection() {
this.disposers.push(
autorun(() => {
const updateData: ws.IBrokerConnectionUpdate = {
brokerConnected: this.state.mqttClient.connected
};
this.sendNotification("brokerConnectionUpdate", updateData);
})
);
}
checkAuthorization() {
if (!this.userId || !this.user) {
throw new RpcError(
"this WebSocket session has not been authenticated",
ErrorCode.Unauthorized
);
}
}
checkDevice(devId: string) {
const userDevice = this.user!.devices!.find(dev => dev.deviceId === devId);
if (userDevice == null) {
throw new RpcError(
"you do not have permission to subscribe to device",
ErrorCode.NoPermission,
{ id: devId }
);
}
const deviceId = userDevice.deviceId;
if (!deviceId) {
throw new RpcError(
"device has no associated device prefix",
ErrorCode.Internal
);
}
return userDevice;
}
sendMessage(data: ws.ServerMessage) {
this.socket.send(JSON.stringify(data));
}
sendNotification<Method extends ws.ServerNotificationMethod>(
method: Method,
data: ws.IServerNotificationTypes[Method]
) {
this.sendMessage({ type: "notification", method, data });
}
sendResponse<Method extends ws.ClientRequestMethods>(
method: Method,
id: number,
data: ws.ServerResponseData<Method>
) {
this.sendMessage({ type: "response", method, id, ...data });
}
handleSocketMessage = (socketData: WebSocket.Data) => {
this.doHandleSocketMessage(socketData).catch(err => {
this.onError({ err }, "unhandled error on handling socket message");
});
};
async doDeviceCallRequest(
requestData: ws.IDeviceCallRequest
): Promise<deviceRequests.Response> {
const userDevice = this.checkDevice(requestData.deviceId);
const deviceId = userDevice.deviceId!;
const device = this.state.mqttClient.acquireDevice(deviceId);
try {
const request = schema.requests.deserializeRequest(requestData.data);
return await device.makeRequest(request);
} finally {
device.release();
}
}
private async doHandleSocketMessage(socketData: WebSocket.Data) {
if (typeof socketData !== "string") {
return this.onError(
{ type: typeof socketData },
"received invalid socket data type from client",
ErrorCode.Parse
);
}
let data: ws.ClientMessage;
try {
data = JSON.parse(socketData);
} catch (err) {
return this.onError(
{ socketData, err },
"received invalid websocket message from client",
ErrorCode.Parse
);
}
switch (data.type) {
case "request":
await this.handleRequest(data);
break;
default:
return this.onError(
{ data },
"received invalid message type from client",
ErrorCode.BadRequest
);
}
}
private async handleRequest(request: ws.ClientRequest) {
let response: ws.ServerResponseData;
try {
if (!this.requestHandlers[request.method]) {
// noinspection ExceptionCaughtLocallyJS
throw new RpcError("received invalid client request method");
}
response = await rpc.handleRequest(this.requestHandlers, request, this);
} catch (err) {
if (err instanceof RpcError) {
log.debug({ err }, "rpc error");
response = { result: "error", error: err.toJSON() };
} else {
log.error(
{ method: request.method, err },
"unhandled error during processing of client request"
);
response = {
result: "error",
error: {
code: ErrorCode.Internal,
message: "unhandled error during processing of client request",
data: err.toString()
}
};
}
}
this.sendResponse(request.method, request.id, response);
}
private onError(
data: any,
message: string,
code: number = ErrorCode.Internal
) {
log.error(data, message);
const errorData: ws.IError = { code, message, data };
this.sendNotification("error", errorData);
}
}
class WebSocketRequestHandlers implements ws.ClientRequestHandlers {
async authenticate(
this: WebSocketConnection,
data: ws.IAuthenticateRequest
): Promise<ws.ServerResponseData<"authenticate">> {
if (!data.accessToken) {
throw new RpcError("no token specified", ErrorCode.BadRequest);
}
let claims: AccessToken;
try {
claims = await verifyToken<AccessToken>(data.accessToken, "access");
} catch (e) {
throw new RpcError("invalid token", ErrorCode.BadToken, e);
}
this.userId = claims.aud;
this.user =
(await this.state.database.users.findById(this.userId, {
devices: true
})) || null;
if (!this.user) {
throw new RpcError("user no longer exists", ErrorCode.BadToken);
}
log.debug(
{ userId: claims.aud, name: claims.name },
"authenticated websocket client"
);
this.subscribeBrokerConnection();
return {
result: "success",
data: {
authenticated: true,
message: "authenticated",
user: this.user.toJSON()
}
};
}
async deviceSubscribe(
this: WebSocketConnection,
data: ws.IDeviceSubscribeRequest
): Promise<ws.ServerResponseData<"deviceSubscribe">> {
this.checkAuthorization();
const userDevice = this.checkDevice(data.deviceId);
const deviceId = userDevice.deviceId!;
if (!this.deviceSubscriptions.has(deviceId)) {
const device = this.state.mqttClient.acquireDevice(deviceId);
log.debug(
{ deviceId, userId: this.userId },
"websocket client subscribed to device"
);
const autorunDisposer = autorun(
() => {
const json = serialize(schema.sprinklersDevice, device);
log.trace({ device: json });
const updateData: ws.IDeviceUpdate = { deviceId, data: json };
this.sendNotification("deviceUpdate", updateData);
},
{ delay: 100 }
);
this.deviceSubscriptions.set(deviceId, () => {
autorunDisposer();
device.release();
this.deviceSubscriptions.delete(deviceId);
});
}
const response: ws.IDeviceSubscribeResponse = {
deviceId
};
return { result: "success", data: response };
}
async deviceUnsubscribe(
this: WebSocketConnection,
data: ws.IDeviceSubscribeRequest
): Promise<ws.ServerResponseData<"deviceUnsubscribe">> {
this.checkAuthorization();
const userDevice = this.checkDevice(data.deviceId);
const deviceId = userDevice.deviceId!;
const disposer = this.deviceSubscriptions.get(deviceId);
if (disposer) {
disposer();
}
const response: ws.IDeviceSubscribeResponse = {
deviceId
};
return { result: "success", data: response };
}
async deviceCall(
this: WebSocketConnection,
data: ws.IDeviceCallRequest
): Promise<ws.ServerResponseData<"deviceCall">> {
this.checkAuthorization();
try {
const response = await this.doDeviceCallRequest(data);
const resData: ws.IDeviceCallResponse = {
data: response
};
return { result: "success", data: resData };
} catch (err) {
const e: deviceRequests.ErrorResponseData = err;
throw new RpcError(e.message, e.code, e);
}
}
}