mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-31 09:19:33 +00:00
2
.gitignore
vendored
2
.gitignore
vendored
@@ -109,3 +109,5 @@ Drone/test_animation/
|
||||
Drone/animation.csv
|
||||
Drone/client_logs
|
||||
images/
|
||||
|
||||
\.idea/
|
||||
|
||||
@@ -11,7 +11,7 @@ from mavros_msgs.srv import SetMode
|
||||
from mavros_msgs.srv import CommandBool
|
||||
from std_srvs.srv import Trigger
|
||||
|
||||
module_logger = logging.getLogger("FlightLib")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# create proxy service
|
||||
navigate = rospy.ServiceProxy('navigate', srv.Navigate)
|
||||
@@ -22,24 +22,36 @@ get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry)
|
||||
arming = rospy.ServiceProxy('/mavros/cmd/arming', CommandBool)
|
||||
landing = rospy.ServiceProxy('/land', Trigger)
|
||||
|
||||
module_logger.info("Proxy services inited")
|
||||
logger.info("Proxy services inited")
|
||||
|
||||
# globals
|
||||
FREQUENCY = 1000/25 # HZ
|
||||
FREQUENCY = 40 # HZ
|
||||
TOLERANCE = 0.2
|
||||
SPEED = 1.0
|
||||
SPEED_TAKEOFF = 0.8
|
||||
TIMEOUT = 5.0
|
||||
TIMEOUT_ARMED = 2.0
|
||||
TIMEOUT_DESCEND = TIMEOUT
|
||||
TIMEOUT_LAND = 8.0
|
||||
Z_DESCEND = 0.5
|
||||
Z_TAKEOFF = 1.0
|
||||
FRAME_ID = 'map'
|
||||
INTERRUPTER = threading.Event()
|
||||
|
||||
interrupt_event = threading.Event()
|
||||
checklist = []
|
||||
|
||||
def arming_wrapper(state=False, interrupter=INTERRUPTER):
|
||||
arming(state)
|
||||
|
||||
def interrupt():
|
||||
module_logger.info("Performing function interrupt")
|
||||
interrupt_event.set()
|
||||
logger.info("Performing function interrupt")
|
||||
INTERRUPTER.set()
|
||||
|
||||
|
||||
def init(node_name="CleverSwarmFlight", anon=True, no_signals=True):
|
||||
module_logger.info("Initing ROS node")
|
||||
logger.info("Initing ROS node")
|
||||
rospy.init_node(node_name, anonymous=anon, disable_signals=no_signals)
|
||||
module_logger.info("Ros node inited")
|
||||
logger.info("Ros node inited")
|
||||
|
||||
|
||||
def get_distance3d(x1, y1, z1, x2, y2, z2):
|
||||
@@ -50,250 +62,298 @@ def check(check_name):
|
||||
def inner(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
failures = f(*args, **kwargs)
|
||||
if failures:
|
||||
msgs = []
|
||||
for failure in failures:
|
||||
msg = "[{}]: Failure: {}".format(check_name, failure)
|
||||
msgs.append(msg)
|
||||
module_logger.warning(msg)
|
||||
print(failures)
|
||||
msgs = []
|
||||
for failure in failures:
|
||||
print(failure)
|
||||
msg = "[{}]: Failure: {}".format(check_name, failure)
|
||||
msgs.append(msg)
|
||||
logger.warning(msg)
|
||||
|
||||
if msgs:
|
||||
return msgs
|
||||
else:
|
||||
module_logger.info("[{}]: OK".format(check_name))
|
||||
logger.info("[{}]: OK".format(check_name))
|
||||
return None
|
||||
|
||||
checklist.append(wrapper)
|
||||
return wrapper
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def _check_nans(*values):
|
||||
return any(math.isnan(x) for x in values)
|
||||
|
||||
|
||||
@check("FCU connection")
|
||||
def check_connection():
|
||||
telemetry = get_telemetry()
|
||||
if not telemetry.connected:
|
||||
yield ("Flight controller is not connected!")
|
||||
|
||||
|
||||
@check("Linear velocity estimation")
|
||||
def check_linear_speeds():
|
||||
failures = []
|
||||
def check_linear_speeds(speed_limit=0.1):
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
speed_limit = 0.1
|
||||
|
||||
if _check_nans(telemetry.vx, telemetry.vy, telemetry.vz):
|
||||
yield ("Velocity estimation is not available")
|
||||
|
||||
if telemetry.vx >= speed_limit:
|
||||
failures.append("X velocity estimation: {:.3f} m/s".format(telemetry.vx))
|
||||
yield ("X velocity estimation: {:.3f} m/s".format(telemetry.vx))
|
||||
if telemetry.vy >= speed_limit:
|
||||
failures.append("Y velocity estimation: {:.3f} m/s".format(telemetry.vy))
|
||||
yield ("Y velocity estimation: {:.3f} m/s".format(telemetry.vy))
|
||||
if telemetry.vz >= speed_limit:
|
||||
failures.append("Z velocity estimation: {:.3f} m/s".format(telemetry.vz))
|
||||
return failures
|
||||
yield ("Z velocity estimation: {:.3f} m/s".format(telemetry.vz))
|
||||
|
||||
|
||||
@check("Angular velocity estimation")
|
||||
def check_angular_speeds():
|
||||
failures = []
|
||||
def check_angular_speeds(rate_limit=0.05):
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
rate_limit = 0.05
|
||||
|
||||
if _check_nans(telemetry.pitch_rate, telemetry.roll_rate, telemetry.yaw_rate):
|
||||
yield ("Angular velocities estimation is not available")
|
||||
|
||||
if telemetry.pitch_rate >= rate_limit:
|
||||
failures.append("Pitch rate estimation: {:.3f} rad/s".format(telemetry.pitch_rate))
|
||||
yield ("Pitch rate estimation: {:.3f} rad/s".format(telemetry.pitch_rate))
|
||||
if telemetry.roll_rate >= rate_limit:
|
||||
failures.append("Roll rate estimation: {:.3f} rad/s".format(telemetry.roll_rate))
|
||||
yield ("Roll rate estimation: {:.3f} rad/s".format(telemetry.roll_rate))
|
||||
if telemetry.yaw_rate >= rate_limit:
|
||||
failures.append("Yaw rate estimation: {:.3f} rad/s".format(telemetry.yaw_rate))
|
||||
return failures
|
||||
yield ("Yaw rate estimation: {:.3f} rad/s".format(telemetry.yaw_rate))
|
||||
|
||||
|
||||
@check("Angles estimation")
|
||||
def check_angles():
|
||||
failures = []
|
||||
def check_angles(angle_limit=math.radians(5)):
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
angle_limit = math.radians(5)
|
||||
|
||||
if _check_nans(telemetry.pitch, telemetry.roll, telemetry.yaw):
|
||||
yield ("Angular velocities estimation is not available")
|
||||
|
||||
if abs(telemetry.pitch) >= angle_limit:
|
||||
failures.append("Pitch estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.pitch,
|
||||
math.degrees(telemetry.pitch)))
|
||||
yield ("Pitch estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.pitch,
|
||||
math.degrees(telemetry.pitch)))
|
||||
if abs(telemetry.roll) >= angle_limit:
|
||||
failures.append("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll,
|
||||
math.degrees(telemetry.roll)))
|
||||
yield ("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll,
|
||||
math.degrees(telemetry.roll)))
|
||||
if abs(telemetry.yaw) >= angle_limit:
|
||||
failures.append("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw,
|
||||
math.degrees(telemetry.yaw)))
|
||||
return failures
|
||||
yield ("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw,
|
||||
math.degrees(telemetry.yaw)))
|
||||
|
||||
|
||||
def selfcheck():
|
||||
msgs = []
|
||||
linear_speeds = check_linear_speeds()
|
||||
angular_speeds = check_angular_speeds()
|
||||
angles = check_angles()
|
||||
if linear_speeds:
|
||||
msgs.extend(linear_speeds)
|
||||
if angular_speeds:
|
||||
msgs.extend(angular_speeds)
|
||||
if angles:
|
||||
msgs.extend(angles)
|
||||
checks = []
|
||||
for check_f in checklist:
|
||||
msg = check_f()
|
||||
checks += msg if msg else []
|
||||
|
||||
return msgs
|
||||
return checks
|
||||
|
||||
|
||||
def navto(x, y, z, yaw=float('nan'), frame_id='aruco_map'):
|
||||
def navto(x, y, z, yaw=float('nan'), frame_id=FRAME_ID, **kwargs):
|
||||
set_position(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw)
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
|
||||
module_logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
module_logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
print('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
print('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map',
|
||||
freq=FREQUENCY, timeout=5000, wait=False):
|
||||
|
||||
module_logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=SPEED, tolerance=TOLERANCE, frame_id=FRAME_ID,
|
||||
freq=FREQUENCY, timeout=TIMEOUT, interrupter=INTERRUPTER, wait=False):
|
||||
logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
print('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
navigate(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw, speed=speed)
|
||||
|
||||
# waiting for completion
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
time_start = time.time()
|
||||
|
||||
while (get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z) > tolerance) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
if interrupter.is_set():
|
||||
logger.warning("Reach point function interrupted!")
|
||||
print("Reach point function interrupted!")
|
||||
interrupter.clear()
|
||||
return False
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
print('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
logger.info('Current delta: | {:.3f}'.format(
|
||||
get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z)))
|
||||
print('Current delta: | {:.3f}'.format(
|
||||
get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z)))
|
||||
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
time_passed = time.time() - time_start
|
||||
|
||||
if timeout is not None:
|
||||
if time_passed >= timeout:
|
||||
module_logger.warning('Reaching point timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
logger.warning('Reaching point timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
print('Reaching point timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
return wait
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Point reached!")
|
||||
return True
|
||||
|
||||
logger.info("Point reached!")
|
||||
print("Point reached!")
|
||||
return True
|
||||
|
||||
|
||||
def reach_attitude(z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map',
|
||||
freq=FREQUENCY, timeout=5000, wait=False):
|
||||
|
||||
module_logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw))
|
||||
def reach_altitude(z=0.0, yaw=float('nan'), speed=SPEED, tolerance=TOLERANCE, frame_id=FRAME_ID,
|
||||
freq=FREQUENCY, timeout=TIMEOUT, interrupter=INTERRUPTER, wait=False):
|
||||
logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw))
|
||||
print('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw))
|
||||
current_telem = get_telemetry(frame_id=frame_id)
|
||||
navigate(frame_id=frame_id, x=current_telem.x, y=current_telem.y, z=z, yaw=yaw, speed=speed)
|
||||
|
||||
# waiting for completion
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
time_start = time.time()
|
||||
|
||||
while (abs(z - telemetry.z) > tolerance) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
if interrupter.is_set():
|
||||
logger.warning("Reach altitude function interrupted!")
|
||||
print("Reach altitude function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
print('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
time_passed = time.time() - time_start
|
||||
if timeout is not None:
|
||||
if time_passed >= timeout:
|
||||
module_logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
print('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
return wait
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Attitude reached!")
|
||||
return True
|
||||
|
||||
logger.info("Altitude reached!")
|
||||
print("Altitude reached!")
|
||||
return True
|
||||
|
||||
|
||||
def land(descend=True, z=1.0, frame_id_descend="aruco_map", frame_id_land="aruco_map",
|
||||
timeout_descend=5000, timeout_land=7500, freq=FREQUENCY):
|
||||
def stop(frame_id='body', hold_speed=SPEED):
|
||||
navigate(frame_id=frame_id, yaw=float('nan'), speed=hold_speed)
|
||||
|
||||
|
||||
def land(descend=True, z=Z_DESCEND, frame_id_descend=FRAME_ID, frame_id_land=FRAME_ID,
|
||||
timeout_descend=TIMEOUT_DESCEND, timeout_land=TIMEOUT_LAND, freq=FREQUENCY, interrupter=INTERRUPTER):
|
||||
if descend:
|
||||
module_logger.info("Descending to: | z: {:.3f}".format(z))
|
||||
reach_attitude(z=z, frame_id=frame_id_descend, timeout=timeout_descend, freq=freq, yaw=1.57) #TODO yaw
|
||||
logger.info("Descending to: | z: {:.3f}".format(z))
|
||||
print("Descending to: | z: {:.3f}".format(z))
|
||||
reach_altitude(z=z, frame_id=frame_id_descend, timeout=timeout_descend, freq=freq, yaw=float('nan'), # TODO yaw
|
||||
interrupter=interrupter)
|
||||
landing()
|
||||
print("Land is started")
|
||||
|
||||
telemetry = get_telemetry(frame_id='aruco_map')
|
||||
telemetry = get_telemetry(frame_id=frame_id_land)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
time_start = time.time()
|
||||
|
||||
while telemetry.armed:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
if interrupter.is_set():
|
||||
logger.warning("Land function interrupted!")
|
||||
print("Land function interrupted!")
|
||||
interrupter.clear()
|
||||
return False
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id_land)
|
||||
module_logger.info("Landing...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
logger.info("Landing... | z: {}".format(telemetry.z))
|
||||
print("Landing... | z: {}".format(telemetry.z))
|
||||
time_passed = time.time() - time_start
|
||||
|
||||
if timeout_land is not None:
|
||||
if time_passed >= timeout_land:
|
||||
module_logger.warning('Landing timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
module_logger.warning("Disarming!")
|
||||
logger.warning('Landing timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
logger.warning("Disarming!")
|
||||
print("Landing timed out, disarming!!!")
|
||||
arming(False)
|
||||
return False
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Landing succeeded!")
|
||||
return True
|
||||
|
||||
logger.info("Landing succeeded!")
|
||||
print("Landing succeeded!")
|
||||
return True
|
||||
|
||||
|
||||
def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY,
|
||||
timeout_arm=2000, timeout_takeoff=5000, wait=False, tolerance=TOLERANCE, emergency_land=False):
|
||||
module_logger.info("Starting takeoff!")
|
||||
def takeoff(z=Z_TAKEOFF, speed=SPEED_TAKEOFF, frame_id='body', freq=FREQUENCY,
|
||||
timeout_arm=TIMEOUT_ARMED, timeout_takeoff=TIMEOUT, wait=False, tolerance=TOLERANCE, emergency_land=False,
|
||||
interrupter=INTERRUPTER):
|
||||
logger.info("Starting takeoff!")
|
||||
print("Starting takeoff!")
|
||||
module_logger.info("Arming, going to OFFBOARD mode")
|
||||
logger.info("Arming, going to OFFBOARD mode")
|
||||
|
||||
# Arming check
|
||||
set_rates(thrust=0.1, auto_arm=True)
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
time_start = time.time()
|
||||
|
||||
while (not telemetry.armed) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
return None
|
||||
if interrupter.is_set():
|
||||
logger.warning("Takeoff function interrupted!")
|
||||
print("Takeoff function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info("Arming...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
logger.info("Arming...")
|
||||
print("Arming...")
|
||||
time_passed = time.time() - time_start
|
||||
|
||||
if timeout_arm is not None:
|
||||
if time_passed >= timeout_arm:
|
||||
if not telemetry.armed:
|
||||
module_logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
print('Arming timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
return False
|
||||
else:
|
||||
break
|
||||
rate.sleep()
|
||||
|
||||
module_logger.info("Armed!")
|
||||
print("Armed!")
|
||||
|
||||
# Reach height
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
z0 = get_telemetry().z
|
||||
navigate(z=z, speed=speed, frame_id=frame_id, auto_arm=True)
|
||||
current_height = abs(get_telemetry().z - z0 - z)
|
||||
while current_height > tolerance or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
return None
|
||||
logger.info("Armed!")
|
||||
print("Armed")
|
||||
|
||||
current_height = abs(get_telemetry().z - z0 - z)
|
||||
module_logger.info("Takeoff...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
# Reach height
|
||||
z0 = get_telemetry().z
|
||||
z_dest = z + z0
|
||||
navigate(z=z, speed=speed, frame_id=frame_id, auto_arm=True)
|
||||
current_diff = abs(get_telemetry().z - z_dest)
|
||||
while (current_diff > tolerance) or wait:
|
||||
if interrupter.is_set():
|
||||
logger.warning("Flight function interrupted!")
|
||||
print("Flight function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
|
||||
current_diff = abs(get_telemetry().z - z_dest)
|
||||
logger.info("Takeoff...")
|
||||
print("Takeoff...")
|
||||
time_passed = time.time() - time_start
|
||||
|
||||
if timeout_takeoff is not None:
|
||||
if time_passed >= timeout_takeoff:
|
||||
if not wait:
|
||||
module_logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
print('Takeoff timed out! | time: {:3f} seconds'.format(time_passed))
|
||||
if emergency_land:
|
||||
module_logger.info("Preforming emergency land")
|
||||
land(descend=False)
|
||||
logger.info("Preforming emergency land")
|
||||
print("Preforming emergency land")
|
||||
land(descend=False, interrupter=interrupter)
|
||||
return False
|
||||
else:
|
||||
break
|
||||
rate.sleep()
|
||||
|
||||
module_logger.info("Takeoff succeeded!")
|
||||
logger.info("Takeoff succeeded!")
|
||||
print("Takeoff succeeded!")
|
||||
return True
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
from __future__ import print_function
|
||||
from threading import Thread
|
||||
import threading
|
||||
import time
|
||||
from rpi_ws281x import *
|
||||
|
||||
from tasking_lib import wait as wait_until
|
||||
# LED strip configuration:
|
||||
LED_COUNT = 29 # Number of LED pixels.
|
||||
LED_PIN = 21 # GPIO pin connected to the pixels (18 uses PWM!) (10 uses SPI /dev/spidev0.0).
|
||||
@@ -29,6 +29,15 @@ direct = False
|
||||
l = 0
|
||||
wait_ms = 10
|
||||
|
||||
INTERRUPTER = threading.Event()
|
||||
INTERRUPTER_UNSET = threading.Event()
|
||||
|
||||
def delay(delay_time, interrupter=INTERRUPTER, maxsleep=0.01):
|
||||
global mode
|
||||
wait_until(time.time()+delay_time, interrupter, maxsleep)
|
||||
if interrupter.is_set():
|
||||
mode = "off"
|
||||
|
||||
|
||||
# functions
|
||||
def math_wheel(pos):
|
||||
@@ -43,52 +52,57 @@ def math_wheel(pos):
|
||||
return Color(0, pos * 3, 255 - pos * 3)
|
||||
|
||||
|
||||
def rainbow(wait=10, direction=False):
|
||||
global wait_ms, direct, mode
|
||||
def rainbow(wait=10, direction=False, interrupter=INTERRUPTER):
|
||||
global wait_ms, direct, mode, INTERRUPTER
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "rainbow"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def fill(red, green, blue):
|
||||
global r, g, b, mode
|
||||
def fill(red, green, blue, interrupter=INTERRUPTER):
|
||||
global r, g, b, mode, INTERRUPTER
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
mode = "fill"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def blink(red, green, blue, wait=250):
|
||||
global r, g, b, wait_ms, mode
|
||||
def blink(red, green, blue, wait=250, interrupter=INTERRUPTER):
|
||||
global r, g, b, wait_ms, mode, INTERRUPTER
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
mode = "blink"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def chase(red, green, blue, wait=50, direction=False):
|
||||
global r, g, b, wait_ms, direct, mode
|
||||
def chase(red, green, blue, wait=50, direction=False, interrupter=INTERRUPTER):
|
||||
global r, g, b, wait_ms, direct, mode, INTERRUPTER
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "chase"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def wipe_to(red, green, blue, wait=50, direction=False):
|
||||
global r, g, b, wait_ms, direct, mode
|
||||
def wipe_to(red, green, blue, wait=50, direction=False, interrupter=INTERRUPTER):
|
||||
global r, g, b, wait_ms, direct, mode, INTERRUPTER
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "wipe_to"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def fade_to(red, green, blue, wait=20): # do not working with rainbow (solid colors only)
|
||||
global r, g, b, r_prev, g_prev, b_prev, wait_ms, mode
|
||||
def fade_to(red, green, blue, wait=20, interrupter=INTERRUPTER): # do not working with rainbow (solid colors only)
|
||||
global r, g, b, r_prev, g_prev, b_prev, wait_ms, mode, INTERRUPTER
|
||||
r_prev = r
|
||||
g_prev = g
|
||||
b_prev = b
|
||||
@@ -97,10 +111,11 @@ def fade_to(red, green, blue, wait=20): # do not working with rainbow (solid co
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
mode = "fade_to"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25):
|
||||
global r, g, b, l, wait_ms, direct, mode
|
||||
def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25, interrupter=INTERRUPTER):
|
||||
global r, g, b, l, wait_ms, direct, mode, INTERRUPTER
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
@@ -108,6 +123,7 @@ def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25):
|
||||
direct = direction
|
||||
wait_ms = wait
|
||||
mode = "run"
|
||||
INTERRUPTER=interrupter
|
||||
|
||||
|
||||
def off():
|
||||
@@ -128,27 +144,29 @@ def strip_rainbow_frame(iteration, direction):
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_chase_step(color, direction):
|
||||
def strip_chase_step(color, direction, interrupter=INTERRUPTER):
|
||||
for q in range(3):
|
||||
for i in range(0, strip.numPixels(), 3):
|
||||
n = ((strip.numPixels() - 1) * direction) - (i + q)
|
||||
strip.setPixelColor(abs(n), color)
|
||||
strip.show()
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, interrupter)
|
||||
for i in range(0, strip.numPixels(), 3):
|
||||
n = ((strip.numPixels() - 1) * direction) - (i + q)
|
||||
strip.setPixelColor(abs(n), 0)
|
||||
|
||||
|
||||
def strip_wipe(color, direction):
|
||||
def strip_wipe(color, direction, interrupter=INTERRUPTER):
|
||||
for i in range(strip.numPixels()):
|
||||
n = ((strip.numPixels() - 1) * direction) - i
|
||||
strip.setPixelColor(abs(n), color)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, interrupter)
|
||||
if interrupter.is_set():
|
||||
return
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_fade(r1, g1, b1, r2, g2, b2, frames=50):
|
||||
def strip_fade(r1, g1, b1, r2, g2, b2, frames=50, interrupter=INTERRUPTER):
|
||||
r_delta = (r2-r1)//frames
|
||||
g_delta = (g2-g1)//frames
|
||||
b_delta = (b2-b1)//frames
|
||||
@@ -157,7 +175,9 @@ def strip_fade(r1, g1, b1, r2, g2, b2, frames=50):
|
||||
green = g1 + (g_delta * i)
|
||||
blue = b1 + (b_delta * i)
|
||||
strip_set(Color(red, green, blue))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, interrupter)
|
||||
if interrupter.is_set():
|
||||
return
|
||||
strip_set(Color(r2, g2, b2))
|
||||
|
||||
|
||||
@@ -190,39 +210,39 @@ def led_thread():
|
||||
if iteration >= 256:
|
||||
iteration = 0
|
||||
strip_rainbow_frame(iteration, direct)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, INTERRUPTER)
|
||||
iteration += 1
|
||||
elif mode == "fill":
|
||||
strip_set(Color(r, g, b))
|
||||
mode = ""
|
||||
elif mode == "blink":
|
||||
strip_set(Color(r, g, b))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, INTERRUPTER)
|
||||
strip_set(Color(0, 0, 0))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, INTERRUPTER)
|
||||
elif mode == "chase":
|
||||
strip_chase_step(Color(r, g, b), direct)
|
||||
elif mode == "wipe_to":
|
||||
strip_wipe(Color(r, g, b,), direct)
|
||||
strip_wipe(Color(r, g, b,), direct, INTERRUPTER)
|
||||
mode = "fill"
|
||||
elif mode == "fade_to":
|
||||
strip_fade(r_prev, g_prev, b_prev, r, g, b)
|
||||
strip_fade(r_prev, g_prev, b_prev, r, g, b, interrupter=INTERRUPTER)
|
||||
mode = ""
|
||||
elif mode == "run":
|
||||
strip_run_step(r, g, b, l, direct, iteration)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
delay(wait_ms / 1000.0, INTERRUPTER)
|
||||
iteration += 1
|
||||
elif mode == "off":
|
||||
strip_off()
|
||||
mode = ""
|
||||
else:
|
||||
time.sleep(1 / 1000)
|
||||
delay(1 / 1000.0, interrupter=INTERRUPTER_UNSET)
|
||||
|
||||
|
||||
# init
|
||||
def init_led():
|
||||
strip.begin()
|
||||
t_l = Thread(target=led_thread)
|
||||
t_l = threading.Thread(target=led_thread)
|
||||
t_l.daemon = True
|
||||
t_l.start()
|
||||
|
||||
|
||||
91
Drone/animation_lib.py
Normal file
91
Drone/animation_lib.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import time
|
||||
import csv
|
||||
import rospy
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
|
||||
import tasking_lib as tasking
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
interrupt_event = threading.Event()
|
||||
|
||||
|
||||
def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0):
|
||||
imported_frames = []
|
||||
try:
|
||||
animation_file = open(filepath)
|
||||
except IOError:
|
||||
logging.error("File {} can't be opened".format(filepath))
|
||||
else:
|
||||
with animation_file:
|
||||
csv_reader = csv.reader(
|
||||
animation_file, delimiter=',', quotechar='|'
|
||||
)
|
||||
for row in csv_reader:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row
|
||||
imported_frames.append({
|
||||
'number': int(frame_number),
|
||||
'x': float(x) + x0,
|
||||
'y': float(y) + y0,
|
||||
'z': float(z) + z0,
|
||||
'yaw': float(yaw),
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
})
|
||||
return imported_frames
|
||||
|
||||
|
||||
def convert_frame(frame):
|
||||
return (frame['x'], frame['y'], frame['z']), (frame["red"], frame["green"], frame["blue"]), frame["yaw"]
|
||||
|
||||
|
||||
def execute_frame(point=(), color=(), yaw=float('Nan'), frame_id='aruco_map', use_leds=True,
|
||||
flight_func=FlightLib.navto, flight_kwargs={}, interrupter=interrupt_event):
|
||||
flight_func(*point, yaw=yaw, frame_id=frame_id, interrupter=interrupt_event, **flight_kwargs)
|
||||
if use_leds:
|
||||
if color:
|
||||
LedLib.fill(*color)
|
||||
|
||||
|
||||
def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto,
|
||||
interrupter=interrupt_event):
|
||||
next_frame_time = 0
|
||||
for frame in frames:
|
||||
if interrupter.is_set():
|
||||
logger.warning("Animation playing function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func,
|
||||
interrupter=interrupter)
|
||||
|
||||
next_frame_time += frame_delay
|
||||
tasking.wait(next_frame_time, interrupter)
|
||||
|
||||
|
||||
def takeoff(z=1.5, safe_takeoff=True, timeout=5000, frame_id='aruco_map', use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
print(interrupter.is_set())
|
||||
if use_leds:
|
||||
LedLib.wipe_to(255, 0, 0, interrupter=interrupter)
|
||||
if interrupter.is_set():
|
||||
return
|
||||
FlightLib.takeoff(z=z, wait=False, timeout_takeoff=timeout, frame_id=frame_id, emergency_land=safe_takeoff,
|
||||
interrupter=interrupter)
|
||||
if interrupter.is_set():
|
||||
return
|
||||
if use_leds:
|
||||
LedLib.blink(0, 255, 0, wait=50, interrupter=interrupter)
|
||||
|
||||
|
||||
def land(z=1.5, descend=False, timeout=5000, frame_id='aruco_map', use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
if use_leds:
|
||||
LedLib.blink(255, 0, 0, interrupter=interrupter)
|
||||
FlightLib.land(z=z, descend=descend, timeout_land=timeout, frame_id_land=frame_id, interrupter=interrupter)
|
||||
if use_leds:
|
||||
LedLib.off()
|
||||
511
Drone/client.py
511
Drone/client.py
@@ -1,358 +1,225 @@
|
||||
from __future__ import print_function
|
||||
import os
|
||||
import sys
|
||||
import socket
|
||||
import struct
|
||||
import random
|
||||
import time
|
||||
import errno
|
||||
import json
|
||||
import random
|
||||
import socket
|
||||
import struct
|
||||
import logging
|
||||
import threading
|
||||
import collections
|
||||
import ConfigParser
|
||||
import selectors2 as selectors
|
||||
|
||||
from contextlib import closing
|
||||
|
||||
import rospy
|
||||
import pause
|
||||
import os,sys,inspect # Add parent dir to PATH to import messaging_lib
|
||||
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
|
||||
parent_dir = os.path.dirname(current_dir)
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
|
||||
import play_animation
|
||||
|
||||
random.seed()
|
||||
import messaging_lib as messaging
|
||||
|
||||
logging.basicConfig( # TODO all prints as logs
|
||||
level=logging.INFO,
|
||||
level=logging.DEBUG, # INFO
|
||||
format="%(asctime)s [%(name)-7.7s] [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler("client_logs.log"),
|
||||
logging.StreamHandler()
|
||||
logging.StreamHandler(),
|
||||
])
|
||||
|
||||
NTP_PACKET_FORMAT = "!12I"
|
||||
NTP_DELTA = 2208988800L # 1970-01-01 00:00:00
|
||||
NTP_QUERY = '\x1b' + 47 * '\0'
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
|
||||
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
|
||||
s.sendto(NTP_QUERY, (ntp_host, ntp_port))
|
||||
msg, address = s.recvfrom(1024)
|
||||
unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)])
|
||||
return unpacked[10] + float(unpacked[11]) / 2**32 - NTP_DELTA
|
||||
active_client = None # maybe needs to be refactored
|
||||
|
||||
class Client(object):
|
||||
def __init__(self, config_path="client_config.ini"):
|
||||
self.selector = selectors.DefaultSelector()
|
||||
self.client_socket = None
|
||||
|
||||
def reconnect(timeout=2, attempt_limit=5):
|
||||
global clientSocket, host, port
|
||||
print("Trying to connect to", host, ":", port, "...")
|
||||
connected = False
|
||||
attempt_count = 0
|
||||
while not connected:
|
||||
print("Waiting for connection, attempt", attempt_count)
|
||||
self.server_connection = messaging.ConnectionManager()
|
||||
|
||||
self.server_host = None
|
||||
self.server_port = None
|
||||
self.broadcast_port = None
|
||||
|
||||
self.connected = False
|
||||
self.client_id = None
|
||||
|
||||
# Init configs
|
||||
self.config_path = config_path
|
||||
self.config = ConfigParser.ConfigParser()
|
||||
self.load_config()
|
||||
|
||||
global active_client
|
||||
active_client = self
|
||||
|
||||
def load_config(self):
|
||||
self.config.read(self.config_path)
|
||||
|
||||
self.broadcast_port = self.config.getint('SERVER', 'broadcast_port')
|
||||
self.server_port = self.config.getint('SERVER', 'port')
|
||||
self.server_host = self.config.get('SERVER', 'host')
|
||||
self.BUFFER_SIZE = self.config.getint('SERVER', 'buffer_size')
|
||||
self.USE_NTP = self.config.getboolean('NTP', 'use_ntp')
|
||||
self.NTP_HOST = self.config.get('NTP', 'host')
|
||||
self.NTP_PORT = self.config.getint('NTP', 'port')
|
||||
|
||||
self.files_directory = self.config.get('FILETRANSFER', 'files_directory')
|
||||
|
||||
self.client_id = self.config.get('PRIVATE', 'id')
|
||||
if self.client_id == 'default':
|
||||
self.client_id = 'copter' + str(random.randrange(9999)).zfill(4)
|
||||
self.write_config(False, 'PRIVATE', 'id', self.client_id)
|
||||
elif self.client_id == '/hostname':
|
||||
self.client_id = socket.gethostname()
|
||||
|
||||
def rewrite_config(self):
|
||||
with open(self.config_path, 'w') as file:
|
||||
self.config.write(file)
|
||||
|
||||
def write_config(self, reload_config=True, *config_options):
|
||||
for config_option in config_options:
|
||||
self.config.set(config_option.section, config_option.option, config_option.value)
|
||||
self.rewrite_config()
|
||||
|
||||
if reload_config:
|
||||
self.load_config()
|
||||
|
||||
@staticmethod
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
NTP_PACKET_FORMAT = "!12I"
|
||||
NTP_DELTA = 2208988800L # 1970-01-01 00:00:00
|
||||
NTP_QUERY = '\x1b' + 47 * '\0'
|
||||
|
||||
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
|
||||
s.sendto(bytes(NTP_QUERY), (ntp_host, ntp_port))
|
||||
msg, address = s.recvfrom(1024)
|
||||
unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)])
|
||||
return unpacked[10] + float(unpacked[11]) / 2 ** 32 - NTP_DELTA
|
||||
|
||||
def time_now(self):
|
||||
if self.USE_NTP:
|
||||
timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
|
||||
else:
|
||||
timenow = time.time()
|
||||
return timenow
|
||||
|
||||
def start(self):
|
||||
logger.info("Starting client")
|
||||
try:
|
||||
clientSocket = socket.socket()
|
||||
clientSocket.settimeout(timeout)
|
||||
clientSocket.connect((host, port))
|
||||
connected = True
|
||||
print("Connection successful")
|
||||
clientSocket.settimeout(None)
|
||||
except socket.error as e:
|
||||
if e.errno != errno.EINTR:
|
||||
print("Waiting for connection, can not connect:", e)
|
||||
time.sleep(timeout)
|
||||
else:
|
||||
print("Shutting down on keyboard interrupt")
|
||||
raise KeyboardInterrupt
|
||||
attempt_count += 1
|
||||
|
||||
if attempt_count >= attempt_limit:
|
||||
print("Too many attempts. Trying to get new server IP")
|
||||
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
broadcast_client.bind(("", broadcast_port))
|
||||
while True:
|
||||
data, addr = broadcast_client.recvfrom(1024)
|
||||
print("Received broadcast message %s from %s" % (data, addr))
|
||||
command, args = parse_message(data.decode("UTF-8"))
|
||||
if command == "server_ip":
|
||||
host, port = args["host"], int(args["port"])
|
||||
print("Binding to new IP: ", host, port)
|
||||
broadcast_client.close()
|
||||
write_to_config("SERVER", "port", port)
|
||||
write_to_config("SERVER", "host", host)
|
||||
attempt_count = 0
|
||||
break
|
||||
self._reconnect()
|
||||
self._process_connections()
|
||||
|
||||
def send_all(msg):
|
||||
clientSocket.sendall(struct.pack('>I', len(msg)) + msg)
|
||||
except (KeyboardInterrupt, ):
|
||||
logger.critical("Caught interrupt, exiting!")
|
||||
self.selector.close()
|
||||
|
||||
def _reconnect(self, timeout=2, attempt_limit=5):
|
||||
logger.info("Trying to connect to {}:{} ...".format(self.server_host, self.server_port))
|
||||
attempt_count = 0
|
||||
while not self.connected:
|
||||
logger.info("Waiting for connection, attempt {}".format(attempt_count))
|
||||
try:
|
||||
self.client_socket = socket.socket()
|
||||
self.client_socket.settimeout(timeout)
|
||||
self.client_socket.connect((self.server_host, self.server_port))
|
||||
except socket.error as error:
|
||||
if isinstance(error, OSError):
|
||||
if error.errno == errno.EINTR:
|
||||
logger.critical("Shutting down on keyboard interrupt")
|
||||
raise KeyboardInterrupt
|
||||
|
||||
def recive_all(n):
|
||||
data = b''
|
||||
while len(data) < n:
|
||||
packet = clientSocket.recv(min(n - len(data), BUFFER_SIZE))
|
||||
print("Receiving packet {}; full data is {}".format(packet, data))
|
||||
if not packet:
|
||||
return None
|
||||
data += packet
|
||||
return data
|
||||
logger.warning("Can not connect due error: {}".format(error))
|
||||
attempt_count += 1
|
||||
time.sleep(timeout)
|
||||
|
||||
|
||||
def recive_message():
|
||||
raw_msglen = recive_all(4)
|
||||
if not raw_msglen:
|
||||
print("No valid msg")
|
||||
return None
|
||||
msglen = struct.unpack('>I', raw_msglen)[0]
|
||||
msg = recive_all(msglen)
|
||||
return msg
|
||||
|
||||
|
||||
def form_message(str_command, dict_arguments):
|
||||
msg_dict = {str_command: dict_arguments}
|
||||
msg = json.dumps(msg_dict)
|
||||
return msg
|
||||
|
||||
|
||||
def parse_message(msg):
|
||||
try:
|
||||
j_message = json.loads(msg)
|
||||
except ValueError:
|
||||
print("Json string not in correct format")
|
||||
return None, None
|
||||
str_command = list(j_message.keys())[0]
|
||||
dict_arguments = list(j_message.values())[0]
|
||||
|
||||
return str_command, dict_arguments
|
||||
|
||||
|
||||
def recive_file(filename):
|
||||
print("Receiving file:", filename)
|
||||
with open(filename, 'wb') as file: # TODO add directory
|
||||
while True:
|
||||
data = recive_message() #clientSocket.recv(BUFFER_SIZE)
|
||||
if data:
|
||||
print(data)
|
||||
if parse_message(data.decode("UTF-8"))[0] == "/endoffile":
|
||||
print("File received")
|
||||
break
|
||||
file.write(data)
|
||||
else:
|
||||
logger.info("Connection to server successful!")
|
||||
self._connect()
|
||||
break
|
||||
|
||||
|
||||
def animation_player(running_event, stop_event):
|
||||
print("Animation thread activated")
|
||||
frames = play_animation.read_animation_file()
|
||||
if not frames:
|
||||
logging.error("Animation is empty, shutting down animation player")
|
||||
return
|
||||
|
||||
delay_time = 0.125
|
||||
|
||||
print("Takeoff")
|
||||
play_animation.takeoff(z=TAKEOFF_HEIGHT, safe_takeoff=SAFE_TAKEOFF)
|
||||
takeoff_time = starttime + TAKEOFF_TIME
|
||||
dt = takeoff_time - time.time()
|
||||
print("Wait until takeoff " + str(dt) + "s: " + time.ctime(takeoff_time))
|
||||
pause.until(takeoff_time)
|
||||
|
||||
print("Reach first point")
|
||||
play_animation.reach_frame(frames[0], x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) #Reach first point at the same time with others
|
||||
rfp_time = takeoff_time + RFP_TIME
|
||||
dt = rfp_time - time.time()
|
||||
print("Wait reaching first point " + str(dt) + "s: " + time.ctime(rfp_time))
|
||||
pause.until(rfp_time)
|
||||
|
||||
next_frame_time = rfp_time
|
||||
print("Start animation at " + str(time.time()))
|
||||
for frame in frames:
|
||||
#running_event.wait()
|
||||
play_animation.animate_frame(frame, x0=X0+X0_COMMON, y0=Y0+Y0_COMMON)
|
||||
next_frame_time += delay_time
|
||||
if stop_event.is_set():
|
||||
running_animation_event.clear()
|
||||
break
|
||||
#rate.sleep()
|
||||
pause.until(next_frame_time)
|
||||
else:
|
||||
play_animation.land()
|
||||
print("Animation ended")
|
||||
print("Animation thread closed")
|
||||
if attempt_count >= attempt_limit:
|
||||
logger.info("Too many attempts. Trying to get new server IP")
|
||||
self.broadcast_bind()
|
||||
attempt_count = 0
|
||||
|
||||
|
||||
stop_animation_event = threading.Event()
|
||||
running_animation_event = threading.Event()
|
||||
def _connect(self):
|
||||
self.connected = True
|
||||
self.client_socket.setblocking(False)
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
self.selector.register(self.client_socket, events, data=self.server_connection)
|
||||
self.server_connection.connect(self.selector, self.client_socket, (self.server_host, self.server_port))
|
||||
self._process_connections()
|
||||
|
||||
|
||||
def start_animation(*args, **kwargs):
|
||||
animation_thread = threading.Thread(target=animation_player, args=(running_animation_event, stop_animation_event))
|
||||
print("Starting animation!")
|
||||
running_animation_event.set()
|
||||
stop_animation_event.clear()
|
||||
animation_thread.start()
|
||||
|
||||
|
||||
def resume_animation():
|
||||
print("Resuming animation")
|
||||
running_animation_event.set()
|
||||
|
||||
|
||||
def pause_animation():
|
||||
print("Pausing animation")
|
||||
running_animation_event.clear()
|
||||
|
||||
|
||||
def stop_animation():
|
||||
stop_animation_event.set()
|
||||
print("Stopping animation")
|
||||
# animation_thread.join()
|
||||
|
||||
|
||||
def selfcheck():
|
||||
return FlightLib.selfcheck()
|
||||
|
||||
|
||||
def write_to_config(section, option, value):
|
||||
config.set(section, option, value)
|
||||
with open(CONFIG_PATH, 'w') as file: # TODO as separate function
|
||||
config.write(file)
|
||||
|
||||
|
||||
def load_config():
|
||||
global config, CONFIG_PATH
|
||||
global broadcast_port, port, host, BUFFER_SIZE
|
||||
global USE_NTP, NTP_HOST, NTP_PORT
|
||||
global files_directory, animation_file
|
||||
global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME
|
||||
global USE_LEDS, COPTER_ID
|
||||
global X0, X0_COMMON, Y0, Y0_COMMON
|
||||
CONFIG_PATH = "client_config.ini"
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(CONFIG_PATH)
|
||||
|
||||
broadcast_port = config.getint('SERVER', 'broadcast_port')
|
||||
port = config.getint('SERVER', 'port')
|
||||
host = config.get('SERVER', 'host')
|
||||
BUFFER_SIZE = config.getint('SERVER', 'buffer_size')
|
||||
USE_NTP = config.getboolean('NTP', 'use_ntp')
|
||||
NTP_HOST = config.get('NTP', 'host')
|
||||
NTP_PORT = config.getint('NTP', 'port')
|
||||
|
||||
files_directory = config.get('FILETRANSFER', 'files_directory')
|
||||
animation_file = config.get('FILETRANSFER', 'animation_file')
|
||||
|
||||
FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation
|
||||
TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height')
|
||||
TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time')
|
||||
RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time')
|
||||
SAFE_TAKEOFF = config.getboolean('COPTERS', 'safe_takeoff')
|
||||
|
||||
X0_COMMON = config.getfloat('COPTERS', 'x0_common')
|
||||
Y0_COMMON = config.getfloat('COPTERS', 'y0_common')
|
||||
X0 = config.getfloat('PRIVATE', 'x0')
|
||||
Y0 = config.getfloat('PRIVATE', 'y0')
|
||||
|
||||
USE_LEDS = config.getboolean('PRIVATE', 'use_leds')
|
||||
play_animation.USE_LEDS = USE_LEDS
|
||||
|
||||
COPTER_ID = config.get('PRIVATE', 'id')
|
||||
if COPTER_ID == 'default':
|
||||
COPTER_ID = 'copter' + str(random.randrange(9999)).zfill(4)
|
||||
write_to_config('PRIVATE', 'id', COPTER_ID)
|
||||
elif COPTER_ID == '/hostname':
|
||||
COPTER_ID = socket.gethostname()
|
||||
|
||||
load_config()
|
||||
|
||||
rospy.init_node('Swarm_client', anonymous=True)
|
||||
if USE_LEDS:
|
||||
LedLib.init_led()
|
||||
|
||||
print("Client started on copter:", COPTER_ID)
|
||||
if USE_NTP:
|
||||
print("NTP time:", time.ctime(get_ntp_time(NTP_HOST, NTP_PORT)))
|
||||
print("System time", time.ctime(time.time()))
|
||||
|
||||
reconnect()
|
||||
|
||||
print("Connected to server")
|
||||
|
||||
try:
|
||||
while True:
|
||||
def broadcast_bind(self):
|
||||
broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
broadcast_client.bind(("", self.broadcast_port))
|
||||
try:
|
||||
message = recive_message()
|
||||
if message:
|
||||
message = message.decode("UTF-8")
|
||||
command, args = parse_message(message)
|
||||
print("Command from server:", command, args)
|
||||
if command == "writefile":
|
||||
recive_file(args['filename'])
|
||||
if bool(args['clever_restart']):
|
||||
os.system("systemctl restart clever")
|
||||
elif command == 'config_write':
|
||||
write_to_config(args['section'], args['option'], args['value'])
|
||||
elif command == 'config_reload':
|
||||
load_config()
|
||||
elif command == "starttime":
|
||||
global starttime
|
||||
starttime = float(args['time'])
|
||||
print("Starting on:", time.ctime(starttime))
|
||||
dt = starttime - time.time()
|
||||
if USE_NTP:
|
||||
dt = starttime - get_ntp_time(NTP_HOST, NTP_PORT)
|
||||
print("Until start:", dt)
|
||||
rospy.Timer(rospy.Duration(dt), start_animation, oneshot=True)
|
||||
elif command == 'takeoff':
|
||||
play_animation.takeoff(safe_takeoff=SAFE_TAKEOFF)
|
||||
elif command == 'pause':
|
||||
pause_animation()
|
||||
elif command == 'resume':
|
||||
resume_animation()
|
||||
elif command == 'stop':
|
||||
stop_animation()
|
||||
FlightLib.interrupt()
|
||||
elif command == 'land':
|
||||
play_animation.land()
|
||||
elif command == 'disarm':
|
||||
FlightLib.arming(False)
|
||||
elif command == 'led_test':
|
||||
LedLib.fill(255, 255, 255)
|
||||
time.sleep(2)
|
||||
LedLib.off()
|
||||
while True:
|
||||
data, addr = broadcast_client.recvfrom(self.BUFFER_SIZE)
|
||||
message = messaging.MessageManager()
|
||||
message.income_raw = data
|
||||
message.process_message()
|
||||
if message.content:
|
||||
logger.info("Received broadcast message {} from {}".format(message.content, addr))
|
||||
if message.content["command"] == "server_ip":
|
||||
args = message.content["args"]
|
||||
self.server_port = int(args["port"])
|
||||
self.server_host = args["host"]
|
||||
self.write_config(False,
|
||||
ConfigOption("SERVER", "port", self.server_port),
|
||||
ConfigOption("SERVER", "host", self.server_host))
|
||||
logger.info("Binding to new IP: {}:{}".format(self.server_host, self.server_port))
|
||||
break
|
||||
finally:
|
||||
broadcast_client.close()
|
||||
|
||||
elif command == 'request':
|
||||
request_target = args['value']
|
||||
print("Got request for:", request_target)
|
||||
response = ""
|
||||
if request_target == 'test':
|
||||
response = "test_success"
|
||||
elif request_target == 'id':
|
||||
response = COPTER_ID
|
||||
elif request_target == 'selfcheck':
|
||||
check = FlightLib.selfcheck()
|
||||
response = check if check else "OK"
|
||||
elif request_target == 'batt_voltage':
|
||||
response = FlightLib.get_telemetry('body').voltage
|
||||
elif request_target == 'cell_voltage':
|
||||
response = FlightLib.get_telemetry('body').cell_voltage
|
||||
def _process_connections(self):
|
||||
while True:
|
||||
events = self.selector.select(timeout=1)
|
||||
if events:
|
||||
for key, mask in events:
|
||||
if key.data is None:
|
||||
pass
|
||||
else:
|
||||
connection = key.data
|
||||
try:
|
||||
connection.process_events(mask)
|
||||
|
||||
send_all(bytes(form_message("response",
|
||||
{"status": "ok", "value": response, "value_name": str(request_target)})))
|
||||
print("Request responded with:", response)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
"Exception {} occurred for {}! Resetting connection!".format(error, connection.addr)
|
||||
)
|
||||
self.server_connection.close()
|
||||
self.connected = False
|
||||
|
||||
except socket.error as e:
|
||||
if e.errno != errno.EINTR:
|
||||
print("Connection lost due error:", e)
|
||||
print("Reconnecting...")
|
||||
reconnect()
|
||||
print("Re-connection successful")
|
||||
else:
|
||||
print("Interrupted")
|
||||
raise KeyboardInterrupt
|
||||
except KeyboardInterrupt:
|
||||
print("Shutdown on keyboard interrupt")
|
||||
finally:
|
||||
clientSocket.close()
|
||||
if isinstance(error, OSError):
|
||||
if error.errno == errno.EINTR:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
if not self.selector.get_map():
|
||||
logger.warning("No active connections left!")
|
||||
return
|
||||
|
||||
|
||||
@messaging.request_callback("id")
|
||||
def _response_id():
|
||||
return active_client.client_id
|
||||
|
||||
@messaging.request_callback("time")
|
||||
def _response_time():
|
||||
return active_client.time_now()
|
||||
|
||||
@messaging.message_callback("config_write")
|
||||
def _command_config_write(*args, **kwargs):
|
||||
options = [ConfigOption(**raw_option) for raw_option in kwargs["options"]]
|
||||
logger.info("Writing config options: {}".format(options))
|
||||
active_client.write_config(kwargs["reload"], *options)
|
||||
|
||||
if __name__ == "__main__":
|
||||
client = Client()
|
||||
client.start()
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[SERVER]
|
||||
port = 25000
|
||||
broadcast_port = 8181
|
||||
host = 192.168.1.104
|
||||
host = 192.168.43.168
|
||||
buffer_size = 1024
|
||||
|
||||
[FILETRANSFER]
|
||||
@@ -27,3 +27,4 @@ id = /hostname
|
||||
use_leds = True
|
||||
x0 = 0
|
||||
y0 = 0
|
||||
|
||||
|
||||
192
Drone/copter_client.py
Normal file
192
Drone/copter_client.py
Normal file
@@ -0,0 +1,192 @@
|
||||
import os
|
||||
import time
|
||||
import rospy
|
||||
import logging
|
||||
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
|
||||
import client
|
||||
|
||||
import messaging_lib as messaging
|
||||
import tasking_lib as tasking
|
||||
import animation_lib as animation
|
||||
|
||||
import ros_logging
|
||||
|
||||
class CopterClient(client.Client):
|
||||
def load_config(self):
|
||||
super(CopterClient, self).load_config()
|
||||
self.FRAME_ID = self.config.get('COPTERS', 'frame_id')
|
||||
self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height')
|
||||
self.TAKEOFF_TIME = self.config.getfloat('COPTERS', 'takeoff_time')
|
||||
self.SAFE_TAKEOFF = self.config.getboolean('COPTERS', 'safe_takeoff')
|
||||
self.RFP_TIME = self.config.getfloat('COPTERS', 'reach_first_point_time')
|
||||
|
||||
self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common')
|
||||
self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common')
|
||||
self.X0 = self.config.getfloat('PRIVATE', 'x0')
|
||||
self.Y0 = self.config.getfloat('PRIVATE', 'y0')
|
||||
|
||||
self.USE_LEDS = self.config.getboolean('PRIVATE', 'use_leds')
|
||||
|
||||
def start(self, task_manager_instance):
|
||||
client.logger.info("Init ROS node")
|
||||
rospy.init_node('Swarm_client', anonymous=True, log_level=rospy.DEBUG)
|
||||
if self.USE_LEDS:
|
||||
LedLib.init_led()
|
||||
|
||||
task_manager_instance.start()
|
||||
|
||||
super(CopterClient, self).start()
|
||||
|
||||
|
||||
@messaging.request_callback("selfcheck")
|
||||
def _response_selfcheck():
|
||||
check = FlightLib.selfcheck()
|
||||
return check if check else "OK"
|
||||
|
||||
|
||||
@messaging.request_callback("batt_voltage")
|
||||
def _response_batt():
|
||||
return FlightLib.get_telemetry('body').voltage
|
||||
|
||||
|
||||
@messaging.request_callback("cell_voltage")
|
||||
def _response_cell():
|
||||
return FlightLib.get_telemetry('body').cell_voltage
|
||||
|
||||
|
||||
@messaging.message_callback("service_restart")
|
||||
def _command_service_restart(**kwargs):
|
||||
os.system("systemctl restart" + kwargs["name"])
|
||||
|
||||
|
||||
@messaging.message_callback("led_test")
|
||||
def _command_led_test(*args, **kwargs):
|
||||
LedLib.chase(255, 255, 255)
|
||||
time.sleep(2)
|
||||
LedLib.off()
|
||||
|
||||
|
||||
@messaging.message_callback("takeoff")
|
||||
def _command_takeoff(**kwargs):
|
||||
task_manager.add_task(time.time(), 0, animation.takeoff,
|
||||
task_kwargs={
|
||||
"z": client.active_client.TAKEOFF_HEIGHT,
|
||||
"timeout": client.active_client.TAKEOFF_TIME,
|
||||
"safe_takeoff": client.active_client.SAFE_TAKEOFF,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@messaging.message_callback("land")
|
||||
def _command_land(**kwargs):
|
||||
task_manager.reset()
|
||||
task_manager.add_task(0, 0, animation.land,
|
||||
task_kwargs={
|
||||
"z": client.active_client.TAKEOFF_HEIGHT,
|
||||
"timeout": client.active_client.TAKEOFF_TIME,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@messaging.message_callback("disarm")
|
||||
def _command_disarm(**kwargs):
|
||||
task_manager.reset()
|
||||
task_manager.add_task(-5, 0, FlightLib.arming_wrapper,
|
||||
task_kwargs={
|
||||
"state": False
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@messaging.message_callback("stop")
|
||||
def _command_stop(**kwargs):
|
||||
task_manager.stop()
|
||||
|
||||
|
||||
@messaging.message_callback("pause")
|
||||
def _command_stop(**kwargs):
|
||||
task_manager.pause()
|
||||
|
||||
|
||||
@messaging.message_callback("resume")
|
||||
def _command_stop(**kwargs):
|
||||
task_manager.resume()
|
||||
|
||||
|
||||
@messaging.message_callback("start")
|
||||
def _play_animation(**kwargs):
|
||||
gap = 0.25
|
||||
start_time = kwargs["time"] # TODO
|
||||
""" print('start time = {}'.format(start_time))
|
||||
frames = animation.load_animation(os.path.abspath("animation.csv"),
|
||||
x0=client.active_client.X0 + client.active_client.X0_COMMON,
|
||||
y0=client.active_client.Y0 + client.active_client.Y0_COMMON,
|
||||
)
|
||||
|
||||
task_manager.add_task(start_time, -1, animation.takeoff,
|
||||
task_kwargs={
|
||||
"z": client.active_client.TAKEOFF_HEIGHT,
|
||||
"timeout": client.active_client.TAKEOFF_TIME,
|
||||
"safe_takeoff": client.active_client.SAFE_TAKEOFF,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
}
|
||||
)
|
||||
|
||||
rfp_time = start_time + client.active_client.TAKEOFF_TIME + gap
|
||||
task_manager.add_task(rfp_time, -1, animation.execute_frame,
|
||||
task_kwargs={
|
||||
"point": animation.convert_frame(frames[0]),
|
||||
"timeout": client.active_client.RFP_TIME,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
"flight_func": FlightLib.reach_point,
|
||||
}
|
||||
)
|
||||
|
||||
animation_time = rfp_time + client.active_client.RFP_TIME + gap
|
||||
frame_delay = 0.125 # TODO from animation file
|
||||
task_manager.add_task(animation_time, -1, animation.execute_animation,
|
||||
task_kwargs={
|
||||
"frames": frames,
|
||||
"frame_delay": frame_delay,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
}
|
||||
)
|
||||
|
||||
land_time = animation_time + len(frames)*frame_delay + gap
|
||||
task_manager.add_task(land_time, -1, animation.land,
|
||||
task_kwargs={
|
||||
"z": client.active_client.TAKEOFF_HEIGHT,
|
||||
"timeout": client.active_client.TAKEOFF_TIME,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
},
|
||||
) """
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# rospy.init_node('Swarm_client', anonymous=True)
|
||||
copter_client = CopterClient()
|
||||
task_manager = tasking.TaskManager()
|
||||
|
||||
#print logging.root.manager.loggerDict
|
||||
#task_manager.start()
|
||||
copter_client.start(task_manager)
|
||||
loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
|
||||
if loggers is None:
|
||||
print("No loggers!")
|
||||
else:
|
||||
print("Loggers",loggers)
|
||||
ros_logging.route_logger_to_ros()
|
||||
ros_logging.route_logger_to_ros("__main__")
|
||||
ros_logging.route_logger_to_ros("client")
|
||||
ros_logging.route_logger_to_ros("messaging")
|
||||
|
||||
@@ -5,20 +5,19 @@ import logging
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
|
||||
module_logger = logging.getLogger("Animation player")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
animation_file_path = 'animation.csv'
|
||||
USE_LEDS = True
|
||||
FRAME_ID = 'aruco_map'
|
||||
|
||||
|
||||
def takeoff(z=1.5, safe_takeoff=True, timeout=5000):
|
||||
if USE_LEDS:
|
||||
LedLib.wipe_to(255, 0, 0)
|
||||
if safe_takeoff:
|
||||
FlightLib.takeoff(z=z, wait=True, timeout_takeoff = timeout, emergency_land=True) # TODO dont forget change back to takeoff
|
||||
else:
|
||||
FlightLib.takeoff(z=z, wait=True, emergency_land=False)
|
||||
FlightLib.takeoff(z=z, wait=True, timeout_takeoff=timeout, emergency_land=safe_takeoff,)
|
||||
LedLib.blink(0, 255, 0, wait=50)
|
||||
|
||||
|
||||
def land(descend=False):
|
||||
@@ -29,14 +28,16 @@ def land(descend=False):
|
||||
LedLib.off()
|
||||
|
||||
|
||||
def animate_frame(current_frame, x0=0.0, y0=0.0):
|
||||
FlightLib.navto(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], yaw=1.57) # TODO yaw
|
||||
def animate_frame(current_frame, x0=0.0, y0=0.0, copter_frame_id=FRAME_ID):
|
||||
FlightLib.navto(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'],
|
||||
yaw=1.57, frame_id=copter_frame_id) # TODO yaw
|
||||
if USE_LEDS:
|
||||
LedLib.fill(current_frame['red'], current_frame['green'], current_frame['blue'])
|
||||
|
||||
|
||||
def reach_frame(current_frame, x0=0.0, y0=0.0, timeout=5000):
|
||||
FlightLib.reach_point(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], yaw=1.57, timeout=timeout) # TODO yaw
|
||||
def reach_frame(current_frame, x0=0.0, y0=0.0, timeout=5000, copter_frame_id=FRAME_ID):
|
||||
FlightLib.reach_point(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'],
|
||||
yaw=1.57, timeout=timeout, frame_id=copter_frame_id) # TODO yaw
|
||||
if USE_LEDS:
|
||||
LedLib.fill(current_frame['red'], current_frame['green'], current_frame['blue'])
|
||||
|
||||
|
||||
29
Drone/ros_logging.py
Normal file
29
Drone/ros_logging.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import logging
|
||||
import rospy
|
||||
|
||||
|
||||
class RosHandler(logging.Handler):
|
||||
|
||||
level_map = {
|
||||
logging.DEBUG: rospy.logdebug,
|
||||
logging.INFO: rospy.loginfo,
|
||||
logging.WARNING: rospy.logwarn,
|
||||
logging.ERROR: rospy.logerr,
|
||||
logging.CRITICAL: rospy.logfatal
|
||||
}
|
||||
|
||||
def emit(self, record):
|
||||
print(record.levelno, record.name, record.msg)
|
||||
if "rosout" not in record.msg:
|
||||
try:
|
||||
pass
|
||||
#self.level_map[record.levelno]("%s: %s" % (record.name, record.msg))
|
||||
except KeyError:
|
||||
rospy.logerr("unknown log level %s LOG: %s: %s" % (record.levelno, record.name, record.msg))
|
||||
|
||||
|
||||
def route_logger_to_ros(logger_name=None):
|
||||
if logger_name is not None:
|
||||
logging.getLogger(logger_name).addHandler(RosHandler())
|
||||
else:
|
||||
logging.getLogger().addHandler(RosHandler())
|
||||
193
Drone/tasking_lib.py
Normal file
193
Drone/tasking_lib.py
Normal file
@@ -0,0 +1,193 @@
|
||||
import heapq
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ])
|
||||
|
||||
INTERRUPTER = threading.Event()
|
||||
|
||||
def wait(end, interrupter=INTERRUPTER, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval
|
||||
|
||||
while not interrupter.is_set(): # Basic implementation of pause module until()
|
||||
now = time.time()
|
||||
diff = min(end - now, maxsleep)
|
||||
if diff <= 0:
|
||||
break
|
||||
else:
|
||||
time.sleep(diff / 2)
|
||||
else:
|
||||
logger.warning("Waiting was interrupted!")
|
||||
print("Waiting was interrupted!")
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
def __init__(self):
|
||||
self.task_queue = []
|
||||
self._counter = itertools.count() # unique sequence count
|
||||
|
||||
self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread")
|
||||
self._processor_thread.daemon = True
|
||||
self._task_queue_lock = threading.RLock()
|
||||
|
||||
self._running_event = threading.Event()
|
||||
self._reset_event = threading.Event()
|
||||
self._wait_interrupt_event = threading.Event()
|
||||
self._task_interrupt_event = threading.Event()
|
||||
self._shutdown_event = threading.Event()
|
||||
|
||||
def add_task(self, timestamp, priority, task_function,
|
||||
task_args=(), task_kwargs={}, task_delayable=False):
|
||||
|
||||
self._wait_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
|
||||
task = Task(task_function, task_args, task_kwargs, task_delayable)
|
||||
|
||||
count = next(self._counter)
|
||||
entry = (timestamp, priority, count, task)
|
||||
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
entry_old = self.task_queue[0]
|
||||
else:
|
||||
entry_old = entry
|
||||
|
||||
heapq.heappush(self.task_queue, entry)
|
||||
|
||||
if self.task_queue[0] != entry_old:
|
||||
self._task_interrupt_event.set()
|
||||
print("Task queue updated with more priority task")
|
||||
|
||||
if self._reset_event.is_set():
|
||||
self._task_interrupt_event.set()
|
||||
self._reset_event.clear()
|
||||
print("Task queue updated after reset")
|
||||
|
||||
self._wait_interrupt_event.clear()
|
||||
self._running_event.set()
|
||||
|
||||
print(self.task_queue)
|
||||
|
||||
def pop_task(self):
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
return heapq.heappop(self.task_queue)
|
||||
raise KeyError('Pop from an empty priority queue')
|
||||
|
||||
def start(self, timeouts=False):
|
||||
print("Task manager is started")
|
||||
logger.info("Task manager is started")
|
||||
self._processor_thread.start()
|
||||
self.resume()
|
||||
|
||||
def stop(self):
|
||||
self.pause(interrupt=True)
|
||||
with self._task_queue_lock:
|
||||
del self.task_queue[:]
|
||||
print self.task_queue
|
||||
|
||||
def shutdown(self):
|
||||
self.stop()
|
||||
self._shutdown_event.set()
|
||||
self._wait_interrupt_event.set()
|
||||
self._task_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
self._processor_thread.join(timeout=5)
|
||||
|
||||
def pause(self, interrupt=True):
|
||||
if interrupt:
|
||||
self._wait_interrupt_event.set()
|
||||
self._task_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
logger.info("Task queue paused")
|
||||
print("Task queue paused")
|
||||
|
||||
def resume(self):
|
||||
self._running_event.set()
|
||||
self._wait_interrupt_event.clear()
|
||||
self._task_interrupt_event.clear()
|
||||
logger.info("Task queue resumed")
|
||||
print("Task queue resumed")
|
||||
|
||||
def reset(self):
|
||||
self.stop()
|
||||
self.resume()
|
||||
self._reset_event.set()
|
||||
|
||||
# def interrupt(self):
|
||||
# self._interrupt_event.set()
|
||||
# while self._interrupt_event.is_set():
|
||||
# pass
|
||||
# logger.info("Task execution successfully interrupted")
|
||||
|
||||
def execute_task(self):
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
start_time, priority, count, task = self.task_queue[0]
|
||||
else:
|
||||
return
|
||||
|
||||
logger.info("Waiting util task execution time:{}".format(start_time))
|
||||
print("Waiting util task execution time:{}".format(start_time))
|
||||
wait(start_time, self._wait_interrupt_event)
|
||||
|
||||
if not self._wait_interrupt_event.is_set():
|
||||
logger.info("Executing task {}".format(task))
|
||||
print("Executing task {}".format(task))
|
||||
try:
|
||||
task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs)
|
||||
except Exception as e:
|
||||
logger.error("Error '{}' occurred in task {}".format(e, task))
|
||||
print("Error '{}' occurred in task {}".format(e, task))
|
||||
else:
|
||||
logger.warning("Task interrupted before execution")
|
||||
print("Task interrupted before execution")
|
||||
self._wait_interrupt_event.clear()
|
||||
return
|
||||
|
||||
if time.time() > start_time:
|
||||
start_time_n, priority_n, count_n, task_n = self.task_queue[0]
|
||||
if (task_n == task) and (start_time_n == start_time):
|
||||
self.pop_task()
|
||||
try:
|
||||
print("Pop {} function!".format(task.func.__name__))
|
||||
except Exception as e:
|
||||
print("Pop something!")
|
||||
|
||||
if self._task_interrupt_event.is_set():
|
||||
self._task_interrupt_event.clear()
|
||||
|
||||
logger.info("Execution done")
|
||||
print("Execution done")
|
||||
|
||||
def _task_processor(self):
|
||||
logger.info("Tasking thread started")
|
||||
# self._update_queue() # Initial tick if tasks added before start
|
||||
while not self._shutdown_event.is_set():
|
||||
self._running_event.wait()
|
||||
self.execute_task()
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
def printer(stri, interrupter, *args, **kwargs):
|
||||
logger.info("String: {}, timenow: {}".format(stri, time.time()))
|
||||
wait(time.time()+30, interrupter)
|
||||
|
||||
tasker = TaskManager() # Lower priority first!
|
||||
|
||||
tasker.start()
|
||||
tasker.add_task(time.time(), 10, printer, ("Task1 ", ))
|
||||
tasker.add_task(time.time()+10, 5, printer, ("Task2 ", ))
|
||||
time.sleep(1)
|
||||
tasker.add_task(time.time()+7, 1, printer, ("Task3", ))
|
||||
time.sleep(3)
|
||||
tasker.add_task(time.time()+7, 0, printer, ("Task4", ))
|
||||
|
||||
while True:
|
||||
pass
|
||||
371
Server/server.py
371
Server/server.py
@@ -1,44 +1,42 @@
|
||||
import os
|
||||
import sys
|
||||
import math
|
||||
import time
|
||||
import json
|
||||
import struct
|
||||
import socket
|
||||
import random
|
||||
import logging
|
||||
import threading
|
||||
import selectors
|
||||
import collections
|
||||
import configparser
|
||||
|
||||
import os,sys,inspect # Add parent dir to PATH to import messaging_lib
|
||||
current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
|
||||
parent_dir = os.path.dirname(current_dir)
|
||||
sys.path.insert(0, parent_dir)
|
||||
import messaging_lib as messaging
|
||||
|
||||
# All imports sorted in pyramid just because
|
||||
|
||||
random.seed()
|
||||
|
||||
logging.basicConfig( # TODO all prints as logs
|
||||
level=logging.INFO,
|
||||
level=logging.DEBUG,
|
||||
format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s",
|
||||
handlers=[
|
||||
logging.FileHandler("server_logs.log"),
|
||||
logging.StreamHandler()
|
||||
])
|
||||
|
||||
|
||||
class ConfigOption:
|
||||
def __init__(self, section, option, value):
|
||||
self.section = section
|
||||
self.option = option
|
||||
self.value = value
|
||||
ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"])
|
||||
|
||||
|
||||
class Server:
|
||||
BUFFER_SIZE = 1024
|
||||
|
||||
def __init__(self, server_id=None, config_path="server_config.ini"):
|
||||
|
||||
self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4)
|
||||
|
||||
# Init socket
|
||||
self.sel = selectors.DefaultSelector()
|
||||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.host = socket.gethostname()
|
||||
@@ -47,13 +45,12 @@ class Server:
|
||||
# Init configs
|
||||
self.config_path = config_path
|
||||
self.config = configparser.ConfigParser()
|
||||
self.config.read(self.config_path)
|
||||
self.load_config()
|
||||
|
||||
# Init threads
|
||||
self.autoconnect_thread = threading.Thread(target=self._auto_connect, daemon=True,
|
||||
name='Client auto-connect')
|
||||
self.autoconnect_thread_running = threading.Event() # Can be used for manual thread killing
|
||||
self.autoconnect_thread = threading.Thread(target=self._client_processor, daemon=True,
|
||||
name='Client processor')
|
||||
self.client_processor_thread_running = threading.Event() # Can be used for manual thread killing
|
||||
|
||||
self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True,
|
||||
name='IP broadcast sender')
|
||||
@@ -64,7 +61,8 @@ class Server:
|
||||
self.listener_thread_running = threading.Event()
|
||||
|
||||
def load_config(self):
|
||||
self.port = int(self.config['SERVER']['port'])
|
||||
self.config.read(self.config_path)
|
||||
self.port = int(self.config['SERVER']['port']) # TODO try, init def
|
||||
self.broadcast_port = int(self.config['SERVER']['broadcast_port'])
|
||||
self.BROADCAST_DELAY = int(self.config['SERVER']['broadcast_delay'])
|
||||
Server.BUFFER_SIZE = int(self.config['SERVER']['buffer_size'])
|
||||
@@ -74,12 +72,12 @@ class Server:
|
||||
self.NTP_PORT = int(self.config['NTP']['port'])
|
||||
|
||||
def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False
|
||||
logging.info("Starting server with id: {} on {} !".format(self.id, self.ip))
|
||||
logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.port))
|
||||
logging.info("Starting server socket!")
|
||||
self.server_socket.bind((self.ip, self.port))
|
||||
|
||||
logging.info("Starting client autoconnect thread!")
|
||||
self.autoconnect_thread_running.set()
|
||||
logging.info("Starting client processor thread!")
|
||||
self.client_processor_thread_running.set()
|
||||
self.autoconnect_thread.start()
|
||||
|
||||
logging.info("Starting broadcast sender thread!")
|
||||
@@ -92,17 +90,22 @@ class Server:
|
||||
|
||||
def stop(self):
|
||||
logging.info("Stopping server")
|
||||
self.autoconnect_thread_running.clear()
|
||||
self.client_processor_thread_running.clear()
|
||||
self.broadcast_thread_running.clear()
|
||||
self.listener_thread_running.clear()
|
||||
self.server_socket.close()
|
||||
self.sel.close()
|
||||
logging.info("Server stopped")
|
||||
|
||||
@staticmethod
|
||||
def get_ip_address():
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
|
||||
ip_socket.connect(("8.8.8.8", 80))
|
||||
return ip_socket.getsockname()[0]
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket:
|
||||
ip_socket.connect(("8.8.8.8", 80))
|
||||
return ip_socket.getsockname()[0]
|
||||
except OSError:
|
||||
logging.warning("No network connection detected, starting on localhost")
|
||||
return "localhost"
|
||||
|
||||
@staticmethod
|
||||
def get_ntp_time(ntp_host, ntp_port):
|
||||
@@ -113,36 +116,66 @@ class Server:
|
||||
msg, _ = ntp_socket.recvfrom(1024)
|
||||
return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA
|
||||
|
||||
def _auto_connect(self):
|
||||
logging.info("Client autoconnect thread started!")
|
||||
while self.autoconnect_thread_running.is_set():
|
||||
self.server_socket.listen(1)
|
||||
c, addr = self.server_socket.accept()
|
||||
logging.info("Got connection from: {}".format(str(addr)))
|
||||
if not any(client_addr == addr[0] for client_addr in Client.clients.keys()):
|
||||
client = Client(addr[0])
|
||||
logging.info("New client")
|
||||
else:
|
||||
logging.info("Reconnected client")
|
||||
Client.clients[addr[0]].connect(c, addr)
|
||||
def time_now(self):
|
||||
if self.USE_NTP:
|
||||
timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
|
||||
else:
|
||||
timenow = time.time()
|
||||
return timenow
|
||||
|
||||
# noinspection PyArgumentList
|
||||
def _client_processor(self):
|
||||
logging.info("Client processor (selector) thread started!")
|
||||
self.server_socket.listen()
|
||||
self.server_socket.setblocking(False)
|
||||
self.sel.register(self.server_socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None)
|
||||
|
||||
while self.client_processor_thread_running.is_set():
|
||||
events = self.sel.select(timeout=None)
|
||||
for key, mask in events:
|
||||
if key.data is None:
|
||||
self._connect_client(key.fileobj)
|
||||
else:
|
||||
client = key.data
|
||||
try:
|
||||
client.process_events(mask)
|
||||
except Exception as error:
|
||||
logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr))
|
||||
client.close()
|
||||
|
||||
logging.info("Client autoconnect thread stopped!")
|
||||
|
||||
def _connect_client(self, sock):
|
||||
conn, addr = sock.accept()
|
||||
logging.info("Got connection from: {}".format(str(addr)))
|
||||
conn.setblocking(False)
|
||||
|
||||
if not any(client_addr == addr[0] for client_addr in Client.clients.keys()):
|
||||
client = Client(addr[0])
|
||||
logging.info("New client")
|
||||
else:
|
||||
client = Client.clients[addr[0]]
|
||||
logging.info("Reconnected client")
|
||||
self.sel.register(conn, selectors.EVENT_READ, data=client)
|
||||
client.connect(self.sel, conn, addr)
|
||||
|
||||
def _ip_broadcast(self):
|
||||
logging.info("Broadcast sender thread started!")
|
||||
msg = bytes(Client.form_message(
|
||||
"server_ip", {"host": self.ip, "port": str(self.port), "id": self.id}
|
||||
), "UTF-8")
|
||||
msg = messaging.MessageManager.create_simple_message(
|
||||
"server_ip", {"host": self.ip, "port": str(self.port), "id": self.id})
|
||||
broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
|
||||
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
||||
logging.info("Formed broadcast message: {}".format(msg))
|
||||
|
||||
while self.broadcast_thread_running.is_set():
|
||||
time.sleep(self.BROADCAST_DELAY)
|
||||
broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port))
|
||||
logging.debug("Broadcast sent")
|
||||
broadcast_sock.close()
|
||||
logging.info("Broadcast sender thread stopped, socked closed!")
|
||||
try:
|
||||
while self.broadcast_thread_running.is_set():
|
||||
time.sleep(self.BROADCAST_DELAY)
|
||||
broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port))
|
||||
logging.debug("Broadcast sent")
|
||||
finally:
|
||||
broadcast_sock.close()
|
||||
logging.info("Broadcast sender thread stopped, socked closed!")
|
||||
|
||||
def _broadcast_listen(self):
|
||||
logging.info("Broadcast listener thread started!")
|
||||
@@ -154,23 +187,27 @@ class Server:
|
||||
logging.critical("Another server is running on this computer, shutting down!")
|
||||
sys.exit()
|
||||
|
||||
while self.listener_thread_running.is_set():
|
||||
data, addr = broadcast_client.recvfrom(1024)
|
||||
command, args = Client.parse_message(data.decode("UTF-8"))
|
||||
if command == "server_ip":
|
||||
if args["id"] != self.id:
|
||||
logging.critical("Another server detected on network, shutting down")
|
||||
sys.exit()
|
||||
broadcast_client.close()
|
||||
logging.info("Broadcast listener thread stopped, socked closed!")
|
||||
try:
|
||||
while self.listener_thread_running.is_set():
|
||||
data, addr = broadcast_client.recvfrom(1024)
|
||||
message = messaging.MessageManager()
|
||||
message.income_raw = data
|
||||
message.process_message()
|
||||
if message.content:
|
||||
if message.content["command"] == "server_ip":
|
||||
if message.content["args"]["id"] != self.id:
|
||||
logging.critical("Another server detected on network, shutting down")
|
||||
sys.exit()
|
||||
else:
|
||||
logging.warning("Got wrong broadcast message from {}".format(addr))
|
||||
finally:
|
||||
broadcast_client.close()
|
||||
logging.info("Broadcast listener thread stopped, socked closed!")
|
||||
|
||||
def send_starttime(self, copter, dt=0):
|
||||
if self.USE_NTP:
|
||||
timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT)
|
||||
else:
|
||||
timenow = time.time()
|
||||
timenow = self.time_now()
|
||||
print('Now:', time.ctime(timenow), "+ dt =", dt)
|
||||
copter.send(Client.form_message("starttime", {"time": str(timenow + dt)}))
|
||||
copter.send_message("start", {"time": str(timenow + dt)})
|
||||
|
||||
|
||||
def requires_connect(f):
|
||||
@@ -191,9 +228,7 @@ def requires_any_connected(f):
|
||||
return wrapper
|
||||
|
||||
|
||||
class Client:
|
||||
resume_quee = True
|
||||
|
||||
class Client(messaging.ConnectionManager):
|
||||
clients = {}
|
||||
|
||||
on_connect = None # Use as callback functions
|
||||
@@ -201,198 +236,72 @@ class Client:
|
||||
on_disconnect = None
|
||||
|
||||
def __init__(self, ip):
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self._send_queue = collections.deque()
|
||||
self._received_queue = collections.deque()
|
||||
self._request_queue = collections.OrderedDict()
|
||||
|
||||
self._send_lock = threading.Lock()
|
||||
self._request_lock = threading.Lock()
|
||||
|
||||
super(Client, self).__init__()
|
||||
self.copter_id = None
|
||||
self.selected = False # Use to select copters for certain purposes DEPRECATED
|
||||
|
||||
Client.clients[ip] = self
|
||||
|
||||
self.connected = False
|
||||
|
||||
def connect(self, client_socket, client_addr):
|
||||
print("Client connected")
|
||||
if not Client.resume_quee:
|
||||
self.clients[ip] = self
|
||||
|
||||
@staticmethod
|
||||
def get_by_id(copter_id):
|
||||
for client in Client.clients.values():
|
||||
if client.copter_id == copter_id:
|
||||
return client
|
||||
|
||||
def connect(self, client_selector, client_socket, client_addr):
|
||||
logging.info("Client connected")
|
||||
if not self.resume_queue:
|
||||
self._send_queue = collections.deque()
|
||||
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
super(Client, self).connect(client_selector, client_socket, client_addr)
|
||||
|
||||
self.socket.setblocking(0)
|
||||
self.connected = True
|
||||
client_thread = threading.Thread(target=self._run, name="Client {} thread".format(self.addr))
|
||||
client_thread.start()
|
||||
|
||||
if self.copter_id is None:
|
||||
self.copter_id = self.get_response("id")
|
||||
print("Got copter id:", self.copter_id)
|
||||
if Client.on_first_connect:
|
||||
Client.on_first_connect(self)
|
||||
self.get_response("id", self._got_id)
|
||||
|
||||
if Client.on_connect:
|
||||
Client.on_connect(self)
|
||||
if self.on_connect:
|
||||
self.on_connect(self)
|
||||
|
||||
def _send_all(self, msg):
|
||||
self.socket.sendall(struct.pack('>I', len(msg)) + msg)
|
||||
def _got_id(self, value):
|
||||
logging.info("Got copter id: {} for client {}".format(value, self.addr))
|
||||
self.copter_id = value
|
||||
if Client.on_first_connect:
|
||||
Client.on_first_connect(self)
|
||||
|
||||
def _receive_all(self, n):
|
||||
data = b''
|
||||
while len(data) < n:
|
||||
packet = self.socket.recv(min(n - len(data), Server.BUFFER_SIZE))
|
||||
print("Receiving packet {}; full data is {}".format(packet, data))
|
||||
if not packet:
|
||||
return None
|
||||
data += packet
|
||||
return data
|
||||
def close(self):
|
||||
self.connected = False
|
||||
|
||||
def _receive_message(self):
|
||||
raw_msglen = self._receive_all(4)
|
||||
if not raw_msglen:
|
||||
print("No valid msg")
|
||||
return None
|
||||
msglen = struct.unpack('>I', raw_msglen)[0]
|
||||
msg = self._receive_all(msglen)
|
||||
return msg
|
||||
if Client.on_disconnect:
|
||||
Client.on_disconnect(self)
|
||||
|
||||
def _run(self):
|
||||
while self.connected:
|
||||
try:
|
||||
if self._send_queue:
|
||||
with self._send_lock:
|
||||
msg = self._send_queue.popleft()
|
||||
try:
|
||||
self._send_all(msg)
|
||||
print("Send", msg, "to", self.addr)
|
||||
except socket.error as e:
|
||||
logging.warning("Attempt to send failed: {}".format(e))
|
||||
with self._send_lock:
|
||||
self._send_queue.appendleft(msg)
|
||||
raise e
|
||||
|
||||
try: # check if data in buffer
|
||||
check = self.socket.recv(Server.BUFFER_SIZE, socket.MSG_PEEK)
|
||||
if check:
|
||||
received = self._receive_message()
|
||||
if received:
|
||||
received = received.decode("UTF-8")
|
||||
print("Received", received, "from", self.addr)
|
||||
command, args = Client.parse_message(received)
|
||||
if command == "response":
|
||||
with self._request_lock:
|
||||
for key, value in self._request_queue.items():
|
||||
if not value and key == args["value_name"]:
|
||||
self._request_queue[key] = args['value']
|
||||
print("Request successfully closed")
|
||||
break
|
||||
else:
|
||||
print("Unexpected request")
|
||||
else:
|
||||
self._received_queue.appendleft(received)
|
||||
except socket.error:
|
||||
pass
|
||||
|
||||
except socket.error as e:
|
||||
logging.warning("Client error: {}, disconnected".format(e))
|
||||
self.connected = False
|
||||
self.socket.close()
|
||||
if Client.on_disconnect:
|
||||
Client.on_disconnect(self)
|
||||
break
|
||||
# time.sleep(0.05)
|
||||
|
||||
@staticmethod
|
||||
def form_message(command: str, dict_arguments: dict = None):
|
||||
if dict_arguments is None:
|
||||
dict_arguments = {}
|
||||
msg_dict = {command: dict_arguments}
|
||||
msg = json.dumps(msg_dict)
|
||||
return msg
|
||||
|
||||
@staticmethod
|
||||
def parse_message(msg):
|
||||
try:
|
||||
j_message = json.loads(msg)
|
||||
except ValueError:
|
||||
print("Json string not in correct format")
|
||||
return None, None
|
||||
str_command = list(j_message.keys())[0]
|
||||
dict_arguments = list(j_message.values())[0]
|
||||
|
||||
return str_command, dict_arguments
|
||||
super(Client, self).close()
|
||||
|
||||
@requires_connect
|
||||
def send(self, *messages):
|
||||
for message in messages:
|
||||
with self._send_lock:
|
||||
self._send_queue.append(bytes(message, "UTF-8"))
|
||||
def _send(self, data):
|
||||
super(Client, self)._send(data)
|
||||
logging.debug("Queued data to send: {}".format(data))
|
||||
|
||||
@requires_connect
|
||||
def get_response(self, requested_value):
|
||||
with self._request_lock:
|
||||
self._request_queue[requested_value] = ""
|
||||
self.send(Client.form_message("request", {"value": requested_value}))
|
||||
|
||||
while not self._request_queue[requested_value]:
|
||||
pass
|
||||
|
||||
with self._request_lock:
|
||||
return self._request_queue.pop(requested_value)
|
||||
|
||||
@requires_connect
|
||||
def send_file(self, filepath, dest_filename, clever_restart = False):
|
||||
print("Sending file ", dest_filename)
|
||||
chunk_count = math.ceil(os.path.getsize(filepath) / Server.BUFFER_SIZE)
|
||||
self.send(Client.form_message("writefile", {"filesize": chunk_count, "filename": dest_filename, "clever_restart": clever_restart}))
|
||||
with open(filepath, 'rb') as file:
|
||||
chunk = file.read(Server.BUFFER_SIZE)
|
||||
while chunk:
|
||||
with self._send_lock:
|
||||
self._send_queue.append(chunk)
|
||||
chunk = file.read(Server.BUFFER_SIZE)
|
||||
|
||||
self.send(Client.form_message("/endoffile")) # TODO mb remove
|
||||
print("File sent")
|
||||
|
||||
@staticmethod
|
||||
@requires_any_connected
|
||||
def send_to_selected(message): # DEPRECATED
|
||||
for client in Client.clients.values():
|
||||
if client.connected and client.selected:
|
||||
client.send(message)
|
||||
|
||||
@staticmethod
|
||||
@requires_any_connected
|
||||
def request_to_selected(requested_value): # DEPRECATED
|
||||
for client in Client.clients.values():
|
||||
if client.connected and client.selected:
|
||||
client.get_response(requested_value)
|
||||
def send_config_options(self, *options: ConfigOption, reload_config=True):
|
||||
logging.info("Sending config options: {} to {}".format(options, self.addr))
|
||||
sending_options = [{'section': option.section, 'option': option.option, 'value': option.value}
|
||||
for option in options]
|
||||
print(sending_options)
|
||||
self.send_message(
|
||||
'config_write', {"options": sending_options, "reload": reload_config}
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@requires_any_connected
|
||||
def broadcast(message, force_all=False):
|
||||
for client in Client.clients.values():
|
||||
if client.connected or force_all:
|
||||
client.send(message)
|
||||
client._send(message)
|
||||
|
||||
def send_config_options(self, *options: ConfigOption):
|
||||
for option in options:
|
||||
self.send(
|
||||
Client.form_message('config_write',
|
||||
{'section': option.section, 'option': option.option, 'value': option.value}))
|
||||
self.send(Client.form_message("config_reload"))
|
||||
|
||||
@staticmethod
|
||||
def get_by_id(copter_id):
|
||||
for copter in Client.clients.values():
|
||||
if copter.copter_id == copter_id:
|
||||
return copter
|
||||
@classmethod
|
||||
@requires_any_connected
|
||||
def broadcast_message(cls, command, args=None, force_all=False):
|
||||
cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import os
|
||||
import glob
|
||||
|
||||
from PyQt5 import QtWidgets
|
||||
from PyQt5.QtGui import QStandardItemModel, QStandardItem
|
||||
#from PyQt5.QtCore import QModelIndex
|
||||
from PyQt5.QtCore import Qt, pyqtSlot
|
||||
|
||||
from PyQt5.QtWidgets import QFileDialog, QMessageBox
|
||||
@@ -11,19 +11,19 @@ from PyQt5.QtWidgets import QFileDialog, QMessageBox
|
||||
from server_gui import Ui_MainWindow
|
||||
|
||||
from server import *
|
||||
class CopterView(QStandardItemModel):
|
||||
pass
|
||||
|
||||
|
||||
# noinspection PyArgumentList,PyCallByClass
|
||||
class MainWindow(QtWidgets.QMainWindow):
|
||||
def __init__(self):
|
||||
super(MainWindow, self).__init__()
|
||||
self.ui = Ui_MainWindow()
|
||||
self.ui.setupUi(self)
|
||||
self.initUI()
|
||||
self.init_ui()
|
||||
self.show()
|
||||
|
||||
def initUI(self):
|
||||
#Connecting
|
||||
def init_ui(self):
|
||||
# Connecting
|
||||
self.ui.check_button.clicked.connect(self.check_selected)
|
||||
self.ui.start_button.clicked.connect(self.send_starttime)
|
||||
self.ui.pause_button.clicked.connect(self.pause_all)
|
||||
@@ -38,7 +38,7 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
self.ui.action_send_configurations.triggered.connect(self.send_configurations)
|
||||
self.ui.action_send_Aruco_map.triggered.connect(self.send_aruco)
|
||||
|
||||
#Initing table and table model
|
||||
# Initing table and table model
|
||||
self.ui.tableView.setModel(model)
|
||||
self.ui.tableView.horizontalHeader().setStretchLastSection(True)
|
||||
|
||||
@@ -49,24 +49,28 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
if item.isCheckable() and item.checkState() == Qt.Checked:
|
||||
print("Copter {} checked".format(model.item(row_num, 0).text()))
|
||||
copter = Client.get_by_id(item.text())
|
||||
batt_total = float(copter.get_response("batt_voltage"))
|
||||
batt_cell = float(copter.get_response("cell_voltage"))
|
||||
selfcheck = copter.get_response("selfcheck")
|
||||
|
||||
batt_percent = ((batt_cell-3.2)/(4.2-3.2))*100
|
||||
|
||||
model.setData(model.index(row_num, 2), "{} V.".format(round(batt_total, 3)))
|
||||
model.setData(model.index(row_num, 3), "{} %".format(round(batt_percent, 3)))
|
||||
if selfcheck != "OK":
|
||||
print(selfcheck)
|
||||
model.setData(model.index(row_num, 4), str(selfcheck))
|
||||
else:
|
||||
print("Everything ok")
|
||||
model.setData(model.index(row_num, 4), str(selfcheck))
|
||||
copter.get_response("batt_voltage", self._set_copter_data, callback_args=(row_num, 2))
|
||||
copter.get_response("cell_voltage", self._set_copter_data, callback_args=(row_num, 3))
|
||||
copter.get_response("selfcheck", self._set_copter_data, callback_args=(row_num, 4))
|
||||
copter.get_response("time", self._set_copter_data, callback_args=(row_num, 5))
|
||||
|
||||
self.ui.start_button.setEnabled(True)
|
||||
self.ui.takeoff_button.setEnabled(True)
|
||||
|
||||
def _set_copter_data(self, value, row, col):
|
||||
if col == 2:
|
||||
model.setData(model.index(row, col), "{} V.".format(round(float(value), 3)))
|
||||
elif col == 3:
|
||||
batt_percent = ((float(value) - 3.2) / (4.2 - 3.2)) * 100
|
||||
model.setData(model.index(row, col), "{} %".format(round(batt_percent, 3)))
|
||||
elif col == 4:
|
||||
if value != "OK":
|
||||
model.setData(model.index(row, col), str(value)) # TODO different handling
|
||||
else:
|
||||
model.setData(model.index(row, col), str(value))
|
||||
elif col == 5:
|
||||
model.setData(model.index(row, col), time.ctime(int(value)))
|
||||
|
||||
@pyqtSlot()
|
||||
def send_starttime(self):
|
||||
dt = self.ui.start_delay_spin.value()
|
||||
@@ -88,15 +92,15 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
|
||||
@pyqtSlot()
|
||||
def stop_all(self):
|
||||
Client.broadcast(Client.form_message("stop"))
|
||||
Client.broadcast_message("stop")
|
||||
|
||||
@pyqtSlot()
|
||||
def pause_all(self):
|
||||
if self.ui.pause_button.text() == 'Pause':
|
||||
Client.broadcast(Client.form_message('pause'))
|
||||
Client.broadcast_message('pause')
|
||||
self.ui.pause_button.setText('Resume')
|
||||
else:
|
||||
Client.broadcast(Client.form_message('resume'))
|
||||
Client.broadcast_message('resume')
|
||||
self.ui.pause_button.setText('Pause')
|
||||
|
||||
@pyqtSlot()
|
||||
@@ -106,7 +110,7 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
if item.isCheckable() and item.checkState() == Qt.Checked:
|
||||
if True: # TODO checks for batt/selfckeck here
|
||||
copter = Client.get_by_id(item.text())
|
||||
copter.send(Client.form_message("led_test"))
|
||||
copter.send_message("led_test")
|
||||
|
||||
@pyqtSlot()
|
||||
def takeoff_selected(self):
|
||||
@@ -122,17 +126,18 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
if item.isCheckable() and item.checkState() == Qt.Checked:
|
||||
if True: # TODO checks for batt/selfckeck here
|
||||
copter = Client.get_by_id(item.text())
|
||||
copter.send(Client.form_message("takeoff"))
|
||||
copter.send_message("takeoff")
|
||||
else:
|
||||
print("Cancelled")
|
||||
pass
|
||||
|
||||
@pyqtSlot()
|
||||
def land_all(self):
|
||||
Client.broadcast(Client.form_message("land"))
|
||||
Client.broadcast_message("land")
|
||||
|
||||
@pyqtSlot()
|
||||
def disarm_all(self):
|
||||
Client.broadcast(Client.form_message("disarm"))
|
||||
Client.broadcast_message("disarm")
|
||||
|
||||
@pyqtSlot()
|
||||
def send_animations(self):
|
||||
@@ -163,7 +168,7 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
for section in sendable_config.sections():
|
||||
for option in dict(sendable_config.items(section)):
|
||||
value = sendable_config[section][option]
|
||||
print("Got item from config:", section, option, value)
|
||||
logging.debug("Got item from config:".format(section, option, value))
|
||||
options.append(ConfigOption(section, option, value))
|
||||
for row_num in range(model.rowCount()):
|
||||
item = model.item(row_num, 0)
|
||||
@@ -181,7 +186,8 @@ class MainWindow(QtWidgets.QMainWindow):
|
||||
item = model.item(row_num, 0)
|
||||
if item.isCheckable() and item.checkState() == Qt.Checked:
|
||||
copter = Client.get_by_id(item.text())
|
||||
copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt", clever_restart=True)
|
||||
copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt")
|
||||
copter.send_message("service_restart", {"name": "clever"})
|
||||
|
||||
|
||||
model = QStandardItemModel()
|
||||
|
||||
0
__init__.py
Normal file
0
__init__.py
Normal file
44
logging_lib.py
Normal file
44
logging_lib.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import logging
|
||||
|
||||
try:
|
||||
import rospy
|
||||
except ImportError:
|
||||
ros = False
|
||||
else:
|
||||
ros = True
|
||||
|
||||
|
||||
class Logger:
|
||||
def __init__(self, logger=logging.getLogger(), use_ros=False):
|
||||
self.ros = True if use_ros and ros else False
|
||||
self.logger = logger
|
||||
|
||||
def info(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
if self.ros:
|
||||
rospy.loginfo(msg)
|
||||
|
||||
def debug(self, msg):
|
||||
self.logger.debug(msg)
|
||||
|
||||
if self.ros:
|
||||
rospy.logdebug(msg)
|
||||
|
||||
def warning(self, msg):
|
||||
self.logger.warning(msg)
|
||||
|
||||
if self.ros:
|
||||
rospy.logwarn(msg)
|
||||
|
||||
def error(self, msg):
|
||||
self.logger.error(msg)
|
||||
|
||||
if self.ros:
|
||||
rospy.logerr(msg)
|
||||
|
||||
def critical(self, msg):
|
||||
self.logger.critical(msg)
|
||||
|
||||
if self.ros:
|
||||
rospy.logfatal(msg)
|
||||
401
messaging_lib.py
Normal file
401
messaging_lib.py
Normal file
@@ -0,0 +1,401 @@
|
||||
import io
|
||||
import sys
|
||||
import json
|
||||
import struct
|
||||
import random
|
||||
import logging
|
||||
import threading
|
||||
import collections
|
||||
|
||||
try:
|
||||
import selectors
|
||||
except ImportError:
|
||||
import selectors2 as selectors
|
||||
|
||||
import logging_lib
|
||||
|
||||
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
|
||||
"callback", "callback_args", "callback_kwargs",
|
||||
])
|
||||
_logger = logging.getLogger(__name__)
|
||||
logger = logging_lib.Logger(_logger, True)
|
||||
|
||||
|
||||
class MessageManager:
|
||||
def __init__(self):
|
||||
self.income_raw = b""
|
||||
self._jsonheader_len = None
|
||||
self.jsonheader = None
|
||||
self.content = None
|
||||
|
||||
@staticmethod
|
||||
def _json_encode(obj, encoding="utf-8"):
|
||||
return json.dumps(obj, ensure_ascii=False).encode(encoding)
|
||||
|
||||
@staticmethod
|
||||
def _json_decode(json_bytes, encoding="utf-8"):
|
||||
with io.TextIOWrapper(io.BytesIO(json_bytes), encoding=encoding, newline="") as tiow:
|
||||
obj = json.load(tiow)
|
||||
return obj
|
||||
|
||||
@classmethod
|
||||
def create_message(cls, content_bytes, content_type, message_type, content_encoding="utf-8",
|
||||
additional_headers=None):
|
||||
jsonheader = {
|
||||
"byteorder": sys.byteorder,
|
||||
"content-type": content_type,
|
||||
"content-encoding": content_encoding,
|
||||
"content-length": len(content_bytes),
|
||||
"message-type": message_type,
|
||||
}
|
||||
if additional_headers:
|
||||
jsonheader.update(additional_headers)
|
||||
|
||||
jsonheader_bytes = cls._json_encode(jsonheader, "utf-8")
|
||||
message_hdr = struct.pack(">H", len(jsonheader_bytes))
|
||||
message = message_hdr + jsonheader_bytes + content_bytes
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_json_message(cls, contents):
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "message")
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_simple_message(cls, command, args=None):
|
||||
if args is None:
|
||||
args = {}
|
||||
message = cls.create_json_message({"command": command, "args": args})
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_request(cls, requested_value, request_id, args=None):
|
||||
if args is None:
|
||||
args = {}
|
||||
contents = {"requested_value": requested_value,
|
||||
"request_id": request_id,
|
||||
"args": args,
|
||||
}
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "request")
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
def create_response(cls, requested_value, request_id, value):
|
||||
contents = {"requested_value": requested_value,
|
||||
"request_id": request_id,
|
||||
"value": value,
|
||||
}
|
||||
message = cls.create_message(cls._json_encode(contents), "json", "response")
|
||||
return message
|
||||
|
||||
def _process_protoheader(self):
|
||||
header_len = 2
|
||||
if len(self.income_raw) >= header_len:
|
||||
self._jsonheader_len = struct.unpack(">H", self.income_raw[:header_len])[0]
|
||||
self.income_raw = self.income_raw[header_len:]
|
||||
|
||||
def _process_jsonheader(self):
|
||||
header_len = self._jsonheader_len
|
||||
if len(self.income_raw) >= header_len:
|
||||
self.jsonheader = self._json_decode(self.income_raw[:header_len], "utf-8")
|
||||
self.income_raw = self.income_raw[header_len:]
|
||||
for reqhdr in (
|
||||
"byteorder",
|
||||
"content-length",
|
||||
"content-type",
|
||||
"content-encoding",
|
||||
"message-type",
|
||||
):
|
||||
if reqhdr not in self.jsonheader:
|
||||
raise ValueError('Missing required header {}'.format(reqhdr))
|
||||
|
||||
def _process_content(self):
|
||||
content_len = self.jsonheader["content-length"]
|
||||
if not len(self.income_raw) >= content_len:
|
||||
return
|
||||
data = self.income_raw[:content_len]
|
||||
self.income_raw = self.income_raw[content_len:]
|
||||
if self.jsonheader["content-type"] == "json":
|
||||
encoding = self.jsonheader["content-encoding"]
|
||||
self.content = self._json_decode(data, encoding)
|
||||
else:
|
||||
self.content = data
|
||||
|
||||
def process_message(self):
|
||||
if self._jsonheader_len is None:
|
||||
self._process_protoheader()
|
||||
|
||||
if self._jsonheader_len is not None:
|
||||
if self.jsonheader is None:
|
||||
self._process_jsonheader()
|
||||
|
||||
if self.jsonheader:
|
||||
if self.content is None:
|
||||
self._process_content()
|
||||
|
||||
|
||||
def message_callback(string_command):
|
||||
def inner(f):
|
||||
ConnectionManager.messages_callbacks[string_command] = f
|
||||
logger.debug("Registered message function {} for {}".format(f, string_command))
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def request_callback(string_command):
|
||||
def inner(f):
|
||||
ConnectionManager.requests_callbacks[string_command] = f
|
||||
logger.debug("Registered callback function {} for {}".format(f, string_command))
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
class ConnectionManager(object):
|
||||
messages_callbacks = {}
|
||||
requests_callbacks = {}
|
||||
|
||||
def __init__(self):
|
||||
self.selector = None
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self.selector = None
|
||||
self.socket = None
|
||||
self.addr = None
|
||||
|
||||
self._recv_buffer = b""
|
||||
self._send_buffer = b""
|
||||
|
||||
self._send_queue = collections.deque()
|
||||
self._received_queue = collections.deque()
|
||||
self._request_queue = collections.OrderedDict()
|
||||
|
||||
self._send_lock = threading.Lock()
|
||||
self._request_lock = threading.Lock()
|
||||
|
||||
self.BUFFER_SIZE = 1024
|
||||
self.resume_queue = False
|
||||
|
||||
def _set_selector_events_mask(self, mode):
|
||||
"""Set selector to listen for events: mode is 'r', 'w', 'rw'."""
|
||||
if mode == "r":
|
||||
events = selectors.EVENT_READ
|
||||
elif mode == "w":
|
||||
events = selectors.EVENT_WRITE
|
||||
elif mode == "rw":
|
||||
events = selectors.EVENT_READ | selectors.EVENT_WRITE
|
||||
else:
|
||||
raise ValueError("Invalid events mask mode {}.".format(mode))
|
||||
self.selector.modify(self.socket, events, data=self)
|
||||
|
||||
def connect(self, client_selector, client_socket, client_addr):
|
||||
self.selector = client_selector
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
|
||||
self._set_selector_events_mask('rw')
|
||||
|
||||
def close(self):
|
||||
logger.info("Closing connection to {}".format(self.addr))
|
||||
try:
|
||||
self.selector.unregister(self.socket)
|
||||
except AttributeError:
|
||||
pass
|
||||
except Exception as error:
|
||||
logger.error("{}: Error during selector unregistering: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.selector = None
|
||||
|
||||
try:
|
||||
self.socket.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
except OSError as error:
|
||||
logger.error("{}: Error during socket closing: {}".format(self.addr, error))
|
||||
finally:
|
||||
self.socket = None
|
||||
|
||||
def process_events(self, mask):
|
||||
if mask & selectors.EVENT_READ:
|
||||
self.read()
|
||||
if mask & selectors.EVENT_WRITE:
|
||||
self.write()
|
||||
|
||||
def read(self):
|
||||
self._read()
|
||||
while self._recv_buffer:
|
||||
if not self._received_queue or (self._received_queue[0].content is not None):
|
||||
self._received_queue.appendleft(MessageManager())
|
||||
|
||||
self._received_queue[0].income_raw += self._recv_buffer
|
||||
self._recv_buffer = b''
|
||||
self._received_queue[0].process_message()
|
||||
|
||||
# if something left after processing message - put it back
|
||||
if self._received_queue[0].content and self._received_queue[0].income_raw:
|
||||
self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer
|
||||
self._received_queue[0].income_raw = b''
|
||||
|
||||
if self._received_queue:
|
||||
if self._received_queue[-1].content:
|
||||
self.process_received(self._received_queue.popleft())
|
||||
|
||||
def _read(self):
|
||||
try:
|
||||
data = self.socket.recv(self.BUFFER_SIZE)
|
||||
except io.BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self._recv_buffer += data
|
||||
logger.debug("Received {} from {}".format(data, self.addr))
|
||||
else:
|
||||
logger.warning("Connection to {} lost!".format(self.addr))
|
||||
if not self.resume_queue:
|
||||
self._recv_buffer = b''
|
||||
|
||||
raise RuntimeError("Peer closed.")
|
||||
|
||||
def process_received(self, income_message):
|
||||
message_type = income_message.jsonheader["message-type"]
|
||||
logger.debug("Received message! Header: {}, content: {}".format(
|
||||
income_message.jsonheader, income_message.content))
|
||||
|
||||
if message_type == "message":
|
||||
self._process_message(income_message)
|
||||
elif message_type == "response":
|
||||
self._process_response(income_message)
|
||||
elif message_type == "request":
|
||||
self._process_request(income_message)
|
||||
elif message_type == "filetransfer":
|
||||
self._process_filetransfer(income_message)
|
||||
|
||||
def _process_message(self, message):
|
||||
command = message.content["command"]
|
||||
args = message.content["args"]
|
||||
try:
|
||||
self.messages_callbacks[command](**args)
|
||||
except KeyError:
|
||||
logger.warning("Command {} does not exist!".format(command))
|
||||
except Exception as error:
|
||||
logger.error("Error during command {} execution: {}".format(command, error))
|
||||
|
||||
def _process_request(self, message):
|
||||
command = message.content["requested_value"]
|
||||
request_id = message.content["request_id"]
|
||||
args = message.content["args"]
|
||||
try:
|
||||
value = self.requests_callbacks[command](**args)
|
||||
except KeyError:
|
||||
logger.warning("Request {} does not exist!".format(command))
|
||||
except Exception as error: # TODO send response error\cancel
|
||||
logger.error("Error during request {} processing: {}".format(command, error))
|
||||
else:
|
||||
self._send_response(command, request_id, value)
|
||||
|
||||
def _process_response(self, message):
|
||||
request_id, requested_value = message.content["request_id"], message.content["requested_value"]
|
||||
with self._request_lock:
|
||||
for key, value in self._request_queue.items(): # TODO as try []
|
||||
if (key == request_id) and (value.requested_value == requested_value):
|
||||
request = self._request_queue.pop(key)
|
||||
value = message.content["value"]
|
||||
logger.debug(
|
||||
"Request {} successfully closed with value {}".format(request, message.content["value"])
|
||||
)
|
||||
|
||||
f = request.callback
|
||||
f(value, *request.callback_args, **request.callback_kwargs)
|
||||
|
||||
break
|
||||
else:
|
||||
logger.warning("Unexpected response!")
|
||||
|
||||
def _process_filetransfer(self, message): # TODO path?
|
||||
if message.jsonheader["content-type"] == "binary":
|
||||
filepath = message.jsonheader["filepath"]
|
||||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(message.content)
|
||||
except OSError as error:
|
||||
logger.error("File {} can not be written due error: {}".format(filepath, error))
|
||||
else:
|
||||
logger.info("File {} successfully received ".format(filepath))
|
||||
|
||||
def write(self):
|
||||
with self._send_lock:
|
||||
if (not self._send_buffer) and self._send_queue:
|
||||
message = self._send_queue.popleft()
|
||||
self._send_buffer += message
|
||||
if self._send_buffer:
|
||||
self._write()
|
||||
|
||||
def _write(self):
|
||||
try:
|
||||
sent = self.socket.send(self._send_buffer)
|
||||
except io.BlockingIOError:
|
||||
# Resource temporarily unavailable (errno EWOULDBLOCK)
|
||||
pass
|
||||
except Exception as error:
|
||||
logger.warning("Attempt to send message {} to {} failed due error: {}".format(
|
||||
self._send_buffer, self.addr, error))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._send_buffer = b''
|
||||
|
||||
raise error
|
||||
else:
|
||||
logger.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
|
||||
self._send_buffer = self._send_buffer[sent:]
|
||||
|
||||
def _send(self, data):
|
||||
with self._send_lock:
|
||||
self._send_queue.append(data)
|
||||
|
||||
def get_response(self, requested_value, callback, request_args=None, # timeout=30,
|
||||
callback_args=(), callback_kwargs=None):
|
||||
if request_args is None:
|
||||
request_args = {}
|
||||
if callback_kwargs is None:
|
||||
callback_kwargs = {}
|
||||
|
||||
request_id = str(random.randint(0, 9999)).zfill(4)
|
||||
with self._request_lock:
|
||||
self._request_queue[request_id] = PendingRequest(
|
||||
requested_value=requested_value,
|
||||
value=None,
|
||||
# expires_on=Server.time_now()+timeout, #TODO
|
||||
callback=callback,
|
||||
callback_args=callback_args,
|
||||
callback_kwargs=callback_kwargs,
|
||||
)
|
||||
self._send(MessageManager.create_request(requested_value, request_id, request_args))
|
||||
|
||||
def send_message(self, command, args=None):
|
||||
self._send(MessageManager.create_simple_message(command, args))
|
||||
|
||||
def _send_response(self, requested_value, request_id, value):
|
||||
self._send(MessageManager.create_response(requested_value, request_id, value))
|
||||
|
||||
def send_file(self, filepath, dest_filepath): # clever_restart=False
|
||||
try:
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
except OSError as error:
|
||||
logger.warning("File can not be opened due error: ".format(error))
|
||||
else:
|
||||
logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath))
|
||||
self._send(MessageManager.create_message(
|
||||
data, "binary", "filetransfer", "binary", {"filepath": dest_filepath}
|
||||
))
|
||||
Reference in New Issue
Block a user