Browse Source

Work on good tools

master
Alex Mikhalev 6 years ago
parent
commit
c9e872c355
  1. 10
      tools/config.yml
  2. 43
      tools/config2c.py
  3. 0
      tools/config_pb2.py
  4. 0
      tools/e32.py
  5. 0
      tools/messages_pb2.py
  6. 0
      tools/requirements.txt
  7. 19
      tools/ugv.py
  8. 115
      tools/ugv_cmd.py

10
tools/config.yml

@ -0,0 +1,10 @@
REVISION: 1
angle_pid:
kp: 0.10
ki: 0.0
kd: 0.4
max_output: 0.5
max_i_error: 15.0
min_target_dist: 10.0
min_flip_pitch: 90.0

43
tools/config2c.py

@ -0,0 +1,43 @@
#!/usr/bin/env python3
import config_pb2
import yaml
def dict2pb(d, pb):
for key in d:
val = d[key]
if isinstance(val, dict):
dict2pb(val, getattr(pb, key))
else:
setattr(pb, key, val)
def main():
with open('./tools/config.yml', 'r') as configfile:
config = yaml.load(configfile)
if 'REVISION' in config:
config_rev = config['REVISION']
del config['REVISION']
else:
config_rev = 1
confpb = config_pb2.Config()
dict2pb(config, confpb)
pbdata = confpb.SerializeToString()
pbdataarry = ','.join([str(int(b)) for b in pbdata])
cfile = """#include <stdint.h>
uint8_t CONFIG_DATA[] = {%s};
size_t CONFIG_DATA_LEN = %s;
int CONFIG_REV = %s;""" % (pbdataarry, len(pbdata), int(config_rev))
print(cfile)
if __name__ == "__main__":
main()

0
e32_client/config_pb2.py → tools/config_pb2.py

0
e32_client/e32.py → tools/e32.py

0
e32_client/messages_pb2.py → tools/messages_pb2.py

0
e32_client/requirements.txt → tools/requirements.txt

19
e32_client/ugv.py → tools/ugv.py

@ -13,9 +13,7 @@ from e32 import E32
import messages_pb2 as messages import messages_pb2 as messages
from google.protobuf.message import Message from google.protobuf.message import Message
logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
log = logging.getLogger("ugv") log = logging.getLogger("ugv")
log.setLevel(logging.DEBUG)
class UGVComms(E32): class UGVComms(E32):
MAX_WRITE_RETRY = 5 MAX_WRITE_RETRY = 5
@ -28,6 +26,7 @@ class UGVComms(E32):
self.next_command_id = 1 self.next_command_id = 1
self.last_status = None self.last_status = None
self.rx_thread = None self.rx_thread = None
self.is_running = False
def write_base64(self, data: bytes): def write_base64(self, data: bytes):
encoded = b64encode(data) encoded = b64encode(data)
@ -92,19 +91,29 @@ class UGVComms(E32):
self.last_status = msg.status self.last_status = msg.status
def start(self): def start(self):
if self.is_running:
log.warning("RX thread already running")
return False
self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True) self.rx_thread = Thread(target=self.__rx_thread_entry, daemon=True)
self.rx_thread.start() self.rx_thread.start()
log.debug("started RX thread")
return True
def stop(self): def stop(self):
if not self.is_running:
return False
self.is_running = False
self.ser.close()
self.rx_thread.join() self.rx_thread.join()
return True
def __rx_thread_entry(self): def __rx_thread_entry(self):
while self.ser.is_open: while self.is_running and self.ser.is_open:
try: try:
msg = self.read_message() msg = self.read_message()
self.process_message(msg) self.process_message(msg)
except serial.SerialException: except serial.SerialException:
if not self.ser.is_open: # port was probably just closed if not self.ser.is_open or not self.is_running: # port was probably just closed
return return
log.error("serial error", exc_info=True) log.error("serial error", exc_info=True)
return return
@ -157,4 +166,6 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
log.setLevel(logging.DEBUG)
main() main()

115
tools/ugv_cmd.py

@ -0,0 +1,115 @@
#!/usr/bin/env python3
import sys
import serial
import time
import logging
import readline
from ugv import UGVComms
import messages_pb2 as messages
log = logging.getLogger("ugv_cmd")
class UGV_CLI:
def __init__(self):
self.is_running = False
self.last_state = messages.STATE_IDLE
self.commands = {
'help': self.help_msg,
'h': self.help_msg,
'?': self.help_msg,
'exit': self.exit,
'q': self.exit
}
pass
def help_msg(self):
print("""Commands:
help, h, ?: Print this help message
exit, q, C-c, C-d: Quit the program
""")
def exit(self):
self.is_running = False
def disable(self):
self.ugv.write_command(messages.CMD_DISABLE)
def set_target(self):
# TODO: parse arguments somehow
cmd = messages.GroundCommand()
cmd.type = messages.CMD_SET_TARGET
cmd.target_location.latitude = 34.068415
cmd.target_location.longitude = -118.443217
self.ugv.write_command(cmd)
def set_config(self):
# TODO: read from config.yml
cmd = messages.GroundCommand()
cmd.type = messages.CMD_SET_CONFIG
cmd.config.angle_pid.kp = 0.10
cmd.config.angle_pid.ki = 0 # .00005
cmd.config.angle_pid.kd = 0.4
cmd.config.angle_pid.max_output = 0.5
cmd.config.angle_pid.max_i_error = 15.0
cmd.config.min_target_dist = 10.0
cmd.config.min_flip_pitch = 90.0
self.ugv.write_command(cmd)
def drive_heading(self):
# TODO: parse arguments somehow
cmd = messages.GroundCommand()
cmd.type = messages.CMD_DRIVE_HEADING
cmd.drive_heading.heading = -115.0 - 180
cmd.drive_heading.power = 0.3
self.ugv.write_command(cmd)
time.sleep(2.0)
def start(self):
self.is_running = True
readline.parse_and_bind('tab: complete')
#readline.parse_and_bind('set editing-mode vi')
if len(sys.argv) >= 2:
ser_url = sys.argv[1]
else:
ser_url = "loop://"
ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,
timeout=0.5)
self.ugv = UGVComms(ser)
self.ugv.start()
time.sleep(0.2)
try:
while self.is_running:
line = input("UGV> ")
line_parts = line.split(' ')
if len(line_parts) is 0:
continue
try:
cmd = self.commands[line_parts[0]]
except KeyError:
print("Unknown command: '%s'" % line_parts[0])
continue
cmd()
# TODO: continuously write state
# while True:
# if self.ugv.last_status is None or self.ugv.last_status.state is not messages.STATE_DRIVE_HEADING:
except (KeyboardInterrupt, EOFError):
self.exit()
finally:
log.info("disabling UGV...")
try:
self.ugv.write_command(messages.CMD_DISABLE)
log.info("done. exiting")
except KeyboardInterrupt:
log.info("force exiting...")
self.ugv.stop()
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
logging.getLogger().setLevel(logging.DEBUG)
UGV_CLI().start()
Loading…
Cancel
Save