From 2cbf80f2938adb2990c981ffc9c5da2c85acc468 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Thu, 23 May 2019 12:34:43 -0700 Subject: [PATCH] improve CLI interface --- tools/ugv_cmd.py | 85 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/tools/ugv_cmd.py b/tools/ugv_cmd.py index 62a47a1..33beb48 100755 --- a/tools/ugv_cmd.py +++ b/tools/ugv_cmd.py @@ -6,6 +6,7 @@ import time import logging import readline import yaml +import types from ugv import UGVComms import messages_pb2 as messages @@ -13,6 +14,7 @@ import config_pb2 log = logging.getLogger("ugv_cmd") + def dict2pb(d, pb): for key in d: val = d[key] @@ -22,44 +24,56 @@ def dict2pb(d, pb): setattr(pb, key, val) +class CLI_CMD: + def __init__(self, func, names=[], description=""): + self.func = func + self.names = names + self.description = description + + +cli_commands = [] + + +def cli_cmd(names=[], description=""): + def dec(fn: types.FunctionType): + if len(dec.names) is 0: + dec.names = [fn.__name__] + cli_commands.append(CLI_CMD(fn, dec.names, dec.description)) + return fn + + dec.names = names + dec.description = description + + return dec + + class UGV_CLI: def __init__(self): self.is_running = False self.last_state = messages.STATE_IDLE self.commands = { - 'help': self.help_msg, - 'h': self.help_msg, - '?': self.help_msg, - 'exit': self.exit, - 'q': self.exit, - 'disable': self.disable, - 'd': self.disable, - 'set_target': self.set_target, - 'st': self.set_target, - 'set_config': self.set_config, - 'sc': self.set_config, - 'drive_heading': self.drive_heading, - 'dh': self.drive_heading, - 'drive_to_target': self.drive_to_target, - 'dt': self.drive_to_target, 'get_status': self.get_status, 's': self.get_status, } pass - + @cli_cmd(names=["help", "h", "?"], description="Print this help message") def help_msg(self): - print("""Commands: -help, h, ?: Print this help message -exit, q, C-c, C-d: Quit the program -""") + print("Commands:") + for cmd in cli_commands: + names = ", ".join(cmd.names).ljust(30, ' ') + print("{}: {}".format(names, cmd.description)) + print() + @cli_cmd(names=["exit", "q"], description="Quit the program") def exit(self): self.is_running = False + @cli_cmd(names=["disable", "d"], description="Disable the UGV") def disable(self): self.ugv.write_command(messages.CMD_DISABLE) + @cli_cmd(names=["set_target", "st"], description="Set the target to ") def set_target(self, lat=34.068415, long=-118.443217): lat = float(lat) long = float(long) @@ -70,6 +84,7 @@ exit, q, C-c, C-d: Quit the program self.ugv.write_command(cmd) log.info("set target to (%f, %f)", lat, long) + @cli_cmd(names=["set_config", "sc"], description="Load configuration from config.yml and send") def set_config(self): with open('./tools/config.yml', 'r') as configfile: config = yaml.load(configfile) @@ -86,6 +101,7 @@ exit, q, C-c, C-d: Quit the program self.ugv.write_command(cmd) log.info("updated config") + @cli_cmd(names=["drive_heading", "dh"], description="Drive a with a forward ") def drive_heading(self, heading=65, power=0.0): heading = float(heading) power = float(power) @@ -96,18 +112,27 @@ exit, q, C-c, C-d: Quit the program self.ugv.write_command(cmd) log.info("driving heading %f at power %f", heading, power) + @cli_cmd(names=["drive_to_target", "dt"], description="Drive to the drop target") def drive_to_target(self): cmd = messages.GroundCommand() cmd.type = messages.CMD_DRIVE_TO_TARGET self.ugv.write_command(cmd) log.info("driving to target") + @cli_cmd(names=["get_status", "s"], description="Print the last status of the UGV") def get_status(self): if self.ugv.last_status_time is None: log.info("no status received") else: last_status_delay = time.time() - self.ugv.last_status_time - log.info("last status (%.4f seconds ago): %s", last_status_delay, self.ugv.last_status) + log.info("last status (%.4f seconds ago): %s", + last_status_delay, self.ugv.last_status) + + def find_command(self, name): + for cmd in cli_commands: + if name in cmd.names: + return cmd + return None def start(self): self.is_running = True @@ -115,26 +140,31 @@ exit, q, C-c, C-d: Quit the program if len(sys.argv) >= 2: ser_url = sys.argv[1] else: - ser_url = "hwgrep://USB1" + ser_url = "hwgrep://USB" ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=0.5) self.ugv = UGVComms(ser) self.ugv.start() time.sleep(0.2) + last_line = None try: while self.is_running: line = input("UGV> ") + if len(line) is 0 and last_line is not None: + line = last_line line_parts = line.split(' ') if len(line_parts) is 0: continue - try: - cmd = self.commands[line_parts[0]] - except KeyError: + cmd = self.find_command(line_parts[0]) + if cmd is None: print("Unknown command: '%s'" % line_parts[0]) continue + last_line = line try: - cmd(*line_parts[1:]) + cmd.func(self, *line_parts[1:]) + except KeyboardInterrupt: + print("Command interrupted") except Exception as e: print("Error executing command: ", e) # TODO: continuously write state @@ -153,6 +183,7 @@ exit, q, C-c, C-d: Quit the program if __name__ == "__main__": - logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S') + logging.basicConfig( + format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S') logging.getLogger().setLevel(logging.INFO) UGV_CLI().start()