Request resending after disconnection

This commit is contained in:
Artem30801
2019-10-26 14:36:04 +03:00
parent 7232bdae36
commit 10ce1c0ce1

View File

@@ -19,6 +19,7 @@ except ImportError:
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
"callback", "callback_args", "callback_kwargs",
"resend"
])
logger = logging.getLogger(__name__)
@@ -212,6 +213,7 @@ class ConnectionManager(object):
self.BUFFER_SIZE = 1024
self.resume_queue = False
self.resend_requests = True
def _set_selector_events_mask(self, mode):
"""Set selector to listen for events: mode is 'r', 'w', 'rw'."""
@@ -234,16 +236,24 @@ class ConnectionManager(object):
self.addr = client_addr
self._set_selector_events_mask('r')
if self.resend_requests:
self._resend_requests()
def close(self):
with self._close_lock:
self._should_close = True
self._set_selector_events_mask('w')
self._set_selector_events_mask('rw')
NotifierSock().notify()
def _close(self):
logger.info("Closing connection to {}".format(self.addr))
if not self.resume_queue:
self._recv_buffer = b''
self._send_buffer = b''
self._received_queue.clear() #
try:
logger.info("Unregistering selector of {}".format(self.addr))
self.selector.unregister(self.socket)
@@ -297,7 +307,7 @@ class ConnectionManager(object):
self._received_queue[0].income_raw = b''
if self._received_queue:
if self._received_queue[-1].content:
if self._received_queue[0].content:
self.process_received(self._received_queue.popleft())
def _read(self):
@@ -312,8 +322,6 @@ class ConnectionManager(object):
logger.debug("Received {} from {}".format(data, self.addr))
else:
logger.warning("Connection to {} lost!".format(self.addr))
if not self.resume_queue:
self._recv_buffer = b''
raise RuntimeError("Peer closed.")
@@ -415,7 +423,7 @@ class ConnectionManager(object):
self._send_queue.append(data)
if self.selector.get_key(self.socket).events != selectors.EVENT_WRITE:
self._set_selector_events_mask('w')
self._set_selector_events_mask('rw')
NotifierSock().notify()
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
@@ -434,9 +442,21 @@ class ConnectionManager(object):
callback=callback,
callback_args=callback_args,
callback_kwargs=callback_kwargs,
resend=True,
)
self._send(MessageManager.create_request(requested_value, request_id, request_args))
def _resend_requests(self):
with self._request_lock:
for request_id, request in self._request_queue.items():
if request.resend:
self._send(MessageManager.create_request(
request.requested_value, request_id, request.request_args)
)
request.resend = False
self._request_queue.clear()
def send_message(self, command, args=None):
self._send(MessageManager.create_simple_message(command, args))
@@ -456,7 +476,7 @@ class ConnectionManager(object):
))
class NotifierSock(Singleton):
class NotifierSock(Singleton): #TODO remake as connecting ONLY to self socket and selector
def __init__(self):
self.receive_socket = None
self.addr = None