mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-06-02 10:09:32 +00:00
Feature branch: IMPORTANT connection+telemetry+table fixes and improvements (#55)
* .client_connected > .new_client_connected
* Fixed 'confirmation_required' wrapper
* Logging impr
* Changed and optimized a lot checks behaviour
* Added indication of connected/disconnected copters
* update_data_signal changed signature
* Added client removing functionality
* Option for automatically remove disconnected copters from table
* Renaming copters from QT server table on the go + some improvements
* Server: Check if self.clients list is not empty when trying to pop element from it
* Probably fixes behaviour of non-immidiate data sending from server
* Added changing hostname of copter
* Updated config
* Preview of selfchecheck results on double click
* Delete doc_2019-10-16_17-57-17.bashrc
* Update table data models for selfcheck
* Server: modify set id request to message
* Update client_config default file
* Client: modify set new id function
* Client: add avahi-daemon to restart when restarting network
* Client: add new hostname to ssh motd message, do not change hostname if no network restart in config
* Client: add newline to motd message
* Optimized request behaviour
* Client: fix service file and restart order
* Client: Add SO_KEEPALIVE and TCP_NODELAY options to client socket
* Modify to last tests with ping
* Client: remove ping
* Client: select reboot option when change id and add execute command
* Server: Add SO_KEEPALIVE option to server socket
* Server: Change removing copter
* Request resending after disconnection
* Resending improval (for furthrer functionality & fixes
* Fix of client removing behaviour
* Debugging
* Revert dubug code; 'Remove' fix confirmed
* do not clear requests queue
* Update requirements.txt
* Added namespace class to fix resend
* Improvements and simplification of notifier + port to client
* Refactor of telemetry thread
* Simplify lambdas
* Compress hostname check to single regex
* Changes in telemetry
* Refactored formatting of telemetry in table. NOT DONE
* Fix
* Git checkout. REVERT later!
* Conection fix
* Compability fixes
* Update start position
* Fix for reconnection with notifier socket
* Added traceback for pyqt5
* Fixes in new telemetry display
* Added lock to Telemetry
* Fixes for table display
* Fix of doubling line of client in table
* Fix of mass-removing clients from table
* Fix for clinet double-connection+removal
* Fix lock in Telemetry
* Changed signature of response callbacks for better syntax & fixes (all tested)
* Revert "Git checkout. REVERT later!"
This reverts commit 6122352380.
* Server: fix formatters
* Client: Remove telemetry_loop, small refactor of Telemetry class
* Server: Add formatters
* Server: Very small refactor
* Server: Fix checks and formatters
* Client: Fix check_failsafe function, small code refactor
* Client: update default config file
This commit is contained in:
committed by
Arthur Golubtsov
parent
53dad0e3fd
commit
ce36c6f1e3
100
messaging_lib.py
100
messaging_lib.py
@@ -17,12 +17,24 @@ try:
|
||||
except ImportError:
|
||||
import selectors2 as selectors
|
||||
|
||||
|
||||
# import logging_lib
|
||||
|
||||
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
|
||||
"callback", "callback_args", "callback_kwargs",
|
||||
"request_args", "resend",
|
||||
])
|
||||
|
||||
class Namespace:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.__dict__[key] = value
|
||||
|
||||
|
||||
class PendingRequest(Namespace): pass
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -239,10 +251,19 @@ class ConnectionManager(object):
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
|
||||
self._clear()
|
||||
|
||||
self._set_selector_events_mask('r')
|
||||
if self.resend_requests:
|
||||
self._resend_requests()
|
||||
|
||||
def _clear(self):
|
||||
if not self.resume_queue: # maybe needs locks
|
||||
self._recv_buffer = b''
|
||||
self._send_buffer = b''
|
||||
self._received_queue.clear()
|
||||
self._send_queue.clear()
|
||||
|
||||
def close(self):
|
||||
with self._close_lock:
|
||||
self._should_close = True
|
||||
@@ -253,11 +274,6 @@ class ConnectionManager(object):
|
||||
def _close(self):
|
||||
logger.info("Closing connection to {}".format(self.addr))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._recv_buffer = b''
|
||||
self._send_buffer = b''
|
||||
self._received_queue.clear() #
|
||||
|
||||
try:
|
||||
logger.info("Unregistering selector of {}".format(self.addr))
|
||||
self.selector.unregister(self.socket)
|
||||
@@ -281,6 +297,7 @@ class ConnectionManager(object):
|
||||
with self._close_lock:
|
||||
self._should_close = False
|
||||
|
||||
self._clear()
|
||||
logger.info("CLOSED connection to {}".format(self.addr))
|
||||
|
||||
def process_events(self, mask):
|
||||
@@ -379,7 +396,7 @@ class ConnectionManager(object):
|
||||
)
|
||||
|
||||
f = request.callback
|
||||
f(value, *request.callback_args, **request.callback_kwargs)
|
||||
f(self, value, *request.callback_args, **request.callback_kwargs)
|
||||
else:
|
||||
logger.warning("Unexpected response!")
|
||||
|
||||
@@ -417,9 +434,6 @@ class ConnectionManager(object):
|
||||
logger.warning(
|
||||
"Attempt to send message {} to {} failed due error: {}".format(self._send_buffer, self.addr, error))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._send_buffer = b''
|
||||
|
||||
raise error
|
||||
else:
|
||||
logger.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
|
||||
@@ -456,14 +470,12 @@ class ConnectionManager(object):
|
||||
|
||||
def _resend_requests(self):
|
||||
with self._request_lock:
|
||||
for request_id, request in self._request_queue.items():
|
||||
for request_id, request in self._request_queue.items(): #TODO filter
|
||||
if request.resend:
|
||||
self._send(MessageManager.create_request(
|
||||
request.requested_value, request_id, request.request_args.update(resend=request.resend))
|
||||
)
|
||||
#request.resend = False
|
||||
|
||||
# self._request_queue.clear()
|
||||
request.resend = False
|
||||
|
||||
def send_message(self, command, args=None):
|
||||
self._send(MessageManager.create_simple_message(command, args))
|
||||
@@ -484,41 +496,45 @@ class ConnectionManager(object):
|
||||
))
|
||||
|
||||
|
||||
class NotifierSock(Singleton): #TODO remake as connecting ONLY to self socket and selector
|
||||
class NotifierSock(Singleton):
|
||||
def __init__(self):
|
||||
self.receive_socket = None
|
||||
self.addr = None
|
||||
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
self._server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
|
||||
self._notify_socket = None
|
||||
self._notify_lock = threading.Lock()
|
||||
self._sending_sock = socket.socket()
|
||||
self._send_lock = threading.Lock()
|
||||
|
||||
def bind(self, server_addr):
|
||||
self._notify_socket = socket.socket()
|
||||
self._notify_socket.connect(server_addr)
|
||||
logger.info("Notify socket: bind")
|
||||
self._receiving_sock = None
|
||||
|
||||
def connect(self, _, client_socket, client_addr):
|
||||
self.receive_socket = client_socket
|
||||
self.addr = client_addr
|
||||
def init(self, selector, port=26000):
|
||||
port += random.randint(0, 100) # local testing fix
|
||||
|
||||
self._server_socket.bind(('', port))
|
||||
self._server_socket.listen(1)
|
||||
self._sending_sock.connect(('127.0.0.1', port))
|
||||
self._receiving_sock, _ = self._server_socket.accept()
|
||||
logger.info("Notify socket: connected")
|
||||
|
||||
selector.register(self._receiving_sock, selectors.EVENT_READ, data=self)
|
||||
logger.info("Notify socket: selector registered")
|
||||
|
||||
def get_sock(self):
|
||||
return self._receiving_sock
|
||||
|
||||
def notify(self):
|
||||
with self._notify_lock:
|
||||
if self.addr is not None:
|
||||
self._notify_socket.sendall(bytes(1))
|
||||
with self._send_lock:
|
||||
if self._receiving_sock is not None:
|
||||
self._sending_sock.sendall(bytes(1))
|
||||
logger.debug("Notify socket: notified")
|
||||
|
||||
def process_events(self, mask):
|
||||
if mask & selectors.EVENT_READ:
|
||||
if mask & selectors.EVENT_READ and self._receiving_sock is not None:
|
||||
try:
|
||||
data = self.receive_socket.recv(1024)
|
||||
except Exception: # TODO remove
|
||||
self._receiving_sock.recv(1024)
|
||||
logger.debug("Notify socket: received")
|
||||
except io.BlockingIOError:
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
logger.debug("Notifier received {} from {}".format(data, self.addr))
|
||||
else:
|
||||
self.addr = None
|
||||
logger.warning("Notifier: connection to {} lost!".format(self.addr))
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
Reference in New Issue
Block a user