#!/usr/bin/env python3 import serial from threading import Thread import time from e32 import E32 import messages_pb2 as messages from google.protobuf.message import Message class UGVComms(E32): def __init__(self, serial_port: serial.Serial): E32.__init__(self, serial_port) def write_len_delimited(self, data: bytes): len_data = (len(data)).to_bytes( 1, byteorder='big') # TODO: check byte order self.ser.write(len_data) self.ser.write(data) def write_message(self, msg: Message): data = msg.SerializeToString() self.write_len_delimited(data) def read_message(self): len_data = self.ser.read(size=1) if len(len_data) != 1: return None msg_len = int.from_bytes(len_data, byteorder='big') data = self.ser.read(size=msg_len) if len(data) != msg_len: print("read bad data: ", data) self.ser.flush() return None msg = messages.UGV_Message() msg.ParseFromString(data) return msg def __rx_thread_entry(ugv: UGVComms): while ugv.ser.is_open: try: msg = ugv.read_message() if msg is not None: print("received UGV message: ", msg) except Exception as e: print("error reading message: ", e) continue if __name__ == "__main__": ser = serial.serial_for_url("/dev/ttyUSB1", baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=2.0) ugv = UGVComms(ser) rx_thread = Thread(target=__rx_thread_entry, args=(ugv, ), daemon=True) rx_thread.start() # print("resetting") # ugv.reset() cmd_id = 1 time.sleep(0.2) try: while True: gmsg = messages.GroundMessage() gmsg.command.id = cmd_id gmsg.command.type = messages.DISABLE cmd_id += 1 print("writing message: ", gmsg) ugv.write_message(gmsg) time.sleep(2.) except KeyboardInterrupt: print("exiting...") finally: ugv.ser.close() rx_thread.join()