#!/usr/bin/env python3
"""Interactive command-line console for sending commands to a UGV over serial."""
import logging
import readline  # noqa: F401 -- imported for its side effect: line editing/history for input()
import sys
import time

import serial
import yaml

from ugv import UGVComms
import messages_pb2 as messages
import config_pb2

log = logging.getLogger("ugv_cmd")

# Serial URL used when none is given as the first command-line argument.
DEFAULT_SERIAL_URL = "hwgrep://USB1"
# YAML file read by the `config` command (relative to the working directory).
CONFIG_PATH = './tools/config.yml'


def dict2pb(d, pb):
    """Recursively copy the entries of dict *d* onto the attributes of *pb*.

    A nested dict descends into the attribute of the same name on *pb*;
    any other value is assigned directly with ``setattr``.
    """
    for key, val in d.items():
        if isinstance(val, dict):
            dict2pb(val, getattr(pb, key))
        else:
            setattr(pb, key, val)


class UGV_CLI:
    """Read-eval loop that maps typed command names to UGV commands."""

    def __init__(self):
        self.is_running = False
        self.last_state = messages.STATE_IDLE
        # Command name (and short aliases) -> bound handler. Handlers receive
        # the remaining whitespace-separated words of the line as positional
        # string arguments.
        self.commands = {
            'help': self.help_msg,
            'h': self.help_msg,
            '?': self.help_msg,
            'exit': self.exit,
            'q': self.exit,
            'disable': self.disable,
            'd': self.disable,
            'target': self.set_target,
            'config': self.set_config,
            'c': self.set_config,
            'drive_heading': self.drive_heading,
            'drive_to_target': self.drive_to_target,
            'get_status': self.get_status,
            's': self.get_status,
        }

    def help_msg(self):
        """Print the help text."""
        print("""Commands:
help, h, ?: Print this help message
exit, q, C-c, C-d: Quit the program
""")

    def exit(self):
        """Ask the command loop in start() to stop after the current command."""
        self.is_running = False

    def disable(self):
        """Send the disable command to the UGV."""
        self.ugv.write_command(messages.CMD_DISABLE)

    def set_target(self, lat=34.068415, long=-118.443217):
        """Send a set-target command with the given latitude and longitude.

        Arguments typed at the prompt arrive as strings, so both values are
        converted to float before being stored in the message.

        Raises:
            ValueError: if *lat* or *long* cannot be converted to float.
        """
        lat = float(lat)
        long = float(long)
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_SET_TARGET
        cmd.target_location.latitude = lat
        cmd.target_location.longitude = long
        self.ugv.write_command(cmd)
        # %f rather than %d: %d would truncate the coordinates to integers.
        log.info("set target to (%f, %f)", lat, long)

    def set_config(self):
        """Load CONFIG_PATH and send its contents as a set-config command.

        Raises:
            ValueError: if the file does not contain a YAML mapping.
        """
        with open(CONFIG_PATH, 'r') as configfile:
            # safe_load: plain data only, and works on PyYAML versions where
            # yaml.load() requires an explicit Loader argument.
            config = yaml.safe_load(configfile)
        if not isinstance(config, dict):
            raise ValueError("%s must contain a YAML mapping" % CONFIG_PATH)
        # The REVISION key is stripped before the dict is copied into the message.
        config.pop('REVISION', None)

        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_SET_CONFIG
        dict2pb(config, cmd.config)
        self.ugv.write_command(cmd)
        log.info("updated config")

    def drive_heading(self, heading=65, power=0.0):
        """Send a drive-heading command with the given heading and power.

        Raises:
            ValueError: if *heading* or *power* cannot be converted to float.
        """
        heading = float(heading)
        power = float(power)
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_DRIVE_HEADING
        cmd.drive_heading.heading = heading
        cmd.drive_heading.power = power
        self.ugv.write_command(cmd)
        # Log the converted floats; %d would truncate fractional values.
        log.info("driving heading %.1f at power %.2f", heading, power)

    def drive_to_target(self):
        """Send the drive-to-target command."""
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_DRIVE_TO_TARGET
        self.ugv.write_command(cmd)
        log.info("driving to target")

    def get_status(self):
        """Log the most recent status held by the comms object and its age."""
        if self.ugv.last_status_time is None:
            log.info("no status received")
        else:
            last_status_delay = time.time() - self.ugv.last_status_time
            log.info("last status (%.4f seconds ago): %s",
                     last_status_delay, self.ugv.last_status)

    def start(self):
        """Open the serial link and run the interactive prompt until exit.

        The serial URL is taken from the first command-line argument, falling
        back to DEFAULT_SERIAL_URL. On the way out a disable command is sent
        and the comms object is stopped.
        """
        self.is_running = True
        ser_url = sys.argv[1] if len(sys.argv) >= 2 else DEFAULT_SERIAL_URL
        ser = serial.serial_for_url(ser_url, baudrate=9600,
                                    parity=serial.PARITY_NONE,
                                    stopbits=serial.STOPBITS_ONE,
                                    bytesize=serial.EIGHTBITS,
                                    timeout=0.5)
        self.ugv = UGVComms(ser)
        self.ugv.start()
        time.sleep(0.2)
        try:
            while self.is_running:
                line = input("UGV> ")
                # split() with no argument drops empty fields, so a blank line
                # yields [] and repeated spaces do not create empty arguments.
                line_parts = line.split()
                if not line_parts:
                    continue
                try:
                    cmd = self.commands[line_parts[0]]
                except KeyError:
                    print("Unknown command: '%s'" % line_parts[0])
                    continue
                try:
                    cmd(*line_parts[1:])
                except Exception as e:
                    # Top-level boundary: a failing command must not kill the prompt.
                    print("Error executing command: ", e)
            # TODO: continuously write state
            # while True:
            #     if self.ugv.last_status is None or self.ugv.last_status.state is not messages.STATE_DRIVE_HEADING:
        except (KeyboardInterrupt, EOFError):
            self.exit()
        finally:
            log.info("disabling UGV...")
            try:
                self.ugv.write_command(messages.CMD_DISABLE)
                log.info("done. exiting")
            except KeyboardInterrupt:
                # A second Ctrl-C while the disable command is being written.
                log.info("force exiting...")
            self.ugv.stop()


if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s',
                        datefmt='%Y-%b-%d %H:%M:%S')
    logging.getLogger().setLevel(logging.INFO)
    UGV_CLI().start()