Updated messaging. File requests + args

This commit is contained in:
Artem30801
2020-01-23 17:05:59 +03:00
parent 8920c1d2da
commit 0e02e2ee9d
5 changed files with 134 additions and 109 deletions

View File

@@ -148,18 +148,18 @@ class Client(object):
message = messaging.MessageManager()
message.income_raw = data
message.process_message()
if message.content:
if message.content and message.jsonheader["action"] == "server_ip":
logger.info("Received broadcast message {} from {}".format(message.content, addr))
if message.content["command"] == "server_ip":
args = message.content["args"]
self.config.set("SERVER", "port", int(args["port"]))
self.config.set("SERVER", "host", args["host"])
self.config.write()
logger.info("Binding to new IP: {}:{}".format(
self.config.server_host, self.config.server_port))
self.on_broadcast_bind()
break
kwargs = message.content["kwargs"]
self.config.set("SERVER", "port", int(kwargs["port"]))
self.config.set("SERVER", "host", kwargs["host"])
self.config.write()
logger.info("Binding to new IP: {}:{}".format(
self.config.server_host, self.config.server_port))
self.on_broadcast_bind()
break
finally:
broadcast_client.close()
@@ -168,6 +168,7 @@ class Client(object):
def _process_connections(self):
while True:
#self.server_connection.send_message("telemetry", kwargs={"value":{"time": time.time()}})
events = self.selector.select(timeout=1)
for key, mask in events:
@@ -226,6 +227,7 @@ def _response_id(*args, **kwargs):
if new_id is not None:
active_client.config.set("PRIVATE", "id", new_id, True)
active_client.load_config()
# TODO renaming here
return active_client.client_id

View File

@@ -123,7 +123,7 @@ def execute_command(command):
os.system(command)
def configure_chrony_ip(ip, path="/etc/chrony/chrony.conf", ip_index=1):
def configure_chrony_ip(ip, path="/etc/chrony/chrony.conf", ip_index=1): # TODO simplify
try:
with open(path, 'r') as f:
raw_content = f.read()
@@ -245,7 +245,7 @@ def _execute(*args, **kwargs):
logger.info("Executing done")
@messaging.message_callback("id")
@messaging.message_callback("id") # TODO redo
def _response_id(*args, **kwargs):
new_id = kwargs.get("new_id", None)
if new_id is not None:
@@ -828,9 +828,9 @@ class Telemetry:
self._tasks_cleared = False
self._last_state = state
def transmit_message(self):
def transmit_message(self): # todo if connected
try:
client.active_client.server_connection.send_message('telemetry', args={'value': self.create_msg_contents()})
client.active_client.server_connection.send_message('telemetry', kwargs={'value': self.create_msg_contents()})
except AttributeError as e:
logger.debug(e)

View File

@@ -185,7 +185,7 @@ class Server(messaging.Singleton):
def _ip_broadcast(self):
logging.info("Broadcast sender thread started!")
msg = messaging.MessageManager.create_simple_message(
msg = messaging.MessageManager.create_action_message(
"server_ip", {"host": self.ip, "port": str(self.config.server_port), "id": self.id,
"start_time": str(self.time_started)})
logging.debug("Formed broadcast message: {}".format(msg))
@@ -231,11 +231,11 @@ class Server(messaging.Singleton):
message.process_message()
content = message.content
right_command = (content and content["command"] == "server_ip")
right_command = (content and message.jsonheader["action"] == "server_ip")
if right_command:
different_id = content["args"]["id"] != str(self.id)
self_younger = float(message.content["args"]["start_time"]) <= self.time_started
different_id = content["kwargs"]["id"] != str(self.id)
self_younger = float(content["kwargs"]["start_time"]) <= self.time_started
if different_id and self_younger:
# younger server should shut down
@@ -253,7 +253,7 @@ class Server(messaging.Singleton):
logging.info("Broadcast listener thread stopped, socked closed!")
def send_starttime(self, copter, start_time):
copter.send_message("start", {"time": str(start_time)})
copter.send_message("start", kwargs={"time": str(start_time)})
def requires_connect(f):
@@ -356,7 +356,7 @@ class Client(messaging.ConnectionManager):
@classmethod
@requires_any_connected
def broadcast_message(cls, command, args=None, force_all=False):
cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all)
cls.broadcast(messaging.MessageManager.create_action_message(command, args), force_all)
if __name__ == '__main__':

View File

@@ -126,9 +126,9 @@ class MainWindow(QtWidgets.QMainWindow):
self.ui.action_send_any_file.triggered.connect(self.send_any_file)
self.ui.action_send_any_command.triggered.connect(self.send_any_command)
self.ui.action_restart_clever.triggered.connect(
b_partial(self.send_to_selected, "service_restart", {"name": "clever"}))
b_partial(self.send_to_selected, "service_restart", kwargs={"name": "clever"}))
self.ui.action_restart_clever_show.triggered.connect(
b_partial(self.send_to_selected, "service_restart", {"name": "clever-show"}))
b_partial(self.send_to_selected, "service_restart", kwargs={"name": "clever-show"}))
self.ui.action_update_client_repo.triggered.connect(b_partial(self.send_to_selected, "update_repo"))
self.ui.action_reboot_all.triggered.connect(b_partial(self.send_to_selected, "reboot_all"))
self.ui.action_set_start_to_current_position.triggered.connect(b_partial(self.send_to_selected, "move_start"))
@@ -200,8 +200,9 @@ class MainWindow(QtWidgets.QMainWindow):
yield f(copter, *args, **kwargs)
@pyqtSlot()
def send_to_selected(self, command, command_args=None):
return list(self.iterate_selected(lambda copter: copter.client.send_message(command, command_args)))
def send_to_selected(self, command, command_args=(), command_kwargs=None):
return list(self.iterate_selected(lambda copter: copter.client.send_message(
command, command_args, command_kwargs)))
def new_client_connected(self, client: Client):
logging.debug("Added client {}".format(client))
@@ -295,7 +296,7 @@ class MainWindow(QtWidgets.QMainWindow):
for copter in self.model.user_selected():
if self.model.checks.takeoff_checks(copter):
if self.ui.z_checkbox.isChecked():
copter.client.send_message("takeoff_z", {"z": str(self.ui.z_spin.value())}) # todo int
copter.client.send_message("takeoff_z", {"z": str(self.ui.z_spin.value())}) # todo int, merge commands
else:
copter.client.send_message("takeoff")
@@ -425,7 +426,7 @@ class MainWindow(QtWidgets.QMainWindow):
@pyqtSlot()
def send_aruco(self):
def callback(copter):
copter.client.send_message("service_restart", {"name": "clever"})
copter.client.send_message("service_restart", kwargs={"name": "clever"})
self.send_files("Select aruco map configuration file", "Aruco map files (*.txt)", onefile=True,
client_path="/home/pi/catkin_ws/src/clever/aruco_pose/map/",
@@ -465,7 +466,7 @@ class MainWindow(QtWidgets.QMainWindow):
copters = self.model.user_selected()
for copter in copters:
copter.client.send_message("config", {"config": data, "mode": mode.lower()})
copter.client.send_message("config", kwargs={"config": data, "mode": mode.lower()})
@pyqtSlot()
def send_any_command(self):

View File

@@ -94,35 +94,41 @@ class MessageManager:
return message
@classmethod
def create_json_message(cls, contents):
message = cls.create_message(cls._json_encode(contents), "json", "message")
def create_json_message(cls, contents, additional_headers=None):
message = cls.create_message(cls._json_encode(contents), "json", "message",
additional_headers=additional_headers)
return message
@classmethod
def create_simple_message(cls, command, args=None):
if args is None:
args = {}
message = cls.create_json_message({"command": command, "args": args})
def create_action_message(cls, action, args=(), kwargs=None):
if kwargs is None:
kwargs = {}
message = cls.create_json_message({"args": args, "kwargs": kwargs}, {"action": action, })
return message
@classmethod
def create_request(cls, requested_value, request_id, args=None):
if args is None:
args = {}
def create_request(cls, requested_value, request_id, args=(), kwargs=None):
if kwargs is None:
kwargs = {}
contents = {"requested_value": requested_value,
"request_id": request_id,
"args": args,
"kwargs": kwargs,
}
message = cls.create_message(cls._json_encode(contents), "json", "request")
return message
@classmethod
def create_response(cls, requested_value, request_id, value):
contents = {"requested_value": requested_value,
"request_id": request_id,
"value": value,
}
message = cls.create_message(cls._json_encode(contents), "json", "response")
def create_response(cls, requested_value, request_id, value, filetransfer=False):
headers = {"requested_value": requested_value,
"request_id": request_id, # TODO status
}
if filetransfer:
contents = value
else:
contents = cls._json_encode({"value": value, })
message = cls.create_message(contents, "binary" if filetransfer else "json",
"response", additional_headers=headers)
return message
def _process_protoheader(self):
@@ -171,10 +177,10 @@ class MessageManager:
self._process_content()
def message_callback(string_command):
def message_callback(action_string):
def inner(f):
ConnectionManager.messages_callbacks[string_command] = f
logger.debug("Registered message function {} for {}".format(f, string_command))
ConnectionManager.messages_callbacks[action_string] = f
logger.debug("Registered message function {} for {}".format(f, action_string))
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
@@ -340,84 +346,99 @@ class ConnectionManager(object):
raise RuntimeError("Peer closed.")
def process_received(self, income_message):
message_type = income_message.jsonheader["message-type"]
content = income_message.content if message_type != "filetransfer" else income_message.content[:256]
def process_received(self, message):
message_type = message.jsonheader["message-type"]
content = message.content if message.jsonheader["content-type"] != "binary"\
else message.content[:256]
logger.debug(
"Received message! Header: {}, content: {}".format(income_message.jsonheader, content))
"Received message! Header: {}, content: {}".format(message.jsonheader, content))
if message_type == "message":
self._process_message(income_message)
self._process_message(message)
elif message_type == "response":
self._process_response(income_message)
self._process_response(message)
elif message_type == "request":
self._process_request(income_message)
elif message_type == "filetransfer":
self._process_filetransfer(income_message)
self._process_request(message)
def _process_message(self, message):
command = message.content["command"]
if message.jsonheader["action"] == "filetransfer":
self._process_filetransfer(message.content, message.jsonheader["filepath"])
else:
self._process_action(message)
def _process_action(self, message):
action = message.jsonheader["action"]
args = message.content["args"]
callback = self.messages_callbacks.get(command, None)
kwargs = message.content["kwargs"]
callback = self.messages_callbacks.get(action, None)
if callback is None:
logger.warning("Command {} does not exist!".format(command))
logger.warning("Action {} does not exist!".format(action))
return
try:
callback(self, **args)
callback(self, *args, **kwargs)
except Exception as error:
logger.error("Error during command {} execution: {}".format(command, error))
logger.error("Error during action {} execution: {}".format(action, error))
def _process_request(self, message):
command = message.content["requested_value"]
requested_value = message.content["requested_value"]
request_id = message.content["request_id"]
args = message.content["args"]
callback = self.requests_callbacks.get(command, None)
kwargs = message.content["kwargs"]
callback = self.requests_callbacks.get(requested_value, None)
if callback is None:
logger.warning("Request {} does not exist!".format(command))
logger.warning("Request {} does not exist!".format(requested_value))
return
filetransfer = requested_value == "filetransfer"
try:
value = callback(self, **args)
if filetransfer:
value = self._read_file(kwargs["filepath"])
else:
value = callback(self, *args, **kwargs)
except Exception as error: # TODO send response error\cancel
logger.error("Error during request {} processing: {}".format(command, error))
logger.error("Error during request {} processing: {}".format(requested_value, error))
else:
self._send_response(command, request_id, value)
self._send_response(requested_value, request_id, value, filetransfer)
def _process_response(self, message):
request_id, requested_value = message.content["request_id"], message.content["requested_value"]
request_id, requested_value = message.jsonheader["request_id"], message.jsonheader["requested_value"]
with self._request_lock:
request = self._request_queue.pop(request_id, None)
if (request is None) or (request.requested_value != requested_value):
logger.warning("Unexpected response!")
return
if (request is not None) and (request.requested_value == requested_value):
value = message.content["value"]
logger.debug(
"Request {} successfully closed with value {}".format(request, message.content["value"])
)
print(self, "CALLBACK", request.callback, "VAL", value, "ARGS",request.callback_args, request.callback_kwargs)
try:
request.callback(self, value, *request.callback_args, **request.callback_kwargs)
print(1)
except Exception as e:
logging.error("Error during callback call of request") # TODO more info
traceback.print_exc()
print(e)
if requested_value == "filetransfer":
value = True
self._process_filetransfer(message.content, request.callback_kwargs["filepath"])
else:
logger.warning("Unexpected response!")
value = message.content["value"]
def _process_filetransfer(self, message): # TODO path?
if message.jsonheader["content-type"] == "binary":
filepath = message.jsonheader["filepath"]
try:
with open(filepath, 'wb') as f:
f.write(message.content)
except OSError as error:
logger.error("File {} can not be written due error: {}".format(filepath, error))
else:
logger.info("File {} successfully received ".format(filepath))
if self.whoami == "pi":
logger.info("Return rights to pi:pi after file transfer")
os.system("chown pi:pi {}".format(filepath))
logger.debug(
"Request {} successfully closed with value {}".format(request, message.content["value"])
)
try:
request.callback(self, value, *request.callback_args, **request.callback_kwargs)
except Exception as error:
logger.error("Error during response {} processing: {}".format(request, error))
@staticmethod
def _read_file(filepath):
with open(filepath, mode='rb') as f:
return f.read()
def _process_filetransfer(self, content, filepath):
try:
with open(filepath, 'wb') as f:
f.write(content)
except OSError as error:
logger.error("File {} can not be written due error: {}".format(filepath, error))
else:
logger.info("File {} successfully received ".format(filepath))
if self.whoami == "pi":
logger.info("Return rights to pi:pi after file transfer")
os.system("chown pi:pi {}".format(filepath))
def write(self):
with self._send_lock:
@@ -453,14 +474,15 @@ class ConnectionManager(object):
self._set_selector_events_mask('rw')
NotifierSock().notify()
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
callback_args=(), callback_kwargs=None):
if request_args is None:
request_args = {}
def get_response(self, requested_value, callback, # timeout=30,
request_args=(), request_kwargs=None,
callback_args=(), callback_kwargs=None, ):
if request_kwargs is None:
request_kwargs = {}
if callback_kwargs is None:
callback_kwargs = {}
request_id = str(random.randint(0, 9999)).zfill(4)
request_id = str(random.randint(0, 9999)).zfill(4) # maybe hash
with self._request_lock:
self._request_queue[request_id] = PendingRequest(
requested_value=requested_value,
@@ -470,24 +492,25 @@ class ConnectionManager(object):
callback_args=callback_args,
callback_kwargs=callback_kwargs,
request_args=request_args,
request_kwargs=request_kwargs,
resend=True,
)
self._send(MessageManager.create_request(requested_value, request_id, request_args))
self._send(MessageManager.create_request(requested_value, request_id, request_args, request_kwargs))
def _resend_requests(self):
with self._request_lock:
for request_id, request in self._request_queue.items(): #TODO filter
for request_id, request in self._request_queue.items(): # TODO filter
if request.resend:
self._send(MessageManager.create_request(
request.requested_value, request_id, request.request_args.update(resend=request.resend))
request.requested_value, request_id, request.request_kwargs.update(resend=request.resend))
)
request.resend = False
def send_message(self, command, args=None):
self._send(MessageManager.create_simple_message(command, args))
def send_message(self, action, args=(), kwargs=None):
self._send(MessageManager.create_action_message(action, args, kwargs))
def _send_response(self, requested_value, request_id, value):
self._send(MessageManager.create_response(requested_value, request_id, value))
def _send_response(self, requested_value, request_id, value, filetransfer=False):
self._send(MessageManager.create_response(requested_value, request_id, value, filetransfer))
def send_file(self, filepath, dest_filepath): # clever_restart=False
try:
@@ -497,9 +520,8 @@ class ConnectionManager(object):
logger.warning("File can not be opened due error: ".format(error))
else:
logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
self._send(MessageManager.create_message(
data, "binary", "filetransfer", "binary", {"filepath": dest_filepath}
))
self._send(MessageManager.create_message(data, "binary", "message",
additional_headers={"action": "filetransfer", "filepath": dest_filepath}))
class NotifierSock(Singleton):