#!/usr/bin/env python3 import socketio import logging from base64 import b64encode from threading import Thread from ugv_cmd import UGV_CLI import messages_pb2 as messages from google.protobuf.message import Message def encode_msg(msg: Message): data = msg.SerializeToString() return b64encode(data) if __name__ == "__main__": logging.basicConfig(format='%(asctime)s [%(name)s] %(levelname)s: %(message)s', datefmt='%Y-%b-%d %H:%M:%S') logging.getLogger().setLevel(logging.INFO) server_ip = 'localhost' sio = socketio.Client() def on_msg_received(msg: messages.UGV_Message): sio.emit('UGV_MESSAGE', encode_msg(msg), namespace='/ugv') ugv_cli = UGV_CLI(on_msg_received) ugv_cli.start() @sio.on('connect', namespace='/ugv') def on_connect(): print("connected to ground server") @sio.on('disconnect', namespace='/ugv') def on_disconnect(): print("disconnected from ground server!") @sio.on('SET_TARGET', namespace='/ugv') def set_target(msg): print("Setting UGV target") ugv_cli.set_target(msg['lat'], msg['lng']) @sio.on('DRIVE_TO_TARGET', namespace='/ugv') def drive_to_target(): print("Driving to target!") ugv_cli.drive_to_target() while True: try: sio.connect('http://'+server_ip+':8081', namespaces=['/ugv'], transports='websocket') break except: print("Can't connect to ground server. Retrying in 2 seconds...") sio.sleep(2) ugv_cli.run_cli()