Drone: Update animation_lib module

This commit is contained in:
Arthur Golubtsov
2020-05-23 15:56:43 +03:00
parent 72e1d1e916
commit 9f22603ecb

View File

@@ -1,6 +1,8 @@
import os
import time
import csv
import copy
import numpy
import rospy
import logging
import threading
@@ -15,165 +17,211 @@ try:
except ImportError:
print("Can't import LedLib")
import tasking_lib as tasking
logger = logging.getLogger(__name__)
interrupt_event = threading.Event()
anim_id = "Empty id"
def moving(f1, f2, delta, x = True, y = True, z = True):
return ((abs(f1['x'] - f2['x']) > delta) and x
or (abs(f1['y'] - f2['y']) > delta) and y
or (abs(f1['z'] - f2['z']) > delta) and z)
# TODO refactor as class
# TODO separate code for frames transformations (e.g. for gps)
class Animation(object):
def __init__(self, config = None, filepath = "animation.csv"):
self.id = None
self.static_begin_time = None
self.takeoff_time = None
self.original_frames = None
self.static_begin_frames = None
self.takeoff_frames = None
self.route_frames = None
self.land_frames = None
self.static_end_frames = None
self.output_frames = None
self.output_frames_min_z = None
self.filepath = filepath
if config is not None:
self.update_frames(config, filepath)
def get_id(filepath="animation.csv"):
global anim_id
try:
animation_file = open(filepath)
except IOError:
logger.debug("File {} can't be opened".format(filepath))
anim_id = "No animation"
return anim_id
else:
with animation_file:
csv_reader = csv.reader(
animation_file, delimiter=',', quotechar='|'
)
row_0 = csv_reader.next()
if len(row_0) == 1:
anim_id = row_0[0]
logger.debug("Got animation_id: {}".format(anim_id))
else:
anim_id = "Empty id"
logger.debug("No animation id in file")
return anim_id
def get_start_xy(filepath="animation.csv", x_ratio=1, y_ratio=1, z_ratio=1):
try:
animation_file = open(filepath)
except IOError:
logger.debug("File {} can't be opened".format(filepath))
anim_id = "No animation"
return float('nan'), float('nan')
else:
with animation_file:
csv_reader = csv.reader(
animation_file, delimiter=',', quotechar='|'
)
try:
row_frame = csv_reader.next()
except:
return float('nan'), float('nan')
if len(row_frame) == 1:
anim_id = row_frame[0]
logger.debug("Got animation_id: {}".format(row_frame[0]))
row_frame = csv_reader.next()
if len(row_frame) == 2:
logger.debug("Got frame delay: {}".format(row_frame[1]))
row_frame - csv_reader.next()
try:
frame_number, x, y, z, yaw, red, green, blue = row_frame
except:
return float('nan'), float('nan')
return float(x)*x_ratio, float(y)*y_ratio
def load_animation(filepath="animation.csv", default_delay = 0.1, x0=0, y0=0, z0=0, x_ratio=1, y_ratio=1, z_ratio=1):
imported_frames = []
global anim_id
try:
animation_file = open(filepath)
except IOError:
logger.debug("File {} can't be opened".format(filepath))
anim_id = "No animation"
else:
with animation_file:
current_frame_delay = default_delay
csv_reader = csv.reader(
animation_file, delimiter=',', quotechar='|'
)
row_0 = csv_reader.next()
if len(row_0) == 1:
anim_id = row_0[0]
logger.debug("Got animation_id: {}".format(anim_id))
elif len(row_0) == 2:
current_frame_delay = float(row_0[1])
logger.debug("Got new frame delay: {}".format(current_frame_delay))
else:
logger.debug("No animation id in file")
frame_number, x, y, z, yaw, red, green, blue = row_0
imported_frames.append({
'number': int(frame_number),
'x': x_ratio*float(x) + x0,
'y': y_ratio*float(y) + y0,
'z': z_ratio*float(z) + z0,
'yaw': float(yaw),
'red': int(red),
'green': int(green),
'blue': int(blue),
'delay': current_frame_delay
})
for row in csv_reader:
if len(row) == 2:
current_frame_delay = float(row[1])
def load(self, delay=0.1, filepath="animation.csv"):
self.original_frames = []
self.corrected_frames = []
self.filepath = filepath
try:
animation_file = open(filepath)
except IOError:
logger.debug("File {} can't be opened".format(filepath))
self.id = "No animation"
else:
with animation_file:
current_frame_delay = delay
csv_reader = csv.reader(
animation_file, delimiter=',', quotechar='|'
)
row_0 = csv_reader.next()
if len(row_0) == 1:
self.id = row_0[0]
logger.debug("Got animation_id: {}".format(self.id))
elif len(row_0) == 2:
current_frame_delay = float(row_0[1])
logger.debug("Got new frame delay: {}".format(current_frame_delay))
else:
frame_number, x, y, z, yaw, red, green, blue = row
imported_frames.append({
logger.debug("No animation id in file")
frame_number, x, y, z, yaw, red, green, blue = row_0
self.original_frames.append({
'number': int(frame_number),
'x': x_ratio*float(x) + x0,
'y': y_ratio*float(y) + y0,
'z': z_ratio*float(z) + z0,
'x': float(x),
'y': float(y),
'z': float(z),
'yaw': float(yaw),
'red': int(red),
'green': int(green),
'blue': int(blue),
'delay': current_frame_delay
})
return imported_frames
for row in csv_reader:
if len(row) == 2:
current_frame_delay = float(row[1])
logger.debug("Got new frame delay: {}".format(current_frame_delay))
else:
frame_number, x, y, z, yaw, red, green, blue = row
self.original_frames.append({
'number': int(frame_number),
'x': float(x),
'y': float(y),
'z': float(z),
'yaw': float(yaw),
'red': int(red),
'green': int(green),
'blue': int(blue),
'delay': current_frame_delay
})
self.split_animation()
def correct_animation(frames, frame_delay=0.1, min_takeoff_height=0.5, move_delta=0.01, check_takeoff=True, check_land=True):
corrected_frames = copy.deepcopy(frames)
start_action = 'takeoff'
frames_to_start = 0
time_to_start = 0
if len(corrected_frames) == 0:
raise Exception('Nothing to correct!')
# Check takeoff
# If copter takes off in animation file, copter must be armed first and then all animation can be played
if (corrected_frames[0]['z'] < min_takeoff_height) and check_takeoff:
start_action = 'arm'
# If the first point is low, then detect moment to arm,
# delete all points, where copter is standing, and count time_delta
for i in range(len(corrected_frames)-1):
if corrected_frames[i-frames_to_start+1]['z'] - corrected_frames[i-frames_to_start]['z'] > move_delta:
'''
Split animation into 5 parts: static_begin, takeoff, route, land, static_end
* static_begin and static_end are arrays of frames in the beginning and the end of animation,
where the drone doesn't move
* takeoff and land are arrays of frames after and before static frames of animation,
where the drone doesn't move in xy plane, and it's z coordinate only increases or decreases, respectively.
* route is the rest of the animation
Count static_begin_time and takeoff_time
'''
def split_animation(self, move_delta=0.01):
if len(self.original_frames) == 0:
return
frames = copy.deepcopy(self.original_frames)
self.static_begin_frames = []
self.takeoff_frames = []
self.route_frames = []
self.land_frames = []
self.static_end_frames = []
self.static_begin_time = 0
self.takeoff_time = 0
i = 0 # Moving index from the beginning
# Select static begin frames
while i < len(frames) - 1:
if moving(frames[i], frames[i+1], move_delta):
break
time_to_start += corrected_frames[i-frames_to_start]['delay']
del corrected_frames[i-frames_to_start]
frames_to_start += 1
start_delay = time_to_start
# Check Land
# If copter lands in animation, landing points can be deleted
if (corrected_frames[len(corrected_frames)-1]['z'] < min_takeoff_height) and check_land:
for i in range(len(corrected_frames)-1,0,-1):
# print i
if abs(corrected_frames[i-1]['z'] - corrected_frames[i]['z']) < move_delta:
self.static_begin_time += frames[i]['delay']
i += 1
self.static_begin_frames = frames[:i+1]
frames = frames[i+1:]
i = 0
# Select takeoff frames
while i < len(frames) - 1:
if moving(frames[i], frames[i+1], move_delta, z = False) or frames[i]['z'] - frames[i+1]['z'] <= 0:
break
del corrected_frames[i]
for i in range(len(corrected_frames)-1,0,-1):
if (abs(corrected_frames[i-1]['x'] - corrected_frames[i]['x']) > move_delta or
abs(corrected_frames[i-1]['y'] - corrected_frames[i]['y']) > move_delta):
self.takeoff_time += frames[i]['delay']
i += 1
self.takeoff_frames = frames[:i+1]
frames = frames[i+1:]
i = len(frames) - 1 # Moving index from the end
# Select static end frames
while i >= 0:
if moving(frames[i], frames[i-1], move_delta):
break
del corrected_frames[i]
return corrected_frames, start_action, start_delay
i -= 1
self.static_end_frames = frames[i:]
frames = frames[:i]
i -= len(frames) - 1
# Select land frames
while i >= 0:
if moving(frames[i], frames[i-1], move_delta, z = False) or frames[i-1]['z'] - frames[i]['z'] >= 0:
break
i -= 1
self.land_frames = frames[i:]
# Get route frames
self.route_frames = frames[:i]
# Needs for test
def save_corrected_animation(frames, filename="corrected_animation.csv"):
corrected_animation = open(filename, mode='w+')
csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for frame in frames:
csv_writer.writerow([frame['number'],frame['x'], frame['y'], frame['z'], frame['delay']])
# print frame
corrected_animation.close()
def make_output_frames(self, static_begin, takeoff, route, land, static_end):
self.output_frames = []
if static_begin:
self.output_frames += self.static_begin_frames
if takeoff:
self.output_frames += self.takeoff_frames
if route:
self.output_frames += self.route_frames
if land:
self.output_frames += self.land_frames
if static_end:
self.output_frames += self.static_end_frames
self.output_frames_min_z = min(self.output_frames, key = lambda p: p['z'])['z']
def update_frames(self, config, filepath):
self.load(config.animation_frame_delay, filepath)
self.make_output_frames(config.animation_output_static_begin,
config.animation_output_takeoff,
config.animation_output_route,
config.animation_output_land,
config.animation_output_static_end)
def get_scaled_output(self, ratio = (1,1,1), offset = (0,0,0)):
x0, y0, z0 = offset
x_ratio, y_ratio, z_ratio = ratio
scaled_frames = copy.deepcopy(self.output_frames)
for frame in scaled_frames:
frame['x'] = x_ratio*frame['x'] + x0
frame['y'] = y_ratio*frame['y'] + y0
frame['z'] = z_ratio*frame['z'] + z0
return scaled_frames
def get_scaled_output_min_z(self, ratio = (1,1,1), offset = (0,0,0)):
x0, y0, z0 = offset
x_ratio, y_ratio, z_ratio = ratio
return self.output_frames_min_z*z_ratio + z0
def get_start_point(self, ratio = (1,1,1), offset = (0,0,0)):
x0, y0, z0 = offset
x_ratio, y_ratio, z_ratio = ratio
first_frame = self.output_frames[0]
x = x_ratio*first_frame['x'] + x0
y = y_ratio*first_frame['y'] + y0
z = z_ratio*first_frame['z'] + z0
return x, y, z
def get_start_action(self, start_action, current_height, takeoff_level):
if start_action is 'auto':
if current_height > takeoff_level:
return 'takeoff'
else:
return 'play'
elif start_action in ('takeoff', 'play'):
return start_action
else:
return 'error'
def check_ground(self, ground_level = 0, ratio = (1,1,1), offset = (0,0,0)):
return ground_level <= self.get_scaled_output_min_z(ratio, offset)
# Need for tests
def save_corrected_animation(self):
name, ext = os.path.splitext(self.filepath)
filepath = name + '_corrected' + ext
with open(filepath, mode='w+') as corrected_animation:
csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for frame in self.corrected_frames:
csv_writer.writerow([frame['number'], frame['x'], frame['y'], frame['z'], frame['red'], frame['green'], frame['blue'], frame['delay']])
def convert_frame(frame):
return ((frame['x'], frame['y'], frame['z']), (frame['red'], frame['green'], frame['blue']), frame['yaw'])
@@ -189,24 +237,6 @@ try:
if color:
LedLib.fill(*color)
def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto,
interrupter=interrupt_event):
next_frame_time = 0
for frame in frames:
if interrupter.is_set():
logger.warning("Animation playing function interrupted!")
interrupter.clear()
return
execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func,
interrupter=interrupter)
next_frame_time += frame_delay
tasking.wait(next_frame_time, interrupter)
def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True,
interrupter=interrupt_event):
if use_leds: