#!/usr/bin/env python3 import sys import serial import threading from threading import Thread import time from e32 import E32 import messages_pb2 as messages from google.protobuf.message import Message class UGVComms(E32): MAX_WRITE_RETRY=5 RETRY_TIME=1.0 def __init__(self, serial_port: serial.Serial): E32.__init__(self, serial_port) self.msg_acks = [] self.ack_cv = threading.Condition() self.next_command_id = 1 self.last_status = None self.rx_thread = None def write_len_delimited(self, data: bytes): len_data = (len(data)).to_bytes( 1, byteorder='big') # TODO: check byte order self.ser.write(len_data) self.ser.write(data) def write_message(self, msg: Message): print("writing message: ", msg) data = msg.SerializeToString() self.write_len_delimited(data) def write_command(self, cmd_type: messages.GroundCommandType, retry=True): cmdid = self.next_command_id self.next_command_id += 1 gmsg = messages.GroundMessage() gmsg.command.id = cmdid gmsg.command.type = cmd_type self.write_message(gmsg) last_write_time = time.time() if not retry: return with self.ack_cv: while True: if cmdid in self.msg_acks: self.msg_acks.remove(cmdid) print("received ack for command") return time_left = time.time() - last_write_time if time_left >= self.RETRY_TIME: print("retry writing command") self.write_message(gmsg) last_write_time = time.time() self.ack_cv.wait(timeout=time_left) def read_message(self): len_data = self.ser.read(size=1) if len(len_data) != 1: return None msg_len = int.from_bytes(len_data, byteorder='big') data = self.ser.read(size=msg_len) if len(data) != msg_len: print("read bad data: ", data) self.ser.flush() return None msg = messages.UGV_Message() msg.ParseFromString(data) return msg def process_message(self, msg: messages.UGV_Message): if msg is None: return print("received UGV message: ", msg) if msg.HasField("command_ack"): with self.ack_cv: self.msg_acks.append(msg.command_ack) self.ack_cv.notify() elif msg.HasField("status"): self.last_status = msg.status def start(self): self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True) self.rx_thread.start() def stop(self): self.rx_thread.join() def __rx_thread_entry(self): while self.ser.is_open: try: msg = self.read_message() self.process_message(msg) except serial.SerialException as e: print("serial error: ", e, file=sys.stderr) return except Exception as e: print("error reading message: ", e, file=sys.stderr) continue def main(): ser = serial.serial_for_url("/dev/ttyUSB2", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=2.0) ugv = UGVComms(ser) ugv.start() time.sleep(0.2) try: while True: if ugv.last_status is None or ugv.last_status.state is not messages.STATE_TEST: ugv.write_command(messages.CMD_TEST) time.sleep(2.) except KeyboardInterrupt: ugv.write_command(messages.CMD_DISABLE) print("exiting...") finally: ugv.ser.flush() ugv.ser.close() ugv.stop() if __name__ == "__main__": main()