#!/usr/bin/env python3
"""Interactive command-line client that sends commands to the UGV over a serial link."""

import sys
import time
import logging
import readline
import types

import serial
import yaml
try:
    from yaml import CLoader as YamlLoader, CDumper as YamlDumper
except ImportError:
    from yaml import Loader as YamlLoader, Dumper as YamlDumper

from ugv import UGVComms
import messages_pb2 as messages
import config_pb2

log = logging.getLogger("ugv_cmd")


def dict2pb(d, pb):
    """Copy the entries of dict ``d`` onto the attributes of ``pb``.

    Nested dicts are applied recursively to the attribute of the same name;
    every other value is assigned with ``setattr``.
    """
    for key, val in d.items():
        if isinstance(val, dict):
            dict2pb(val, getattr(pb, key))
        else:
            setattr(pb, key, val)


class CLI_CMD:
    """A registered CLI command: the callable, its aliases and its help text."""

    def __init__(self, func, names=None, description=""):
        self.func = func
        # A fresh list per instance; a shared mutable default would leak
        # aliases between commands.
        self.names = [] if names is None else names
        self.description = description


# Registry filled in by the ``cli_cmd`` decorator, in definition order.
cli_commands = []


def cli_cmd(names=None, description=""):
    """Return a decorator that registers a function in ``cli_commands``.

    Args:
        names: Aliases the command answers to. When None, the function's own
            ``__name__`` is used as its only name.
        description: Help text printed by the ``help`` command.

    The decorated function is returned unchanged.
    """
    def dec(fn: types.FunctionType):
        cmd_names = [fn.__name__] if names is None else names
        cli_commands.append(CLI_CMD(fn, cmd_names, description))
        return fn

    return dec


class UGV_CLI:
    """Read-eval loop that maps typed commands onto ``UGVComms`` calls."""

    def __init__(self, on_msg_received=None):
        self.on_msg_received = on_msg_received
        self.is_running = False
        self.last_state = messages.STATE_IDLE
        self.commands = {
            'get_status': self.get_status,
            's': self.get_status,
        }
        # Created lazily by start().
        self.ugv = None

    @cli_cmd(names=["help", "h", "?"], description="Print this help message")
    def help_msg(self):
        """Print every registered command with its aliases and description."""
        print("Commands:")
        for cmd in cli_commands:
            names = ", ".join(cmd.names).ljust(30, ' ')
            print("{}: {}".format(names, cmd.description))
        print()

    @cli_cmd(names=["exit", "q", "C-d", "C-c"], description="Quit the program")
    def exit(self):
        """Ask the command loop to stop after the current iteration."""
        self.is_running = False

    @cli_cmd(names=["disable", "d"], description="Disable the UGV")
    def disable(self):
        """Send the disable command."""
        # NOTE(review): unlike the other commands this passes the bare command
        # type rather than a GroundCommand; presumably write_command accepts
        # both -- confirm against UGVComms.
        self.ugv.write_command(messages.CMD_DISABLE)

    @cli_cmd(names=["set_target", "st"],
             description="Set the target to <lat> <long>")
    def set_target(self, lat=34.068415, long=-118.443217):
        """Send a CMD_SET_TARGET command with the given coordinates.

        Arguments arrive as strings from the command line and are converted
        with ``float``; a non-numeric value raises ValueError.
        """
        lat = float(lat)
        long = float(long)
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_SET_TARGET
        cmd.target_location.latitude = lat
        cmd.target_location.longitude = long
        self.ugv.write_command(cmd)
        log.info("set target to (%f, %f)", lat, long)

    @cli_cmd(names=["set_config", "sc"],
             description="Load configuration from config.yml and send")
    def set_config(self, config_file_name="./tools/config.yml"):
        """Load a YAML mapping from ``config_file_name`` and send it as CMD_SET_CONFIG.

        A top-level ``REVISION`` key, if present, is dropped before the
        mapping is copied onto the command's ``config`` field.

        Raises:
            ValueError: if the file does not contain a YAML mapping.
        """
        with open(config_file_name, 'r') as configfile:
            # SECURITY: YamlLoader is PyYAML's full loader, which can construct
            # arbitrary Python objects. Only point this at trusted files.
            config = yaml.load(configfile, Loader=YamlLoader)
        if not isinstance(config, dict):
            raise ValueError(
                "%s does not contain a YAML mapping" % config_file_name)
        # REVISION is metadata of the file, not a config field.
        config.pop('REVISION', None)
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_SET_CONFIG
        dict2pb(config, cmd.config)
        self.ugv.write_command(cmd)
        log.info("updated config")

    @cli_cmd(names=["drive_heading", "dh"],
             description="Drive a <heading> with a forward <power>")
    def drive_heading(self, heading=65, power=0.0):
        """Send a CMD_DRIVE_HEADING command with the given heading and power.

        Both arguments are converted with ``float``; a non-numeric value
        raises ValueError.
        """
        heading = float(heading)
        power = float(power)
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_DRIVE_HEADING
        cmd.drive_heading.heading = heading
        cmd.drive_heading.power = power
        self.ugv.write_command(cmd)
        log.info("driving heading %f at power %f", heading, power)

    @cli_cmd(names=["drive_to_target", "dt"],
             description="Drive to the drop target")
    def drive_to_target(self):
        """Send a CMD_DRIVE_TO_TARGET command."""
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_DRIVE_TO_TARGET
        self.ugv.write_command(cmd)
        log.info("driving to target")

    @cli_cmd(names=["run_test", "rt"], description="Run test mode")
    def run_test(self):
        """Send a CMD_TEST command."""
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_TEST
        self.ugv.write_command(cmd)
        log.info("running test mode")

    @cli_cmd(names=["last_status", "ls", "s"],
             description="Print the last status of the UGV")
    def last_status(self):
        """Log the most recent status held by the comms object and its age."""
        if self.ugv.last_status_time is None:
            log.info("no status received")
        else:
            last_status_delay = time.time() - self.ugv.last_status_time
            log.info("last status (%.4f seconds ago): %s",
                     last_status_delay, self.ugv.last_status)

    @cli_cmd(names=["get_status", "gs"],
             description="Get the current status of the UGV")
    def get_status(self):
        """Send a CMD_GET_STATUS command, then log the last known status."""
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_GET_STATUS
        self.ugv.write_command(cmd)
        self.last_status()

    @cli_cmd(names=["ping", "p"], description="Ping the UGV")
    def ping(self):
        """Send a CMD_PING command."""
        cmd = messages.GroundCommand()
        cmd.type = messages.CMD_PING
        self.ugv.write_command(cmd)
        # NOTE(review): printed as soon as write_command returns; presumably
        # write_command blocks until the UGV answers -- confirm.
        print("Received ping response")

    @staticmethod
    def find_command(name):
        """Return the registered command that lists ``name``, or None."""
        for cmd in cli_commands:
            if name in cmd.names:
                return cmd
        return None

    @staticmethod
    def complete_command(text, state):
        """Readline completer: the ``state``-th command name starting with ``text``.

        Returns None once ``state`` runs past the matches, which is how
        readline learns there are no more candidates.
        """
        options = [name
                   for cmd in cli_commands
                   for name in cmd.names
                   if name.startswith(text)]
        if state < len(options):
            return options[state]
        return None

    def start(self):
        """Open the serial port and start the comms object.

        The port URL is taken from ``sys.argv[1]`` when given, otherwise
        ``hwgrep://USB`` is used.
        """
        self.is_running = True
        if len(sys.argv) >= 2:
            ser_url = sys.argv[1]
        else:
            ser_url = "hwgrep://USB"
        ser = serial.serial_for_url(ser_url, baudrate=9600,
                                    parity=serial.PARITY_NONE,
                                    stopbits=serial.STOPBITS_ONE,
                                    bytesize=serial.EIGHTBITS,
                                    timeout=0.5)
        self.ugv = UGVComms(ser, self.on_msg_received)
        self.ugv.start()

    def run_cli(self):
        """Run the interactive prompt until ``exit``, Ctrl-C or Ctrl-D.

        An empty line repeats the previous successful command line. On the
        way out the UGV is sent a disable command and the comms object is
        stopped.
        """
        if self.ugv is None:
            self.start()
        readline.parse_and_bind("tab: complete")
        readline.set_completer(self.complete_command)
        last_line = None
        try:
            print("Run 'help' to find out what commands are available")
            while self.is_running:
                line = input("UGV> ")
                if not line and last_line is not None:
                    print(last_line)
                    line = last_line
                # split() with no separator drops empty fields, so blank
                # input yields [] and repeated spaces do not create ''
                # arguments.
                line_parts = line.split()
                if not line_parts:
                    continue
                cmd = self.find_command(line_parts[0])
                if cmd is None:
                    print("Unknown command: '%s'" % line_parts[0])
                    continue
                last_line = line
                try:
                    cmd.func(self, *line_parts[1:])
                except KeyboardInterrupt:
                    print("Command interrupted")
                except Exception as e:
                    # Top-level boundary: one failing command must not end
                    # the session.
                    print("Error executing command: ", e)
                # TODO: continuously write state
                # while True:
                #   if self.ugv.last_status is None or self.ugv.last_status.state is not messages.STATE_DRIVE_HEADING:
        except (KeyboardInterrupt, EOFError):
            self.exit()
        finally:
            log.info("disabling UGV...")
            try:
                self.ugv.write_command(messages.CMD_DISABLE)
                log.info("done. exiting")
            except KeyboardInterrupt:
                log.info("force exiting...")
            finally:
                # Stop the comms object even if the disable write failed.
                self.ugv.stop()


if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s [%(name)s] %(levelname)s: %(message)s',
        datefmt='%Y-%b-%d %H:%M:%S')
    logging.getLogger().setLevel(logging.INFO)
    UGV_CLI().run_cli()