mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-31 17:29:33 +00:00
Rewritten animation lib, added support for extra functions and params
This commit is contained in:
@@ -1,8 +1,9 @@
|
||||
import time
|
||||
import csv
|
||||
import rospy
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
import numpy as np
|
||||
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
@@ -13,112 +14,219 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
interrupt_event = threading.Event()
|
||||
|
||||
anim_id = "Empty id"
|
||||
|
||||
# TODO refactor as class
|
||||
# TODO separate code for frames transformations (e.g. for gps)
|
||||
|
||||
|
||||
def get_id(filepath="animation.csv"):
|
||||
global anim_id
|
||||
try:
|
||||
animation_file = open(filepath)
|
||||
except IOError:
|
||||
logging.error("File {} can't be opened".format(filepath))
|
||||
anim_id = "No animation"
|
||||
return anim_id
|
||||
else:
|
||||
with animation_file:
|
||||
csv_reader = csv.reader(
|
||||
animation_file, delimiter=',', quotechar='|'
|
||||
)
|
||||
row_0 = csv_reader.next()
|
||||
if len(row_0) == 1:
|
||||
anim_id = row_0[0]
|
||||
print("Got animation_id: {}".format(anim_id))
|
||||
def requires_import(f):
|
||||
def wrapper(*args, **kwargs):
|
||||
if args[0].imported:
|
||||
return f(*args, **kwargs)
|
||||
else:
|
||||
raise RuntimeError("Frames were never imported!")
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class AnimationLoader:
|
||||
def __init__(self):
|
||||
self.anim_id = None
|
||||
self.fps = None
|
||||
self.anim_headers = {}
|
||||
|
||||
self.imported = False
|
||||
self.frames = 0
|
||||
self.current_frame = 0
|
||||
|
||||
self.points = np.empty((0, 3)) # to transform points
|
||||
self.colors = np.empty((0, 3), int) # to manipulate brightness/etc
|
||||
self.yaws = np.empty((0, 0)) # to rotate
|
||||
self.extras = []
|
||||
|
||||
def load_csv(self, filepath="animation.csv"):
|
||||
try:
|
||||
animation_file = open(filepath, mode="r")
|
||||
except IOError:
|
||||
logging.error("File {} can't be opened".format(filepath))
|
||||
return False
|
||||
else:
|
||||
with animation_file:
|
||||
csv_reader = csv.reader(animation_file, delimiter=',', quotechar='|')
|
||||
lines = list(csv_reader)
|
||||
print(lines)
|
||||
|
||||
if len(lines[0]) == 1:
|
||||
header = lines.pop(0)[0]
|
||||
print("Got animation header: {}".format(header))
|
||||
jsonheader = json.loads(header)
|
||||
self.anim_id = jsonheader.pop("file", None)
|
||||
self.anim_headers = header
|
||||
else:
|
||||
print("No header in the file")
|
||||
pass
|
||||
|
||||
self.frames = len(lines)
|
||||
print(lines)
|
||||
for row in lines:
|
||||
if row:
|
||||
self._parse_row(row)
|
||||
|
||||
self.imported = True
|
||||
return True
|
||||
|
||||
def _parse_row(self, row):
|
||||
_frame_number, x, y, z, yaw, red, green, blue = row[:8]
|
||||
try:
|
||||
extra = row[8]
|
||||
except IndexError:
|
||||
extra = {}
|
||||
|
||||
point = float(x), float(y), float(z)
|
||||
color = int(red), int(green), int(blue)
|
||||
yaw = float(yaw)
|
||||
|
||||
self.points = np.append(self.points, [point], axis=0)
|
||||
self.colors = np.append(self.colors, [color], axis=0)
|
||||
self.yaws = np.append(self.yaws, [yaw])
|
||||
self.extras.append(extra)
|
||||
|
||||
@requires_import
|
||||
def get_frame(self, i):
|
||||
point = tuple(self.points[i])
|
||||
color = tuple(self.colors[i])
|
||||
yaw = self.yaws[i]
|
||||
extra = self.extras[i]
|
||||
|
||||
return point, color, yaw, extra
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self.get_frame(item)
|
||||
|
||||
def __next__(self):
|
||||
try:
|
||||
val = self.get_frame(self.current_frame)
|
||||
except IndexError:
|
||||
raise StopIteration
|
||||
else:
|
||||
self.current_frame += 1
|
||||
return val
|
||||
|
||||
next = __next__ # python 2 compatibility
|
||||
|
||||
|
||||
if __name__ == "__main__": # for testing animation loader
|
||||
anim = AnimationLoader()
|
||||
anim.load_csv("animation.csv")
|
||||
for fr in anim:
|
||||
print(fr)
|
||||
|
||||
|
||||
def property_true(value):
|
||||
return value == 1
|
||||
|
||||
|
||||
class AnimationPlayer:
|
||||
def __init__(self, frames, frame_delay, interrupter=interrupt_event, common_kwargs=None):
|
||||
self.frames = list(frames)
|
||||
self._frame_time = None
|
||||
self.frame_delay = frame_delay
|
||||
|
||||
self.interrupter = interrupter
|
||||
self.common_kwargs = common_kwargs if common_kwargs is not None else {}
|
||||
|
||||
self._override = None
|
||||
|
||||
def _execution_wrapper(self, f, *args, **kwargs):
|
||||
f(*args, **kwargs)
|
||||
|
||||
def _after_frame(self):
|
||||
self._frame_time += self.frame_delay
|
||||
tasking.wait(self._frame_time, self.interrupter)
|
||||
|
||||
def execute_animation(self, start_time=None):
|
||||
self._frame_time = start_time if start_time is not None else time.time()
|
||||
|
||||
for frame in self.frames:
|
||||
if self.interrupter.is_set():
|
||||
logger.warning("Animation playing function interrupted!")
|
||||
self.interrupter.clear()
|
||||
return 'interrupted'
|
||||
|
||||
self._execute_frame(frame)
|
||||
|
||||
self._after_frame()
|
||||
|
||||
def _execute_frame(self, frame):
|
||||
point, color, yaw, extras = frame
|
||||
|
||||
if self._override is not None:
|
||||
if property_true(extras.get(self._override, False)):
|
||||
return # don't execute anything if long-term override is active
|
||||
else:
|
||||
print("No animation id in file")
|
||||
return anim_id
|
||||
self._override = None # reset override when not active anymore
|
||||
|
||||
|
||||
def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0):
|
||||
imported_frames = []
|
||||
global anim_id
|
||||
try:
|
||||
animation_file = open(filepath)
|
||||
except IOError:
|
||||
logging.error("File {} can't be opened".format(filepath))
|
||||
anim_id = "No animation"
|
||||
else:
|
||||
with animation_file:
|
||||
csv_reader = csv.reader(
|
||||
animation_file, delimiter=',', quotechar='|'
|
||||
)
|
||||
row_0 = csv_reader.next()
|
||||
if len(row_0) == 1:
|
||||
anim_id = row_0[0]
|
||||
print("Got animation_id: {}".format(anim_id))
|
||||
for key, val in extras.keys():
|
||||
try:
|
||||
extra = extra_functions[key]
|
||||
except KeyError:
|
||||
pass
|
||||
else:
|
||||
print("No animation id in file")
|
||||
frame_number, x, y, z, yaw, red, green, blue = row_0
|
||||
imported_frames.append({
|
||||
'number': int(frame_number),
|
||||
'x': float(x) + x0,
|
||||
'y': float(y) + y0,
|
||||
'z': float(z) + z0,
|
||||
'yaw': float(yaw),
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
})
|
||||
for row in csv_reader:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row
|
||||
imported_frames.append({
|
||||
'number': int(frame_number),
|
||||
'x': float(x) + x0,
|
||||
'y': float(y) + y0,
|
||||
'z': float(z) + z0,
|
||||
'yaw': float(yaw),
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
})
|
||||
return imported_frames
|
||||
if property_true(val):
|
||||
if self._execute_extra(frame, key, extra):
|
||||
return # returns if override extra is preformed
|
||||
|
||||
point_flight(*frame, interrupter=self.interrupter, **self.common_kwargs)
|
||||
|
||||
def _execute_extra(self, frame, key, extra):
|
||||
extra_f, params = extra
|
||||
|
||||
self._execution_wrapper(extra_f, frame, interrupter=self.interrupter, **self.common_kwargs)
|
||||
|
||||
override = params.get("override", False) # False is default if not found
|
||||
|
||||
if params.get("oneshot", False):
|
||||
self._override = key
|
||||
|
||||
return override
|
||||
|
||||
|
||||
def convert_frame(frame):
|
||||
return ((frame['x'], frame['y'], frame['z']), (frame['red'], frame['green'], frame['blue']), frame['yaw'])
|
||||
extra_functions = {}
|
||||
|
||||
|
||||
def execute_frame(point=(), color=(), yaw=float('Nan'), frame_id='aruco_map', use_leds=True,
|
||||
flight_func=FlightLib.navto, flight_kwargs=None, interrupter=interrupt_event):
|
||||
def extra_function(key, **params):
|
||||
def inner(f):
|
||||
extra_functions[key] = f, params
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
@extra_function("drone_flip", override=True, oneshot=True)
|
||||
def execute_flip(frame, frame_id='aruco_map', flight_kwargs=None, interrupter=interrupt_event, **kwargs):
|
||||
if flight_kwargs is None:
|
||||
flight_kwargs = {}
|
||||
|
||||
flight_func(*point, yaw=yaw, frame_id=frame_id, interrupter=interrupt_event, **flight_kwargs)
|
||||
FlightLib.flip(frame_id=frame_id, **flight_kwargs)
|
||||
|
||||
|
||||
def point_flight(frame, frame_id='aruco_map', use_leds=True,
|
||||
flight_func=FlightLib.navto, flight_kwargs=None,
|
||||
interrupter=interrupt_event, **kwargs):
|
||||
if flight_kwargs is None:
|
||||
flight_kwargs = {}
|
||||
|
||||
point, color, yaw, extras = frame
|
||||
|
||||
flight_func(*point, yaw=yaw, frame_id=frame_id, interrupter=interrupter, **flight_kwargs)
|
||||
if use_leds:
|
||||
if color:
|
||||
LedLib.fill(*color)
|
||||
|
||||
|
||||
def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto,
|
||||
interrupter=interrupt_event):
|
||||
next_frame_time = 0
|
||||
for frame in frames:
|
||||
if interrupter.is_set():
|
||||
logger.warning("Animation playing function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func,
|
||||
interrupter=interrupter)
|
||||
|
||||
next_frame_time += frame_delay
|
||||
tasking.wait(next_frame_time, interrupter)
|
||||
LedLib.fill(*color)
|
||||
|
||||
|
||||
def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
print(interrupter.is_set())
|
||||
if use_leds:
|
||||
LedLib.wipe_to(255, 0, 0, interrupter=interrupter)
|
||||
if interrupter.is_set():
|
||||
@@ -127,6 +235,7 @@ def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True
|
||||
interrupter=interrupter)
|
||||
if result == 'not armed':
|
||||
raise Exception('STOP') # Raise exception to clear task_manager if copter can't arm
|
||||
|
||||
if interrupter.is_set():
|
||||
return
|
||||
if use_leds:
|
||||
|
||||
Reference in New Issue
Block a user