Improved notifier closing and error handling + fixes

This commit is contained in:
Artem30801
2020-01-02 19:26:27 +03:00
parent 63f6bdbe52
commit 0ed088a99d
3 changed files with 47 additions and 36 deletions

View File

@@ -101,9 +101,6 @@ def check_start_pos_status(item):
def check_time_delta(item):
return abs(item) < ModelChecks.time_delta_max
battery_min = config.getfloat('CHECKS', 'battery_percentage_min')
start_pos_delta_max = config.getfloat('CHECKS', 'start_pos_delta_max')
time_delta_max = config.getfloat('CHECKS', 'time_delta_max')
class ModelChecks:
checks_dict = {}
@@ -151,7 +148,7 @@ def check_anim(item):
def check_bat(item):
if item == "NO_INFO":
return False
return item[1]*100 > battery_min
return item[1]*100 > ModelChecks.battery_min
@ModelChecks.col_check(4)
@@ -188,7 +185,7 @@ def check_start_pos_status(item):
@ModelChecks.col_check(10)
def check_time_delta(item):
return abs(item) < time_delta_max
return abs(item) < ModelChecks.time_delta_max
class CopterData:

View File

@@ -1,3 +1,4 @@
import os
import sys
import time
import socket
@@ -8,7 +9,7 @@ import threading
import selectors
import collections
import os, inspect # Add parent dir to PATH to import messaging_lib and config_lib
import inspect # Add parent dir to PATH to import messaging_lib and config_lib
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
@@ -140,14 +141,11 @@ class Server(messaging.Singleton):
self.server_socket.listen()
self.server_socket.setblocking(False)
self.sel.register(self.server_socket, selectors.EVENT_READ, data=None) #| selectors.EVENT_WRITE
self.sel.register(self.server_socket, selectors.EVENT_READ, data=None)
while self.client_processor_thread_running.is_set():
events = self.sel.select(timeout=1)
#logging.error('tick')
for key, mask in events:
# logging.error(mask)
# logging.error(str(key.data))
client = key.data
if client is None:
self._connect_client(key.fileobj)
@@ -197,8 +195,15 @@ class Server(messaging.Singleton):
try:
while self.broadcast_thread_running.is_set():
self.broadcast_thread_interrupt.wait(timeout=self.config.broadcast_delay)
broadcast_sock.sendto(msg, ('255.255.255.255', self.config.broadcast_port))
logging.debug("Broadcast sent")
try:
broadcast_sock.sendto(msg, ('255.255.255.255', self.config.broadcast_port))
except OSError as e:
logging.error(f"Cannot send broadcast due error {e}")
else:
logging.debug("Broadcast sent")
except Exception as e:
logging.error(f"Unexpected error {e}!")
raise
finally:
broadcast_sock.close()
@@ -208,6 +213,7 @@ class Server(messaging.Singleton):
logging.info("Broadcast listener thread started!")
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# broadcast_client.settimeout(1)
try:
broadcast_client.bind(("", self.config.broadcast_port))
except OSError:
@@ -216,7 +222,12 @@ class Server(messaging.Singleton):
try:
while self.listener_thread_running.is_set():
data, addr = broadcast_client.recvfrom(1024) # TODO nonblock
try:
data, addr = broadcast_client.recvfrom(1024) # TODO nonblock
except OSError:
logging.error(f"Cannot receive broadcast due error {e}")
continue
message = messaging.MessageManager()
message.income_raw = data
message.process_message()
@@ -234,12 +245,16 @@ class Server(messaging.Singleton):
else:
logging.warning("Got wrong broadcast message from {}".format(addr))
except Exception as e:
logging.error(f"Unexpected error {e}!")
raise
finally:
broadcast_client.close()
logging.info("Broadcast listener thread stopped, socked closed!")
def send_starttime(self, copter, start_time):
print('start_time: {}'.format(start_time))
copter.send_message("start", {"time": str(start_time)})
@@ -247,8 +262,7 @@ def requires_connect(f):
def wrapper(*args, **kwargs):
if args[0].connected:
return f(*args, **kwargs)
else:
logging.warning("Function requires client to be connected!")
logging.warning("Function requires client to be connected!")
return wrapper
@@ -257,8 +271,7 @@ def requires_any_connected(f):
def wrapper(*args, **kwargs):
if Client.clients:
return f(*args, **kwargs)
else:
logging.warning("No clients were connected!")
logging.warning("No clients were connected!")
return wrapper
@@ -335,15 +348,6 @@ class Client(messaging.ConnectionManager):
super()._send(data)
logging.debug("Queued data to send (first 256 bytes): {}".format(data[:256]))
def send_config_options(self, *options: ConfigOption, reload_config=True):
logging.info("Sending config options: {} to {}".format(options, self.addr))
sending_options = [{'section': option.section, 'option': option.option, 'value': option.value}
for option in options]
print(sending_options)
self.send_message(
'config_write', {"options": sending_options, "reload": reload_config}
)
@staticmethod
@requires_any_connected
def broadcast(message, force_all=False):
@@ -370,4 +374,4 @@ if __name__ == '__main__':
server.start()
while True:
pass
pass

View File

@@ -5,7 +5,6 @@ import json
import socket
import struct
import random
import inspect
import logging
import threading
import collections
@@ -35,9 +34,6 @@ class PendingRequest(Namespace): pass
logger = logging.getLogger(__name__)
# logger = logging_lib.Logger(_logger, True)
def get_ip_address():
try:
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as ip_socket:
@@ -506,6 +502,7 @@ class NotifierSock(Singleton):
self._send_lock = threading.Lock()
self._receiving_sock = None
self._selector = None
def init(self, selector, port=26000):
port += random.randint(0, 100) # local testing fix
@@ -517,6 +514,7 @@ class NotifierSock(Singleton):
logger.info("Notify socket: connected")
selector.register(self._receiving_sock, selectors.EVENT_READ, data=self)
self._selector = selector
logger.info("Notify socket: selector registered")
def get_sock(self):
@@ -524,9 +522,10 @@ class NotifierSock(Singleton):
def notify(self):
with self._send_lock:
if self._receiving_sock is not None:
self._sending_sock.sendall(bytes(1))
logger.debug("Notify socket: notified")
if self._receiving_sock is None:
return
self._sending_sock.sendall(bytes(1))
logger.debug("Notify socket: notified")
def process_events(self, mask):
if mask & selectors.EVENT_READ and self._receiving_sock is not None:
@@ -536,4 +535,15 @@ class NotifierSock(Singleton):
except io.BlockingIOError:
pass
except Exception as e:
print(e)
logger.error(e)
def close(self):
try:
self._selector.unregister(self._receiving_sock)
self._server_socket.close()
self._sending_sock.close()
self._receiving_sock.close()
except OSError as e:
pass