mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-30 08:49:33 +00:00
Merged master+server improvements
This commit is contained in:
@@ -44,6 +44,7 @@ FLIP_MIN_Z = 2.0
|
||||
|
||||
checklist = []
|
||||
get_telemetry_lock = threading.Lock()
|
||||
delta = 0.0
|
||||
|
||||
def get_telemetry_locked(*args, **kwargs):
|
||||
with get_telemetry_lock:
|
||||
@@ -161,9 +162,6 @@ def check_angles(angle_limit=math.radians(5)):
|
||||
if abs(telemetry.roll) >= angle_limit:
|
||||
yield ("Roll estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.roll,
|
||||
math.degrees(telemetry.roll)))
|
||||
if abs(telemetry.yaw) >= angle_limit:
|
||||
yield ("Yaw estimation: {:.3f} rad;{:.3f} degrees".format(telemetry.yaw,
|
||||
math.degrees(telemetry.yaw)))
|
||||
|
||||
|
||||
def selfcheck():
|
||||
@@ -174,12 +172,22 @@ def selfcheck():
|
||||
|
||||
return checks
|
||||
|
||||
def get_delta():
|
||||
global delta
|
||||
return delta
|
||||
|
||||
def reset_delta():
|
||||
global delta
|
||||
delta = 0
|
||||
|
||||
def navto(x, y, z, yaw=float('nan'), frame_id=FRAME_ID, auto_arm=False, **kwargs):
|
||||
global delta
|
||||
set_position(frame_id=frame_id, x=x, y=y, z=z, yaw=yaw, auto_arm=auto_arm)
|
||||
#telemetry = get_telemetry_locked(frame_id=frame_id)
|
||||
telemetry = get_telemetry_locked(frame_id=frame_id)
|
||||
delta = get_distance3d(x, y, z, telemetry.x, telemetry.y, telemetry.z)
|
||||
|
||||
logger.info('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
#logger.info('Delta: {}'.format(delta))
|
||||
#print('Going to: | x: {:.3f} y: {:.3f} z: {:.3f} yaw: {:.3f}'.format(x, y, z, yaw))
|
||||
##logger.info('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
#print('Telemetry now: | z: {:.3f}'.format(telemetry.z))
|
||||
@@ -272,6 +280,7 @@ def stop(frame_id='body', hold_speed=SPEED):
|
||||
|
||||
def land(descend=True, z=Z_DESCEND, frame_id_descend=FRAME_ID, frame_id_land=FRAME_ID,
|
||||
timeout_descend=TIMEOUT_DESCEND, timeout_land=TIMEOUT_LAND, freq=FREQUENCY, interrupter=INTERRUPTER):
|
||||
reset_delta()
|
||||
if descend:
|
||||
logger.info("Descending to: | z: {:.3f}".format(z))
|
||||
#print("Descending to: | z: {:.3f}".format(z))
|
||||
|
||||
@@ -4,9 +4,16 @@ import copy
|
||||
import rospy
|
||||
import logging
|
||||
import threading
|
||||
import ConfigParser
|
||||
|
||||
from FlightLib import FlightLib
|
||||
from FlightLib import LedLib
|
||||
try:
|
||||
from FlightLib import FlightLib
|
||||
except ImportError:
|
||||
print("Can't import FlightLib")
|
||||
try:
|
||||
from FlightLib import LedLib
|
||||
except ImportError:
|
||||
print("Can't import LedLib")
|
||||
|
||||
import tasking_lib as tasking
|
||||
|
||||
@@ -14,6 +21,11 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
interrupt_event = threading.Event()
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read("client_config.ini")
|
||||
|
||||
default_delay = config.getfloat('ANIMATION', 'frame_delay')
|
||||
|
||||
anim_id = "Empty id"
|
||||
|
||||
# TODO refactor as class
|
||||
@@ -54,23 +66,20 @@ def get_start_xy(filepath="animation.csv", x_ratio=1, y_ratio=1):
|
||||
animation_file, delimiter=',', quotechar='|'
|
||||
)
|
||||
try:
|
||||
row_0 = csv_reader.next()
|
||||
row_frame = csv_reader.next()
|
||||
except:
|
||||
return float('nan'), float('nan')
|
||||
if len(row_frame) == 1:
|
||||
anim_id = row_frame[0]
|
||||
logger.debug("Got animation_id: {}".format(row_frame[0]))
|
||||
row_frame = csv_reader.next()
|
||||
if len(row_frame) == 2:
|
||||
logger.debug("Got frame delay: {}".format(row_frame[1]))
|
||||
row_frame - csv_reader.next()
|
||||
try:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row_frame
|
||||
except:
|
||||
return float('nan'), float('nan')
|
||||
if len(row_0) == 1:
|
||||
anim_id = row_0[0]
|
||||
logger.debug("Got animation_id: {}".format(anim_id))
|
||||
try:
|
||||
frame_number, x, y, z, yaw, red, green, blue = csv_reader.next()
|
||||
except:
|
||||
return float('nan'), float('nan')
|
||||
else:
|
||||
anim_id = "Empty id"
|
||||
logger.debug("No animation id in file")
|
||||
try:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row_0
|
||||
except:
|
||||
return float('nan'), float('nan')
|
||||
return float(x)*x_ratio, float(y)*y_ratio
|
||||
|
||||
|
||||
@@ -84,6 +93,7 @@ def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0, x_ratio=1, y_rati
|
||||
anim_id = "No animation"
|
||||
else:
|
||||
with animation_file:
|
||||
current_frame_delay = default_delay
|
||||
csv_reader = csv.reader(
|
||||
animation_file, delimiter=',', quotechar='|'
|
||||
)
|
||||
@@ -91,6 +101,9 @@ def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0, x_ratio=1, y_rati
|
||||
if len(row_0) == 1:
|
||||
anim_id = row_0[0]
|
||||
logger.debug("Got animation_id: {}".format(anim_id))
|
||||
elif len(row_0) == 2:
|
||||
current_frame_delay = float(row_0[1])
|
||||
logger.debug("Got new frame delay: {}".format(current_frame_delay))
|
||||
else:
|
||||
logger.debug("No animation id in file")
|
||||
frame_number, x, y, z, yaw, red, green, blue = row_0
|
||||
@@ -103,25 +116,31 @@ def load_animation(filepath="animation.csv", x0=0, y0=0, z0=0, x_ratio=1, y_rati
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
'delay': current_frame_delay
|
||||
})
|
||||
for row in csv_reader:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row
|
||||
imported_frames.append({
|
||||
'number': int(frame_number),
|
||||
'x': x_ratio*float(x) + x0,
|
||||
'y': y_ratio*float(y) + y0,
|
||||
'z': z_ratio*float(z) + z0,
|
||||
'yaw': float(yaw),
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
})
|
||||
if len(row) == 2:
|
||||
current_frame_delay = float(row[1])
|
||||
else:
|
||||
frame_number, x, y, z, yaw, red, green, blue = row
|
||||
imported_frames.append({
|
||||
'number': int(frame_number),
|
||||
'x': x_ratio*float(x) + x0,
|
||||
'y': y_ratio*float(y) + y0,
|
||||
'z': z_ratio*float(z) + z0,
|
||||
'yaw': float(yaw),
|
||||
'red': int(red),
|
||||
'green': int(green),
|
||||
'blue': int(blue),
|
||||
'delay': current_frame_delay
|
||||
})
|
||||
return imported_frames
|
||||
|
||||
def correct_animation(frames, frame_delay=0.1, min_takeoff_height=0.5, move_delta=0.01, check_takeoff=True, check_land=True):
|
||||
corrected_frames = copy.deepcopy(frames)
|
||||
start_action = 'takeoff'
|
||||
frames_to_start = 0
|
||||
time_to_start = 0
|
||||
if len(corrected_frames) == 0:
|
||||
raise Exception('Nothing to correct!')
|
||||
# Check takeoff
|
||||
@@ -133,9 +152,10 @@ def correct_animation(frames, frame_delay=0.1, min_takeoff_height=0.5, move_delt
|
||||
for i in range(len(corrected_frames)-1):
|
||||
if corrected_frames[i-frames_to_start+1]['z'] - corrected_frames[i-frames_to_start]['z'] > move_delta:
|
||||
break
|
||||
time_to_start += corrected_frames[i-frames_to_start]['delay']
|
||||
del corrected_frames[i-frames_to_start]
|
||||
frames_to_start += 1
|
||||
start_delay = frames_to_start*frame_delay
|
||||
start_delay = time_to_start
|
||||
# Check Land
|
||||
# If copter lands in animation, landing points can be deleted
|
||||
if (corrected_frames[len(corrected_frames)-1]['z'] < min_takeoff_height) and check_land:
|
||||
@@ -156,60 +176,61 @@ def save_corrected_animation(frames, filename="corrected_animation.csv"):
|
||||
corrected_animation = open(filename, mode='w+')
|
||||
csv_writer = csv.writer(corrected_animation, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
|
||||
for frame in frames:
|
||||
csv_writer.writerow([frame['number'],frame['x'], frame['y'], frame['z']])
|
||||
csv_writer.writerow([frame['number'],frame['x'], frame['y'], frame['z'], frame['delay']])
|
||||
# print frame
|
||||
corrected_animation.close()
|
||||
|
||||
def convert_frame(frame):
|
||||
return ((frame['x'], frame['y'], frame['z']), (frame['red'], frame['green'], frame['blue']), frame['yaw'])
|
||||
|
||||
try:
|
||||
def execute_frame(point=(), color=(), yaw=float('Nan'), frame_id='aruco_map', use_leds=True,
|
||||
flight_func=FlightLib.navto, auto_arm=False, flight_kwargs=None, interrupter=interrupt_event):
|
||||
if flight_kwargs is None:
|
||||
flight_kwargs = {}
|
||||
|
||||
def execute_frame(point=(), color=(), yaw=float('Nan'), frame_id='aruco_map', use_leds=True,
|
||||
flight_func=FlightLib.navto, auto_arm=False, flight_kwargs=None, interrupter=interrupt_event):
|
||||
if flight_kwargs is None:
|
||||
flight_kwargs = {}
|
||||
|
||||
flight_func(*point, yaw=yaw, frame_id=frame_id, auto_arm=auto_arm, interrupter=interrupt_event, **flight_kwargs)
|
||||
if use_leds:
|
||||
if color:
|
||||
LedLib.fill(*color)
|
||||
flight_func(*point, yaw=yaw, frame_id=frame_id, auto_arm=auto_arm, interrupter=interrupt_event, **flight_kwargs)
|
||||
if use_leds:
|
||||
if color:
|
||||
LedLib.fill(*color)
|
||||
|
||||
|
||||
def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto,
|
||||
interrupter=interrupt_event):
|
||||
next_frame_time = 0
|
||||
for frame in frames:
|
||||
if interrupter.is_set():
|
||||
logger.warning("Animation playing function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func,
|
||||
interrupter=interrupter)
|
||||
|
||||
next_frame_time += frame_delay
|
||||
tasking.wait(next_frame_time, interrupter)
|
||||
|
||||
|
||||
def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True,
|
||||
def execute_animation(frames, frame_delay, frame_id='aruco_map', use_leds=True, flight_func=FlightLib.navto,
|
||||
interrupter=interrupt_event):
|
||||
next_frame_time = 0
|
||||
for frame in frames:
|
||||
if interrupter.is_set():
|
||||
logger.warning("Animation playing function interrupted!")
|
||||
interrupter.clear()
|
||||
return
|
||||
execute_frame(*convert_frame(frame), frame_id=frame_id, use_leds=use_leds, flight_func=flight_func,
|
||||
interrupter=interrupter)
|
||||
|
||||
next_frame_time += frame_delay
|
||||
tasking.wait(next_frame_time, interrupter)
|
||||
|
||||
|
||||
def takeoff(z=1.5, safe_takeoff=True, frame_id='map', timeout=5.0, use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
if use_leds:
|
||||
LedLib.wipe_to(255, 0, 0, interrupter=interrupter)
|
||||
result = FlightLib.takeoff(height=z, timeout_takeoff=timeout, frame_id=frame_id,
|
||||
emergency_land=safe_takeoff, interrupter=interrupter)
|
||||
if result == 'not armed' or result == 'timeout':
|
||||
raise Exception('STOP') # Raise exception to clear task_manager if copter can't arm
|
||||
if use_leds:
|
||||
LedLib.blink(0, 255, 0, wait=50, interrupter=interrupter)
|
||||
|
||||
|
||||
def land(z=1.5, descend=False, timeout=5.0, frame_id='aruco_map', use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
if use_leds:
|
||||
LedLib.wipe_to(255, 0, 0, interrupter=interrupter)
|
||||
if interrupter.is_set():
|
||||
return
|
||||
result = FlightLib.takeoff(height=z, timeout_takeoff=timeout, frame_id=frame_id,
|
||||
emergency_land=safe_takeoff, interrupter=interrupter)
|
||||
if result == 'not armed' or result == 'timeout':
|
||||
raise Exception('STOP') # Raise exception to clear task_manager if copter can't arm
|
||||
if interrupter.is_set():
|
||||
return
|
||||
if use_leds:
|
||||
LedLib.blink(0, 255, 0, wait=50, interrupter=interrupter)
|
||||
if use_leds:
|
||||
LedLib.blink(255, 0, 0, interrupter=interrupter)
|
||||
FlightLib.land(z=z, descend=descend, timeout_land=timeout, frame_id_land=frame_id, interrupter=interrupter)
|
||||
if use_leds:
|
||||
LedLib.off()
|
||||
|
||||
|
||||
def land(z=1.5, descend=False, timeout=5.0, frame_id='aruco_map', use_leds=True,
|
||||
interrupter=interrupt_event):
|
||||
if use_leds:
|
||||
LedLib.blink(255, 0, 0, interrupter=interrupter)
|
||||
FlightLib.land(z=z, descend=descend, timeout_land=timeout, frame_id_land=frame_id, interrupter=interrupter)
|
||||
if use_leds:
|
||||
LedLib.off()
|
||||
except NameError:
|
||||
print("Can't create flying functions")
|
||||
|
||||
@@ -106,6 +106,8 @@ class Client(object):
|
||||
|
||||
def start(self):
|
||||
logger.info("Starting client")
|
||||
messaging.NotifierSock().init(self.selector)
|
||||
|
||||
try:
|
||||
while True:
|
||||
self._reconnect()
|
||||
@@ -196,6 +198,7 @@ class Client(object):
|
||||
# self.server_connection.send_message("ping")
|
||||
# self._last_ping_time = time.time()
|
||||
# logging.debug("tick")
|
||||
|
||||
for key, mask in events: # TODO add notifier to client!
|
||||
connection = key.data
|
||||
if connection is None:
|
||||
@@ -214,11 +217,15 @@ class Client(object):
|
||||
if isinstance(error, OSError):
|
||||
if error.errno == errno.EINTR:
|
||||
raise KeyboardInterrupt
|
||||
|
||||
|
||||
if not self.selector.get_map():
|
||||
logger.warning("No active connections left!")
|
||||
return
|
||||
try:
|
||||
mapping = self.selector.get_map().values()
|
||||
notifier_key = self.selector.get_key(messaging.NotifierSock().get_sock())
|
||||
notify_only= len(mapping) == 1 and notifier_key in mapping
|
||||
if notify_only or not mapping:
|
||||
logger.warning("No active connections left!")
|
||||
return
|
||||
except (RuntimeError, KeyError) as e:
|
||||
logger.error("Exception {} occured when getting net map!".format(e))
|
||||
|
||||
|
||||
@messaging.message_callback("config_write")
|
||||
@@ -244,5 +251,6 @@ def _response_time(*args, **kwargs):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
client = Client()
|
||||
client.start()
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[SERVER]
|
||||
port = 25000
|
||||
broadcast_port = 8181
|
||||
host = 192.168.1.101
|
||||
host = 192.168.1.19
|
||||
buffer_size = 1024
|
||||
|
||||
[FILETRANSFER]
|
||||
@@ -23,7 +23,7 @@ timeout_to_disarm_after_watchdog_action = 10.0
|
||||
[TELEMETRY]
|
||||
frequency = 1
|
||||
transmit = True
|
||||
clear_tasks_when_emergency = True
|
||||
land_if_pos_delta_bigger_than = 3.0
|
||||
log_cpu_and_memory = True
|
||||
|
||||
[ANIMATION]
|
||||
@@ -35,22 +35,23 @@ y_ratio = 1.0
|
||||
z_ratio = 1.0
|
||||
|
||||
[COPTERS]
|
||||
frame_id = floor
|
||||
frame_id = map
|
||||
takeoff_height = 1.0
|
||||
takeoff_time = 5.0
|
||||
safe_takeoff = False
|
||||
reach_first_point_time = 5.0
|
||||
land_time = 3.0
|
||||
land_time = 1.0
|
||||
x0_common = 0
|
||||
y0_common = 0
|
||||
z0_common = 0
|
||||
land_timeout = 6.0
|
||||
yaw = 180
|
||||
land_timeout = 10.0
|
||||
|
||||
[FLOOR FRAME]
|
||||
parent = aruco_map
|
||||
x = 2.4
|
||||
y = 12.4
|
||||
z = 6.27
|
||||
z = 6.4
|
||||
roll = 180
|
||||
pitch = 0
|
||||
yaw = -90
|
||||
@@ -58,7 +59,7 @@ yaw = -90
|
||||
[PRIVATE]
|
||||
id = /hostname
|
||||
restart_dhcpcd = True
|
||||
use_leds = False
|
||||
use_leds = True
|
||||
led_pin = 21
|
||||
x0 = 0
|
||||
y0 = 0
|
||||
|
||||
@@ -38,6 +38,7 @@ country=GB
|
||||
network={
|
||||
ssid="$1"
|
||||
psk="$2"
|
||||
scan_ssid=1
|
||||
}
|
||||
EOF
|
||||
|
||||
@@ -71,8 +72,9 @@ EOF
|
||||
# change server ip in client_config
|
||||
sed -i "0,/^host/s/\(^h.*\)/host = $4/" client_config.ini
|
||||
|
||||
# enable clever show service
|
||||
# enable clever show service and visual_pose_watchdog service
|
||||
systemctl enable clever-show.service
|
||||
systemctl enable visual_pose_watchdog.service
|
||||
|
||||
# restart clever
|
||||
reboot
|
||||
@@ -29,11 +29,8 @@ from tf.transformations import quaternion_from_euler, euler_from_quaternion, qua
|
||||
import tf2_ros
|
||||
|
||||
static_bloadcaster = tf2_ros.StaticTransformBroadcaster()
|
||||
Telemetry = namedtuple("Telemetry", "git_version animation_id battery_v battery_p system_status calibration_status mode selfcheck current_position start_position armed")
|
||||
telemetry = Telemetry('nan', 'No animation', 'nan', 'nan', 'NO_FCU', 'NO_FCU', 'NO_FCU', 'NO_FCU', 'NO_POS', 'NO_POS', False)
|
||||
emergency = False
|
||||
|
||||
# get_telemetry = rospy.ServiceProxy('get_telemetry', srv.GetTelemetry)
|
||||
emergency = False
|
||||
|
||||
logging.basicConfig( # TODO all prints as logs
|
||||
level=logging.DEBUG, # INFO
|
||||
@@ -78,8 +75,8 @@ class CopterClient(client.Client):
|
||||
super(CopterClient, self).load_config()
|
||||
self.TELEM_FREQ = self.config.getfloat('TELEMETRY', 'frequency')
|
||||
self.TELEM_TRANSMIT = self.config.getboolean('TELEMETRY', 'transmit')
|
||||
self.CLEAR_TASKS_WHEN_EMERGENCY = self.config.getboolean('TELEMETRY', 'clear_tasks_when_emergency')
|
||||
self.LOG_CPU_AND_MEMORY = self.config.getboolean('TELEMETRY', 'log_cpu_and_memory')
|
||||
self.LAND_POS_DELTA = self.config.getfloat('TELEMETRY', 'land_if_pos_delta_bigger_than')
|
||||
self.FRAME_ID = self.config.get('COPTERS', 'frame_id')
|
||||
self.FRAME_FLIPPED_HEIGHT = 0.
|
||||
self.TAKEOFF_HEIGHT = self.config.getfloat('COPTERS', 'takeoff_height')
|
||||
@@ -91,6 +88,7 @@ class CopterClient(client.Client):
|
||||
self.X0_COMMON = self.config.getfloat('COPTERS', 'x0_common')
|
||||
self.Y0_COMMON = self.config.getfloat('COPTERS', 'y0_common')
|
||||
self.Z0_COMMON = self.config.getfloat('COPTERS', 'z0_common')
|
||||
self.YAW = self.config.get('COPTERS', 'yaw')
|
||||
self.TAKEOFF_CHECK = self.config.getboolean('ANIMATION', 'takeoff_animation_check')
|
||||
self.LAND_CHECK = self.config.getboolean('ANIMATION', 'land_animation_check')
|
||||
self.FRAME_DELAY = self.config.getfloat('ANIMATION', 'frame_delay')
|
||||
@@ -132,7 +130,8 @@ class CopterClient(client.Client):
|
||||
else:
|
||||
rospy.logerror("Can't make floor frame!")
|
||||
start_subscriber()
|
||||
telemetry_thread.start()
|
||||
|
||||
telemetry.start_loop()
|
||||
super(CopterClient, self).start()
|
||||
|
||||
def start_floor_frame_broadcast(self):
|
||||
@@ -305,9 +304,10 @@ def _response_selfcheck(*args, **kwargs):
|
||||
stop_subscriber()
|
||||
return "NOT_CONNECTED_TO_FCU"
|
||||
|
||||
|
||||
@messaging.request_callback("telemetry")
|
||||
def _response_telemetry(*args, **kwargs):
|
||||
return create_telemetry_message(telemetry)
|
||||
return telemetry.create_msg_contents()
|
||||
|
||||
|
||||
@messaging.request_callback("anim_id")
|
||||
@@ -581,7 +581,6 @@ def _play_animation(*args, **kwargs):
|
||||
check_takeoff=client.active_client.TAKEOFF_CHECK,
|
||||
check_land=client.active_client.LAND_CHECK,
|
||||
)
|
||||
|
||||
# Choose start action
|
||||
if start_action == 'takeoff':
|
||||
# Takeoff first
|
||||
@@ -635,16 +634,21 @@ def _play_animation(*args, **kwargs):
|
||||
# Play animation file
|
||||
for frame in corrected_frames:
|
||||
point, color, yaw = animation.convert_frame(frame)
|
||||
if client.active_client.YAW == "animation":
|
||||
yaw = frame["yaw"]
|
||||
else:
|
||||
yaw = math.radians(float(client.active_client.YAW))
|
||||
task_manager.add_task(frame_time, 0, animation.execute_frame,
|
||||
task_kwargs={
|
||||
"point": point,
|
||||
"color": color,
|
||||
"yaw": yaw,
|
||||
"frame_id": client.active_client.FRAME_ID,
|
||||
"use_leds": client.active_client.USE_LEDS,
|
||||
"flight_func": FlightLib.navto,
|
||||
}
|
||||
)
|
||||
frame_time += client.active_client.FRAME_DELAY
|
||||
frame_time += frame["delay"]
|
||||
|
||||
# Calculate land_time
|
||||
land_time = frame_time + client.active_client.LAND_TIME
|
||||
@@ -657,143 +661,239 @@ def _play_animation(*args, **kwargs):
|
||||
},
|
||||
)
|
||||
|
||||
def telemetry_loop():
|
||||
global telemetry, emergency
|
||||
last_state = []
|
||||
equal_state_counter = 0
|
||||
max_count = 2
|
||||
tasks_cleared = False
|
||||
rate = rospy.Rate(client.active_client.TELEM_FREQ)
|
||||
while not rospy.is_shutdown():
|
||||
telemetry = telemetry._replace(animation_id = animation.get_id())
|
||||
telemetry = telemetry._replace(git_version = subprocess.check_output("git log --pretty=format:'%h' -n 1", shell=True))
|
||||
class Telemetry:
|
||||
params_default_dict = {
|
||||
"git_version": None,
|
||||
"animation_id": None,
|
||||
"battery": None,
|
||||
"armed": False,
|
||||
"system_status": None,
|
||||
"calibration_status": None,
|
||||
"mode": None,
|
||||
"selfcheck": None,
|
||||
"current_position": None,
|
||||
"start_position": None,
|
||||
"time": None,
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self._lock = threading.Lock()
|
||||
self._last_state = []
|
||||
self._interruption_counter = 0
|
||||
self._max_interruptions = 2
|
||||
self._tasks_cleared = False
|
||||
|
||||
for key, value in self.params_default_dict.items():
|
||||
setattr(self, key, value)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key in self.params_default_dict:
|
||||
with self.__dict__['_lock']:
|
||||
self.__dict__[key] = value
|
||||
else:
|
||||
self.__dict__[key] = value
|
||||
|
||||
def __getattr__(self, item):
|
||||
if item in self.params_default_dict:
|
||||
with self.__dict__['_lock']:
|
||||
return self.__dict__[item]
|
||||
|
||||
return self.__dict__[item]
|
||||
|
||||
@classmethod
|
||||
def get_git_version(cls):
|
||||
return subprocess.check_output("git log --pretty=format:'%h' -n 1", shell=True)
|
||||
|
||||
@classmethod
|
||||
def get_start_position(cls):
|
||||
x_start, y_start = animation.get_start_xy(os.path.abspath("animation.csv"),
|
||||
x_ratio=client.active_client.X_RATIO,
|
||||
y_ratio=client.active_client.Y_RATIO,
|
||||
)
|
||||
x_ratio=client.active_client.X_RATIO,
|
||||
y_ratio=client.active_client.Y_RATIO,
|
||||
)
|
||||
x_delta = client.active_client.X0 + client.active_client.X0_COMMON
|
||||
y_delta = client.active_client.Y0 + client.active_client.Y0_COMMON
|
||||
z_delta = client.active_client.Z0 + client.active_client.Z0_COMMON
|
||||
if not math.isnan(x_start):
|
||||
telemetry = telemetry._replace(start_position = '{:.2f} {:.2f} {:.2f}'.format(x_start+x_delta, y_start+y_delta, z_delta))
|
||||
else:
|
||||
telemetry = telemetry._replace(start_position = 'NO_POS')
|
||||
services_unavailable = FlightLib.check_ros_services_unavailable()
|
||||
if not services_unavailable:
|
||||
try:
|
||||
ros_telemetry = FlightLib.get_telemetry_locked(client.active_client.FRAME_ID)
|
||||
if ros_telemetry.connected:
|
||||
telemetry = telemetry._replace(armed = ros_telemetry.armed)
|
||||
telemetry = telemetry._replace(battery_v = '{:.2f}'.format(ros_telemetry.voltage))
|
||||
batt_empty_param = get_param('BAT_V_EMPTY')
|
||||
batt_charged_param = get_param('BAT_V_CHARGED')
|
||||
batt_cells_param = get_param('BAT_N_CELLS')
|
||||
if batt_empty_param.success and batt_charged_param.success and batt_cells_param.success:
|
||||
batt_empty = batt_empty_param.value.real
|
||||
batt_charged = batt_charged_param.value.real
|
||||
batt_cells = batt_cells_param.value.integer
|
||||
try:
|
||||
telemetry = telemetry._replace(battery_p = '{}'.format(int(min((ros_telemetry.voltage/batt_cells - batt_empty)/(batt_charged - batt_empty)*100., 100))))
|
||||
except ValueError:
|
||||
telemetry = telemetry._replace(battery_p = 'nan')
|
||||
else:
|
||||
telemetry = telemetry._replace(battery_p = 'nan')
|
||||
telemetry = telemetry._replace(calibration_status = get_calibration_status())
|
||||
telemetry = telemetry._replace(system_status = get_sys_status())
|
||||
telemetry = telemetry._replace(mode = ros_telemetry.mode)
|
||||
check = FlightLib.selfcheck()
|
||||
if not check:
|
||||
check = "OK"
|
||||
telemetry = telemetry._replace(selfcheck = str(check))
|
||||
if not math.isnan(ros_telemetry.x):
|
||||
telemetry = telemetry._replace(current_position = '{:.2f} {:.2f} {:.2f} {:.1f} {}'.format(ros_telemetry.x, ros_telemetry.y, ros_telemetry.z,
|
||||
math.degrees(ros_telemetry.yaw), client.active_client.FRAME_ID))
|
||||
else:
|
||||
telemetry = telemetry._replace(current_position = 'NO_POS in {}'.format(client.active_client.FRAME_ID))
|
||||
else:
|
||||
telemetry = telemetry._replace(battery_v = 'nan')
|
||||
telemetry = telemetry._replace(battery_p = 'nan')
|
||||
telemetry = telemetry._replace(calibration_status = 'NO_FCU')
|
||||
telemetry = telemetry._replace(system_status = 'NO_FCU')
|
||||
telemetry = telemetry._replace(mode = 'NO_FCU')
|
||||
telemetry = telemetry._replace(selfcheck = 'NO_FCU')
|
||||
telemetry = telemetry._replace(current_position = 'NO_POS')
|
||||
except rospy.ServiceException:
|
||||
logger.debug("Some service is unavailable")
|
||||
except AttributeError as e:
|
||||
logger.debug(e)
|
||||
except rospy.TransportException as e:
|
||||
logger.debug(e)
|
||||
else:
|
||||
telemetry = telemetry._replace(selfcheck = 'WAIT_ROS')
|
||||
if client.active_client.TELEM_TRANSMIT:
|
||||
try:
|
||||
client.active_client.server_connection.send_message('telem', args={'message':create_telemetry_message(telemetry)})
|
||||
except AttributeError as e:
|
||||
logger.debug(e)
|
||||
if client.active_client.CLEAR_TASKS_WHEN_EMERGENCY:
|
||||
mode = telemetry.mode
|
||||
armed = telemetry.armed
|
||||
last_task = task_manager.get_last_task_name()
|
||||
state = [mode, armed, last_task]
|
||||
if state == last_state:
|
||||
equal_state_counter += 1
|
||||
else:
|
||||
equal_state_counter = 0
|
||||
external_interruption = (mode != "OFFBOARD" and armed == True and last_task not in [None, 'land'])
|
||||
log_msg = ''
|
||||
if emergency and external_interruption:
|
||||
log_msg = "emergency and external interruption"
|
||||
elif emergency:
|
||||
log_msg = "emergency"
|
||||
elif external_interruption:
|
||||
log_msg = "external interruption"
|
||||
logger.info("Possible expernal interruption, state_counter = {}".format(equal_state_counter))
|
||||
if emergency or (external_interruption and equal_state_counter >= max_count):
|
||||
if not tasks_cleared:
|
||||
logger.info("Clear task manager because of {}".format(log_msg))
|
||||
logger.info("Mode: {} | armed: {} | last task: {} ".format(mode, armed, last_task))
|
||||
task_manager.reset()
|
||||
tasks_cleared = True
|
||||
equal_state_counter = 0
|
||||
else:
|
||||
tasks_cleared = False
|
||||
last_state = state
|
||||
if client.active_client.LOG_CPU_AND_MEMORY:
|
||||
cpu_usage = psutil.cpu_percent(interval=None, percpu=True)
|
||||
mem_usage = psutil.virtual_memory().percent
|
||||
cpu_temp_info = psutil.sensors_temperatures()['cpu-thermal'][0]
|
||||
cpu_temp = cpu_temp_info.current
|
||||
# https://github.com/raspberrypi/documentation/blob/JamesH65-patch-vcgencmd-vcdbg-docs/raspbian/applications/vcgencmd.md
|
||||
throttled_hex = subprocess.check_output("vcgencmd get_throttled", shell=True).split('=')[1]
|
||||
under_voltage = bool(int(bin(int(throttled_hex,16))[2:][-1]))
|
||||
power_state = 'normal' if not under_voltage else 'under voltage!'
|
||||
if cpu_temp_info.critical:
|
||||
cpu_temp_state = 'critical'
|
||||
elif cpu_temp_info.high:
|
||||
cpu_temp_state = 'high'
|
||||
else:
|
||||
cpu_temp_state = 'normal'
|
||||
logger.info("CPU usage: {} | Memory: {} % | T: {} ({}) | Power: {}".format(cpu_usage, mem_usage, cpu_temp, cpu_temp_state, power_state))
|
||||
|
||||
rate.sleep()
|
||||
x = x_start + x_delta
|
||||
y = y_start + y_delta
|
||||
if not FlightLib._check_nans(x, y, z_delta):
|
||||
return x, y, z_delta
|
||||
return 'NO_POS'
|
||||
|
||||
def create_telemetry_message(telemetry):
|
||||
msg = client.active_client.client_id + '`'
|
||||
for key in telemetry.__dict__:
|
||||
if key != 'armed':
|
||||
msg += telemetry.__dict__[key] + '`'
|
||||
msg += repr(time.time())
|
||||
return msg
|
||||
@classmethod
|
||||
def get_battery(cls, ros_telemetry):
|
||||
battery_v = ros_telemetry.voltage
|
||||
|
||||
batt_empty_param = get_param('BAT_V_EMPTY')
|
||||
batt_charged_param = get_param('BAT_V_CHARGED')
|
||||
batt_cells_param = get_param('BAT_N_CELLS')
|
||||
|
||||
if batt_empty_param.success and batt_charged_param.success and batt_cells_param.success:
|
||||
batt_empty = batt_empty_param.value.real
|
||||
batt_charged = batt_charged_param.value.real
|
||||
batt_cells = batt_cells_param.value.integer
|
||||
|
||||
battery_p = (ros_telemetry.voltage / batt_cells - batt_empty) / (batt_charged - batt_empty) * 1.
|
||||
battery_p = max(min(battery_p, 1.), 0.)
|
||||
else:
|
||||
battery_p = float('nan')
|
||||
|
||||
return battery_v, battery_p
|
||||
|
||||
@classmethod
|
||||
def get_selfcheck(cls):
|
||||
check = FlightLib.selfcheck()
|
||||
if not check:
|
||||
check = "OK"
|
||||
return check
|
||||
|
||||
@classmethod
|
||||
def get_position(cls, ros_telemetry):
|
||||
x, y, z = ros_telemetry.x, ros_telemetry.y, ros_telemetry.z
|
||||
if not math.isnan(x):
|
||||
return x, y, z, math.degrees(ros_telemetry.yaw), client.active_client.FRAME_ID
|
||||
return 'NO_POS'
|
||||
|
||||
def update_telemetry(self):
|
||||
self.animation_id = animation.get_id()
|
||||
self.git_version = self.get_git_version()
|
||||
self.start_position = self.get_start_position()
|
||||
try:
|
||||
ros_telemetry = FlightLib.get_telemetry_locked(client.active_client.FRAME_ID)
|
||||
if ros_telemetry.connected:
|
||||
self.battery = self.get_battery(ros_telemetry)
|
||||
self.armed = ros_telemetry.armed
|
||||
self.calibration_status = get_calibration_status()
|
||||
self.system_status = get_sys_status()
|
||||
self.mode = ros_telemetry.mode
|
||||
self.selfcheck = self.get_selfcheck()
|
||||
self.current_position = self.get_position(ros_telemetry)
|
||||
else:
|
||||
self.reset_telemetry_values()
|
||||
except rospy.ServiceException:
|
||||
rospy.logdebug("Some service is unavailable")
|
||||
self.selfcheck = ["WAIT_ROS"]
|
||||
except AttributeError as e:
|
||||
rospy.logdebug(e)
|
||||
except rospy.TransportException as e:
|
||||
rospy.logdebug(e)
|
||||
self.time = time.time()
|
||||
|
||||
def round_telemetry(self):
|
||||
round_list = ["battery", "start_position", "current_position"]
|
||||
for key in round_list:
|
||||
if self.__dict__[key] not in [None, 'NO_POS', 'NO_FCU']:
|
||||
self.__dict__[key] = [round(v,2) if type(v) == float else v for v in self.__dict__[key]]
|
||||
|
||||
def reset_telemetry_values(self):
|
||||
self.battery = float('nan'), float('nan')
|
||||
self.calibration_status = 'NO_FCU'
|
||||
self.system_status = 'NO_FCU'
|
||||
self.mode = 'NO_FCU'
|
||||
self.selfcheck = ['NO_FCU']
|
||||
self.current_position = 'NO_POS'
|
||||
|
||||
def check_failsafe(self):
|
||||
global emergency
|
||||
# check current state
|
||||
state = [self.mode, self.armed, task_manager.get_last_task_name()]
|
||||
mode, armed, last_task = state
|
||||
# check external interruption
|
||||
external_interruption = (mode != "OFFBOARD" and armed == True and last_task not in [None, 'land'])
|
||||
log_msg = ''
|
||||
if emergency:
|
||||
log_msg += 'emergency and '
|
||||
if external_interruption:
|
||||
log_msg += 'external interruption and '
|
||||
# count interruptions to avoid px4 mode glitches
|
||||
if state == self._last_state:
|
||||
self._interruption_counter += 1
|
||||
else:
|
||||
self._interruption_counter = 0
|
||||
logger.info("Possible expernal interruption, state_counter = {}".format(self._interruption_counter))
|
||||
# delete last ' end ' from log message
|
||||
if len(log_msg) > 5:
|
||||
log_msg = log_msg[:-5]
|
||||
# clear task manager if emergency or external interruption
|
||||
if emergency or (external_interruption and self._interruption_counter >= self._max_interruptions):
|
||||
if not self._tasks_cleared:
|
||||
logger.info("Clear task manager because of {}".format(log_msg))
|
||||
logger.info("Mode: {} | armed: {} | last task: {} ".format(mode, armed, last_task))
|
||||
task_manager.reset()
|
||||
FlightLib.reset_delta()
|
||||
self._tasks_cleared = True
|
||||
self._interruption_counter = 0
|
||||
else:
|
||||
self._tasks_cleared = False
|
||||
self._last_state = state
|
||||
# check position delta
|
||||
if not emergency:
|
||||
delta = FlightLib.get_delta()
|
||||
if delta > client.active_client.LAND_POS_DELTA:
|
||||
logger.info("Delta: {}".format(delta))
|
||||
_command_land()
|
||||
|
||||
def transmit_message(self):
|
||||
try:
|
||||
client.active_client.server_connection.send_message('telemetry', args={'value': self.create_msg_contents()})
|
||||
except AttributeError as e:
|
||||
logger.debug(e)
|
||||
|
||||
@classmethod
|
||||
def log_cpu_and_memory(cls):
|
||||
cpu_usage = psutil.cpu_percent(interval=None, percpu=True)
|
||||
mem_usage = psutil.virtual_memory().percent
|
||||
cpu_temp_info = psutil.sensors_temperatures()['cpu-thermal'][0]
|
||||
cpu_temp = cpu_temp_info.current
|
||||
# https://github.com/raspberrypi/documentation/blob/JamesH65-patch-vcgencmd-vcdbg-docs/raspbian/applications/vcgencmd.md
|
||||
throttled_hex = subprocess.check_output("vcgencmd get_throttled", shell=True).split('=')[1]
|
||||
under_voltage = bool(int(bin(int(throttled_hex,16))[2:][-1]))
|
||||
power_state = 'normal' if not under_voltage else 'under voltage!'
|
||||
if cpu_temp_info.critical:
|
||||
cpu_temp_state = 'critical'
|
||||
elif cpu_temp_info.high:
|
||||
cpu_temp_state = 'high'
|
||||
else:
|
||||
cpu_temp_state = 'normal'
|
||||
logger.info("CPU usage: {} | Memory: {} % | T: {} ({}) | Power: {}".format(
|
||||
cpu_usage, mem_usage, cpu_temp, cpu_temp_state, power_state))
|
||||
|
||||
def _update_loop(self, freq): # TODO extract?
|
||||
rate = rospy.Rate(freq)
|
||||
while not rospy.is_shutdown():
|
||||
|
||||
self.update_telemetry()
|
||||
self.round_telemetry()
|
||||
self.check_failsafe()
|
||||
|
||||
if client.active_client.TELEM_TRANSMIT and client.active_client.connected:
|
||||
self.transmit_message()
|
||||
|
||||
if client.active_client.LOG_CPU_AND_MEMORY:
|
||||
self.log_cpu_and_memory()
|
||||
|
||||
rate.sleep()
|
||||
|
||||
def start_loop(self):
|
||||
if client.active_client.TELEM_FREQ > 0:
|
||||
telemetry_thread = threading.Thread(target=self._update_loop, name="Telemetry getting thread",
|
||||
args=(client.active_client.TELEM_FREQ,)) # TODO MOVE? Daemon?
|
||||
telemetry_thread.start()
|
||||
else:
|
||||
logger.info("Don't create telemetry loop because of zero or negative telemetry frequency")
|
||||
|
||||
def create_msg_contents(self, keys=None): # keys: set or list
|
||||
if keys is None:
|
||||
keys = self.params_default_dict.keys()
|
||||
# return only existing keys from 'keys'
|
||||
return {k: self.__dict__[k] for k in keys if k in self.params_default_dict}
|
||||
|
||||
def emergency_callback(data):
|
||||
global emergency
|
||||
emergency = data.data
|
||||
|
||||
telemetry_thread = threading.Thread(target=telemetry_loop, name="Telemetry getting thread")
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
telemetry = Telemetry()
|
||||
copter_client = CopterClient()
|
||||
task_manager = tasking.TaskManager()
|
||||
rospy.Subscriber('/emergency', Bool, emergency_callback)
|
||||
|
||||
@@ -108,9 +108,9 @@ def get_sys_status():
|
||||
mavlink.MAV_STATE_EMERGENCY: "EMERGENCY",
|
||||
mavlink.MAV_STATE_POWEROFF: "POWEROFF",
|
||||
mavlink.MAV_STATE_FLIGHT_TERMINATION: "TERMINATION"
|
||||
}.get(system_status, "NOT_CONNECTED_TO_FCU")
|
||||
}.get(system_status, "NO_FCU")
|
||||
return status_text
|
||||
return "NOT_CONNECTED_TO_FCU"
|
||||
return "NO_FCU"
|
||||
|
||||
def start_subscriber():
|
||||
global heartbeat_sub, heartbeat_sub_status
|
||||
|
||||
@@ -15,6 +15,7 @@ parent_dir = os.path.dirname(current_dir)
|
||||
sys.path.insert(0, parent_dir)
|
||||
|
||||
import messaging_lib as messaging
|
||||
import timing_lib as timing
|
||||
from config import ConfigManager
|
||||
|
||||
random.seed()
|
||||
@@ -190,7 +191,6 @@ class Server(messaging.Singleton):
|
||||
|
||||
try:
|
||||
while self.broadcast_thread_running.is_set():
|
||||
time.sleep(self.config.broadcast_delay) # todo make interruptable (from time lib)
|
||||
broadcast_sock.sendto(msg, ('255.255.255.255', self.config.broadcast_port))
|
||||
logging.debug("Broadcast sent")
|
||||
|
||||
|
||||
111
messaging_lib.py
111
messaging_lib.py
@@ -17,12 +17,24 @@ try:
|
||||
except ImportError:
|
||||
import selectors2 as selectors
|
||||
|
||||
|
||||
# import logging_lib
|
||||
|
||||
PendingRequest = collections.namedtuple("PendingRequest", ["value", "requested_value", # "expires_on",
|
||||
"callback", "callback_args", "callback_kwargs",
|
||||
"request_args", "resend",
|
||||
])
|
||||
|
||||
class Namespace:
|
||||
def __init__(self, **kwargs):
|
||||
self.__dict__.update(kwargs)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.__dict__[key] = value
|
||||
|
||||
|
||||
class PendingRequest(Namespace): pass
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -239,10 +251,19 @@ class ConnectionManager(object):
|
||||
self.socket = client_socket
|
||||
self.addr = client_addr
|
||||
|
||||
self._clear()
|
||||
|
||||
self._set_selector_events_mask('r')
|
||||
if self.resend_requests:
|
||||
self._resend_requests()
|
||||
|
||||
def _clear(self):
|
||||
if not self.resume_queue: # maybe needs locks
|
||||
self._recv_buffer = b''
|
||||
self._send_buffer = b''
|
||||
self._received_queue.clear()
|
||||
self._send_queue.clear()
|
||||
|
||||
def close(self):
|
||||
with self._close_lock:
|
||||
self._should_close = True
|
||||
@@ -253,11 +274,6 @@ class ConnectionManager(object):
|
||||
def _close(self):
|
||||
logger.info("Closing connection to {}".format(self.addr))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._recv_buffer = b''
|
||||
self._send_buffer = b''
|
||||
self._received_queue.clear() #
|
||||
|
||||
try:
|
||||
logger.info("Unregistering selector of {}".format(self.addr))
|
||||
self.selector.unregister(self.socket)
|
||||
@@ -281,6 +297,7 @@ class ConnectionManager(object):
|
||||
with self._close_lock:
|
||||
self._should_close = False
|
||||
|
||||
self._clear()
|
||||
logger.info("CLOSED connection to {}".format(self.addr))
|
||||
|
||||
def process_events(self, mask):
|
||||
@@ -379,7 +396,7 @@ class ConnectionManager(object):
|
||||
)
|
||||
|
||||
f = request.callback
|
||||
f(value, *request.callback_args, **request.callback_kwargs)
|
||||
f(self, value, *request.callback_args, **request.callback_kwargs)
|
||||
else:
|
||||
logger.warning("Unexpected response!")
|
||||
|
||||
@@ -417,9 +434,6 @@ class ConnectionManager(object):
|
||||
logger.warning(
|
||||
"Attempt to send message {} to {} failed due error: {}".format(self._send_buffer, self.addr, error))
|
||||
|
||||
if not self.resume_queue:
|
||||
self._send_buffer = b''
|
||||
|
||||
raise error
|
||||
else:
|
||||
logger.debug("Sent {} to {}".format(self._send_buffer[:sent], self.addr))
|
||||
@@ -456,14 +470,12 @@ class ConnectionManager(object):
|
||||
|
||||
def _resend_requests(self):
|
||||
with self._request_lock:
|
||||
for request_id, request in self._request_queue.items():
|
||||
for request_id, request in self._request_queue.items(): #TODO filter
|
||||
if request.resend:
|
||||
self._send(MessageManager.create_request(
|
||||
request.requested_value, request_id, request.request_args.update(resend=request.resend))
|
||||
)
|
||||
#request.resend = False
|
||||
|
||||
# self._request_queue.clear()
|
||||
request.resend = False
|
||||
|
||||
def send_message(self, command, args=None):
|
||||
self._send(MessageManager.create_simple_message(command, args))
|
||||
@@ -484,41 +496,52 @@ class ConnectionManager(object):
|
||||
))
|
||||
|
||||
|
||||
class NotifierSock(Singleton): #TODO remake as connecting ONLY to self socket and selector
|
||||
class NotifierSock(Singleton):
|
||||
def __init__(self):
|
||||
self.receive_socket = None
|
||||
self.addr = None
|
||||
self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self._server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||||
self._server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
|
||||
self._notify_socket = None
|
||||
self._notify_lock = threading.Lock()
|
||||
self._sending_sock = socket.socket()
|
||||
self._send_lock = threading.Lock()
|
||||
|
||||
def bind(self, server_addr):
|
||||
self._notify_socket = socket.socket()
|
||||
self._notify_socket.connect(server_addr)
|
||||
logger.info("Notify socket: bind")
|
||||
self._receiving_sock = None
|
||||
|
||||
def connect(self, _, client_socket, client_addr):
|
||||
self.receive_socket = client_socket
|
||||
self.addr = client_addr
|
||||
def init(self, selector, port=26000):
|
||||
port += random.randint(0, 100) # local testing fix
|
||||
|
||||
logger.info("Notify socket: connected")
|
||||
self._server_socket.bind(('', port))
|
||||
self._server_socket.listen(1)
|
||||
self._sending_sock.connect(('127.0.0.1', port))
|
||||
self._receiving_sock, _ = self._server_socket.accept()
|
||||
logger.info("Notify socket connected")
|
||||
|
||||
selector.register(self._receiving_sock, selectors.EVENT_READ, data=self)
|
||||
logger.info("Notify socket registered in selector")
|
||||
|
||||
def close(self):
|
||||
if self._server_socket is not None:
|
||||
self._server_socket.close()
|
||||
if self._receiving_sock is not None:
|
||||
self._receiving_sock.close()
|
||||
logger.info("Notify socket closed")
|
||||
|
||||
def get_sock(self):
|
||||
return self._receiving_sock
|
||||
|
||||
def notify(self):
|
||||
with self._notify_lock:
|
||||
if self.addr is not None:
|
||||
self._notify_socket.sendall(bytes(1))
|
||||
logger.debug("Notify socket: notified")
|
||||
with self._send_lock:
|
||||
if self._receiving_sock is not None:
|
||||
self._sending_sock.sendall(bytes(1))
|
||||
logger.debug("Notify socket notified")
|
||||
|
||||
def process_events(self, mask):
|
||||
if mask & selectors.EVENT_READ:
|
||||
if mask & selectors.EVENT_READ and self._receiving_sock is not None:
|
||||
try:
|
||||
data = self.receive_socket.recv(1024)
|
||||
except Exception: # TODO remove
|
||||
self._receiving_sock.recv(1024)
|
||||
logger.debug("Notify socket received")
|
||||
except io.BlockingIOError:
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
logger.debug("Notifier received {} from {}".format(data, self.addr))
|
||||
else:
|
||||
self.addr = None
|
||||
logger.warning("Notifier: connection to {} lost!".format(self.addr))
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
Reference in New Issue
Block a user