You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

78 lines
2.2 KiB

#!/usr/bin/env python3
import serial
from threading import Thread
import time
from e32 import E32
import messages_pb2 as messages
from google.protobuf.message import Message
class UGVComms(E32):
    """Length-prefixed protobuf messaging layered on the E32 base class.

    Each frame on the wire is a single length byte followed by that many
    bytes of serialized protobuf payload. All serial I/O goes through
    ``self.ser``, which this class does not assign itself (presumably it is
    set by ``E32.__init__`` -- confirm against the e32 module).
    """

    def __init__(self, serial_port: serial.Serial):
        """Initialise the E32 base class with ``serial_port``."""
        super().__init__(serial_port)

    def write_len_delimited(self, data: bytes) -> None:
        """Write ``data`` preceded by a one-byte length header.

        Args:
            data: Payload bytes. At most 255 bytes, because the length has
                to fit in the single header byte.

        Raises:
            OverflowError: If ``len(data)`` exceeds 255 (raised by
                ``int.to_bytes``).
        """
        # Byte order is irrelevant for a one-byte integer; 'big' is kept
        # only to mirror the decoding done in read_message.
        header = len(data).to_bytes(1, byteorder='big')
        # Hand header and payload to the port in one call so the frame
        # reaches the driver as one contiguous chunk.
        self.ser.write(header + bytes(data))

    def write_message(self, msg: Message) -> None:
        """Serialize ``msg`` and send it as one length-prefixed frame."""
        self.write_len_delimited(msg.SerializeToString())

    def read_message(self):
        """Read one length-prefixed frame and decode it as a ``UGV_Message``.

        Returns:
            The parsed ``messages.UGV_Message``, or ``None`` when either the
            length byte or the complete payload could not be read (for
            example because the port's read timeout expired).

        Any exception raised by ``ParseFromString`` on a malformed payload
        propagates to the caller.
        """
        len_data = self.ser.read(size=1)
        if len(len_data) != 1:
            # Nothing arrived; there is no frame to decode.
            return None

        msg_len = int.from_bytes(len_data, byteorder='big')
        data = self.ser.read(size=msg_len)
        if len(data) != msg_len:
            print("read bad data: ", data)
            # flush() only waits for pending *output* to be written. To drop
            # the remainder of a truncated frame the input buffer has to be
            # cleared, so the next read starts from fresh data.
            self.ser.reset_input_buffer()
            return None

        msg = messages.UGV_Message()
        msg.ParseFromString(data)
        return msg
def __rx_thread_entry(ugv: UGVComms):
    """Receive-thread entry point: print messages read from ``ugv``.

    Keeps polling ``ugv.read_message()`` until ``ugv.ser`` reports that it
    is no longer open. Any exception raised while reading or printing is
    reported on stdout and the loop carries on.
    """
    while True:
        if not ugv.ser.is_open:
            break
        try:
            incoming = ugv.read_message()
            if incoming is None:
                # No complete frame this time; poll again.
                continue
            print("received UGV message: ", incoming)
        except Exception as err:
            print("error reading message: ", err)
if __name__ == "__main__":
    # Serial link settings: 9600 baud, 8 data bits, no parity, one stop bit,
    # and a 2 second read timeout.
    port_settings = dict(
        baudrate=9600,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=2.0,
    )
    ser = serial.serial_for_url("/dev/ttyUSB1", **port_settings)
    ugv = UGVComms(ser)

    # Print incoming messages in the background while this thread transmits.
    rx_thread = Thread(target=__rx_thread_entry, args=(ugv,), daemon=True)
    rx_thread.start()

    # print("resetting")
    # ugv.reset()
    cmd_id = 1
    time.sleep(0.2)
    try:
        # Send a DISABLE command with a fresh id every two seconds until
        # interrupted from the keyboard.
        while True:
            ground_msg = messages.GroundMessage()
            ground_msg.command.id = cmd_id
            ground_msg.command.type = messages.DISABLE
            print("writing message: ", ground_msg)
            ugv.write_message(ground_msg)
            cmd_id += 1
            time.sleep(2.0)
    except KeyboardInterrupt:
        print("exiting...")
    finally:
        # Close the port first, then wait for the receive thread to finish.
        ugv.ser.close()
        rx_thread.join()