#!/usr/bin/env python3
"""Relay traffic between the UGV command interface and the ground server.

Messages received from the UGV are base64-encoded and emitted to the ground
server on the ``/ugv`` Socket.IO namespace; ``SET_TARGET`` and
``DRIVE_TO_TARGET`` events coming from the server are forwarded to the UGV.
"""
import logging
import time
from base64 import b64encode
from threading import Thread

import socketio
from google.protobuf.message import Message

import messages_pb2 as messages
from ugv_cmd import UGV_CLI

log = logging.getLogger("ugv_to_ground")


def encode_msg(msg: Message):
    """Serialize a protobuf message and return it as a base64 text string."""
    data = msg.SerializeToString()
    return b64encode(data).decode('utf-8')


def ping_thread_entry(ugv_cli, interval: float = 5.0):
    """Poll the UGV status periodically until the CLI stops or the port closes.

    Args:
        ugv_cli: Object exposing ``is_running``, ``ugv.ser.is_open`` and
            ``get_status()``.
        interval: Seconds to wait between two status requests.
    """
    while ugv_cli.is_running and ugv_cli.ugv.ser.is_open:
        try:
            ugv_cli.get_status()
        except IOError as e:
            log.error("Error pinging UGV: %s", e)
        # Sleep on every iteration, including after a failure, so that a
        # persistent I/O error cannot turn this loop into a busy spin.
        time.sleep(interval)


def start_ugv_to_ground(server_ip: str = 'localhost', server_port: int = 8081):
    """Start the UGV CLI, connect to the ground server and run the CLI loop.

    The connection is retried every 2 seconds until it succeeds. Once
    connected, a daemon thread pings the UGV and ``ugv_cli.run_cli()`` is
    called on the current thread.

    Args:
        server_ip: Host name or address of the ground server.
        server_port: TCP port of the ground server.
    """
    sio = socketio.Client()

    def on_msg_received(msg: messages.UGV_Message):
        # NOTE(review): the CLI is started before the socket is connected, so
        # this may run while the namespace is not yet connected - confirm how
        # the caller in UGV_CLI copes with an exception raised here.
        sio.emit('UGV_MESSAGE', encode_msg(msg), namespace='/ugv')

    ugv_cli = UGV_CLI(on_msg_received)
    ugv_cli.start()

    @sio.on('connect', namespace='/ugv')
    def on_connect():
        log.info("connected to ground server")

    @sio.on('disconnect', namespace='/ugv')
    def on_disconnect():
        log.info("disconnected from ground server!")

    @sio.on('SET_TARGET', namespace='/ugv')
    def set_target(msg):
        log.info("Setting UGV target")
        # The payload comes from the network: reject it cleanly when it is
        # not a mapping carrying both coordinates.
        try:
            lat, lng = msg['lat'], msg['lng']
        except (KeyError, TypeError) as e:
            log.error("Malformed SET_TARGET payload %r: %r", msg, e)
            return
        try:
            ugv_cli.set_target(lat, lng)
        except IOError as e:
            log.error("Error setting target: %s", e)

    @sio.on('DRIVE_TO_TARGET', namespace='/ugv')
    def drive_to_target():
        log.info("Driving to target!")
        try:
            ugv_cli.drive_to_target()
        except IOError as e:
            log.error("Error driving to target: %s", e)

    url = 'http://{}:{}'.format(server_ip, server_port)
    while True:
        try:
            sio.connect(url, namespaces=['/ugv'], transports='websocket')
            break
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must still be able to stop the retry loop.
            print("Can't connect to ground server. Retrying in 2 seconds...")
            sio.sleep(2)

    ping_thread = Thread(target=ping_thread_entry, args=(ugv_cli, ), daemon=True)
    ping_thread.start()

    ugv_cli.run_cli()


if __name__ == "__main__":
    logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S')
    logging.getLogger().setLevel(logging.INFO)
    start_ugv_to_ground()