diff --git a/.gitignore b/.gitignore index 96305fc..2c2068e 100644 --- a/.gitignore +++ b/.gitignore @@ -109,3 +109,5 @@ Drone/test_animation/ Drone/animation.csv Drone/client_logs images/ + +\.idea/ diff --git a/Drone/FlightLib/FlightLib.py b/Drone/FlightLib/FlightLib.py index e2960b5..196bd28 100644 --- a/Drone/FlightLib/FlightLib.py +++ b/Drone/FlightLib/FlightLib.py @@ -11,7 +11,7 @@ from mavros_msgs.srv import SetMode from mavros_msgs.srv import CommandBool from std_srvs.srv import Trigger -module_logger = logging.getLogger("FlightLib") +logger = logging.getLogger(__name__) # create proxy service navigate = rospy.ServiceProxy('navigate', srv.Navigate) @@ -22,24 +22,36 @@ get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry) arming = rospy.ServiceProxy('/mavros/cmd/arming', CommandBool) landing = rospy.ServiceProxy('/land', Trigger) -module_logger.info("Proxy services inited") +logger.info("Proxy services inited") # globals -FREQUENCY = 1000/25 # HZ +FREQUENCY = 40 # HZ TOLERANCE = 0.2 +SPEED = 1.0 +SPEED_TAKEOFF = 0.8 +TIMEOUT = 5.0 +TIMEOUT_ARMED = 2.0 +TIMEOUT_DESCEND = TIMEOUT +TIMEOUT_LAND = 8.0 +Z_DESCEND = 0.5 +Z_TAKEOFF = 1.0 +FRAME_ID = 'map' +INTERRUPTER = threading.Event() -interrupt_event = threading.Event() +checklist = [] +def arming_wrapper(state=False, interrupter=INTERRUPTER): + arming(state) def interrupt(): - module_logger.info("Performing function interrupt") - interrupt_event.set() + logger.info("Performing function interrupt") + INTERRUPTER.set() def init(node_name="CleverSwarmFlight", anon=True, no_signals=True): - module_logger.info("Initing ROS node") + logger.info("Initing ROS node") rospy.init_node(node_name, anonymous=anon, disable_signals=no_signals) - module_logger.info("Ros node inited") + logger.info("Ros node inited") def get_distance3d(x1, y1, z1, x2, y2, z2): @@ -50,250 +62,298 @@ def check(check_name): def inner(f): def wrapper(*args, **kwargs): failures = f(*args, **kwargs) - if failures: - msgs = [] - for failure in failures: - msg = "[{}]: Failure: {}".format(check_name, failure) - msgs.append(msg) - module_logger.warning(msg) + print(failures) + msgs = [] + for failure in failures: + print(failure) + msg = "[{}]: Failure: {}".format(check_name, failure) + msgs.append(msg) + logger.warning(msg) + + if msgs: return msgs else: - module_logger.info("[{}]: OK".format(check_name)) + logger.info("[{}]: OK".format(check_name)) return None + + checklist.append(wrapper) return wrapper + return inner +def _check_nans(*values): + return any(math.isnan(x) for x in values) + + +@check("FCU connection") +def check_connection(): + telemetry = get_telemetry() + if not telemetry.connected: + yield ("Flight controller is not connected!") + + @check("Linear velocity estimation") -def check_linear_speeds(): - failures = [] +def check_linear_speeds(speed_limit=0.1): telemetry = get_telemetry(frame_id='body') - speed_limit = 0.1 + + if _check_nans(telemetry.vx, telemetry.vy, telemetry.vz): + yield ("Velocity estimation is not available") + if telemetry.vx >= speed_limit: - failures.append("X velocity estimation: {:.3f} m/s".format(telemetry.vx)) + yield ("X velocity estimation: {:.3f} m/s".format(telemetry.vx)) if telemetry.vy >= speed_limit: - failures.append("Y velocity estimation: {:.3f} m/s".format(telemetry.vy)) + yield ("Y velocity estimation: {:.3f} m/s".format(telemetry.vy)) if telemetry.vz >= speed_limit: - failures.append("Z velocity estimation: {:.3f} m/s".format(telemetry.vz)) - return failures + yield ("Z velocity estimation: {:.3f} m/s".format(telemetry.vz)) @check("Angular velocity estimation") -def check_angular_speeds(): - failures = [] +def check_angular_speeds(rate_limit=0.05): telemetry = get_telemetry(frame_id='body') - rate_limit = 0.05 + + if _check_nans(telemetry.pitch_rate, telemetry.roll_rate, telemetry.yaw_rate): + yield ("Angular velocities estimation is not available") + if telemetry.pitch_rate >= rate_limit: - failures.append("Pitch rate estimation: {:.3f} rad/s".format(telemetry.pitch_rate)) + yield ("Pitch rate estimation: {:.3f} rad/s".format(telemetry.pitch_rate)) if telemetry.roll_rate >= rate_limit: - failures.append("Roll rate estimation: {:.3f} rad/s".format(telemetry.roll_rate)) + yield ("Roll rate estimation: {:.3f} rad/s".format(telemetry.roll_rate)) if telemetry.yaw_rate >= rate_limit: - failures.append("Yaw rate estimation: {:.3f} rad/s".format(telemetry.yaw_rate)) - return failures + yield ("Yaw rate estimation: {:.3f} rad/s".format(telemetry.yaw_rate)) @check("Angles estimation") -def check_angles(): - failures = [] +def check_angles(angle_limit=math.radians(5)): telemetry = get_telemetry(frame_id='body') - angle_limit = math.radians(5) + + if _check_nans(telemetry.pitch, telemetry.roll, telemetry.yaw): + yield ("Angular velocities estimation is not available") + if abs(telemetry.pitch) >= angle_limit: - failures.append("Pitch estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.pitch, - math.degrees(telemetry.pitch))) + yield ("Pitch estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.pitch, + math.degrees(telemetry.pitch))) if abs(telemetry.roll) >= angle_limit: - failures.append("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll, - math.degrees(telemetry.roll))) + yield ("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll, + math.degrees(telemetry.roll))) if abs(telemetry.yaw) >= angle_limit: - failures.append("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw, - math.degrees(telemetry.yaw))) - return failures + yield ("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw, + math.degrees(telemetry.yaw))) def selfcheck(): - msgs = [] - linear_speeds = check_linear_speeds() - angular_speeds = check_angular_speeds() - angles = check_angles() - if linear_speeds: - msgs.extend(linear_speeds) - if angular_speeds: - msgs.extend(angular_speeds) - if angles: - msgs.extend(angles) + checks = [] + for check_f in checklist: + msg = check_f() + checks += msg if msg else [] - return msgs + return checks -def navto(x, y, z, yaw=float('nan'), frame_id='aruco_map'): +def navto(x, y, z, yaw=float('nan'), frame_id=FRAME_ID, **kwargs): set_position(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw) telemetry = get_telemetry(frame_id=frame_id) - module_logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) - module_logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z)) + logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) + print('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) + logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z)) + print('Telemetry now: | z: {:.3f}'.format(telemetry.z)) return True -def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map', - freq=FREQUENCY, timeout=5000, wait=False): - - module_logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) +def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=SPEED, tolerance=TOLERANCE, frame_id=FRAME_ID, + freq=FREQUENCY, timeout=TIMEOUT, interrupter=INTERRUPTER, wait=False): + logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) + print('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw)) navigate(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw, speed=speed) # waiting for completion telemetry = get_telemetry(frame_id=frame_id) rate = rospy.Rate(freq) - time_start = rospy.get_rostime() + time_start = time.time() while (get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z) > tolerance) or wait: - if interrupt_event.is_set(): - module_logger.warning("Flight function interrupted!") - interrupt_event.clear() - break + if interrupter.is_set(): + logger.warning("Reach point function interrupted!") + print("Reach point function interrupted!") + interrupter.clear() + return False telemetry = get_telemetry(frame_id=frame_id) - module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( + logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( telemetry.x, telemetry.y, telemetry.z, telemetry.yaw)) + print('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( + telemetry.x, telemetry.y, telemetry.z, telemetry.yaw)) + logger.info('Current delta: | {:.3f}'.format( + get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z))) + print('Current delta: | {:.3f}'.format( + get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z))) - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 + time_passed = time.time() - time_start if timeout is not None: if time_passed >= timeout: - module_logger.warning('Reaching point timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + logger.warning('Reaching point timed out! | time: {:3f} seconds'.format(time_passed)) + print('Reaching point timed out! | time: {:3f} seconds'.format(time_passed)) return wait rate.sleep() - else: - module_logger.info("Point reached!") - return True + + logger.info("Point reached!") + print("Point reached!") + return True -def reach_attitude(z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map', - freq=FREQUENCY, timeout=5000, wait=False): - - module_logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw)) +def reach_altitude(z=0.0, yaw=float('nan'), speed=SPEED, tolerance=TOLERANCE, frame_id=FRAME_ID, + freq=FREQUENCY, timeout=TIMEOUT, interrupter=INTERRUPTER, wait=False): + logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw)) + print('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw)) current_telem = get_telemetry(frame_id=frame_id) navigate(frame_id=frame_id, x=current_telem.x, y=current_telem.y, z=z, yaw=yaw, speed=speed) - # waiting for completion telemetry = get_telemetry(frame_id=frame_id) rate = rospy.Rate(freq) - time_start = rospy.get_rostime() + time_start = time.time() while (abs(z - telemetry.z) > tolerance) or wait: - if interrupt_event.is_set(): - module_logger.warning("Flight function interrupted!") - interrupt_event.clear() - break + if interrupter.is_set(): + logger.warning("Reach altitude function interrupted!") + print("Reach altitude function interrupted!") + interrupter.clear() + return telemetry = get_telemetry(frame_id=frame_id) - module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( + logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( + telemetry.x, telemetry.y, telemetry.z, telemetry.yaw)) + print('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format( telemetry.x, telemetry.y, telemetry.z, telemetry.yaw)) - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 + time_passed = time.time() - time_start if timeout is not None: if time_passed >= timeout: - module_logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed)) + print('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed)) return wait rate.sleep() - else: - module_logger.info("Attitude reached!") - return True + + logger.info("Altitude reached!") + print("Altitude reached!") + return True -def land(descend=True, z=1.0, frame_id_descend="aruco_map", frame_id_land="aruco_map", - timeout_descend=5000, timeout_land=7500, freq=FREQUENCY): +def stop(frame_id='body', hold_speed=SPEED): + navigate(frame_id=frame_id, yaw=float('nan'), speed=hold_speed) + + +def land(descend=True, z=Z_DESCEND, frame_id_descend=FRAME_ID, frame_id_land=FRAME_ID, + timeout_descend=TIMEOUT_DESCEND, timeout_land=TIMEOUT_LAND, freq=FREQUENCY, interrupter=INTERRUPTER): if descend: - module_logger.info("Descending to: | z: {:.3f}".format(z)) - reach_attitude(z=z, frame_id=frame_id_descend, timeout=timeout_descend, freq=freq, yaw=1.57) #TODO yaw + logger.info("Descending to: | z: {:.3f}".format(z)) + print("Descending to: | z: {:.3f}".format(z)) + reach_altitude(z=z, frame_id=frame_id_descend, timeout=timeout_descend, freq=freq, yaw=float('nan'), # TODO yaw + interrupter=interrupter) landing() + print("Land is started") - telemetry = get_telemetry(frame_id='aruco_map') + telemetry = get_telemetry(frame_id=frame_id_land) rate = rospy.Rate(freq) - time_start = rospy.get_rostime() + time_start = time.time() while telemetry.armed: - if interrupt_event.is_set(): - module_logger.warning("Flight function interrupted!") - interrupt_event.clear() - break + if interrupter.is_set(): + logger.warning("Land function interrupted!") + print("Land function interrupted!") + interrupter.clear() + return False telemetry = get_telemetry(frame_id=frame_id_land) - module_logger.info("Landing...") - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 + logger.info("Landing... | z: {}".format(telemetry.z)) + print("Landing... | z: {}".format(telemetry.z)) + time_passed = time.time() - time_start if timeout_land is not None: if time_passed >= timeout_land: - module_logger.warning('Landing timed out! | time: {:3f} seconds'.format(time_passed / 1000)) - module_logger.warning("Disarming!") + logger.warning('Landing timed out! | time: {:3f} seconds'.format(time_passed)) + logger.warning("Disarming!") + print("Landing timed out, disarming!!!") arming(False) return False rate.sleep() - else: - module_logger.info("Landing succeeded!") - return True + + logger.info("Landing succeeded!") + print("Landing succeeded!") + return True -def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY, - timeout_arm=2000, timeout_takeoff=5000, wait=False, tolerance=TOLERANCE, emergency_land=False): - module_logger.info("Starting takeoff!") +def takeoff(z=Z_TAKEOFF, speed=SPEED_TAKEOFF, frame_id='body', freq=FREQUENCY, + timeout_arm=TIMEOUT_ARMED, timeout_takeoff=TIMEOUT, wait=False, tolerance=TOLERANCE, emergency_land=False, + interrupter=INTERRUPTER): + logger.info("Starting takeoff!") print("Starting takeoff!") - module_logger.info("Arming, going to OFFBOARD mode") + logger.info("Arming, going to OFFBOARD mode") # Arming check set_rates(thrust=0.1, auto_arm=True) telemetry = get_telemetry(frame_id=frame_id) rate = rospy.Rate(freq) - time_start = rospy.get_rostime() + time_start = time.time() while (not telemetry.armed) or wait: - if interrupt_event.is_set(): - module_logger.warning("Flight function interrupted!") - interrupt_event.clear() - return None + if interrupter.is_set(): + logger.warning("Takeoff function interrupted!") + print("Takeoff function interrupted!") + interrupter.clear() + return telemetry = get_telemetry(frame_id=frame_id) - module_logger.info("Arming...") - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 + logger.info("Arming...") + print("Arming...") + time_passed = time.time() - time_start if timeout_arm is not None: if time_passed >= timeout_arm: if not telemetry.armed: - module_logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed)) + print('Arming timed out! | time: {:3f} seconds'.format(time_passed)) return False else: break rate.sleep() - module_logger.info("Armed!") - print("Armed!") - - # Reach height - telemetry = get_telemetry(frame_id=frame_id) - z0 = get_telemetry().z - navigate(z=z, speed=speed, frame_id=frame_id, auto_arm=True) - current_height = abs(get_telemetry().z - z0 - z) - while current_height > tolerance or wait: - if interrupt_event.is_set(): - module_logger.warning("Flight function interrupted!") - interrupt_event.clear() - return None + logger.info("Armed!") + print("Armed") - current_height = abs(get_telemetry().z - z0 - z) - module_logger.info("Takeoff...") - time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000 + # Reach height + z0 = get_telemetry().z + z_dest = z + z0 + navigate(z=z, speed=speed, frame_id=frame_id, auto_arm=True) + current_diff = abs(get_telemetry().z - z_dest) + while (current_diff > tolerance) or wait: + if interrupter.is_set(): + logger.warning("Flight function interrupted!") + print("Flight function interrupted!") + interrupter.clear() + return + + current_diff = abs(get_telemetry().z - z_dest) + logger.info("Takeoff...") + print("Takeoff...") + time_passed = time.time() - time_start if timeout_takeoff is not None: if time_passed >= timeout_takeoff: if not wait: - module_logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed / 1000)) + logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed)) + print('Takeoff timed out! | time: {:3f} seconds'.format(time_passed)) if emergency_land: - module_logger.info("Preforming emergency land") - land(descend=False) + logger.info("Preforming emergency land") + print("Preforming emergency land") + land(descend=False, interrupter=interrupter) return False else: break rate.sleep() - module_logger.info("Takeoff succeeded!") + logger.info("Takeoff succeeded!") print("Takeoff succeeded!") return True diff --git a/Drone/FlightLib/LedLib.py b/Drone/FlightLib/LedLib.py index 4215bef..ef3b7a9 100644 --- a/Drone/FlightLib/LedLib.py +++ b/Drone/FlightLib/LedLib.py @@ -1,8 +1,8 @@ from __future__ import print_function -from threading import Thread +import threading import time from rpi_ws281x import * - +from tasking_lib import wait as wait_until # LED strip configuration: LED_COUNT = 29 # Number of LED pixels. LED_PIN = 21 # GPIO pin connected to the pixels (18 uses PWM!) (10 uses SPI /dev/spidev0.0). @@ -29,6 +29,15 @@ direct = False l = 0 wait_ms = 10 +INTERRUPTER = threading.Event() +INTERRUPTER_UNSET = threading.Event() + +def delay(delay_time, interrupter=INTERRUPTER, maxsleep=0.01): + global mode + wait_until(time.time()+delay_time, interrupter, maxsleep) + if interrupter.is_set(): + mode = "off" + # functions def math_wheel(pos): @@ -43,52 +52,57 @@ def math_wheel(pos): return Color(0, pos * 3, 255 - pos * 3) -def rainbow(wait=10, direction=False): - global wait_ms, direct, mode +def rainbow(wait=10, direction=False, interrupter=INTERRUPTER): + global wait_ms, direct, mode, INTERRUPTER wait_ms = wait direct = direction mode = "rainbow" + INTERRUPTER=interrupter -def fill(red, green, blue): - global r, g, b, mode +def fill(red, green, blue, interrupter=INTERRUPTER): + global r, g, b, mode, INTERRUPTER r = red g = green b = blue mode = "fill" + INTERRUPTER=interrupter -def blink(red, green, blue, wait=250): - global r, g, b, wait_ms, mode +def blink(red, green, blue, wait=250, interrupter=INTERRUPTER): + global r, g, b, wait_ms, mode, INTERRUPTER r = red g = green b = blue wait_ms = wait mode = "blink" + INTERRUPTER=interrupter -def chase(red, green, blue, wait=50, direction=False): - global r, g, b, wait_ms, direct, mode +def chase(red, green, blue, wait=50, direction=False, interrupter=INTERRUPTER): + global r, g, b, wait_ms, direct, mode, INTERRUPTER r = red g = green b = blue wait_ms = wait direct = direction mode = "chase" + INTERRUPTER=interrupter -def wipe_to(red, green, blue, wait=50, direction=False): - global r, g, b, wait_ms, direct, mode +def wipe_to(red, green, blue, wait=50, direction=False, interrupter=INTERRUPTER): + global r, g, b, wait_ms, direct, mode, INTERRUPTER r = red g = green b = blue wait_ms = wait direct = direction mode = "wipe_to" + INTERRUPTER=interrupter -def fade_to(red, green, blue, wait=20): # do not working with rainbow (solid colors only) - global r, g, b, r_prev, g_prev, b_prev, wait_ms, mode +def fade_to(red, green, blue, wait=20, interrupter=INTERRUPTER): # do not working with rainbow (solid colors only) + global r, g, b, r_prev, g_prev, b_prev, wait_ms, mode, INTERRUPTER r_prev = r g_prev = g b_prev = b @@ -97,10 +111,11 @@ def fade_to(red, green, blue, wait=20): # do not working with rainbow (solid co b = blue wait_ms = wait mode = "fade_to" + INTERRUPTER=interrupter -def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25): - global r, g, b, l, wait_ms, direct, mode +def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25, interrupter=INTERRUPTER): + global r, g, b, l, wait_ms, direct, mode, INTERRUPTER r = red g = green b = blue @@ -108,6 +123,7 @@ def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25): direct = direction wait_ms = wait mode = "run" + INTERRUPTER=interrupter def off(): @@ -128,27 +144,29 @@ def strip_rainbow_frame(iteration, direction): strip.show() -def strip_chase_step(color, direction): +def strip_chase_step(color, direction, interrupter=INTERRUPTER): for q in range(3): for i in range(0, strip.numPixels(), 3): n = ((strip.numPixels() - 1) * direction) - (i + q) strip.setPixelColor(abs(n), color) strip.show() - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, interrupter) for i in range(0, strip.numPixels(), 3): n = ((strip.numPixels() - 1) * direction) - (i + q) strip.setPixelColor(abs(n), 0) -def strip_wipe(color, direction): +def strip_wipe(color, direction, interrupter=INTERRUPTER): for i in range(strip.numPixels()): n = ((strip.numPixels() - 1) * direction) - i strip.setPixelColor(abs(n), color) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, interrupter) + if interrupter.is_set(): + return strip.show() -def strip_fade(r1, g1, b1, r2, g2, b2, frames=50): +def strip_fade(r1, g1, b1, r2, g2, b2, frames=50, interrupter=INTERRUPTER): r_delta = (r2-r1)//frames g_delta = (g2-g1)//frames b_delta = (b2-b1)//frames @@ -157,7 +175,9 @@ def strip_fade(r1, g1, b1, r2, g2, b2, frames=50): green = g1 + (g_delta * i) blue = b1 + (b_delta * i) strip_set(Color(red, green, blue)) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, interrupter) + if interrupter.is_set(): + return strip_set(Color(r2, g2, b2)) @@ -190,39 +210,39 @@ def led_thread(): if iteration >= 256: iteration = 0 strip_rainbow_frame(iteration, direct) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, INTERRUPTER) iteration += 1 elif mode == "fill": strip_set(Color(r, g, b)) mode = "" elif mode == "blink": strip_set(Color(r, g, b)) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, INTERRUPTER) strip_set(Color(0, 0, 0)) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, INTERRUPTER) elif mode == "chase": strip_chase_step(Color(r, g, b), direct) elif mode == "wipe_to": - strip_wipe(Color(r, g, b,), direct) + strip_wipe(Color(r, g, b,), direct, INTERRUPTER) mode = "fill" elif mode == "fade_to": - strip_fade(r_prev, g_prev, b_prev, r, g, b) + strip_fade(r_prev, g_prev, b_prev, r, g, b, interrupter=INTERRUPTER) mode = "" elif mode == "run": strip_run_step(r, g, b, l, direct, iteration) - time.sleep(wait_ms / 1000.0) + delay(wait_ms / 1000.0, INTERRUPTER) iteration += 1 elif mode == "off": strip_off() mode = "" else: - time.sleep(1 / 1000) + delay(1 / 1000.0, interrupter=INTERRUPTER_UNSET) # init def init_led(): strip.begin() - t_l = Thread(target=led_thread) + t_l = threading.Thread(target=led_thread) t_l.daemon = True t_l.start() diff --git a/Drone/animation_lib.py b/Drone/animation_lib.py new file mode 100644 index 0000000..9a9a2af --- /dev/null +++ b/Drone/animation_lib.py @@ -0,0 +1,91 @@ +import time +import csv +import rospy +import logging +import threading + +from FlightLib import FlightLib +from FlightLib import LedLib + +import tasking_lib as tasking + +logger = logging.getLogger(__name__) + +interrupt_event = threading.Event() + + +def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0): + imported_frames = [] + try: + animation_file = open(filepath) + except IOError: + logging.error("File {} can't be opened".format(filepath)) + else: + with animation_file: + csv_reader = csv.reader( + animation_file, delimiter=',', quotechar='|' + ) + for row in csv_reader: + frame_number, x, y, z, yaw, red, green, blue = row + imported_frames.append({ + 'number': int(frame_number), + 'x': float(x) + x0, + 'y': float(y) + y0, + 'z': float(z) + z0, + 'yaw': float(yaw), + 'red': int(red), + 'green': int(green), + 'blue': int(blue), + }) + return imported_frames + + +def convert_frame(frame): + return (frame['x'], frame['y'], frame['z']), (frame["red"], frame["green"], frame["blue"]), frame["yaw"] + + +def execute_frame(point=(), color=(), yaw=float('Nan'), frame_id='aruco_map', use_leds=True, + flight_func=FlightLib.navto, flight_kwargs={}, interrupter=interrupt_event): + flight_func(*point, yaw=yaw, frame_id=frame_id, interrupter=interrupt_event, **flight_kwargs) + if use_leds: + if color: + LedLib.fill(*color) + + +def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto, + interrupter=interrupt_event): + next_frame_time = 0 + for frame in frames: + if interrupter.is_set(): + logger.warning("Animation playing function interrupted!") + interrupter.clear() + return + execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func, + interrupter=interrupter) + + next_frame_time += frame_delay + tasking.wait(next_frame_time, interrupter) + + +def takeoff(z=1.5, safe_takeoff=True, timeout=5000, frame_id='aruco_map', use_leds=True, + interrupter=interrupt_event): + print(interrupter.is_set()) + if use_leds: + LedLib.wipe_to(255, 0, 0, interrupter=interrupter) + if interrupter.is_set(): + return + FlightLib.takeoff(z=z, wait=False, timeout_takeoff=timeout, frame_id=frame_id, emergency_land=safe_takeoff, + interrupter=interrupter) + if interrupter.is_set(): + return + if use_leds: + LedLib.blink(0, 255, 0, wait=50, interrupter=interrupter) + + +def land(z=1.5, descend=False, timeout=5000, frame_id='aruco_map', use_leds=True, + interrupter=interrupt_event): + if use_leds: + LedLib.blink(255, 0, 0, interrupter=interrupter) + FlightLib.land(z=z, descend=descend, timeout_land=timeout, frame_id_land=frame_id, interrupter=interrupter) + if use_leds: + LedLib.off() diff --git a/Drone/client.py b/Drone/client.py index 920f089..3b8e9ea 100644 --- a/Drone/client.py +++ b/Drone/client.py @@ -1,358 +1,225 @@ -from __future__ import print_function -import os -import sys -import socket -import struct -import random import time import errno -import json +import random +import socket +import struct import logging -import threading +import collections import ConfigParser +import selectors2 as selectors + from contextlib import closing -import rospy -import pause +import os,sys,inspect # Add parent dir to PATH to import messaging_lib +current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +parent_dir = os.path.dirname(current_dir) +sys.path.insert(0, parent_dir) -from FlightLib import FlightLib -from FlightLib import LedLib - -import play_animation - -random.seed() +import messaging_lib as messaging logging.basicConfig( # TODO all prints as logs - level=logging.INFO, + level=logging.DEBUG, # INFO format="%(asctime)s [%(name)-7.7s] [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s", handlers=[ logging.FileHandler("client_logs.log"), - logging.StreamHandler() + logging.StreamHandler(), ]) -NTP_PACKET_FORMAT = "!12I" -NTP_DELTA = 2208988800L # 1970-01-01 00:00:00 -NTP_QUERY = '\x1b' + 47 * '\0' +logger = logging.getLogger(__name__) +ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"]) -def get_ntp_time(ntp_host, ntp_port): - with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: - s.sendto(NTP_QUERY, (ntp_host, ntp_port)) - msg, address = s.recvfrom(1024) - unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)]) - return unpacked[10] + float(unpacked[11]) / 2**32 - NTP_DELTA +active_client = None # maybe needs to be refactored +class Client(object): + def __init__(self, config_path="client_config.ini"): + self.selector = selectors.DefaultSelector() + self.client_socket = None -def reconnect(timeout=2, attempt_limit=5): - global clientSocket, host, port - print("Trying to connect to", host, ":", port, "...") - connected = False - attempt_count = 0 - while not connected: - print("Waiting for connection, attempt", attempt_count) + self.server_connection = messaging.ConnectionManager() + + self.server_host = None + self.server_port = None + self.broadcast_port = None + + self.connected = False + self.client_id = None + + # Init configs + self.config_path = config_path + self.config = ConfigParser.ConfigParser() + self.load_config() + + global active_client + active_client = self + + def load_config(self): + self.config.read(self.config_path) + + self.broadcast_port = self.config.getint('SERVER', 'broadcast_port') + self.server_port = self.config.getint('SERVER', 'port') + self.server_host = self.config.get('SERVER', 'host') + self.BUFFER_SIZE = self.config.getint('SERVER', 'buffer_size') + self.USE_NTP = self.config.getboolean('NTP', 'use_ntp') + self.NTP_HOST = self.config.get('NTP', 'host') + self.NTP_PORT = self.config.getint('NTP', 'port') + + self.files_directory = self.config.get('FILETRANSFER', 'files_directory') + + self.client_id = self.config.get('PRIVATE', 'id') + if self.client_id == 'default': + self.client_id = 'copter' + str(random.randrange(9999)).zfill(4) + self.write_config(False, 'PRIVATE', 'id', self.client_id) + elif self.client_id == '/hostname': + self.client_id = socket.gethostname() + + def rewrite_config(self): + with open(self.config_path, 'w') as file: + self.config.write(file) + + def write_config(self, reload_config=True, *config_options): + for config_option in config_options: + self.config.set(config_option.section, config_option.option, config_option.value) + self.rewrite_config() + + if reload_config: + self.load_config() + + @staticmethod + def get_ntp_time(ntp_host, ntp_port): + NTP_PACKET_FORMAT = "!12I" + NTP_DELTA = 2208988800L # 1970-01-01 00:00:00 + NTP_QUERY = '\x1b' + 47 * '\0' + + with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: + s.sendto(bytes(NTP_QUERY), (ntp_host, ntp_port)) + msg, address = s.recvfrom(1024) + unpacked = struct.unpack(NTP_PACKET_FORMAT, msg[0:struct.calcsize(NTP_PACKET_FORMAT)]) + return unpacked[10] + float(unpacked[11]) / 2 ** 32 - NTP_DELTA + + def time_now(self): + if self.USE_NTP: + timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT) + else: + timenow = time.time() + return timenow + + def start(self): + logger.info("Starting client") try: - clientSocket = socket.socket() - clientSocket.settimeout(timeout) - clientSocket.connect((host, port)) - connected = True - print("Connection successful") - clientSocket.settimeout(None) - except socket.error as e: - if e.errno != errno.EINTR: - print("Waiting for connection, can not connect:", e) - time.sleep(timeout) - else: - print("Shutting down on keyboard interrupt") - raise KeyboardInterrupt - attempt_count += 1 - - if attempt_count >= attempt_limit: - print("Too many attempts. Trying to get new server IP") - broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - broadcast_client.bind(("", broadcast_port)) while True: - data, addr = broadcast_client.recvfrom(1024) - print("Received broadcast message %s from %s" % (data, addr)) - command, args = parse_message(data.decode("UTF-8")) - if command == "server_ip": - host, port = args["host"], int(args["port"]) - print("Binding to new IP: ", host, port) - broadcast_client.close() - write_to_config("SERVER", "port", port) - write_to_config("SERVER", "host", host) - attempt_count = 0 - break + self._reconnect() + self._process_connections() -def send_all(msg): - clientSocket.sendall(struct.pack('>I', len(msg)) + msg) + except (KeyboardInterrupt, ): + logger.critical("Caught interrupt, exiting!") + self.selector.close() + def _reconnect(self, timeout=2, attempt_limit=5): + logger.info("Trying to connect to {}:{} ...".format(self.server_host, self.server_port)) + attempt_count = 0 + while not self.connected: + logger.info("Waiting for connection, attempt {}".format(attempt_count)) + try: + self.client_socket = socket.socket() + self.client_socket.settimeout(timeout) + self.client_socket.connect((self.server_host, self.server_port)) + except socket.error as error: + if isinstance(error, OSError): + if error.errno == errno.EINTR: + logger.critical("Shutting down on keyboard interrupt") + raise KeyboardInterrupt -def recive_all(n): - data = b'' - while len(data) < n: - packet = clientSocket.recv(min(n - len(data), BUFFER_SIZE)) - print("Receiving packet {}; full data is {}".format(packet, data)) - if not packet: - return None - data += packet - return data + logger.warning("Can not connect due error: {}".format(error)) + attempt_count += 1 + time.sleep(timeout) - -def recive_message(): - raw_msglen = recive_all(4) - if not raw_msglen: - print("No valid msg") - return None - msglen = struct.unpack('>I', raw_msglen)[0] - msg = recive_all(msglen) - return msg - - -def form_message(str_command, dict_arguments): - msg_dict = {str_command: dict_arguments} - msg = json.dumps(msg_dict) - return msg - - -def parse_message(msg): - try: - j_message = json.loads(msg) - except ValueError: - print("Json string not in correct format") - return None, None - str_command = list(j_message.keys())[0] - dict_arguments = list(j_message.values())[0] - - return str_command, dict_arguments - - -def recive_file(filename): - print("Receiving file:", filename) - with open(filename, 'wb') as file: # TODO add directory - while True: - data = recive_message() #clientSocket.recv(BUFFER_SIZE) - if data: - print(data) - if parse_message(data.decode("UTF-8"))[0] == "/endoffile": - print("File received") - break - file.write(data) else: + logger.info("Connection to server successful!") + self._connect() break - -def animation_player(running_event, stop_event): - print("Animation thread activated") - frames = play_animation.read_animation_file() - if not frames: - logging.error("Animation is empty, shutting down animation player") - return - - delay_time = 0.125 - - print("Takeoff") - play_animation.takeoff(z=TAKEOFF_HEIGHT, safe_takeoff=SAFE_TAKEOFF) - takeoff_time = starttime + TAKEOFF_TIME - dt = takeoff_time - time.time() - print("Wait until takeoff " + str(dt) + "s: " + time.ctime(takeoff_time)) - pause.until(takeoff_time) - - print("Reach first point") - play_animation.reach_frame(frames[0], x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) #Reach first point at the same time with others - rfp_time = takeoff_time + RFP_TIME - dt = rfp_time - time.time() - print("Wait reaching first point " + str(dt) + "s: " + time.ctime(rfp_time)) - pause.until(rfp_time) - - next_frame_time = rfp_time - print("Start animation at " + str(time.time())) - for frame in frames: - #running_event.wait() - play_animation.animate_frame(frame, x0=X0+X0_COMMON, y0=Y0+Y0_COMMON) - next_frame_time += delay_time - if stop_event.is_set(): - running_animation_event.clear() - break - #rate.sleep() - pause.until(next_frame_time) - else: - play_animation.land() - print("Animation ended") - print("Animation thread closed") + if attempt_count >= attempt_limit: + logger.info("Too many attempts. Trying to get new server IP") + self.broadcast_bind() + attempt_count = 0 -stop_animation_event = threading.Event() -running_animation_event = threading.Event() + def _connect(self): + self.connected = True + self.client_socket.setblocking(False) + events = selectors.EVENT_READ | selectors.EVENT_WRITE + self.selector.register(self.client_socket, events, data=self.server_connection) + self.server_connection.connect(self.selector, self.client_socket, (self.server_host, self.server_port)) + self._process_connections() -def start_animation(*args, **kwargs): - animation_thread = threading.Thread(target=animation_player, args=(running_animation_event, stop_animation_event)) - print("Starting animation!") - running_animation_event.set() - stop_animation_event.clear() - animation_thread.start() - - -def resume_animation(): - print("Resuming animation") - running_animation_event.set() - - -def pause_animation(): - print("Pausing animation") - running_animation_event.clear() - - -def stop_animation(): - stop_animation_event.set() - print("Stopping animation") -# animation_thread.join() - - -def selfcheck(): - return FlightLib.selfcheck() - - -def write_to_config(section, option, value): - config.set(section, option, value) - with open(CONFIG_PATH, 'w') as file: # TODO as separate function - config.write(file) - - -def load_config(): - global config, CONFIG_PATH - global broadcast_port, port, host, BUFFER_SIZE - global USE_NTP, NTP_HOST, NTP_PORT - global files_directory, animation_file - global FRAME_ID, TAKEOFF_HEIGHT, TAKEOFF_TIME, SAFE_TAKEOFF, RFP_TIME - global USE_LEDS, COPTER_ID - global X0, X0_COMMON, Y0, Y0_COMMON - CONFIG_PATH = "client_config.ini" - config = ConfigParser.ConfigParser() - config.read(CONFIG_PATH) - - broadcast_port = config.getint('SERVER', 'broadcast_port') - port = config.getint('SERVER', 'port') - host = config.get('SERVER', 'host') - BUFFER_SIZE = config.getint('SERVER', 'buffer_size') - USE_NTP = config.getboolean('NTP', 'use_ntp') - NTP_HOST = config.get('NTP', 'host') - NTP_PORT = config.getint('NTP', 'port') - - files_directory = config.get('FILETRANSFER', 'files_directory') - animation_file = config.get('FILETRANSFER', 'animation_file') - - FRAME_ID = config.get('COPTERS', 'frame_id') # TODO in play_animation - TAKEOFF_HEIGHT = config.getfloat('COPTERS', 'takeoff_height') - TAKEOFF_TIME = config.getfloat('COPTERS', 'takeoff_time') - RFP_TIME = config.getfloat('COPTERS', 'reach_first_point_time') - SAFE_TAKEOFF = config.getboolean('COPTERS', 'safe_takeoff') - - X0_COMMON = config.getfloat('COPTERS', 'x0_common') - Y0_COMMON = config.getfloat('COPTERS', 'y0_common') - X0 = config.getfloat('PRIVATE', 'x0') - Y0 = config.getfloat('PRIVATE', 'y0') - - USE_LEDS = config.getboolean('PRIVATE', 'use_leds') - play_animation.USE_LEDS = USE_LEDS - - COPTER_ID = config.get('PRIVATE', 'id') - if COPTER_ID == 'default': - COPTER_ID = 'copter' + str(random.randrange(9999)).zfill(4) - write_to_config('PRIVATE', 'id', COPTER_ID) - elif COPTER_ID == '/hostname': - COPTER_ID = socket.gethostname() - -load_config() - -rospy.init_node('Swarm_client', anonymous=True) -if USE_LEDS: - LedLib.init_led() - -print("Client started on copter:", COPTER_ID) -if USE_NTP: - print("NTP time:", time.ctime(get_ntp_time(NTP_HOST, NTP_PORT))) -print("System time", time.ctime(time.time())) - -reconnect() - -print("Connected to server") - -try: - while True: + def broadcast_bind(self): + broadcast_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + broadcast_client.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + broadcast_client.bind(("", self.broadcast_port)) try: - message = recive_message() - if message: - message = message.decode("UTF-8") - command, args = parse_message(message) - print("Command from server:", command, args) - if command == "writefile": - recive_file(args['filename']) - if bool(args['clever_restart']): - os.system("systemctl restart clever") - elif command == 'config_write': - write_to_config(args['section'], args['option'], args['value']) - elif command == 'config_reload': - load_config() - elif command == "starttime": - global starttime - starttime = float(args['time']) - print("Starting on:", time.ctime(starttime)) - dt = starttime - time.time() - if USE_NTP: - dt = starttime - get_ntp_time(NTP_HOST, NTP_PORT) - print("Until start:", dt) - rospy.Timer(rospy.Duration(dt), start_animation, oneshot=True) - elif command == 'takeoff': - play_animation.takeoff(safe_takeoff=SAFE_TAKEOFF) - elif command == 'pause': - pause_animation() - elif command == 'resume': - resume_animation() - elif command == 'stop': - stop_animation() - FlightLib.interrupt() - elif command == 'land': - play_animation.land() - elif command == 'disarm': - FlightLib.arming(False) - elif command == 'led_test': - LedLib.fill(255, 255, 255) - time.sleep(2) - LedLib.off() + while True: + data, addr = broadcast_client.recvfrom(self.BUFFER_SIZE) + message = messaging.MessageManager() + message.income_raw = data + message.process_message() + if message.content: + logger.info("Received broadcast message {} from {}".format(message.content, addr)) + if message.content["command"] == "server_ip": + args = message.content["args"] + self.server_port = int(args["port"]) + self.server_host = args["host"] + self.write_config(False, + ConfigOption("SERVER", "port", self.server_port), + ConfigOption("SERVER", "host", self.server_host)) + logger.info("Binding to new IP: {}:{}".format(self.server_host, self.server_port)) + break + finally: + broadcast_client.close() - elif command == 'request': - request_target = args['value'] - print("Got request for:", request_target) - response = "" - if request_target == 'test': - response = "test_success" - elif request_target == 'id': - response = COPTER_ID - elif request_target == 'selfcheck': - check = FlightLib.selfcheck() - response = check if check else "OK" - elif request_target == 'batt_voltage': - response = FlightLib.get_telemetry('body').voltage - elif request_target == 'cell_voltage': - response = FlightLib.get_telemetry('body').cell_voltage + def _process_connections(self): + while True: + events = self.selector.select(timeout=1) + if events: + for key, mask in events: + if key.data is None: + pass + else: + connection = key.data + try: + connection.process_events(mask) - send_all(bytes(form_message("response", - {"status": "ok", "value": response, "value_name": str(request_target)}))) - print("Request responded with:", response) + except Exception as error: + logger.error( + "Exception {} occurred for {}! Resetting connection!".format(error, connection.addr) + ) + self.server_connection.close() + self.connected = False - except socket.error as e: - if e.errno != errno.EINTR: - print("Connection lost due error:", e) - print("Reconnecting...") - reconnect() - print("Re-connection successful") - else: - print("Interrupted") - raise KeyboardInterrupt -except KeyboardInterrupt: - print("Shutdown on keyboard interrupt") -finally: - clientSocket.close() + if isinstance(error, OSError): + if error.errno == errno.EINTR: + raise KeyboardInterrupt + if not self.selector.get_map(): + logger.warning("No active connections left!") + return + + +@messaging.request_callback("id") +def _response_id(): + return active_client.client_id + +@messaging.request_callback("time") +def _response_time(): + return active_client.time_now() + +@messaging.message_callback("config_write") +def _command_config_write(*args, **kwargs): + options = [ConfigOption(**raw_option) for raw_option in kwargs["options"]] + logger.info("Writing config options: {}".format(options)) + active_client.write_config(kwargs["reload"], *options) + +if __name__ == "__main__": + client = Client() + client.start() diff --git a/Drone/client_config.ini b/Drone/client_config.ini index 1a63ad8..2fe7540 100644 --- a/Drone/client_config.ini +++ b/Drone/client_config.ini @@ -1,7 +1,7 @@ [SERVER] port = 25000 broadcast_port = 8181 -host = 192.168.1.104 +host = 192.168.43.168 buffer_size = 1024 [FILETRANSFER] @@ -27,3 +27,4 @@ id = /hostname use_leds = True x0 = 0 y0 = 0 + diff --git a/Drone/copter_client.py b/Drone/copter_client.py new file mode 100644 index 0000000..ae46194 --- /dev/null +++ b/Drone/copter_client.py @@ -0,0 +1,192 @@ +import os +import time +import rospy +import logging + +from FlightLib import FlightLib +from FlightLib import LedLib + +import client + +import messaging_lib as messaging +import tasking_lib as tasking +import animation_lib as animation + +import ros_logging + +class CopterClient(client.Client): + def load_config(self): + super(CopterClient, self).load_config() + self.FRAME_ID = self.config.get('COPTERS', 'frame_id') + self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height') + self.TAKEOFF_TIME = self.config.getfloat('COPTERS', 'takeoff_time') + self.SAFE_TAKEOFF = self.config.getboolean('COPTERS', 'safe_takeoff') + self.RFP_TIME = self.config.getfloat('COPTERS', 'reach_first_point_time') + + self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common') + self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common') + self.X0 = self.config.getfloat('PRIVATE', 'x0') + self.Y0 = self.config.getfloat('PRIVATE', 'y0') + + self.USE_LEDS = self.config.getboolean('PRIVATE', 'use_leds') + + def start(self, task_manager_instance): + client.logger.info("Init ROS node") + rospy.init_node('Swarm_client', anonymous=True, log_level=rospy.DEBUG) + if self.USE_LEDS: + LedLib.init_led() + + task_manager_instance.start() + + super(CopterClient, self).start() + + +@messaging.request_callback("selfcheck") +def _response_selfcheck(): + check = FlightLib.selfcheck() + return check if check else "OK" + + +@messaging.request_callback("batt_voltage") +def _response_batt(): + return FlightLib.get_telemetry('body').voltage + + +@messaging.request_callback("cell_voltage") +def _response_cell(): + return FlightLib.get_telemetry('body').cell_voltage + + +@messaging.message_callback("service_restart") +def _command_service_restart(**kwargs): + os.system("systemctl restart" + kwargs["name"]) + + +@messaging.message_callback("led_test") +def _command_led_test(*args, **kwargs): + LedLib.chase(255, 255, 255) + time.sleep(2) + LedLib.off() + + +@messaging.message_callback("takeoff") +def _command_takeoff(**kwargs): + task_manager.add_task(time.time(), 0, animation.takeoff, + task_kwargs={ + "z": client.active_client.TAKEOFF_HEIGHT, + "timeout": client.active_client.TAKEOFF_TIME, + "safe_takeoff": client.active_client.SAFE_TAKEOFF, + "use_leds": client.active_client.USE_LEDS, + } + ) + + +@messaging.message_callback("land") +def _command_land(**kwargs): + task_manager.reset() + task_manager.add_task(0, 0, animation.land, + task_kwargs={ + "z": client.active_client.TAKEOFF_HEIGHT, + "timeout": client.active_client.TAKEOFF_TIME, + "frame_id": client.active_client.FRAME_ID, + "use_leds": client.active_client.USE_LEDS, + } + ) + + +@messaging.message_callback("disarm") +def _command_disarm(**kwargs): + task_manager.reset() + task_manager.add_task(-5, 0, FlightLib.arming_wrapper, + task_kwargs={ + "state": False + } + ) + + +@messaging.message_callback("stop") +def _command_stop(**kwargs): + task_manager.stop() + + +@messaging.message_callback("pause") +def _command_stop(**kwargs): + task_manager.pause() + + +@messaging.message_callback("resume") +def _command_stop(**kwargs): + task_manager.resume() + + +@messaging.message_callback("start") +def _play_animation(**kwargs): + gap = 0.25 + start_time = kwargs["time"] # TODO +""" print('start time = {}'.format(start_time)) + frames = animation.load_animation(os.path.abspath("animation.csv"), + x0=client.active_client.X0 + client.active_client.X0_COMMON, + y0=client.active_client.Y0 + client.active_client.Y0_COMMON, + ) + + task_manager.add_task(start_time, -1, animation.takeoff, + task_kwargs={ + "z": client.active_client.TAKEOFF_HEIGHT, + "timeout": client.active_client.TAKEOFF_TIME, + "safe_takeoff": client.active_client.SAFE_TAKEOFF, + "frame_id": client.active_client.FRAME_ID, + "use_leds": client.active_client.USE_LEDS, + } + ) + + rfp_time = start_time + client.active_client.TAKEOFF_TIME + gap + task_manager.add_task(rfp_time, -1, animation.execute_frame, + task_kwargs={ + "point": animation.convert_frame(frames[0]), + "timeout": client.active_client.RFP_TIME, + "frame_id": client.active_client.FRAME_ID, + "use_leds": client.active_client.USE_LEDS, + "flight_func": FlightLib.reach_point, + } + ) + + animation_time = rfp_time + client.active_client.RFP_TIME + gap + frame_delay = 0.125 # TODO from animation file + task_manager.add_task(animation_time, -1, animation.execute_animation, + task_kwargs={ + "frames": frames, + "frame_delay": frame_delay, + "frame_id": client.active_client.FRAME_ID, + "use_leds": client.active_client.USE_LEDS, + } + ) + + land_time = animation_time + len(frames)*frame_delay + gap + task_manager.add_task(land_time, -1, animation.land, + task_kwargs={ + "z": client.active_client.TAKEOFF_HEIGHT, + "timeout": client.active_client.TAKEOFF_TIME, + "frame_id": client.active_client.FRAME_ID, + "use_leds": client.active_client.USE_LEDS, + }, + ) """ + + +if __name__ == "__main__": + # rospy.init_node('Swarm_client', anonymous=True) + copter_client = CopterClient() + task_manager = tasking.TaskManager() + + #print logging.root.manager.loggerDict + #task_manager.start() + copter_client.start(task_manager) + loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] + if loggers is None: + print("No loggers!") + else: + print("Loggers",loggers) + ros_logging.route_logger_to_ros() + ros_logging.route_logger_to_ros("__main__") + ros_logging.route_logger_to_ros("client") + ros_logging.route_logger_to_ros("messaging") + diff --git a/Drone/play_animation.py b/Drone/play_animation.py index 5bd9a2e..cf0cf0b 100644 --- a/Drone/play_animation.py +++ b/Drone/play_animation.py @@ -5,20 +5,19 @@ import logging from FlightLib import FlightLib from FlightLib import LedLib -module_logger = logging.getLogger("Animation player") +logger = logging.getLogger(__name__) animation_file_path = 'animation.csv' USE_LEDS = True +FRAME_ID = 'aruco_map' def takeoff(z=1.5, safe_takeoff=True, timeout=5000): if USE_LEDS: LedLib.wipe_to(255, 0, 0) - if safe_takeoff: - FlightLib.takeoff(z=z, wait=True, timeout_takeoff = timeout, emergency_land=True) # TODO dont forget change back to takeoff - else: - FlightLib.takeoff(z=z, wait=True, emergency_land=False) + FlightLib.takeoff(z=z, wait=True, timeout_takeoff=timeout, emergency_land=safe_takeoff,) + LedLib.blink(0, 255, 0, wait=50) def land(descend=False): @@ -29,14 +28,16 @@ def land(descend=False): LedLib.off() -def animate_frame(current_frame, x0=0.0, y0=0.0): - FlightLib.navto(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], yaw=1.57) # TODO yaw +def animate_frame(current_frame, x0=0.0, y0=0.0, copter_frame_id=FRAME_ID): + FlightLib.navto(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], + yaw=1.57, frame_id=copter_frame_id) # TODO yaw if USE_LEDS: LedLib.fill(current_frame['red'], current_frame['green'], current_frame['blue']) -def reach_frame(current_frame, x0=0.0, y0=0.0, timeout=5000): - FlightLib.reach_point(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], yaw=1.57, timeout=timeout) # TODO yaw +def reach_frame(current_frame, x0=0.0, y0=0.0, timeout=5000, copter_frame_id=FRAME_ID): + FlightLib.reach_point(current_frame['x']+x0, current_frame['y']+y0, current_frame['z'], + yaw=1.57, timeout=timeout, frame_id=copter_frame_id) # TODO yaw if USE_LEDS: LedLib.fill(current_frame['red'], current_frame['green'], current_frame['blue']) diff --git a/Drone/ros_logging.py b/Drone/ros_logging.py new file mode 100644 index 0000000..c395f3c --- /dev/null +++ b/Drone/ros_logging.py @@ -0,0 +1,29 @@ +import logging +import rospy + + +class RosHandler(logging.Handler): + + level_map = { + logging.DEBUG: rospy.logdebug, + logging.INFO: rospy.loginfo, + logging.WARNING: rospy.logwarn, + logging.ERROR: rospy.logerr, + logging.CRITICAL: rospy.logfatal + } + + def emit(self, record): + print(record.levelno, record.name, record.msg) + if "rosout" not in record.msg: + try: + pass + #self.level_map[record.levelno]("%s: %s" % (record.name, record.msg)) + except KeyError: + rospy.logerr("unknown log level %s LOG: %s: %s" % (record.levelno, record.name, record.msg)) + + +def route_logger_to_ros(logger_name=None): + if logger_name is not None: + logging.getLogger(logger_name).addHandler(RosHandler()) + else: + logging.getLogger().addHandler(RosHandler()) diff --git a/Drone/tasking_lib.py b/Drone/tasking_lib.py new file mode 100644 index 0000000..42bfbf4 --- /dev/null +++ b/Drone/tasking_lib.py @@ -0,0 +1,193 @@ +import heapq +import time +import logging +import threading +import collections +import itertools + +logger = logging.getLogger(__name__) +Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ]) + +INTERRUPTER = threading.Event() + +def wait(end, interrupter=INTERRUPTER, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval + + while not interrupter.is_set(): # Basic implementation of pause module until() + now = time.time() + diff = min(end - now, maxsleep) + if diff <= 0: + break + else: + time.sleep(diff / 2) + else: + logger.warning("Waiting was interrupted!") + print("Waiting was interrupted!") + + +class TaskManager(object): + def __init__(self): + self.task_queue = [] + self._counter = itertools.count() # unique sequence count + + self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread") + self._processor_thread.daemon = True + self._task_queue_lock = threading.RLock() + + self._running_event = threading.Event() + self._reset_event = threading.Event() + self._wait_interrupt_event = threading.Event() + self._task_interrupt_event = threading.Event() + self._shutdown_event = threading.Event() + + def add_task(self, timestamp, priority, task_function, + task_args=(), task_kwargs={}, task_delayable=False): + + self._wait_interrupt_event.set() + self._running_event.clear() + + task = Task(task_function, task_args, task_kwargs, task_delayable) + + count = next(self._counter) + entry = (timestamp, priority, count, task) + + with self._task_queue_lock: + if self.task_queue: + entry_old = self.task_queue[0] + else: + entry_old = entry + + heapq.heappush(self.task_queue, entry) + + if self.task_queue[0] != entry_old: + self._task_interrupt_event.set() + print("Task queue updated with more priority task") + + if self._reset_event.is_set(): + self._task_interrupt_event.set() + self._reset_event.clear() + print("Task queue updated after reset") + + self._wait_interrupt_event.clear() + self._running_event.set() + + print(self.task_queue) + + def pop_task(self): + with self._task_queue_lock: + if self.task_queue: + return heapq.heappop(self.task_queue) + raise KeyError('Pop from an empty priority queue') + + def start(self, timeouts=False): + print("Task manager is started") + logger.info("Task manager is started") + self._processor_thread.start() + self.resume() + + def stop(self): + self.pause(interrupt=True) + with self._task_queue_lock: + del self.task_queue[:] + print self.task_queue + + def shutdown(self): + self.stop() + self._shutdown_event.set() + self._wait_interrupt_event.set() + self._task_interrupt_event.set() + self._running_event.clear() + self._processor_thread.join(timeout=5) + + def pause(self, interrupt=True): + if interrupt: + self._wait_interrupt_event.set() + self._task_interrupt_event.set() + self._running_event.clear() + logger.info("Task queue paused") + print("Task queue paused") + + def resume(self): + self._running_event.set() + self._wait_interrupt_event.clear() + self._task_interrupt_event.clear() + logger.info("Task queue resumed") + print("Task queue resumed") + + def reset(self): + self.stop() + self.resume() + self._reset_event.set() + +# def interrupt(self): +# self._interrupt_event.set() +# while self._interrupt_event.is_set(): +# pass +# logger.info("Task execution successfully interrupted") + + def execute_task(self): + with self._task_queue_lock: + if self.task_queue: + start_time, priority, count, task = self.task_queue[0] + else: + return + + logger.info("Waiting util task execution time:{}".format(start_time)) + print("Waiting util task execution time:{}".format(start_time)) + wait(start_time, self._wait_interrupt_event) + + if not self._wait_interrupt_event.is_set(): + logger.info("Executing task {}".format(task)) + print("Executing task {}".format(task)) + try: + task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs) + except Exception as e: + logger.error("Error '{}' occurred in task {}".format(e, task)) + print("Error '{}' occurred in task {}".format(e, task)) + else: + logger.warning("Task interrupted before execution") + print("Task interrupted before execution") + self._wait_interrupt_event.clear() + return + + if time.time() > start_time: + start_time_n, priority_n, count_n, task_n = self.task_queue[0] + if (task_n == task) and (start_time_n == start_time): + self.pop_task() + try: + print("Pop {} function!".format(task.func.__name__)) + except Exception as e: + print("Pop something!") + + if self._task_interrupt_event.is_set(): + self._task_interrupt_event.clear() + + logger.info("Execution done") + print("Execution done") + + def _task_processor(self): + logger.info("Tasking thread started") + # self._update_queue() # Initial tick if tasks added before start + while not self._shutdown_event.is_set(): + self._running_event.wait() + self.execute_task() + +if __name__ == "__main__": + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + + def printer(stri, interrupter, *args, **kwargs): + logger.info("String: {}, timenow: {}".format(stri, time.time())) + wait(time.time()+30, interrupter) + + tasker = TaskManager() # Lower priority first! + + tasker.start() + tasker.add_task(time.time(), 10, printer, ("Task1 ", )) + tasker.add_task(time.time()+10, 5, printer, ("Task2 ", )) + time.sleep(1) + tasker.add_task(time.time()+7, 1, printer, ("Task3", )) + time.sleep(3) + tasker.add_task(time.time()+7, 0, printer, ("Task4", )) + + while True: + pass diff --git a/Server/server.py b/Server/server.py index 69ed0d6..b1ee126 100644 --- a/Server/server.py +++ b/Server/server.py @@ -1,44 +1,42 @@ -import os import sys -import math import time -import json -import struct import socket import random import logging import threading +import selectors import collections import configparser +import os,sys,inspect # Add parent dir to PATH to import messaging_lib +current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +parent_dir = os.path.dirname(current_dir) +sys.path.insert(0, parent_dir) +import messaging_lib as messaging + # All imports sorted in pyramid just because random.seed() logging.basicConfig( # TODO all prints as logs - level=logging.INFO, + level=logging.DEBUG, format="%(asctime)s [%(name)-7.7s] [%(threadName)-19.19s] [%(levelname)-7.7s] %(message)s", handlers=[ logging.FileHandler("server_logs.log"), logging.StreamHandler() ]) - -class ConfigOption: - def __init__(self, section, option, value): - self.section = section - self.option = option - self.value = value +ConfigOption = collections.namedtuple("ConfigOption", ["section", "option", "value"]) class Server: BUFFER_SIZE = 1024 def __init__(self, server_id=None, config_path="server_config.ini"): - self.id = server_id if server_id else str(random.randint(0, 9999)).zfill(4) # Init socket + self.sel = selectors.DefaultSelector() self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = socket.gethostname() @@ -47,13 +45,12 @@ class Server: # Init configs self.config_path = config_path self.config = configparser.ConfigParser() - self.config.read(self.config_path) self.load_config() # Init threads - self.autoconnect_thread = threading.Thread(target=self._auto_connect, daemon=True, - name='Client auto-connect') - self.autoconnect_thread_running = threading.Event() # Can be used for manual thread killing + self.autoconnect_thread = threading.Thread(target=self._client_processor, daemon=True, + name='Client processor') + self.client_processor_thread_running = threading.Event() # Can be used for manual thread killing self.broadcast_thread = threading.Thread(target=self._ip_broadcast, daemon=True, name='IP broadcast sender') @@ -64,7 +61,8 @@ class Server: self.listener_thread_running = threading.Event() def load_config(self): - self.port = int(self.config['SERVER']['port']) + self.config.read(self.config_path) + self.port = int(self.config['SERVER']['port']) # TODO try, init def self.broadcast_port = int(self.config['SERVER']['broadcast_port']) self.BROADCAST_DELAY = int(self.config['SERVER']['broadcast_delay']) Server.BUFFER_SIZE = int(self.config['SERVER']['buffer_size']) @@ -74,12 +72,12 @@ class Server: self.NTP_PORT = int(self.config['NTP']['port']) def start(self): # do_auto_connect=True, do_ip_broadcast=True, do_listen_broadcast=False - logging.info("Starting server with id: {} on {} !".format(self.id, self.ip)) + logging.info("Starting server with id: {} on {}:{} !".format(self.id, self.ip, self.port)) logging.info("Starting server socket!") self.server_socket.bind((self.ip, self.port)) - logging.info("Starting client autoconnect thread!") - self.autoconnect_thread_running.set() + logging.info("Starting client processor thread!") + self.client_processor_thread_running.set() self.autoconnect_thread.start() logging.info("Starting broadcast sender thread!") @@ -92,17 +90,22 @@ class Server: def stop(self): logging.info("Stopping server") - self.autoconnect_thread_running.clear() + self.client_processor_thread_running.clear() self.broadcast_thread_running.clear() self.listener_thread_running.clear() self.server_socket.close() + self.sel.close() logging.info("Server stopped") @staticmethod def get_ip_address(): - with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket: - ip_socket.connect(("8.8.8.8", 80)) - return ip_socket.getsockname()[0] + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as ip_socket: + ip_socket.connect(("8.8.8.8", 80)) + return ip_socket.getsockname()[0] + except OSError: + logging.warning("No network connection detected, starting on localhost") + return "localhost" @staticmethod def get_ntp_time(ntp_host, ntp_port): @@ -113,36 +116,66 @@ class Server: msg, _ = ntp_socket.recvfrom(1024) return int.from_bytes(msg[-8:], 'big') / 2 ** 32 - NTP_DELTA - def _auto_connect(self): - logging.info("Client autoconnect thread started!") - while self.autoconnect_thread_running.is_set(): - self.server_socket.listen(1) - c, addr = self.server_socket.accept() - logging.info("Got connection from: {}".format(str(addr))) - if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): - client = Client(addr[0]) - logging.info("New client") - else: - logging.info("Reconnected client") - Client.clients[addr[0]].connect(c, addr) + def time_now(self): + if self.USE_NTP: + timenow = self.get_ntp_time(self.NTP_HOST, self.NTP_PORT) + else: + timenow = time.time() + return timenow + + # noinspection PyArgumentList + def _client_processor(self): + logging.info("Client processor (selector) thread started!") + self.server_socket.listen() + self.server_socket.setblocking(False) + self.sel.register(self.server_socket, selectors.EVENT_READ | selectors.EVENT_WRITE, data=None) + + while self.client_processor_thread_running.is_set(): + events = self.sel.select(timeout=None) + for key, mask in events: + if key.data is None: + self._connect_client(key.fileobj) + else: + client = key.data + try: + client.process_events(mask) + except Exception as error: + logging.error("Exception {} occurred for {}! Resetting connection!".format(error, client.addr)) + client.close() + logging.info("Client autoconnect thread stopped!") + def _connect_client(self, sock): + conn, addr = sock.accept() + logging.info("Got connection from: {}".format(str(addr))) + conn.setblocking(False) + + if not any(client_addr == addr[0] for client_addr in Client.clients.keys()): + client = Client(addr[0]) + logging.info("New client") + else: + client = Client.clients[addr[0]] + logging.info("Reconnected client") + self.sel.register(conn, selectors.EVENT_READ, data=client) + client.connect(self.sel, conn, addr) + def _ip_broadcast(self): logging.info("Broadcast sender thread started!") - msg = bytes(Client.form_message( - "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id} - ), "UTF-8") + msg = messaging.MessageManager.create_simple_message( + "server_ip", {"host": self.ip, "port": str(self.port), "id": self.id}) broadcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) broadcast_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) logging.info("Formed broadcast message: {}".format(msg)) - while self.broadcast_thread_running.is_set(): - time.sleep(self.BROADCAST_DELAY) - broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port)) - logging.debug("Broadcast sent") - broadcast_sock.close() - logging.info("Broadcast sender thread stopped, socked closed!") + try: + while self.broadcast_thread_running.is_set(): + time.sleep(self.BROADCAST_DELAY) + broadcast_sock.sendto(msg, ('255.255.255.255', self.broadcast_port)) + logging.debug("Broadcast sent") + finally: + broadcast_sock.close() + logging.info("Broadcast sender thread stopped, socked closed!") def _broadcast_listen(self): logging.info("Broadcast listener thread started!") @@ -154,23 +187,27 @@ class Server: logging.critical("Another server is running on this computer, shutting down!") sys.exit() - while self.listener_thread_running.is_set(): - data, addr = broadcast_client.recvfrom(1024) - command, args = Client.parse_message(data.decode("UTF-8")) - if command == "server_ip": - if args["id"] != self.id: - logging.critical("Another server detected on network, shutting down") - sys.exit() - broadcast_client.close() - logging.info("Broadcast listener thread stopped, socked closed!") + try: + while self.listener_thread_running.is_set(): + data, addr = broadcast_client.recvfrom(1024) + message = messaging.MessageManager() + message.income_raw = data + message.process_message() + if message.content: + if message.content["command"] == "server_ip": + if message.content["args"]["id"] != self.id: + logging.critical("Another server detected on network, shutting down") + sys.exit() + else: + logging.warning("Got wrong broadcast message from {}".format(addr)) + finally: + broadcast_client.close() + logging.info("Broadcast listener thread stopped, socked closed!") def send_starttime(self, copter, dt=0): - if self.USE_NTP: - timenow = Server.get_ntp_time(self.NTP_HOST, self.NTP_PORT) - else: - timenow = time.time() + timenow = self.time_now() print('Now:', time.ctime(timenow), "+ dt =", dt) - copter.send(Client.form_message("starttime", {"time": str(timenow + dt)})) + copter.send_message("start", {"time": str(timenow + dt)}) def requires_connect(f): @@ -191,9 +228,7 @@ def requires_any_connected(f): return wrapper -class Client: - resume_quee = True - +class Client(messaging.ConnectionManager): clients = {} on_connect = None # Use as callback functions @@ -201,198 +236,72 @@ class Client: on_disconnect = None def __init__(self, ip): - self.socket = None - self.addr = None - - self._send_queue = collections.deque() - self._received_queue = collections.deque() - self._request_queue = collections.OrderedDict() - - self._send_lock = threading.Lock() - self._request_lock = threading.Lock() - + super(Client, self).__init__() self.copter_id = None - self.selected = False # Use to select copters for certain purposes DEPRECATED - - Client.clients[ip] = self - self.connected = False - def connect(self, client_socket, client_addr): - print("Client connected") - if not Client.resume_quee: + self.clients[ip] = self + + @staticmethod + def get_by_id(copter_id): + for client in Client.clients.values(): + if client.copter_id == copter_id: + return client + + def connect(self, client_selector, client_socket, client_addr): + logging.info("Client connected") + if not self.resume_queue: self._send_queue = collections.deque() - self.socket = client_socket - self.addr = client_addr + super(Client, self).connect(client_selector, client_socket, client_addr) - self.socket.setblocking(0) self.connected = True - client_thread = threading.Thread(target=self._run, name="Client {} thread".format(self.addr)) - client_thread.start() + if self.copter_id is None: - self.copter_id = self.get_response("id") - print("Got copter id:", self.copter_id) - if Client.on_first_connect: - Client.on_first_connect(self) + self.get_response("id", self._got_id) - if Client.on_connect: - Client.on_connect(self) + if self.on_connect: + self.on_connect(self) - def _send_all(self, msg): - self.socket.sendall(struct.pack('>I', len(msg)) + msg) + def _got_id(self, value): + logging.info("Got copter id: {} for client {}".format(value, self.addr)) + self.copter_id = value + if Client.on_first_connect: + Client.on_first_connect(self) - def _receive_all(self, n): - data = b'' - while len(data) < n: - packet = self.socket.recv(min(n - len(data), Server.BUFFER_SIZE)) - print("Receiving packet {}; full data is {}".format(packet, data)) - if not packet: - return None - data += packet - return data + def close(self): + self.connected = False - def _receive_message(self): - raw_msglen = self._receive_all(4) - if not raw_msglen: - print("No valid msg") - return None - msglen = struct.unpack('>I', raw_msglen)[0] - msg = self._receive_all(msglen) - return msg + if Client.on_disconnect: + Client.on_disconnect(self) - def _run(self): - while self.connected: - try: - if self._send_queue: - with self._send_lock: - msg = self._send_queue.popleft() - try: - self._send_all(msg) - print("Send", msg, "to", self.addr) - except socket.error as e: - logging.warning("Attempt to send failed: {}".format(e)) - with self._send_lock: - self._send_queue.appendleft(msg) - raise e - - try: # check if data in buffer - check = self.socket.recv(Server.BUFFER_SIZE, socket.MSG_PEEK) - if check: - received = self._receive_message() - if received: - received = received.decode("UTF-8") - print("Received", received, "from", self.addr) - command, args = Client.parse_message(received) - if command == "response": - with self._request_lock: - for key, value in self._request_queue.items(): - if not value and key == args["value_name"]: - self._request_queue[key] = args['value'] - print("Request successfully closed") - break - else: - print("Unexpected request") - else: - self._received_queue.appendleft(received) - except socket.error: - pass - - except socket.error as e: - logging.warning("Client error: {}, disconnected".format(e)) - self.connected = False - self.socket.close() - if Client.on_disconnect: - Client.on_disconnect(self) - break - # time.sleep(0.05) - - @staticmethod - def form_message(command: str, dict_arguments: dict = None): - if dict_arguments is None: - dict_arguments = {} - msg_dict = {command: dict_arguments} - msg = json.dumps(msg_dict) - return msg - - @staticmethod - def parse_message(msg): - try: - j_message = json.loads(msg) - except ValueError: - print("Json string not in correct format") - return None, None - str_command = list(j_message.keys())[0] - dict_arguments = list(j_message.values())[0] - - return str_command, dict_arguments + super(Client, self).close() @requires_connect - def send(self, *messages): - for message in messages: - with self._send_lock: - self._send_queue.append(bytes(message, "UTF-8")) + def _send(self, data): + super(Client, self)._send(data) + logging.debug("Queued data to send: {}".format(data)) - @requires_connect - def get_response(self, requested_value): - with self._request_lock: - self._request_queue[requested_value] = "" - self.send(Client.form_message("request", {"value": requested_value})) - - while not self._request_queue[requested_value]: - pass - - with self._request_lock: - return self._request_queue.pop(requested_value) - - @requires_connect - def send_file(self, filepath, dest_filename, clever_restart = False): - print("Sending file ", dest_filename) - chunk_count = math.ceil(os.path.getsize(filepath) / Server.BUFFER_SIZE) - self.send(Client.form_message("writefile", {"filesize": chunk_count, "filename": dest_filename, "clever_restart": clever_restart})) - with open(filepath, 'rb') as file: - chunk = file.read(Server.BUFFER_SIZE) - while chunk: - with self._send_lock: - self._send_queue.append(chunk) - chunk = file.read(Server.BUFFER_SIZE) - - self.send(Client.form_message("/endoffile")) # TODO mb remove - print("File sent") - - @staticmethod - @requires_any_connected - def send_to_selected(message): # DEPRECATED - for client in Client.clients.values(): - if client.connected and client.selected: - client.send(message) - - @staticmethod - @requires_any_connected - def request_to_selected(requested_value): # DEPRECATED - for client in Client.clients.values(): - if client.connected and client.selected: - client.get_response(requested_value) + def send_config_options(self, *options: ConfigOption, reload_config=True): + logging.info("Sending config options: {} to {}".format(options, self.addr)) + sending_options = [{'section': option.section, 'option': option.option, 'value': option.value} + for option in options] + print(sending_options) + self.send_message( + 'config_write', {"options": sending_options, "reload": reload_config} + ) @staticmethod @requires_any_connected def broadcast(message, force_all=False): for client in Client.clients.values(): if client.connected or force_all: - client.send(message) + client._send(message) - def send_config_options(self, *options: ConfigOption): - for option in options: - self.send( - Client.form_message('config_write', - {'section': option.section, 'option': option.option, 'value': option.value})) - self.send(Client.form_message("config_reload")) - - @staticmethod - def get_by_id(copter_id): - for copter in Client.clients.values(): - if copter.copter_id == copter_id: - return copter + @classmethod + @requires_any_connected + def broadcast_message(cls, command, args=None, force_all=False): + cls.broadcast(messaging.MessageManager.create_simple_message(command, args), force_all) if __name__ == '__main__': diff --git a/Server/server_qt.py b/Server/server_qt.py index 161a34b..20bd373 100644 --- a/Server/server_qt.py +++ b/Server/server_qt.py @@ -1,8 +1,8 @@ +import os import glob from PyQt5 import QtWidgets from PyQt5.QtGui import QStandardItemModel, QStandardItem -#from PyQt5.QtCore import QModelIndex from PyQt5.QtCore import Qt, pyqtSlot from PyQt5.QtWidgets import QFileDialog, QMessageBox @@ -11,19 +11,19 @@ from PyQt5.QtWidgets import QFileDialog, QMessageBox from server_gui import Ui_MainWindow from server import * -class CopterView(QStandardItemModel): - pass + +# noinspection PyArgumentList,PyCallByClass class MainWindow(QtWidgets.QMainWindow): def __init__(self): super(MainWindow, self).__init__() self.ui = Ui_MainWindow() self.ui.setupUi(self) - self.initUI() + self.init_ui() self.show() - def initUI(self): - #Connecting + def init_ui(self): + # Connecting self.ui.check_button.clicked.connect(self.check_selected) self.ui.start_button.clicked.connect(self.send_starttime) self.ui.pause_button.clicked.connect(self.pause_all) @@ -38,7 +38,7 @@ class MainWindow(QtWidgets.QMainWindow): self.ui.action_send_configurations.triggered.connect(self.send_configurations) self.ui.action_send_Aruco_map.triggered.connect(self.send_aruco) - #Initing table and table model + # Initing table and table model self.ui.tableView.setModel(model) self.ui.tableView.horizontalHeader().setStretchLastSection(True) @@ -49,24 +49,28 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: print("Copter {} checked".format(model.item(row_num, 0).text())) copter = Client.get_by_id(item.text()) - batt_total = float(copter.get_response("batt_voltage")) - batt_cell = float(copter.get_response("cell_voltage")) - selfcheck = copter.get_response("selfcheck") - - batt_percent = ((batt_cell-3.2)/(4.2-3.2))*100 - - model.setData(model.index(row_num, 2), "{} V.".format(round(batt_total, 3))) - model.setData(model.index(row_num, 3), "{} %".format(round(batt_percent, 3))) - if selfcheck != "OK": - print(selfcheck) - model.setData(model.index(row_num, 4), str(selfcheck)) - else: - print("Everything ok") - model.setData(model.index(row_num, 4), str(selfcheck)) + copter.get_response("batt_voltage", self._set_copter_data, callback_args=(row_num, 2)) + copter.get_response("cell_voltage", self._set_copter_data, callback_args=(row_num, 3)) + copter.get_response("selfcheck", self._set_copter_data, callback_args=(row_num, 4)) + copter.get_response("time", self._set_copter_data, callback_args=(row_num, 5)) self.ui.start_button.setEnabled(True) self.ui.takeoff_button.setEnabled(True) + def _set_copter_data(self, value, row, col): + if col == 2: + model.setData(model.index(row, col), "{} V.".format(round(float(value), 3))) + elif col == 3: + batt_percent = ((float(value) - 3.2) / (4.2 - 3.2)) * 100 + model.setData(model.index(row, col), "{} %".format(round(batt_percent, 3))) + elif col == 4: + if value != "OK": + model.setData(model.index(row, col), str(value)) # TODO different handling + else: + model.setData(model.index(row, col), str(value)) + elif col == 5: + model.setData(model.index(row, col), time.ctime(int(value))) + @pyqtSlot() def send_starttime(self): dt = self.ui.start_delay_spin.value() @@ -88,15 +92,15 @@ class MainWindow(QtWidgets.QMainWindow): @pyqtSlot() def stop_all(self): - Client.broadcast(Client.form_message("stop")) + Client.broadcast_message("stop") @pyqtSlot() def pause_all(self): if self.ui.pause_button.text() == 'Pause': - Client.broadcast(Client.form_message('pause')) + Client.broadcast_message('pause') self.ui.pause_button.setText('Resume') else: - Client.broadcast(Client.form_message('resume')) + Client.broadcast_message('resume') self.ui.pause_button.setText('Pause') @pyqtSlot() @@ -106,7 +110,7 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: if True: # TODO checks for batt/selfckeck here copter = Client.get_by_id(item.text()) - copter.send(Client.form_message("led_test")) + copter.send_message("led_test") @pyqtSlot() def takeoff_selected(self): @@ -122,17 +126,18 @@ class MainWindow(QtWidgets.QMainWindow): if item.isCheckable() and item.checkState() == Qt.Checked: if True: # TODO checks for batt/selfckeck here copter = Client.get_by_id(item.text()) - copter.send(Client.form_message("takeoff")) + copter.send_message("takeoff") else: print("Cancelled") + pass @pyqtSlot() def land_all(self): - Client.broadcast(Client.form_message("land")) + Client.broadcast_message("land") @pyqtSlot() def disarm_all(self): - Client.broadcast(Client.form_message("disarm")) + Client.broadcast_message("disarm") @pyqtSlot() def send_animations(self): @@ -163,7 +168,7 @@ class MainWindow(QtWidgets.QMainWindow): for section in sendable_config.sections(): for option in dict(sendable_config.items(section)): value = sendable_config[section][option] - print("Got item from config:", section, option, value) + logging.debug("Got item from config:".format(section, option, value)) options.append(ConfigOption(section, option, value)) for row_num in range(model.rowCount()): item = model.item(row_num, 0) @@ -181,7 +186,8 @@ class MainWindow(QtWidgets.QMainWindow): item = model.item(row_num, 0) if item.isCheckable() and item.checkState() == Qt.Checked: copter = Client.get_by_id(item.text()) - copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt", clever_restart=True) + copter.send_file(path, "/home/pi/catkin_ws/src/clever/aruco_pose/map/animation_map.txt") + copter.send_message("service_restart", {"name": "clever"}) model = QStandardItemModel() diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/logging_lib.py b/logging_lib.py new file mode 100644 index 0000000..d53100a --- /dev/null +++ b/logging_lib.py @@ -0,0 +1,44 @@ +import logging + +try: + import rospy +except ImportError: + ros = False +else: + ros = True + + +class Logger: + def __init__(self, logger=logging.getLogger(), use_ros=False): + self.ros = True if use_ros and ros else False + self.logger = logger + + def info(self, msg): + self.logger.info(msg) + + if self.ros: + rospy.loginfo(msg) + + def debug(self, msg): + self.logger.debug(msg) + + if self.ros: + rospy.logdebug(msg) + + def warning(self, msg): + self.logger.warning(msg) + + if self.ros: + rospy.logwarn(msg) + + def error(self, msg): + self.logger.error(msg) + + if self.ros: + rospy.logerr(msg) + + def critical(self, msg): + self.logger.critical(msg) + + if self.ros: + rospy.logfatal(msg) diff --git a/messaging_lib.py b/messaging_lib.py new file mode 100644 index 0000000..e23f422 --- /dev/null +++ b/messaging_lib.py @@ -0,0 +1,401 @@ +import io +import sys +import json +import struct +import random +import logging +import threading +import collections + +try: + import selectors +except ImportError: + import selectors2 as selectors + +import logging_lib + +PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on", + "callback", "callback_args", "callback_kwargs", + ]) +_logger = logging.getLogger(__name__) +logger = logging_lib.Logger(_logger, True) + + +class MessageManager: + def __init__(self): + self.income_raw = b"" + self._jsonheader_len = None + self.jsonheader = None + self.content = None + + @staticmethod + def _json_encode(obj, encoding="utf-8"): + return json.dumps(obj, ensure_ascii=False).encode(encoding) + + @staticmethod + def _json_decode(json_bytes, encoding="utf-8"): + with io.TextIOWrapper(io.BytesIO(json_bytes), encoding=encoding, newline="") as tiow: + obj = json.load(tiow) + return obj + + @classmethod + def create_message(cls, content_bytes, content_type, message_type, content_encoding="utf-8", + additional_headers=None): + jsonheader = { + "byteorder": sys.byteorder, + "content-type": content_type, + "content-encoding": content_encoding, + "content-length": len(content_bytes), + "message-type": message_type, + } + if additional_headers: + jsonheader.update(additional_headers) + + jsonheader_bytes = cls._json_encode(jsonheader, "utf-8") + message_hdr = struct.pack(">H", len(jsonheader_bytes)) + message = message_hdr + jsonheader_bytes + content_bytes + return message + + @classmethod + def create_json_message(cls, contents): + message = cls.create_message(cls._json_encode(contents), "json", "message") + return message + + @classmethod + def create_simple_message(cls, command, args=None): + if args is None: + args = {} + message = cls.create_json_message({"command": command, "args": args}) + return message + + @classmethod + def create_request(cls, requested_value, request_id, args=None): + if args is None: + args = {} + contents = {"requested_value": requested_value, + "request_id": request_id, + "args": args, + } + message = cls.create_message(cls._json_encode(contents), "json", "request") + return message + + @classmethod + def create_response(cls, requested_value, request_id, value): + contents = {"requested_value": requested_value, + "request_id": request_id, + "value": value, + } + message = cls.create_message(cls._json_encode(contents), "json", "response") + return message + + def _process_protoheader(self): + header_len = 2 + if len(self.income_raw) >= header_len: + self._jsonheader_len = struct.unpack(">H", self.income_raw[:header_len])[0] + self.income_raw = self.income_raw[header_len:] + + def _process_jsonheader(self): + header_len = self._jsonheader_len + if len(self.income_raw) >= header_len: + self.jsonheader = self._json_decode(self.income_raw[:header_len], "utf-8") + self.income_raw = self.income_raw[header_len:] + for reqhdr in ( + "byteorder", + "content-length", + "content-type", + "content-encoding", + "message-type", + ): + if reqhdr not in self.jsonheader: + raise ValueError('Missing required header {}'.format(reqhdr)) + + def _process_content(self): + content_len = self.jsonheader["content-length"] + if not len(self.income_raw) >= content_len: + return + data = self.income_raw[:content_len] + self.income_raw = self.income_raw[content_len:] + if self.jsonheader["content-type"] == "json": + encoding = self.jsonheader["content-encoding"] + self.content = self._json_decode(data, encoding) + else: + self.content = data + + def process_message(self): + if self._jsonheader_len is None: + self._process_protoheader() + + if self._jsonheader_len is not None: + if self.jsonheader is None: + self._process_jsonheader() + + if self.jsonheader: + if self.content is None: + self._process_content() + + +def message_callback(string_command): + def inner(f): + ConnectionManager.messages_callbacks[string_command] = f + logger.debug("Registered message function {} for {}".format(f, string_command)) + + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + + return wrapper + + return inner + + +def request_callback(string_command): + def inner(f): + ConnectionManager.requests_callbacks[string_command] = f + logger.debug("Registered callback function {} for {}".format(f, string_command)) + + def wrapper(*args, **kwargs): + return f(*args, **kwargs) + + return wrapper + + return inner + + +class ConnectionManager(object): + messages_callbacks = {} + requests_callbacks = {} + + def __init__(self): + self.selector = None + self.socket = None + self.addr = None + + self.selector = None + self.socket = None + self.addr = None + + self._recv_buffer = b"" + self._send_buffer = b"" + + self._send_queue = collections.deque() + self._received_queue = collections.deque() + self._request_queue = collections.OrderedDict() + + self._send_lock = threading.Lock() + self._request_lock = threading.Lock() + + self.BUFFER_SIZE = 1024 + self.resume_queue = False + + def _set_selector_events_mask(self, mode): + """Set selector to listen for events: mode is 'r', 'w', 'rw'.""" + if mode == "r": + events = selectors.EVENT_READ + elif mode == "w": + events = selectors.EVENT_WRITE + elif mode == "rw": + events = selectors.EVENT_READ | selectors.EVENT_WRITE + else: + raise ValueError("Invalid events mask mode {}.".format(mode)) + self.selector.modify(self.socket, events, data=self) + + def connect(self, client_selector, client_socket, client_addr): + self.selector = client_selector + self.socket = client_socket + self.addr = client_addr + + self._set_selector_events_mask('rw') + + def close(self): + logger.info("Closing connection to {}".format(self.addr)) + try: + self.selector.unregister(self.socket) + except AttributeError: + pass + except Exception as error: + logger.error("{}: Error during selector unregistering: {}".format(self.addr, error)) + finally: + self.selector = None + + try: + self.socket.close() + except AttributeError: + pass + except OSError as error: + logger.error("{}: Error during socket closing: {}".format(self.addr, error)) + finally: + self.socket = None + + def process_events(self, mask): + if mask & selectors.EVENT_READ: + self.read() + if mask & selectors.EVENT_WRITE: + self.write() + + def read(self): + self._read() + while self._recv_buffer: + if not self._received_queue or (self._received_queue[0].content is not None): + self._received_queue.appendleft(MessageManager()) + + self._received_queue[0].income_raw += self._recv_buffer + self._recv_buffer = b'' + self._received_queue[0].process_message() + + # if something left after processing message - put it back + if self._received_queue[0].content and self._received_queue[0].income_raw: + self._recv_buffer = self._received_queue[0].income_raw + self._recv_buffer + self._received_queue[0].income_raw = b'' + + if self._received_queue: + if self._received_queue[-1].content: + self.process_received(self._received_queue.popleft()) + + def _read(self): + try: + data = self.socket.recv(self.BUFFER_SIZE) + except io.BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + else: + if data: + self._recv_buffer += data + logger.debug("Received {} from {}".format(data, self.addr)) + else: + logger.warning("Connection to {} lost!".format(self.addr)) + if not self.resume_queue: + self._recv_buffer = b'' + + raise RuntimeError("Peer closed.") + + def process_received(self, income_message): + message_type = income_message.jsonheader["message-type"] + logger.debug("Received message! Header: {}, content: {}".format( + income_message.jsonheader, income_message.content)) + + if message_type == "message": + self._process_message(income_message) + elif message_type == "response": + self._process_response(income_message) + elif message_type == "request": + self._process_request(income_message) + elif message_type == "filetransfer": + self._process_filetransfer(income_message) + + def _process_message(self, message): + command = message.content["command"] + args = message.content["args"] + try: + self.messages_callbacks[command](**args) + except KeyError: + logger.warning("Command {} does not exist!".format(command)) + except Exception as error: + logger.error("Error during command {} execution: {}".format(command, error)) + + def _process_request(self, message): + command = message.content["requested_value"] + request_id = message.content["request_id"] + args = message.content["args"] + try: + value = self.requests_callbacks[command](**args) + except KeyError: + logger.warning("Request {} does not exist!".format(command)) + except Exception as error: # TODO send response error\cancel + logger.error("Error during request {} processing: {}".format(command, error)) + else: + self._send_response(command, request_id, value) + + def _process_response(self, message): + request_id, requested_value = message.content["request_id"], message.content["requested_value"] + with self._request_lock: + for key, value in self._request_queue.items(): # TODO as try [] + if (key == request_id) and (value.requested_value == requested_value): + request = self._request_queue.pop(key) + value = message.content["value"] + logger.debug( + "Request {} successfully closed with value {}".format(request, message.content["value"]) + ) + + f = request.callback + f(value, *request.callback_args, **request.callback_kwargs) + + break + else: + logger.warning("Unexpected response!") + + def _process_filetransfer(self, message): # TODO path? + if message.jsonheader["content-type"] == "binary": + filepath = message.jsonheader["filepath"] + try: + with open(filepath, 'wb') as f: + f.write(message.content) + except OSError as error: + logger.error("File {} can not be written due error: {}".format(filepath, error)) + else: + logger.info("File {} successfully received ".format(filepath)) + + def write(self): + with self._send_lock: + if (not self._send_buffer) and self._send_queue: + message = self._send_queue.popleft() + self._send_buffer += message + if self._send_buffer: + self._write() + + def _write(self): + try: + sent = self.socket.send(self._send_buffer) + except io.BlockingIOError: + # Resource temporarily unavailable (errno EWOULDBLOCK) + pass + except Exception as error: + logger.warning("Attempt to send message {} to {} failed due error: {}".format( + self._send_buffer, self.addr, error)) + + if not self.resume_queue: + self._send_buffer = b'' + + raise error + else: + logger.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr)) + self._send_buffer = self._send_buffer[sent:] + + def _send(self, data): + with self._send_lock: + self._send_queue.append(data) + + def get_response(self, requested_value, callback, request_args=None, # timeout=30, + callback_args=(), callback_kwargs=None): + if request_args is None: + request_args = {} + if callback_kwargs is None: + callback_kwargs = {} + + request_id = str(random.randint(0, 9999)).zfill(4) + with self._request_lock: + self._request_queue[request_id] = PendingRequest( + requested_value=requested_value, + value=None, + # expires_on=Server.time_now()+timeout, #TODO + callback=callback, + callback_args=callback_args, + callback_kwargs=callback_kwargs, + ) + self._send(MessageManager.create_request(requested_value, request_id, request_args)) + + def send_message(self, command, args=None): + self._send(MessageManager.create_simple_message(command, args)) + + def _send_response(self, requested_value, request_id, value): + self._send(MessageManager.create_response(requested_value, request_id, value)) + + def send_file(self, filepath, dest_filepath): # clever_restart=False + try: + with open(filepath, 'rb') as f: + data = f.read() + except OSError as error: + logger.warning("File can not be opened due error: ".format(error)) + else: + logger.info("Sending file {} to {} (as: {})".format(filepath, self.addr, dest_filepath)) + self._send(MessageManager.create_message( + data, "binary", "filetransfer", "binary", {"filepath": dest_filepath} + ))