You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
352 lines
9.6 KiB
352 lines
9.6 KiB
import { autorun, observable } from "mobx"; |
|
import * as mqtt from "mqtt"; |
|
|
|
import { ErrorCode } from "@common/ErrorCode"; |
|
import logger from "@common/logger"; |
|
import * as s from "@common/sprinklersRpc"; |
|
import * as requests from "@common/sprinklersRpc/deviceRequests"; |
|
import { RpcError } from "@common/sprinklersRpc/RpcError"; |
|
import { seralizeRequest } from "@common/sprinklersRpc/schema/requests"; |
|
import { getRandomId } from "@common/utils"; |
|
|
|
import { MqttProgram } from "./MqttProgram"; |
|
import { MqttSection } from "./MqttSection"; |
|
import { MqttSectionRunner } from "./MqttSectionRunner"; |
|
|
|
/** Child logger for this module; every entry is tagged with `source: "mqtt"`. */
const log = logger.child({ source: "mqtt" });
|
|
|
/**
 * Mixin shape for messages that carry a request id.
 *
 * The id is written into outgoing requests and read back from responses to
 * find the callback waiting for them.
 */
interface WithRid {
  /** Request id correlating a response with the request that caused it. */
  rid: number;
}
|
|
|
/** First topic level under which every device publishes (`devices/<id>/...`). */
export const DEVICE_PREFIX = "devices";

/** Milliseconds to wait for a device response before a request is rejected. */
const REQUEST_TIMEOUT = 5000;
|
|
|
/** Connection settings accepted by the `MqttRpcClient` constructor. */
export interface MqttRpcClientOptions {
  /** URI of the MQTT broker, passed unchanged to `mqtt.connect`. */
  mqttUri: string;
  /** Optional broker username. */
  username?: string;
  /** Optional broker password. */
  password?: string;
}
|
|
|
/**
 * `SprinklersRPC` implementation that reaches devices through an MQTT broker.
 *
 * One `MqttSprinklersDevice` is kept per device id. Messages arriving on
 * `<DEVICE_PREFIX>/<id>/...` are routed to the device registered under `<id>`.
 */
export class MqttRpcClient extends s.SprinklersRPC
  implements MqttRpcClientOptions {
  /**
   * Splits a topic into the device id (group 1) and the remainder of the
   * topic after the id (group 2). Built once instead of on every message.
   */
  private static readonly topicPattern = new RegExp(
    `^${DEVICE_PREFIX}\\/([^\\/]+)\\/?(.*)$`
  );

  /** Whether the connection state reports the server as connected (`false` when unknown). */
  get connected(): boolean {
    return this.connectionState.isServerConnected || false;
  }

  /** Builds a client id with a random suffix for a new broker connection. */
  private static newClientId() {
    return "sprinklers3-MqttApiClient-" + getRandomId();
  }

  // Populated from the options object by Object.assign in the constructor.
  mqttUri!: string;
  username?: string;
  password?: string;

  /** Underlying MQTT client; only assigned once `start()` has been called. */
  client!: mqtt.Client;

  @observable
  connectionState: s.ConnectionState = new s.ConnectionState();

  /** Devices currently handed out by `getDevice`, keyed by device id. */
  devices: Map<string, MqttSprinklersDevice> = new Map();

  /**
   * @param opts Broker connection settings; copied onto this instance.
   */
  constructor(opts: MqttRpcClientOptions) {
    super();
    Object.assign(this, opts);
    this.connectionState.serverToBroker = false;
  }

  /**
   * Connects to the broker and wires up the client's event handlers.
   *
   * `connectionState.serverToBroker` is set on "connect" and cleared on
   * "close".
   */
  start() {
    const clientId = MqttRpcClient.newClientId();
    const mqttUri = this.mqttUri;
    log.info({ mqttUri, clientId }, "connecting to mqtt broker with client id");

    this.client = mqtt.connect(mqttUri, {
      clientId,
      connectTimeout: 5000,
      reconnectPeriod: 5000,
      username: this.username,
      password: this.password
    });

    this.client.on("message", this.onMessageArrived.bind(this));
    this.client.on("close", () => {
      // Use the module's child logger so this entry carries the same
      // `source: "mqtt"` tag as every other message logged here.
      log.warn("mqtt disconnected");
      this.connectionState.serverToBroker = false;
    });
    this.client.on("error", err => {
      log.error({ err }, "mqtt error");
    });
    this.client.on("connect", () => {
      log.info("mqtt connected");
      this.connectionState.serverToBroker = true;
    });
  }

  /**
   * Unsubscribes from a device's topics and forgets the device.
   * Does nothing when no device is registered under `id`.
   *
   * @param id Device id previously passed to `getDevice`.
   */
  releaseDevice(id: string) {
    const device = this.devices.get(id);
    if (!device) {
      return;
    }
    device.doUnsubscribe();
    this.devices.delete(id);
  }

  /**
   * Returns the device registered under `id`, creating it on first use.
   *
   * @param id Device id; becomes one topic level, so it must not contain "/".
   * @throws Error when `id` contains a "/".
   */
  protected getDevice(id: string): s.SprinklersDevice {
    if (id.includes("/")) {
      throw new Error("Device id cannot contain a /");
    }
    let device = this.devices.get(id);
    if (!device) {
      // No explicit doSubscribe() here: the MqttSprinklersDevice constructor
      // installs an autorun that runs immediately and already subscribes when
      // this client is connected. Subscribing again would only send a
      // duplicate SUBSCRIBE.
      device = new MqttSprinklersDevice(this, id);
      this.devices.set(id, device);
    }
    return device;
  }

  /**
   * "message" event handler. Logs any error thrown while processing instead
   * of letting it propagate out of the event callback.
   */
  private onMessageArrived(
    topic: string,
    payload: Buffer,
    packet: mqtt.Packet
  ) {
    try {
      this.processMessage(topic, payload, packet);
    } catch (err) {
      log.error({ err }, "error while processing mqtt message");
    }
  }

  /**
   * Decodes the payload as UTF-8 and forwards it, together with the topic
   * remainder after the device id, to the matching device.
   *
   * Topics outside `<DEVICE_PREFIX>/<id>` are logged as a warning; messages
   * for ids with no registered device are logged at debug level and dropped.
   */
  private processMessage(
    topic: string,
    payloadBuf: Buffer,
    packet: mqtt.Packet
  ) {
    const payload = payloadBuf.toString("utf8");
    log.trace({ topic, payload }, "message arrived: ");

    const matches = MqttRpcClient.topicPattern.exec(topic);
    if (!matches) {
      log.warn({ topic }, "received message on invalid topic");
      return;
    }

    const id = matches[1];
    const topicSuffix = matches[2];
    const device = this.devices.get(id);
    if (!device) {
      log.debug({ id }, "received message for unknown device");
      return;
    }
    device.onMessage(topicSuffix, payload);
  }
}
|
|
|
/** Callback invoked with the device's response to one pending request. */
type ResponseCallback = (response: requests.Response) => void;
|
|
|
/**
 * Topic filters subscribed to for every device. Each is appended to the
 * device's topic prefix; `+` and `#` are MQTT wildcards.
 */
const subscriptions: readonly string[] = [
  "/connected",
  "/sections",
  "/sections/+/#",
  "/programs",
  "/programs/+/#",
  "/responses",
  "/section_runner"
];
|
|
|
/**
 * Signature of a topic handler: the message payload followed by the capture
 * groups of the handler's topic pattern.
 */
type IHandler = (payload: string, ...matches: string[]) => void;
|
|
|
/** Pairs a topic pattern with the handler to run when a topic matches it. */
interface IHandlerEntry {
  /** Pattern matched against the topic remainder after the device id. */
  test: RegExp;
  /** Handler invoked with the payload and the pattern's capture groups. */
  handler: IHandler;
}
|
|
|
/**
 * Method decorator factory that registers the decorated method as the handler
 * for topics matching `test`.
 *
 * The entry is appended to `target.handlers`, creating the array on first
 * use. Decorated members whose descriptor value is not a function are
 * ignored.
 *
 * @param test Pattern a topic must match for the method to be invoked.
 */
const handler = (test: RegExp) => (
  target: MqttSprinklersDevice,
  propertyKey: string,
  descriptor: TypedPropertyDescriptor<IHandler>
) => {
  const method = descriptor.value;
  if (typeof method !== "function") {
    return;
  }
  if (!target.handlers) {
    target.handlers = [];
  }
  target.handlers.push({ test, handler: method });
};
|
|
|
/**
 * A single sprinklers device reached over MQTT under `<DEVICE_PREFIX>/<id>`.
 *
 * Incoming messages are dispatched to the `@handler`-decorated methods by
 * matching the topic remainder. Requests are published to `<prefix>/requests`
 * and paired with the message on `<prefix>/responses` that carries the same
 * `rid`.
 */
class MqttSprinklersDevice extends s.SprinklersDevice {
  readonly apiClient: MqttRpcClient;

  /**
   * Topic handlers registered by the `@handler` decorator. The decorator
   * stores the array on its target (the prototype, for instance methods)
   * rather than in the constructor, hence the definite-assignment `!`.
   */
  handlers!: IHandlerEntry[];

  /** Fully prefixed topic filters for this device. */
  private subscriptions: string[];
  /** Next request id; starts at a random value and increments per request. */
  private nextRequestId: number = Math.floor(Math.random() * 1000000000);
  /** Callbacks of requests still waiting for a response, keyed by request id. */
  private responseCallbacks: Map<number, ResponseCallback> = new Map();
  /** Stops the reaction that mirrors the client's connection state. */
  private readonly disposeConnectionWatcher: () => void;

  /**
   * @param apiClient Client whose MQTT connection this device uses.
   * @param id Device id; forms the topic level after `DEVICE_PREFIX`.
   */
  constructor(apiClient: MqttRpcClient, id: string) {
    super(apiClient, id);
    this.sectionConstructor = MqttSection;
    this.sectionRunnerConstructor = MqttSectionRunner;
    this.programConstructor = MqttProgram;
    this.apiClient = apiClient;
    this.sectionRunner = new MqttSectionRunner(this);
    this.subscriptions = subscriptions.map(filter => this.prefix + filter);

    // Mirror the broker connection into this device's state and (re)subscribe
    // whenever the client is connected. The disposer is kept so that
    // doUnsubscribe() can stop the reaction; otherwise a released device would
    // subscribe again on the next reconnect.
    this.disposeConnectionWatcher = autorun(() => {
      const brokerConnected = apiClient.connected;
      this.connectionState.serverToBroker = brokerConnected;
      if (!brokerConnected) {
        this.connectionState.brokerToDevice = false;
        return;
      }
      if (this.connectionState.brokerToDevice == null) {
        this.connectionState.brokerToDevice = false;
      }
      this.doSubscribe();
    });
  }

  /** Topic prefix shared by all of this device's topics. */
  get prefix(): string {
    return DEVICE_PREFIX + "/" + this.id;
  }

  /** Subscribes to all of this device's topic filters at QoS 1 and logs the outcome. */
  doSubscribe() {
    this.apiClient.client.subscribe(this.subscriptions, { qos: 1 }, err => {
      if (err) {
        log.error({ err, id: this.id }, "error subscribing to device");
      } else {
        log.debug({ id: this.id }, "subscribed to device");
      }
    });
  }

  /**
   * Unsubscribes from all of this device's topic filters and logs the outcome.
   *
   * Also stops the connection-state reaction set up in the constructor, so the
   * device no longer re-subscribes by itself when the broker reconnects.
   */
  doUnsubscribe() {
    this.disposeConnectionWatcher();
    this.apiClient.client.unsubscribe(
      this.subscriptions,
      (err: Error | undefined) => {
        if (err) {
          log.error({ err, id: this.id }, "error unsubscribing to device");
        } else {
          log.debug({ id: this.id }, "unsubscribed to device");
        }
      }
    );
  }

  /**
   * Dispatches a message to the first registered handler whose pattern
   * matches `topic`; logs a warning when none does.
   *
   * @param topic Topic remainder after `<DEVICE_PREFIX>/<id>/`.
   * @param payload Message payload decoded as text.
   */
  onMessage(topic: string, payload: string) {
    for (const entry of this.handlers) {
      const matches = topic.match(entry.test);
      if (!matches) {
        continue;
      }
      // Drop the full match; only the capture groups are passed on.
      matches.shift();
      entry.handler.call(this, payload, ...matches);
      return;
    }
    log.warn(
      { topic },
      "MqttSprinklersDevice recieved message on invalid topic"
    );
  }

  /**
   * Publishes a request to the device and resolves with its response.
   *
   * @param request Request to serialize and send.
   * @returns The device's response when its `result` is not `"error"`.
   * Rejects with an `RpcError` when the device answers with
   * `result === "error"`, or with an `RpcError` carrying
   * `ErrorCode.Timeout` when no response arrives within `REQUEST_TIMEOUT`
   * milliseconds.
   */
  makeRequest(request: requests.Request): Promise<requests.Response> {
    return new Promise<requests.Response>((resolve, reject) => {
      const topic = this.prefix + "/requests";
      const json = seralizeRequest(request);
      const requestId = (json.rid = this.getRequestId());
      const payloadStr = JSON.stringify(json);

      const timeoutHandle = setTimeout(() => {
        this.responseCallbacks.delete(requestId);
        reject(new RpcError("the request has timed out", ErrorCode.Timeout));
      }, REQUEST_TIMEOUT);

      const callback: ResponseCallback = data => {
        clearTimeout(timeoutHandle);
        this.responseCallbacks.delete(requestId);
        if (data.result === "error") {
          reject(new RpcError(data.message, data.code, data));
        } else {
          resolve(data);
        }
      };

      this.responseCallbacks.set(requestId, callback);
      this.apiClient.client.publish(
        topic,
        payloadStr,
        { qos: 1 },
        (err?: Error) => {
          if (err) {
            // Surface publish failures in the log. The promise is still
            // settled by the timeout above, so callers keep seeing the same
            // RpcError they did before.
            log.error(
              { err, id: this.id, rid: requestId },
              "error publishing request to device"
            );
          }
        }
      );
    });
  }

  /** Returns the next request id and advances the counter. */
  private getRequestId(): number {
    return this.nextRequestId++;
  }

  /* tslint:disable:no-unused-variable */
  /** `connected`: the device is reported connected only for the exact payload "true". */
  @handler(/^connected$/)
  private handleConnected(payload: string) {
    this.connectionState.brokerToDevice = payload === "true";
    log.trace(
      `MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`
    );
  }

  /**
   * `sections`: the payload is the new number of sections.
   * `sections/<n>[/<sub>]`: forwards the payload to section `n`, creating the
   * section if it does not exist yet.
   */
  @handler(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/)
  private handleSectionsUpdate(
    payload: string,
    secNumStr?: string,
    subTopic?: string
  ) {
    log.trace(
      { section: secNumStr, topic: subTopic, payload },
      "handling section update"
    );
    if (!secNumStr) {
      // new number of sections
      this.sections.length = Number(payload);
      return;
    }
    const secNum = Number(secNumStr);
    let section = this.sections[secNum];
    if (!section) {
      section = new MqttSection(this, secNum);
      this.sections[secNum] = section;
    }
    (section as MqttSection).onMessage(payload, subTopic);
  }

  /**
   * `programs`: the payload is the new number of programs.
   * `programs/<n>[/<sub>]`: forwards the payload to program `n`, creating the
   * program if it does not exist yet.
   */
  @handler(/^programs(?:\/(\d+)(?:\/?(.+))?)?$/)
  private handleProgramsUpdate(
    payload: string,
    progNumStr?: string,
    subTopic?: string
  ) {
    log.trace(
      { program: progNumStr, topic: subTopic, payload },
      "handling program update"
    );
    if (!progNumStr) {
      // new number of programs
      this.programs.length = Number(payload);
      return;
    }
    const progNum = Number(progNumStr);
    let program = this.programs[progNum];
    if (!program) {
      program = new MqttProgram(this, progNum);
      this.programs[progNum] = program;
    }
    (program as MqttProgram).onMessage(payload, subTopic);
  }

  /** `section_runner`: forwards the payload to the section runner. */
  @handler(/^section_runner$/)
  private handleSectionRunnerUpdate(payload: string) {
    (this.sectionRunner as MqttSectionRunner).onMessage(payload);
  }

  /**
   * `responses`: parses the payload as JSON and hands it to the callback
   * registered under its `rid`. Responses whose `rid` has no registered
   * callback are ignored.
   */
  @handler(/^responses$/)
  private handleResponse(payload: string) {
    const data = JSON.parse(payload) as requests.Response & WithRid;
    const rid = data.rid;
    log.trace({ rid }, "handling request response");
    const cb = this.responseCallbacks.get(rid);
    if (typeof cb === "function") {
      // Strip the correlation id so the callback only sees the response.
      // The cast is needed because newer strict TypeScript rejects `delete`
      // on a required property.
      delete (data as Partial<WithRid>).rid;
      cb(data);
    }
  }

  /* tslint:enable:no-unused-variable */
}
|
|
|