NEW server app as API module

This commit is contained in:
Artem30801
2019-03-23 21:58:45 +03:00
parent 67f83797c5
commit da7d972e10
3 changed files with 223 additions and 522 deletions

View File

@@ -1,63 +1,151 @@
from tkinter import *
from tkinter import ttk
from tkinter import filedialog
import ttkwidgets
import os
import sys
import glob
import math
import time
import json
import struct
import socket
import random
import threading
import collections
import configparser
# All imports sorted in pyramid
random.seed()
def get_ip_address():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
BUFFER_SIZE = 1024
def auto_connect():
while True:
ServerSocket.listen(1)
c, addr = ServerSocket.accept()
print("Got connection from:", str(addr))
if not any(client_addr == addr[0] for client_addr in Client.clients.keys()):
client = Client(addr[0])
print("New client")
class ConfigOption:
def __init__(self, section, option, value):
self.section = section
self.option = option
self.value = value
class Server:
def __init__(self, server_id=None, config_path="server_config.ini"):
self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4)
# Init socket
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = socket.gethostname()
self.ip = Server.get_ip_address()
# Init configs
self.config_path = config_path
self.config = configparser.ConfigParser()
self.config.read(self.config_path)
self.load_config()
# Init threads
self.autoconnect_thread = threading.Thread(target=self._auto_connect, daemon=True,
name='Client auto-connect thread')
self.autoconnect_thread_running = threading.Event() # Can be used for manual thread killing
self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True,
name='IP message broadcast sender thread')
self.broadcast_thread_running = threading.Event()
self.listener_thread = threading.Thread(target=self._broadcast_listen, daemon=True,
name='IP message broadcast listener thread')
self.listener_thread_running = threading.Event()
def load_config(self):
self.port = int(self.config['SERVER']['port'])
self.broadcast_port = int(self.config['SERVER']['broadcast_port'])
self.BUFFER_SIZE = int(self.config['SERVER']['buffer_size'])
self.USE_NTP = self.config.getboolean('NTP', 'use_ntp')
self.NTP_HOST = self.config['NTP']['host']
self.NTP_PORT = int(self.config['NTP']['port'])
def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False
print("Starting server!")
self.server_socket.bind((self.ip, self.port))
self.autoconnect_thread_running.set()
self.autoconnect_thread.start()
self.broadcast_thread_running.set()
self.broadcast_thread.start()
self.listener_thread_running.set()
# listener_thread.start()
def stop(self):
self.autoconnect_thread_running.clear()
self.broadcast_thread_running.clear()
self.listener_thread_running.clear()
self.server_socket.close()
@staticmethod
def get_ip_address():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
ip_socket.connect(("8.8.8.8", 80))
return ip_socket.getsockname()[0]
@staticmethod
def get_ntp_time(ntp_host, ntp_port):
NTP_DELTA = 2208988800 # 1970-01-01 00:00:00
NTP_QUERY = b'\x1b' + bytes(47)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket:
ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port))
msg, _ = ntp_socket.recvfrom(1024)
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
def _auto_connect(self):
while self.autoconnect_thread_running.is_set():
self.server_socket.listen(1)
c, addr = self.server_socket.accept()
print("Got connection from:", str(addr))
if not any(client_addr == addr[0] for client_addr in Client.clients.keys()):
client = Client(addr[0])
print("New client")
else:
print("Reconnected client")
Client.clients[addr[0]].connect(c, addr)
def _ip_broadcast(self):
msg = bytes(Client.form_message(
"server_ip", {"host": self.ip, "port": str(self.port), "id": self.id}
), "UTF-8")
broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while self.broadcast_thread_running.is_set():
time.sleep(10)
broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port))
print("Broadcast sent")
broadcast_sock.close()
def _broadcast_listen(self):
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
try:
broadcast_client.bind(("", self.broadcast_port))
except OSError:
print("Another server is running on this computer, shutting down!")
sys.exit()
while self.listener_thread_running.is_set():
data, addr = broadcast_client.recvfrom(1024)
command, args = Client.parse_message(data.decode("UTF-8"))
if command == "server_ip":
if args["id"] != self.id:
print("Another server detected on network, shutting down")
sys.exit()
def send_starttime(self, dt=0):
if self.USE_NTP:
timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
else:
print("Reconnected client")
Client.clients[addr[0]].connect(c, addr)
def ip_broadcast(ip, port):
ip = ip
broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while True:
msg = bytes(Client.form_command("server_ip ", ip, ), "UTF-8")
broadcast_sock.sendto(msg, ('255.255.255.255', 8181)) #TODO to config
print("Broadcast sent")
time.sleep(5)
NTP_DELTA = 2208988800 # 1970-01-01 00:00:00
NTP_QUERY = b'\x1b' + bytes(47)
def get_ntp_time(ntp_host, ntp_port):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(NTP_QUERY, (ntp_host, ntp_port))
msg, _ = s.recvfrom(1024)
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
timenow = time.time()
print('Now:', time.ctime(timenow), "+ dt =", dt)
Client.send_to_selected(Client.form_message("starttime", {"time": str(timenow + dt)}))
def requires_connect(f):
@@ -69,8 +157,20 @@ def requires_connect(f):
return wrapper
def requires_any_connected(f):
def wrapper(*args, **kwargs):
if Client.clients:
return f(*args, **kwargs)
else:
print("No clients were connected!")
return wrapper
class Client:
resume_quee = True
clients = {}
on_connect = None # Use as callback functions
on_disconnect = None
def __init__(self, ip):
self.socket = None
@@ -81,7 +181,7 @@ class Client:
self._request_queue = collections.OrderedDict()
self.copter_id = None
self.malfunction = False
self.selected = False # Use to select copters for certain purposes
Client.clients[ip] = self
@@ -89,7 +189,8 @@ class Client:
def connect(self, client_socket, client_addr):
print("Client connected")
# self._send_queue = collections.deque() # comment for resuming queue after reconnection
if not Client.resume_quee:
self._send_queue = collections.deque() # comment for resuming queue after reconnection
self.socket = client_socket
self.addr = client_addr
@@ -101,7 +202,6 @@ class Client:
if self.copter_id is None:
self.copter_id = self.get_response("id")
print("Got copter id:", self.copter_id)
drone_list.insert("", "end", self.addr[0], text=self.copter_id)
def _send_all(self, msg):
self.socket.sendall(struct.pack('>I', len(msg)) + msg)
@@ -135,9 +235,6 @@ class Client:
print("Attempt to send failed")
self._send_queue.appendleft(msg)
raise e
else:
msg = "ping"
# self._send_all(msg)
try: # check if data in buffer
check = self.socket.recv(BUFFER_SIZE, socket.MSG_PEEK)
@@ -145,12 +242,12 @@ class Client:
received = self._receive_message()
if received:
received = received.decode("UTF-8")
print("Recived", received, "from", self.addr)
command, args = Client.parse_command(received)
print("Received", received, "from", self.addr)
command, args = Client.parse_message(received)
if command == "response":
for key, value in self._request_queue.items():
if not value:
self._request_queue[key] = args[0]
self._request_queue[key] = args['value']
print("Request successfully closed")
break
else:
@@ -166,174 +263,86 @@ class Client:
# time.sleep(0.05)
@staticmethod
def form_command(command: str, args=()): # Change for different protocol
return " ".join([command, *args])
def form_message(command: str, dict_arguments: dict = None):
if dict_arguments is None:
dict_arguments = {}
msg_dict = {command: str(dict_arguments).replace(",", '').replace("'", '')[1:-1]}
msg = json.dumps(msg_dict)
return msg
@staticmethod
def parse_command(command_input):
args = command_input.split()
command = args.pop(0)
return command, args
def parse_message(msg):
try:
j_message = json.loads(msg)
except json.decoder.JSONDecodeError:
print("Json string not in correct format")
return None
str_command = list(j_message.keys())[0]
arguments = list(j_message.values())[0].replace(":", '').split()
dict_arguments = collections.OrderedDict(zip(arguments[::2], arguments[1::2]))
return str_command, dict_arguments
@requires_connect
def send(self, *messages):
for message in messages:
self._send_queue.append(bytes(message, "UTF-8"))
@staticmethod
def broadcast(message, force_all=False):
if Client.clients:
for client in Client.clients.values():
if (not client.malfunction) or force_all:
client.send(message)
else:
print("No clients were connected!")
@requires_connect
def send_file(self, filepath, dest_filename):
print("Sending file ", dest_filename)
self.send(Client.form_command("writefile", (dest_filename,)))
file = open(filepath, 'rb')
chunk = file.read(BUFFER_SIZE)
while chunk:
self._send_queue.append(chunk)
chunk = file.read(BUFFER_SIZE)
file.close()
self.send(Client.form_command("/endoffile"))
print("File sent")
@requires_connect
def get_response(self, requested_value):
self._request_queue[requested_value] = ""
self.send(Client.form_command("request", (requested_value, )))
self.send(Client.form_message("request", {"value": requested_value}))
while not self._request_queue[requested_value]:
pass
return self._request_queue.pop(requested_value)
@requires_connect
def send_file(self, filepath, dest_filename):
print("Sending file ", dest_filename)
chunk_count = math.ceil(os.path.getsize(filepath) / BUFFER_SIZE)
self.send(Client.form_message("writefile", {"filesize": chunk_count, "filename": dest_filename}))
with open(filepath, 'rb') as file:
chunk = file.read(BUFFER_SIZE)
while chunk:
self._send_queue.append(chunk) # TODO lock!!!!!
chunk = file.read(BUFFER_SIZE)
# UI functions
def stop_swarm():
Client.broadcast("stop") # для тестирования
self.send(Client.form_message("/endoffile")) # TODO mb remove
print("File sent")
@staticmethod
@requires_any_connected
def send_to_selected(message):
for client in Client.clients.values():
if client.connected and client.selected:
client.send(message)
def land_all():
Client.broadcast("land")
@staticmethod
@requires_any_connected
def request_to_selected(requested_value):
for client in Client.clients.values(): # TODO change to selected
if client.connected and client.selected:
client.get_response(requested_value)
@staticmethod
@requires_any_connected
def broadcast(message, force_all=False):
for client in Client.clients.values():
if client.connected or force_all:
client.send(message)
def disarm_all():
Client.broadcast("disarm")
@staticmethod
def send_config_options(*options: ConfigOption):
for option in options:
Client.send_to_selected(
Client.form_message('config_write',
{'section': option.section, 'option': option.option, 'value': option.value}))
Client.send_to_selected(Client.form_message("config_reload"))
def takeoff_all():
Client.broadcast("takeoff")
def send_animations():
path = filedialog.askdirectory(title="Animation directory")
if path:
print("Selected directory:", path)
files = [file for file in glob.glob(path+'/*.csv')]
names = [os.path.basename(file).split(".")[0] for file in files]
print(files)
for file, name in zip(files, names):
for copter in Client.clients.values():
if name == copter.copter_id:
copter.send_file(file, "animation.csv") # TODO config
else:
print("Filename not matches with any drone connected")
# dr = next(iter(Client.clients.values())) # костыль для тестирования
# ANS = dr.get_response("someshit")
# print(ANS)
def send_starttime(dt=15):
timenow = time.time()
print('Now:', time.ctime(timenow), "+ dt =", dt)
Client.broadcast(Client.form_command("starttime", (str(timenow+dt), )))
# UI build here
root = Tk()
root.wm_title("Drone swarm operation server")
root.style = ttk.Style()
root.style.theme_use("default")
leftFrame = Frame(root)
leftFrame.grid(row=0, column=0, padx=10, pady=10)
rightFrame = Frame(root)
rightFrame.grid(row=0, column=1, padx=10, pady=10)
drone_list = ttkwidgets.CheckboxTreeview(leftFrame, columns=("addr", "connected"))
# drone_list["columns"] = ("addr")
# drone_list.column("name") #width=100
# drone_list.column("addr")
drone_list.heading("#0", text="Drone name")
drone_list.heading("#1", text="Connection adress")
drone_list.heading("#2", text="Connection status")
drone_list.pack()
button_frame = Frame(leftFrame, borderwidth=1, relief="solid")
button_frame.pack(fill=BOTH, expand=True)
land_all_btn = ttk.Button(button_frame, text="Disarm all", command=disarm_all)
land_all_btn.pack(side=RIGHT, padx=5, pady=5)
land_all_btn = ttk.Button(button_frame, text="Land all", command=land_all)
land_all_btn.pack(side=RIGHT, padx=5, pady=5)
stop_all_btn = ttk.Button(button_frame, text="Stop swarm", command=stop_swarm)
stop_all_btn.pack(side=RIGHT, padx=5, pady=5)
send_animation_btn = ttk.Button(button_frame, text="Send animations", command=send_animations)
send_animation_btn.pack(side=LEFT, padx=5, pady=5)
send_starttime_btn = Button(button_frame, bg='red', text="Takeoff all", command=takeoff_all)
send_starttime_btn.pack(side=LEFT, padx=5, pady=5)
send_starttime_btn = ttk.Button(button_frame, text="Start animation after...", command=send_starttime)
send_starttime_btn.pack(side=LEFT, padx=5, pady=5)
def gui_update():
time.sleep(0.1)
# reading config
config = configparser.ConfigParser()
config.read("server_config.ini")
port = int(config['SERVER']['port'])
BUFFER_SIZE = int(config['SERVER']['buffer_size'])
NTP_HOST = config['NTP']['host']
NTP_PORT = int(config['NTP']['port'])
ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
host = socket.gethostname()
ip = get_ip_address()
print('Server started on', host, ip, ":", port)
# print('Now:', time.ctime(get_ntp_time(NTP_HOST, NTP_PORT)))
print('Waiting for clients...')
ServerSocket.bind((ip, port))
autoconnect_thread = threading.Thread(target=auto_connect)
autoconnect_thread.daemon = True
autoconnect_thread.start()
broadcast_thread = threading.Thread(target=ip_broadcast, args=(ip, port, ))
broadcast_thread.daemon = True
broadcast_thread.start()
if __name__ == '__main__':
try:
mainloop()
except KeyboardInterrupt:
print("Stopping server by keyboard interrupt")
finally:
ServerSocket.close()
print("Server shutdown")
server = Server()
server.start()