|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
import serial
|
|
|
|
import struct
|
|
|
|
|
|
|
|
PARITY_NONE = 0
|
|
|
|
PARITY_ODD = 1
|
|
|
|
PARITY_EVEN = 2
|
|
|
|
|
|
|
|
BAUD_TABLE = {
|
|
|
|
0: 1200,
|
|
|
|
1: 2400,
|
|
|
|
2: 4800,
|
|
|
|
3: 9600,
|
|
|
|
4: 19200,
|
|
|
|
5: 38400,
|
|
|
|
6: 57600,
|
|
|
|
7: 115200,
|
|
|
|
}
|
|
|
|
|
|
|
|
AIR_DATA_RATE_TABLE = {
|
|
|
|
0: 300,
|
|
|
|
1: 1200,
|
|
|
|
2: 2400,
|
|
|
|
3: 4800,
|
|
|
|
4: 9600,
|
|
|
|
5: 19200,
|
|
|
|
6: 19200,
|
|
|
|
7: 19200,
|
|
|
|
}
|
|
|
|
|
|
|
|
TX_MODE_TRANSPARENT = 0
|
|
|
|
TX_MODE_FIXED = 1
|
|
|
|
|
|
|
|
IO_MODE_OPEN_COLLECTOR = 0
|
|
|
|
IO_MODE_PUSH_PULL = 1
|
|
|
|
|
|
|
|
TX_POWER_TABLE = {
|
|
|
|
0: 30,
|
|
|
|
1: 27,
|
|
|
|
2: 24,
|
|
|
|
3: 21,
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def __least_gte(value, dic):
|
|
|
|
items = sorted(dic.items(), key=lambda item: item[1])
|
|
|
|
last_k = None
|
|
|
|
for k, v in items:
|
|
|
|
last_k = k
|
|
|
|
if v >= value:
|
|
|
|
break
|
|
|
|
return last_k
|
|
|
|
|
|
|
|
|
|
|
|
class E32_Params:
|
|
|
|
save: bool
|
|
|
|
address: int
|
|
|
|
parity: int
|
|
|
|
baud: int
|
|
|
|
air_data_rate: int
|
|
|
|
channel: int
|
|
|
|
tx_mode: int
|
|
|
|
io_mode: int
|
|
|
|
wake_up_time: int
|
|
|
|
fec_enabled: bool
|
|
|
|
tx_power: int
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def default():
|
|
|
|
p = E32_Params()
|
|
|
|
p.save = True
|
|
|
|
p.address = 0
|
|
|
|
p.parity = PARITY_NONE
|
|
|
|
p.baud = 9600
|
|
|
|
p.air_data_rate = 2400
|
|
|
|
p.channel = 0x17
|
|
|
|
p.tx_mode = TX_MODE_TRANSPARENT
|
|
|
|
p.io_mode = IO_MODE_PUSH_PULL
|
|
|
|
p.wake_up_time = 250
|
|
|
|
p.fec_enabled = True
|
|
|
|
p.tx_power = 30
|
|
|
|
return p
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def unpack(data):
|
|
|
|
p = E32_Params()
|
|
|
|
datab = bytes(data)
|
|
|
|
if len(datab) != 6:
|
|
|
|
raise Exception('invalid E32_Params data length')
|
|
|
|
if datab[0] == 0xC0:
|
|
|
|
p.save = True
|
|
|
|
elif datab[0] == 0xC2:
|
|
|
|
p.save = False
|
|
|
|
else:
|
|
|
|
raise Exception('invalid E32_Params data header')
|
|
|
|
p.address = (datab[1] << 8) | datab[2]
|
|
|
|
p.parity = (datab[3] >> 6) & 0b11
|
|
|
|
p.baud = BAUD_TABLE[(datab[3] >> 3) & 0b111]
|
|
|
|
p.air_data_rate = AIR_DATA_RATE_TABLE[datab[3] & 0b111]
|
|
|
|
p.channel = datab[4]
|
|
|
|
p.tx_mode = (datab[5] >> 7) & 1
|
|
|
|
p.io_mode = (datab[5] >> 6) & 1
|
|
|
|
p.wake_up_time = 250 * (((datab[5] >> 3) & 0b111) + 1)
|
|
|
|
p.fec_enabled = (datab[5] & (1 << 2)) != 0
|
|
|
|
p.tx_power = TX_POWER_TABLE[datab[5] & 0b11]
|
|
|
|
return p
|
|
|
|
|
|
|
|
def pack(self):
|
|
|
|
p = self
|
|
|
|
datab = bytearray(range(6))
|
|
|
|
if p.save:
|
|
|
|
datab[0] = 0xC0
|
|
|
|
else:
|
|
|
|
datab[0] = 0xC2
|
|
|
|
datab[1] = (p.address >> 8) & 0xFF
|
|
|
|
datab[2] = (p.address) & 0xFF
|
|
|
|
datab[3] = 0
|
|
|
|
datab[3] |= p.parity << 6
|
|
|
|
datab[3] |= (__least_gte(p.baud, BAUD_TABLE) << 3)
|
|
|
|
datab[3] |= (__least_gte(p.air_data_rate, AIR_DATA_RATE_TABLE))
|
|
|
|
datab[4] = p.channel
|
|
|
|
datab[5] = 0
|
|
|
|
datab[5] |= p.tx_mode << 7
|
|
|
|
datab[5] |= p.io_mode << 6
|
|
|
|
datab[5] |= (int((p.wake_up_time / 250) - 1) & 0b111) << 3
|
|
|
|
datab[5] |= p.fec_enabled << 2
|
|
|
|
datab[5] |= __least_gte(p.tx_power, TX_POWER_TABLE)
|
|
|
|
return datab
|
|
|
|
|
|
|
|
|
|
|
|
class E32:
|
|
|
|
ser: serial.Serial
|
|
|
|
|
|
|
|
def __init__(self, serial_port: serial.Serial):
|
|
|
|
self.ser = serial_port
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
self.ser.close()
|
|
|
|
|
|
|
|
def read_version(self):
|
|
|
|
# self.ser.flush()
|
|
|
|
self.ser.write(b'\xC3\xC3\xC3')
|
|
|
|
version = self.ser.read(size=4)
|
|
|
|
print("version: ", version)
|
|
|
|
return version
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
# self.ser.flush()
|
|
|
|
print("writing: ", b'\xC4\xC4\xC4')
|
|
|
|
self.ser.write(b'0xC40xC40xC4')
|
|
|
|
|
|
|
|
def read_params(self):
|
|
|
|
# self.ser.flush()
|
|
|
|
self.ser.write(b'\xC1\xC1\xC1')
|
|
|
|
param_bytes = self.ser.read(size=6)
|
|
|
|
print("param_bytes: ", param_bytes)
|
|
|
|
return E32_Params.unpack(param_bytes)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
p = E32_Params.default()
|
|
|
|
print("params: ", p.__dict__)
|
|
|
|
data = p.pack()
|
|
|
|
print("packed data: ", ', '.join(format(x, '02x') for x in data))
|
|
|
|
p2 = E32_Params.unpack(data)
|
|
|
|
print("unpacked params: ", p2.__dict__)
|