diff --git a/Drone/client.py b/Drone/client.py index 87614ac..241a0d3 100644 --- a/Drone/client.py +++ b/Drone/client.py @@ -4,6 +4,7 @@ import random import socket import struct import logging +import collections import selectors2 as selectors import ConfigParser from contextlib import closing @@ -13,7 +14,7 @@ current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentfra parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) -from messaging_lib import Message +import messaging_lib as messaging random.seed() logging.basicConfig( # TODO all prints as logs @@ -29,6 +30,9 @@ class Client: def __init__(self, config_path="client_config.ini"): self.selector = selectors.DefaultSelector() self.client_socket = None + + self.server_connection = messaging.ConnectionManager() + self.server_host = None self.server_port = None self.broadcast_port = None @@ -70,7 +74,7 @@ class Client: self.client_id = self.config.get('PRIVATE', 'id') if self.client_id == 'default': - client_id = 'copter' + str(random.randrange(9999)).zfill(4) + self.client_id = 'copter' + str(random.randrange(9999)).zfill(4) #write_to_config('PRIVATE', 'id', client_id) elif self.client_id == '/hostname': self.client_id = socket.gethostname() @@ -105,11 +109,6 @@ class Client: logging.critical("Shutting down on keyboard interrupt") raise KeyboardInterrupt else: - self.connected = True - #self.client_socket.settimeout(None) - self.client_socket.setblocking(False) - events = selectors.EVENT_READ | selectors.EVENT_WRITE - self.selector.register(self.client_socket, events, data=None) logging.info("Connection to server successful!") break @@ -118,14 +117,20 @@ class Client: self.broadcast_bind() attempt_count = 0 + def _connect(self): + self.connected = True + self.client_socket.setblocking(False) + events = selectors.EVENT_READ | selectors.EVENT_WRITE + self.selector.register(self.client_socket, events, data=None) + def broadcast_bind(self): broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) broadcast_client.bind(("", self.broadcast_port)) try: while True: - data, addr = broadcast_client.recvfrom(1024) - message = Message() + data, addr = broadcast_client.recvfrom(self.BUFFER_SIZE) + message = messaging.MessageManager() message.income_raw = data message.process_message() if message.content: @@ -141,39 +146,25 @@ class Client: finally: broadcast_client.close() - def service_connection(self, key, mask): - sock = key.fileobj - data = key.data - if mask & selectors.EVENT_READ: - recv_data = sock.recv(1024) # Should be ready to read - if recv_data: - print("received", repr(recv_data), "from connection", ) - if not recv_data: - print("closing connection",) - self.selector.unregister(sock) - self.client_socket.close() - if mask & selectors.EVENT_READ: - pass - - def mainloop(self): - #self.client_socket.send("104771313759739") try: while True: events = self.selector.select(timeout=1) if events: for key, mask in events: - self.service_connection(key, mask) - pass + if key.data is None: + connection = key.data + connection.process_events(mask) if not self.selector.get_map(): - logging.warning("No active connections left") + logging.warning("No active connections left!") self.reconnect() - except KeyboardInterrupt: - print("caught keyboard interrupt, exiting") + except (KeyboardInterrupt, errno.EINTR): + logging.critical("Caught interrupt, exiting!") finally: self.selector.close() +# TODO class connection if __name__ == "__main__": client = Client() diff --git a/Server/server.py b/Server/server.py index 40f4d82..6dc972a 100644 --- a/Server/server.py +++ b/Server/server.py @@ -8,7 +8,7 @@ import selectors import collections import configparser -from messaging_lib import Message +import messaging_lib as messaging # All imports sorted in pyramid just because @@ -23,9 +23,6 @@ logging.basicConfig( # TODO all prints as logs ]) ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"]) -PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on", - "callback", "callback_args", "callback_kwargs" - ]) class Server: @@ -149,7 +146,7 @@ class Server: def _ip_broadcast(self): logging.info("Broadcast sender thread started!") - msg = Message.create_simple_message( + msg = messaging.MessageManager.create_simple_message( "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id}) broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -178,7 +175,7 @@ class Server: try: while self.listener_thread_running.is_set(): data, addr = broadcast_client.recvfrom(1024) - message = Message() + message = messaging.MessageManager() message.income_raw = data message.process_message() if message.content: @@ -202,7 +199,7 @@ class Server: def send_starttime(self, copter, dt=0): timenow = self.time_now() print('Now:', time.ctime(timenow), "+ dt =", dt) - copter.send(Message.create_simple_message("starttime", {"time": str(timenow + dt)})) # TODO change start_on + copter.send_message("starttime", {"time": str(timenow + dt)}) # TODO change start_on # TODO all commands as tasks with timestamp and priority @@ -224,9 +221,7 @@ def requires_any_connected(f): return wrapper -class Client: - resume_queue = True - +class Client(messaging.ConnectionManager): clients = {} on_connect = None # Use as callback functions @@ -234,48 +229,24 @@ class Client: on_disconnect = None def __init__(self, ip): - self.selector = None - self.socket = None - self.addr = None - - self._recv_buffer = b"" - self._send_buffer = b"" - - self._send_queue = collections.deque() - self._received_queue = collections.deque() - self._request_queue = collections.OrderedDict() - - self._send_lock = threading.Lock() - self._request_lock = threading.Lock() - + super(Client, self).__init__() self.copter_id = None + self.connected = False self.clients[ip] = self - self.connected = False - - def _set_selector_events_mask(self, mode): - """Set selector to listen for events: mode is 'r', 'w', 'rw'.""" - if mode == "r": - events = selectors.EVENT_READ - elif mode == "w": - events = selectors.EVENT_WRITE - elif mode == "rw": - events = selectors.EVENT_READ | selectors.EVENT_WRITE - else: - raise ValueError("Invalid events mask mode {}.".format(mode)) - self.selector.modify(self.socket, events, data=self) + @staticmethod + def get_by_id(copter_id): + for client in Client.clients.values(): + if client.copter_id == copter_id: + return client def connect(self, client_selector, client_socket, client_addr): logging.info("Client connected") - if not Client.resume_queue: + if not self.resume_queue: self._send_queue = collections.deque() - self.selector = client_selector - self.socket = client_socket - self.addr = client_addr - - self._set_selector_events_mask('rw') + super(Client, self).connect(client_selector, client_socket, client_addr) self.connected = True @@ -291,175 +262,18 @@ class Client: Client.on_first_connect(self) def close(self): - logging.info("Closing connection to {}".format(self.addr)) - try: - self.selector.unregister(self.socket) - except AttributeError: - pass - except Exception as error: - logging.error("{}: Error during selector unregistering: {}".format(self.addr, error)) - finally: - self.selector = None + self.connected = False - try: - self.socket.close() - except AttributeError: - pass - except OSError as error: - logging.error("{}: Error during socket closing: {}".format(self.addr, error)) - finally: - self.socket = None + if Client.on_disconnect: + Client.on_disconnect(self) - def process_events(self, mask): - print(self.socket, self.selector, mask) - if mask & selectors.EVENT_READ: - self.read() - if mask & selectors.EVENT_WRITE: - self.write() + super(Client, self).close() - with self._send_lock: - if (not self._send_buffer) and self._send_queue: - message = self._send_queue.popleft() - self._send_buffer += message - # self._set_selector_events_mask('rw') - - def process_received(self): - if self._received_queue: - if self._received_queue[-1].content: - message = self._received_queue.pop() - message_type = message.jsonheader["message-type"] - logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content)) - if message_type == "message": - pass - elif message_type == "response": - request_id, requested_value = message.content["requst_id"], message.content["requested_value"] - with self._request_lock: - for key, value in self._request_queue.items(): - if (key == request_id) and (value.requested_value == requested_value): - request = self._request_queue.pop(key) - request.value = message.content["value"] - logging.debug( - "Request successfully closed with value {}".format(message.content["value"]) - ) - request.callback(request.value, *request.callback_args, **request.callback_kwargs) - break - else: - logging.warning("Unexpected request response!") - elif message_type == "request": - pass - - def read(self): - self._read() - if self._recv_buffer: - if not self._received_queue or (self._received_queue[0].content is not None): - self._received_queue.appendleft(Message()) - - self._received_queue[0].income_raw += self._recv_buffer - self._received_queue[0].process_message() - - if self._received_queue[0].content and self._received_queue[0].income_raw: - self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer - self._received_queue[0].income_raw = b'' - - self.process_received() - - def write(self): - self._write() - # if not (self._send_buffer and self._send_queue): - # self._set_selector_events_mask("r") - - def _read(self): - try: - data = self.socket.recv(Server.BUFFER_SIZE) - except BlockingIOError: - # Resource temporarily unavailable (errno EWOULDBLOCK) - pass - else: - if data: - self._recv_buffer += data - logging.debug("Received {} from {}".format(data, self.addr)) - else: - logging.warning("Connection to {} lost!".format(self.addr)) - self.connected = False - if not Client.resume_queue: - self._recv_buffer = b'' - - if Client.on_disconnect: - Client.on_disconnect(self) - - raise RuntimeError("Peer closed.") - - def _write(self): - if self._send_buffer: - try: - sent = self.socket.send(self._send_buffer) - except BlockingIOError: - # Resource temporarily unavailable (errno EWOULDBLOCK) - pass - except socket.error as error: - logging.warning("Attempt to send message {} to {} failed due error: {}".format( - self._send_buffer, self.addr, error)) - - if not Client.resume_queue: - self._send_buffer = b'' - - raise error - else: - print(sent) - logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr)) - self._send_buffer = self._send_buffer[sent:] - print(self._send_buffer) - time.sleep(0.1) @requires_connect - def send(self, *messages): - for message in messages: - with self._send_lock: - self._send_queue.append(message) - - def send_message(self, command, args=None): - self.send(Message.create_simple_message(command, args)) - - @staticmethod - @requires_any_connected - def broadcast(message, force_all=False): - for client in Client.clients.values(): - if client.connected or force_all: - client.send(message) - - @classmethod - @requires_any_connected - def broadcast_message(cls, command, args=None, force_all=False): - cls.broadcast(Message.create_simple_message(command, args), force_all) - - def get_response(self, requested_value, callback, request_args=None, # timeout=30, - callback_args=(), callback_kwargs: dict=None): - if request_args is None: - request_args = {} - if callback_kwargs is None: - callback_kwargs = {} - - request_id = str(random.randint(0, 9999)).zfill(4) - with self._request_lock: - self._request_queue[request_id] = PendingRequest( - requested_value=requested_value, - value=None, - # expires_on=Server.time_now()+timeout, - callback=callback, - callback_args=callback_args, - callback_kwargs=callback_kwargs, - ) - self.send(Message.create_request(requested_value, request_id, request_args)) - - def send_file(self, filepath, dest_filepath): # clever_restart=False - try: - with open(filepath, 'rb') as file: - data = file.read() - except OSError as error: - logging.warning("File can not be opened due error: ".format(error)) - else: - logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath)) - self.send(Message.create_message(data, "binary", "filetransfer", "binary", {"path": dest_filepath})) + def _send(self, data): + super(Client, self)._send(data) + logging.debug("Queued data to send: {}".format(data)) def send_config_options(self, *options: ConfigOption): logging.info("Sending config options: {} to {}".format(options, self.addr)) @@ -471,10 +285,16 @@ class Client: self.send_message("config_reload") @staticmethod - def get_by_id(copter_id): - for copter in Client.clients.values(): - if copter.copter_id == copter_id: - return copter + @requires_any_connected + def broadcast(message, force_all=False): + for client in Client.clients.values(): + if client.connected or force_all: + client.send(message) + + @classmethod + @requires_any_connected + def broadcast_message(cls, command, args=None, force_all=False): + cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all) if __name__ == '__main__': diff --git a/messaging_lib.py b/messaging_lib.py index c3fb1f4..561b6ce 100644 --- a/messaging_lib.py +++ b/messaging_lib.py @@ -2,9 +2,22 @@ import io import sys import json import struct +import random +import logging +import threading +import collections + +try: + import selectors +except: + import selectors2 as selectors + +PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on", + "callback", "callback_args", "callback_kwargs" + ]) -class Message: +class MessageManager: def __init__(self): self.income_raw = b"" self._jsonheader_len = None @@ -62,6 +75,15 @@ class Message: message = cls.create_message(cls._json_encode(contents), "json", "request") return message + @classmethod + def create_response(cls, requested_value, request_id, value): + contents = {"requested_value": requested_value, + "requst_id": request_id, + "value": value, + } + message = cls.create_message(cls._json_encode(contents), "json", "response") + return message + def _process_protoheader(self): header_len = 2 if len(self.income_raw) >= header_len: @@ -106,3 +128,238 @@ class Message: if self.jsonheader: if self.content is None: self._process_content() + + +def message_callback(string_command): + def inner(f): + ConnectionManager.requests_callbacks.update({string_command: f}) + + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + return wrapper + return inner + + +class ConnectionManager(object): + messages_callbacks = {} + requests_callbacks = {} + + def __init__(self): + self.selector = None + self.socket = None + self.addr = None + + self.selector = None + self.socket = None + self.addr = None + + self._recv_buffer = b"" + self._send_buffer = b"" + + self._send_queue = collections.deque() + self._received_queue = collections.deque() + self._request_queue = collections.OrderedDict() + + self._send_lock = threading.Lock() + self._request_lock = threading.Lock() + + self.BUFFER_SIZE = 1024 + self.resume_queue = True + + def _set_selector_events_mask(self, mode): + """Set selector to listen for events: mode is 'r', 'w', 'rw'.""" + if mode == "r": + events = selectors.EVENT_READ + elif mode == "w": + events = selectors.EVENT_WRITE + elif mode == "rw": + events = selectors.EVENT_READ | selectors.EVENT_WRITE + else: + raise ValueError("Invalid events mask mode {}.".format(mode)) + self.selector.modify(self.socket, events, data=self) + + def connect(self, client_selector, client_socket, client_addr): + self.selector = client_selector + self.socket = client_socket + self.addr = client_addr + + self._set_selector_events_mask('rw') + + def close(self): + logging.info("Closing connection to {}".format(self.addr)) + try: + self.selector.unregister(self.socket) + except AttributeError: + pass + except Exception as error: + logging.error("{}: Error during selector unregistering: {}".format(self.addr, error)) + finally: + self.selector = None + + try: + self.socket.close() + except AttributeError: + pass + except OSError as error: + logging.error("{}: Error during socket closing: {}".format(self.addr, error)) + finally: + self.socket = None + + def process_events(self, mask): + print(self.socket, self.selector, mask) + if mask & selectors.EVENT_READ: + self.read() + if mask & selectors.EVENT_WRITE: + self.write() + + def read(self): + self._read() + if self._recv_buffer: + if not self._received_queue or (self._received_queue[0].content is not None): + self._received_queue.appendleft(MessageManager()) + + self._received_queue[0].income_raw += self._recv_buffer + self._received_queue[0].process_message() + + if self._received_queue[0].content and self._received_queue[0].income_raw: + self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer + self._received_queue[0].income_raw = b'' + + self.process_received() + + def _read(self): + try: + data = self.socket.recv(self.BUFFER_SIZE) + except io.BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + if data: + self._recv_buffer += data + logging.debug("Received {} from {}".format(data, self.addr)) + else: + logging.warning("Connection to {} lost!".format(self.addr)) + if not self.resume_queue: + self._recv_buffer = b'' + + raise RuntimeError("Peer closed.") + + def process_received(self): + if self._received_queue: + if self._received_queue[-1].content: + message = self._received_queue.pop() + message_type = message.jsonheader["message-type"] + logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content)) + if message_type == "message": + self._process_message(message) + elif message_type == "response": + self._process_response(message) + elif message_type == "request": + self._process_request(message) + elif message_type == "filetransfer": + self._process_filetransfer(message) + + def _process_message(self, message): + command = message.content["command"] + args = message.content["args"] + self.messages_callbacks[command](**args) + + def _process_request(self, message): + command = message.content["requested_value"] + request_id = message.content["request_id"] + args = message.content["args"] + value = self.requests_callbacks[command](**args) + self._send_response(command, request_id, value) + + def _process_response(self, message): + request_id, requested_value = message.content["requst_id"], message.content["requested_value"] + with self._request_lock: + for key, value in self._request_queue.items(): + if (key == request_id) and (value.requested_value == requested_value): + request = self._request_queue.pop(key) + request.value = message.content["value"] + logging.debug( + "Request successfully closed with value {}".format(message.content["value"]) + ) + request.callback(request.value, *request.callback_args, **request.callback_kwargs) + break + else: + logging.warning("Unexpected response!") + + def _process_filetransfer(self, message): + if message.jsonheader["content-type"] == "binary": + filepath = message.jsonheader["filepath"] + try: + with open(filepath, 'wb') as f: + f.write(message.content) + except OSError as error: + logging.warning("File can not be written due error: ".format(error)) + else: + logging.info("File {} successfully received ".format(filepath)) + + def write(self): + with self._send_lock: + if (not self._send_buffer) and self._send_queue: + message = self._send_queue.popleft() + self._send_buffer += message + if self._send_buffer: + self._write() + + def _write(self): + try: + sent = self.socket.send(self._send_buffer) + except io.BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + except Exception as error: + logging.warning("Attempt to send message {} to {} failed due error: {}".format( + self._send_buffer, self.addr, error)) + + if not self.resume_queue: + self._send_buffer = b'' + + raise error + else: + logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr)) + self._send_buffer = self._send_buffer[sent:] + + def _send(self, data): + with self._send_lock: + self._send_queue.append(data) + + def get_response(self, requested_value, callback, request_args=None, # timeout=30, + callback_args=(), callback_kwargs=None): + if request_args is None: + request_args = {} + if callback_kwargs is None: + callback_kwargs = {} + + request_id = str(random.randint(0, 9999)).zfill(4) + with self._request_lock: + self._request_queue[request_id] = PendingRequest( + requested_value=requested_value, + value=None, + # expires_on=Server.time_now()+timeout, #TODO + callback=callback, + callback_args=callback_args, + callback_kwargs=callback_kwargs, + ) + self._send(MessageManager.create_request(requested_value, request_id, request_args)) + + def send_message(self, command, args=None): + self._send(MessageManager.create_simple_message(command, args)) + + def _send_response(self, requested_value, request_id, value): + self._send(MessageManager.create_response(requested_value, request_id, value)) + + def send_file(self, filepath, dest_filepath): # clever_restart=False + try: + with open(filepath, 'rb') as f: + data = f.read() + except OSError as error: + logging.warning("File can not be opened due error: ".format(error)) + else: + logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath)) + self._send(MessageManager.create_message( + data, "binary", "filetransfer", "binary", {"filepath": dest_filepath} + )) \ No newline at end of file