#!/usr/bin/env python3 import logging import sys import serial import threading from threading import Thread import time import binascii from base64 import b64decode, b64encode import messages_pb2 as messages from google.protobuf.message import Message log = logging.getLogger("ugv") class UGVComms: MAX_WRITE_RETRY = 5 RETRY_TIME = 1.5 def __init__(self, serial_port: serial.Serial, on_msg_received=None): self.ser = serial_port self.on_msg_received = on_msg_received self.msg_acks = [] self.ack_cv = threading.Condition() self.next_command_id = 1 self.last_status = None self.last_status_time = None self.rx_thread = None self.is_running = False def write_base64(self, data: bytes): encoded = b64encode(data) self.ser.write(encoded) self.ser.write(b'\n') def write_message(self, msg: Message): log.debug("writing message: %s", msg) data = msg.SerializeToString() self.write_base64(data) def write_command(self, command, retry=True): cmdid = self.next_command_id self.next_command_id += 1 gmsg = messages.GroundMessage() if type(command) is int: gmsg.command.type = command else: gmsg.command.CopyFrom(command) gmsg.command.id = cmdid self.write_message(gmsg) last_write_time = time.time() if not retry: return tries = UGVComms.MAX_WRITE_RETRY with self.ack_cv: while tries > 0: if cmdid in self.msg_acks: self.msg_acks.remove(cmdid) log.debug("received ack for command") return time_left = time.time() - last_write_time if time_left >= self.RETRY_TIME: log.warning("retry writing command") self.write_message(gmsg) last_write_time = time.time() tries -= 1 self.ack_cv.wait(timeout=time_left) raise TimeoutError("Timeout waiting for command ack") def read_message(self): data = self.ser.read_until(terminator=b'\n') if len(data) is 0: return None try: decoded = b64decode(data, validate=True) except binascii.Error: log.warning("read bad data: %s", data) self.ser.flush() return None msg = messages.UGV_Message() msg.ParseFromString(decoded) return msg def process_message(self, msg: messages.UGV_Message): if msg is None: return log.debug("received UGV message: %s", msg) if self.on_msg_received: self.on_msg_received(msg) if msg.HasField("command_ack"): with self.ack_cv: self.msg_acks.append(msg.command_ack) self.ack_cv.notify() elif msg.HasField("status"): self.last_status = msg.status self.last_status_time = time.time() else: log.warn("unknown UGV message: %s", msg) def start(self): if self.is_running: log.warning("RX thread already running") return False self.is_running = True self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True) self.rx_thread.start() log.debug("started RX thread") return True def stop(self): if not self.is_running: return False self.is_running = False self.ser.close() self.rx_thread.join() return True def __rx_thread_entry(self): while self.is_running and self.ser.is_open: try: msg = self.read_message() self.process_message(msg) except serial.SerialException: if not self.ser.is_open or not self.is_running: # port was probably just closed return log.error("serial error", exc_info=True) return except Exception: log.error("error reading message", exc_info=True) continue def main(): if len(sys.argv) >= 2: ser_url = sys.argv[1] else: ser_url = "hwgrep://" ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5) ugv = UGVComms(ser) ugv.start() time.sleep(0.2) try: cmd = messages.GroundCommand() cmd.type = messages.CMD_SET_TARGET cmd.target_location.latitude = 34.068415 cmd.target_location.longitude = -118.443217 # ugv.write_command(cmd) cmd.type = messages.CMD_SET_CONFIG cmd.config.angle_pid.kp = 0.10 cmd.config.angle_pid.ki = 0 # .00005 cmd.config.angle_pid.kd = 0.4 cmd.config.angle_pid.max_output = 0.5 cmd.config.angle_pid.max_i_error = 15.0 cmd.config.min_target_dist = 10.0 cmd.config.min_flip_pitch = 90.0 ugv.write_command(cmd) while True: if ugv.last_status is None or ugv.last_status.state is not messages.STATE_DRIVE_HEADING: cmd = messages.GroundCommand() cmd.type = messages.CMD_DRIVE_HEADING cmd.drive_heading.heading = -115.0 - 180 cmd.drive_heading.power = 0.3 ugv.write_command(cmd) time.sleep(2.0) except KeyboardInterrupt: ugv.write_command(messages.CMD_DISABLE) log.info("exiting...") finally: ugv.ser.flush() ugv.ser.close() ugv.stop() if __name__ == "__main__": logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S') log.setLevel(logging.DEBUG) main()