#!/usr/bin/env python3
"""Parameter codec and minimal serial driver for an "E32" module.

``E32_Params`` converts between a 6-byte parameter frame and named settings.
``E32`` sends three-byte commands over a caller-supplied serial port.

NOTE(review): the field names suggest an EBYTE E32 serial radio; confirm the
bit layout and units against the device datasheet.
"""

import struct

try:
    import serial
except ImportError:
    # pyserial is referenced only by the (quoted) type annotations on E32, so
    # the parameter codec stays importable on machines without it.
    serial = None

PARITY_NONE = 0
PARITY_ODD = 1
PARITY_EVEN = 2

# Maps the 3-bit baud field of frame byte 3 to a baud rate.
BAUD_TABLE = {
    0: 1200,
    1: 2400,
    2: 4800,
    3: 9600,
    4: 19200,
    5: 38400,
    6: 57600,
    7: 115200,
}

# Maps the 3-bit air-data-rate field of frame byte 3 to a rate.
# Codes 5, 6 and 7 all map to 19200.
AIR_DATA_RATE_TABLE = {
    0: 300,
    1: 1200,
    2: 2400,
    3: 4800,
    4: 9600,
    5: 19200,
    6: 19200,
    7: 19200,
}

TX_MODE_TRANSPARENT = 0
TX_MODE_FIXED = 1

IO_MODE_OPEN_COLLECTOR = 0
IO_MODE_PUSH_PULL = 1

# Maps the 2-bit power field of frame byte 5 to a power level
# (presumably dBm -- confirm against the datasheet).
TX_POWER_TABLE = {
    0: 30,
    1: 27,
    2: 24,
    3: 21,
}


def least_gte(value, dic):
    """Return the key of the smallest entry in *dic* whose value is >= *value*.

    When every entry is smaller than *value*, the key of the largest entry is
    returned instead. An empty *dic* yields ``None``. Entries with equal
    values keep the dictionary's insertion order, so the first-inserted key
    among them wins.
    """
    last_k = None
    for k, v in sorted(dic.items(), key=lambda item: item[1]):
        last_k = k
        if v >= value:
            break
    return last_k


class E32_Params:
    """Named view of the 6-byte parameter frame.

    Instances are created empty; use :meth:`default` or :meth:`unpack` to get
    a fully populated object.
    """

    save: bool  # True -> header byte 0xC0, False -> header byte 0xC2
    address: int  # 16-bit value stored big-endian in frame bytes 1-2
    parity: int  # one of the PARITY_* constants (bits 7-6 of byte 3)
    baud: int  # a value from BAUD_TABLE
    air_data_rate: int  # a value from AIR_DATA_RATE_TABLE
    channel: int  # stored verbatim in frame byte 4
    tx_mode: int  # one of the TX_MODE_* constants (bit 7 of byte 5)
    io_mode: int  # one of the IO_MODE_* constants (bit 6 of byte 5)
    wake_up_time: int  # multiple of 250, 250..2000 (presumably milliseconds)
    fec_enabled: bool  # bit 2 of byte 5
    tx_power: int  # a value from TX_POWER_TABLE

    @staticmethod
    def default():
        """Return a parameter set that packs to ``C0 00 00 1A 17 44``."""
        p = E32_Params()
        p.save = True
        p.address = 0
        p.parity = PARITY_NONE
        p.baud = 9600
        p.air_data_rate = 2400
        p.channel = 0x17
        p.tx_mode = TX_MODE_TRANSPARENT
        p.io_mode = IO_MODE_PUSH_PULL
        p.wake_up_time = 250
        p.fec_enabled = True
        p.tx_power = 30
        return p

    @staticmethod
    def unpack(data):
        """Decode a 6-byte parameter frame into an ``E32_Params``.

        Args:
            data: Anything ``bytes()`` accepts that yields exactly six bytes.

        Raises:
            ValueError: If the frame is not six bytes long or its first byte
                is neither 0xC0 nor 0xC2.
        """
        datab = bytes(data)
        if len(datab) != 6:
            raise ValueError('invalid E32_Params data length')

        p = E32_Params()
        header = datab[0]
        if header == 0xC0:
            p.save = True
        elif header == 0xC2:
            p.save = False
        else:
            raise ValueError('invalid E32_Params data header')

        p.address = (datab[1] << 8) | datab[2]

        # Byte 3: parity (bits 7-6), baud code (bits 5-3), air rate code (2-0).
        p.parity = (datab[3] >> 6) & 0b11
        p.baud = BAUD_TABLE[(datab[3] >> 3) & 0b111]
        p.air_data_rate = AIR_DATA_RATE_TABLE[datab[3] & 0b111]

        p.channel = datab[4]

        # Byte 5: tx mode (bit 7), io mode (bit 6), wake-up code (bits 5-3),
        # FEC flag (bit 2), power code (bits 1-0).
        p.tx_mode = (datab[5] >> 7) & 1
        p.io_mode = (datab[5] >> 6) & 1
        p.wake_up_time = 250 * (((datab[5] >> 3) & 0b111) + 1)
        p.fec_enabled = (datab[5] & (1 << 2)) != 0
        p.tx_power = TX_POWER_TABLE[datab[5] & 0b11]
        return p

    def pack(self):
        """Encode this parameter set as a 6-byte ``bytearray``.

        ``baud``, ``air_data_rate`` and ``tx_power`` are mapped to the table
        entry chosen by :func:`least_gte`, so values that are not in a table
        are rounded up to the next listed value (or clamped to the largest).

        Raises:
            ValueError: If a field does not fit in its byte (bytearray items
                must be in ``range(256)``).
        """
        frame = bytearray(6)
        frame[0] = 0xC0 if self.save else 0xC2
        frame[1] = (self.address >> 8) & 0xFF
        frame[2] = self.address & 0xFF
        frame[3] = (
            (self.parity << 6)
            | (least_gte(self.baud, BAUD_TABLE) << 3)
            | least_gte(self.air_data_rate, AIR_DATA_RATE_TABLE)
        )
        frame[4] = self.channel
        frame[5] = (
            (self.tx_mode << 7)
            | (self.io_mode << 6)
            | ((int((self.wake_up_time / 250) - 1) & 0b111) << 3)
            # bool() keeps a truthy non-bool from spilling into other bits.
            | (bool(self.fec_enabled) << 2)
            | least_gte(self.tx_power, TX_POWER_TABLE)
        )
        return frame


class E32:
    """Thin command wrapper around an already-open serial port."""

    ser: "serial.Serial"

    def __init__(self, serial_port: "serial.Serial"):
        self.ser = serial_port

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    def read_version(self):
        """Send ``C3 C3 C3`` and return the 4 bytes read back."""
        self.ser.write(b'\xC3\xC3\xC3')
        version = self.ser.read(size=4)
        print("version: ", version)
        return version

    def reset(self):
        """Send the three-byte ``C4 C4 C4`` command."""
        # The bytes must be written with \x escapes; the literal text "0xC4"
        # would send ASCII characters instead of the command bytes.
        command = b'\xC4\xC4\xC4'
        print("writing: ", command)
        self.ser.write(command)

    def read_params(self):
        """Send ``C1 C1 C1``, read 6 bytes and decode them.

        Raises:
            ValueError: If the reply is not a valid parameter frame, for
                example because the read returned fewer than six bytes.
        """
        self.ser.write(b'\xC1\xC1\xC1')
        param_bytes = self.ser.read(size=6)
        print("param_bytes: ", param_bytes)
        return E32_Params.unpack(param_bytes)


if __name__ == "__main__":
    # Round-trip demo: pack the defaults and decode them again.
    p = E32_Params.default()
    print("params: ", p.__dict__)
    data = p.pack()
    print("packed data: ", ', '.join(format(x, '02x') for x in data))
    p2 = E32_Params.unpack(data)
    print("unpacked params: ", p2.__dict__)