mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-29 16:29:34 +00:00
Remove Flightlib files back
This commit is contained in:
293
Drone/FlightLib/FlightLib.py
Normal file
293
Drone/FlightLib/FlightLib.py
Normal file
@@ -0,0 +1,293 @@
|
||||
#!/usr/bin/python
|
||||
from __future__ import print_function
|
||||
import sys
|
||||
import math
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
import rospy
|
||||
from clever import srv
|
||||
from mavros_msgs.srv import SetMode
|
||||
from mavros_msgs.srv import CommandBool
|
||||
from std_srvs.srv import Trigger
|
||||
|
||||
module_logger = logging.getLogger("FlightLib.FlightLib")
|
||||
|
||||
# create proxy service
|
||||
navigate = rospy.ServiceProxy('navigate', srv.Navigate)
|
||||
set_position = rospy.ServiceProxy('set_position', srv.SetPosition)
|
||||
set_rates = rospy.ServiceProxy('/set_rates', srv.SetRates)
|
||||
set_mode = rospy.ServiceProxy('/mavros/set_mode', SetMode)
|
||||
get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry)
|
||||
arming = rospy.ServiceProxy('/mavros/cmd/arming', CommandBool)
|
||||
landing = rospy.ServiceProxy('/land', Trigger)
|
||||
|
||||
module_logger.info("Proxy services inited")
|
||||
|
||||
# globals
|
||||
FREQUENCY = 1000/25 # HZ
|
||||
TOLERANCE = 0.2
|
||||
|
||||
interrupt_event = threading.Event()
|
||||
|
||||
|
||||
def interrupt():
|
||||
module_logger.info("Performing function interrupt")
|
||||
interrupt_event.set()
|
||||
|
||||
|
||||
def init(node_name="CleverSwarmFlight", anon=True, no_signals=True):
|
||||
module_logger.info("Initing ROS node")
|
||||
rospy.init_node(node_name, anonymous=anon, disable_signals=no_signals)
|
||||
module_logger.info("Ros node inited")
|
||||
|
||||
|
||||
def get_distance3d(x1, y1, z1, x2, y2, z2):
|
||||
return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2 + (z1 - z2) ** 2)
|
||||
|
||||
|
||||
def check(check_name):
|
||||
def inner(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
result, failures = f(*args, **kwargs)
|
||||
if failures:
|
||||
msgs = []
|
||||
for failure in failures:
|
||||
msg = "[{}]: Failure: {}".format(check_name, failure)
|
||||
msgs.append(msg)
|
||||
module_logger.warning(msg)
|
||||
return msgs
|
||||
else:
|
||||
module_logger.info("[{}]: OK".format(check_name))
|
||||
return None
|
||||
return wrapper
|
||||
return inner
|
||||
|
||||
|
||||
@check("Linear velocity estimation")
|
||||
def check_linear_speeds():
|
||||
failures = []
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
speed_limit = 0.1
|
||||
if telemetry.vx >= speed_limit:
|
||||
failures.append("X velocity estimation: {:.3f} m/s".format(telemetry.vx))
|
||||
if telemetry.vy >= speed_limit:
|
||||
failures.append("Y velocity estimation: {:.3f} m/s".format(telemetry.vy))
|
||||
if telemetry.vz >= speed_limit:
|
||||
failures.append("Z velocity estimation: {:.3f} m/s".format(telemetry.vz))
|
||||
return failures
|
||||
|
||||
|
||||
@check("Angular velocity estimation")
|
||||
def check_angular_speeds():
|
||||
failures = []
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
rate_limit = 0.05
|
||||
if telemetry.pitch_rate >= rate_limit:
|
||||
failures.append("Pitch rate estimation: {:.3f} rad/s".format(telemetry.pitch_rate))
|
||||
if telemetry.roll_rate >= rate_limit:
|
||||
failures.append("Roll rate estimation: {:.3f} rad/s".format(telemetry.roll_rate))
|
||||
if telemetry.yaw_rate >= rate_limit:
|
||||
failures.append("Yaw rate estimation: {:.3f} rad/s".format(telemetry.yaw_rate))
|
||||
return failures
|
||||
|
||||
|
||||
@check("Angles estimation")
|
||||
def check_angles():
|
||||
failures = []
|
||||
telemetry = get_telemetry(frame_id='body')
|
||||
angle_limit = math.radians(1)
|
||||
if abs(telemetry.pitch) >= angle_limit:
|
||||
failures.append("Pitch estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.pitch,
|
||||
math.degrees(telemetry.pitch)))
|
||||
if abs(telemetry.roll) >= angle_limit:
|
||||
failures.append("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll,
|
||||
math.degrees(telemetry.roll)))
|
||||
if abs(telemetry.yaw) >= angle_limit:
|
||||
failures.append("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw,
|
||||
math.degrees(telemetry.yaw)))
|
||||
return failures
|
||||
|
||||
|
||||
def selfcheck():
|
||||
msgs = []
|
||||
msgs.extend(check_linear_speeds())
|
||||
msgs.extend(check_angular_speeds())
|
||||
msgs.extend(check_angles())
|
||||
|
||||
return msgs
|
||||
|
||||
|
||||
def navto(x, y, z, yaw=float('nan'), frame_id='aruco_map'):
|
||||
set_position(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw)
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
|
||||
module_logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
module_logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def reach_point(x=0.0, y=0.0, z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map',
|
||||
freq=FREQUENCY, timeout=5000, wait=False):
|
||||
|
||||
module_logger.info('Reaching point: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
navigate(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw, speed=speed)
|
||||
|
||||
# waiting for completion
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
|
||||
while (get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z) > tolerance) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
|
||||
if timeout is not None:
|
||||
if time_passed >= timeout:
|
||||
module_logger.warning('Reaching point timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
return wait
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Point reached!")
|
||||
return True
|
||||
|
||||
|
||||
def reach_attitude(z=0.0, yaw=float('nan'), speed=1.0, tolerance=TOLERANCE, frame_id='aruco_map',
|
||||
freq=FREQUENCY, timeout=5000, wait=False):
|
||||
|
||||
module_logger.info('Reaching attitude: | z: {:.3f} yaw: {:.3f}'.format(z, yaw))
|
||||
current_telem = get_telemetry(frame_id=frame_id)
|
||||
navigate(frame_id=frame_id, x=current_telem.x, y=current_telem.y, z=z, yaw=yaw, speed=speed)
|
||||
|
||||
# waiting for completion
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
|
||||
while (abs(z - telemetry.z) > tolerance) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info('Telemetry: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(
|
||||
telemetry.x, telemetry.y, telemetry.z, telemetry.yaw))
|
||||
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
if timeout is not None:
|
||||
if time_passed >= timeout:
|
||||
module_logger.warning('Reaching attitude timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
return wait
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Attitude reached!")
|
||||
return True
|
||||
|
||||
|
||||
def land(descend=True, z=1.0, frame_id_descend="aruco_map", frame_id_land="aruco_map",
|
||||
timeout_descend=5000, timeout_land=7500, freq=FREQUENCY):
|
||||
if descend:
|
||||
module_logger.info("Descending to: | z: {:.3f}".format(z))
|
||||
reach_attitude(z=z, frame_id=frame_id_descend, timeout=timeout_descend, freq=freq, yaw=1.57) #TODO yaw
|
||||
landing()
|
||||
|
||||
telemetry = get_telemetry(frame_id='aruco_map')
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
|
||||
while telemetry.armed:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
break
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id_land)
|
||||
module_logger.info("Landing...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
|
||||
if timeout_land is not None:
|
||||
if time_passed >= timeout_land:
|
||||
module_logger.warning('Landing timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
module_logger.warning("Disarming!")
|
||||
arming(False)
|
||||
return False
|
||||
rate.sleep()
|
||||
else:
|
||||
module_logger.info("Landing succeeded!")
|
||||
return True
|
||||
|
||||
|
||||
def takeoff(z=1.0, speed=0.8, frame_id='body', freq=FREQUENCY,
|
||||
timeout_arm=2000, timeout_takeoff=5000, wait=False, tolerance=TOLERANCE, emergency_land=False):
|
||||
module_logger.info("Starting takeoff!")
|
||||
print("Starting takeoff!")
|
||||
module_logger.info("Arming, going to OFFBOARD mode")
|
||||
|
||||
# Arming check
|
||||
set_rates(thrust=0.1, auto_arm=True)
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
rate = rospy.Rate(freq)
|
||||
time_start = rospy.get_rostime()
|
||||
|
||||
while (not telemetry.armed) or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
return None
|
||||
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
module_logger.info("Arming...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
|
||||
if timeout_arm is not None:
|
||||
if time_passed >= timeout_arm:
|
||||
if not telemetry.armed:
|
||||
module_logger.warning('Arming timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
return False
|
||||
else:
|
||||
break
|
||||
rate.sleep()
|
||||
|
||||
module_logger.info("Armed!")
|
||||
print("Armed!")
|
||||
|
||||
# Reach height
|
||||
telemetry = get_telemetry(frame_id=frame_id)
|
||||
z0 = get_telemetry().z
|
||||
navigate(z=z, speed=speed, frame_id=frame_id, auto_arm=True)
|
||||
current_height = abs(get_telemetry().z - z0 - z)
|
||||
while current_height > tolerance or wait:
|
||||
if interrupt_event.is_set():
|
||||
module_logger.warning("Flight function interrupted!")
|
||||
interrupt_event.clear()
|
||||
return None
|
||||
|
||||
current_height = abs(get_telemetry().z - z0 - z)
|
||||
module_logger.info("Takeoff...")
|
||||
time_passed = (rospy.get_rostime() - time_start).to_sec() * 1000
|
||||
|
||||
if timeout_takeoff is not None:
|
||||
if time_passed >= timeout_takeoff:
|
||||
if not wait:
|
||||
module_logger.warning('Takeoff timed out! | time: {:3f} seconds'.format(time_passed / 1000))
|
||||
if emergency_land:
|
||||
module_logger.info("Preforming emergency land")
|
||||
land(descend=False)
|
||||
return False
|
||||
else:
|
||||
break
|
||||
rate.sleep()
|
||||
|
||||
module_logger.info("Takeoff succeeded!")
|
||||
print("Takeoff succeeded!")
|
||||
return True
|
||||
235
Drone/FlightLib/LedLib.py
Normal file
235
Drone/FlightLib/LedLib.py
Normal file
@@ -0,0 +1,235 @@
|
||||
from __future__ import print_function
|
||||
from threading import Thread
|
||||
import time
|
||||
from rpi_ws281x import *
|
||||
|
||||
# LED strip configuration:
|
||||
LED_COUNT = 29 # Number of LED pixels.
|
||||
LED_PIN = 21 # GPIO pin connected to the pixels (18 uses PWM!) (10 uses SPI /dev/spidev0.0).
|
||||
LED_FREQ_HZ = 800000 # LED signal frequency in hertz (usually 800khz)
|
||||
LED_DMA = 10 # DMA channel to use for generating signal (try 10)
|
||||
LED_BRIGHTNESS = 255 # Set to 0 for darkest and 255 for brightest
|
||||
LED_INVERT = False # True to invert the signal (when using NPN transistor level shift)
|
||||
LED_CHANNEL = 0 # Set to '1' for GPIOs 13, 19, 41, 45 or 53
|
||||
|
||||
# define led strip
|
||||
strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
|
||||
|
||||
# variables
|
||||
mode = ""
|
||||
r = 0
|
||||
g = 0
|
||||
b = 0
|
||||
|
||||
r_prev = 0
|
||||
g_prev = 0
|
||||
b_prev = 0
|
||||
|
||||
direct = False
|
||||
l = 0
|
||||
wait_ms = 10
|
||||
|
||||
|
||||
# functions
|
||||
def math_wheel(pos):
|
||||
"""Generate rainbow colors across 0-255 positions."""
|
||||
if pos < 85:
|
||||
return Color(pos * 3, 255 - pos * 3, 0)
|
||||
elif pos < 170:
|
||||
pos -= 85
|
||||
return Color(255 - pos * 3, 0, pos * 3)
|
||||
else:
|
||||
pos -= 170
|
||||
return Color(0, pos * 3, 255 - pos * 3)
|
||||
|
||||
|
||||
def rainbow(wait=10, direction=False):
|
||||
global wait_ms, direct, mode
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "rainbow"
|
||||
|
||||
|
||||
def fill(red, green, blue):
|
||||
global r, g, b, mode
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
mode = "fill"
|
||||
|
||||
|
||||
def blink(red, green, blue, wait=250):
|
||||
global r, g, b, wait_ms, mode
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
mode = "blink"
|
||||
|
||||
|
||||
def chase(red, green, blue, wait=50, direction=False):
|
||||
global r, g, b, wait_ms, direct, mode
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "chase"
|
||||
|
||||
|
||||
def wipe_to(red, green, blue, wait=50, direction=False):
|
||||
global r, g, b, wait_ms, direct, mode
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
direct = direction
|
||||
mode = "wipe_to"
|
||||
|
||||
|
||||
def fade_to(red, green, blue, wait=20): # do not working with rainbow (solid colors only)
|
||||
global r, g, b, r_prev, g_prev, b_prev, wait_ms, mode
|
||||
r_prev = r
|
||||
g_prev = g
|
||||
b_prev = b
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
wait_ms = wait
|
||||
mode = "fade_to"
|
||||
|
||||
|
||||
def run(red, green, blue, length=strip.numPixels(), direction=False, wait=25):
|
||||
global r, g, b, l, wait_ms, direct, mode
|
||||
r = red
|
||||
g = green
|
||||
b = blue
|
||||
l = length
|
||||
direct = direction
|
||||
wait_ms = wait
|
||||
mode = "run"
|
||||
|
||||
|
||||
def off():
|
||||
global mode
|
||||
mode = "off"
|
||||
|
||||
|
||||
def strip_set(color):
|
||||
for i in range(strip.numPixels()):
|
||||
strip.setPixelColor(i, color)
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_rainbow_frame(iteration, direction):
|
||||
for i in range(strip.numPixels()):
|
||||
n = ((strip.numPixels()-1)*direction) - i
|
||||
strip.setPixelColor(abs(n), math_wheel((int(i * 256 / strip.numPixels()) + iteration) & 255))
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_chase_step(color, direction):
|
||||
for q in range(3):
|
||||
for i in range(0, strip.numPixels(), 3):
|
||||
n = ((strip.numPixels() - 1) * direction) - (i + q)
|
||||
strip.setPixelColor(abs(n), color)
|
||||
strip.show()
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
for i in range(0, strip.numPixels(), 3):
|
||||
n = ((strip.numPixels() - 1) * direction) - (i + q)
|
||||
strip.setPixelColor(abs(n), 0)
|
||||
|
||||
|
||||
def strip_wipe(color, direction):
|
||||
for i in range(strip.numPixels()):
|
||||
n = ((strip.numPixels() - 1) * direction) - i
|
||||
strip.setPixelColor(abs(n), color)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_fade(r1, g1, b1, r2, g2, b2, frames=50):
|
||||
r_delta = (r2-r1)//frames
|
||||
g_delta = (g2-g1)//frames
|
||||
b_delta = (b2-b1)//frames
|
||||
for i in range(frames):
|
||||
red = r1 + (r_delta * i)
|
||||
green = g1 + (g_delta * i)
|
||||
blue = b1 + (b_delta * i)
|
||||
strip_set(Color(red, green, blue))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
strip_set(Color(r2, g2, b2))
|
||||
|
||||
|
||||
def strip_run_step(red, green, blue, length, direction, iteration):
|
||||
r_delta = red // length
|
||||
g_delta = green // length
|
||||
b_delta = blue // length
|
||||
direction = not direction
|
||||
for i in range(strip.numPixels()):
|
||||
n = ((strip.numPixels()-1)*direction)-((i+iteration) % strip.numPixels())
|
||||
r_fin = max(0, (red - (r_delta * i)))
|
||||
g_fin = max(0, (green - (g_delta * i)))
|
||||
b_fin = max(0, (blue - (b_delta * i)))
|
||||
strip.setPixelColor(abs(n), Color(r_fin, g_fin, b_fin))
|
||||
strip.show()
|
||||
|
||||
|
||||
def strip_off():
|
||||
for i in range(strip.numPixels()):
|
||||
strip.setPixelColor(i, Color(0, 0, 0))
|
||||
strip.show()
|
||||
|
||||
|
||||
def led_thread():
|
||||
global mode
|
||||
print("Starting LedLib thread")
|
||||
iteration = 0
|
||||
while True:
|
||||
if mode == "rainbow":
|
||||
if iteration >= 256:
|
||||
iteration = 0
|
||||
strip_rainbow_frame(iteration, direct)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
iteration += 1
|
||||
elif mode == "fill":
|
||||
strip_set(Color(r, g, b))
|
||||
mode = ""
|
||||
elif mode == "blink":
|
||||
strip_set(Color(r, g, b))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
strip_set(Color(0, 0, 0))
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
elif mode == "chase":
|
||||
strip_chase_step(Color(r, g, b), direct)
|
||||
elif mode == "wipe_to":
|
||||
strip_wipe(Color(r, g, b,), direct)
|
||||
mode = "fill"
|
||||
elif mode == "fade_to":
|
||||
strip_fade(r_prev, g_prev, b_prev, r, g, b)
|
||||
mode = ""
|
||||
elif mode == "run":
|
||||
strip_run_step(r, g, b, l, direct, iteration)
|
||||
time.sleep(wait_ms / 1000.0)
|
||||
iteration += 1
|
||||
elif mode == "off":
|
||||
strip_off()
|
||||
mode = ""
|
||||
else:
|
||||
time.sleep(1 / 1000)
|
||||
|
||||
|
||||
# init
|
||||
def init_led():
|
||||
strip.begin()
|
||||
t_l = Thread(target=led_thread)
|
||||
t_l.daemon = True
|
||||
t_l.start()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
init_led()
|
||||
try:
|
||||
rainbow()
|
||||
except KeyboardInterrupt:
|
||||
off()
|
||||
1
Drone/FlightLib/__init__.py
Normal file
1
Drone/FlightLib/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
__all__ = ['FlightLib', 'LedLib']
|
||||
Reference in New Issue
Block a user