#!/usr/bin/env python3 import sys import serial import threading from threading import Thread import time import binascii from base64 import b64decode, b64encode from e32 import E32 import messages_pb2 as messages from google.protobuf.message import Message class UGVComms(E32): MAX_WRITE_RETRY = 5 RETRY_TIME = 1.0 def __init__(self, serial_port: serial.Serial): E32.__init__(self, serial_port) self.msg_acks = [] self.ack_cv = threading.Condition() self.next_command_id = 1 self.last_status = None self.rx_thread = None def write_base64(self, data: bytes): encoded = b64encode(data) self.ser.write(encoded) self.ser.write(b'\n') def write_message(self, msg: Message): print("writing message: ", msg) data = msg.SerializeToString() self.write_base64(data) def write_command(self, cmd_type: messages.GroundCommandType, retry=True): cmdid = self.next_command_id self.next_command_id += 1 gmsg = messages.GroundMessage() gmsg.command.id = cmdid gmsg.command.type = cmd_type self.write_message(gmsg) last_write_time = time.time() if not retry: return with self.ack_cv: while True: if cmdid in self.msg_acks: self.msg_acks.remove(cmdid) print("received ack for command") return time_left = time.time() - last_write_time if time_left >= self.RETRY_TIME: print("retry writing command") self.write_message(gmsg) last_write_time = time.time() self.ack_cv.wait(timeout=time_left) def read_message(self): data = self.ser.read_until(terminator=b'\n') if len(data) is 0: return None try: decoded = b64decode(data, validate=True) except binascii.Error: print("read bad data: ", data) self.ser.flush() return None msg = messages.UGV_Message() msg.ParseFromString(decoded) return msg def process_message(self, msg: messages.UGV_Message): if msg is None: return print("received UGV message: ", msg) if msg.HasField("command_ack"): with self.ack_cv: self.msg_acks.append(msg.command_ack) self.ack_cv.notify() elif msg.HasField("status"): self.last_status = msg.status def start(self): self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True) self.rx_thread.start() def stop(self): self.rx_thread.join() def __rx_thread_entry(self): while self.ser.is_open: try: msg = self.read_message() self.process_message(msg) except serial.SerialException as e: print("serial error: ", e, file=sys.stderr) return except Exception as e: print("error reading message: ", e, file=sys.stderr) continue def main(): if len(sys.argv) >= 2: ser_url = sys.argv[1] else: ser_url = "hwgrep://" ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5) ugv = UGVComms(ser) ugv.start() time.sleep(0.2) try: while True: if ugv.last_status is None or ugv.last_status.state is not messages.STATE_TEST: ugv.write_command(messages.CMD_TEST) time.sleep(2.) except KeyboardInterrupt: ugv.write_command(messages.CMD_DISABLE) print("exiting...") finally: ugv.ser.flush() ugv.ser.close() ugv.stop() if __name__ == "__main__": main()