Files
clever-show/Server/server.py
artem30801 ce36c6f1e3 Feature branch: IMPORTANT connection+telemetry+table fixes and improvements (#55)
* .client_connected > .new_client_connected

* Fixed 'confirmation_required' wrapper

* Logging impr

* Changed and optimized a lot checks behaviour

* Added indication of connected/disconnected copters

* update_data_signal changed signature

* Added client removing functionality

* Option for automatically remove disconnected copters from table

* Renaming copters from QT server table on the go + some improvements

* Server: Check if self.clients list is not empty when trying to pop element from it

* Probably fixes behaviour of non-immediate data sending from server

* Added changing hostname of copter

* Updated config

* Preview of selfcheck results on double click

* Delete doc_2019-10-16_17-57-17.bashrc

* Update table data models for selfcheck

* Server: modify set id request to message

* Update client_config default file

* Client: modify set new id function

* Client: add avahi-daemon to restart when restarting network

* Client: add new hostname to ssh motd message, do not change hostname if no network restart in config

* Client: add newline to motd message

* Optimized request behaviour

* Client: fix service file and restart order

* Client: Add SO_KEEPALIVE and TCP_NODELAY options to client socket

* Modify to last tests with ping

* Client: remove ping

* Client: select reboot option when change id and add execute command

* Server: Add SO_KEEPALIVE option to server socket

* Server: Change removing copter

* Request resending after disconnection

* Resending improvement (for further functionality & fixes)

* Fix of client removing behaviour

* Debugging

* Revert debug code; 'Remove' fix confirmed

* do not clear requests queue

* Update requirements.txt

* Added namespace class to fix resend

* Improvements and simplification of notifier + port to client

* Refactor of telemetry thread

* Simplify lambdas

* Compress hostname check to single regex

* Changes in telemetry

* Refactored formatting of telemetry in table. NOT DONE

* Fix

* Git checkout. REVERT later!

* Connection fix

* Compatibility fixes

* Update start position

* Fix for reconnection with notifier socket

* Added traceback for pyqt5

* Fixes in new telemetry display

* Added lock to Telemetry

* Fixes for table display

* Fix of doubling line of client in table

* Fix of mass-removing clients from table

* Fix for client double-connection+removal

* Fix lock in Telemetry

* Changed signature of response callbacks for better syntax & fixes (all tested)

* Revert "Git checkout. REVERT later!"

This reverts commit 6122352380.

* Server: fix formatters

* Client: Remove telemetry_loop, small refactor of Telemetry class

* Server: Add formatters

* Server: Very small refactor

* Server: Fix checks and formatters

* Client: Fix check_failsafe function, small code refactor

* Client: update default config file
2019-12-05 15:10:21 +03:00

368 lines
13 KiB
Python

import sys
import time
import socket
import random
import logging
import datetime
import threading
import selectors
import collections
import configparser
import os, inspect # Add parent dir to PATH to import messaging_lib
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
import messaging_lib as messaging
# Re-seed the shared random generator (no explicit seed is supplied);
# Server uses it to pick a random id when none is given.
random.seed()

# Timestamp taken once at import time; it names this run's log file.
now = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# Log directory, relative to the current working directory.
path = 'server_logs'
try:
    # Attempt the creation directly instead of checking first, so there is
    # no window between the existence check and the mkdir call.
    os.mkdir(path)
except FileExistsError:
    # Already present: nothing to create and nothing to report.
    pass
except OSError:
    print("Creation of the directory %s failed" % path)
else:
    print("Successfully created the directory %s " % path)

logging.basicConfig(  # TODO all prints as logs
    level=logging.DEBUG,
    format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
    handlers=[
        # Reuse `path` so the directory name is defined in exactly one place.
        logging.FileHandler(os.path.join(path, "{}.log".format(now))),
        logging.StreamHandler(),
    ])

# One configuration entry (section / option / value) as sent to a client by
# Client.send_config_options.
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
class Server(messaging.Singleton):
    """TCP server that accepts copter clients and announces itself over UDP.

    The server owns one listening TCP socket multiplexed through a selector
    (handled by the "Client processor" thread) and, optionally, two UDP
    broadcast threads: one that periodically announces this server and one
    that listens for announcements of other servers.
    """

    def __init__(self, server_id=None, config_path="server_config.ini", on_stop=None):
        """Create sockets, load the configuration and prepare (not start) threads.

        :param server_id: identifier of this server; when falsy a random
            zero-padded 4-digit string is generated.
        :param config_path: path of the INI file read by :meth:`load_config`.
        :param on_stop: optional zero-argument callback invoked by :meth:`stop`.
        """
        self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4)
        self.time_started = 0
        self.on_stop = on_stop

        # Init socket
        self.sel = selectors.DefaultSelector()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.host = socket.gethostname()
        self.ip = messaging.get_ip_address()

        # Init configs
        self.config_path = config_path
        self.config = configparser.ConfigParser()
        self.load_config()

        # Init threads
        self.autoconnect_thread = threading.Thread(target=self._client_processor, daemon=True,
                                                   name='Client processor')
        self.client_processor_thread_running = threading.Event()  # Can be used for manual thread killing
        self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True,
                                                 name='IP broadcast sender')
        self.broadcast_thread_running = threading.Event()
        self.listener_thread = threading.Thread(target=self._broadcast_listen, daemon=True,
                                                name='IP broadcast listener')
        self.listener_thread_running = threading.Event()

    def load_config(self):
        """Read ``self.config_path`` and copy the used options onto attributes.

        Missing sections/options or non-numeric values raise whatever
        ``configparser``/``int`` raise; no defaults are applied here.
        """
        self.config.read(self.config_path)
        self.port = int(self.config['SERVER']['port'])  # TODO try, init def
        self.BUFFER_SIZE = int(self.config['SERVER']['buffer_size'])  # TODO connect to connection manager
        self.remove_disconnected = self.config.getboolean('SERVER', 'remove_disconnected')
        self.use_broadcast = self.config.getboolean('BROADCAST', 'use_broadcast')
        self.broadcast_port = int(self.config['BROADCAST']['broadcast_port'])
        self.BROADCAST_DELAY = int(self.config['BROADCAST']['broadcast_delay'])
        self.USE_NTP = self.config.getboolean('NTP', 'use_ntp')
        self.NTP_HOST = self.config['NTP']['host']
        self.NTP_PORT = int(self.config['NTP']['port'])

    def start(self, do_ip_broadcast=None):  # do_auto_connect=True, , do_listen_broadcast=False
        """Bind the server socket and start the worker threads.

        :param do_ip_broadcast: whether to start the broadcast threads; when
            ``None`` the ``use_broadcast`` config value is used.
        """
        self.time_started = time.time()
        if do_ip_broadcast is None:
            do_ip_broadcast = self.use_broadcast

        logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.port))
        logging.info("Starting server socket!")
        self.server_socket.bind((self.ip, self.port))

        logging.info("Starting client processor thread!")
        self.client_processor_thread_running.set()
        self.autoconnect_thread.start()

        if do_ip_broadcast:
            logging.info("Starting broadcast sender thread!")
            self.broadcast_thread_running.set()
            self.broadcast_thread.start()

            # NOTE(review): the listener is started together with the sender
            # under the same flag - confirm this nesting is intended.
            logging.info("Starting broadcast listener thread!")
            self.listener_thread_running.set()
            self.listener_thread.start()

    def stop(self):
        """Signal all threads to stop, close sockets, run ``on_stop`` and exit.

        Ends with ``sys.exit``, i.e. raises ``SystemExit`` in the calling
        thread, so this method does not return normally.
        """
        logging.info("Stopping server")
        self.client_processor_thread_running.clear()
        self.broadcast_thread_running.clear()
        self.listener_thread_running.clear()
        self.server_socket.close()
        self.sel.close()
        logging.info("Server stopped")
        if self.on_stop is not None:
            self.on_stop()
        sys.exit("Stopped")

    @staticmethod
    def get_ntp_time(ntp_host, ntp_port):
        """Query an NTP server and return its time as Unix seconds (float).

        :param ntp_host: host name or address of the NTP server.
        :param ntp_port: UDP port of the NTP server.
        :return: the server's transmit timestamp converted to the Unix epoch.

        No socket timeout is set, so the call blocks until a reply arrives.
        """
        NTP_DELTA = 2208988800  # seconds between the NTP epoch (1900) and 1970-01-01 00:00:00
        NTP_QUERY = b'\x1b' + bytes(47)  # 48-byte request: first byte 0x1b, rest zero
        NTP_TRANSMIT = slice(40, 48)  # transmit timestamp field of the 48-byte NTP header

        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket:
            ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port))
            msg, _ = ntp_socket.recvfrom(1024)
        # Read the field at its fixed header offset rather than "the last 8
        # bytes", so a reply longer than 48 bytes is still decoded correctly.
        # The field is a 32.32 fixed-point number, hence the division by 2**32.
        return int.from_bytes(msg[NTP_TRANSMIT], 'big') / 2 ** 32 - NTP_DELTA

    def time_now(self):
        """Return the current time from NTP if enabled, else from the local clock."""
        if self.USE_NTP:
            timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
        else:
            timenow = time.time()
        return timenow

    # noinspection PyArgumentList
    def _client_processor(self):
        """Thread body: accept connections and dispatch selector events.

        Runs until ``client_processor_thread_running`` is cleared (checked
        between ``select`` calls).
        """
        logging.info("Client processor (selector) thread started!")
        messaging.NotifierSock().init(self.sel)
        self.server_socket.listen()
        self.server_socket.setblocking(False)
        # data=None marks the listening socket among the selector keys.
        self.sel.register(self.server_socket, selectors.EVENT_READ, data=None)  # | selectors.EVENT_WRITE

        while self.client_processor_thread_running.is_set():
            events = self.sel.select()
            # logging.error('tick')
            for key, mask in events:
                # logging.error(mask)
                # logging.error(str(key.data))
                client = key.data
                if client is None:
                    # Event on the listening socket: a new connection is pending.
                    self._connect_client(key.fileobj)
                elif isinstance(client, messaging.ConnectionManager):
                    try:
                        client.process_events(mask)
                    except Exception as error:
                        logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr))
                        client.close(True)
                else:  # Notifier
                    client.process_events(mask)

        logging.info("Client autoconnect thread stopped!")

    def _connect_client(self, sock):
        """Accept a pending connection on ``sock`` and attach it to a Client.

        A Client is looked up by remote IP: an unknown IP gets a new Client,
        a known one has its previous connection closed and is reused.
        """
        conn, addr = sock.accept()
        logging.info("Got connection from: {}".format(str(addr)))
        conn.setblocking(False)

        client_ip = addr[0]
        if client_ip not in Client.clients:
            client = Client(client_ip)
            logging.info("New client")
        else:
            client = Client.clients[client_ip]
            client.close(True)  # to ensure in unregistering
            logging.info("Reconnected client")

        self.sel.register(conn, selectors.EVENT_READ, data=client)
        client.connect(self.sel, conn, addr)

    def _ip_broadcast(self):
        """Thread body: periodically broadcast this server's address over UDP.

        The message carries host, port, id and start time and is sent to
        255.255.255.255 every ``BROADCAST_DELAY`` seconds until
        ``broadcast_thread_running`` is cleared.
        """
        logging.info("Broadcast sender thread started!")
        msg = messaging.MessageManager.create_simple_message(
            "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id, "start_time": str(self.time_started)})
        broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        logging.info("Formed broadcast message: {}".format(msg))

        time.sleep(self.BROADCAST_DELAY)
        try:
            while self.broadcast_thread_running.is_set():
                broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port))
                logging.debug("Broadcast sent")
                time.sleep(self.BROADCAST_DELAY)
        finally:
            broadcast_sock.close()
            logging.info("Broadcast sender thread stopped, socked closed!")

    def _broadcast_listen(self):
        """Thread body: listen for other servers' broadcasts and yield to older ones.

        If the broadcast port cannot be bound, or a "server_ip" message from
        a different server id with a start time not later than ours arrives,
        :meth:`stop` is called. Malformed messages are logged and skipped.
        """
        logging.info("Broadcast listener thread started!")
        broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        # Everything after socket creation sits inside try/finally so the
        # socket is closed on every exit path, including the SystemExit
        # raised by stop() when the bind fails.
        try:
            broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            try:
                broadcast_client.bind(("", self.broadcast_port))
            except OSError:
                logging.critical("Another server is running on this computer, shutting down!")
                # TODO popup and as function
                self.stop()

            while self.listener_thread_running.is_set():
                data, addr = broadcast_client.recvfrom(1024)  # TODO nonblock
                message = messaging.MessageManager()
                message.income_raw = data
                message.process_message()

                if not message.content:
                    logging.warning("Got wrong broadcast message from {}".format(addr))
                    continue

                # Datagrams come from the network and may lack fields or
                # carry a non-numeric start time; that must not kill the
                # listener thread.
                try:
                    older_server_seen = (
                        message.content["command"] == "server_ip"
                        and message.content["args"]["id"] != str(self.id)
                        and float(message.content["args"]["start_time"]) <= self.time_started
                    )
                except (KeyError, TypeError, ValueError):
                    logging.warning("Got wrong broadcast message from {}".format(addr))
                    continue

                if older_server_seen:
                    # younger server should shut down
                    logging.critical("Another server detected over the network, shutting down!")
                    # TODO popup
                    self.stop()
        finally:
            broadcast_client.close()
            logging.info("Broadcast listener thread stopped, socked closed!")

    def send_starttime(self, copter, start_time):
        """Send a "start" message carrying ``start_time`` (as a string) to ``copter``."""
        print('start_time: {}'.format(start_time))
        copter.send_message("start", {"time": str(start_time)})
def requires_connect(f):
    """Decorator: call ``f`` only if its first positional argument is connected.

    The wrapped callable checks ``args[0].connected`` (``self`` for methods).
    When it is truthy the call is forwarded and its result returned;
    otherwise a warning is logged and ``None`` is returned.
    """
    # Local import: functools is not among this module's file-level imports.
    import functools

    @functools.wraps(f)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        if args[0].connected:
            return f(*args, **kwargs)
        logging.warning("Function requires client to be connected!")
        return None

    return wrapper
def requires_any_connected(f):
    """Decorator: call ``f`` only if the ``Client.clients`` registry is non-empty.

    When at least one client is registered the call is forwarded and its
    result returned; otherwise a warning is logged and ``None`` is returned.
    """
    # Local import: functools is not among this module's file-level imports.
    import functools

    @functools.wraps(f)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        if Client.clients:
            return f(*args, **kwargs)
        logging.warning("No clients were connected!")
        return None

    return wrapper
class Client(messaging.ConnectionManager):
    """Server-side representation of one connected copter.

    Every instance registers itself in the class-level ``clients`` registry
    under the IP it was created with.
    """

    clients = {}  # registry shared by all instances: ip -> Client

    # Optional callbacks; each is called with the Client instance.
    on_connect = None  # Use as callback functions
    on_first_connect = None
    on_disconnect = None

    def __init__(self, ip):
        """Create a disconnected client and register it under ``ip``."""
        super().__init__()
        self.copter_id = None
        self.connected = False
        self.clients[ip] = self

    @staticmethod
    def get_by_id(copter_id):
        """Return the first registered client with this ``copter_id``, or ``None``."""
        for client in Client.clients.values():  # TODO filter
            if client.copter_id == copter_id:
                return client
        return None

    def connect(self, client_selector, client_socket, client_addr):
        """Attach a freshly accepted socket to this client.

        Resets the send queue unless ``resume_queue`` is set, requests the
        copter id if it is not known yet, then fires ``on_connect``.
        """
        logging.info("Client connected")
        if not self.resume_queue:
            self._send_queue = collections.deque()

        super().connect(client_selector, client_socket, client_addr)
        self.connected = True

        if self.copter_id is None:
            self.get_response("id", self._got_id)
        if self.on_connect:
            self.on_connect(self)

    def _got_id(self, _client, value):
        """Response callback for the "id" request: store the copter id.

        ``on_first_connect`` fires only when no id was known before.
        """
        logging.info("Got copter id: {} for client {}".format(value, self.addr))
        old_id = self.copter_id
        self.copter_id = value
        if old_id is None and self.on_first_connect:
            self.on_first_connect(self)

    def close(self, inner=False):
        """Mark the client disconnected, fire ``on_disconnect`` and close.

        :param inner: when true the base class's ``_close`` is used instead
            of its public ``close``.
        """
        self.connected = False
        if self.on_disconnect:
            self.on_disconnect(self)

        if inner:
            super()._close()
        else:
            super().close()
        logging.info("Connection to {} closed!".format(self.copter_id))

    def remove(self):
        """Close the connection if open and drop this client from the registry."""
        if self.connected:
            self.close()
        # NOTE(review): the registry key is taken from self.addr[0]; this
        # presumes addr is still set after close() - confirm in ConnectionManager.
        try:
            self.clients.pop(self.addr[0])
        except KeyError as e:
            logging.error(e)
        else:
            # Report success only when an entry was actually removed.
            logging.info("Client {} successfully removed!".format(self.copter_id))

    @requires_connect
    def _send(self, data):
        """Queue ``data`` for sending; skipped with a warning when disconnected."""
        super()._send(data)
        logging.debug("Queued data to send: {}".format(data))

    def send_config_options(self, *options: ConfigOption, reload_config=True):
        """Send a "config_write" message with the given options.

        :param options: entries exposing ``section``, ``option`` and ``value``.
        :param reload_config: forwarded as the message's "reload" flag.
        """
        logging.info("Sending config options: {} to {}".format(options, self.addr))
        sending_options = [{'section': option.section, 'option': option.option, 'value': option.value}
                           for option in options]
        print(sending_options)
        self.send_message(
            'config_write', {"options": sending_options, "reload": reload_config}
        )

    @staticmethod
    @requires_any_connected
    def broadcast(message, force_all=False):
        """Send raw ``message`` to every registered client.

        NOTE: ``_send`` is itself guarded by ``requires_connect``, so a
        disconnected client only produces a warning even with ``force_all``.
        """
        for client in Client.clients.values():
            if client.connected or force_all:
                client._send(message)

    @classmethod
    @requires_any_connected
    def broadcast_message(cls, command, args=None, force_all=False):
        """Build a simple message from ``command``/``args`` and broadcast it."""
        cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all)
if __name__ == '__main__':
    server = Server()
    server.start()
    # All server work happens in daemon threads, which end when the main
    # thread does. Keep the main thread alive by sleeping rather than
    # spinning, so it does not burn a CPU core.
    while True:
        time.sleep(1)