import collections
import functools
import io
import json
import logging
import os
import platform
import random
import socket
import struct
import subprocess
import sys
import threading
import traceback
from contextlib import closing

try:
    import selectors
except ImportError:
    import selectors2 as selectors


class Namespace:
    """Simple attribute bag that also supports ``obj[key]`` access."""

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __getitem__(self, key):
        return self.__dict__[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value


class PendingRequest(Namespace):
    """Bookkeeping record for a request that is waiting for its response."""


logger = logging.getLogger(__name__)


def get_ip_address():
    """Return the local IP address of the outgoing interface.

    A UDP socket is "connected" to 8.8.8.8:80 and its local socket name is
    read back. Falls back to ``"localhost"`` (with a warning) on ``OSError``.
    """
    try:
        with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as ip_socket:
            ip_socket.connect(("8.8.8.8", 80))
            return ip_socket.getsockname()[0]
    except OSError:
        logger.warning("No network connection detected, using localhost")
        return "localhost"


def set_keepalive(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Enable TCP keepalive on *sock* using the platform-specific helper.

    Dispatches on ``platform.system()``; for any value other than "Linux",
    "Windows" or "Darwin" (including an empty string) nothing is done.

    :param sock: socket to configure
    :param after_idle_sec: idle time before the first probe (Linux, Windows)
    :param interval_sec: interval between probes
    :param max_fails: failed probes before the connection is dropped (Linux)
    :return: None
    """
    current_platform = platform.system()  # could be empty
    if current_platform == "Linux":
        return _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails)
    if current_platform == "Windows":
        return _set_keepalive_windows(sock, after_idle_sec, interval_sec)
    if current_platform == "Darwin":
        return _set_keepalive_osx(sock, interval_sec)


def _set_keepalive_linux(sock, after_idle_sec, interval_sec, max_fails):
    """Configure keepalive through the Linux TCP_KEEP* socket options."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)


def _set_keepalive_windows(sock, after_idle_sec, interval_sec):
    """Configure keepalive through SIO_KEEPALIVE_VALS (values in milliseconds)."""
    sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, after_idle_sec*1000, interval_sec*1000))


def _set_keepalive_osx(sock, interval_sec):
    """Configure keepalive on macOS using the raw TCP_KEEPALIVE option (0x10)."""
    TCP_KEEPALIVE = 0x10
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)


class _Singleton(type):
    """ A metaclass that creates a Singleton base class when called.
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        # Instantiate each class at most once; later calls return the cached
        # instance (constructor arguments of later calls are ignored).
        if cls not in cls._instances:
            cls._instances[cls] = super(_Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class Singleton(_Singleton('SingletonMeta', (object,), {})):
    """Base class whose subclasses are instantiated only once."""


class MessageManager:
    """Builds outgoing messages and incrementally parses incoming ones.

    Wire format: a 2-byte big-endian length of the JSON header, the UTF-8
    JSON header itself, then ``content-length`` bytes of content.
    """

    def __init__(self):
        self.income_raw = b""  # bytes received but not yet consumed
        self._jsonheader_len = None
        self.jsonheader = None
        self.content = None

    @staticmethod
    def _json_encode(obj, encoding="utf-8"):
        """Serialize *obj* to JSON and encode it to bytes."""
        return json.dumps(obj, ensure_ascii=False).encode(encoding)

    @staticmethod
    def _json_decode(json_bytes, encoding="utf-8"):
        """Decode JSON bytes into Python objects (mappings as OrderedDict)."""
        with io.TextIOWrapper(io.BytesIO(json_bytes), encoding=encoding, newline="") as tiow:
            obj = json.load(tiow, object_pairs_hook=collections.OrderedDict)
        return obj

    @classmethod
    def create_message(cls, content_bytes, content_type, message_type,
                       content_encoding="utf-8", additional_headers=None):
        """Frame *content_bytes* with the protocol header and JSON header.

        :param content_bytes: already-encoded payload
        :param content_type: value of the "content-type" header
        :param message_type: value of the "message-type" header
        :param content_encoding: value of the "content-encoding" header
        :param additional_headers: optional dict merged into the JSON header
        :return: complete message as bytes
        """
        jsonheader = {
            "byteorder": sys.byteorder,
            "content-type": content_type,
            "content-encoding": content_encoding,
            "content-length": len(content_bytes),
            "message-type": message_type,
        }
        if additional_headers:
            jsonheader.update(additional_headers)
        jsonheader_bytes = cls._json_encode(jsonheader, "utf-8")
        message_hdr = struct.pack(">H", len(jsonheader_bytes))
        return message_hdr + jsonheader_bytes + content_bytes

    @classmethod
    def create_json_message(cls, contents, additional_headers=None):
        """Create a "message"-type message with JSON-encoded *contents*."""
        return cls.create_message(cls._json_encode(contents), "json", "message",
                                  additional_headers=additional_headers)

    @classmethod
    def create_action_message(cls, action, args=(), kwargs=None):
        """Create a message asking the peer to run the callback named *action*."""
        if kwargs is None:
            kwargs = {}
        return cls.create_json_message({"args": args, "kwargs": kwargs},
                                       {"action": action, })

    @classmethod
    def create_request(cls, requested_value, request_id, args=(), kwargs=None):
        """Create a "request"-type message identified by *request_id*."""
        if kwargs is None:
            kwargs = {}
        contents = {
            "requested_value": requested_value,
            "request_id": request_id,
            "args": args,
            "kwargs": kwargs,
        }
        return cls.create_message(cls._json_encode(contents), "json", "request")

    @classmethod
    def create_response(cls, requested_value, request_id, value, filetransfer=False):
        """Create a "response"-type message.

        With ``filetransfer=True`` *value* is sent as raw bytes ("binary"),
        otherwise it is wrapped as ``{"value": value}`` and JSON-encoded.
        """
        headers = {
            "requested_value": requested_value,
            "request_id": request_id,
            # TODO status
        }
        if filetransfer:
            contents = value
        else:
            contents = cls._json_encode({"value": value, })
        return cls.create_message(contents, "binary" if filetransfer else "json",
                                  "response", additional_headers=headers)

    def _process_protoheader(self):
        """Consume the 2-byte JSON-header length once enough bytes arrived."""
        header_len = 2
        if len(self.income_raw) >= header_len:
            self._jsonheader_len = struct.unpack(">H", self.income_raw[:header_len])[0]
            self.income_raw = self.income_raw[header_len:]

    def _process_jsonheader(self):
        """Consume and validate the JSON header once enough bytes arrived.

        :raises ValueError: if a required header field is missing
        """
        header_len = self._jsonheader_len
        if len(self.income_raw) >= header_len:
            self.jsonheader = self._json_decode(self.income_raw[:header_len], "utf-8")
            self.income_raw = self.income_raw[header_len:]
            for reqhdr in (
                    "byteorder",
                    "content-length",
                    "content-type",
                    "content-encoding",
                    "message-type",
            ):
                if reqhdr not in self.jsonheader:
                    raise ValueError('Missing required header {}'.format(reqhdr))

    def _process_content(self):
        """Consume the content once ``content-length`` bytes are available."""
        content_len = self.jsonheader["content-length"]
        if not len(self.income_raw) >= content_len:
            return
        data = self.income_raw[:content_len]
        self.income_raw = self.income_raw[content_len:]
        if self.jsonheader["content-type"] == "json":
            encoding = self.jsonheader["content-encoding"]
            self.content = self._json_decode(data, encoding)
        else:
            self.content = data

    def process_message(self):
        """Advance parsing as far as the bytes in ``income_raw`` allow.

        The message is complete when ``self.content`` is no longer None;
        leftover bytes of a following message stay in ``income_raw``.
        """
        if self._jsonheader_len is None:
            self._process_protoheader()

        if self._jsonheader_len is not None:
            if self.jsonheader is None:
                self._process_jsonheader()

        if self.jsonheader:
            if self.content is None:
                self._process_content()


def message_callback(action_string):
    """Decorator registering a function as the handler of *action_string*.

    The undecorated function is stored in
    ``ConnectionManager.messages_callbacks``; the returned wrapper simply
    forwards to it.
    """
    def inner(f):
        ConnectionManager.messages_callbacks[action_string] = f
        logger.debug("Registered message function {} for {}".format(f, action_string))

        @functools.wraps(f)  # keep the name/docstring of the decorated function
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        return wrapper

    return inner


def request_callback(string_command):
    """Decorator registering a function as the handler of request *string_command*.

    The undecorated function is stored in
    ``ConnectionManager.requests_callbacks``; the returned wrapper simply
    forwards to it.
    """
    def inner(f):
        ConnectionManager.requests_callbacks[string_command] = f
        logger.debug("Registered callback function {} for {}".format(f, string_command))

        @functools.wraps(f)  # keep the name/docstring of the decorated function
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        return wrapper

    return inner


class ConnectionManager(object):
    """Per-connection state: framing, send/receive queues and dispatching."""

    # Registries filled by the message_callback / request_callback decorators.
    messages_callbacks = {}
    requests_callbacks = {}

    def __init__(self, whoami="computer"):
        self.selector = None
        self.socket = None
        self.addr = None

        self._should_close = False

        self._recv_buffer = b""
        self._send_buffer = b""

        self.whoami = whoami

        self._send_queue = collections.deque()
        self._received_queue = collections.deque()
        self._request_queue = collections.OrderedDict()  # request_id -> PendingRequest

        self._send_lock = threading.Lock()
        self._request_lock = threading.Lock()
        self._close_lock = threading.Lock()

        self.buffer_size = 1024
        self.resume_queue = False  # keep buffers and queues across reconnects
        self.resend_requests = True  # resend unanswered requests on connect()

    def _set_selector_events_mask(self, mode):
        """Set selector to listen for events: mode is 'r', 'w', 'rw'."""
        if mode == "r":
            events = selectors.EVENT_READ
        elif mode == "w":
            events = selectors.EVENT_WRITE
        elif mode == "rw":
            events = selectors.EVENT_READ | selectors.EVENT_WRITE
        else:
            raise ValueError("Invalid events mask mode {}.".format(mode))
        key = self.selector.modify(self.socket, events, data=self)
        logger.debug("Switched selector of {} to mode {}".format(self.addr, key.events))
        return key

    def connect(self, client_selector, client_socket, client_addr):
        """Attach this manager to an already registered socket.

        *client_socket* must already be registered in *client_selector*,
        because the registration is only modified here.
        """
        self.selector = client_selector
        self.socket = client_socket
        self.addr = client_addr

        self._clear()
        self._set_selector_events_mask('r')

        if self.resend_requests:
            self._resend_requests()

    def _clear(self):
        """Drop buffers and queues unless ``resume_queue`` is set."""
        if not self.resume_queue:  # maybe needs locks
            self._recv_buffer = b''
            self._send_buffer = b''
            self._received_queue.clear()
            self._send_queue.clear()

    def close(self):
        """Request closing; the actual close happens in process_events()."""
        with self._close_lock:
            self._should_close = True
        # Wake the selector loop so that process_events() runs _close().
        self._set_selector_events_mask('w')
        NotifierSock().notify()

    def _close(self):
        """Unregister and close the socket, then reset connection state."""
        logger.info("Closing connection to {}".format(self.addr))
        try:
            logger.info("Unregistering selector of {}".format(self.addr))
            self.selector.unregister(self.socket)
        except AttributeError:
            pass  # selector is already None
        except Exception as error:
            logger.error("{}: Error during selector unregistering: {}".format(self.addr, error))
        finally:
            self.selector = None

        try:
            logger.info("Closing socket of of {}".format(self.addr))
            self.socket.close()
        except AttributeError:
            pass  # socket is already None
        except OSError as error:
            logger.error("{}: Error during socket closing: {}".format(self.addr, error))
        finally:
            self.socket = None

        with self._close_lock:
            self._should_close = False

        self._clear()
        logger.info("CLOSED connection to {}".format(self.addr))

    def process_events(self, mask):
        """Handle selector events; performs a pending close instead if requested."""
        with self._close_lock:
            close = self._should_close
        if close:
            self._close()
            return

        if mask & selectors.EVENT_READ:
            self.read()
        if mask & selectors.EVENT_WRITE:
            self.write()

    def read(self):
        """Receive data and dispatch every message that became complete."""
        self._read()

        while self._recv_buffer:
            # add new message object if queue is empty or last message already processed
            if not self._received_queue or (self._received_queue[0].content is not None):
                self._received_queue.appendleft(MessageManager())

            last_message = self._received_queue[0]
            last_message.income_raw += self._recv_buffer
            self._recv_buffer = b''
            last_message.process_message()

            # if something left after processing message - put it back
            if last_message.content is not None and last_message.income_raw:
                self._recv_buffer = last_message.income_raw + self._recv_buffer
                last_message.income_raw = b''

            if self._received_queue and last_message.content is not None:
                self.process_received(self._received_queue.popleft())

    def _read(self):
        """Read up to ``buffer_size`` bytes from the socket into the buffer.

        :raises RuntimeError: if the peer closed the connection
        """
        try:
            data = self.socket.recv(self.buffer_size)
        except io.BlockingIOError:
            # Resource temporarily unavailable (errno EWOULDBLOCK)
            pass
        else:
            if data:
                self._recv_buffer += data
                logger.debug("Received {} bytes from {}".format(len(data), self.addr))
            else:
                logger.warning("Connection to {} lost!".format(self.addr))
                raise RuntimeError("Peer closed.")

    def process_received(self, message):
        """Dispatch a complete message according to its "message-type"."""
        message_type = message.jsonheader["message-type"]
        # Only a prefix of binary content is logged.
        if message.jsonheader["content-type"] != "binary":
            content = message.content
        else:
            content = message.content[:256]
        logger.debug(
            "Received message! Header: {}, content: {}".format(message.jsonheader, content))

        if message_type == "message":
            self._process_message(message)
        elif message_type == "response":
            self._process_response(message)
        elif message_type == "request":
            self._process_request(message)

    def _process_message(self, message):
        """Handle a "message": either a pushed file or a named action."""
        if message.jsonheader["action"] == "filetransfer":
            self._process_filetransfer(message.content, message.jsonheader["filepath"])
        else:
            self._process_action(message)

    def _process_action(self, message):
        """Run the registered action callback; its exceptions are logged."""
        action = message.jsonheader["action"]
        args = message.content["args"]
        kwargs = message.content["kwargs"]

        callback = self.messages_callbacks.get(action, None)
        if callback is None:
            logger.warning("Action {} does not exist!".format(action))
            return

        try:
            callback(self, *args, **kwargs)
        except Exception as error:
            logger.error("Error during action {} execution: {}".format(action, error))
            traceback.print_exc()

    def _process_request(self, message):
        """Compute the requested value and send it back as a response.

        No response is sent when the request is unknown or its handler raised.
        """
        requested_value = message.content["requested_value"]
        request_id = message.content["request_id"]
        args = message.content["args"]
        kwargs = message.content["kwargs"]
        filetransfer = requested_value == "filetransfer"

        try:
            if filetransfer:
                value = self._read_file(kwargs["filepath"])
            else:
                callback = self.requests_callbacks.get(requested_value, None)
                if callback is None:
                    logger.warning("Request {} does not exist!".format(requested_value))
                    return
                value = callback(self, *args, **kwargs)
        except Exception as error:  # TODO send response error\cancel
            logger.error("Error during request {} processing: {}".format(requested_value, error))
        else:
            self._send_response(requested_value, request_id, value, filetransfer)

    def _process_response(self, message):
        """Match a response with its pending request and run the callback."""
        request_id = message.jsonheader["request_id"]
        requested_value = message.jsonheader["requested_value"]

        # The lock is held only for the lookup, so a callback may issue new requests.
        with self._request_lock:
            request = self._request_queue.pop(request_id, None)

        if (request is None) or (request.requested_value != requested_value):
            logger.warning("Unexpected response!")
            return

        if requested_value == "filetransfer":
            value = True
            self._process_filetransfer(message.content, request.callback_kwargs["filepath"])
            logger.debug(
                "Request {} successfully closed with file bytes {}...".format(request, message.content[:256])
            )
        else:
            value = message.content["value"]
            logger.debug(
                "Request {} successfully closed with value {}".format(request, message.content["value"])
            )

        if request.callback is not None:
            try:
                request.callback(self, value, *request.callback_args, **request.callback_kwargs)
            except Exception as error:
                logger.error("Error during response {} processing: {}".format(request, error))
        else:
            logger.info("No callback were registered for response: {}".format(request))

    @staticmethod
    def _read_file(filepath):
        """Return the whole content of *filepath* as bytes."""
        with open(filepath, mode='rb') as f:
            return f.read()

    def _process_filetransfer(self, content, filepath):
        """Write received *content* to *filepath*; write errors are logged.

        When ``whoami == "pi"`` ownership of the file is handed to pi:pi.
        """
        try:
            with open(filepath, 'wb') as f:
                f.write(content)
        except OSError as error:
            logger.error("File {} can not be written due error: {}".format(filepath, error))
        else:
            logger.info("File {} successfully received ".format(filepath))
            if self.whoami == "pi":
                logger.info("Return rights to pi:pi after file transfer")
                # filepath is supplied by the peer: pass it as a separate argv
                # element instead of interpolating it into a shell command.
                try:
                    subprocess.call(["chown", "pi:pi", filepath])
                except OSError as error:
                    logger.error("File {} can not be written due error: {}".format(filepath, error))

    def write(self):
        """Send pending data, or go back to read-only mode when idle."""
        # Done under the send lock so that the 'rw' mask set by a concurrent
        # _send() cannot be overwritten by the switch to 'r' below.
        with self._send_lock:
            if (not self._send_buffer) and self._send_queue:
                message = self._send_queue.popleft()
                self._send_buffer += message
            if self._send_buffer:
                self._write()
            else:
                self._set_selector_events_mask('r')  # we're done writing

    def _write(self):
        """Send at most ``buffer_size`` bytes of the send buffer."""
        try:
            sent = self.socket.send(self._send_buffer[:self.buffer_size])
        except io.BlockingIOError:
            # Resource temporarily unavailable (errno EWOULDBLOCK)
            pass
        except Exception as error:
            logger.warning(
                "Attempt to send message {} to {} failed due error: {}".format(self._send_buffer, self.addr, error))
            raise
        else:
            self._send_buffer = self._send_buffer[sent:]
            left = len(self._send_buffer)
            logger.debug("Sent message to {}: sent {} bytes, {} bytes left.".format(self.addr, sent, left))

    def _send(self, data):
        """Queue *data* for sending and wake up the selector loop."""
        with self._send_lock:
            self._send_queue.append(data)
            if self.selector.get_key(self.socket).events != selectors.EVENT_WRITE:
                self._set_selector_events_mask('rw')
        NotifierSock().notify()

    def get_response(self, requested_value, callback,  # timeout=30,
                     request_args=(), request_kwargs=None,
                     callback_args=(), callback_kwargs=None, ):
        """Send a request; *callback* is invoked when the response arrives.

        The callback is called as
        ``callback(self, value, *callback_args, **callback_kwargs)``.
        """
        if request_kwargs is None:
            request_kwargs = {}
        if callback_kwargs is None:
            callback_kwargs = {}

        request_id = str(random.randint(0, 9999)).zfill(4)  # maybe hash
        with self._request_lock:
            self._request_queue[request_id] = PendingRequest(
                requested_value=requested_value,
                value=None,
                # expires_on=Server.time_now()+timeout,  #TODO
                callback=callback,
                callback_args=callback_args,
                callback_kwargs=callback_kwargs,
                request_args=request_args,
                request_kwargs=request_kwargs,
                resend=True,
            )
        self._send(MessageManager.create_request(requested_value, request_id,
                                                 request_args, request_kwargs))

    def get_file(self, client_filepath, filepath=None, callback=None,
                 callback_args=(), callback_kwargs=None, ):
        """Request the peer's file *client_filepath* and store it at *filepath*.

        *filepath* defaults to the base name of *client_filepath*. The
        destination is handed to the callback as the ``filepath`` keyword.
        """
        if filepath is None:
            filepath = os.path.split(client_filepath)[1]

        request_kwargs = {"filepath": client_filepath}
        # Work on a copy so that the caller's dict is not modified.
        callback_kwargs = dict(callback_kwargs or {}, filepath=filepath)

        self.get_response("filetransfer", callback, request_kwargs=request_kwargs,
                          callback_args=callback_args, callback_kwargs=callback_kwargs)

    def _resend_requests(self):
        """Resend every pending request that is still flagged for resending.

        The request goes out with its original args and a copy of its kwargs
        extended with the ``resend`` flag; the stored kwargs are left untouched.
        """
        with self._request_lock:
            for request_id, request in self._request_queue.items():  # TODO filter
                if request.resend:
                    # dict.update() returns None, so the merged kwargs are
                    # built explicitly and passed in the kwargs position.
                    resend_kwargs = dict(request.request_kwargs, resend=request.resend)
                    self._send(MessageManager.create_request(
                        request.requested_value, request_id,
                        request.request_args, resend_kwargs))
                    request.resend = False

    def send_message(self, action, args=(), kwargs=None):
        """Ask the peer to execute the callback registered for *action*."""
        self._send(MessageManager.create_action_message(action, args, kwargs))

    def _send_response(self, requested_value, request_id, value, filetransfer=False):
        """Queue the response to a previously received request."""
        self._send(MessageManager.create_response(requested_value, request_id,
                                                  value, filetransfer))

    def send_file(self, filepath, dest_filepath):  # clever_restart=False
        """Read local *filepath* and push it to the peer as *dest_filepath*.

        A file that can not be read is logged and nothing is sent.
        """
        try:
            with open(filepath, 'rb') as f:
                data = f.read()
        except OSError as error:
            logger.warning("File can not be opened due error: {}".format(error))
        else:
            logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
            self._send(MessageManager.create_message(
                data, "binary", "message",
                additional_headers={"action": "filetransfer", "filepath": dest_filepath}))


class NotifierSock(Singleton):
    """Loopback socket pair used to wake up a blocking selector."""

    def __init__(self):
        self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self._server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        self._sending_sock = socket.socket()
        self._send_lock = threading.Lock()
        self._receiving_sock = None  # set by init()

    def init(self, selector, port=26000):
        """Connect the loopback pair and register its read end in *selector*."""
        port += random.randint(0, 100)  # local testing fix

        self._server_socket.bind(('', port))
        self._server_socket.listen(1)
        self._sending_sock.connect(('127.0.0.1', port))
        self._receiving_sock, _ = self._server_socket.accept()
        logger.info("Notify socket: connected")

        selector.register(self._receiving_sock, selectors.EVENT_READ, data=self)
        logger.info("Notify socket: selector registered")

    def get_sock(self):
        """Return the receiving socket (None before init())."""
        return self._receiving_sock

    def notify(self):
        """Send one byte to wake the selector; does nothing before init()."""
        with self._send_lock:
            if self._receiving_sock is None:
                return
            self._sending_sock.sendall(bytes(1))
            logger.debug("Notify socket: notified")

    def process_events(self, mask):
        """Drain the wake-up bytes from the receiving socket."""
        if mask & selectors.EVENT_READ and self._receiving_sock is not None:
            try:
                self._receiving_sock.recv(1024)
                logger.debug("Notify socket: received")
            except io.BlockingIOError:
                pass
            except Exception as e:
                logger.error(e)

    def close(self):
        """Close all notifier sockets; errors are logged per socket."""
        for sock in (self._server_socket, self._sending_sock, self._receiving_sock):
            if sock is None:  # init() was never called
                continue
            try:
                sock.close()
            except (OSError, KeyError) as error:
                logger.error("Error during unregistring notifier socket: {}".format(error))