diff --git a/Drone/client.py b/Drone/client.py index 920f089..f0f8412 100644 --- a/Drone/client.py +++ b/Drone/client.py @@ -1,358 +1,154 @@ -from __future__ import print_function -import os -import sys -import socket -import struct -import random import time import errno -import json +import random +import socket +import struct import logging -import threading import ConfigParser from contextlib import closing -import rospy -import pause - -from FlightLib import FlightLib -from FlightLib import LedLib - -import play_animation +from messaging_lib import Message random.seed() logging.basicConfig( # TODO all prints as logs - level=logging.INFO, + level=logging.DEBUG, # INFO format="%(asctime)s [%(name)-7.7s] [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s", handlers=[ logging.FileHandler("client_logs.log"), logging.StreamHandler() ]) -NTP_PACKET_FORMAT = "!12I" -NTP_DELTA = 2208988800L # 1970-01-01 00:00:00 -NTP_QUERY = '\x1b' + 47 * '\0' + +class Client: + def __init__(self, config_path="client_config.ini"): + self.client_socket = None + self.server_host = None + self.server_port = None + self.broadcast_port = None + + self.connected = False + self.client_id = None + + # Init configs + self.config_path = config_path + self.config = ConfigParser.ConfigParser() + self.load_config() + + def load_config(self): + self.config.read(self.config_path) + + self.broadcast_port = self.config.getint('SERVER', 'broadcast_port') + self.server_port = self.config.getint('SERVER', 'port') + self.server_host = self.config.get('SERVER', 'host') + self.BUFFER_SIZE = self.config.getint('SERVER', 'buffer_size') + self.USE_NTP = self.config.getboolean('NTP', 'use_ntp') + self.NTP_HOST = self.config.get('NTP', 'host') + self.NTP_PORT = self.config.getint('NTP', 'port') + + files_directory = self.config.get('FILETRANSFER', 'files_directory') + + #FRAME_ID = self.config.get('COPTERS', 'frame_id') # TODO in play_animation + #self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height') + #self.TAKEOFF_TIME = self.config.getfloat('COPTERS', 'takeoff_time') + #self.RFP_TIME = self.config.getfloat('COPTERS', 'reach_first_point_time') + #self.SAFE_TAKEOFF = self.config.getboolean('COPTERS', 'safe_takeoff') + + #self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common') + #self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common') + #self.X0 = self.config.getfloat('PRIVATE', 'x0') + #self.Y0 = self.config.getfloat('PRIVATE', 'y0') + + #self.USE_LEDS = config.getboolean('PRIVATE', 'use_leds') + #play_animation.USE_LEDS = USE_LEDS # TODO in copter_client + + self.client_id = self.config.get('PRIVATE', 'id') + if self.client_id == 'default': + client_id = 'copter' + str(random.randrange(9999)).zfill(4) + #write_to_config('PRIVATE', 'id', client_id) + elif self.client_id == '/hostname': + self.client_id = socket.gethostname() + + @staticmethod + def get_ntp_time(ntp_host, ntp_port): + NTP_PACKET_FORMAT = "!12I" + NTP_DELTA = 2208988800L # 1970-01-01 00:00:00 + NTP_QUERY = '\x1b' + 47 * '\0' + + with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: + s.sendto(bytes(NTP_QUERY), (ntp_host, ntp_port)) + msg, address = s.recvfrom(1024) + unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)]) + return unpacked[10] + float(unpacked[11]) / 2 ** 32 - NTP_DELTA -def get_ntp_time(ntp_host, ntp_port): - with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: - s.sendto(NTP_QUERY, (ntp_host, ntp_port)) - msg, address = s.recvfrom(1024) - unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)]) - return unpacked[10] + float(unpacked[11]) / 2**32 - NTP_DELTA - - -def reconnect(timeout=2, attempt_limit=5): - global clientSocket, host, port - print("Trying to connect to", host, ":", port, "...") - connected = False - attempt_count = 0 - while not connected: - print("Waiting for connection, attempt", attempt_count) - try: - clientSocket = socket.socket() - clientSocket.settimeout(timeout) - clientSocket.connect((host, port)) - connected = True - print("Connection successful") - clientSocket.settimeout(None) - except socket.error as e: - if e.errno != errno.EINTR: - print("Waiting for connection, can not connect:", e) - time.sleep(timeout) - else: - print("Shutting down on keyboard interrupt") - raise KeyboardInterrupt - attempt_count += 1 - - if attempt_count >= attempt_limit: - print("Too many attempts. Trying to get new server IP") - broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - broadcast_client.bind(("", broadcast_port)) - while True: - data, addr = broadcast_client.recvfrom(1024) - print("Received broadcast message %s from %s" % (data, addr)) - command, args = parse_message(data.decode("UTF-8")) - if command == "server_ip": - host, port = args["host"], int(args["port"]) - print("Binding to new IP: ", host, port) - broadcast_client.close() - write_to_config("SERVER", "port", port) - write_to_config("SERVER", "host", host) - attempt_count = 0 - break - -def send_all(msg): - clientSocket.sendall(struct.pack('>I', len(msg)) + msg) - - -def recive_all(n): - data = b'' - while len(data) < n: - packet = clientSocket.recv(min(n - len(data), BUFFER_SIZE)) - print("Receiving packet {}; full data is {}".format(packet, data)) - if not packet: - return None - data += packet - return data - - -def recive_message(): - raw_msglen = recive_all(4) - if not raw_msglen: - print("No valid msg") - return None - msglen = struct.unpack('>I', raw_msglen)[0] - msg = recive_all(msglen) - return msg - - -def form_message(str_command, dict_arguments): - msg_dict = {str_command: dict_arguments} - msg = json.dumps(msg_dict) - return msg - - -def parse_message(msg): - try: - j_message = json.loads(msg) - except ValueError: - print("Json string not in correct format") - return None, None - str_command = list(j_message.keys())[0] - dict_arguments = list(j_message.values())[0] - - return str_command, dict_arguments - - -def recive_file(filename): - print("Receiving file:", filename) - with open(filename, 'wb') as file: # TODO add directory - while True: - data = recive_message() #clientSocket.recv(BUFFER_SIZE) - if data: - print(data) - if parse_message(data.decode("UTF-8"))[0] == "/endoffile": - print("File received") - break - file.write(data) + def reconnect(self, timeout=2, attempt_limit=5): + logging.info("Trying to connect to {}:{} ...".format(self.server_host, self.server_port)) + attempt_count = 0 + while not self.connected: + logging.info("Waiting for connection, attempt {}".format(attempt_count)) + try: + self.client_socket = socket.socket() + self.client_socket.settimeout(timeout) + self.client_socket.connect((self.server_host, self.server_port)) + except socket.error as error: + if error.errno != errno.EINTR: + logging.warning("Can not connect due error: {}".format(error)) + attempt_count += 1 + time.sleep(timeout) + else: + logging.critical("Shutting down on keyboard interrupt") + raise KeyboardInterrupt else: + self.connected = True + #self.client_socket.settimeout(None) + self.client_socket.setblocking(False) + logging.info("Connection to server successful!") break - -def animation_player(running_event, stop_event): - print("Animation thread activated") - frames = play_animation.read_animation_file() - if not frames: - logging.error("Animation is empty, shutting down animation player") - return - - delay_time = 0.125 - - print("Takeoff") - play_animation.takeoff(z=TAKEOFF_HEIGHT, safe_takeoff=SAFE_TAKEOFF) - takeoff_time = starttime + TAKEOFF_TIME - dt = takeoff_time - time.time() - print("Wait until takeoff " + str(dt) + "s: " + time.ctime(takeoff_time)) - pause.until(takeoff_time) - - print("Reach first point") - play_animation.reach_frame(frames[0], x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) #Reach first point at the same time with others - rfp_time = takeoff_time + RFP_TIME - dt = rfp_time - time.time() - print("Wait reaching first point " + str(dt) + "s: " + time.ctime(rfp_time)) - pause.until(rfp_time) - - next_frame_time = rfp_time - print("Start animation at " + str(time.time())) - for frame in frames: - #running_event.wait() - play_animation.animate_frame(frame, x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) - next_frame_time += delay_time - if stop_event.is_set(): - running_animation_event.clear() - break - #rate.sleep() - pause.until(next_frame_time) - else: - play_animation.land() - print("Animation ended") - print("Animation thread closed") + if attempt_count >= attempt_limit: + logging.info("Too many attempts. Trying to get new server IP") + self.broadcast_bind() + attempt_count = 0 -stop_animation_event = threading.Event() -running_animation_event = threading.Event() - - -def start_animation(*args, **kwargs): - animation_thread = threading.Thread(target=animation_player, args=(running_animation_event, stop_animation_event)) - print("Starting animation!") - running_animation_event.set() - stop_animation_event.clear() - animation_thread.start() - - -def resume_animation(): - print("Resuming animation") - running_animation_event.set() - - -def pause_animation(): - print("Pausing animation") - running_animation_event.clear() - - -def stop_animation(): - stop_animation_event.set() - print("Stopping animation") -# animation_thread.join() - - -def selfcheck(): - return FlightLib.selfcheck() - - -def write_to_config(section, option, value): - config.set(section, option, value) - with open(CONFIG_PATH, 'w') as file: # TODO as separate function - config.write(file) - - -def load_config(): - global config, CONFIG_PATH - global broadcast_port, port, host, BUFFER_SIZE - global USE_NTP, NTP_HOST, NTP_PORT - global files_directory, animation_file - global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME - global USE_LEDS, COPTER_ID - global X0, X0_COMMON, Y0, Y0_COMMON - CONFIG_PATH = "client_config.ini" - config = ConfigParser.ConfigParser() - config.read(CONFIG_PATH) - - broadcast_port = config.getint('SERVER', 'broadcast_port') - port = config.getint('SERVER', 'port') - host = config.get('SERVER', 'host') - BUFFER_SIZE = config.getint('SERVER', 'buffer_size') - USE_NTP = config.getboolean('NTP', 'use_ntp') - NTP_HOST = config.get('NTP', 'host') - NTP_PORT = config.getint('NTP', 'port') - - files_directory = config.get('FILETRANSFER', 'files_directory') - animation_file = config.get('FILETRANSFER', 'animation_file') - - FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation - TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height') - TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time') - RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time') - SAFE_TAKEOFF = config.getboolean('COPTERS', 'safe_takeoff') - - X0_COMMON = config.getfloat('COPTERS', 'x0_common') - Y0_COMMON = config.getfloat('COPTERS', 'y0_common') - X0 = config.getfloat('PRIVATE', 'x0') - Y0 = config.getfloat('PRIVATE', 'y0') - - USE_LEDS = config.getboolean('PRIVATE', 'use_leds') - play_animation.USE_LEDS = USE_LEDS - - COPTER_ID = config.get('PRIVATE', 'id') - if COPTER_ID == 'default': - COPTER_ID = 'copter' + str(random.randrange(9999)).zfill(4) - write_to_config('PRIVATE', 'id', COPTER_ID) - elif COPTER_ID == '/hostname': - COPTER_ID = socket.gethostname() - -load_config() - -rospy.init_node('Swarm_client', anonymous=True) -if USE_LEDS: - LedLib.init_led() - -print("Client started on copter:", COPTER_ID) -if USE_NTP: - print("NTP time:", time.ctime(get_ntp_time(NTP_HOST, NTP_PORT))) -print("System time", time.ctime(time.time())) - -reconnect() - -print("Connected to server") - -try: - while True: + def broadcast_bind(self): + broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + broadcast_client.bind(("", self.broadcast_port)) try: - message = recive_message() - if message: - message = message.decode("UTF-8") - command, args = parse_message(message) - print("Command from server:", command, args) - if command == "writefile": - recive_file(args['filename']) - if bool(args['clever_restart']): - os.system("systemctl restart clever") - elif command == 'config_write': - write_to_config(args['section'], args['option'], args['value']) - elif command == 'config_reload': - load_config() - elif command == "starttime": - global starttime - starttime = float(args['time']) - print("Starting on:", time.ctime(starttime)) - dt = starttime - time.time() - if USE_NTP: - dt = starttime - get_ntp_time(NTP_HOST, NTP_PORT) - print("Until start:", dt) - rospy.Timer(rospy.Duration(dt), start_animation, oneshot=True) - elif command == 'takeoff': - play_animation.takeoff(safe_takeoff=SAFE_TAKEOFF) - elif command == 'pause': - pause_animation() - elif command == 'resume': - resume_animation() - elif command == 'stop': - stop_animation() - FlightLib.interrupt() - elif command == 'land': - play_animation.land() - elif command == 'disarm': - FlightLib.arming(False) - elif command == 'led_test': - LedLib.fill(255, 255, 255) - time.sleep(2) - LedLib.off() + while True: + data, addr = broadcast_client.recvfrom(1024) + message = Message() + message.income_raw = data + message.process_message() + if message.content: + logging.info("Received broadcast message {} from {}".format(message.content, addr)) + if message.content["command"] == "server_ip": + args = message.content["args"] + self.server_host = args["host"] + self.server_port = int(args["port"]) + logging.info("Binding to new IP: {}:{}".format(self.server_host, self.server_port)) + #write_to_config("SERVER", "port", port) + #write_to_config("SERVER", "host", host) # TODO + break + finally: + broadcast_client.close() - elif command == 'request': - request_target = args['value'] - print("Got request for:", request_target) - response = "" - if request_target == 'test': - response = "test_success" - elif request_target == 'id': - response = COPTER_ID - elif request_target == 'selfcheck': - check = FlightLib.selfcheck() - response = check if check else "OK" - elif request_target == 'batt_voltage': - response = FlightLib.get_telemetry('body').voltage - elif request_target == 'cell_voltage': - response = FlightLib.get_telemetry('body').cell_voltage + def mainloop(self): + while True: + #''' + message = Message() + print("Recieving message:") + while not message.content: + message.income_raw += self.client_socket.recv(1) + print(message.income_raw) + message.process_message() + print(message.content) + # ''' - send_all(bytes(form_message("response", - {"status": "ok", "value": response, "value_name": str(request_target)}))) - print("Request responded with:", response) - - except socket.error as e: - if e.errno != errno.EINTR: - print("Connection lost due error:", e) - print("Reconnecting...") - reconnect() - print("Re-connection successful") - else: - print("Interrupted") - raise KeyboardInterrupt -except KeyboardInterrupt: - print("Shutdown on keyboard interrupt") -finally: - clientSocket.close() +if __name__ == "__main__": + client = Client() + client.reconnect() + client.mainloop() \ No newline at end of file diff --git a/Drone/copter_client.py b/Drone/copter_client.py new file mode 100644 index 0000000..d7c8fd6 --- /dev/null +++ b/Drone/copter_client.py @@ -0,0 +1,252 @@ +from __future__ import print_function +import os +import sys +import socket +import struct +import random +import time +import errno +import json +import logging +import threading +import ConfigParser + +import rospy +import pause + +from .. import messaging_lib + +from FlightLib import FlightLib +from FlightLib import LedLib + +import play_animation + +def recive_file(filename): + print("Receiving file:", filename) + with open(filename, 'wb') as file: # TODO add directory + while True: + data = recive_message() #clientSocket.recv(BUFFER_SIZE) + if data: + print(data) + if parse_message(data.decode("UTF-8"))[0] == "/endoffile": + print("File received") + break + file.write(data) + else: + break + + +def animation_player(running_event, stop_event): + print("Animation thread activated") + frames = play_animation.read_animation_file() + if not frames: + logging.error("Animation is empty, shutting down animation player") + return + + delay_time = 0.125 + + print("Takeoff") + play_animation.takeoff(z=TAKEOFF_HEIGHT, safe_takeoff=SAFE_TAKEOFF) + takeoff_time = starttime + TAKEOFF_TIME + dt = takeoff_time - time.time() + print("Wait until takeoff " + str(dt) + "s: " + time.ctime(takeoff_time)) + pause.until(takeoff_time) + + print("Reach first point") + play_animation.reach_frame(frames[0], x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) #Reach first point at the same time with others + rfp_time = takeoff_time + RFP_TIME + dt = rfp_time - time.time() + print("Wait reaching first point " + str(dt) + "s: " + time.ctime(rfp_time)) + pause.until(rfp_time) + + next_frame_time = rfp_time + print("Start animation at " + str(time.time())) + for frame in frames: + running_event.wait() + play_animation.animate_frame(frame, x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) + next_frame_time += delay_time + if stop_event.is_set(): + running_animation_event.clear() + break + #rate.sleep() + pause.until(next_frame_time) + else: + play_animation.land() + print("Animation ended") + print("Animation thread closed") + + +stop_animation_event = threading.Event() +running_animation_event = threading.Event() + + +def start_animation(*args, **kwargs): + animation_thread = threading.Thread(target=animation_player, args=(running_animation_event, stop_animation_event)) + print("Starting animation!") + running_animation_event.set() + stop_animation_event.clear() + animation_thread.start() + + +def resume_animation(): + print("Resuming animation") + running_animation_event.set() + + +def pause_animation(): + print("Pausing animation") + running_animation_event.clear() + + +def stop_animation(): + stop_animation_event.set() + print("Stopping animation") +# animation_thread.join() + + +def selfcheck(): + return FlightLib.selfcheck() + + +def write_to_config(section, option, value): + config.set(section, option, value) + with open(CONFIG_PATH, 'w') as file: # TODO as separate function + config.write(file) + + +def load_config(): + global config, CONFIG_PATH + global broadcast_port, port, host, BUFFER_SIZE + global USE_NTP, NTP_HOST, NTP_PORT + global files_directory, animation_file + global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME + global USE_LEDS, COPTER_ID + global X0, X0_COMMON, Y0, Y0_COMMON + CONFIG_PATH = "client_config.ini" + config = ConfigParser.ConfigParser() + config.read(CONFIG_PATH) + + broadcast_port = config.getint('SERVER', 'broadcast_port') + port = config.getint('SERVER', 'port') + host = config.get('SERVER', 'host') + BUFFER_SIZE = config.getint('SERVER', 'buffer_size') + USE_NTP = config.getboolean('NTP', 'use_ntp') + NTP_HOST = config.get('NTP', 'host') + NTP_PORT = config.getint('NTP', 'port') + + files_directory = config.get('FILETRANSFER', 'files_directory') + animation_file = config.get('FILETRANSFER', 'animation_file') + + FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation + TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height') + TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time') + RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time') + SAFE_TAKEOFF = config.getboolean('COPTERS', 'safe_takeoff') + + X0_COMMON = config.getfloat('COPTERS', 'x0_common') + Y0_COMMON = config.getfloat('COPTERS', 'y0_common') + X0 = config.getfloat('PRIVATE', 'x0') + Y0 = config.getfloat('PRIVATE', 'y0') + + USE_LEDS = config.getboolean('PRIVATE', 'use_leds') + play_animation.USE_LEDS = USE_LEDS + + COPTER_ID = config.get('PRIVATE', 'id') + if COPTER_ID == 'default': + COPTER_ID = 'copter' + str(random.randrange(9999)).zfill(4) + write_to_config('PRIVATE', 'id', COPTER_ID) + elif COPTER_ID == '/hostname': + COPTER_ID = socket.gethostname() + +load_config() + +rospy.init_node('Swarm_client', anonymous=True) +if USE_LEDS: + LedLib.init_led() + +print("Client started on copter:", COPTER_ID) +if USE_NTP: + print("NTP time:", time.ctime(get_ntp_time(NTP_HOST, NTP_PORT))) +print("System time", time.ctime(time.time())) + +reconnect() + +print("Connected to server") + +try: + while True: + try: + message = recive_message() + if message: + message = message.decode("UTF-8") + command, args = parse_message(message) + print("Command from server:", command, args) + if command == "writefile": + recive_file(args['filename']) + if bool(args['clever_restart']): + os.system("systemctl restart clever") + elif command == 'config_write': + write_to_config(args['section'], args['option'], args['value']) + elif command == 'config_reload': + load_config() + elif command == "starttime": + global starttime + starttime = float(args['time']) + print("Starting on:", time.ctime(starttime)) + dt = starttime - time.time() + if USE_NTP: + dt = starttime - get_ntp_time(NTP_HOST, NTP_PORT) + print("Until start:", dt) + rospy.Timer(rospy.Duration(dt), start_animation, oneshot=True) + elif command == 'takeoff': + play_animation.takeoff(safe_takeoff=SAFE_TAKEOFF) + elif command == 'pause': + pause_animation() + elif command == 'resume': + resume_animation() + elif command == 'stop': + stop_animation() + FlightLib.interrupt() + elif command == 'land': + play_animation.land() + elif command == 'disarm': + FlightLib.arming(False) + elif command == 'led_test': + LedLib.fill(255, 255, 255) + time.sleep(2) + LedLib.off() + + elif command == 'request': + request_target = args['value'] + print("Got request for:", request_target) + response = "" + if request_target == 'test': + response = "test_success" + elif request_target == 'id': + response = COPTER_ID + elif request_target == 'selfcheck': + check = FlightLib.selfcheck() + response = check if check else "OK" + elif request_target == 'batt_voltage': + response = FlightLib.get_telemetry('body').voltage + elif request_target == 'cell_voltage': + response = FlightLib.get_telemetry('body').cell_voltage + + send_all(bytes(form_message("response", + {"status": "ok", "value": response, "value_name": str(request_target)}))) + print("Request responded with:", response) + + except socket.error as e: + if e.errno != errno.EINTR: + print("Connection lost due error:", e) + print("Reconnecting...") + reconnect() + print("Re-connection successful") + else: + print("Interrupted") + raise KeyboardInterrupt +except KeyboardInterrupt: + print("Shutdown on keyboard interrupt") +finally: + clientSocket.close() + diff --git a/Server/server.py b/Server/server.py index 69ed0d6..52bcfdf 100644 --- a/Server/server.py +++ b/Server/server.py @@ -1,44 +1,41 @@ -import os import sys -import math import time -import json -import struct import socket import random import logging import threading +import selectors import collections import configparser +from messaging_lib import Message + # All imports sorted in pyramid just because random.seed() logging.basicConfig( # TODO all prints as logs - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s", handlers=[ logging.FileHandler("server_logs.log"), logging.StreamHandler() ]) - -class ConfigOption: - def __init__(self, section, option, value): - self.section = section - self.option = option - self.value = value +ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"]) +PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on", + "callback", "callback_args", "callback_kwargs" + ]) class Server: BUFFER_SIZE = 1024 def __init__(self, server_id=None, config_path="server_config.ini"): - self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4) # Init socket + self.sel = selectors.DefaultSelector() self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = socket.gethostname() @@ -47,13 +44,12 @@ class Server: # Init configs self.config_path = config_path self.config = configparser.ConfigParser() - self.config.read(self.config_path) self.load_config() # Init threads - self.autoconnect_thread = threading.Thread(target=self._auto_connect, daemon=True, - name='Client auto-connect') - self.autoconnect_thread_running = threading.Event() # Can be used for manual thread killing + self.autoconnect_thread = threading.Thread(target=self._client_processor, daemon=True, + name='Client processor') + self.client_processor_thread_running = threading.Event() # Can be used for manual thread killing self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True, name='IP broadcast sender') @@ -64,7 +60,8 @@ class Server: self.listener_thread_running = threading.Event() def load_config(self): - self.port = int(self.config['SERVER']['port']) + self.config.read(self.config_path) + self.port = int(self.config['SERVER']['port']) # TODO try, init def self.broadcast_port = int(self.config['SERVER']['broadcast_port']) self.BROADCAST_DELAY = int(self.config['SERVER']['broadcast_delay']) Server.BUFFER_SIZE = int(self.config['SERVER']['buffer_size']) @@ -74,12 +71,12 @@ class Server: self.NTP_PORT = int(self.config['NTP']['port']) def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False - logging.info("Starting server with id: {} on {} !".format(self.id, self.ip)) + logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.port)) logging.info("Starting server socket!") self.server_socket.bind((self.ip, self.port)) - logging.info("Starting client autoconnect thread!") - self.autoconnect_thread_running.set() + logging.info("Starting client processor thread!") + self.client_processor_thread_running.set() self.autoconnect_thread.start() logging.info("Starting broadcast sender thread!") @@ -92,10 +89,11 @@ class Server: def stop(self): logging.info("Stopping server") - self.autoconnect_thread_running.clear() + self.client_processor_thread_running.clear() self.broadcast_thread_running.clear() self.listener_thread_running.clear() self.server_socket.close() + self.sel.close() logging.info("Server stopped") @staticmethod @@ -113,36 +111,59 @@ class Server: msg, _ = ntp_socket.recvfrom(1024) return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA - def _auto_connect(self): - logging.info("Client autoconnect thread started!") - while self.autoconnect_thread_running.is_set(): - self.server_socket.listen(1) - c, addr = self.server_socket.accept() - logging.info("Got connection from: {}".format(str(addr))) - if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): - client = Client(addr[0]) - logging.info("New client") - else: - logging.info("Reconnected client") - Client.clients[addr[0]].connect(c, addr) + # noinspection PyArgumentList + def _client_processor(self): + logging.info("Client processor (selector) thread started!") + self.server_socket.listen() + self.server_socket.setblocking(False) + self.sel.register(self.server_socket, selectors.EVENT_READ, data=None) + + while self.client_processor_thread_running.is_set(): + events = self.sel.select(timeout=None) + for key, mask in events: + if key.data is None: + self._connect_client(key.fileobj) + else: + client = key.data + try: + client.process_events(mask) + except Exception as error: + logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr)) + client.close() + logging.info("Client autoconnect thread stopped!") + def _connect_client(self, sock): + conn, addr = sock.accept() + logging.info("Got connection from: {}".format(str(addr))) + conn.setblocking(False) + + if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): + client = Client(addr[0]) + logging.info("New client") + else: + client = Client.clients[addr[0]] + logging.info("Reconnected client") + self.sel.register(conn, selectors.EVENT_READ, data=client) + client.connect(self.sel, conn, addr) + def _ip_broadcast(self): logging.info("Broadcast sender thread started!") - msg = bytes(Client.form_message( - "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id} - ), "UTF-8") + msg = Message.create_simple_message( + "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id}) broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) logging.info("Formed broadcast message: {}".format(msg)) - while self.broadcast_thread_running.is_set(): - time.sleep(self.BROADCAST_DELAY) - broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port)) - logging.debug("Broadcast sent") - broadcast_sock.close() - logging.info("Broadcast sender thread stopped, socked closed!") + try: + while self.broadcast_thread_running.is_set(): + time.sleep(self.BROADCAST_DELAY) + broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port)) + logging.debug("Broadcast sent") + finally: + broadcast_sock.close() + logging.info("Broadcast sender thread stopped, socked closed!") def _broadcast_listen(self): logging.info("Broadcast listener thread started!") @@ -154,23 +175,35 @@ class Server: logging.critical("Another server is running on this computer, shutting down!") sys.exit() - while self.listener_thread_running.is_set(): - data, addr = broadcast_client.recvfrom(1024) - command, args = Client.parse_message(data.decode("UTF-8")) - if command == "server_ip": - if args["id"] != self.id: - logging.critical("Another server detected on network, shutting down") - sys.exit() - broadcast_client.close() - logging.info("Broadcast listener thread stopped, socked closed!") + try: + while self.listener_thread_running.is_set(): + data, addr = broadcast_client.recvfrom(1024) + message = Message() + message.income_raw = data + message.process_message() + if message.content: + if message.content["command"] == "server_ip": + if message.content["args"]["id"] != self.id: + logging.critical("Another server detected on network, shutting down") + sys.exit() + else: + logging.warning("Got wrong broadcast message from {}".format(addr)) + finally: + broadcast_client.close() + logging.info("Broadcast listener thread stopped, socked closed!") - def send_starttime(self, copter, dt=0): + def time_now(self): if self.USE_NTP: timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT) else: timenow = time.time() + return timenow + + def send_starttime(self, copter, dt=0): + timenow = self.time_now() print('Now:', time.ctime(timenow), "+ dt =", dt) - copter.send(Client.form_message("starttime", {"time": str(timenow + dt)})) + copter.send(Message.create_simple_message("starttime", {"time": str(timenow + dt)})) # TODO change start_on + # TODO all commands as tasks with timestamp and priority def requires_connect(f): @@ -192,7 +225,7 @@ def requires_any_connected(f): class Client: - resume_quee = True + resume_queue = True clients = {} @@ -201,9 +234,13 @@ class Client: on_disconnect = None def __init__(self, ip): + self.selector = None self.socket = None self.addr = None + self._recv_buffer = b"" + self._send_buffer = b"" + self._send_queue = collections.deque() self._received_queue = collections.deque() self._request_queue = collections.OrderedDict() @@ -212,167 +249,174 @@ class Client: self._request_lock = threading.Lock() self.copter_id = None - self.selected = False # Use to select copters for certain purposes DEPRECATED - Client.clients[ip] = self + self.clients[ip] = self self.connected = False - def connect(self, client_socket, client_addr): - print("Client connected") - if not Client.resume_quee: + def _set_selector_events_mask(self, mode): + """Set selector to listen for events: mode is 'r', 'w', 'rw'.""" + if mode == "r": + events = selectors.EVENT_READ + elif mode == "w": + events = selectors.EVENT_WRITE + elif mode == "rw": + events = selectors.EVENT_READ | selectors.EVENT_WRITE + else: + raise ValueError("Invalid events mask mode {}.".format(mode)) + self.selector.modify(self.socket, events, data=self) + + def connect(self, client_selector, client_socket, client_addr): + logging.info("Client connected") + if not Client.resume_queue: self._send_queue = collections.deque() + self.selector = client_selector self.socket = client_socket self.addr = client_addr - self.socket.setblocking(0) self.connected = True - client_thread = threading.Thread(target=self._run, name="Client {} thread".format(self.addr)) - client_thread.start() + if self.copter_id is None: - self.copter_id = self.get_response("id") - print("Got copter id:", self.copter_id) - if Client.on_first_connect: - Client.on_first_connect(self) + self.get_response("id", self._got_id) - if Client.on_connect: - Client.on_connect(self) + if self.on_connect: + self.on_connect(self) - def _send_all(self, msg): - self.socket.sendall(struct.pack('>I', len(msg)) + msg) + def _got_id(self): + logging.info("Got copter id: {} for client {}".format(self.copter_id, self.addr)) + if Client.on_first_connect: + Client.on_first_connect(self) - def _receive_all(self, n): - data = b'' - while len(data) < n: - packet = self.socket.recv(min(n - len(data), Server.BUFFER_SIZE)) - print("Receiving packet {}; full data is {}".format(packet, data)) - if not packet: - return None - data += packet - return data + def close(self): + logging.info("Closing connection to {}".format(self.addr)) + try: + self.selector.unregister(self.socket) + except AttributeError: + pass + except Exception as error: + logging.error("{}: Error during selector unregistering: {}".format(self.addr, error)) + finally: + self.selector = None - def _receive_message(self): - raw_msglen = self._receive_all(4) - if not raw_msglen: - print("No valid msg") - return None - msglen = struct.unpack('>I', raw_msglen)[0] - msg = self._receive_all(msglen) - return msg + try: + self.socket.close() + except AttributeError: + pass + except OSError as error: + logging.error("{}: Error during socket closing: {}".format(self.addr, error)) + finally: + self.socket = None - def _run(self): - while self.connected: - try: - if self._send_queue: - with self._send_lock: - msg = self._send_queue.popleft() - try: - self._send_all(msg) - print("Send", msg, "to", self.addr) - except socket.error as e: - logging.warning("Attempt to send failed: {}".format(e)) - with self._send_lock: - self._send_queue.appendleft(msg) - raise e + def process_events(self, mask): + print(self.socket, self.selector, mask) + if mask & selectors.EVENT_READ: + self.read() + if mask & selectors.EVENT_WRITE: + self.write() - try: # check if data in buffer - check = self.socket.recv(Server.BUFFER_SIZE, socket.MSG_PEEK) - if check: - received = self._receive_message() - if received: - received = received.decode("UTF-8") - print("Received", received, "from", self.addr) - command, args = Client.parse_message(received) - if command == "response": - with self._request_lock: - for key, value in self._request_queue.items(): - if not value and key == args["value_name"]: - self._request_queue[key] = args['value'] - print("Request successfully closed") - break - else: - print("Unexpected request") - else: - self._received_queue.appendleft(received) - except socket.error: + with self._send_lock: + if (not self._send_buffer) and self._send_queue: + message = self._send_queue.popleft() + self._send_buffer += message + self._set_selector_events_mask('rw') + + def process_received(self): + if self._received_queue: + if self._received_queue[-1].content: + message = self._received_queue.pop() + message_type = message.jsonheader["message-type"] + logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content)) + if message_type == "message": + pass + elif message_type == "response": + request_id, requested_value = message.content["requst_id"], message.content["requested_value"] + with self._request_lock: + for key, value in self._request_queue.items(): + if (key == request_id) and (value.requested_value == requested_value): + request = self._request_queue.pop(key) + request.value = message.content["value"] + logging.debug( + "Request successfully closed with value {}".format(message.content["value"]) + ) + request.callback(request.value, *request.callback_args, **request.callback_kwargs) + break + else: + logging.warning("Unexpected request response!") + elif message_type == "request": pass - except socket.error as e: - logging.warning("Client error: {}, disconnected".format(e)) + def read(self): + self._read() + if self._recv_buffer: + if not self._received_queue or (self._received_queue[0].content is not None): + self._received_queue.appendleft(Message()) + + self._received_queue[0].income_raw += self._recv_buffer + self._received_queue[0].process_message() + + if self._received_queue[0].content and self._received_queue[0].income_raw: + self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer + self._received_queue[0].income_raw = b'' + + self.process_received() + + def write(self): + self._write() + if not (self._send_buffer and self._send_queue): + self._set_selector_events_mask("r") + + def _read(self): + try: + data = self.socket.recv(Server.BUFFER_SIZE) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + if data: + self._recv_buffer += data + logging.debug("Received {} from {}".format(data, self.addr)) + else: + logging.warning("Connection to {} lost!".format(self.addr)) self.connected = False - self.socket.close() + if not Client.resume_queue: + self._recv_buffer = b'' + if Client.on_disconnect: Client.on_disconnect(self) - break - # time.sleep(0.05) - @staticmethod - def form_message(command: str, dict_arguments: dict = None): - if dict_arguments is None: - dict_arguments = {} - msg_dict = {command: dict_arguments} - msg = json.dumps(msg_dict) - return msg + raise RuntimeError("Peer closed.") - @staticmethod - def parse_message(msg): - try: - j_message = json.loads(msg) - except ValueError: - print("Json string not in correct format") - return None, None - str_command = list(j_message.keys())[0] - dict_arguments = list(j_message.values())[0] + def _write(self): + if self._send_buffer: + try: + sent = self.socket.send(self._send_buffer) + except BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + except socket.error as error: + logging.warning("Attempt to send message {} to {} failed due error: {}".format( + self._send_buffer, self.addr, error)) - return str_command, dict_arguments + if not Client.resume_queue: + self._send_buffer = b'' + + raise error + else: + print(sent) + logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr)) + self._send_buffer = self._send_buffer[sent:] + print(self._send_buffer) + time.sleep(0.1) @requires_connect def send(self, *messages): for message in messages: with self._send_lock: - self._send_queue.append(bytes(message, "UTF-8")) + self._send_queue.append(message) - @requires_connect - def get_response(self, requested_value): - with self._request_lock: - self._request_queue[requested_value] = "" - self.send(Client.form_message("request", {"value": requested_value})) - - while not self._request_queue[requested_value]: - pass - - with self._request_lock: - return self._request_queue.pop(requested_value) - - @requires_connect - def send_file(self, filepath, dest_filename, clever_restart = False): - print("Sending file ", dest_filename) - chunk_count = math.ceil(os.path.getsize(filepath) / Server.BUFFER_SIZE) - self.send(Client.form_message("writefile", {"filesize": chunk_count, "filename": dest_filename, "clever_restart": clever_restart})) - with open(filepath, 'rb') as file: - chunk = file.read(Server.BUFFER_SIZE) - while chunk: - with self._send_lock: - self._send_queue.append(chunk) - chunk = file.read(Server.BUFFER_SIZE) - - self.send(Client.form_message("/endoffile")) # TODO mb remove - print("File sent") - - @staticmethod - @requires_any_connected - def send_to_selected(message): # DEPRECATED - for client in Client.clients.values(): - if client.connected and client.selected: - client.send(message) - - @staticmethod - @requires_any_connected - def request_to_selected(requested_value): # DEPRECATED - for client in Client.clients.values(): - if client.connected and client.selected: - client.get_response(requested_value) + def send_message(self, command, args=None): + self.send(Message.create_simple_message(command, args)) @staticmethod @requires_any_connected @@ -381,12 +425,48 @@ class Client: if client.connected or force_all: client.send(message) + @classmethod + @requires_any_connected + def broadcast_message(cls, command, args=None, force_all=False): + cls.broadcast(Message.create_simple_message(command, args), force_all) + + def get_response(self, requested_value, callback, request_args=None, # timeout=30, + callback_args=(), callback_kwargs: dict=None): + if request_args is None: + request_args = {} + if callback_kwargs is None: + callback_kwargs = {} + + request_id = str(random.randint(0, 9999)).zfill(4) + with self._request_lock: + self._request_queue[request_id] = PendingRequest( + requested_value=requested_value, + value=None, + # expires_on=Server.time_now()+timeout, + callback=callback, + callback_args=callback_args, + callback_kwargs=callback_kwargs, + ) + self.send(Message.create_request(requested_value, request_id, request_args)) + + def send_file(self, filepath, dest_filepath): # clever_restart=False + try: + with open(filepath, 'rb') as file: + data = file.read() + except OSError as error: + logging.warning("File can not be opened due error: ".format(error)) + else: + logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath)) + self.send(Message.create_message(data, "binary", "filetransfer", "binary", {"path": dest_filepath})) + def send_config_options(self, *options: ConfigOption): + logging.info("Sending config options: {} to {}".format(options, self.addr)) for option in options: - self.send( - Client.form_message('config_write', - {'section': option.section, 'option': option.option, 'value': option.value})) - self.send(Client.form_message("config_reload")) + self.send_message( + 'config_write', + {'section': option.section, 'option': option.option, 'value': option.value} + ) + self.send_message("config_reload") @staticmethod def get_by_id(copter_id): diff --git a/Server/server_qt.py b/Server/server_qt.py index 161a34b..e6acb1a 100644 --- a/Server/server_qt.py +++ b/Server/server_qt.py @@ -1,8 +1,8 @@ +import os import glob from PyQt5 import QtWidgets from PyQt5.QtGui import QStandardItemModel, QStandardItem -#from PyQt5.QtCore import QModelIndex from PyQt5.QtCore import Qt, pyqtSlot from PyQt5.QtWidgets import QFileDialog, QMessageBox @@ -11,19 +11,19 @@ from PyQt5.QtWidgets import QFileDialog, QMessageBox from server_gui import Ui_MainWindow from server import * -class CopterView(QStandardItemModel): - pass + +# noinspection PyArgumentList,PyCallByClass class MainWindow(QtWidgets.QMainWindow): def __init__(self): super(MainWindow, self).__init__() self.ui = Ui_MainWindow() self.ui.setupUi(self) - self.initUI() + self.init_ui() self.show() - def initUI(self): - #Connecting + def init_ui(self): + # Connecting self.ui.check_button.clicked.connect(self.check_selected) self.ui.start_button.clicked.connect(self.send_starttime) self.ui.pause_button.clicked.connect(self.pause_all) @@ -38,7 +38,7 @@ class MainWindow(QtWidgets.QMainWindow): self.ui.action_send_configurations.triggered.connect(self.send_configurations) self.ui.action_send_Aruco_map.triggered.connect(self.send_aruco) - #Initing table and table model + # Initing table and table model self.ui.tableView.setModel(model) self.ui.tableView.horizontalHeader().setStretchLastSection(True) @@ -49,24 +49,25 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: print("Copter {} checked".format(model.item(row_num, 0).text())) copter = Client.get_by_id(item.text()) - batt_total = float(copter.get_response("batt_voltage")) - batt_cell = float(copter.get_response("cell_voltage")) - selfcheck = copter.get_response("selfcheck") - - batt_percent = ((batt_cell-3.2)/(4.2-3.2))*100 - - model.setData(model.index(row_num, 2), "{} V.".format(round(batt_total, 3))) - model.setData(model.index(row_num, 3), "{} %".format(round(batt_percent, 3))) - if selfcheck != "OK": - print(selfcheck) - model.setData(model.index(row_num, 4), str(selfcheck)) - else: - print("Everything ok") - model.setData(model.index(row_num, 4), str(selfcheck)) + copter.get_response("batt_voltage", self._set_copter_data, callback_args=(row_num, 2)) + copter.get_response("cell_voltage", self._set_copter_data, callback_args=(row_num, 3)) + copter.get_response("selfcheck", self._set_copter_data, callback_args=(row_num, 4)) self.ui.start_button.setEnabled(True) self.ui.takeoff_button.setEnabled(True) + def _set_copter_data(self, value, row, col): + if col == 2: + model.setData(model.index(row, col), "{} V.".format(round(float(value), 3))) + elif col == 3: + batt_percent = ((float(value) - 3.2) / (4.2 - 3.2)) * 100 + model.setData(model.index(row, col), "{} %".format(round(batt_percent, 3))) + elif col == 4: + if value != "OK": + model.setData(model.index(row, col), str(value)) + else: + model.setData(model.index(row, col), str(value)) + @pyqtSlot() def send_starttime(self): dt = self.ui.start_delay_spin.value() @@ -88,15 +89,15 @@ class MainWindow(QtWidgets.QMainWindow): @pyqtSlot() def stop_all(self): - Client.broadcast(Client.form_message("stop")) + Client.broadcast_message("stop") @pyqtSlot() def pause_all(self): if self.ui.pause_button.text() == 'Pause': - Client.broadcast(Client.form_message('pause')) + Client.broadcast_message('pause') self.ui.pause_button.setText('Resume') else: - Client.broadcast(Client.form_message('resume')) + Client.broadcast_message('resume') self.ui.pause_button.setText('Pause') @pyqtSlot() @@ -106,7 +107,7 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: if True: # TODO checks for batt/selfckeck here copter = Client.get_by_id(item.text()) - copter.send(Client.form_message("led_test")) + copter.send_message("led_test") @pyqtSlot() def takeoff_selected(self): @@ -122,17 +123,18 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: if True: # TODO checks for batt/selfckeck here copter = Client.get_by_id(item.text()) - copter.send(Client.form_message("takeoff")) + copter.send_message("takeoff") else: print("Cancelled") + pass @pyqtSlot() def land_all(self): - Client.broadcast(Client.form_message("land")) + Client.broadcast_message("land") @pyqtSlot() def disarm_all(self): - Client.broadcast(Client.form_message("disarm")) + Client.broadcast_message("disarm") @pyqtSlot() def send_animations(self): @@ -181,7 +183,8 @@ class MainWindow(QtWidgets.QMainWindow): item = model.item(row_num, 0) if item.isCheckable() and item.checkState() == Qt.Checked: copter = Client.get_by_id(item.text()) - copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt", clever_restart=True) + copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt") + # clever_restart=True model = QStandardItemModel() diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/messaging_lib.py b/messaging_lib.py new file mode 100644 index 0000000..c3fb1f4 --- /dev/null +++ b/messaging_lib.py @@ -0,0 +1,108 @@ +import io +import sys +import json +import struct + + +class Message: + def __init__(self): + self.income_raw = b"" + self._jsonheader_len = None + self.jsonheader = None + self.content = None + + @staticmethod + def _json_encode(obj, encoding="utf-8"): + return json.dumps(obj, ensure_ascii=False).encode(encoding) + + @staticmethod + def _json_decode(json_bytes, encoding="utf-8"): + with io.TextIOWrapper(io.BytesIO(json_bytes), encoding=encoding, newline="") as tiow: + obj = json.load(tiow) + return obj + + @classmethod + def create_message(cls, content_bytes, content_type, message_type, content_encoding="utf-8", + additional_headers=None): + jsonheader = { + "byteorder": sys.byteorder, + "content-type": content_type, + "content-encoding": content_encoding, + "content-length": len(content_bytes), + "message-type": message_type, + } + if additional_headers: + jsonheader.update(additional_headers) + + jsonheader_bytes = cls._json_encode(jsonheader, "utf-8") + message_hdr = struct.pack(">H", len(jsonheader_bytes)) + message = message_hdr + jsonheader_bytes + content_bytes + return message + + @classmethod + def create_json_message(cls, contents): + message = cls.create_message(cls._json_encode(contents), "json", "message") + return message + + @classmethod + def create_simple_message(cls, command, args=None): + if args is None: + args = {} + message = cls.create_json_message({"command": command, "args": args}) + return message + + @classmethod + def create_request(cls, requested_value, request_id, args=None): + if args is None: + args = {} + contents = {"requested_value": requested_value, + "requst_id": request_id, + "args": args, + } + message = cls.create_message(cls._json_encode(contents), "json", "request") + return message + + def _process_protoheader(self): + header_len = 2 + if len(self.income_raw) >= header_len: + self._jsonheader_len = struct.unpack(">H", self.income_raw[:header_len])[0] + self.income_raw = self.income_raw[header_len:] + + def _process_jsonheader(self): + header_len = self._jsonheader_len + if len(self.income_raw) >= header_len: + self.jsonheader = self._json_decode(self.income_raw[:header_len], "utf-8") + self.income_raw = self.income_raw[header_len:] + for reqhdr in ( + "byteorder", + "content-length", + "content-type", + "content-encoding", + "message-type", + ): + if reqhdr not in self.jsonheader: + raise ValueError('Missing required header {}'.format(reqhdr)) + + def _process_content(self): + content_len = self.jsonheader["content-length"] + if not len(self.income_raw) >= content_len: + return + data = self.income_raw[:content_len] + self.income_raw = self.income_raw[content_len:] + if self.jsonheader["content-type"] == "json": + encoding = self.jsonheader["content-encoding"] + self.content = self._json_decode(data, encoding) + else: + self.content = data + + def process_message(self): + if self._jsonheader_len is None: + self._process_protoheader() + + if self._jsonheader_len is not None: + if self.jsonheader is None: + self._process_jsonheader() + + if self.jsonheader: + if self.content is None: + self._process_content()