2017-10-03 12:18:30 -06:00
|
|
|
import * as mqtt from "mqtt";
|
2017-10-05 14:31:07 -06:00
|
|
|
import { update } from "serializr";
|
2017-09-06 23:54:22 -06:00
|
|
|
|
2017-10-04 13:22:10 -06:00
|
|
|
import logger from "@common/logger";
|
2017-10-05 14:27:43 -06:00
|
|
|
import * as s from "@common/sprinklers";
|
|
|
|
import * as schema from "@common/sprinklers/json";
|
2017-10-04 13:22:10 -06:00
|
|
|
import { checkedIndexOf } from "@common/utils";
|
2017-05-02 20:03:48 -06:00
|
|
|
|
2017-09-30 11:04:25 -06:00
|
|
|
const log = logger.child({ source: "mqtt" });
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
export class MqttApiClient implements s.ISprinklersApi {
|
2017-09-27 19:20:50 -06:00
|
|
|
readonly mqttUri: string;
|
2017-09-26 14:24:15 -06:00
|
|
|
client: mqtt.Client;
|
2017-06-20 08:45:25 -06:00
|
|
|
connected: boolean;
|
|
|
|
devices: { [prefix: string]: MqttSprinklersDevice } = {};
|
2017-05-02 20:03:48 -06:00
|
|
|
|
2017-09-27 19:20:50 -06:00
|
|
|
constructor(mqttUri: string) {
|
|
|
|
this.mqttUri = mqttUri;
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-06-30 01:08:51 -06:00
|
|
|
private static newClientId() {
|
|
|
|
return "sprinklers3-MqttApiClient-" + Math.round(Math.random() * 1000);
|
|
|
|
}
|
|
|
|
|
2017-06-20 08:45:25 -06:00
|
|
|
start() {
|
2017-09-26 14:24:15 -06:00
|
|
|
const clientId = MqttApiClient.newClientId();
|
2017-09-30 11:04:25 -06:00
|
|
|
log.info({ clientId }, "connecting to mqtt with client id");
|
2017-09-27 19:20:50 -06:00
|
|
|
this.client = mqtt.connect(this.mqttUri, {
|
2017-09-26 14:24:15 -06:00
|
|
|
clientId,
|
|
|
|
});
|
|
|
|
this.client.on("message", this.onMessageArrived.bind(this));
|
|
|
|
this.client.on("offline", () => {
|
|
|
|
this.connected = false;
|
|
|
|
});
|
2017-09-30 11:04:25 -06:00
|
|
|
this.client.on("error", (err) => {
|
|
|
|
log.error({ err }, "mqtt error");
|
2017-09-26 14:24:15 -06:00
|
|
|
});
|
|
|
|
this.client.on("connect", () => {
|
2017-09-30 11:04:25 -06:00
|
|
|
log.info("mqtt connected");
|
2017-09-26 14:24:15 -06:00
|
|
|
this.connected = true;
|
|
|
|
for (const prefix of Object.keys(this.devices)) {
|
|
|
|
const device = this.devices[prefix];
|
|
|
|
device.doSubscribe();
|
|
|
|
}
|
2017-05-06 15:39:25 -06:00
|
|
|
});
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
getDevice(prefix: string): s.SprinklersDevice {
|
2017-05-03 16:12:51 -06:00
|
|
|
if (/\//.test(prefix)) {
|
|
|
|
throw new Error("Prefix cannot contain a /");
|
|
|
|
}
|
2017-05-02 20:03:48 -06:00
|
|
|
if (!this.devices[prefix]) {
|
|
|
|
const device = this.devices[prefix] = new MqttSprinklersDevice(this, prefix);
|
|
|
|
if (this.connected) {
|
|
|
|
device.doSubscribe();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return this.devices[prefix];
|
|
|
|
}
|
|
|
|
|
2017-06-20 08:45:25 -06:00
|
|
|
removeDevice(prefix: string) {
|
2017-05-02 20:03:48 -06:00
|
|
|
const device = this.devices[prefix];
|
2017-05-06 15:39:25 -06:00
|
|
|
if (!device) {
|
|
|
|
return;
|
|
|
|
}
|
2017-05-02 20:03:48 -06:00
|
|
|
device.doUnsubscribe();
|
|
|
|
delete this.devices[prefix];
|
|
|
|
}
|
|
|
|
|
2017-09-26 14:24:15 -06:00
|
|
|
private onMessageArrived(topic: string, payload: Buffer, packet: mqtt.Packet) {
|
2017-05-07 23:30:36 -06:00
|
|
|
try {
|
2017-09-26 14:24:15 -06:00
|
|
|
this.processMessage(topic, payload, packet);
|
2017-09-30 11:04:25 -06:00
|
|
|
} catch (err) {
|
|
|
|
log.error({ err }, "error while processing mqtt message");
|
2017-05-07 23:30:36 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
private processMessage(topic: string, payloadBuf: Buffer, packet: mqtt.Packet) {
|
|
|
|
const payload = payloadBuf.toString("utf8");
|
2017-09-30 11:04:25 -06:00
|
|
|
log.trace({ topic, payload }, "message arrived: ");
|
2017-09-26 14:24:15 -06:00
|
|
|
const topicIdx = topic.indexOf("/"); // find the first /
|
|
|
|
const prefix = topic.substr(0, topicIdx); // assume prefix does not contain a /
|
|
|
|
const topicSuffix = topic.substr(topicIdx + 1);
|
2017-05-03 16:12:51 -06:00
|
|
|
const device = this.devices[prefix];
|
|
|
|
if (!device) {
|
2017-09-30 11:04:25 -06:00
|
|
|
log.debug({ prefix }, "received message for unknown device");
|
2017-05-03 16:12:51 -06:00
|
|
|
return;
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
2017-09-26 14:24:15 -06:00
|
|
|
device.onMessage(topicSuffix, payload);
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-09-26 14:24:15 -06:00
|
|
|
const subscriptions = [
|
2017-09-11 14:45:52 -06:00
|
|
|
"/connected",
|
|
|
|
"/sections",
|
|
|
|
"/sections/+/#",
|
|
|
|
"/programs",
|
|
|
|
"/programs/+/#",
|
|
|
|
"/responses/+",
|
|
|
|
"/section_runner",
|
|
|
|
];
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
class MqttSprinklersDevice extends s.SprinklersDevice {
|
2017-06-20 08:45:25 -06:00
|
|
|
readonly apiClient: MqttApiClient;
|
|
|
|
readonly prefix: string;
|
2017-05-02 20:03:48 -06:00
|
|
|
|
2017-05-30 16:45:25 -06:00
|
|
|
private responseCallbacks: {
|
|
|
|
[rid: number]: ResponseCallback;
|
|
|
|
} = {};
|
|
|
|
|
2017-05-02 20:03:48 -06:00
|
|
|
constructor(apiClient: MqttApiClient, prefix: string) {
|
|
|
|
super();
|
|
|
|
this.apiClient = apiClient;
|
|
|
|
this.prefix = prefix;
|
2017-06-30 00:09:25 -06:00
|
|
|
this.sectionRunner = new MqttSectionRunner(this);
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-06-30 01:08:51 -06:00
|
|
|
get id(): string {
|
|
|
|
return this.prefix;
|
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
get sectionConstructor() { return MqttSection; }
|
|
|
|
get sectionRunnerConstructor() { return MqttSectionRunner; }
|
|
|
|
get programConstructor() { return MqttProgram; }
|
|
|
|
|
2017-06-20 08:45:25 -06:00
|
|
|
doSubscribe() {
|
2017-09-26 14:24:15 -06:00
|
|
|
const topics = subscriptions.map((filter) => this.prefix + filter);
|
|
|
|
this.apiClient.client.subscribe(topics, { qos: 1 });
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-06-20 08:45:25 -06:00
|
|
|
doUnsubscribe() {
|
2017-09-26 14:24:15 -06:00
|
|
|
const topics = subscriptions.map((filter) => this.prefix + filter);
|
|
|
|
this.apiClient.client.unsubscribe(topics);
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-05-03 16:12:51 -06:00
|
|
|
/**
|
|
|
|
* Updates this device with the specified message
|
|
|
|
* @param topic The topic, with prefix removed
|
2017-09-26 14:24:15 -06:00
|
|
|
* @param payload The payload buffer
|
2017-05-03 16:12:51 -06:00
|
|
|
*/
|
2017-10-05 14:27:43 -06:00
|
|
|
onMessage(topic: string, payload: string) {
|
2017-05-06 15:39:25 -06:00
|
|
|
if (topic === "connected") {
|
|
|
|
this.connected = (payload === "true");
|
2017-09-30 11:04:25 -06:00
|
|
|
log.trace(`MqttSprinklersDevice with prefix ${this.prefix}: ${this.connected}`);
|
2017-05-06 15:39:25 -06:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
let matches = topic.match(/^sections(?:\/(\d+)(?:\/?(.+))?)?$/);
|
|
|
|
if (matches != null) {
|
2017-06-20 08:45:25 -06:00
|
|
|
//noinspection JSUnusedLocalSymbols
|
2017-09-08 09:49:30 -06:00
|
|
|
/* tslint:disable-next-line:no-unused-variable */
|
2017-05-06 15:39:25 -06:00
|
|
|
const [_topic, secStr, subTopic] = matches;
|
2017-09-30 11:04:25 -06:00
|
|
|
log.trace({ section: secStr, topic: subTopic, payload });
|
2017-05-03 16:12:51 -06:00
|
|
|
if (!secStr) { // new number of sections
|
2017-05-07 14:25:52 -06:00
|
|
|
this.sections.length = Number(payload);
|
2017-05-03 16:12:51 -06:00
|
|
|
} else {
|
|
|
|
const secNum = Number(secStr);
|
2017-05-06 15:39:25 -06:00
|
|
|
let section = this.sections[secNum];
|
2017-05-03 16:12:51 -06:00
|
|
|
if (!section) {
|
2017-10-05 09:07:07 -06:00
|
|
|
this.sections[secNum] = section = new MqttSection(this, secNum);
|
2017-05-03 16:12:51 -06:00
|
|
|
}
|
|
|
|
(section as MqttSection).onMessage(subTopic, payload);
|
|
|
|
}
|
2017-05-06 15:39:25 -06:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
matches = topic.match(/^programs(?:\/(\d+)(?:\/?(.+))?)?$/);
|
|
|
|
if (matches != null) {
|
2017-06-20 08:45:25 -06:00
|
|
|
//noinspection JSUnusedLocalSymbols
|
2017-09-08 09:49:30 -06:00
|
|
|
/* tslint:disable-next-line:no-unused-variable */
|
2017-05-06 15:39:25 -06:00
|
|
|
const [_topic, progStr, subTopic] = matches;
|
2017-09-30 11:04:25 -06:00
|
|
|
log.trace({ program: progStr, topic: subTopic, payload });
|
2017-05-03 23:00:10 -06:00
|
|
|
if (!progStr) { // new number of programs
|
2017-05-07 14:25:52 -06:00
|
|
|
this.programs.length = Number(payload);
|
2017-05-03 23:00:10 -06:00
|
|
|
} else {
|
|
|
|
const progNum = Number(progStr);
|
2017-05-06 15:39:25 -06:00
|
|
|
let program = this.programs[progNum];
|
2017-05-03 23:00:10 -06:00
|
|
|
if (!program) {
|
2017-10-05 09:07:07 -06:00
|
|
|
this.programs[progNum] = program = new MqttProgram(this, progNum);
|
2017-05-03 23:00:10 -06:00
|
|
|
}
|
|
|
|
(program as MqttProgram).onMessage(subTopic, payload);
|
|
|
|
}
|
2017-05-30 16:45:25 -06:00
|
|
|
return;
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
2017-06-30 00:09:25 -06:00
|
|
|
matches = topic.match(/^section_runner$/);
|
|
|
|
if (matches != null) {
|
2017-08-29 22:42:56 -06:00
|
|
|
(this.sectionRunner as MqttSectionRunner).onMessage(payload);
|
2017-06-30 00:30:41 -06:00
|
|
|
return;
|
2017-06-30 00:09:25 -06:00
|
|
|
}
|
2017-05-30 16:45:25 -06:00
|
|
|
matches = topic.match(/^responses\/(\d+)$/);
|
|
|
|
if (matches != null) {
|
2017-06-20 08:45:25 -06:00
|
|
|
//noinspection JSUnusedLocalSymbols
|
2017-09-08 09:49:30 -06:00
|
|
|
/* tslint:disable-next-line:no-unused-variable */
|
2017-05-30 16:45:25 -06:00
|
|
|
const [_topic, respIdStr] = matches;
|
2017-09-30 11:04:25 -06:00
|
|
|
log.trace({ response: respIdStr });
|
2017-05-30 16:45:25 -06:00
|
|
|
const respId = parseInt(respIdStr, 10);
|
|
|
|
const data = JSON.parse(payload) as IResponseData;
|
|
|
|
const cb = this.responseCallbacks[respId];
|
|
|
|
if (typeof cb === "function") {
|
|
|
|
cb(data);
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
}
|
2017-09-30 11:04:25 -06:00
|
|
|
log.warn({ topic }, "MqttSprinklersDevice recieved invalid message");
|
2017-05-02 20:03:48 -06:00
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
runSection(section: s.Section | number, duration: s.Duration) {
|
2017-05-30 16:45:25 -06:00
|
|
|
const sectionNum = checkedIndexOf(section, this.sections, "Section");
|
2017-06-21 19:04:15 -06:00
|
|
|
const payload: IRunSectionJSON = {
|
|
|
|
duration: duration.toSeconds(),
|
|
|
|
};
|
|
|
|
return this.makeRequest(`sections/${sectionNum}/run`, payload);
|
2017-05-30 16:45:25 -06:00
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
runProgram(program: s.Program | number) {
|
2017-05-30 16:45:25 -06:00
|
|
|
const programNum = checkedIndexOf(program, this.programs, "Program");
|
2017-10-05 14:27:43 -06:00
|
|
|
return this.makeRequest(`programs/${programNum}/run`);
|
2017-05-30 16:45:25 -06:00
|
|
|
}
|
|
|
|
|
2017-06-30 00:09:25 -06:00
|
|
|
cancelSectionRunById(id: number) {
|
2017-08-29 22:42:56 -06:00
|
|
|
return this.makeRequest(`section_runner/cancel_id`, { id });
|
|
|
|
}
|
|
|
|
|
|
|
|
pauseSectionRunner() {
|
|
|
|
return this.makeRequest(`section_runner/pause`);
|
|
|
|
}
|
|
|
|
|
|
|
|
unpauseSectionRunner() {
|
|
|
|
return this.makeRequest(`section_runner/unpause`);
|
2017-06-30 00:09:25 -06:00
|
|
|
}
|
|
|
|
|
2017-06-20 08:45:25 -06:00
|
|
|
//noinspection JSMethodCanBeStatic
|
2017-05-30 16:45:25 -06:00
|
|
|
private nextRequestId(): number {
|
|
|
|
return Math.floor(Math.random() * 1000000000);
|
|
|
|
}
|
|
|
|
|
2017-09-26 14:24:15 -06:00
|
|
|
private makeRequest(topic: string, payload: any = {}): Promise<IResponseData> {
|
2017-05-30 16:45:25 -06:00
|
|
|
return new Promise<IResponseData>((resolve, reject) => {
|
2017-09-26 14:24:15 -06:00
|
|
|
const requestId = payload.rid = this.nextRequestId();
|
|
|
|
const payloadStr = JSON.stringify(payload);
|
|
|
|
const fullTopic = this.prefix + "/" + topic;
|
2017-05-30 16:45:25 -06:00
|
|
|
this.responseCallbacks[requestId] = (data) => {
|
|
|
|
if (data.error != null) {
|
|
|
|
reject(data);
|
|
|
|
} else {
|
|
|
|
resolve(data);
|
|
|
|
}
|
|
|
|
};
|
2017-09-26 14:24:15 -06:00
|
|
|
this.apiClient.client.publish(fullTopic, payloadStr, { qos: 1 });
|
2017-05-30 16:45:25 -06:00
|
|
|
});
|
|
|
|
|
2017-05-27 14:03:13 -06:00
|
|
|
}
|
2017-05-03 16:12:51 -06:00
|
|
|
}
|
|
|
|
|
2017-05-30 16:45:25 -06:00
|
|
|
interface IResponseData {
|
|
|
|
reqTopic: string;
|
|
|
|
error?: string;
|
2017-06-30 01:08:51 -06:00
|
|
|
|
2017-05-30 16:45:25 -06:00
|
|
|
[key: string]: any;
|
|
|
|
}
|
|
|
|
|
2017-08-29 22:42:56 -06:00
|
|
|
type ResponseCallback = (data: IResponseData) => void;
|
2017-05-30 16:45:25 -06:00
|
|
|
|
2017-05-27 14:03:13 -06:00
|
|
|
interface IRunSectionJSON {
|
|
|
|
duration: number;
|
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
class MqttSection extends s.Section {
|
2017-06-20 08:45:25 -06:00
|
|
|
onMessage(topic: string, payload: string) {
|
2017-05-06 15:39:25 -06:00
|
|
|
if (topic === "state") {
|
|
|
|
this.state = (payload === "true");
|
2017-05-03 16:12:51 -06:00
|
|
|
} else if (topic == null) {
|
2017-10-05 14:27:43 -06:00
|
|
|
this.updateFromJSON(JSON.parse(payload));
|
2017-05-03 16:12:51 -06:00
|
|
|
}
|
|
|
|
}
|
2017-05-07 23:30:36 -06:00
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
updateFromJSON(json: any) {
|
|
|
|
update(schema.sectionSchema, this, json);
|
|
|
|
}
|
2017-05-03 23:00:10 -06:00
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
class MqttProgram extends s.Program {
|
2017-06-20 08:45:25 -06:00
|
|
|
onMessage(topic: string, payload: string) {
|
2017-05-06 15:39:25 -06:00
|
|
|
if (topic === "running") {
|
|
|
|
this.running = (payload === "true");
|
2017-05-03 23:00:10 -06:00
|
|
|
} else if (topic == null) {
|
2017-10-05 14:27:43 -06:00
|
|
|
this.updateFromJSON(JSON.parse(payload));
|
2017-05-07 23:30:36 -06:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
updateFromJSON(json: any) {
|
|
|
|
update(schema.programSchema, this, json);
|
2017-05-03 23:00:10 -06:00
|
|
|
}
|
2017-05-06 15:39:25 -06:00
|
|
|
}
|
2017-06-30 00:09:25 -06:00
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
class MqttSectionRunner extends s.SectionRunner {
|
2017-08-29 22:42:56 -06:00
|
|
|
onMessage(payload: string) {
|
2017-10-05 14:27:43 -06:00
|
|
|
this.updateFromJSON(JSON.parse(payload));
|
2017-06-30 00:09:25 -06:00
|
|
|
}
|
|
|
|
|
2017-10-05 14:27:43 -06:00
|
|
|
updateFromJSON(json: any) {
|
|
|
|
update(schema.sectionRunnerSchema, this, json);
|
2017-06-30 00:09:25 -06:00
|
|
|
}
|
|
|
|
}
|