Browse Source

improve CLI interface

master
Alex Mikhalev 6 years ago
parent
commit
2cbf80f293
  1. 85
      tools/ugv_cmd.py

85
tools/ugv_cmd.py

@ -6,6 +6,7 @@ import time
import logging import logging
import readline import readline
import yaml import yaml
import types
from ugv import UGVComms from ugv import UGVComms
import messages_pb2 as messages import messages_pb2 as messages
@ -13,6 +14,7 @@ import config_pb2
log = logging.getLogger("ugv_cmd") log = logging.getLogger("ugv_cmd")
def dict2pb(d, pb): def dict2pb(d, pb):
for key in d: for key in d:
val = d[key] val = d[key]
@ -22,44 +24,56 @@ def dict2pb(d, pb):
setattr(pb, key, val) setattr(pb, key, val)
class CLI_CMD:
def __init__(self, func, names=[], description=""):
self.func = func
self.names = names
self.description = description
cli_commands = []
def cli_cmd(names=[], description=""):
def dec(fn: types.FunctionType):
if len(dec.names) is 0:
dec.names = [fn.__name__]
cli_commands.append(CLI_CMD(fn, dec.names, dec.description))
return fn
dec.names = names
dec.description = description
return dec
class UGV_CLI: class UGV_CLI:
def __init__(self): def __init__(self):
self.is_running = False self.is_running = False
self.last_state = messages.STATE_IDLE self.last_state = messages.STATE_IDLE
self.commands = { self.commands = {
'help': self.help_msg,
'h': self.help_msg,
'?': self.help_msg,
'exit': self.exit,
'q': self.exit,
'disable': self.disable,
'd': self.disable,
'set_target': self.set_target,
'st': self.set_target,
'set_config': self.set_config,
'sc': self.set_config,
'drive_heading': self.drive_heading,
'dh': self.drive_heading,
'drive_to_target': self.drive_to_target,
'dt': self.drive_to_target,
'get_status': self.get_status, 'get_status': self.get_status,
's': self.get_status, 's': self.get_status,
} }
pass pass
@cli_cmd(names=["help", "h", "?"], description="Print this help message")
def help_msg(self): def help_msg(self):
print("""Commands: print("Commands:")
help, h, ?: Print this help message for cmd in cli_commands:
exit, q, C-c, C-d: Quit the program names = ", ".join(cmd.names).ljust(30, ' ')
""") print("{}: {}".format(names, cmd.description))
print()
@cli_cmd(names=["exit", "q"], description="Quit the program")
def exit(self): def exit(self):
self.is_running = False self.is_running = False
@cli_cmd(names=["disable", "d"], description="Disable the UGV")
def disable(self): def disable(self):
self.ugv.write_command(messages.CMD_DISABLE) self.ugv.write_command(messages.CMD_DISABLE)
@cli_cmd(names=["set_target", "st"], description="Set the target to <lat> <long>")
def set_target(self, lat=34.068415, long=-118.443217): def set_target(self, lat=34.068415, long=-118.443217):
lat = float(lat) lat = float(lat)
long = float(long) long = float(long)
@ -70,6 +84,7 @@ exit, q, C-c, C-d: Quit the program
self.ugv.write_command(cmd) self.ugv.write_command(cmd)
log.info("set target to (%f, %f)", lat, long) log.info("set target to (%f, %f)", lat, long)
@cli_cmd(names=["set_config", "sc"], description="Load configuration from config.yml and send")
def set_config(self): def set_config(self):
with open('./tools/config.yml', 'r') as configfile: with open('./tools/config.yml', 'r') as configfile:
config = yaml.load(configfile) config = yaml.load(configfile)
@ -86,6 +101,7 @@ exit, q, C-c, C-d: Quit the program
self.ugv.write_command(cmd) self.ugv.write_command(cmd)
log.info("updated config") log.info("updated config")
@cli_cmd(names=["drive_heading", "dh"], description="Drive a <heading> with a forward <power>")
def drive_heading(self, heading=65, power=0.0): def drive_heading(self, heading=65, power=0.0):
heading = float(heading) heading = float(heading)
power = float(power) power = float(power)
@ -96,18 +112,27 @@ exit, q, C-c, C-d: Quit the program
self.ugv.write_command(cmd) self.ugv.write_command(cmd)
log.info("driving heading %f at power %f", heading, power) log.info("driving heading %f at power %f", heading, power)
@cli_cmd(names=["drive_to_target", "dt"], description="Drive to the drop target")
def drive_to_target(self): def drive_to_target(self):
cmd = messages.GroundCommand() cmd = messages.GroundCommand()
cmd.type = messages.CMD_DRIVE_TO_TARGET cmd.type = messages.CMD_DRIVE_TO_TARGET
self.ugv.write_command(cmd) self.ugv.write_command(cmd)
log.info("driving to target") log.info("driving to target")
@cli_cmd(names=["get_status", "s"], description="Print the last status of the UGV")
def get_status(self): def get_status(self):
if self.ugv.last_status_time is None: if self.ugv.last_status_time is None:
log.info("no status received") log.info("no status received")
else: else:
last_status_delay = time.time() - self.ugv.last_status_time last_status_delay = time.time() - self.ugv.last_status_time
log.info("last status (%.4f seconds ago): %s", last_status_delay, self.ugv.last_status) log.info("last status (%.4f seconds ago): %s",
last_status_delay, self.ugv.last_status)
def find_command(self, name):
for cmd in cli_commands:
if name in cmd.names:
return cmd
return None
def start(self): def start(self):
self.is_running = True self.is_running = True
@ -115,26 +140,31 @@ exit, q, C-c, C-d: Quit the program
if len(sys.argv) >= 2: if len(sys.argv) >= 2:
ser_url = sys.argv[1] ser_url = sys.argv[1]
else: else:
ser_url = "hwgrep://USB1" ser_url = "hwgrep://USB"
ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE, ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,
timeout=0.5) timeout=0.5)
self.ugv = UGVComms(ser) self.ugv = UGVComms(ser)
self.ugv.start() self.ugv.start()
time.sleep(0.2) time.sleep(0.2)
last_line = None
try: try:
while self.is_running: while self.is_running:
line = input("UGV> ") line = input("UGV> ")
if len(line) is 0 and last_line is not None:
line = last_line
line_parts = line.split(' ') line_parts = line.split(' ')
if len(line_parts) is 0: if len(line_parts) is 0:
continue continue
try: cmd = self.find_command(line_parts[0])
cmd = self.commands[line_parts[0]] if cmd is None:
except KeyError:
print("Unknown command: '%s'" % line_parts[0]) print("Unknown command: '%s'" % line_parts[0])
continue continue
last_line = line
try: try:
cmd(*line_parts[1:]) cmd.func(self, *line_parts[1:])
except KeyboardInterrupt:
print("Command interrupted")
except Exception as e: except Exception as e:
print("Error executing command: ", e) print("Error executing command: ", e)
# TODO: continuously write state # TODO: continuously write state
@ -153,6 +183,7 @@ exit, q, C-c, C-d: Quit the program
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S') logging.basicConfig(
format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
UGV_CLI().start() UGV_CLI().start()

Loading…
Cancel
Save