You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

352 lines
9.6 KiB

import { autorun, observable } from "mobx";
import * as mqtt from "mqtt";
import { ErrorCode } from "@common/ErrorCode";
import logger from "@common/logger";
import * as s from "@common/sprinklersRpc";
import * as requests from "@common/sprinklersRpc/deviceRequests";
import { RpcError } from "@common/sprinklersRpc/RpcError";
import { seralizeRequest } from "@common/sprinklersRpc/schema/requests";
import { getRandomId } from "@common/utils";
import { MqttProgram } from "./MqttProgram";
import { MqttSection } from "./MqttSection";
import { MqttSectionRunner } from "./MqttSectionRunner";
// Module logger: a child of the shared logger created with `source: "mqtt"`.
const log = logger.child({ source: "mqtt" });
/** Adds the request id (`rid`) field to a payload type; used when parsing responses. */
interface WithRid {
// Request id; compared against the ids handed out by `getRequestId()`.
rid: number;
}
/** First segment of every device topic (`devices/<id>/...`). */
export const DEVICE_PREFIX = "devices";
/** Delay passed to `setTimeout` in `makeRequest` before a pending request is rejected as timed out. */
const REQUEST_TIMEOUT = 5000;
/**
 * Settings accepted by the `MqttRpcClient` constructor, which copies them onto
 * the instance with `Object.assign`.
 */
export interface MqttRpcClientOptions {
// URI handed to `mqtt.connect` in `start()`.
mqttUri: string;
// Optional credentials forwarded to `mqtt.connect` as-is.
username?: string;
password?: string;
}
/**
 * `SprinklersRPC` implementation that reaches devices through an MQTT broker.
 *
 * A single MQTT client is shared by every device. Incoming messages are routed
 * to the matching `MqttSprinklersDevice` using the device id embedded in the
 * topic (`devices/<id>/...`).
 */
export class MqttRpcClient extends s.SprinklersRPC
  implements MqttRpcClientOptions {
  /** True when `connectionState.isServerConnected` is truthy. */
  get connected(): boolean {
    return this.connectionState.isServerConnected || false;
  }

  /**
   * Splits `devices/<id>[/<suffix>]` into the device id (group 1) and the
   * remaining topic suffix (group 2). Compiled once rather than per message;
   * it carries no `g`/`y` flag, so sharing it involves no `lastIndex` state.
   */
  private static readonly deviceTopicRegExp = new RegExp(
    `^${DEVICE_PREFIX}\\/([^\\/]+)\\/?(.*)$`
  );

  /** Builds a client id from a fixed prefix plus a random id. */
  private static newClientId() {
    return "sprinklers3-MqttApiClient-" + getRandomId();
  }

  mqttUri!: string;
  username?: string;
  password?: string;
  /** Underlying MQTT client; only assigned once `start()` has been called. */
  client!: mqtt.Client;

  @observable
  connectionState: s.ConnectionState = new s.ConnectionState();

  /** Devices handed out by `getDevice`, keyed by device id. */
  devices: Map<string, MqttSprinklersDevice> = new Map();

  /**
   * @param opts connection settings; copied onto this instance.
   */
  constructor(opts: MqttRpcClientOptions) {
    super();
    Object.assign(this, opts);
    this.connectionState.serverToBroker = false;
  }

  /**
   * Connects to the broker under a fresh client id and wires the client's
   * events to `connectionState` and to message routing.
   */
  start() {
    const clientId = MqttRpcClient.newClientId();
    const mqttUri = this.mqttUri;
    log.info({ mqttUri, clientId }, "connecting to mqtt broker with client id");
    this.client = mqtt.connect(
      mqttUri,
      {
        clientId,
        connectTimeout: 5000,
        reconnectPeriod: 5000,
        username: this.username,
        password: this.password
      }
    );
    this.client.on("message", this.onMessageArrived.bind(this));
    this.client.on("close", () => {
      // Use the module's child logger so this entry is tagged like the others.
      log.warn("mqtt disconnected");
      this.connectionState.serverToBroker = false;
    });
    this.client.on("error", err => {
      log.error({ err }, "mqtt error");
    });
    this.client.on("connect", () => {
      log.info("mqtt connected");
      this.connectionState.serverToBroker = true;
    });
  }

  /**
   * Unsubscribes the device with the given id and forgets it.
   * Does nothing when no such device is registered.
   */
  releaseDevice(id: string) {
    const device = this.devices.get(id);
    if (!device) {
      return;
    }
    device.doUnsubscribe();
    this.devices.delete(id);
  }

  /**
   * Returns the device registered under `id`, creating and registering it on
   * first use.
   *
   * @throws Error when `id` contains a `/`, which would break topic routing.
   */
  protected getDevice(id: string): s.SprinklersDevice {
    if (/\//.test(id)) {
      throw new Error("Device id cannot contain a /");
    }
    let device = this.devices.get(id);
    if (!device) {
      this.devices.set(id, (device = new MqttSprinklersDevice(this, id)));
      if (this.connected) {
        device.doSubscribe();
      }
    }
    return device;
  }

  /**
   * MQTT `message` listener. Any error thrown while processing is logged
   * rather than propagated to the MQTT client.
   */
  private onMessageArrived(
    topic: string,
    payload: Buffer,
    packet: mqtt.Packet
  ) {
    try {
      this.processMessage(topic, payload, packet);
    } catch (err) {
      log.error({ err }, "error while processing mqtt message");
    }
  }

  /**
   * Decodes the payload as UTF-8 and forwards it, together with the topic
   * suffix after `devices/<id>/`, to the registered device. Messages on
   * non-device topics or for unregistered devices are logged and dropped.
   */
  private processMessage(
    topic: string,
    payloadBuf: Buffer,
    packet: mqtt.Packet
  ) {
    const payload = payloadBuf.toString("utf8");
    log.trace({ topic, payload }, "message arrived: ");
    const matches = MqttRpcClient.deviceTopicRegExp.exec(topic);
    if (!matches) {
      log.warn({ topic }, "received message on invalid topic");
      return;
    }
    const id = matches[1];
    const topicSuffix = matches[2];
    const device = this.devices.get(id);
    if (!device) {
      log.debug({ id }, "received message for unknown device");
      return;
    }
    device.onMessage(topicSuffix, payload);
  }
}
/** Invoked with the parsed response once a reply to a pending request arrives. */
type ResponseCallback = (response: requests.Response) => void;
// Topic filter suffixes; each device prepends its own prefix (`devices/<id>`)
// before subscribing. `+` and `#` are MQTT topic-filter wildcards.
const subscriptions = [
"/connected",
"/sections",
"/sections/+/#",
"/programs",
"/programs/+/#",
"/responses",
"/section_runner"
];
/**
 * A topic handler: receives the message payload followed by the capture groups
 * of the regular expression that matched the topic.
 */
type IHandler = (payload: any, ...matches: string[]) => void;
/** Pairs a topic pattern with the handler to run when it matches. */
interface IHandlerEntry {
// Pattern tested against the topic suffix (the part after `devices/<id>/`).
test: RegExp;
// Handler invoked with the device as `this` (see `onMessage`).
handler: IHandler;
}
/**
 * Method decorator factory that registers the decorated method as the handler
 * for topic suffixes matching `test`.
 *
 * Entries are appended to `target.handlers` in decoration order; `onMessage`
 * runs the first entry whose pattern matches.
 *
 * @param test pattern matched against the topic suffix; its capture groups are
 *   passed to the handler after the payload.
 */
const handler = (test: RegExp) => (
  target: MqttSprinklersDevice,
  propertyKey: string,
  descriptor: TypedPropertyDescriptor<IHandler>
) => {
  const method = descriptor.value;
  if (typeof method !== "function") {
    return;
  }
  // `target` is the prototype. If it only inherits `handlers` from a parent
  // prototype, start from an own copy so the parent's list is not mutated.
  if (!Object.prototype.hasOwnProperty.call(target, "handlers")) {
    target.handlers = target.handlers ? [...target.handlers] : [];
  }
  target.handlers.push({ test, handler: method });
};
/**
 * A sprinklers device reached over MQTT through an `MqttRpcClient`.
 *
 * The device subscribes to its own topics under `devices/<id>`, dispatches
 * incoming messages to the methods registered with `@handler`, and implements
 * request/response calls by publishing to `<prefix>/requests` and matching
 * replies on `<prefix>/responses` by request id.
 */
class MqttSprinklersDevice extends s.SprinklersDevice {
  readonly apiClient: MqttRpcClient;
  /** Topic handlers; populated on the prototype by the `@handler` decorator. */
  handlers!: IHandlerEntry[];
  /** Fully-prefixed topic filters this device subscribes to. */
  private subscriptions: string[];
  /** Next request id; starts at a random value and increments per request. */
  private nextRequestId: number = Math.floor(Math.random() * 1000000000);
  /** Callbacks for requests still awaiting a response, keyed by request id. */
  private responseCallbacks: Map<number, ResponseCallback> = new Map();
  /** Stops the reaction that mirrors the broker connection state. */
  private disposeConnectionWatcher: () => void;

  /**
   * @param apiClient client whose MQTT connection this device uses.
   * @param id device id; forms the topic prefix `devices/<id>`.
   */
  constructor(apiClient: MqttRpcClient, id: string) {
    super(apiClient, id);
    this.sectionConstructor = MqttSection;
    this.sectionRunnerConstructor = MqttSectionRunner;
    this.programConstructor = MqttProgram;
    this.apiClient = apiClient;
    this.sectionRunner = new MqttSectionRunner(this);
    this.subscriptions = subscriptions.map(filter => this.prefix + filter);

    // Mirror the client's broker connection into this device's state and
    // (re)subscribe whenever the broker connection comes up. The disposer is
    // kept so a released device stops reacting (see `doUnsubscribe`).
    this.disposeConnectionWatcher = autorun(() => {
      const brokerConnected = apiClient.connected;
      this.connectionState.serverToBroker = brokerConnected;
      if (brokerConnected) {
        if (this.connectionState.brokerToDevice == null) {
          this.connectionState.brokerToDevice = false;
        }
        this.doSubscribe();
      } else {
        this.connectionState.brokerToDevice = false;
      }
    });
  }

  /** Topic prefix shared by all of this device's topics: `devices/<id>`. */
  get prefix(): string {
    return DEVICE_PREFIX + "/" + this.id;
  }

  /** Subscribes to all of this device's topic filters and logs the outcome. */
  doSubscribe() {
    this.apiClient.client.subscribe(this.subscriptions, { qos: 1 }, err => {
      if (err) {
        log.error({ err, id: this.id }, "error subscribing to device");
      } else {
        log.debug({ id: this.id }, "subscribed to device");
      }
    });
  }

  /**
   * Unsubscribes from all of this device's topic filters and logs the outcome.
   *
   * Also stops the connection watcher started in the constructor; otherwise
   * the device would subscribe again on the next broker reconnect and would
   * stay referenced by the observable it watches.
   */
  doUnsubscribe() {
    this.disposeConnectionWatcher();
    this.apiClient.client.unsubscribe(this.subscriptions, (err: Error | undefined) => {
      if (err) {
        log.error({ err, id: this.id }, "error unsubscribing to device");
      } else {
        log.debug({ id: this.id }, "unsubscribed to device");
      }
    });
  }

  /**
   * Dispatches a message to the first registered handler whose pattern matches
   * `topic`, passing the payload followed by the pattern's capture groups.
   * Logs a warning when no handler matches.
   *
   * @param topic topic suffix after `devices/<id>/`.
   * @param payload message payload as text.
   */
  onMessage(topic: string, payload: string) {
    for (const { test, handler: hndlr } of this.handlers) {
      const matches = topic.match(test);
      if (!matches) {
        continue;
      }
      // Drop the full match so only capture groups are forwarded.
      matches.shift();
      hndlr.call(this, payload, ...matches);
      return;
    }
    log.warn(
      { topic },
      "MqttSprinklersDevice recieved message on invalid topic"
    );
  }

  /**
   * Publishes `request` to `<prefix>/requests` and waits for the response that
   * carries the same request id.
   *
   * @returns a promise resolved with the response, or rejected with an
   *   `RpcError` when the response has `result === "error"` or when no
   *   response arrives within `REQUEST_TIMEOUT`.
   */
  makeRequest(request: requests.Request): Promise<requests.Response> {
    return new Promise<requests.Response>((resolve, reject) => {
      const topic = this.prefix + "/requests";
      const json = seralizeRequest(request);
      const requestId = (json.rid = this.getRequestId());
      const payloadStr = JSON.stringify(json);

      const timeoutHandle = setTimeout(() => {
        this.responseCallbacks.delete(requestId);
        reject(new RpcError("the request has timed out", ErrorCode.Timeout));
      }, REQUEST_TIMEOUT);

      const callback: ResponseCallback = data => {
        clearTimeout(timeoutHandle);
        this.responseCallbacks.delete(requestId);
        if (data.result === "error") {
          reject(new RpcError(data.message, data.code, data));
        } else {
          resolve(data);
        }
      };

      this.responseCallbacks.set(requestId, callback);
      this.apiClient.client.publish(topic, payloadStr, { qos: 1 }, err => {
        if (err) {
          // The promise is still settled by the timeout above; this only makes
          // the underlying cause visible instead of dropping it silently.
          log.error(
            { err, id: this.id, rid: requestId },
            "error publishing request"
          );
        }
      });
    });
  }

  /** Returns the current request id and advances the counter. */
  private getRequestId(): number {
    return this.nextRequestId++;
  }

  /* tslint:disable:no-unused-variable */
  /** `connected`: the device is connected to the broker iff the payload is "true". */
  @handler(/^connected$/)
  private handleConnected(payload: string) {
    this.connectionState.brokerToDevice = payload === "true";
    log.trace(
      `MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`
    );
  }

  /**
   * `sections`, `sections/<n>` or `sections/<n>/<subTopic>`.
   *
   * Without a section number the payload is the new number of sections;
   * otherwise the payload is forwarded to that section, which is created on
   * first use.
   */
  @handler(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/)
  private handleSectionsUpdate(
    payload: string,
    secNumStr?: string,
    subTopic?: string
  ) {
    log.trace(
      { section: secNumStr, topic: subTopic, payload },
      "handling section update"
    );
    if (!secNumStr) {
      // new number of sections
      this.sections.length = Number(payload);
    } else {
      const secNum = Number(secNumStr);
      let section = this.sections[secNum];
      if (!section) {
        this.sections[secNum] = section = new MqttSection(this, secNum);
      }
      (section as MqttSection).onMessage(payload, subTopic);
    }
  }

  /**
   * `programs`, `programs/<n>` or `programs/<n>/<subTopic>`.
   *
   * Without a program number the payload is the new number of programs;
   * otherwise the payload is forwarded to that program, which is created on
   * first use.
   */
  @handler(/^programs(?:\/(\d+)(?:\/?(.+))?)?$/)
  private handleProgramsUpdate(
    payload: string,
    progNumStr?: string,
    subTopic?: string
  ) {
    log.trace(
      { program: progNumStr, topic: subTopic, payload },
      "handling program update"
    );
    if (!progNumStr) {
      // new number of programs
      this.programs.length = Number(payload);
    } else {
      const progNum = Number(progNumStr);
      let program = this.programs[progNum];
      if (!program) {
        this.programs[progNum] = program = new MqttProgram(this, progNum);
      }
      (program as MqttProgram).onMessage(payload, subTopic);
    }
  }

  /** `section_runner`: forwards the payload to the section runner. */
  @handler(/^section_runner$/)
  private handleSectionRunnerUpdate(payload: string) {
    (this.sectionRunner as MqttSectionRunner).onMessage(payload);
  }

  /**
   * `responses`: parses the JSON payload and hands it to the callback waiting
   * on its `rid`, with `rid` removed. Responses with no waiting callback
   * (for example after a timeout) are ignored.
   */
  @handler(/^responses$/)
  private handleResponse(payload: string) {
    const data = JSON.parse(payload) as requests.Response & WithRid;
    log.trace({ rid: data.rid }, "handling request response");
    const cb = this.responseCallbacks.get(data.rid);
    if (typeof cb === "function") {
      delete data.rid;
      cb(data);
    }
  }
  /* tslint:enable:no-unused-variable */
}