From 0e02e2ee9d879ebaad6e914ce082b4c988c6171b Mon Sep 17 00:00:00 2001 From: Artem30801 Date: Thu, 23 Jan 2020 17:05:59 +0300 Subject: [PATCH] Updated messaging. File requests + args --- Drone/client.py | 22 ++--- Drone/copter_client.py | 8 +- Server/server.py | 12 +-- Server/server_qt.py | 15 ++-- messaging_lib.py | 186 +++++++++++++++++++++++------------------ 5 files changed, 134 insertions(+), 109 deletions(-) diff --git a/Drone/client.py b/Drone/client.py index 315168e..43f9b74 100644 --- a/Drone/client.py +++ b/Drone/client.py @@ -148,18 +148,18 @@ class Client(object): message = messaging.MessageManager() message.income_raw = data message.process_message() - if message.content: + if message.content and message.jsonheader["action"] == "server_ip": logger.info("Received broadcast message {} from {}".format(message.content, addr)) - if message.content["command"] == "server_ip": - args = message.content["args"] - self.config.set("SERVER", "port", int(args["port"])) - self.config.set("SERVER", "host", args["host"]) - self.config.write() - logger.info("Binding to new IP: {}:{}".format( - self.config.server_host, self.config.server_port)) - self.on_broadcast_bind() - break + kwargs = message.content["kwargs"] + self.config.set("SERVER", "port", int(kwargs["port"])) + self.config.set("SERVER", "host", kwargs["host"]) + self.config.write() + + logger.info("Binding to new IP: {}:{}".format( + self.config.server_host, self.config.server_port)) + self.on_broadcast_bind() + break finally: broadcast_client.close() @@ -168,6 +168,7 @@ class Client(object): def _process_connections(self): while True: + #self.server_connection.send_message("telemetry", kwargs={"value":{"time": time.time()}}) events = self.selector.select(timeout=1) for key, mask in events: @@ -226,6 +227,7 @@ def _response_id(*args, **kwargs): if new_id is not None: active_client.config.set("PRIVATE", "id", new_id, True) active_client.load_config() + # TODO renaming here return active_client.client_id diff --git a/Drone/copter_client.py b/Drone/copter_client.py index 7b42a54..85c6fe2 100644 --- a/Drone/copter_client.py +++ b/Drone/copter_client.py @@ -123,7 +123,7 @@ def execute_command(command): os.system(command) -def configure_chrony_ip(ip, path="/etc/chrony/chrony.conf", ip_index=1): +def configure_chrony_ip(ip, path="/etc/chrony/chrony.conf", ip_index=1): # TODO simplify try: with open(path, 'r') as f: raw_content = f.read() @@ -245,7 +245,7 @@ def _execute(*args, **kwargs): logger.info("Executing done") -@messaging.message_callback("id") +@messaging.message_callback("id") # TODO redo def _response_id(*args, **kwargs): new_id = kwargs.get("new_id", None) if new_id is not None: @@ -828,9 +828,9 @@ class Telemetry: self._tasks_cleared = False self._last_state = state - def transmit_message(self): + def transmit_message(self): # todo if connected try: - client.active_client.server_connection.send_message('telemetry', args={'value': self.create_msg_contents()}) + client.active_client.server_connection.send_message('telemetry', kwargs={'value': self.create_msg_contents()}) except AttributeError as e: logger.debug(e) diff --git a/Server/server.py b/Server/server.py index 94be295..c4726d5 100644 --- a/Server/server.py +++ b/Server/server.py @@ -185,7 +185,7 @@ class Server(messaging.Singleton): def _ip_broadcast(self): logging.info("Broadcast sender thread started!") - msg = messaging.MessageManager.create_simple_message( + msg = messaging.MessageManager.create_action_message( "server_ip", {"host": self.ip, "port": str(self.config.server_port), "id": self.id, "start_time": str(self.time_started)}) logging.debug("Formed broadcast message: {}".format(msg)) @@ -231,11 +231,11 @@ class Server(messaging.Singleton): message.process_message() content = message.content - right_command = (content and content["command"] == "server_ip") + right_command = (content and message.jsonheader["action"] == "server_ip") if right_command: - different_id = content["args"]["id"] != str(self.id) - self_younger = float(message.content["args"]["start_time"]) <= self.time_started + different_id = content["kwargs"]["id"] != str(self.id) + self_younger = float(content["kwargs"]["start_time"]) <= self.time_started if different_id and self_younger: # younger server should shut down @@ -253,7 +253,7 @@ class Server(messaging.Singleton): logging.info("Broadcast listener thread stopped, socked closed!") def send_starttime(self, copter, start_time): - copter.send_message("start", {"time": str(start_time)}) + copter.send_message("start", kwargs={"time": str(start_time)}) def requires_connect(f): @@ -356,7 +356,7 @@ class Client(messaging.ConnectionManager): @classmethod @requires_any_connected def broadcast_message(cls, command, args=None, force_all=False): - cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all) + cls.broadcast(messaging.MessageManager.create_action_message(command, args), force_all) if __name__ == '__main__': diff --git a/Server/server_qt.py b/Server/server_qt.py index c81cd42..242cd71 100644 --- a/Server/server_qt.py +++ b/Server/server_qt.py @@ -126,9 +126,9 @@ class MainWindow(QtWidgets.QMainWindow): self.ui.action_send_any_file.triggered.connect(self.send_any_file) self.ui.action_send_any_command.triggered.connect(self.send_any_command) self.ui.action_restart_clever.triggered.connect( - b_partial(self.send_to_selected, "service_restart", {"name": "clever"})) + b_partial(self.send_to_selected, "service_restart", kwargs={"name": "clever"})) self.ui.action_restart_clever_show.triggered.connect( - b_partial(self.send_to_selected, "service_restart", {"name": "clever-show"})) + b_partial(self.send_to_selected, "service_restart", kwargs={"name": "clever-show"})) self.ui.action_update_client_repo.triggered.connect(b_partial(self.send_to_selected, "update_repo")) self.ui.action_reboot_all.triggered.connect(b_partial(self.send_to_selected, "reboot_all")) self.ui.action_set_start_to_current_position.triggered.connect(b_partial(self.send_to_selected, "move_start")) @@ -200,8 +200,9 @@ class MainWindow(QtWidgets.QMainWindow): yield f(copter, *args, **kwargs) @pyqtSlot() - def send_to_selected(self, command, command_args=None): - return list(self.iterate_selected(lambda copter: copter.client.send_message(command, command_args))) + def send_to_selected(self, command, command_args=(), command_kwargs=None): + return list(self.iterate_selected(lambda copter: copter.client.send_message( + command, command_args, command_kwargs))) def new_client_connected(self, client: Client): logging.debug("Added client {}".format(client)) @@ -295,7 +296,7 @@ class MainWindow(QtWidgets.QMainWindow): for copter in self.model.user_selected(): if self.model.checks.takeoff_checks(copter): if self.ui.z_checkbox.isChecked(): - copter.client.send_message("takeoff_z", {"z": str(self.ui.z_spin.value())}) # todo int + copter.client.send_message("takeoff_z", {"z": str(self.ui.z_spin.value())}) # todo int, merge commands else: copter.client.send_message("takeoff") @@ -425,7 +426,7 @@ class MainWindow(QtWidgets.QMainWindow): @pyqtSlot() def send_aruco(self): def callback(copter): - copter.client.send_message("service_restart", {"name": "clever"}) + copter.client.send_message("service_restart", kwargs={"name": "clever"}) self.send_files("Select aruco map configuration file", "Aruco map files (*.txt)", onefile=True, client_path="/home/pi/catkin_ws/src/clever/aruco_pose/map/", @@ -465,7 +466,7 @@ class MainWindow(QtWidgets.QMainWindow): copters = self.model.user_selected() for copter in copters: - copter.client.send_message("config", {"config": data, "mode": mode.lower()}) + copter.client.send_message("config", kwargs={"config": data, "mode": mode.lower()}) @pyqtSlot() def send_any_command(self): diff --git a/messaging_lib.py b/messaging_lib.py index dab54f5..b6938dc 100644 --- a/messaging_lib.py +++ b/messaging_lib.py @@ -94,35 +94,41 @@ class MessageManager: return message @classmethod - def create_json_message(cls, contents): - message = cls.create_message(cls._json_encode(contents), "json", "message") + def create_json_message(cls, contents, additional_headers=None): + message = cls.create_message(cls._json_encode(contents), "json", "message", + additional_headers=additional_headers) return message @classmethod - def create_simple_message(cls, command, args=None): - if args is None: - args = {} - message = cls.create_json_message({"command": command, "args": args}) + def create_action_message(cls, action, args=(), kwargs=None): + if kwargs is None: + kwargs = {} + message = cls.create_json_message({"args": args, "kwargs": kwargs}, {"action": action, }) return message @classmethod - def create_request(cls, requested_value, request_id, args=None): - if args is None: - args = {} + def create_request(cls, requested_value, request_id, args=(), kwargs=None): + if kwargs is None: + kwargs = {} contents = {"requested_value": requested_value, "request_id": request_id, "args": args, + "kwargs": kwargs, } message = cls.create_message(cls._json_encode(contents), "json", "request") return message @classmethod - def create_response(cls, requested_value, request_id, value): - contents = {"requested_value": requested_value, - "request_id": request_id, - "value": value, - } - message = cls.create_message(cls._json_encode(contents), "json", "response") + def create_response(cls, requested_value, request_id, value, filetransfer=False): + headers = {"requested_value": requested_value, + "request_id": request_id, # TODO status + } + if filetransfer: + contents = value + else: + contents = cls._json_encode({"value": value, }) + message = cls.create_message(contents, "binary" if filetransfer else "json", + "response", additional_headers=headers) return message def _process_protoheader(self): @@ -171,10 +177,10 @@ class MessageManager: self._process_content() -def message_callback(string_command): +def message_callback(action_string): def inner(f): - ConnectionManager.messages_callbacks[string_command] = f - logger.debug("Registered message function {} for {}".format(f, string_command)) + ConnectionManager.messages_callbacks[action_string] = f + logger.debug("Registered message function {} for {}".format(f, action_string)) def wrapper(*args, **kwargs): return f(*args, **kwargs) @@ -340,84 +346,99 @@ class ConnectionManager(object): raise RuntimeError("Peer closed.") - def process_received(self, income_message): - message_type = income_message.jsonheader["message-type"] - content = income_message.content if message_type != "filetransfer" else income_message.content[:256] + def process_received(self, message): + message_type = message.jsonheader["message-type"] + content = message.content if message.jsonheader["content-type"] != "binary"\ + else message.content[:256] logger.debug( - "Received message! Header: {}, content: {}".format(income_message.jsonheader, content)) + "Received message! Header: {}, content: {}".format(message.jsonheader, content)) if message_type == "message": - self._process_message(income_message) + self._process_message(message) elif message_type == "response": - self._process_response(income_message) + self._process_response(message) elif message_type == "request": - self._process_request(income_message) - elif message_type == "filetransfer": - self._process_filetransfer(income_message) + self._process_request(message) def _process_message(self, message): - command = message.content["command"] + if message.jsonheader["action"] == "filetransfer": + self._process_filetransfer(message.content, message.jsonheader["filepath"]) + else: + self._process_action(message) + + def _process_action(self, message): + action = message.jsonheader["action"] args = message.content["args"] - callback = self.messages_callbacks.get(command, None) + kwargs = message.content["kwargs"] + callback = self.messages_callbacks.get(action, None) if callback is None: - logger.warning("Command {} does not exist!".format(command)) + logger.warning("Action {} does not exist!".format(action)) return try: - callback(self, **args) + callback(self, *args, **kwargs) except Exception as error: - logger.error("Error during command {} execution: {}".format(command, error)) + logger.error("Error during action {} execution: {}".format(action, error)) def _process_request(self, message): - command = message.content["requested_value"] + requested_value = message.content["requested_value"] request_id = message.content["request_id"] args = message.content["args"] - callback = self.requests_callbacks.get(command, None) + kwargs = message.content["kwargs"] + + callback = self.requests_callbacks.get(requested_value, None) if callback is None: - logger.warning("Request {} does not exist!".format(command)) + logger.warning("Request {} does not exist!".format(requested_value)) return + filetransfer = requested_value == "filetransfer" try: - value = callback(self, **args) + if filetransfer: + value = self._read_file(kwargs["filepath"]) + else: + value = callback(self, *args, **kwargs) except Exception as error: # TODO send response error\cancel - logger.error("Error during request {} processing: {}".format(command, error)) + logger.error("Error during request {} processing: {}".format(requested_value, error)) else: - self._send_response(command, request_id, value) + self._send_response(requested_value, request_id, value, filetransfer) def _process_response(self, message): - request_id, requested_value = message.content["request_id"], message.content["requested_value"] + request_id, requested_value = message.jsonheader["request_id"], message.jsonheader["requested_value"] with self._request_lock: request = self._request_queue.pop(request_id, None) + if (request is None) or (request.requested_value != requested_value): + logger.warning("Unexpected response!") + return - if (request is not None) and (request.requested_value == requested_value): - value = message.content["value"] - logger.debug( - "Request {} successfully closed with value {}".format(request, message.content["value"]) - ) - print(self, "CALLBACK", request.callback, "VAL", value, "ARGS",request.callback_args, request.callback_kwargs) - try: - request.callback(self, value, *request.callback_args, **request.callback_kwargs) - print(1) - except Exception as e: - logging.error("Error during callback call of request") # TODO more info - traceback.print_exc() - print(e) - + if requested_value == "filetransfer": + value = True + self._process_filetransfer(message.content, request.callback_kwargs["filepath"]) else: - logger.warning("Unexpected response!") + value = message.content["value"] - def _process_filetransfer(self, message): # TODO path? - if message.jsonheader["content-type"] == "binary": - filepath = message.jsonheader["filepath"] - try: - with open(filepath, 'wb') as f: - f.write(message.content) - except OSError as error: - logger.error("File {} can not be written due error: {}".format(filepath, error)) - else: - logger.info("File {} successfully received ".format(filepath)) - if self.whoami == "pi": - logger.info("Return rights to pi:pi after file transfer") - os.system("chown pi:pi {}".format(filepath)) + logger.debug( + "Request {} successfully closed with value {}".format(request, message.content["value"]) + ) + try: + request.callback(self, value, *request.callback_args, **request.callback_kwargs) + except Exception as error: + logger.error("Error during response {} processing: {}".format(request, error)) + + @staticmethod + def _read_file(filepath): + with open(filepath, mode='rb') as f: + return f.read() + + def _process_filetransfer(self, content, filepath): + try: + with open(filepath, 'wb') as f: + f.write(content) + except OSError as error: + logger.error("File {} can not be written due error: {}".format(filepath, error)) + else: + logger.info("File {} successfully received ".format(filepath)) + if self.whoami == "pi": + logger.info("Return rights to pi:pi after file transfer") + os.system("chown pi:pi {}".format(filepath)) def write(self): with self._send_lock: @@ -453,14 +474,15 @@ class ConnectionManager(object): self._set_selector_events_mask('rw') NotifierSock().notify() - def get_response(self, requested_value, callback, request_args=None, # timeout=30, - callback_args=(), callback_kwargs=None): - if request_args is None: - request_args = {} + def get_response(self, requested_value, callback, # timeout=30, + request_args=(), request_kwargs=None, + callback_args=(), callback_kwargs=None, ): + if request_kwargs is None: + request_kwargs = {} if callback_kwargs is None: callback_kwargs = {} - request_id = str(random.randint(0, 9999)).zfill(4) + request_id = str(random.randint(0, 9999)).zfill(4) # maybe hash with self._request_lock: self._request_queue[request_id] = PendingRequest( requested_value=requested_value, @@ -470,24 +492,25 @@ class ConnectionManager(object): callback_args=callback_args, callback_kwargs=callback_kwargs, request_args=request_args, + request_kwargs=request_kwargs, resend=True, ) - self._send(MessageManager.create_request(requested_value, request_id, request_args)) + self._send(MessageManager.create_request(requested_value, request_id, request_args, request_kwargs)) def _resend_requests(self): with self._request_lock: - for request_id, request in self._request_queue.items(): #TODO filter + for request_id, request in self._request_queue.items(): # TODO filter if request.resend: self._send(MessageManager.create_request( - request.requested_value, request_id, request.request_args.update(resend=request.resend)) + request.requested_value, request_id, request.request_kwargs.update(resend=request.resend)) ) request.resend = False - def send_message(self, command, args=None): - self._send(MessageManager.create_simple_message(command, args)) + def send_message(self, action, args=(), kwargs=None): + self._send(MessageManager.create_action_message(action, args, kwargs)) - def _send_response(self, requested_value, request_id, value): - self._send(MessageManager.create_response(requested_value, request_id, value)) + def _send_response(self, requested_value, request_id, value, filetransfer=False): + self._send(MessageManager.create_response(requested_value, request_id, value, filetransfer)) def send_file(self, filepath, dest_filepath): # clever_restart=False try: @@ -497,9 +520,8 @@ class ConnectionManager(object): logger.warning("File can not be opened due error: ".format(error)) else: logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath)) - self._send(MessageManager.create_message( - data, "binary", "filetransfer", "binary", {"filepath": dest_filepath} - )) + self._send(MessageManager.create_message(data, "binary", "message", + additional_headers={"action": "filetransfer", "filepath": dest_filepath})) class NotifierSock(Singleton):