Files
clever-show/Server/server.py
2019-04-08 18:20:02 +03:00

481 lines
18 KiB
Python

import sys
import time
import socket
import random
import logging
import threading
import selectors
import collections
import configparser
from messaging_lib import Message
# All imports sorted in pyramid just because
random.seed()
logging.basicConfig( # TODO all prints as logs
level=logging.DEBUG,
format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
handlers=[
logging.FileHandler("server_logs.log"),
logging.StreamHandler()
])
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
"callback", "callback_args", "callback_kwargs"
])
class Server:
BUFFER_SIZE = 1024
def __init__(self, server_id=None, config_path="server_config.ini"):
self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4)
# Init socket
self.sel = selectors.DefaultSelector()
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = socket.gethostname()
self.ip = Server.get_ip_address()
# Init configs
self.config_path = config_path
self.config = configparser.ConfigParser()
self.load_config()
# Init threads
self.autoconnect_thread = threading.Thread(target=self._client_processor, daemon=True,
name='Client processor')
self.client_processor_thread_running = threading.Event() # Can be used for manual thread killing
self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True,
name='IP broadcast sender')
self.broadcast_thread_running = threading.Event()
self.listener_thread = threading.Thread(target=self._broadcast_listen, daemon=True,
name='IP broadcast listener')
self.listener_thread_running = threading.Event()
def load_config(self):
self.config.read(self.config_path)
self.port = int(self.config['SERVER']['port']) # TODO try, init def
self.broadcast_port = int(self.config['SERVER']['broadcast_port'])
self.BROADCAST_DELAY = int(self.config['SERVER']['broadcast_delay'])
Server.BUFFER_SIZE = int(self.config['SERVER']['buffer_size'])
self.USE_NTP = self.config.getboolean('NTP', 'use_ntp')
self.NTP_HOST = self.config['NTP']['host']
self.NTP_PORT = int(self.config['NTP']['port'])
def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False
logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.port))
logging.info("Starting server socket!")
self.server_socket.bind((self.ip, self.port))
logging.info("Starting client processor thread!")
self.client_processor_thread_running.set()
self.autoconnect_thread.start()
logging.info("Starting broadcast sender thread!")
self.broadcast_thread_running.set()
self.broadcast_thread.start()
logging.info("(not) Starting broadcast listener thread!")
self.listener_thread_running.set()
# listener_thread.start()
def stop(self):
logging.info("Stopping server")
self.client_processor_thread_running.clear()
self.broadcast_thread_running.clear()
self.listener_thread_running.clear()
self.server_socket.close()
self.sel.close()
logging.info("Server stopped")
@staticmethod
def get_ip_address():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
ip_socket.connect(("8.8.8.8", 80))
return ip_socket.getsockname()[0]
@staticmethod
def get_ntp_time(ntp_host, ntp_port):
NTP_DELTA = 2208988800 # 1970-01-01 00:00:00
NTP_QUERY = b'\x1b' + bytes(47)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket:
ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port))
msg, _ = ntp_socket.recvfrom(1024)
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
# noinspection PyArgumentList
def _client_processor(self):
logging.info("Client processor (selector) thread started!")
self.server_socket.listen()
self.server_socket.setblocking(False)
self.sel.register(self.server_socket, selectors.EVENT_READ, data=None)
while self.client_processor_thread_running.is_set():
events = self.sel.select(timeout=None)
for key, mask in events:
if key.data is None:
self._connect_client(key.fileobj)
else:
client = key.data
try:
client.process_events(mask)
except Exception as error:
logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr))
client.close()
logging.info("Client autoconnect thread stopped!")
def _connect_client(self, sock):
conn, addr = sock.accept()
logging.info("Got connection from: {}".format(str(addr)))
conn.setblocking(False)
if not any(client_addr == addr[0] for client_addr in Client.clients.keys()):
client = Client(addr[0])
logging.info("New client")
else:
client = Client.clients[addr[0]]
logging.info("Reconnected client")
self.sel.register(conn, selectors.EVENT_READ, data=client)
client.connect(self.sel, conn, addr)
def _ip_broadcast(self):
logging.info("Broadcast sender thread started!")
msg = Message.create_simple_message(
"server_ip", {"host": self.ip, "port": str(self.port), "id": self.id})
broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
logging.info("Formed broadcast message: {}".format(msg))
try:
while self.broadcast_thread_running.is_set():
time.sleep(self.BROADCAST_DELAY)
broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port))
logging.debug("Broadcast sent")
finally:
broadcast_sock.close()
logging.info("Broadcast sender thread stopped, socked closed!")
def _broadcast_listen(self):
logging.info("Broadcast listener thread started!")
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
broadcast_client.bind(("", self.broadcast_port))
except OSError:
logging.critical("Another server is running on this computer, shutting down!")
sys.exit()
try:
while self.listener_thread_running.is_set():
data, addr = broadcast_client.recvfrom(1024)
message = Message()
message.income_raw = data
message.process_message()
if message.content:
if message.content["command"] == "server_ip":
if message.content["args"]["id"] != self.id:
logging.critical("Another server detected on network, shutting down")
sys.exit()
else:
logging.warning("Got wrong broadcast message from {}".format(addr))
finally:
broadcast_client.close()
logging.info("Broadcast listener thread stopped, socked closed!")
def time_now(self):
if self.USE_NTP:
timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
else:
timenow = time.time()
return timenow
def send_starttime(self, copter, dt=0):
timenow = self.time_now()
print('Now:', time.ctime(timenow), "+ dt =", dt)
copter.send(Message.create_simple_message("starttime", {"time": str(timenow + dt)})) # TODO change start_on
# TODO all commands as tasks with timestamp and priority
def requires_connect(f):
def wrapper(*args, **kwargs):
if args[0].connected:
return f(*args, **kwargs)
else:
logging.warning("Function requires client to be connected!")
return wrapper
def requires_any_connected(f):
def wrapper(*args, **kwargs):
if Client.clients:
return f(*args, **kwargs)
else:
logging.warning("No clients were connected!")
return wrapper
class Client:
resume_queue = True
clients = {}
on_connect = None # Use as callback functions
on_first_connect = None
on_disconnect = None
def __init__(self, ip):
self.selector = None
self.socket = None
self.addr = None
self._recv_buffer = b""
self._send_buffer = b""
self._send_queue = collections.deque()
self._received_queue = collections.deque()
self._request_queue = collections.OrderedDict()
self._send_lock = threading.Lock()
self._request_lock = threading.Lock()
self.copter_id = None
self.clients[ip] = self
self.connected = False
def _set_selector_events_mask(self, mode):
"""Set selector to listen for events: mode is 'r', 'w', 'rw'."""
if mode == "r":
events = selectors.EVENT_READ
elif mode == "w":
events = selectors.EVENT_WRITE
elif mode == "rw":
events = selectors.EVENT_READ | selectors.EVENT_WRITE
else:
raise ValueError("Invalid events mask mode {}.".format(mode))
self.selector.modify(self.socket, events, data=self)
def connect(self, client_selector, client_socket, client_addr):
logging.info("Client connected")
if not Client.resume_queue:
self._send_queue = collections.deque()
self.selector = client_selector
self.socket = client_socket
self.addr = client_addr
self.connected = True
if self.copter_id is None:
self.get_response("id", self._got_id)
if self.on_connect:
self.on_connect(self)
def _got_id(self):
logging.info("Got copter id: {} for client {}".format(self.copter_id, self.addr))
if Client.on_first_connect:
Client.on_first_connect(self)
def close(self):
logging.info("Closing connection to {}".format(self.addr))
try:
self.selector.unregister(self.socket)
except AttributeError:
pass
except Exception as error:
logging.error("{}: Error during selector unregistering: {}".format(self.addr, error))
finally:
self.selector = None
try:
self.socket.close()
except AttributeError:
pass
except OSError as error:
logging.error("{}: Error during socket closing: {}".format(self.addr, error))
finally:
self.socket = None
def process_events(self, mask):
print(self.socket, self.selector, mask)
if mask & selectors.EVENT_READ:
self.read()
if mask & selectors.EVENT_WRITE:
self.write()
with self._send_lock:
if (not self._send_buffer) and self._send_queue:
message = self._send_queue.popleft()
self._send_buffer += message
self._set_selector_events_mask('rw')
def process_received(self):
if self._received_queue:
if self._received_queue[-1].content:
message = self._received_queue.pop()
message_type = message.jsonheader["message-type"]
logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content))
if message_type == "message":
pass
elif message_type == "response":
request_id, requested_value = message.content["requst_id"], message.content["requested_value"]
with self._request_lock:
for key, value in self._request_queue.items():
if (key == request_id) and (value.requested_value == requested_value):
request = self._request_queue.pop(key)
request.value = message.content["value"]
logging.debug(
"Request successfully closed with value {}".format(message.content["value"])
)
request.callback(request.value, *request.callback_args, **request.callback_kwargs)
break
else:
logging.warning("Unexpected request response!")
elif message_type == "request":
pass
def read(self):
self._read()
if self._recv_buffer:
if not self._received_queue or (self._received_queue[0].content is not None):
self._received_queue.appendleft(Message())
self._received_queue[0].income_raw += self._recv_buffer
self._received_queue[0].process_message()
if self._received_queue[0].content and self._received_queue[0].income_raw:
self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer
self._received_queue[0].income_raw = b''
self.process_received()
def write(self):
self._write()
if not (self._send_buffer and self._send_queue):
self._set_selector_events_mask("r")
def _read(self):
try:
data = self.socket.recv(Server.BUFFER_SIZE)
except BlockingIOError:
# Resource temporarily unavailable (errno EWOULDBLOCK)
pass
else:
if data:
self._recv_buffer += data
logging.debug("Received {} from {}".format(data, self.addr))
else:
logging.warning("Connection to {} lost!".format(self.addr))
self.connected = False
if not Client.resume_queue:
self._recv_buffer = b''
if Client.on_disconnect:
Client.on_disconnect(self)
raise RuntimeError("Peer closed.")
def _write(self):
if self._send_buffer:
try:
sent = self.socket.send(self._send_buffer)
except BlockingIOError:
# Resource temporarily unavailable (errno EWOULDBLOCK)
pass
except socket.error as error:
logging.warning("Attempt to send message {} to {} failed due error: {}".format(
self._send_buffer, self.addr, error))
if not Client.resume_queue:
self._send_buffer = b''
raise error
else:
print(sent)
logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
self._send_buffer = self._send_buffer[sent:]
print(self._send_buffer)
time.sleep(0.1)
@requires_connect
def send(self, *messages):
for message in messages:
with self._send_lock:
self._send_queue.append(message)
def send_message(self, command, args=None):
self.send(Message.create_simple_message(command, args))
@staticmethod
@requires_any_connected
def broadcast(message, force_all=False):
for client in Client.clients.values():
if client.connected or force_all:
client.send(message)
@classmethod
@requires_any_connected
def broadcast_message(cls, command, args=None, force_all=False):
cls.broadcast(Message.create_simple_message(command, args), force_all)
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
callback_args=(), callback_kwargs: dict=None):
if request_args is None:
request_args = {}
if callback_kwargs is None:
callback_kwargs = {}
request_id = str(random.randint(0, 9999)).zfill(4)
with self._request_lock:
self._request_queue[request_id] = PendingRequest(
requested_value=requested_value,
value=None,
# expires_on=Server.time_now()+timeout,
callback=callback,
callback_args=callback_args,
callback_kwargs=callback_kwargs,
)
self.send(Message.create_request(requested_value, request_id, request_args))
def send_file(self, filepath, dest_filepath): # clever_restart=False
try:
with open(filepath, 'rb') as file:
data = file.read()
except OSError as error:
logging.warning("File can not be opened due error: ".format(error))
else:
logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
self.send(Message.create_message(data, "binary", "filetransfer", "binary", {"path": dest_filepath}))
def send_config_options(self, *options: ConfigOption):
logging.info("Sending config options: {} to {}".format(options, self.addr))
for option in options:
self.send_message(
'config_write',
{'section': option.section, 'option': option.option, 'value': option.value}
)
self.send_message("config_reload")
@staticmethod
def get_by_id(copter_id):
for copter in Client.clients.values():
if copter.copter_id == copter_id:
return copter
if __name__ == '__main__':
server = Server()
server.start()