mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-06-04 02:59:32 +00:00
QT server refactor and improvements
This commit is contained in:
123
Server/server.py
123
Server/server.py
@@ -22,33 +22,23 @@ random.seed()
|
||||
|
||||
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
path = 'server_logs'
|
||||
if not os.path.exists(path):
|
||||
log_path = 'server_logs'
|
||||
if not os.path.exists(log_path):
|
||||
try:
|
||||
os.mkdir(path)
|
||||
os.mkdir(log_path)
|
||||
except OSError:
|
||||
print("Creation of the directory %s failed" % path)
|
||||
print("Creation of the directory {} failed".format(log_path))
|
||||
else:
|
||||
print("Successfully created the directory %s " % path)
|
||||
|
||||
logging.basicConfig( # TODO all prints as logs
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler("server_logs/{}.log".format(now)),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
print("Successfully created the directory {}".format(log_path))
|
||||
|
||||
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
|
||||
|
||||
|
||||
class Server(messaging.Singleton):
|
||||
def __init__(self, server_id=None, config_path="config/server.ini", on_stop=None):
|
||||
def __init__(self, server_id=None, config_path="config/server.ini"):
|
||||
self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4)
|
||||
self.time_started = 0
|
||||
|
||||
self.on_stop = on_stop
|
||||
|
||||
# Init socket
|
||||
self.sel = selectors.DefaultSelector()
|
||||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
@@ -70,16 +60,20 @@ class Server(messaging.Singleton):
|
||||
|
||||
self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True,
|
||||
name='IP broadcast sender')
|
||||
self.broadcast_thread_running = threading.Event()
|
||||
self.broadcast_thread_running = threading.Event() # TOOD replace by interrupt
|
||||
self.broadcast_thread_interrupt = threading.Event()
|
||||
|
||||
self.listener_thread = threading.Thread(target=self._broadcast_listen, daemon=True,
|
||||
name='IP broadcast listener')
|
||||
self.listener_thread_running = threading.Event()
|
||||
|
||||
def start(self): # do_auto_connect=True, , do_listen_broadcast=False
|
||||
# load config on startup
|
||||
def load_config(self):
|
||||
self.config.load_config_and_spec(self.config_path)
|
||||
|
||||
def start(self):
|
||||
# load config on startup
|
||||
self.load_config()
|
||||
|
||||
self.time_started = time.time()
|
||||
|
||||
logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.config.server_port))
|
||||
@@ -102,18 +96,26 @@ class Server(messaging.Singleton):
|
||||
|
||||
def stop(self):
|
||||
logging.info("Stopping server")
|
||||
|
||||
self.client_processor_thread_running.clear()
|
||||
|
||||
self.broadcast_thread_interrupt.set()
|
||||
self.broadcast_thread_running.clear()
|
||||
|
||||
self.listener_thread_running.clear()
|
||||
|
||||
messaging.NotifierSock().notify()
|
||||
|
||||
self.server_socket.close()
|
||||
self.sel.close()
|
||||
|
||||
messaging.NotifierSock().close()
|
||||
|
||||
logging.info("Server stopped")
|
||||
|
||||
if self.on_stop is not None:
|
||||
self.on_stop()
|
||||
|
||||
sys.exit("Stopped")
|
||||
def terminate(self, reason="Terminated"):
|
||||
self.stop()
|
||||
logging.critical(reason)
|
||||
|
||||
@staticmethod
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
@@ -126,22 +128,22 @@ class Server(messaging.Singleton):
|
||||
|
||||
def time_now(self):
|
||||
if self.config.ntp_use:
|
||||
timenow = self.get_ntp_time(self.config.ntp_host, self.config.ntp_port)
|
||||
else:
|
||||
timenow = time.time()
|
||||
return timenow
|
||||
return self.get_ntp_time(self.config.ntp_host, self.config.ntp_port)
|
||||
|
||||
return time.time()
|
||||
|
||||
# noinspection PyArgumentList
|
||||
def _client_processor(self):
|
||||
logging.info("Client processor (selector) thread started!")
|
||||
|
||||
messaging.NotifierSock().init(self.sel)
|
||||
|
||||
self.server_socket.listen()
|
||||
self.server_socket.setblocking(False)
|
||||
self.sel.register(self.server_socket, selectors.EVENT_READ, data=None) #| selectors.EVENT_WRITE
|
||||
|
||||
messaging.NotifierSock().bind((self.ip, self.config.server_port))
|
||||
|
||||
while self.client_processor_thread_running.is_set():
|
||||
events = self.sel.select()
|
||||
events = self.sel.select(timeout=1)
|
||||
#logging.error('tick')
|
||||
for key, mask in events:
|
||||
# logging.error(mask)
|
||||
@@ -161,19 +163,21 @@ class Server(messaging.Singleton):
|
||||
logging.info("Client autoconnect thread stopped!")
|
||||
|
||||
def _connect_client(self, sock):
|
||||
conn, addr = sock.accept()
|
||||
try:
|
||||
conn, addr = sock.accept()
|
||||
except OSError:
|
||||
logging.error("Error while connecting socket!")
|
||||
return
|
||||
|
||||
logging.info("Got connection from: {}".format(str(addr)))
|
||||
conn.setblocking(False)
|
||||
|
||||
if addr[0] == self.ip and messaging.NotifierSock().addr is None:
|
||||
client = messaging.NotifierSock()
|
||||
logging.info("Notifier sock client")
|
||||
|
||||
elif not any([client_addr == addr[0] for client_addr in Client.clients.keys()]):
|
||||
if not any([client_addr == addr[0] for client_addr in Client.clients.keys()]):
|
||||
client = Client(addr[0])
|
||||
logging.info("New client")
|
||||
else:
|
||||
client = Client.clients[addr[0]]
|
||||
client.close(True) # to ensure in unregistering
|
||||
logging.info("Reconnected client")
|
||||
self.sel.register(conn, selectors.EVENT_READ, data=client)
|
||||
client.connect(self.sel, conn, addr)
|
||||
@@ -191,6 +195,7 @@ class Server(messaging.Singleton):
|
||||
|
||||
try:
|
||||
while self.broadcast_thread_running.is_set():
|
||||
self.broadcast_thread_interrupt.wait(timeout=self.config.broadcast_delay)
|
||||
broadcast_sock.sendto(msg, ('255.255.255.255', self.config.broadcast_port))
|
||||
logging.debug("Broadcast sent")
|
||||
|
||||
@@ -205,9 +210,8 @@ class Server(messaging.Singleton):
|
||||
try:
|
||||
broadcast_client.bind(("", self.config.broadcast_port))
|
||||
except OSError:
|
||||
logging.critical("Another server is running on this computer, shutting down!")
|
||||
# TODO popup and as function
|
||||
self.stop()
|
||||
self.terminate("Another server is running on this computer, shutting down!")
|
||||
return
|
||||
|
||||
try:
|
||||
while self.listener_thread_running.is_set():
|
||||
@@ -225,9 +229,7 @@ class Server(messaging.Singleton):
|
||||
|
||||
if different_id and self_younger:
|
||||
# younger server should shut down
|
||||
logging.critical("Another server detected over the network, shutting down!")
|
||||
# TODO popup
|
||||
self.stop()
|
||||
self.terminate("Another server detected over the network, shutting down!")
|
||||
|
||||
else:
|
||||
logging.warning("Got wrong broadcast message from {}".format(addr))
|
||||
@@ -268,7 +270,7 @@ class Client(messaging.ConnectionManager):
|
||||
on_disconnect = None
|
||||
|
||||
def __init__(self, ip):
|
||||
super(Client, self).__init__()
|
||||
super().__init__()
|
||||
self.copter_id = None
|
||||
self.connected = False
|
||||
|
||||
@@ -276,7 +278,7 @@ class Client(messaging.ConnectionManager):
|
||||
|
||||
@staticmethod
|
||||
def get_by_id(copter_id):
|
||||
for client in Client.clients.values():
|
||||
for client in Client.clients.values(): # TODO filter
|
||||
if client.copter_id == copter_id:
|
||||
return client
|
||||
|
||||
@@ -285,7 +287,7 @@ class Client(messaging.ConnectionManager):
|
||||
if not self.resume_queue:
|
||||
self._send_queue = collections.deque()
|
||||
|
||||
super(Client, self).connect(client_selector, client_socket, client_addr)
|
||||
super().connect(client_selector, client_socket, client_addr)
|
||||
|
||||
self.connected = True
|
||||
|
||||
@@ -295,10 +297,12 @@ class Client(messaging.ConnectionManager):
|
||||
if self.on_connect:
|
||||
self.on_connect(self)
|
||||
|
||||
def _got_id(self, value):
|
||||
def _got_id(self, _client, value):
|
||||
logging.info("Got copter id: {} for client {}".format(value, self.addr))
|
||||
old_id = self.copter_id
|
||||
self.copter_id = value
|
||||
if self.on_first_connect:
|
||||
|
||||
if old_id is None and self.on_first_connect:
|
||||
self.on_first_connect(self)
|
||||
|
||||
def close(self, inner=False):
|
||||
@@ -308,25 +312,26 @@ class Client(messaging.ConnectionManager):
|
||||
self.on_disconnect(self)
|
||||
|
||||
if inner:
|
||||
super(Client, self)._close()
|
||||
super()._close()
|
||||
else:
|
||||
super(Client, self).close()
|
||||
super().close()
|
||||
|
||||
logging.info("Connection to {} closed!".format(self.copter_id))
|
||||
|
||||
def remove(self):
|
||||
if self.connected:
|
||||
self.close()
|
||||
if self.clients:
|
||||
try:
|
||||
self.clients.pop(self.addr[0])
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
try:
|
||||
self.clients.pop(self.addr[0])
|
||||
except KeyError as e:
|
||||
logging.error(e)
|
||||
|
||||
logging.info("Client {} successfully removed!".format(self.copter_id))
|
||||
|
||||
@requires_connect
|
||||
def _send(self, data):
|
||||
super(Client, self)._send(data)
|
||||
super()._send(data)
|
||||
logging.debug("Queued data to send: {}".format(data))
|
||||
|
||||
def send_config_options(self, *options: ConfigOption, reload_config=True):
|
||||
@@ -352,6 +357,14 @@ class Client(messaging.ConnectionManager):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler("server_logs/{}.log".format(now)),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
|
||||
server = Server()
|
||||
server.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user