You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

241 lines
7.8 KiB

#!/usr/bin/env python3
import sys
import serial
import time
import logging
import readline
import yaml
try:
from yaml import CLoader as YamlLoader, CDumper as YamlDumper
except ImportError:
from yaml import Loader as YamlLoader, Dumper as YamlDumper
import types
from ugv import UGVComms
import messages_pb2 as messages
import config_pb2
log = logging.getLogger("ugv_cmd")
def dict2pb(d, pb):
for key in d:
val = d[key]
if isinstance(val, dict):
dict2pb(val, getattr(pb, key))
else:
setattr(pb, key, val)
class CLI_CMD:
def __init__(self, func, names=[], description=""):
self.func = func
self.names = names
self.description = description
cli_commands = []
def cli_cmd(names=None, description=""):
def dec(fn: types.FunctionType):
if dec.names is None:
dec.names = [fn.__name__]
cli_commands.append(CLI_CMD(fn, dec.names, dec.description))
return fn
dec.names = names
dec.description = description
return dec
class UGV_CLI:
def __init__(self, on_msg_received=None):
self.on_msg_received = on_msg_received
self.is_running = False
self.last_state = messages.STATE_IDLE
self.commands = {
'get_status': self.get_status,
's': self.get_status,
}
self.ugv = None
pass
@cli_cmd(names=["help", "h", "?"], description="Print this help message")
def help_msg(self):
print("Commands:")
for cmd in cli_commands:
names = ", ".join(cmd.names).ljust(30, ' ')
print("{}: {}".format(names, cmd.description))
print()
@cli_cmd(names=["exit", "q", "C-d", "C-c"], description="Quit the program")
def exit(self):
self.is_running = False
@cli_cmd(names=["disable", "d"], description="Disable the UGV")
def disable(self):
self.ugv.write_command(messages.CMD_DISABLE)
@cli_cmd(names=["set_target", "st"], description="Set the target to <lat> <long>")
def set_target(self, lat=34.068415, long=-118.443217):
lat = float(lat)
long = float(long)
cmd = messages.GroundCommand()
cmd.type = messages.CMD_SET_TARGET
cmd.target_location.latitude = lat
cmd.target_location.longitude = long
self.ugv.write_command(cmd)
log.info("set target to (%f, %f)", lat, long)
@cli_cmd(names=["set_config", "sc"], description="Load configuration from config.yml and send")
def set_config(self, config_file_name="./tools/config.yml"):
with open(config_file_name, 'r') as configfile:
config = yaml.load(configfile, Loader=YamlLoader)
if 'REVISION' in config:
config_rev = config['REVISION']
del config['REVISION']
else:
config_rev = 1
cmd = messages.GroundCommand()
cmd.type = messages.CMD_SET_CONFIG
dict2pb(config, cmd.config)
self.ugv.write_command(cmd)
log.info("updated config")
@cli_cmd(names=["drive_heading", "dh"], description="Drive a <heading> with a forward <power>")
def drive_heading(self, heading=65, power=0.0):
heading = float(heading)
power = float(power)
cmd = messages.GroundCommand()
cmd.type = messages.CMD_DRIVE_HEADING
cmd.drive_heading.heading = heading
cmd.drive_heading.power = power
self.ugv.write_command(cmd)
log.info("driving heading %f at power %f", heading, power)
@cli_cmd(names=["drive_to_target", "dt"], description="Drive to the drop target")
def drive_to_target(self):
cmd = messages.GroundCommand()
cmd.type = messages.CMD_DRIVE_TO_TARGET
self.ugv.write_command(cmd)
log.info("driving to target")
@cli_cmd(names=["run_test", "rt"], description="Run test mode")
def run_test(self):
cmd = messages.GroundCommand()
cmd.type = messages.CMD_TEST
self.ugv.write_command(cmd)
log.info("running test mode")
@cli_cmd(names=["last_status", "ls", "s"], description="Print the last status of the UGV")
def last_status(self):
if self.ugv.last_status_time is None:
log.info("no status received")
else:
last_status_delay = time.time() - self.ugv.last_status_time
log.info("last status (%.4f seconds ago): %s",
last_status_delay, self.ugv.last_status)
@cli_cmd(names=["get_status", "gs"], description="Get the current status of the UGV")
def get_status(self):
cmd = messages.GroundCommand()
cmd.type = messages.CMD_GET_STATUS
self.ugv.write_command(cmd)
self.last_status()
@cli_cmd(names=["ping", "p"], description="Ping the UGV")
def ping(self):
cmd = messages.GroundCommand()
cmd.type = messages.CMD_PING
self.ugv.write_command(cmd)
print("Received ping response")
@cli_cmd(names=["save_logs", "sl"], description="Save logs to a file")
def save_logs(self, file=None):
if file is None:
file = 'ugv_log.txt'
self.ugv.save_logs(file)
print("Saving logs to {}".format(file))
@staticmethod
def find_command(name):
for cmd in cli_commands:
if name in cmd.names:
return cmd
return None
@staticmethod
def complete_command(text, state):
options = [name for cmd in cli_commands for name in cmd.names if name.startswith(text)]
if state < len(options):
return options[state]
else:
return None
def start(self):
self.is_running = True
if len(sys.argv) >= 2:
ser_url = sys.argv[1]
else:
ser_url = "hwgrep://USB"
ser = serial.serial_for_url(ser_url, baudrate=9600, parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS,
timeout=0.5)
self.ugv = UGVComms(ser, self.on_msg_received)
self.ugv.start()
def run_cli(self):
if self.ugv is None:
self.start()
readline.parse_and_bind("tab: complete")
readline.set_completer(self.complete_command)
last_line = None
try:
print("Run 'help' to find out what commands are available")
while self.is_running:
line = input("UGV> ")
if len(line) is 0 and last_line is not None:
print(last_line)
line = last_line
line_parts = line.split(' ')
if len(line_parts) is 0:
continue
cmd = self.find_command(line_parts[0])
if cmd is None:
print("Unknown command: '%s'" % line_parts[0])
continue
last_line = line
try:
cmd.func(self, *line_parts[1:])
except KeyboardInterrupt:
print("Command interrupted")
except Exception as e:
print("Error executing command: ", e)
# TODO: continuously write state
# while True:
# if self.ugv.last_status is None or self.ugv.last_status.state is not messages.STATE_DRIVE_HEADING:
except (KeyboardInterrupt, EOFError):
self.exit()
finally:
log.info("disabling UGV...")
try:
self.ugv.write_command(messages.CMD_DISABLE)
log.info("done. exiting")
except KeyboardInterrupt:
log.info("force exiting...")
self.ugv.stop()
if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)
UGV_CLI().run_cli()