You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

140 lines
4.2 KiB

#!/usr/bin/env python3
import sys
import serial
import threading
from threading import Thread
import time
import binascii
from base64 import b64decode, b64encode
from e32 import E32
import messages_pb2 as messages
from google.protobuf.message import Message
class UGVComms(E32):
MAX_WRITE_RETRY = 5
RETRY_TIME = 1.0
def __init__(self, serial_port: serial.Serial):
E32.__init__(self, serial_port)
self.msg_acks = []
self.ack_cv = threading.Condition()
self.next_command_id = 1
self.last_status = None
self.rx_thread = None
def write_base64(self, data: bytes):
encoded = b64encode(data)
self.ser.write(encoded)
self.ser.write(b'\n')
def write_message(self, msg: Message):
print("writing message: ", msg)
data = msg.SerializeToString()
self.write_base64(data)
def write_command(self, command, retry=True):
cmdid = self.next_command_id
self.next_command_id += 1
gmsg = messages.GroundMessage()
if type(command) is int:
gmsg.command.type = command
else:
gmsg.command.CopyFrom(command)
gmsg.command.id = cmdid
self.write_message(gmsg)
last_write_time = time.time()
if not retry:
return
with self.ack_cv:
while True:
if cmdid in self.msg_acks:
self.msg_acks.remove(cmdid)
print("received ack for command")
return
time_left = time.time() - last_write_time
if time_left >= self.RETRY_TIME:
print("retry writing command")
self.write_message(gmsg)
last_write_time = time.time()
self.ack_cv.wait(timeout=time_left)
def read_message(self):
data = self.ser.read_until(terminator=b'\n')
if len(data) is 0:
return None
try:
decoded = b64decode(data, validate=True)
except binascii.Error:
print("read bad data: ", data)
self.ser.flush()
return None
msg = messages.UGV_Message()
msg.ParseFromString(decoded)
return msg
def process_message(self, msg: messages.UGV_Message):
if msg is None:
return
print("received UGV message: ", msg)
if msg.HasField("command_ack"):
with self.ack_cv:
self.msg_acks.append(msg.command_ack)
self.ack_cv.notify()
elif msg.HasField("status"):
self.last_status = msg.status
def start(self):
self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True)
self.rx_thread.start()
def stop(self):
self.rx_thread.join()
def __rx_thread_entry(self):
while self.ser.is_open:
try:
msg = self.read_message()
self.process_message(msg)
except serial.SerialException as e:
print("serial error: ", e, file=sys.stderr)
return
except Exception as e:
print("error reading message: ", e, file=sys.stderr)
continue
def main():
if len(sys.argv) >= 2:
ser_url = sys.argv[1]
else:
ser_url = "hwgrep://"
ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,
timeout=0.5)
ugv = UGVComms(ser)
ugv.start()
time.sleep(0.2)
try:
while True:
if ugv.last_status is None or ugv.last_status.state is not messages.STATE_DRIVE_HEADING:
cmd = messages.GroundCommand()
cmd.type = messages.CMD_DRIVE_HEADING
cmd.drive_heading.heading = -90.0
cmd.drive_heading.power = 0.2
ugv.write_command(cmd)
time.sleep(1.0)
except KeyboardInterrupt:
ugv.write_command(messages.CMD_DISABLE)
print("exiting...")
finally:
ugv.ser.flush()
ugv.ser.close()
ugv.stop()
if __name__ == "__main__":
main()