From 011d0adfde3c70fc1ed2f61ff533f0de409a4497 Mon Sep 17 00:00:00 2001 From: Arthur Golubtsov Date: Wed, 17 Jun 2020 19:17:14 +0300 Subject: [PATCH] Add action to frame, update animation module --- drone/modules/animation.py | 450 +++++++++++++++++++++---------------- 1 file changed, 259 insertions(+), 191 deletions(-) diff --git a/drone/modules/animation.py b/drone/modules/animation.py index c805fac..b8bde27 100644 --- a/drone/modules/animation.py +++ b/drone/modules/animation.py @@ -30,14 +30,21 @@ def moving(f1, f2, delta, x=True, y=True, z=True): def get_numbers(frames): numbers = [] - if frames: - for frame in frames: - numbers.append(frame.number) + for frame in frames: + numbers.append(frame.number) return numbers +def get_duration(frames): + duration = 0 + for frame in frames: + duration += frame.delay + return duration + + class Frame(object): params_dict = { "number": None, + "action": 'fly', # fly, arm, land, stand "x": None, "y": None, "z": None, @@ -45,7 +52,7 @@ class Frame(object): "red": None, "green": None, "blue": None, - "delay": None, + "delay": 0, } def __init__(self, csv_row=None, delay=None): for key, value in self.params_dict.items(): @@ -86,261 +93,329 @@ class Frame(object): return self.get_pos() and (self.yaw is not None) class Animation(object): - def __init__(self, config=None, filepath="animation.csv"): - self.reset(filepath) + # filepath - path to csv animation file, config - config 'ANIMATION' section (dictionary) + def __init__(self, filepath="animation.csv", config=None): + self.reset(filepath, config) if config is not None: - self.update_frames(config, filepath) + self.on_animation_update(self.filepath) - def reset(self, filepath): + def reset(self, filepath, config): self.id = None - self.start_delay = 0 - self.original_frames = None - self.static_begin_frames = None - self.static_begin_time = 0 - self.takeoff_frames = None - self.takeoff_time = 0 - self.route_frames = None - self.route_time = 0 - self.land_frames = None - self.land_time = 0 - self.static_end_frames = None - self.static_end_time = 0 - self.output_frames = None + self.original_frames = [] + self.transformed_frames = [] + self.static_begin_index = 0 + self.takeoff_index = 0 + self.route_index = 0 + self.land_index = 0 + self.static_end_index = 0 + self.output_frames = [] self.output_frames_min_z = None + self.output_frames_takeoff = [] + self.output_frames_takeoff_min_z = None self.filepath = filepath + self.config = config self.state = None - def load(self, filepath="animation.csv", config = None): - self.original_frames = [] - self.corrected_frames = [] - self.filepath = filepath + def set_state(self, state, log_error=False): + self.state = state + if log_error: + logger.error(state) + + def load(self): self.state = "OK" - delay = config.animation_frame_delay try: - animation_file = open(filepath) + delay = self.config.animation_frame_delay + except (TypeError, KeyError): + self.set_state("Bad animation delay from config 'ANIMATION' section", log_error=True) + return + try: + animation_file = open(self.filepath) except IOError: - logger.debug("File {} can't be opened".format(filepath)) - self.state = "No animation" + self.set_state("File {} can't be opened".format(self.filepath), log_error=True) else: with animation_file: current_frame_delay = delay csv_reader = csv.reader( animation_file, delimiter=',', quotechar='|' ) - row_0 = csv_reader.next() + try: + row_0 = csv_reader.next() + except StopIteration: + self.set_state("Animation file is empty", log_error=True) + return if len(row_0) == 1: self.id = row_0[0] logger.debug("Got animation_id: {}".format(self.id)) elif len(row_0) == 2: - current_frame_delay = float(row_0[1]) - logger.debug("Got new frame delay: {}".format(current_frame_delay)) + try: + current_frame_delay = float(row_0[1]) + logger.debug("Got new frame delay: {}".format(current_frame_delay)) + except ValueError as e: + self.set_state("Can't parse delay row in csv file. {}".format(e), log_error=True) + return else: logger.debug("No animation id in file") self.id = "No animation id" try: frame = Frame(row_0, current_frame_delay) except ValueError as e: - logger.error("Can't parse row in csv file. {}".format(e)) - self.state = "Bad animation file" - return - try: - frame.set_yaw(config.animation_yaw) - except ValueError as e: - logger.error("Can't set yaw from configuration") - self.state = "Bad yaw from config" + self.set_state("Can't parse frame row in csv file. {}".format(e), log_error=True) return self.original_frames.append(frame) for row in csv_reader: if len(row) == 2: - current_frame_delay = float(row[1]) - logger.debug("Got new frame delay: {}".format(current_frame_delay)) + try: + current_frame_delay = float(row_0[1]) + logger.debug("Got new frame delay: {}".format(current_frame_delay)) + except ValueError as e: + self.set_state("Can't parse delay row in csv file. {}".format(e), log_error=True) + return else: try: frame = Frame(row, current_frame_delay) except ValueError as e: - logger.error("Can't parse row in csv file. {}".format(e)) - self.state = "Bad animation file" - return - try: - frame.set_yaw(config.animation_yaw) - except ValueError as e: - logger.error("Can't set yaw from configuration") - self.state = "Bad yaw from config" + self.set_state("Can't parse frame row in csv file. {}".format(e), log_error=True) return self.original_frames.append(frame) - self.split_animation() + self.set_yaw() + if self.state == "OK": + if self.original_frames: + self.split() + else: + self.set_state("No frames loaded!", log_error=True) - ''' - Split animation into 5 parts: static_begin, takeoff, route, land, static_end - * static_begin and static_end are chains of frames in the beginning and the end of animation, - where the drone doesn't move - * takeoff and land are chains of frames after and before static frames of animation, - where the drone doesn't move in xy plane, and it's z coordinate only increases or decreases, respectively. - * route is the rest of the animation - Count static_begin_time and takeoff_time - ''' - def split_animation(self, move_delta=0.01): - self.static_begin_frames = [] - self.takeoff_frames = [] - self.route_frames = [] - self.land_frames = [] - self.static_end_frames = [] - self.static_begin_time = 0 - self.takeoff_time = 0 - self.route_time = 0 - self.land_time = 0 + def set_yaw(self): + try: + yaw = self.config.animation_yaw + except (TypeError, KeyError) as e: + self.set_state("Can't set yaw from config 'ANIMATION' section. {}".format(e), log_error=True) + return + for frame in self.original_frames: + try: + frame.set_yaw(yaw) + except ValueError as e: + self.set_state("Can't set yaw from config 'ANIMATION' section. {}".format(e), log_error=True) + return + + def split(self, move_delta=0.01): + ''' + Split animation into 5 parts: static_begin, takeoff, route, land, static_end + * static_begin and static_end are chains of frames in the beginning and the end of animation, + where the drone doesn't move + * takeoff and land are chains of frames after and before static frames of animation, + where the drone doesn't move in xy plane, and it's z coordinate only increases or decreases, respectively. + * route is the rest of the animation + ''' if len(self.original_frames) == 0: return frames = copy.deepcopy(self.original_frames) + # Get takeoff index i = 0 # Moving index from the beginning - # Select static begin frames - while i < len(frames) - 1: - self.static_begin_time += frames[i].delay + while i < len(frames): if moving(frames[i], frames[i+1], move_delta): break i += 1 - if i > 0: - self.static_begin_frames = frames[:i+1] - frames = frames[i+1:] - i = 0 - else: - self.static_begin_time = 0 - # Select takeoff frames - while i < len(frames) - 1: - self.takeoff_time += frames[i].delay + self.takeoff_index = i + # Get route index + while i < len(frames): if moving(frames[i], frames[i+1], move_delta, z = False) or (frames[i+1].z - frames[i].z <= 0): break i += 1 - if i > 0: - self.takeoff_frames = frames[:i+1] - frames = frames[i+1:] - else: - self.takeoff_time = 0 + self.route_index = i + # Get static end index i = len(frames) - 1 # Moving index from the end - # Select static end frames while i >= 0: - self.static_end_time += frames[i].delay if moving(frames[i], frames[i-1], move_delta): break i -= 1 - if i < len(frames) - 1: - self.static_end_frames = frames[i+1:] - frames = frames[:i+1] - i = len(frames) - 1 - else: - self.static_end_time = 0 - # Select land frames + self.static_end_index = i + # Get land index while i >= 0: - self.land_time += frames[i].delay if moving(frames[i], frames[i-1], move_delta, z = False) or (frames[i-1].z - frames[i].z <= 0): break i -= 1 - if i < len(frames) - 1: - self.land_frames = frames[i+1:] - frames = frames[:i+1] - else: - self.land_time = 0 - # Get route frames - self.route_frames = frames - for frame in self.route_frames: - self.route_time += frame.delay + self.land_index = i - def make_output_frames(self, static_begin, takeoff, route, land, static_end): - self.output_frames = [] - self.output_frames_min_z = None - self.start_delay = 0. - if not self.original_frames: + def transform(self): + try: + x0, y0, z0 = self.config.animation_offset + except (ValueError, KeyError): + self.set_state("Can't transform animation: bad or empty config (offset in 'ANIMATION')", log_error=True) return - if not static_begin and not takeoff and not route and not land and not static_end: + try: + x_ratio, y_ratio, z_ratio = self.config.animation_ratio + except (ValueError, KeyError): + self.set_state("Can't transform animation: bad or empty config (ratio in 'ANIMATION')", log_error=True) + return + self.transformed_frames = copy.deepcopy(self.original_frames) + for frame in self.transformed_frames: + frame.x = x_ratio*frame.x + x0 + frame.y = y_ratio*frame.y + y0 + frame.z = z_ratio*frame.z + z0 + + def mark_stand_frames(self): + if not self.transformed_frames: + return + try: + takeoff_level = self.config.animation_takeoff_level + except (ValueError, KeyError): + self.set_state("Can't set frame actions: bad or empty config (takeoff_level in 'ANIMATION')", log_error=True) + return + static_begin_action = "fly" + static_end_action = "fly" + + # Check first frame + if self.transformed_frames[0].z < takeoff_level: + static_begin_action = "stand" + # Set action for static_begin frames + for frame in self.transformed_frames[:self.takeoff_index]: + frame.action = static_begin_action + + # Check last frame + if self.transformed_frames[-1].z < takeoff_level: + static_end_action = "stand" + # Set action for static_end frames + for frame in self.transformed_frames[self.static_end_index:]: + frame.action = static_end_action + + def apply_flags(self): + self.output_frames = [] + self.output_frames_takeoff = [] + if not self.transformed_frames: + return + try: + static_begin = self.config.animation_output_static_begin + takeoff = self.config.animation_output_takeoff + route = self.config.animation_output_route + land = self.config.animation_output_land + static_end = self.config.animation_output_static_end + except (ValueError, KeyError): + self.set_state("Can't get output flags: bad or empty config ('OUTPUT' in 'ANIMATION')", log_error=True) + return + try: + takeoff_level = self.config.animation_takeoff_level + except (ValueError, KeyError): + self.set_state("Can't set frame actions: bad or empty config (takeoff_level in 'ANIMATION')", log_error=True) return if static_begin: - self.output_frames += self.static_begin_frames - if not static_begin: - self.start_delay += self.static_begin_time + self.output_frames += self.transformed_frames[:self.takeoff_index] + self.output_frames_takeoff += self.transformed_frames[:self.takeoff_index] if takeoff: - self.output_frames += self.takeoff_frames - if not static_begin and not takeoff: - self.start_delay += self.takeoff_time + self.output_frames += self.transformed_frames[self.takeoff_index:self.route_index] + if self.transformed_frames[self.takeoff_index].z >= takeoff_level: + self.output_frames_takeoff += self.transformed_frames[self.takeoff_index:self.route_index] if route: - self.output_frames += self.route_frames - if not static_begin and not takeoff and not route: - self.start_delay += self.route_time + self.output_frames += self.transformed_frames[self.route_index:self.land_index] + self.output_frames_takeoff += self.transformed_frames[self.route_index:self.land_index] if land: - self.output_frames += self.land_frames - if not static_begin and not takeoff and not route and not land: - self.start_delay += self.land_time + self.output_frames += self.transformed_frames[self.land_index:self.static_end_index] + self.output_frames_takeoff += self.transformed_frames[self.land_index:self.static_end_index] if static_end: - self.output_frames += self.static_end_frames + self.output_frames += self.transformed_frames[self.static_end_index:] + self.output_frames_takeoff += self.transformed_frames[self.static_end_index:] if self.output_frames: self.output_frames_min_z = min(self.output_frames, key = lambda p: p.z).z + if self.output_frames_takeoff: + self.output_frames_takeoff_min_z = min(self.output_frames_takeoff, key = lambda p: p.z).z - def update_frames(self, config, filepath): - self.reset(filepath) - self.load(filepath, config) - self.make_output_frames(config.animation_output_static_begin, - config.animation_output_takeoff, - config.animation_output_route, - config.animation_output_land, - config.animation_output_static_end) + def mark_flight(self): + try: + arming_time = self.config.flight_arming_time + takeoff_time = self.config.flight_takeoff_time + rfp_time = self.config.flight_reach_first_point_time + land_delay = self.config.flight_land_delay + except (ValueError, KeyError): + self.set_state("Can't mark flight: bad or empty config ('FLIGHT' section)", log_error=True) + return + # add arm frame to output_frames + i = 0 + while i < len(self.output_frames): + if self.output_frames[i].action == 'fly': + frame = copy.deepcopy(self.output_frames[i]) + frame.action = 'arm' + frame.delay = arming_time + self.output_frames.insert(i, frame) + break + i += 1 + # add takeoff frame to output_frames_takeoff + i = 0 + while i < len(self.output_frames_takeoff): + if self.output_frames_takeoff[i].action == 'fly': + # set first fly frame action to reach point + self.output_frames_takeoff[i].action = 'reach' + self.output_frames_takeoff[i].delay = rfp_time + # add takeoff action before reach point + frame = copy.deepcopy(self.output_frames_takeoff[i]) + frame.action = 'takeoff' + frame.delay = takeoff_time + self.output_frames_takeoff.insert(i, frame) + break + i += 1 + # add land frame to output_frames + i = len(self.output_frames) - 1 + while i >=0: + if self.output_frames[i].action == 'fly': + frame = copy.deepcopy(self.output_frames[i]) + frame.action = 'land' + self.output_frames[i].delay = land_delay + self.output_frames.insert(i, frame) + i = len(self.output_frames_takeoff) - 1 + # add land frame to output_frames_takeoff + while i >=0: + if self.output_frames_takeoff[i].action == 'fly': + frame = copy.deepcopy(self.output_frames_takeoff[i]) + frame.action = 'land' + self.output_frames_takeoff[i].delay = land_delay + self.output_frames_takeoff.insert(i, frame) - def get_scaled_output(self, ratio=(1,1,1), offset=(0,0,0)): - x0, y0, z0 = offset - x_ratio, y_ratio, z_ratio = ratio - scaled_frames = copy.deepcopy(self.output_frames) - if scaled_frames: - for frame in scaled_frames: - frame.x = x_ratio*frame.x + x0 - frame.y = y_ratio*frame.y + y0 - frame.z = z_ratio*frame.z + z0 - return scaled_frames + def on_animation_update(self, filepath="animation.csv"): + self.filepath = filepath + self.load() + if self.original_frames: + self.on_config_update(self.config) - def get_scaled_output_min_z(self, ratio=(1,1,1), offset=(0,0,0)): - if self.output_frames_min_z is None: - return None - z0 = offset[2] - z_ratio = ratio[2] - return self.output_frames_min_z*z_ratio + z0 + def on_config_update(self, config): + self.config = config + self.transform() + self.mark_stand_frames + self.apply_flags() + self.mark_flight() - def get_start_point(self, ratio=(1,1,1), offset=(0,0,0)): + def get_start_frame(self): if not self.output_frames: - return [] - x0, y0, z0 = offset - x_ratio, y_ratio, z_ratio = ratio - first_frame = self.output_frames[0] - x = x_ratio*first_frame.x + x0 - y = y_ratio*first_frame.y + y0 - z = z_ratio*first_frame.z + z0 - return [x, y, z] + return Frame() + return self.output_frames[0] - def get_start_yaw(self): - if not self.output_frames: - return float('nan') - return math.degrees(self.output_frames[0].yaw) - - def check_ground(self, ground_level=0, ratio=(1,1,1), offset=(0,0,0)): - return ground_level <= self.get_scaled_output_min_z(ratio, offset) - - def get_start_action(self, start_action, current_height, takeoff_level, - ground_level, ratio=(1,1,1), offset=(0,0,0), state="STANDBY"): + def get_start_action(self, current_height, state="STANDBY", tolerance = 0.2): # Check output frames if not self.output_frames: return 'error: empty output frames' + # Check current_height if math.isnan(current_height): return 'error: bad copter height' # Check that bottom point of animation is higher than ground level - if ground_level == 'current': - ground_level = current_height try: + ground_level = self.config.animation_ground_level + if ground_level == 'current': + ground_level = current_height ground_level = float(ground_level) - except ValueError: + except (ValueError, KeyError): return 'error in [ANIMATION] ground_level parameter' - if state != "ACTIVE" and ground_level > self.get_scaled_output_min_z(ratio, offset): - return 'error: some animation points are lower than ground level for {:.2f}m (max)'.format( - ground_level - self.get_scaled_output_min_z(ratio, offset) + if state != "ACTIVE" and ground_level - tolerance > self.output_frames_min_z: + return 'error: animation is lower than ground level for {:.2f}m'.format( + ground_level - self.output_frames_min_z ) # Select start action + try: + start_action = self.config.animation_start_action + except (ValueError, KeyError): + return 'error in [ANIMATION] start_action' + try: + takeoff_level = self.config.animation_takeoff_level + except (ValueError, KeyError): + return 'error in [ANIMATION] takeoff_level' if start_action == 'auto': - if self.get_start_point(ratio, offset)[2] - current_height > takeoff_level: + if self.get_start_frame().z > takeoff_level: return 'takeoff' else: return 'fly' @@ -349,24 +424,17 @@ class Animation(object): else: return 'error in [ANIMATION] start_action parameter' - # Need for tests - def save_corrected_animation(self): - name, ext = os.path.splitext(self.filepath) - filepath = name + '_corrected' + ext - with open(filepath, mode='w+') as corrected_animation: - csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) - for frame in self.corrected_frames: - csv_writer.writerow([frame.number, frame.x, frame.y, frame.z, frame.red, frame.green, frame.blue, frame.delay]) - try: - def execute_frame(frame, frame_id='aruco_map', use_leds=True, - flight_func=flight.navto, auto_arm=False, flight_kwargs=None, interrupter=interrupt_event): + def execute_frame(frame, frame_id='map', use_leds=True, auto_arm=False, interrupter=interrupt_event, flight_kwargs=None): if flight_kwargs is None: flight_kwargs = {} - if frame.pose_is_valid(): - flight_func(x=frame.x, y=frame.y, z=frame.z, yaw=frame.yaw, frame_id=frame_id, auto_arm=auto_arm, interrupter=interrupt_event, **flight_kwargs) - else: - logger.error("Frame pose is not valid for flying") + if frame.action in ('fly', 'arm'): + if frame.action == 'arm': + auto_arm = True + if frame.pose_is_valid(): + flight_func(x=frame.x, y=frame.y, z=frame.z, yaw=frame.yaw, frame_id=frame_id, auto_arm=auto_arm, interrupter=interrupt_event, **flight_kwargs) + else: + logger.error("Frame pose is not valid for flying") if use_leds: try: red, green, blue = frame.get_color() @@ -387,7 +455,7 @@ try: led.set_effect(effect='blink_fast', r=0, g=255, b=0) - def land(z=1.5, descend=False, timeout=5.0, frame_id='aruco_map', use_leds=True, + def land(z=1.5, descend=False, timeout=5.0, frame_id='map', use_leds=True, interrupter=interrupt_event): if use_leds: led.set_effect(effect='blink_fast', r=255, g=0, b=0)