From 9f22603ecb286f18cffd3290fb20d89883c2db1b Mon Sep 17 00:00:00 2001 From: Arthur Golubtsov Date: Sat, 23 May 2020 15:56:43 +0300 Subject: [PATCH] Drone: Update animation_lib module --- Drone/animation_lib.py | 346 ++++++++++++++++++++++------------------- 1 file changed, 188 insertions(+), 158 deletions(-) diff --git a/Drone/animation_lib.py b/Drone/animation_lib.py index 2018fdb..6d2c4ad 100644 --- a/Drone/animation_lib.py +++ b/Drone/animation_lib.py @@ -1,6 +1,8 @@ +import os import time import csv import copy +import numpy import rospy import logging import threading @@ -15,165 +17,211 @@ try: except ImportError: print("Can't import LedLib") -import tasking_lib as tasking - logger = logging.getLogger(__name__) interrupt_event = threading.Event() -anim_id = "Empty id" +def moving(f1, f2, delta, x = True, y = True, z = True): + return ((abs(f1['x'] - f2['x']) > delta) and x + or (abs(f1['y'] - f2['y']) > delta) and y + or (abs(f1['z'] - f2['z']) > delta) and z) -# TODO refactor as class -# TODO separate code for frames transformations (e.g. for gps) +class Animation(object): + def __init__(self, config = None, filepath = "animation.csv"): + self.id = None + self.static_begin_time = None + self.takeoff_time = None + self.original_frames = None + self.static_begin_frames = None + self.takeoff_frames = None + self.route_frames = None + self.land_frames = None + self.static_end_frames = None + self.output_frames = None + self.output_frames_min_z = None + self.filepath = filepath + if config is not None: + self.update_frames(config, filepath) -def get_id(filepath="animation.csv"): - global anim_id - try: - animation_file = open(filepath) - except IOError: - logger.debug("File {} can't be opened".format(filepath)) - anim_id = "No animation" - return anim_id - else: - with animation_file: - csv_reader = csv.reader( - animation_file, delimiter=',', quotechar='|' - ) - row_0 = csv_reader.next() - if len(row_0) == 1: - anim_id = row_0[0] - logger.debug("Got animation_id: {}".format(anim_id)) - else: - anim_id = "Empty id" - logger.debug("No animation id in file") - return anim_id - -def get_start_xy(filepath="animation.csv", x_ratio=1, y_ratio=1, z_ratio=1): - try: - animation_file = open(filepath) - except IOError: - logger.debug("File {} can't be opened".format(filepath)) - anim_id = "No animation" - return float('nan'), float('nan') - else: - with animation_file: - csv_reader = csv.reader( - animation_file, delimiter=',', quotechar='|' - ) - try: - row_frame = csv_reader.next() - except: - return float('nan'), float('nan') - if len(row_frame) == 1: - anim_id = row_frame[0] - logger.debug("Got animation_id: {}".format(row_frame[0])) - row_frame = csv_reader.next() - if len(row_frame) == 2: - logger.debug("Got frame delay: {}".format(row_frame[1])) - row_frame - csv_reader.next() - try: - frame_number, x, y, z, yaw, red, green, blue = row_frame - except: - return float('nan'), float('nan') - return float(x)*x_ratio, float(y)*y_ratio - - -def load_animation(filepath="animation.csv", default_delay = 0.1, x0=0, y0=0, z0=0, x_ratio=1, y_ratio=1, z_ratio=1): - imported_frames = [] - global anim_id - try: - animation_file = open(filepath) - except IOError: - logger.debug("File {} can't be opened".format(filepath)) - anim_id = "No animation" - else: - with animation_file: - current_frame_delay = default_delay - csv_reader = csv.reader( - animation_file, delimiter=',', quotechar='|' - ) - row_0 = csv_reader.next() - if len(row_0) == 1: - anim_id = row_0[0] - logger.debug("Got animation_id: {}".format(anim_id)) - elif len(row_0) == 2: - current_frame_delay = float(row_0[1]) - logger.debug("Got new frame delay: {}".format(current_frame_delay)) - else: - logger.debug("No animation id in file") - frame_number, x, y, z, yaw, red, green, blue = row_0 - imported_frames.append({ - 'number': int(frame_number), - 'x': x_ratio*float(x) + x0, - 'y': y_ratio*float(y) + y0, - 'z': z_ratio*float(z) + z0, - 'yaw': float(yaw), - 'red': int(red), - 'green': int(green), - 'blue': int(blue), - 'delay': current_frame_delay - }) - for row in csv_reader: - if len(row) == 2: - current_frame_delay = float(row[1]) + def load(self, delay=0.1, filepath="animation.csv"): + self.original_frames = [] + self.corrected_frames = [] + self.filepath = filepath + try: + animation_file = open(filepath) + except IOError: + logger.debug("File {} can't be opened".format(filepath)) + self.id = "No animation" + else: + with animation_file: + current_frame_delay = delay + csv_reader = csv.reader( + animation_file, delimiter=',', quotechar='|' + ) + row_0 = csv_reader.next() + if len(row_0) == 1: + self.id = row_0[0] + logger.debug("Got animation_id: {}".format(self.id)) + elif len(row_0) == 2: + current_frame_delay = float(row_0[1]) + logger.debug("Got new frame delay: {}".format(current_frame_delay)) else: - frame_number, x, y, z, yaw, red, green, blue = row - imported_frames.append({ + logger.debug("No animation id in file") + frame_number, x, y, z, yaw, red, green, blue = row_0 + self.original_frames.append({ 'number': int(frame_number), - 'x': x_ratio*float(x) + x0, - 'y': y_ratio*float(y) + y0, - 'z': z_ratio*float(z) + z0, + 'x': float(x), + 'y': float(y), + 'z': float(z), 'yaw': float(yaw), 'red': int(red), 'green': int(green), 'blue': int(blue), 'delay': current_frame_delay }) - return imported_frames + for row in csv_reader: + if len(row) == 2: + current_frame_delay = float(row[1]) + logger.debug("Got new frame delay: {}".format(current_frame_delay)) + else: + frame_number, x, y, z, yaw, red, green, blue = row + self.original_frames.append({ + 'number': int(frame_number), + 'x': float(x), + 'y': float(y), + 'z': float(z), + 'yaw': float(yaw), + 'red': int(red), + 'green': int(green), + 'blue': int(blue), + 'delay': current_frame_delay + }) + self.split_animation() -def correct_animation(frames, frame_delay=0.1, min_takeoff_height=0.5, move_delta=0.01, check_takeoff=True, check_land=True): - corrected_frames = copy.deepcopy(frames) - start_action = 'takeoff' - frames_to_start = 0 - time_to_start = 0 - if len(corrected_frames) == 0: - raise Exception('Nothing to correct!') - # Check takeoff - # If copter takes off in animation file, copter must be armed first and then all animation can be played - if (corrected_frames[0]['z'] < min_takeoff_height) and check_takeoff: - start_action = 'arm' - # If the first point is low, then detect moment to arm, - # delete all points, where copter is standing, and count time_delta - for i in range(len(corrected_frames)-1): - if corrected_frames[i-frames_to_start+1]['z'] - corrected_frames[i-frames_to_start]['z'] > move_delta: + ''' + Split animation into 5 parts: static_begin, takeoff, route, land, static_end + * static_begin and static_end are arrays of frames in the beginning and the end of animation, + where the drone doesn't move + * takeoff and land are arrays of frames after and before static frames of animation, + where the drone doesn't move in xy plane, and it's z coordinate only increases or decreases, respectively. + * route is the rest of the animation + Count static_begin_time and takeoff_time + ''' + def split_animation(self, move_delta=0.01): + if len(self.original_frames) == 0: + return + frames = copy.deepcopy(self.original_frames) + self.static_begin_frames = [] + self.takeoff_frames = [] + self.route_frames = [] + self.land_frames = [] + self.static_end_frames = [] + self.static_begin_time = 0 + self.takeoff_time = 0 + i = 0 # Moving index from the beginning + # Select static begin frames + while i < len(frames) - 1: + if moving(frames[i], frames[i+1], move_delta): break - time_to_start += corrected_frames[i-frames_to_start]['delay'] - del corrected_frames[i-frames_to_start] - frames_to_start += 1 - start_delay = time_to_start - # Check Land - # If copter lands in animation, landing points can be deleted - if (corrected_frames[len(corrected_frames)-1]['z'] < min_takeoff_height) and check_land: - for i in range(len(corrected_frames)-1,0,-1): - # print i - if abs(corrected_frames[i-1]['z'] - corrected_frames[i]['z']) < move_delta: + self.static_begin_time += frames[i]['delay'] + i += 1 + self.static_begin_frames = frames[:i+1] + frames = frames[i+1:] + i = 0 + # Select takeoff frames + while i < len(frames) - 1: + if moving(frames[i], frames[i+1], move_delta, z = False) or frames[i]['z'] - frames[i+1]['z'] <= 0: break - del corrected_frames[i] - for i in range(len(corrected_frames)-1,0,-1): - if (abs(corrected_frames[i-1]['x'] - corrected_frames[i]['x']) > move_delta or - abs(corrected_frames[i-1]['y'] - corrected_frames[i]['y']) > move_delta): + self.takeoff_time += frames[i]['delay'] + i += 1 + self.takeoff_frames = frames[:i+1] + frames = frames[i+1:] + i = len(frames) - 1 # Moving index from the end + # Select static end frames + while i >= 0: + if moving(frames[i], frames[i-1], move_delta): break - del corrected_frames[i] - return corrected_frames, start_action, start_delay + i -= 1 + self.static_end_frames = frames[i:] + frames = frames[:i] + i -= len(frames) - 1 + # Select land frames + while i >= 0: + if moving(frames[i], frames[i-1], move_delta, z = False) or frames[i-1]['z'] - frames[i]['z'] >= 0: + break + i -= 1 + self.land_frames = frames[i:] + # Get route frames + self.route_frames = frames[:i] -# Needs for test -def save_corrected_animation(frames, filename="corrected_animation.csv"): - corrected_animation = open(filename, mode='w+') - csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) - for frame in frames: - csv_writer.writerow([frame['number'],frame['x'], frame['y'], frame['z'], frame['delay']]) - # print frame - corrected_animation.close() + def make_output_frames(self, static_begin, takeoff, route, land, static_end): + self.output_frames = [] + if static_begin: + self.output_frames += self.static_begin_frames + if takeoff: + self.output_frames += self.takeoff_frames + if route: + self.output_frames += self.route_frames + if land: + self.output_frames += self.land_frames + if static_end: + self.output_frames += self.static_end_frames + self.output_frames_min_z = min(self.output_frames, key = lambda p: p['z'])['z'] + + def update_frames(self, config, filepath): + self.load(config.animation_frame_delay, filepath) + self.make_output_frames(config.animation_output_static_begin, + config.animation_output_takeoff, + config.animation_output_route, + config.animation_output_land, + config.animation_output_static_end) + + def get_scaled_output(self, ratio = (1,1,1), offset = (0,0,0)): + x0, y0, z0 = offset + x_ratio, y_ratio, z_ratio = ratio + scaled_frames = copy.deepcopy(self.output_frames) + for frame in scaled_frames: + frame['x'] = x_ratio*frame['x'] + x0 + frame['y'] = y_ratio*frame['y'] + y0 + frame['z'] = z_ratio*frame['z'] + z0 + return scaled_frames + + def get_scaled_output_min_z(self, ratio = (1,1,1), offset = (0,0,0)): + x0, y0, z0 = offset + x_ratio, y_ratio, z_ratio = ratio + return self.output_frames_min_z*z_ratio + z0 + + def get_start_point(self, ratio = (1,1,1), offset = (0,0,0)): + x0, y0, z0 = offset + x_ratio, y_ratio, z_ratio = ratio + first_frame = self.output_frames[0] + x = x_ratio*first_frame['x'] + x0 + y = y_ratio*first_frame['y'] + y0 + z = z_ratio*first_frame['z'] + z0 + return x, y, z + + def get_start_action(self, start_action, current_height, takeoff_level): + if start_action is 'auto': + if current_height > takeoff_level: + return 'takeoff' + else: + return 'play' + elif start_action in ('takeoff', 'play'): + return start_action + else: + return 'error' + + def check_ground(self, ground_level = 0, ratio = (1,1,1), offset = (0,0,0)): + return ground_level <= self.get_scaled_output_min_z(ratio, offset) + + # Need for tests + def save_corrected_animation(self): + name, ext = os.path.splitext(self.filepath) + filepath = name + '_corrected' + ext + with open(filepath, mode='w+') as corrected_animation: + csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) + for frame in self.corrected_frames: + csv_writer.writerow([frame['number'], frame['x'], frame['y'], frame['z'], frame['red'], frame['green'], frame['blue'], frame['delay']]) def convert_frame(frame): return ((frame['x'], frame['y'], frame['z']), (frame['red'], frame['green'], frame['blue']), frame['yaw']) @@ -189,24 +237,6 @@ try: if color: LedLib.fill(*color) - - - - def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto, - interrupter=interrupt_event): - next_frame_time = 0 - for frame in frames: - if interrupter.is_set(): - logger.warning("Animation playing function interrupted!") - interrupter.clear() - return - execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func, - interrupter=interrupter) - - next_frame_time += frame_delay - tasking.wait(next_frame_time, interrupter) - - def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True, interrupter=interrupt_event): if use_leds: