Minimal operateable server+copter client. Many improvements, including architecture improvements

This commit is contained in:
Artem30801
2019-04-16 23:03:24 +03:00
parent 733b4e84c1
commit efa081efc3
6 changed files with 165 additions and 326 deletions

View File

@@ -5,17 +5,17 @@ import socket
import struct
import logging
import collections
import selectors2 as selectors
import ConfigParser
import selectors2 as selectors
from contextlib import closing
import os,sys,inspect
import os,sys,inspect # Add parent dir to PATH to import messaging_lib
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parent_dir = os.path.dirname(current_dir)
sys.path.insert(0, parent_dir)
import messaging_lib as messaging
random.seed()
logging.basicConfig( # TODO all prints as logs
level=logging.DEBUG, # INFO
@@ -25,9 +25,11 @@ logging.basicConfig( # TODO all prints as logs
logging.StreamHandler()
])
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
class Client:
active_client = None
active_client = None # maybe needs to be refactored
class Client(object):
def __init__(self, config_path="client_config.ini"):
self.selector = selectors.DefaultSelector()
self.client_socket = None
@@ -46,7 +48,8 @@ class Client:
self.config = ConfigParser.ConfigParser()
self.load_config()
Client.active_client = self
global active_client
active_client = self
def load_config(self):
self.config.read(self.config_path)
@@ -59,29 +62,27 @@ class Client:
self.NTP_HOST = self.config.get('NTP', 'host')
self.NTP_PORT = self.config.getint('NTP', 'port')
files_directory = self.config.get('FILETRANSFER', 'files_directory')
#FRAME_ID = self.config.get('COPTERS', 'frame_id') # TODO in play_animation
#self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height')
#self.TAKEOFF_TIME = self.config.getfloat('COPTERS', 'takeoff_time')
#self.RFP_TIME = self.config.getfloat('COPTERS', 'reach_first_point_time')
#self.SAFE_TAKEOFF = self.config.getboolean('COPTERS', 'safe_takeoff')
#self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common')
#self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common')
#self.X0 = self.config.getfloat('PRIVATE', 'x0')
#self.Y0 = self.config.getfloat('PRIVATE', 'y0')
#self.USE_LEDS = config.getboolean('PRIVATE', 'use_leds')
#play_animation.USE_LEDS = USE_LEDS # TODO in copter_client
self.files_directory = self.config.get('FILETRANSFER', 'files_directory')
self.client_id = self.config.get('PRIVATE', 'id')
if self.client_id == 'default':
self.client_id = 'copter' + str(random.randrange(9999)).zfill(4)
#write_to_config('PRIVATE', 'id', client_id)
self.write_config(False, 'PRIVATE', 'id', self.client_id)
elif self.client_id == '/hostname':
self.client_id = socket.gethostname()
def rewrite_config(self):
with open(self.config_path, 'w') as file:
self.config.write(file)
def write_config(self, reload_config=True, *config_options):
for config_option in config_options:
self.config.set(config_option.section, config_option.option, config_option.value)
self.rewrite_config()
if reload_config:
self.load_config()
@staticmethod
def get_ntp_time(ntp_host, ntp_port):
NTP_PACKET_FORMAT = "!12I"
@@ -94,13 +95,20 @@ class Client:
unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)])
return unpacked[10] + float(unpacked[11]) / 2 ** 32 - NTP_DELTA
def time_now(self):
if self.USE_NTP:
timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
else:
timenow = time.time()
return timenow
def start(self):
try:
while True:
self._reconnect()
self._process_connections()
except (KeyboardInterrupt, errno.EINTR):
except (KeyboardInterrupt, InterruptedError):
logging.critical("Caught interrupt, exiting!")
self.selector.close()
@@ -155,11 +163,12 @@ class Client:
logging.info("Received broadcast message {} from {}".format(message.content, addr))
if message.content["command"] == "server_ip":
args = message.content["args"]
self.server_host = args["host"]
self.server_port = int(args["port"])
self.server_host = args["host"]
self.write_config(False,
ConfigOption("SERVER", "port", self.server_port),
ConfigOption("SERVER", "host", self.server_host))
logging.info("Binding to new IP: {}:{}".format(self.server_host, self.server_port))
#write_to_config("SERVER", "port", port)
#write_to_config("SERVER", "host", host) # TODO
break
finally:
broadcast_client.close()
@@ -187,10 +196,20 @@ class Client:
logging.warning("No active connections left!")
return
@messaging.request_callback("id")
def response_id():
return Client.active_client.client_id
@messaging.request_callback("id")
def _response_id():
return active_client.client_id
@messaging.request_callback("time")
def _response_time():
return active_client.time_now()
@messaging.message_callback("config_write")
def _command_config_write(*args, **kwargs):
options = [ConfigOption(**raw_option) for raw_option in kwargs["options"]]
logging.info("Writing config options: {}".format(options))
active_client.write_config(kwargs["reload"], *options)
if __name__ == "__main__":
client = Client()

View File

@@ -1,252 +1,64 @@
from __future__ import print_function
import os
import sys
import socket
import struct
import random
import time
import errno
import json
import logging
import threading
import ConfigParser
import rospy
import pause
from .. import messaging_lib
from FlightLib import FlightLib
from FlightLib import LedLib
import client
import messaging_lib as messaging
import play_animation
def recive_file(filename):
print("Receiving file:", filename)
with open(filename, 'wb') as file: # TODO add directory
while True:
data = recive_message() #clientSocket.recv(BUFFER_SIZE)
if data:
print(data)
if parse_message(data.decode("UTF-8"))[0] == "/endoffile":
print("File received")
break
file.write(data)
else:
break
def
class CopterClient(client.Client):
def load_config(self):
super(CopterClient, self).load_config()
self.FRAME_ID = self.config.get('COPTERS', 'frame_id') # TODO in play_animation
self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height')
self.TAKEOFF_TIME = self.config.getfloat('COPTERS', 'takeoff_time')
self.RFP_TIME = self.config.getfloat('COPTERS', 'reach_first_point_time')
self.SAFE_TAKEOFF = self.config.getboolean('COPTERS', 'safe_takeoff')
self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common')
self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common')
self.X0 = self.config.getfloat('PRIVATE', 'x0')
self.Y0 = self.config.getfloat('PRIVATE', 'y0')
self.USE_LEDS = self.config.getboolean('PRIVATE', 'use_leds')
play_animation.USE_LEDS = self.USE_LEDS
def start(self):
super(CopterClient, self).start()
rospy.init_node('Swarm_client', anonymous=True)
if self.USE_LEDS:
LedLib.init_led()
def animation_player(running_event, stop_event):
print("Animation thread activated")
frames = play_animation.read_animation_file()
if not frames:
logging.error("Animation is empty, shutting down animation player")
return
delay_time = 0.125
print("Takeoff")
play_animation.takeoff(z=TAKEOFF_HEIGHT, safe_takeoff=SAFE_TAKEOFF)
takeoff_time = starttime + TAKEOFF_TIME
dt = takeoff_time - time.time()
print("Wait until takeoff " + str(dt) + "s: " + time.ctime(takeoff_time))
pause.until(takeoff_time)
print("Reach first point")
play_animation.reach_frame(frames[0], x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) #Reach first point at the same time with others
rfp_time = takeoff_time + RFP_TIME
dt = rfp_time - time.time()
print("Wait reaching first point " + str(dt) + "s: " + time.ctime(rfp_time))
pause.until(rfp_time)
next_frame_time = rfp_time
print("Start animation at " + str(time.time()))
for frame in frames:
running_event.wait()
play_animation.animate_frame(frame, x0=X0+X0_COMMON, y0=Y0+Y0_COMMON)
next_frame_time += delay_time
if stop_event.is_set():
running_animation_event.clear()
break
#rate.sleep()
pause.until(next_frame_time)
else:
play_animation.land()
print("Animation ended")
print("Animation thread closed")
stop_animation_event = threading.Event()
running_animation_event = threading.Event()
def start_animation(*args, **kwargs):
animation_thread = threading.Thread(target=animation_player, args=(running_animation_event, stop_animation_event))
print("Starting animation!")
running_animation_event.set()
stop_animation_event.clear()
animation_thread.start()
def resume_animation():
print("Resuming animation")
running_animation_event.set()
def pause_animation():
print("Pausing animation")
running_animation_event.clear()
def stop_animation():
stop_animation_event.set()
print("Stopping animation")
# animation_thread.join()
def selfcheck():
@messaging.request_callback("selfcheck")
def _response_selfcheck():
return FlightLib.selfcheck()
@messaging.request_callback("batt_voltage")
def _response_batt():
return FlightLib.get_telemetry('body').voltage
def write_to_config(section, option, value):
config.set(section, option, value)
with open(CONFIG_PATH, 'w') as file: # TODO as separate function
config.write(file)
@messaging.request_callback("cell_voltage")
def _response_cell():
return FlightLib.get_telemetry('body').cell_voltage
@messaging.message_callback("service_restart")
def _message_service_restart(*args, **kwargs):
os.system("systemctl restart"+kwargs["name"])
@messaging.message_callback("led_test")
def _command_config_write(*args, **kwargs):
LedLib.chase(255, 255, 255)
time.sleep(2)
LedLib.off()
def load_config():
global config, CONFIG_PATH
global broadcast_port, port, host, BUFFER_SIZE
global USE_NTP, NTP_HOST, NTP_PORT
global files_directory, animation_file
global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME
global USE_LEDS, COPTER_ID
global X0, X0_COMMON, Y0, Y0_COMMON
CONFIG_PATH = "client_config.ini"
config = ConfigParser.ConfigParser()
config.read(CONFIG_PATH)
broadcast_port = config.getint('SERVER', 'broadcast_port')
port = config.getint('SERVER', 'port')
host = config.get('SERVER', 'host')
BUFFER_SIZE = config.getint('SERVER', 'buffer_size')
USE_NTP = config.getboolean('NTP', 'use_ntp')
NTP_HOST = config.get('NTP', 'host')
NTP_PORT = config.getint('NTP', 'port')
files_directory = config.get('FILETRANSFER', 'files_directory')
animation_file = config.get('FILETRANSFER', 'animation_file')
FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation
TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height')
TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time')
RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time')
SAFE_TAKEOFF = config.getboolean('COPTERS', 'safe_takeoff')
X0_COMMON = config.getfloat('COPTERS', 'x0_common')
Y0_COMMON = config.getfloat('COPTERS', 'y0_common')
X0 = config.getfloat('PRIVATE', 'x0')
Y0 = config.getfloat('PRIVATE', 'y0')
USE_LEDS = config.getboolean('PRIVATE', 'use_leds')
play_animation.USE_LEDS = USE_LEDS
COPTER_ID = config.get('PRIVATE', 'id')
if COPTER_ID == 'default':
COPTER_ID = 'copter' + str(random.randrange(9999)).zfill(4)
write_to_config('PRIVATE', 'id', COPTER_ID)
elif COPTER_ID == '/hostname':
COPTER_ID = socket.gethostname()
load_config()
rospy.init_node('Swarm_client', anonymous=True)
if USE_LEDS:
LedLib.init_led()
print("Client started on copter:", COPTER_ID)
if USE_NTP:
print("NTP time:", time.ctime(get_ntp_time(NTP_HOST, NTP_PORT)))
print("System time", time.ctime(time.time()))
reconnect()
print("Connected to server")
try:
while True:
try:
message = recive_message()
if message:
message = message.decode("UTF-8")
command, args = parse_message(message)
print("Command from server:", command, args)
if command == "writefile":
recive_file(args['filename'])
if bool(args['clever_restart']):
os.system("systemctl restart clever")
elif command == 'config_write':
write_to_config(args['section'], args['option'], args['value'])
elif command == 'config_reload':
load_config()
elif command == "starttime":
global starttime
starttime = float(args['time'])
print("Starting on:", time.ctime(starttime))
dt = starttime - time.time()
if USE_NTP:
dt = starttime - get_ntp_time(NTP_HOST, NTP_PORT)
print("Until start:", dt)
rospy.Timer(rospy.Duration(dt), start_animation, oneshot=True)
elif command == 'takeoff':
play_animation.takeoff(safe_takeoff=SAFE_TAKEOFF)
elif command == 'pause':
pause_animation()
elif command == 'resume':
resume_animation()
elif command == 'stop':
stop_animation()
FlightLib.interrupt()
elif command == 'land':
play_animation.land()
elif command == 'disarm':
FlightLib.arming(False)
elif command == 'led_test':
LedLib.fill(255, 255, 255)
time.sleep(2)
LedLib.off()
elif command == 'request':
request_target = args['value']
print("Got request for:", request_target)
response = ""
if request_target == 'test':
response = "test_success"
elif request_target == 'id':
response = COPTER_ID
elif request_target == 'selfcheck':
check = FlightLib.selfcheck()
response = check if check else "OK"
elif request_target == 'batt_voltage':
response = FlightLib.get_telemetry('body').voltage
elif request_target == 'cell_voltage':
response = FlightLib.get_telemetry('body').cell_voltage
send_all(bytes(form_message("response",
{"status": "ok", "value": response, "value_name": str(request_target)})))
print("Request responded with:", response)
except socket.error as e:
if e.errno != errno.EINTR:
print("Connection lost due error:", e)
print("Reconnecting...")
reconnect()
print("Re-connection successful")
else:
print("Interrupted")
raise KeyboardInterrupt
except KeyboardInterrupt:
print("Shutdown on keyboard interrupt")
finally:
clientSocket.close()
if __name__ == "__main__":
copter_client = CopterClient()
copter_client.start()

View File

@@ -16,9 +16,8 @@ def takeoff(z=1.5, safe_takeoff=True, timeout=5000):
if USE_LEDS:
LedLib.wipe_to(255, 0, 0)
if safe_takeoff:
FlightLib.takeoff(z=z, wait=True, timeout_takeoff = timeout, emergency_land=True) # TODO dont forget change back to takeoff
else:
FlightLib.takeoff(z=z, wait=True, emergency_land=False)
FlightLib.takeoff(z=z, wait=True, timeout_takeoff=timeout, emergency_land=safe_takeoff)
LedLib.blink(0, 255, 0)
def land(descend=False):

View File

@@ -95,9 +95,13 @@ class Server:
@staticmethod
def get_ip_address():
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
ip_socket.connect(("8.8.8.8", 80))
return ip_socket.getsockname()[0]
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
ip_socket.connect(("8.8.8.8", 80))
return ip_socket.getsockname()[0]
except OSError:
logging.warning("No network connection detected, starting on localhost")
return "localhost"
@staticmethod
def get_ntp_time(ntp_host, ntp_port):
@@ -108,6 +112,13 @@ class Server:
msg, _ = ntp_socket.recvfrom(1024)
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
def time_now(self):
if self.USE_NTP:
timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
else:
timenow = time.time()
return timenow
# noinspection PyArgumentList
def _client_processor(self):
logging.info("Client processor (selector) thread started!")
@@ -189,13 +200,6 @@ class Server:
broadcast_client.close()
logging.info("Broadcast listener thread stopped, socked closed!")
def time_now(self):
if self.USE_NTP:
timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
else:
timenow = time.time()
return timenow
def send_starttime(self, copter, dt=0):
timenow = self.time_now()
print('Now:', time.ctime(timenow), "+ dt =", dt)
@@ -270,20 +274,19 @@ class Client(messaging.ConnectionManager):
super(Client, self).close()
@requires_connect
def _send(self, data):
super(Client, self)._send(data)
logging.debug("Queued data to send: {}".format(data))
def send_config_options(self, *options: ConfigOption):
def send_config_options(self, *options: ConfigOption, reload_config=True):
logging.info("Sending config options: {} to {}".format(options, self.addr))
for option in options:
self.send_message(
'config_write',
{'section': option.section, 'option': option.option, 'value': option.value}
)
self.send_message("config_reload")
sending_options = [{'section': option.section, 'option': option.option, 'value': option.value}
for option in options]
print(sending_options)
self.send_message(
'config_write', {"options": sending_options, "reload": reload_config}
)
@staticmethod
@requires_any_connected

View File

@@ -165,7 +165,7 @@ class MainWindow(QtWidgets.QMainWindow):
for section in sendable_config.sections():
for option in dict(sendable_config.items(section)):
value = sendable_config[section][option]
print("Got item from config:", section, option, value)
logging.debug("Got item from config:".format(section, option, value))
options.append(ConfigOption(section, option, value))
for row_num in range(model.rowCount()):
item = model.item(row_num, 0)
@@ -184,7 +184,7 @@ class MainWindow(QtWidgets.QMainWindow):
if item.isCheckable() and item.checkState() == Qt.Checked:
copter = Client.get_by_id(item.text())
copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt")
# clever_restart=True
copter.send_message("service_restart", {"name": "clever"})
model = QStandardItemModel()

View File

@@ -9,12 +9,12 @@ import collections
try:
import selectors
except:
except ImportError:
import selectors2 as selectors
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
"callback", "callback_args", "callback_kwargs",
#"obj",
# "obj",
])
@@ -97,11 +97,11 @@ class MessageManager:
self.jsonheader = self._json_decode(self.income_raw[:header_len], "utf-8")
self.income_raw = self.income_raw[header_len:]
for reqhdr in (
"byteorder",
"content-length",
"content-type",
"content-encoding",
"message-type",
"byteorder",
"content-length",
"content-type",
"content-encoding",
"message-type",
):
if reqhdr not in self.jsonheader:
raise ValueError('Missing required header {}'.format(reqhdr))
@@ -131,13 +131,29 @@ class MessageManager:
self._process_content()
def request_callback(string_command):
def message_callback(string_command):
def inner(f):
ConnectionManager.requests_callbacks.update({string_command: f})
ConnectionManager.messages_callbacks[string_command] = f
logging.debug("Registered message function {} for {}".format(f, string_command))
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
return inner
def request_callback(string_command):
def inner(f):
ConnectionManager.requests_callbacks[string_command] = f
logging.debug("Registered callback function {} for {}".format(f, string_command))
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
return inner
@@ -219,13 +235,17 @@ class ConnectionManager(object):
self._received_queue.appendleft(MessageManager())
self._received_queue[0].income_raw += self._recv_buffer
self._recv_buffer = b''
self._received_queue[0].process_message()
# if something left after processing message - put it back
if self._received_queue[0].content and self._received_queue[0].income_raw:
self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer
self._received_queue[0].income_raw = b''
self.process_received()
if self._received_queue:
if self._received_queue[-1].content:
self.process_received()
def _read(self):
try:
@@ -245,19 +265,19 @@ class ConnectionManager(object):
raise RuntimeError("Peer closed.")
def process_received(self):
if self._received_queue:
if self._received_queue[-1].content:
message = self._received_queue.pop()
message_type = message.jsonheader["message-type"]
logging.debug("Received message! Header: {}, content: {}".format(message.jsonheader, message.content))
if message_type == "message":
self._process_message(message)
elif message_type == "response":
self._process_response(message)
elif message_type == "request":
self._process_request(message)
elif message_type == "filetransfer":
self._process_filetransfer(message)
income_message = self._received_queue.pop()
message_type = income_message.jsonheader["message-type"]
logging.debug("Received message! Header: {}, content: {}".format(
income_message.jsonheader, income_message.content))
if message_type == "message":
self._process_message(income_message)
elif message_type == "response":
self._process_response(income_message)
elif message_type == "request":
self._process_request(income_message)
elif message_type == "filetransfer":
self._process_filetransfer(income_message)
def _process_message(self, message):
command = message.content["command"]
@@ -295,19 +315,8 @@ class ConnectionManager(object):
logging.debug(
"Request successfully closed with value {}".format(message.content["value"])
)
'''
if request.obj:
obj = request.obj
f = request.callback
obj.f(request.value, *request.callback_args, **request.callback_kwargs)
else:
f = request.callback
f(request.value, *request.callback_args, **request.callback_kwargs)
'''
f = request.callback
print(f)
f(value, *request.callback_args, **request.callback_kwargs)
break
@@ -329,7 +338,6 @@ class ConnectionManager(object):
with self._send_lock:
if (not self._send_buffer) and self._send_queue:
message = self._send_queue.popleft()
print(self._send_queue)
self._send_buffer += message
if self._send_buffer:
self._write()
@@ -351,14 +359,13 @@ class ConnectionManager(object):
else:
logging.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
self._send_buffer = self._send_buffer[sent:]
print(self._send_buffer)
def _send(self, data):
with self._send_lock:
self._send_queue.append(data)
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
callback_args=(), callback_kwargs=None, obj=None):
callback_args=(), callback_kwargs=None):
if request_args is None:
request_args = {}
if callback_kwargs is None:
@@ -373,7 +380,6 @@ class ConnectionManager(object):
callback=callback,
callback_args=callback_args,
callback_kwargs=callback_kwargs,
#obj=obj
)
self._send(MessageManager.create_request(requested_value, request_id, request_args))
@@ -393,4 +399,4 @@ class ConnectionManager(object):
logging.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
self._send(MessageManager.create_message(
data, "binary", "filetransfer", "binary", {"filepath": dest_filepath}
))
))