diff --git a/messaging_lib.py b/messaging_lib.py index 6b2116f..8f1b8b8 100644 --- a/messaging_lib.py +++ b/messaging_lib.py @@ -19,6 +19,7 @@ except ImportError: PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on", "callback", "callback_args", "callback_kwargs", + "resend" ]) logger = logging.getLogger(__name__) @@ -212,6 +213,7 @@ class ConnectionManager(object): self.BUFFER_SIZE = 1024 self.resume_queue = False + self.resend_requests = True def _set_selector_events_mask(self, mode): """Set selector to listen for events: mode is 'r', 'w', 'rw'.""" @@ -234,16 +236,24 @@ class ConnectionManager(object): self.addr = client_addr self._set_selector_events_mask('r') + if self.resend_requests: + self._resend_requests() def close(self): with self._close_lock: self._should_close = True - self._set_selector_events_mask('w') + self._set_selector_events_mask('rw') NotifierSock().notify() def _close(self): logger.info("Closing connection to {}".format(self.addr)) + + if not self.resume_queue: + self._recv_buffer = b'' + self._send_buffer = b'' + self._received_queue.clear() # + try: logger.info("Unregistering selector of {}".format(self.addr)) self.selector.unregister(self.socket) @@ -297,7 +307,7 @@ class ConnectionManager(object): self._received_queue[0].income_raw = b'' if self._received_queue: - if self._received_queue[-1].content: + if self._received_queue[0].content: self.process_received(self._received_queue.popleft()) def _read(self): @@ -312,8 +322,6 @@ class ConnectionManager(object): logger.debug("Received {} from {}".format(data, self.addr)) else: logger.warning("Connection to {} lost!".format(self.addr)) - if not self.resume_queue: - self._recv_buffer = b'' raise RuntimeError("Peer closed.") @@ -415,7 +423,7 @@ class ConnectionManager(object): self._send_queue.append(data) if self.selector.get_key(self.socket).events != selectors.EVENT_WRITE: - self._set_selector_events_mask('w') + self._set_selector_events_mask('rw') NotifierSock().notify() def get_response(self, requested_value, callback, request_args=None, # timeout=30, @@ -434,9 +442,21 @@ class ConnectionManager(object): callback=callback, callback_args=callback_args, callback_kwargs=callback_kwargs, + resend=True, ) self._send(MessageManager.create_request(requested_value, request_id, request_args)) + def _resend_requests(self): + with self._request_lock: + for request_id, request in self._request_queue.items(): + if request.resend: + self._send(MessageManager.create_request( + request.requested_value, request_id, request.request_args) + ) + request.resend = False + + self._request_queue.clear() + def send_message(self, command, args=None): self._send(MessageManager.create_simple_message(command, args)) @@ -456,7 +476,7 @@ class ConnectionManager(object): )) -class NotifierSock(Singleton): +class NotifierSock(Singleton): #TODO remake as connecting ONLY to self socket and selector def __init__(self): self.receive_socket = None self.addr = None