|
|
|
@ -12,7 +12,7 @@ export class MqttApiClient implements s.ISprinklersApi {
@@ -12,7 +12,7 @@ export class MqttApiClient implements s.ISprinklersApi {
|
|
|
|
|
readonly mqttUri: string; |
|
|
|
|
client: mqtt.Client; |
|
|
|
|
connected: boolean; |
|
|
|
|
devices: { [prefix: string]: MqttSprinklersDevice } = {}; |
|
|
|
|
devices: Map<string, MqttSprinklersDevice> = new Map(); |
|
|
|
|
|
|
|
|
|
constructor(mqttUri: string) { |
|
|
|
|
this.mqttUri = mqttUri; |
|
|
|
@ -38,8 +38,8 @@ export class MqttApiClient implements s.ISprinklersApi {
@@ -38,8 +38,8 @@ export class MqttApiClient implements s.ISprinklersApi {
|
|
|
|
|
this.client.on("connect", () => { |
|
|
|
|
log.info("mqtt connected"); |
|
|
|
|
this.connected = true; |
|
|
|
|
for (const prefix of Object.keys(this.devices)) { |
|
|
|
|
const device = this.devices[prefix]; |
|
|
|
|
const values = this.devices.values(); |
|
|
|
|
for (const device of values) { |
|
|
|
|
device.doSubscribe(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
@ -49,22 +49,23 @@ export class MqttApiClient implements s.ISprinklersApi {
@@ -49,22 +49,23 @@ export class MqttApiClient implements s.ISprinklersApi {
|
|
|
|
|
if (/\//.test(prefix)) { |
|
|
|
|
throw new Error("Prefix cannot contain a /"); |
|
|
|
|
} |
|
|
|
|
if (!this.devices[prefix]) { |
|
|
|
|
const device = this.devices[prefix] = new MqttSprinklersDevice(this, prefix); |
|
|
|
|
let device = this.devices.get(prefix); |
|
|
|
|
if (!device) { |
|
|
|
|
this.devices.set(prefix, device = new MqttSprinklersDevice(this, prefix)); |
|
|
|
|
if (this.connected) { |
|
|
|
|
device.doSubscribe(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return this.devices[prefix]; |
|
|
|
|
return device; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
removeDevice(prefix: string) { |
|
|
|
|
const device = this.devices[prefix]; |
|
|
|
|
const device = this.devices.get(prefix); |
|
|
|
|
if (!device) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
device.doUnsubscribe(); |
|
|
|
|
delete this.devices[prefix]; |
|
|
|
|
this.devices.delete(prefix); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) { |
|
|
|
@ -81,7 +82,7 @@ export class MqttApiClient implements s.ISprinklersApi {
@@ -81,7 +82,7 @@ export class MqttApiClient implements s.ISprinklersApi {
|
|
|
|
|
const topicIdx = topic.indexOf("/"); // find the first /
|
|
|
|
|
const prefix = topic.substr(0, topicIdx); // assume prefix does not contain a /
|
|
|
|
|
const topicSuffix = topic.substr(topicIdx + 1); |
|
|
|
|
const device = this.devices[prefix]; |
|
|
|
|
const device = this.devices.get(prefix); |
|
|
|
|
if (!device) { |
|
|
|
|
log.debug({ prefix }, "received message for unknown device"); |
|
|
|
|
return; |
|
|
|
@ -100,13 +101,29 @@ const subscriptions = [
@@ -100,13 +101,29 @@ const subscriptions = [
|
|
|
|
|
"/section_runner", |
|
|
|
|
]; |
|
|
|
|
|
|
|
|
|
type IHandler = (payload: any, ...matches: string[]) => void; |
|
|
|
|
|
|
|
|
|
interface IHandlerEntry { |
|
|
|
|
test: RegExp; |
|
|
|
|
handler: IHandler; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const handler = (test: RegExp) => |
|
|
|
|
(target: MqttSprinklersDevice, propertyKey: string, descriptor: TypedPropertyDescriptor<IHandler>) => { |
|
|
|
|
if (typeof descriptor.value === "function") { |
|
|
|
|
(target.handlers || (target.handlers = [])).push({ |
|
|
|
|
test, handler: descriptor.value, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
class MqttSprinklersDevice extends s.SprinklersDevice { |
|
|
|
|
readonly apiClient: MqttApiClient; |
|
|
|
|
readonly prefix: string; |
|
|
|
|
|
|
|
|
|
private responseCallbacks: { |
|
|
|
|
[rid: number]: ResponseCallback; |
|
|
|
|
} = {}; |
|
|
|
|
handlers: IHandlerEntry[]; |
|
|
|
|
private nextRequestId: number = Math.floor(Math.random() * 1000000000); |
|
|
|
|
private responseCallbacks: Map<number, ResponseCallback> = new Map(); |
|
|
|
|
|
|
|
|
|
constructor(apiClient: MqttApiClient, prefix: string) { |
|
|
|
|
super(); |
|
|
|
@ -133,73 +150,17 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
@@ -133,73 +150,17 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
|
|
|
|
|
this.apiClient.client.unsubscribe(topics); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Updates this device with the specified message |
|
|
|
|
* @param topic The topic, with prefix removed |
|
|
|
|
* @param payload The payload buffer |
|
|
|
|
*/ |
|
|
|
|
onMessage(topic: string, payload: string) { |
|
|
|
|
if (topic === "connected") { |
|
|
|
|
this.connected = (payload === "true"); |
|
|
|
|
log.trace(`MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
let matches = topic.match(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/); |
|
|
|
|
if (matches != null) { |
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, secStr, subTopic] = matches; |
|
|
|
|
log.trace({ section: secStr, topic: subTopic, payload }); |
|
|
|
|
if (!secStr) { // new number of sections
|
|
|
|
|
this.sections.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
|
const secNum = Number(secStr); |
|
|
|
|
let section = this.sections[secNum]; |
|
|
|
|
if (!section) { |
|
|
|
|
this.sections[secNum] = section = new MqttSection(this, secNum); |
|
|
|
|
} |
|
|
|
|
(section as MqttSection).onMessage(subTopic, payload); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
matches = topic.match(/^programs(?:\/(\d+)(?:\/?(.+))?)?$/); |
|
|
|
|
if (matches != null) { |
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, progStr, subTopic] = matches; |
|
|
|
|
log.trace({ program: progStr, topic: subTopic, payload }); |
|
|
|
|
if (!progStr) { // new number of programs
|
|
|
|
|
this.programs.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
|
const progNum = Number(progStr); |
|
|
|
|
let program = this.programs[progNum]; |
|
|
|
|
if (!program) { |
|
|
|
|
this.programs[progNum] = program = new MqttProgram(this, progNum); |
|
|
|
|
} |
|
|
|
|
(program as MqttProgram).onMessage(subTopic, payload); |
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
matches = topic.match(/^section_runner$/); |
|
|
|
|
if (matches != null) { |
|
|
|
|
(this.sectionRunner as MqttSectionRunner).onMessage(payload); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
matches = topic.match(/^responses\/(\d+)$/); |
|
|
|
|
if (matches != null) { |
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, respIdStr] = matches; |
|
|
|
|
log.trace({ response: respIdStr }); |
|
|
|
|
const respId = parseInt(respIdStr, 10); |
|
|
|
|
const data = JSON.parse(payload) as IResponseData; |
|
|
|
|
const cb = this.responseCallbacks[respId]; |
|
|
|
|
if (typeof cb === "function") { |
|
|
|
|
cb(data); |
|
|
|
|
for (const { test, handler: hndlr } of this.handlers) { |
|
|
|
|
const matches = topic.match(test); |
|
|
|
|
if (!matches) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
matches.shift(); |
|
|
|
|
hndlr.call(this, payload, ...matches); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
log.warn({ topic }, "MqttSprinklersDevice recieved invalid message"); |
|
|
|
|
log.warn({ topic }, "MqttSprinklersDevice recieved message on invalid topic"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
runSection(section: s.Section | number, duration: s.Duration) { |
|
|
|
@ -227,26 +188,78 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
@@ -227,26 +188,78 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
|
|
|
|
|
return this.makeRequest(`section_runner/unpause`); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//noinspection JSMethodCanBeStatic
|
|
|
|
|
private nextRequestId(): number { |
|
|
|
|
return Math.floor(Math.random() * 1000000000); |
|
|
|
|
private getRequestId(): number { |
|
|
|
|
return this.nextRequestId++; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private makeRequest(topic: string, payload: any = {}): Promise<IResponseData> { |
|
|
|
|
return new Promise<IResponseData>((resolve, reject) => { |
|
|
|
|
const requestId = payload.rid = this.nextRequestId(); |
|
|
|
|
const requestId = payload.rid = this.getRequestId(); |
|
|
|
|
const payloadStr = JSON.stringify(payload); |
|
|
|
|
const fullTopic = this.prefix + "/" + topic; |
|
|
|
|
this.responseCallbacks[requestId] = (data) => { |
|
|
|
|
this.responseCallbacks.set(requestId, (data) => { |
|
|
|
|
if (data.error != null) { |
|
|
|
|
reject(data); |
|
|
|
|
} else { |
|
|
|
|
resolve(data); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
this.responseCallbacks.delete(requestId); |
|
|
|
|
}); |
|
|
|
|
this.apiClient.client.publish(fullTopic, payloadStr, { qos: 1 }); |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@handler(/^connected$/) |
|
|
|
|
private handleConnected(payload: string) { |
|
|
|
|
this.connected = (payload === "true"); |
|
|
|
|
log.trace(`MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@handler(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/) |
|
|
|
|
private handleSectionsUpdate(payload: string, secNumStr?: string, subTopic?: string) { |
|
|
|
|
log.trace({ section: secNumStr, topic: subTopic, payload }, "handling section update"); |
|
|
|
|
if (!secNumStr) { // new number of sections
|
|
|
|
|
this.sections.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
|
const secNum = Number(secNumStr); |
|
|
|
|
let section = this.sections[secNum]; |
|
|
|
|
if (!section) { |
|
|
|
|
this.sections[secNum] = section = new MqttSection(this, secNum); |
|
|
|
|
} |
|
|
|
|
(section as MqttSection).onMessage(payload, subTopic); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@handler(/^programs(?:\/(\d+)(?:\/?(.+))?)?$/) |
|
|
|
|
private handleProgramsUpdate(payload: string, progNumStr?: string, subTopic?: string) { |
|
|
|
|
log.trace({ program: progNumStr, topic: subTopic, payload }, "handling program update"); |
|
|
|
|
if (!progNumStr) { // new number of programs
|
|
|
|
|
this.programs.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
|
const progNum = Number(progNumStr); |
|
|
|
|
let program = this.programs[progNum]; |
|
|
|
|
if (!program) { |
|
|
|
|
this.programs[progNum] = program = new MqttProgram(this, progNum); |
|
|
|
|
} |
|
|
|
|
(program as MqttProgram).onMessage(payload, subTopic); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@handler(/^section_runner$/) |
|
|
|
|
private handleSectionRunnerUpdate(payload: string) { |
|
|
|
|
(this.sectionRunner as MqttSectionRunner).onMessage(payload); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@handler(/^responses\/(\d+)$/) |
|
|
|
|
private handleResponse(payload: string, responseIdStr: string) { |
|
|
|
|
log.trace({ response: responseIdStr }, "handling request response"); |
|
|
|
|
const respId = parseInt(responseIdStr, 10); |
|
|
|
|
const data = JSON.parse(payload) as IResponseData; |
|
|
|
|
const cb = this.responseCallbacks.get(respId); |
|
|
|
|
if (typeof cb === "function") { |
|
|
|
|
cb(data); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -264,7 +277,7 @@ interface IRunSectionJSON {
@@ -264,7 +277,7 @@ interface IRunSectionJSON {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class MqttSection extends s.Section { |
|
|
|
|
onMessage(topic: string, payload: string) { |
|
|
|
|
onMessage(payload: string, topic: string | undefined) { |
|
|
|
|
if (topic === "state") { |
|
|
|
|
this.state = (payload === "true"); |
|
|
|
|
} else if (topic == null) { |
|
|
|
@ -278,7 +291,7 @@ class MqttSection extends s.Section {
@@ -278,7 +291,7 @@ class MqttSection extends s.Section {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class MqttProgram extends s.Program { |
|
|
|
|
onMessage(topic: string, payload: string) { |
|
|
|
|
onMessage(payload: string, topic: string | undefined) { |
|
|
|
|
if (topic === "running") { |
|
|
|
|
this.running = (payload === "true"); |
|
|
|
|
} else if (topic == null) { |
|
|
|
|