|
|
|
@ -1,5 +1,6 @@
@@ -1,5 +1,6 @@
|
|
|
|
|
import * as mqtt from "../node_modules/mqtt"; |
|
|
|
|
|
|
|
|
|
import logger from "./logger"; |
|
|
|
|
import { |
|
|
|
|
Duration, |
|
|
|
|
ISprinklersApi, |
|
|
|
@ -14,6 +15,8 @@ import {
@@ -14,6 +15,8 @@ import {
|
|
|
|
|
} from "./sprinklers"; |
|
|
|
|
import { checkedIndexOf } from "./utils"; |
|
|
|
|
|
|
|
|
|
const log = logger.child({ source: "mqtt" }); |
|
|
|
|
|
|
|
|
|
export class MqttApiClient implements ISprinklersApi { |
|
|
|
|
readonly mqttUri: string; |
|
|
|
|
client: mqtt.Client; |
|
|
|
@ -30,7 +33,7 @@ export class MqttApiClient implements ISprinklersApi {
@@ -30,7 +33,7 @@ export class MqttApiClient implements ISprinklersApi {
|
|
|
|
|
|
|
|
|
|
start() { |
|
|
|
|
const clientId = MqttApiClient.newClientId(); |
|
|
|
|
console.log("connecting to mqtt with client id %s", clientId); |
|
|
|
|
log.info({ clientId }, "connecting to mqtt with client id"); |
|
|
|
|
this.client = mqtt.connect(this.mqttUri, { |
|
|
|
|
clientId, |
|
|
|
|
}); |
|
|
|
@ -38,11 +41,11 @@ export class MqttApiClient implements ISprinklersApi {
@@ -38,11 +41,11 @@ export class MqttApiClient implements ISprinklersApi {
|
|
|
|
|
this.client.on("offline", () => { |
|
|
|
|
this.connected = false; |
|
|
|
|
}); |
|
|
|
|
this.client.on("error", (e) => { |
|
|
|
|
console.error("mqtt error: ", e); |
|
|
|
|
this.client.on("error", (err) => { |
|
|
|
|
log.error({ err }, "mqtt error"); |
|
|
|
|
}); |
|
|
|
|
this.client.on("connect", () => { |
|
|
|
|
console.log("mqtt connected"); |
|
|
|
|
log.info("mqtt connected"); |
|
|
|
|
this.connected = true; |
|
|
|
|
for (const prefix of Object.keys(this.devices)) { |
|
|
|
|
const device = this.devices[prefix]; |
|
|
|
@ -76,19 +79,19 @@ export class MqttApiClient implements ISprinklersApi {
@@ -76,19 +79,19 @@ export class MqttApiClient implements ISprinklersApi {
|
|
|
|
|
private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) { |
|
|
|
|
try { |
|
|
|
|
this.processMessage(topic, payload, packet); |
|
|
|
|
} catch (e) { |
|
|
|
|
console.error("error while processing mqtt message", e); |
|
|
|
|
} catch (err) { |
|
|
|
|
log.error({ err }, "error while processing mqtt message"); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private processMessage(topic: string, payload: Buffer, packet: mqtt.Packet) { |
|
|
|
|
console.log("message arrived: ", { topic, payload }); |
|
|
|
|
log.trace({ topic, payload }, "message arrived: "); |
|
|
|
|
const topicIdx = topic.indexOf("/"); // find the first /
|
|
|
|
|
const prefix = topic.substr(0, topicIdx); // assume prefix does not contain a /
|
|
|
|
|
const topicSuffix = topic.substr(topicIdx + 1); |
|
|
|
|
const device = this.devices[prefix]; |
|
|
|
|
if (!device) { |
|
|
|
|
console.warn(`received message for unknown device. prefix: ${prefix}`); |
|
|
|
|
log.debug({ prefix }, "received message for unknown device"); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
device.onMessage(topicSuffix, payload); |
|
|
|
@ -143,7 +146,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
@@ -143,7 +146,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
|
|
|
|
|
const payload = payloadBuf.toString("utf8"); |
|
|
|
|
if (topic === "connected") { |
|
|
|
|
this.connected = (payload === "true"); |
|
|
|
|
// console.log(`MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`)
|
|
|
|
|
log.trace(`MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
let matches = topic.match(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/); |
|
|
|
@ -151,7 +154,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
@@ -151,7 +154,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
|
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, secStr, subTopic] = matches; |
|
|
|
|
// console.log(`section: ${secStr}, topic: ${subTopic}, payload: ${payload}`);
|
|
|
|
|
log.trace({ section: secStr, topic: subTopic, payload }); |
|
|
|
|
if (!secStr) { // new number of sections
|
|
|
|
|
this.sections.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
@ -169,7 +172,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
@@ -169,7 +172,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
|
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, progStr, subTopic] = matches; |
|
|
|
|
// console.log(`program: ${progStr}, topic: ${subTopic}, payload: ${payload}`);
|
|
|
|
|
log.trace({ program: progStr, topic: subTopic, payload }); |
|
|
|
|
if (!progStr) { // new number of programs
|
|
|
|
|
this.programs.length = Number(payload); |
|
|
|
|
} else { |
|
|
|
@ -192,7 +195,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
@@ -192,7 +195,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
|
|
|
|
|
//noinspection JSUnusedLocalSymbols
|
|
|
|
|
/* tslint:disable-next-line:no-unused-variable */ |
|
|
|
|
const [_topic, respIdStr] = matches; |
|
|
|
|
// console.log(`response: ${respIdStr}`);
|
|
|
|
|
log.trace({ response: respIdStr }); |
|
|
|
|
const respId = parseInt(respIdStr, 10); |
|
|
|
|
const data = JSON.parse(payload) as IResponseData; |
|
|
|
|
const cb = this.responseCallbacks[respId]; |
|
|
|
@ -201,7 +204,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
@@ -201,7 +204,7 @@ class MqttSprinklersDevice extends SprinklersDevice {
|
|
|
|
|
} |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
console.warn(`MqttSprinklersDevice recieved invalid topic: ${topic}`); |
|
|
|
|
log.warn({ topic }, "MqttSprinklersDevice recieved invalid message"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
runSection(section: Section | number, duration: Duration) { |
|
|
|
|