Browse Source

Made requests a lot better to match grinklers

update-deps
Alex Mikhalev 7 years ago
parent
commit
8ea82950fa
  1. 34
      app/state/websocket.ts
  2. 10
      common/sprinklers/Program.ts
  3. 6
      common/sprinklers/Section.ts
  4. 21
      common/sprinklers/SectionRunner.ts
  5. 36
      common/sprinklers/SprinklersDevice.ts
  6. 82
      common/sprinklers/mqtt/index.ts
  7. 49
      common/sprinklers/requests.ts
  8. 50
      common/sprinklers/schema/common.ts
  9. 69
      common/sprinklers/schema/index.ts
  10. 65
      common/sprinklers/schema/requests.ts
  11. 8
      common/sprinklers/websocketData.ts
  12. 1
      common/tsconfig.json
  13. 37
      server/index.ts

34
app/state/websocket.ts

@ -2,9 +2,10 @@ import { update } from "serializr"; @@ -2,9 +2,10 @@ import { update } from "serializr";
import logger from "@common/logger";
import * as s from "@common/sprinklers";
import * as requests from "@common/sprinklers/requests";
import * as schema from "@common/sprinklers/schema";
import { seralizeRequest } from "@common/sprinklers/schema/requests";
import * as ws from "@common/sprinklers/websocketData";
import { checkedIndexOf } from "@common/utils";
const log = logger.child({ source: "websocket" });
@ -20,26 +21,8 @@ export class WebSprinklersDevice extends s.SprinklersDevice { @@ -20,26 +21,8 @@ export class WebSprinklersDevice extends s.SprinklersDevice {
return "grinklers";
}
runSection(section: number | s.Section, duration: s.Duration): Promise<{}> {
const secNum = checkedIndexOf(section, this.sections, "Section");
const dur = duration.toSeconds();
return this.makeCall("runSection", secNum, dur);
}
async runProgram(program: number | s.Program): Promise<{}> {
return {};
}
async cancelSectionRunById(id: number): Promise<{}> {
return {};
}
async pauseSectionRunner(): Promise<{}> {
return {};
}
async unpauseSectionRunner(): Promise<{}> {
return {};
}
private makeCall(method: string, ...args: any[]) {
return this.api.makeDeviceCall(this.id, method, ...args);
makeRequest(request: requests.Request): Promise<requests.Response> {
return this.api.makeDeviceCall(this.id, request);
}
}
@ -77,15 +60,16 @@ export class WebApiClient implements s.ISprinklersApi { @@ -77,15 +60,16 @@ export class WebApiClient implements s.ISprinklersApi {
}
// args must all be JSON serializable
makeDeviceCall(deviceName: string, method: string, ...args: any[]): Promise<any> {
makeDeviceCall(deviceName: string, request: requests.Request): Promise<requests.Response> {
const requestData = seralizeRequest(request);
const id = this.nextDeviceRequestId++;
const data: ws.IDeviceCallRequest = {
type: "deviceCallRequest",
id, deviceName, method, args,
id, deviceName, data: requestData,
};
const promise = new Promise((resolve, reject) => {
const promise = new Promise<requests.Response>((resolve, reject) => {
this.deviceResponseCallbacks[id] = (resData) => {
if (resData.result === "success") {
if (resData.data.result === "success") {
resolve(resData.data);
} else {
reject(resData.data);

10
common/sprinklers/Program.ts

@ -31,7 +31,15 @@ export class Program { @@ -31,7 +31,15 @@ export class Program {
}
run() {
return this.device.runProgram(this);
return this.device.runProgram({ programId: this.id });
}
cancel() {
return this.device.cancelProgram({ programId: this.id });
}
update(data: any) {
return this.device.updateProgram({ programId: this.id, data });
}
toString(): string {

6
common/sprinklers/Section.ts

@ -15,7 +15,11 @@ export class Section { @@ -15,7 +15,11 @@ export class Section {
}
run(duration: Duration) {
return this.device.runSection(this, duration);
return this.device.runSection({ sectionId: this.id, duration });
}
cancel() {
return this.device.cancelSection({ sectionId: this.id });
}
toString(): string {

21
common/sprinklers/SectionRunner.ts

@ -3,18 +3,21 @@ import { Duration } from "./Duration"; @@ -3,18 +3,21 @@ import { Duration } from "./Duration";
import { SprinklersDevice } from "./SprinklersDevice";
export class SectionRun {
readonly sectionRunner: SectionRunner;
readonly id: number;
section: number;
duration: Duration;
startTime: Date | null;
pauseTime: Date | null;
duration: Duration = new Duration();
startTime: Date | null = null;
pauseTime: Date | null = null;
constructor(id: number = 0, section: number = 0, duration: Duration = new Duration()) {
constructor(sectionRunner: SectionRunner, id: number = 0, section: number = 0) {
this.sectionRunner = sectionRunner;
this.id = id;
this.section = section;
this.duration = duration;
this.startTime = null;
this.pauseTime = null;
}
cancel() {
return this.sectionRunner.cancelRunById(this.id);
}
toString() {
@ -34,8 +37,8 @@ export class SectionRunner { @@ -34,8 +37,8 @@ export class SectionRunner {
this.device = device;
}
cancelRunById(id: number): Promise<{}> {
return this.device.cancelSectionRunById(id);
cancelRunById(runId: number) {
return this.device.cancelSectionRunId({ runId });
}
toString(): string {

36
common/sprinklers/SprinklersDevice.ts

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
import { observable } from "mobx";
import { Duration } from "./Duration";
import { Program } from "./Program";
import * as requests from "./requests";
import { Section } from "./Section";
import { SectionRunner } from "./SectionRunner";
@ -15,11 +15,35 @@ export abstract class SprinklersDevice { @@ -15,11 +15,35 @@ export abstract class SprinklersDevice {
}
abstract get id(): string;
abstract runSection(section: number | Section, duration: Duration): Promise<{}>;
abstract runProgram(program: number | Program): Promise<{}>;
abstract cancelSectionRunById(id: number): Promise<{}>;
abstract pauseSectionRunner(): Promise<{}>;
abstract unpauseSectionRunner(): Promise<{}>;
abstract makeRequest(request: requests.Request): Promise<requests.Response>;
runProgram(opts: requests.WithProgram) {
return this.makeRequest({ ...opts, type: "runProgram" });
}
cancelProgram(opts: requests.WithProgram) {
return this.makeRequest({ ...opts, type: "cancelProgram" });
}
updateProgram(opts: requests.UpdateProgramData): Promise<requests.UpdateProgramResponse> {
return this.makeRequest({ ...opts, type: "updateProgram" }) as Promise<any>;
}
runSection(opts: requests.RunSectionData): Promise<requests.RunSectionResponse> {
return this.makeRequest({ ...opts, type: "runSection" }) as Promise<any>;
}
cancelSection(opts: requests.WithSection) {
return this.makeRequest({ ...opts, type: "cancelSection" });
}
cancelSectionRunId(opts: requests.CancelSectionRunIdData) {
return this.makeRequest({ ...opts, type: "cancelSectionRunId" });
}
pauseSectionRunner(opts: requests.PauseSectionRunnerData) {
return this.makeRequest({ ...opts, type: "pauseSectionRunner" });
}
get sectionConstructor(): typeof Section {
return Section;

82
common/sprinklers/mqtt/index.ts

@ -3,11 +3,16 @@ import { update } from "serializr"; @@ -3,11 +3,16 @@ import { update } from "serializr";
import logger from "@common/logger";
import * as s from "@common/sprinklers";
import * as requests from "@common/sprinklers/requests";
import * as schema from "@common/sprinklers/schema";
import { checkedIndexOf } from "@common/utils";
import { seralizeRequest } from "@common/sprinklers/schema/requests";
const log = logger.child({ source: "mqtt" });
interface WithRid {
rid: number;
}
export class MqttApiClient implements s.ISprinklersApi {
readonly mqttUri: string;
client: mqtt.Client;
@ -97,7 +102,7 @@ const subscriptions = [ @@ -97,7 +102,7 @@ const subscriptions = [
"/sections/+/#",
"/programs",
"/programs/+/#",
"/responses/+",
"/responses",
"/section_runner",
];
@ -163,52 +168,28 @@ class MqttSprinklersDevice extends s.SprinklersDevice { @@ -163,52 +168,28 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
log.warn({ topic }, "MqttSprinklersDevice recieved message on invalid topic");
}
runSection(section: s.Section | number, duration: s.Duration) {
const sectionNum = checkedIndexOf(section, this.sections, "Section");
const payload: IRunSectionJSON = {
duration: duration.toSeconds(),
};
return this.makeRequest(`sections/${sectionNum}/run`, payload);
}
runProgram(program: s.Program | number) {
const programNum = checkedIndexOf(program, this.programs, "Program");
return this.makeRequest(`programs/${programNum}/run`);
}
cancelSectionRunById(id: number) {
return this.makeRequest(`section_runner/cancel_id`, { id });
}
pauseSectionRunner() {
return this.makeRequest(`section_runner/pause`);
}
unpauseSectionRunner() {
return this.makeRequest(`section_runner/unpause`);
}
private getRequestId(): number {
return this.nextRequestId++;
}
private makeRequest(topic: string, payload: any = {}): Promise<IResponseData> {
return new Promise<IResponseData>((resolve, reject) => {
const requestId = payload.rid = this.getRequestId();
const payloadStr = JSON.stringify(payload);
const fullTopic = this.prefix + "/" + topic;
makeRequest(request: requests.Request): Promise<requests.Response> {
return new Promise<requests.Response>((resolve, reject) => {
const topic = this.prefix + "/requests";
const json = seralizeRequest(request);
const requestId = json.rid = this.getRequestId();
const payloadStr = JSON.stringify(json);
this.responseCallbacks.set(requestId, (data) => {
if (data.error != null) {
if (data.result === "error") {
reject(data);
} else {
resolve(data);
}
this.responseCallbacks.delete(requestId);
});
this.apiClient.client.publish(fullTopic, payloadStr, { qos: 1 });
this.apiClient.client.publish(topic, payloadStr, { qos: 1 });
});
}
private getRequestId(): number {
return this.nextRequestId++;
}
/* tslint:disable:no-unused-variable */
@handler(/^connected$/)
private handleConnected(payload: string) {
@ -252,31 +233,20 @@ class MqttSprinklersDevice extends s.SprinklersDevice { @@ -252,31 +233,20 @@ class MqttSprinklersDevice extends s.SprinklersDevice {
(this.sectionRunner as MqttSectionRunner).onMessage(payload);
}
@handler(/^responses\/(\d+)$/)
private handleResponse(payload: string, responseIdStr: string) {
log.trace({ response: responseIdStr }, "handling request response");
const respId = parseInt(responseIdStr, 10);
const data = JSON.parse(payload) as IResponseData;
const cb = this.responseCallbacks.get(respId);
@handler(/^responses$/)
private handleResponse(payload: string) {
const data = JSON.parse(payload) as requests.Response & WithRid;
log.trace({ rid: data.rid }, "handling request response");
const cb = this.responseCallbacks.get(data.rid);
if (typeof cb === "function") {
delete data.rid;
cb(data);
}
}
/* tslint:enable:no-unused-variable */
}
interface IResponseData {
reqTopic: string;
error?: string;
[key: string]: any;
}
type ResponseCallback = (data: IResponseData) => void;
interface IRunSectionJSON {
duration: number;
}
type ResponseCallback = (response: requests.Response) => void;
class MqttSection extends s.Section {
onMessage(payload: string, topic: string | undefined) {

49
common/sprinklers/requests.ts

@ -0,0 +1,49 @@ @@ -0,0 +1,49 @@
import { Duration } from "./Duration";
export interface WithType<Type extends string = string> {
type: Type;
}
export interface WithProgram { programId: number; }
export type RunProgramRequest = WithProgram & WithType<"runProgram">;
export type CancelProgramRequest = WithProgram & WithType<"cancelProgram">;
export type UpdateProgramData = WithProgram & { data: any };
export type UpdateProgramRequest = UpdateProgramData & WithType<"updateProgram">;
export type UpdateProgramResponse = Response<"updateProgram", { data: any }>;
export interface WithSection { sectionId: number; }
export type RunSectionData = WithSection & { duration: Duration };
export type RunSectionReqeust = RunSectionData & WithType<"runSection">;
export type RunSectionResponse = Response<"runSection", { runId: number }>;
export type CancelSectionRequest = WithSection & WithType<"cancelSection">;
export interface CancelSectionRunIdData { runId: number; }
export type CancelSectionRunIdRequest = CancelSectionRunIdData & WithType<"cancelSectionRunId">;
export interface PauseSectionRunnerData { paused: boolean; }
export type PauseSectionRunnerRequest = PauseSectionRunnerData & WithType<"pauseSectionRunner">;
export type Request = RunProgramRequest | CancelProgramRequest | UpdateProgramRequest |
RunSectionReqeust | CancelSectionRequest | CancelSectionRunIdRequest | PauseSectionRunnerRequest;
export type RequestType = Request["type"];
export interface SuccessResponseData<Type extends string = string> extends WithType<Type> {
result: "success";
message: string;
}
export interface ErrorResponseData<Type extends string = string> extends WithType<Type> {
result: "error";
error: string;
offset?: number;
code?: number;
}
export type Response<Type extends string = string, Res = {}> =
(SuccessResponseData<Type> & Res) |
(ErrorResponseData<Type>);

50
common/sprinklers/schema/common.ts

@ -0,0 +1,50 @@ @@ -0,0 +1,50 @@
import {
ModelSchema, primitive, PropSchema,
} from "serializr";
import * as s from "..";
export const duration: PropSchema = {
serializer: (d: s.Duration | null) =>
d != null ? d.toSeconds() : null,
deserializer: (json: any, done) => {
if (typeof json === "number") {
done(null, s.Duration.fromSeconds(json));
} else {
done(new Error(`Duration expects a number, not ${json}`), undefined);
}
},
};
export const date: PropSchema = {
serializer: (jsDate: Date | null) => jsDate != null ?
jsDate.toISOString() : null,
deserializer: (json: any, done) => {
if (json === null) {
return done(null, null);
}
try {
done(null, new Date(json));
} catch (e) {
done(e, undefined);
}
},
};
export const dateOfYear: ModelSchema<s.DateOfYear> = {
factory: () => new s.DateOfYear(),
props: {
year: primitive(),
month: primitive(), // this only works if it is represented as a # from 0-12
day: primitive(),
},
};
export const timeOfDay: ModelSchema<s.TimeOfDay> = {
factory: () => new s.TimeOfDay(),
props: {
hour: primitive(),
minute: primitive(),
second: primitive(),
millisecond: primitive(),
},
};

69
common/sprinklers/schema/index.ts

@ -1,60 +1,20 @@ @@ -1,60 +1,20 @@
/* tslint:disable:ordered-imports object-literal-shorthand */
import {
createSimpleSchema, primitive, object, ModelSchema, PropSchema,
createSimpleSchema, ModelSchema, object, primitive,
} from "serializr";
import list from "./list";
import * as s from "..";
import list from "./list";
export const duration: PropSchema = {
serializer: (d: s.Duration | null) =>
d != null ? d.toSeconds() : null,
deserializer: (json: any, done) => {
if (typeof json === "number") {
done(null, s.Duration.fromSeconds(json));
} else {
done(new Error(`Duration expects a number, not ${json}`), undefined);
}
},
};
export const date: PropSchema = {
serializer: (jsDate: Date | null) => jsDate != null ?
jsDate.toISOString() : null,
deserializer: (json: any, done) => {
if (json === null) {
return done(null, null);
}
try {
done(null, new Date(json));
} catch (e) {
done(e, undefined);
}
},
};
export const dateOfYear: ModelSchema<s.DateOfYear> = {
factory: () => new s.DateOfYear(),
props: {
year: primitive(),
month: primitive(), // this only works if it is represented as a # from 0-12
day: primitive(),
},
};
import * as requests from "./requests";
export { requests };
export const timeOfDay: ModelSchema<s.TimeOfDay> = {
factory: () => new s.TimeOfDay(),
props: {
hour: primitive(),
minute: primitive(),
second: primitive(),
millisecond: primitive(),
},
};
import * as common from "./common";
export * from "./common";
export const section: ModelSchema<s.Section> = {
factory: (c) => new (c.parentContext.target as s.SprinklersDevice).sectionConstructor(
c.parentContext.target, c.json.id),
props: {
id: primitive(),
name: primitive(),
state: primitive(),
},
@ -65,9 +25,9 @@ export const sectionRun: ModelSchema<s.SectionRun> = { @@ -65,9 +25,9 @@ export const sectionRun: ModelSchema<s.SectionRun> = {
props: {
id: primitive(),
section: primitive(),
duration: duration,
startTime: date,
endTime: date,
duration: common.duration,
startTime: common.date,
endTime: common.date,
},
};
@ -84,10 +44,10 @@ export const sectionRunner: ModelSchema<s.SectionRunner> = { @@ -84,10 +44,10 @@ export const sectionRunner: ModelSchema<s.SectionRunner> = {
export const schedule: ModelSchema<s.Schedule> = {
factory: () => new s.Schedule(),
props: {
times: list(object(timeOfDay)),
times: list(object(common.timeOfDay)),
weekdays: list(primitive()),
from: object(dateOfYear),
to: object(dateOfYear),
from: object(common.dateOfYear),
to: object(common.dateOfYear),
},
};
@ -95,7 +55,7 @@ export const programItem: ModelSchema<s.ProgramItem> = { @@ -95,7 +55,7 @@ export const programItem: ModelSchema<s.ProgramItem> = {
factory: () => new s.ProgramItem(),
props: {
section: primitive(),
duration: duration,
duration: common.duration,
},
};
@ -103,6 +63,7 @@ export const program: ModelSchema<s.Program> = { @@ -103,6 +63,7 @@ export const program: ModelSchema<s.Program> = {
factory: (c) => new (c.parentContext.target as s.SprinklersDevice).programConstructor(
c.parentContext.target, c.json.id),
props: {
id: primitive(),
name: primitive(),
enabled: primitive(),
schedule: object(schedule),

65
common/sprinklers/schema/requests.ts

@ -0,0 +1,65 @@ @@ -0,0 +1,65 @@
import { createSimpleSchema, deserialize, ModelSchema, object, primitive, serialize } from "serializr";
import * as requests from "../requests";
import * as common from "./common";
export const withType: ModelSchema<requests.WithType> = createSimpleSchema({
type: primitive(),
});
export const withProgram: ModelSchema<requests.WithProgram> = createSimpleSchema({
...withType.props,
programId: primitive(),
});
export const withSection: ModelSchema<requests.WithSection> = createSimpleSchema({
...withType.props,
sectionId: primitive(),
});
export const updateProgramData: ModelSchema<requests.UpdateProgramData> = createSimpleSchema({
...withProgram.props,
data: object(createSimpleSchema({ "*": true })),
});
export const runSection: ModelSchema<requests.RunSectionData> = createSimpleSchema({
...withSection.props,
duration: common.duration,
});
export const cancelSectionRunId: ModelSchema<requests.CancelSectionRunIdData> = createSimpleSchema({
...withType.props,
runId: primitive(),
});
export const pauseSectionRunner: ModelSchema<requests.PauseSectionRunnerData> = createSimpleSchema({
...withType.props,
paused: primitive(),
});
export function getRequestSchema(request: requests.WithType): ModelSchema<any> {
switch (request.type as requests.RequestType) {
case "runProgram":
case "cancelProgram":
return withProgram;
case "updateProgram":
throw new Error("updateProgram not implemented");
case "runSection":
return runSection;
case "cancelSection":
return withSection;
case "cancelSectionRunId":
return cancelSectionRunId;
case "pauseSectionRunner":
return pauseSectionRunner;
default:
throw new Error(`Cannot serialize request with type "${request.type}"`);
}
}
export function seralizeRequest(request: requests.Request): any {
return serialize(getRequestSchema(request), request);
}
export function deserializeRequest(json: any): requests.Request {
return deserialize(getRequestSchema(json), json);
}

8
common/sprinklers/websocketData.ts

@ -1,3 +1,5 @@ @@ -1,3 +1,5 @@
import { Response as ResponseData } from "@common/sprinklers/requests";
export interface IDeviceUpdate {
type: "deviceUpdate";
name: string;
@ -7,8 +9,7 @@ export interface IDeviceUpdate { @@ -7,8 +9,7 @@ export interface IDeviceUpdate {
export interface IDeviceCallResponse {
type: "deviceCallResponse";
id: number;
result: "success" | "error";
data: any;
data: ResponseData;
}
export type IServerMessage = IDeviceUpdate | IDeviceCallResponse;
@ -17,8 +18,7 @@ export interface IDeviceCallRequest { @@ -17,8 +18,7 @@ export interface IDeviceCallRequest {
type: "deviceCallRequest";
id: number;
deviceName: string;
method: string;
args: any[];
data: any;
}
export type IClientMessage = IDeviceCallRequest;

1
common/tsconfig.json

@ -8,6 +8,7 @@ @@ -8,6 +8,7 @@
"types": [
"node"
],
"module": "commonjs",
"strict": true,
"allowJs": true,
"baseUrl": "..",

37
server/index.ts

@ -12,8 +12,8 @@ import app from "./app"; @@ -12,8 +12,8 @@ import app from "./app";
const mqttClient = new mqtt.MqttApiClient("mqtt://localhost:1883");
mqttClient.start();
import * as s from "@common/sprinklers";
import * as schema from "@common/sprinklers/schema";
import * as requests from "@common/sprinklers/requests";
import * as ws from "@common/sprinklers/websocketData";
import { autorunAsync } from "mobx";
import { serialize } from "serializr";
@ -24,40 +24,31 @@ app.get("/api/grinklers", (req, res) => { @@ -24,40 +24,31 @@ app.get("/api/grinklers", (req, res) => {
res.send(j);
});
async function doDeviceCallRequest(data: ws.IDeviceCallRequest): Promise<any> {
const { deviceName, method, args } = data;
async function doDeviceCallRequest(requestData: ws.IDeviceCallRequest) {
const { deviceName, data } = requestData;
if (deviceName !== "grinklers") {
// error handling? or just get the right device
return;
}
switch (method) {
case "runSection":
return device.runSection(args[0], s.Duration.fromSeconds(args[1]));
default:
// new Error(`unsupported device call: ${data.method}`) // TODO: error handling?
return;
return false;
}
const request = schema.requests.deserializeRequest(data);
return device.makeRequest(request);
}
async function deviceCallRequest(socket: WebSocket, data: ws.IDeviceCallRequest): Promise<void> {
let resData: ws.IDeviceCallResponse;
let response: requests.Response | false;
try {
const result = await doDeviceCallRequest(data);
resData = {
type: "deviceCallResponse",
id: data.id,
result: "success",
data: result,
};
response = await doDeviceCallRequest(data);
} catch (err) {
resData = {
response = err;
}
if (response) {
const resData: ws.IDeviceCallResponse = {
type: "deviceCallResponse",
id: data.id,
result: "error",
data: err,
data: response,
};
socket.send(JSON.stringify(resData));
}
socket.send(JSON.stringify(resData));
}
function webSocketHandler(socket: WebSocket) {

Loading…
Cancel
Save