diff --git a/Drone/FlightLib b/Drone/FlightLib index 701e7aa..9993d88 160000 --- a/Drone/FlightLib +++ b/Drone/FlightLib @@ -1 +1 @@ -Subproject commit 701e7aa4e12bb99b5ddf05ad40ad198f17ed583a +Subproject commit 9993d88fdcc19f5684fd2db58c8ffa76bb38e34d diff --git a/Drone/FlightLib2/FlightLib.py b/Drone/FlightLib2/FlightLib.py index d9c3119..7fa797a 100644 --- a/Drone/FlightLib2/FlightLib.py +++ b/Drone/FlightLib2/FlightLib.py @@ -129,7 +129,7 @@ def navto(x, y, z, yaw=float('nan'), frame_id='aruco_map'): def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map', - freq=FREQUENCY, timeout=5000, wait=False): + freq=FREQUENCY, timeout=5000, wait=False): module_logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) navigate(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw, speed=speed) @@ -162,7 +162,7 @@ def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLE def reach_attitude(z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map', - freq=FREQUENCY, timeout=5000, wait=False): + freq=FREQUENCY, timeout=5000, wait=False): module_logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw)) current_telem = get_telemetry(frame_id=frame_id) @@ -188,8 +188,6 @@ def reach_attitude(z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, fram if time_passed >= timeout: module_logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed / 1000)) return wait - else: - return True rate.sleep() else: module_logger.info("Attitude reached!") @@ -236,9 +234,7 @@ def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY, module_logger.info("Arming, going to OFFBOARD mode") # Arming check - #arming(True) set_rates(thrust=0.1, auto_arm=True) - #navigate(frame_id=frame_id, speed=speed, auto_arm=True) telemetry = get_telemetry(frame_id=frame_id) rate = rospy.Rate(freq) time_start = rospy.get_rostime() @@ -250,16 +246,16 @@ def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY, return None telemetry = get_telemetry(frame_id=frame_id) - if telemetry.armed: - break - module_logger.info("Arming...") time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 if timeout_arm is not None: if time_passed >= timeout_arm: - module_logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed / 1000)) - return False + if not telemetry.armed: + module_logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + return False + else: + break rate.sleep() module_logger.info("Armed!") @@ -277,22 +273,21 @@ def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY, return None current_height = abs(get_telemetry().z - z0 - z) - if current_height < tolerance: - break module_logger.info("Takeoff...") - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 - #print(time_passed) if timeout_takeoff is not None: if time_passed >= timeout_takeoff: - module_logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed / 1000)) - if emergency_land: - module_logger.info("Preforming emergency land") - land(descend=False) - return False + if not wait: + module_logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + if emergency_land: + module_logger.info("Preforming emergency land") + land(descend=False) + return False + else: + break rate.sleep() module_logger.info("Takeoff succeeded!") - print ("Takeoff succeeded!") + print("Takeoff succeeded!") return True diff --git a/Drone/client.py b/Drone/client.py index de352ed..297658c 100644 --- a/Drone/client.py +++ b/Drone/client.py @@ -10,7 +10,6 @@ import logging import threading import ConfigParser from contextlib import closing -from collections import OrderedDict import rospy import pause @@ -118,7 +117,7 @@ def parse_message(msg): str_command = list(j_message.keys())[0] arguments = list(j_message.values())[0].replace(":", '').split() - dict_arguments = OrderedDict(zip(arguments[::2], arguments[1::2])) + dict_arguments = dict(zip(arguments[::2], arguments[1::2])) return str_command, dict_arguments @@ -133,12 +132,14 @@ def recive_file(filename): print("File received") break file.write(data) + else: + break def animation_player(running_event, stop_event): print("Animation thread activated") frames = play_animation.read_animation_file() - rate = rospy.Rate(1000 / 125) + # rate = rospy.Rate(1000 / 125) delay_time = 0.125 print("Takeoff") @@ -201,13 +202,12 @@ def stop_animation(): def selfcheck(): - telemetry = FlightLib.get_telemetry('body') - return FlightLib.selfcheck(), telemetry.voltage + return FlightLib.selfcheck() def write_to_config(section, option, value): config.set(section, option, value) - with open(CONFIG_PATH, 'w') as file: + with open(CONFIG_PATH, 'w') as file: # TODO as separate function config.write(file) @@ -216,7 +216,7 @@ def load_config(): global broadcast_port, port, host, BUFFER_SIZE global USE_NTP, NTP_HOST, NTP_PORT global files_directory, animation_file - global TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME + global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME global USE_LEDS, COPTER_ID CONFIG_PATH = "client_config.ini" config = ConfigParser.ConfigParser() @@ -233,6 +233,7 @@ def load_config(): files_directory = config.get('FILETRANSFER', 'files_directory') animation_file = config.get('FILETRANSFER', 'animation_file') + FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height') TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time') RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time') @@ -272,14 +273,14 @@ try: command, args = parse_message(message) print("Command from server:", command, args) if command == "writefile": - recive_file(list(args.values())[0]) + recive_file(args['filename']) elif command == 'config_write': write_to_config(args['section'], args['option'], args['value']) elif command == 'config_reload': load_config() elif command == "starttime": global starttime - starttime = float(list(args.values())[0]) + starttime = float(args['time']) print("Starting on:", time.ctime(starttime)) dt = starttime - time.time() if USE_NTP: @@ -309,9 +310,16 @@ try: elif request_target == 'id': response = COPTER_ID elif request_target == 'selfcheck': - response = selfcheck() - send_all(bytes(form_message("response", {"status": "ok", "value": response}))) + response = FlightLib.selfcheck() + elif request_target == 'batt_voltage': + response = FlightLib.get_telemetry('body').voltage + elif request_target == 'cell_voltage': + response = FlightLib.get_telemetry('body').cell_voltage + + send_all(bytes(form_message("response", + {"status": "ok", "value": response, "value_name": str(request_target)}))) print("Request responded with:", response) + except socket.error as e: if e.errno != errno.EINTR: print("Connection lost due error:", e) diff --git a/Drone/client_config.ini b/Drone/client_config.ini index 4af3c49..1a63ad8 100644 --- a/Drone/client_config.ini +++ b/Drone/client_config.ini @@ -14,6 +14,7 @@ host = ntp1.stratum2.ru port = 123 [COPTERS] +frame_id = aruco_map takeoff_height = 1.5 takeoff_time = 8.0 safe_takeoff = False diff --git a/Drone/play_animation.py b/Drone/play_animation.py index 288dc86..2077f28 100644 --- a/Drone/play_animation.py +++ b/Drone/play_animation.py @@ -31,6 +31,7 @@ def animate_frame(current_frame, x0=0.0, y0=0.0): if USE_LEDS: LedLib.fill(current_frame['red'], current_frame['green'], current_frame['blue']) + def reach_frame(current_frame, x0=0.0, y0=0.0, timeout=5000): FlightLib.reach_point(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], yaw=1.57, timeout=timeout) # TODO yaw if USE_LEDS: diff --git a/Server/server.py b/Server/server.py index eb03d34..815131b 100644 --- a/Server/server.py +++ b/Server/server.py @@ -1,63 +1,176 @@ -from tkinter import * -from tkinter import ttk -from tkinter import filedialog -import ttkwidgets - import os import sys -import glob +import math import time +import json import struct import socket +import random +import logging import threading import collections import configparser -# All imports sorted in pyramid +# All imports sorted in pyramid just because + +random.seed() + +logging.basicConfig( # TODO all prints as logs + level=logging.INFO, + format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s", + handlers=[ + logging.FileHandler("server_logs.log"), + logging.StreamHandler() + ]) -def get_ip_address(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - ip = s.getsockname()[0] - s.close() - return ip +class ConfigOption: + def __init__(self, section, option, value): + self.section = section + self.option = option + self.value = value -def auto_connect(): - while True: - ServerSocket.listen(1) - c, addr = ServerSocket.accept() - print("Got connection from:", str(addr)) - if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): - client = Client(addr[0]) - print("New client") +class Server: + BUFFER_SIZE = 1024 + + def __init__(self, server_id=None, config_path="server_config.ini"): + + self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4) + + # Init socket + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.host = socket.gethostname() + self.ip = Server.get_ip_address() + + # Init configs + self.config_path = config_path + self.config = configparser.ConfigParser() + self.config.read(self.config_path) + self.load_config() + + # Init threads + self.autoconnect_thread = threading.Thread(target=self._auto_connect, daemon=True, + name='Client auto-connect') + self.autoconnect_thread_running = threading.Event() # Can be used for manual thread killing + + self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True, + name='IP broadcast sender') + self.broadcast_thread_running = threading.Event() + + self.listener_thread = threading.Thread(target=self._broadcast_listen, daemon=True, + name='IP broadcast listener') + self.listener_thread_running = threading.Event() + + def load_config(self): + self.port = int(self.config['SERVER']['port']) + self.broadcast_port = int(self.config['SERVER']['broadcast_port']) + self.BROADCAST_DELAY = int(self.config['SERVER']['broadcast_delay']) + Server.BUFFER_SIZE = int(self.config['SERVER']['buffer_size']) + + self.USE_NTP = self.config.getboolean('NTP', 'use_ntp') + self.NTP_HOST = self.config['NTP']['host'] + self.NTP_PORT = int(self.config['NTP']['port']) + + def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False + logging.info("Starting server with id: {} on {} !".format(self.id, self.ip)) + logging.info("Starting server socket!") + self.server_socket.bind((self.ip, self.port)) + + logging.info("Starting client autoconnect thread!") + self.autoconnect_thread_running.set() + self.autoconnect_thread.start() + + logging.info("Starting broadcast sender thread!") + self.broadcast_thread_running.set() + self.broadcast_thread.start() + + logging.info("(not) Starting broadcast listener thread!") + self.listener_thread_running.set() + # listener_thread.start() + + def stop(self): + logging.info("Stopping server") + self.autoconnect_thread_running.clear() + self.broadcast_thread_running.clear() + self.listener_thread_running.clear() + self.server_socket.close() + logging.info("Server stopped") + + @staticmethod + def get_ip_address(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket: + ip_socket.connect(("8.8.8.8", 80)) + return ip_socket.getsockname()[0] + + @staticmethod + def get_ntp_time(ntp_host, ntp_port): + NTP_DELTA = 2208988800 # 1970-01-01 00:00:00 + NTP_QUERY = b'\x1b' + bytes(47) + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ntp_socket: + ntp_socket.sendto(NTP_QUERY, (ntp_host, ntp_port)) + msg, _ = ntp_socket.recvfrom(1024) + return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA + + def _auto_connect(self): + logging.info("Client autoconnect thread started!") + while self.autoconnect_thread_running.is_set(): + self.server_socket.listen(1) + c, addr = self.server_socket.accept() + logging.info("Got connection from: {}".format(str(addr))) + if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): + client = Client(addr[0]) + logging.info("New client") + else: + logging.info("Reconnected client") + Client.clients[addr[0]].connect(c, addr) + logging.info("Client autoconnect thread stopped!") + + def _ip_broadcast(self): + logging.info("Broadcast sender thread started!") + msg = bytes(Client.form_message( + "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id} + ), "UTF-8") + broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + logging.info("Formed broadcast message: {}".format(msg)) + + while self.broadcast_thread_running.is_set(): + time.sleep(self.BROADCAST_DELAY) + broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port)) + logging.debug("Broadcast sent") + broadcast_sock.close() + logging.info("Broadcast sender thread stopped, socked closed!") + + def _broadcast_listen(self): + logging.info("Broadcast listener thread started!") + broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + try: + broadcast_client.bind(("", self.broadcast_port)) + except OSError: + logging.critical("Another server is running on this computer, shutting down!") + sys.exit() + + while self.listener_thread_running.is_set(): + data, addr = broadcast_client.recvfrom(1024) + command, args = Client.parse_message(data.decode("UTF-8")) + if command == "server_ip": + if args["id"] != self.id: + logging.critical("Another server detected on network, shutting down") + sys.exit() + broadcast_client.close() + logging.info("Broadcast listener thread stopped, socked closed!") + + def send_starttime(self, dt=0): + if self.USE_NTP: + timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT) else: - print("Reconnected client") - Client.clients[addr[0]].connect(c, addr) - - -def ip_broadcast(ip, port): - ip = ip - broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - while True: - msg = bytes(Client.form_command("server_ip ", ip, ), "UTF-8") - broadcast_sock.sendto(msg, ('255.255.255.255', 8181)) #TODO to config - print("Broadcast sent") - time.sleep(5) - - -NTP_DELTA = 2208988800 # 1970-01-01 00:00:00 -NTP_QUERY = b'\x1b' + bytes(47) - - -def get_ntp_time(ntp_host, ntp_port): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.sendto(NTP_QUERY, (ntp_host, ntp_port)) - msg, _ = s.recvfrom(1024) - return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA + timenow = time.time() + print('Now:', time.ctime(timenow), "+ dt =", dt) + Client.send_to_selected(Client.form_message("starttime", {"time": str(timenow + dt)})) def requires_connect(f): @@ -65,13 +178,28 @@ def requires_connect(f): if args[0].connected: return f(*args, **kwargs) else: - print("Function requires client to be connected!") + logging.warning("Function requires client to be connected!") + return wrapper + + +def requires_any_connected(f): + def wrapper(*args, **kwargs): + if Client.clients: + return f(*args, **kwargs) + else: + logging.warning("No clients were connected!") return wrapper class Client: + resume_quee = True + clients = {} + on_connect = None # Use as callback functions + on_first_connect = None + on_disconnect = None + def __init__(self, ip): self.socket = None self.addr = None @@ -80,8 +208,10 @@ class Client: self._received_queue = collections.deque() self._request_queue = collections.OrderedDict() + self._send_lock = threading.Lock() + self.copter_id = None - self.malfunction = False + self.selected = False # Use to select copters for certain purposes Client.clients[ip] = self @@ -89,19 +219,24 @@ class Client: def connect(self, client_socket, client_addr): print("Client connected") - # self._send_queue = collections.deque() # comment for resuming queue after reconnection + if not Client.resume_quee: + self._send_queue = collections.deque() self.socket = client_socket self.addr = client_addr self.socket.setblocking(0) self.connected = True - client_thread = threading.Thread(target=self._run, args=()) + client_thread = threading.Thread(target=self._run, name="Client {} thread".format(self.addr)) client_thread.start() if self.copter_id is None: self.copter_id = self.get_response("id") print("Got copter id:", self.copter_id) - drone_list.insert("", "end", self.addr[0], text=self.copter_id) + if Client.on_first_connect: + Client.on_first_connect(self) + + if Client.on_connect: + Client.on_connect(self) def _send_all(self, msg): self.socket.sendall(struct.pack('>I', len(msg)) + msg) @@ -109,7 +244,7 @@ class Client: def _receive_all(self, n): data = b'' while len(data) < n: - packet = self.socket.recv(min(n - len(data), BUFFER_SIZE)) + packet = self.socket.recv(min(n - len(data), Server.BUFFER_SIZE)) if not packet: return None data += packet @@ -127,32 +262,33 @@ class Client: while self.connected: try: if self._send_queue: - msg = self._send_queue.popleft() + with self._send_lock: + msg = self._send_queue.popleft() print("Send", msg, "to", self.addr) try: self._send_all(msg) except socket.error as e: print("Attempt to send failed") - self._send_queue.appendleft(msg) + with self._send_lock: + self._send_queue.appendleft(msg) raise e - else: - msg = "ping" - # self._send_all(msg) try: # check if data in buffer - check = self.socket.recv(BUFFER_SIZE, socket.MSG_PEEK) + check = self.socket.recv(Server.BUFFER_SIZE, socket.MSG_PEEK) if check: received = self._receive_message() if received: received = received.decode("UTF-8") - print("Recived", received, "from", self.addr) - command, args = Client.parse_command(received) + print("Received", received, "from", self.addr) + command, args = Client.parse_message(received) if command == "response": for key, value in self._request_queue.items(): - if not value: - self._request_queue[key] = args[0] + if not value and key == args["value_name"]: + self._request_queue[key] = args['value'] print("Request successfully closed") break + else: + print("Unexpected request") else: self._received_queue.appendleft(received) except socket.error: @@ -162,178 +298,94 @@ class Client: print("Client error: {}, disconnected".format(e)) self.connected = False self.socket.close() + if Client.on_disconnect: + Client.on_disconnect(self) break # time.sleep(0.05) @staticmethod - def form_command(command: str, args=()): # Change for different protocol - return " ".join([command, *args]) + def form_message(command: str, dict_arguments: dict = None): + if dict_arguments is None: + dict_arguments = {} + msg_dict = {command: str(dict_arguments).replace(",", '').replace("'", '')[1:-1]} + msg = json.dumps(msg_dict) + return msg @staticmethod - def parse_command(command_input): - args = command_input.split() - command = args.pop(0) - return command, args + def parse_message(msg): + try: + j_message = json.loads(msg) + except json.decoder.JSONDecodeError: + print("Json string not in correct format") + return None, None + + str_command = list(j_message.keys())[0] + + arguments = list(j_message.values())[0].replace(":", '').split() + dict_arguments = collections.OrderedDict(zip(arguments[::2], arguments[1::2])) + return str_command, dict_arguments @requires_connect def send(self, *messages): for message in messages: - self._send_queue.append(bytes(message, "UTF-8")) - - @staticmethod - def broadcast(message, force_all=False): - if Client.clients: - for client in Client.clients.values(): - if (not client.malfunction) or force_all: - client.send(message) - else: - print("No clients were connected!") - - @requires_connect - def send_file(self, filepath, dest_filename): - print("Sending file ", dest_filename) - self.send(Client.form_command("writefile", (dest_filename,))) - file = open(filepath, 'rb') - chunk = file.read(BUFFER_SIZE) - while chunk: - self._send_queue.append(chunk) - chunk = file.read(BUFFER_SIZE) - file.close() - self.send(Client.form_command("/endoffile")) - print("File sent") + with self._send_lock: + self._send_queue.append(bytes(message, "UTF-8")) @requires_connect def get_response(self, requested_value): self._request_queue[requested_value] = "" - self.send(Client.form_command("request", (requested_value, ))) + self.send(Client.form_message("request", {"value": requested_value})) while not self._request_queue[requested_value]: pass return self._request_queue.pop(requested_value) + @requires_connect + def send_file(self, filepath, dest_filename): + print("Sending file ", dest_filename) + chunk_count = math.ceil(os.path.getsize(filepath) / Server.BUFFER_SIZE) + self.send(Client.form_message("writefile", {"filesize": chunk_count, "filename": dest_filename})) + with open(filepath, 'rb') as file: + chunk = file.read(Server.BUFFER_SIZE) + while chunk: + with self._send_lock: + self._send_queue.append(chunk) + chunk = file.read(Server.BUFFER_SIZE) -# UI functions -def stop_swarm(): - Client.broadcast("stop") # для тестирования + self.send(Client.form_message("/endoffile")) # TODO mb remove + print("File sent") + @staticmethod + @requires_any_connected + def send_to_selected(message): + for client in Client.clients.values(): + if client.connected and client.selected: + client.send(message) -def land_all(): - Client.broadcast("land") + @staticmethod + @requires_any_connected + def request_to_selected(requested_value): + for client in Client.clients.values(): + if client.connected and client.selected: + client.get_response(requested_value) + @staticmethod + @requires_any_connected + def broadcast(message, force_all=False): + for client in Client.clients.values(): + if client.connected or force_all: + client.send(message) -def disarm_all(): - Client.broadcast("disarm") + @staticmethod + def send_config_options(*options: ConfigOption): + for option in options: + Client.send_to_selected( + Client.form_message('config_write', + {'section': option.section, 'option': option.option, 'value': option.value})) + Client.send_to_selected(Client.form_message("config_reload")) -def takeoff_all(): - Client.broadcast("takeoff") - - -def send_animations(): - path = filedialog.askdirectory(title="Animation directory") - if path: - print("Selected directory:", path) - files = [file for file in glob.glob(path+'/*.csv')] - names = [os.path.basename(file).split(".")[0] for file in files] - print(files) - for file, name in zip(files, names): - for copter in Client.clients.values(): - if name == copter.copter_id: - copter.send_file(file, "animation.csv") # TODO config - else: - print("Filename not matches with any drone connected") - # dr = next(iter(Client.clients.values())) # костыль для тестирования - # ANS = dr.get_response("someshit") - # print(ANS) - - -def send_starttime(dt=15): - timenow = time.time() - print('Now:', time.ctime(timenow), "+ dt =", dt) - Client.broadcast(Client.form_command("starttime", (str(timenow+dt), ))) - - -# UI build here -root = Tk() -root.wm_title("Drone swarm operation server") -root.style = ttk.Style() -root.style.theme_use("default") - -leftFrame = Frame(root) -leftFrame.grid(row=0, column=0, padx=10, pady=10) -rightFrame = Frame(root) -rightFrame.grid(row=0, column=1, padx=10, pady=10) - -drone_list = ttkwidgets.CheckboxTreeview(leftFrame, columns=("addr", "connected")) -# drone_list["columns"] = ("addr") -# drone_list.column("name") #width=100 -# drone_list.column("addr") -drone_list.heading("#0", text="Drone name") -drone_list.heading("#1", text="Connection adress") -drone_list.heading("#2", text="Connection status") -drone_list.pack() - -button_frame = Frame(leftFrame, borderwidth=1, relief="solid") -button_frame.pack(fill=BOTH, expand=True) - -land_all_btn = ttk.Button(button_frame, text="Disarm all", command=disarm_all) -land_all_btn.pack(side=RIGHT, padx=5, pady=5) - -land_all_btn = ttk.Button(button_frame, text="Land all", command=land_all) -land_all_btn.pack(side=RIGHT, padx=5, pady=5) - -stop_all_btn = ttk.Button(button_frame, text="Stop swarm", command=stop_swarm) -stop_all_btn.pack(side=RIGHT, padx=5, pady=5) - - -send_animation_btn = ttk.Button(button_frame, text="Send animations", command=send_animations) -send_animation_btn.pack(side=LEFT, padx=5, pady=5) - -send_starttime_btn = Button(button_frame, bg='red', text="Takeoff all", command=takeoff_all) -send_starttime_btn.pack(side=LEFT, padx=5, pady=5) - -send_starttime_btn = ttk.Button(button_frame, text="Start animation after...", command=send_starttime) -send_starttime_btn.pack(side=LEFT, padx=5, pady=5) - - -def gui_update(): - time.sleep(0.1) - - -# reading config -config = configparser.ConfigParser() -config.read("server_config.ini") - -port = int(config['SERVER']['port']) -BUFFER_SIZE = int(config['SERVER']['buffer_size']) -NTP_HOST = config['NTP']['host'] -NTP_PORT = int(config['NTP']['port']) - - -ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -host = socket.gethostname() -ip = get_ip_address() - -print('Server started on', host, ip, ":", port) -# print('Now:', time.ctime(get_ntp_time(NTP_HOST, NTP_PORT))) -print('Waiting for clients...') -ServerSocket.bind((ip, port)) - -autoconnect_thread = threading.Thread(target=auto_connect) -autoconnect_thread.daemon = True -autoconnect_thread.start() - -broadcast_thread = threading.Thread(target=ip_broadcast, args=(ip, port, )) -broadcast_thread.daemon = True -broadcast_thread.start() - if __name__ == '__main__': - try: - mainloop() - except KeyboardInterrupt: - print("Stopping server by keyboard interrupt") - finally: - ServerSocket.close() - print("Server shutdown") + server = Server() + server.start() diff --git a/Server/server_config.ini b/Server/server_config.ini index 2cd2740..e3361fa 100644 --- a/Server/server_config.ini +++ b/Server/server_config.ini @@ -1,6 +1,7 @@ [SERVER] port = 25000 broadcast_port = 8181 +broadcast_delay = 5 buffer_size = 1024 [NTP] diff --git a/Server/server_qt.py b/Server/server_qt.py index ade8d90..d53c7b0 100644 --- a/Server/server_qt.py +++ b/Server/server_qt.py @@ -1,3 +1,5 @@ +import glob + from PyQt5 import QtWidgets from PyQt5.QtGui import QStandardItem from PyQt5.QtGui import QStandardItemModel @@ -10,274 +12,7 @@ from PyQt5.QtWidgets import QFileDialog # Importing gui form from server_gui import Ui_MainWindow - -import os -import sys -import glob -import time -import json -import struct -import socket -import random -import threading -import collections -import configparser - -# All imports sorted in pyramid - -# Functions - - -def get_ip_address(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - my_ip = s.getsockname()[0] - s.close() - return my_ip - - -def auto_connect(): - while True: - ServerSocket.listen(1) - c, addr = ServerSocket.accept() - print("Got connection from:", str(addr)) - if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): - client = Client(addr[0]) - print("New client") - else: - print("Reconnected client") - Client.clients[addr[0]].connect(c, addr) - - -def ip_broadcast(server_ip, server_port): - msg = bytes(Client.form_message( - "server_ip", {"host": server_ip, "port": str(server_port), "id": SERVER_TID} - ), "UTF-8") - broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - while True: - time.sleep(10) - broadcast_sock.sendto(msg, ('255.255.255.255', broadcast_port)) - print("Broadcast sent") - - -def broadcast_listener(): - broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - try: - broadcast_client.bind(("", broadcast_port)) - except OSError: - print("Another server is running on this computer, shutting down!") - # error_dialog = QtWidgets.QErrorMessage() - # error_dialog.setModal(True) - # error_dialog.showMessage('Another server detected on network, shutting down!') - # error_dialog.exec_() - sys.exit() - - while True: - data, addr = broadcast_client.recvfrom(1024) - command, args = Client.parse_message(data.decode("UTF-8")) - if command == "server_ip": - if args["id"] != SERVER_TID: - print("Another server detected on network, shutting down") - # error_dialog = QtWidgets.QErrorMessage() - # error_dialog.setModal(True) - # error_dialog.showMessage('Another server detected on network, shutting down!') - # error_dialog.exec_() - sys.exit() - - -NTP_DELTA = 2208988800 # 1970-01-01 00:00:00 -NTP_QUERY = b'\x1b' + bytes(47) - - -def get_ntp_time(ntp_host, ntp_port): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - s.sendto(NTP_QUERY, (ntp_host, ntp_port)) - msg, _ = s.recvfrom(1024) - return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA - - -def requires_connect(f): - def wrapper(*args, **kwargs): - if args[0].connected: - return f(*args, **kwargs) - else: - print("Function requires client to be connected!") - return wrapper - - -class Client: - clients = {} - - def __init__(self, ip): - self.socket = None - self.addr = None - - self._send_queue = collections.deque() - self._received_queue = collections.deque() - self._request_queue = collections.OrderedDict() - - self.copter_id = None - self.malfunction = False - - Client.clients[ip] = self - - self.connected = False - - def connect(self, client_socket, client_addr): - print("Client connected") - # self._send_queue = collections.deque() # comment for resuming queue after reconnection - - self.socket = client_socket - self.addr = client_addr - - self.socket.setblocking(0) - self.connected = True - client_thread = threading.Thread(target=self._run, args=()) - client_thread.start() - if self.copter_id is None: - self.copter_id = self.get_response("id") - print("Got copter id:", self.copter_id) - model.appendRow((QStandardItem(self.copter_id), )) # TODO: get responses for another columns - - def _send_all(self, msg): - self.socket.sendall(struct.pack('>I', len(msg)) + msg) - - def _receive_all(self, n): - data = b'' - while len(data) < n: - packet = self.socket.recv(min(n - len(data), BUFFER_SIZE)) - if not packet: - return None - data += packet - return data - - def _receive_message(self): - raw_msglen = self._receive_all(4) - if not raw_msglen: - return None - msglen = struct.unpack('>I', raw_msglen)[0] - msg = self._receive_all(msglen) - return msg - - def _run(self): - while self.connected: - try: - if self._send_queue: - msg = self._send_queue.popleft() - print("Send", msg, "to", self.addr) - try: - self._send_all(msg) - except socket.error as e: - print("Attempt to send failed") - self._send_queue.appendleft(msg) - raise e - else: - msg = "ping" - # self._send_all(msg) - - try: # check if data in buffer - check = self.socket.recv(BUFFER_SIZE, socket.MSG_PEEK) - if check: - received = self._receive_message() - if received: - received = received.decode("UTF-8") - print("Received", received, "from", self.addr) - command, args = Client.parse_message(received) - if command == "response": - for key, value in self._request_queue.items(): - if not value: - self._request_queue[key] = args['value'] - print("Request successfully closed") - break - else: - self._received_queue.appendleft(received) - except socket.error: - pass - - except socket.error as e: - print("Client error: {}, disconnected".format(e)) - self.connected = False - self.socket.close() - break - # time.sleep(0.05) - - @staticmethod - def form_message(command: str, dict_arguments: dict = None): - if dict_arguments is None: - dict_arguments = {} - msg_dict = {command: str(dict_arguments).replace(",", '').replace("'", '')[1:-1]} - msg = json.dumps(msg_dict) - return msg - - @staticmethod - def parse_message(msg): - try: - j_message = json.loads(msg) - except json.decoder.JSONDecodeError: - print("Json string not in correct format") - return None - - str_command = list(j_message.keys())[0] - - arguments = list(j_message.values())[0].replace(":", '').split() - dict_arguments = collections.OrderedDict(zip(arguments[::2], arguments[1::2])) - return str_command, dict_arguments - - @requires_connect - def send(self, *messages): - for message in messages: - self._send_queue.append(bytes(message, "UTF-8")) - - @requires_connect - def get_response(self, requested_value): - self._request_queue[requested_value] = "" - self.send(Client.form_message("request", {"value": requested_value})) - - while not self._request_queue[requested_value]: - pass - - return self._request_queue.pop(requested_value) - - @staticmethod - def send_to_selected(*messages): - if Client.clients: - for client in Client.clients.values(): # TODO change to selected - client.send(*messages) - else: - print("No clients were connected!") - - @staticmethod - def request_to_selected(requested_value): - if Client.clients: - for client in Client.clients.values(): # TODO change to selected - client.get_response(requested_value) - else: - print("No clients were connected!") - - @staticmethod - def broadcast(message, force_all=False): - if Client.clients: - for client in Client.clients.values(): - if (not client.malfunction) or force_all: - client.send(message) - else: - print("No clients were connected!") - - @requires_connect - def send_file(self, filepath, dest_filename): - print("Sending file ", dest_filename) - self.send(Client.form_message("writefile", {"filename": dest_filename})) - file = open(filepath, 'rb') - chunk = file.read(BUFFER_SIZE) - while chunk: - self._send_queue.append(chunk) # TODO os.sendfile - chunk = file.read(BUFFER_SIZE) - file.close() - self.send(Client.form_message("/endoffile")) - print("File sent") +from server import * class MainWindow(QtWidgets.QMainWindow): @@ -315,12 +50,7 @@ class MainWindow(QtWidgets.QMainWindow): @pyqtSlot() def send_starttime(self): dt = self.ui.start_delay_spin.value() - if USE_NTP: - timenow = get_ntp_time(NTP_HOST, NTP_PORT) - else: - timenow = time.time() - print('Now:', time.ctime(timenow), "+ dt =", dt) - Client.send_to_selected(Client.form_message("starttime", {"time": str(timenow + dt)})) + server.send_starttime(dt) @pyqtSlot() def stop_all(self): @@ -369,14 +99,13 @@ class MainWindow(QtWidgets.QMainWindow): print("Selected file:", path) sendable_config = configparser.ConfigParser() sendable_config.read(path) + options = [] for section in sendable_config.sections(): for option in dict(sendable_config.items(section)): value = sendable_config[section][option] print("Got item from config:", section, option, value) - Client.send_to_selected( - Client.form_message('config_write', {'section': section, 'option': option, 'value': value}) - ) - Client.send_to_selected(Client.form_message("config_reload")) + options.append(ConfigOption(section, option, value)) + Client.send_config_options(*options) model = QStandardItemModel() @@ -386,53 +115,25 @@ model.setHorizontalHeaderLabels( model.setColumnCount(6) model.setRowCount(0) -# Pre-initialization -# reading config -config = configparser.ConfigParser() -config.read("server_config.ini") -port = int(config['SERVER']['port']) -broadcast_port = int(config['SERVER']['broadcast_port']) -BUFFER_SIZE = int(config['SERVER']['buffer_size']) -USE_NTP = config.getboolean('NTP', 'use_ntp') -NTP_HOST = config['NTP']['host'] -NTP_PORT = int(config['NTP']['port']) +def client_connected(self: Client): + batt = self.get_response("batt_voltage") + model.appendRow((QStandardItem(self.copter_id), )) # TODO: get responses for another columns + model.setData(model.index(0, 2), batt) -random.seed() -SERVER_TID = str(random.randint(0, 9999)).zfill(4) -ServerSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -ServerSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -host = socket.gethostname() -ip = get_ip_address() + + +Client.on_connect = client_connected if __name__ == "__main__": app = QtWidgets.QApplication(sys.argv) window = MainWindow() - print('Server started on', host, ip, ":", port) + server = Server() + server.start() - if USE_NTP: - print("Using NTP time:") - now = get_ntp_time(NTP_HOST, NTP_PORT) - else: - now = time.time() - print('Now:', time.ctime(now)) - - print('Waiting for clients...') - ServerSocket.bind((ip, port)) - - autoconnect_thread = threading.Thread(target=auto_connect) - autoconnect_thread.daemon = True - autoconnect_thread.start() - - broadcast_thread = threading.Thread(target=ip_broadcast, args=(ip, port,)) - broadcast_thread.daemon = True - broadcast_thread.start() - - listener_thread = threading.Thread(target=broadcast_listener) - listener_thread.daemon = True - #listener_thread.start() - - sys.exit(app.exec_()) + app.exec_() + server.stop() + sys.exit()