mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-29 16:29:34 +00:00
Refactor of table columns handling: checks and formatters
This commit is contained in:
@@ -148,7 +148,7 @@ class HeaderListWidget(QListWidget):
|
||||
flags = Qt.ItemIsUserCheckable | Qt.ItemIsSelectable | Qt.ItemIsDragEnabled | Qt.ItemIsEnabled
|
||||
state = Qt.Checked if visible else Qt.Unchecked
|
||||
|
||||
item = QListWidgetItem(table.columns_names.get(name, "").strip() or name, self)
|
||||
item = QListWidgetItem(table.CopterDataModel.columns_dict.get(name, "").strip() or name, self)
|
||||
item.setFlags(flags)
|
||||
item.setCheckState(state)
|
||||
item.setData(HeaderListWidget.ColumnKeyRole, name)
|
||||
|
||||
@@ -1,38 +1,55 @@
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import math
|
||||
from contextlib import suppress
|
||||
from functools import partialmethod
|
||||
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
from PyQt5.QtCore import Qt as Qt, QUrl, QDir
|
||||
|
||||
# Additional custom roles to interact with various table data
|
||||
ModelDataRole = 998
|
||||
ModelStateRole = 999
|
||||
|
||||
def get_git_version(): # TODO import from animation
|
||||
return subprocess.check_output("git log --pretty=format:%h -n 1").decode('UTF-8')
|
||||
|
||||
|
||||
|
||||
class ModelChecks:
|
||||
checks_dict = {}
|
||||
takeoff_checklist = (3, 4, 6, 7, 8)
|
||||
|
||||
battery_min = 50.0
|
||||
start_pos_delta_max = 1.0
|
||||
time_delta_max = 1.0
|
||||
|
||||
@classmethod
|
||||
def col_check(cls, col):
|
||||
def column_check(cls, column, pass_context=False):
|
||||
def inner(f):
|
||||
def wrapper(item):
|
||||
if item is not None:
|
||||
return f(item)
|
||||
return None
|
||||
def wrapper(item, context=None):
|
||||
if item is None:
|
||||
return None
|
||||
if pass_context:
|
||||
return f(item, context)
|
||||
return f(item)
|
||||
|
||||
cls.checks_dict[col] = wrapper
|
||||
cls.checks_dict[column] = wrapper
|
||||
return wrapper
|
||||
|
||||
return inner
|
||||
|
||||
@classmethod
|
||||
def check(cls, column, context):
|
||||
if isinstance(column, int):
|
||||
column = context.columns[column]
|
||||
item = context[column]
|
||||
try:
|
||||
return cls.checks_dict[column](item, context)
|
||||
except KeyError: # When there is no check
|
||||
return None if item is None else True # item is not None
|
||||
|
||||
@classmethod
|
||||
def all_checks(cls, copter_item):
|
||||
for col, check in cls.checks_dict.items():
|
||||
@@ -40,79 +57,88 @@ class ModelChecks:
|
||||
return False
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def takeoff_checks(cls, copter_item):
|
||||
for col in cls.takeoff_checklist:
|
||||
if not cls.checks_dict[col](copter_item[col]):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@ModelChecks.col_check(1)
|
||||
@ModelChecks.column_check("git_version")
|
||||
def check_ver(item):
|
||||
return True # TODO git version!
|
||||
return get_git_version() == item
|
||||
|
||||
|
||||
@ModelChecks.col_check(2)
|
||||
@ModelChecks.column_check("animation_id")
|
||||
def check_anim(item):
|
||||
return str(item) != 'No animation'
|
||||
|
||||
|
||||
@ModelChecks.col_check(3)
|
||||
@ModelChecks.column_check("battery")
|
||||
def check_bat(item):
|
||||
if item == "NO_INFO":
|
||||
return False
|
||||
return item[1] * 100 > ModelChecks.battery_min
|
||||
|
||||
|
||||
@ModelChecks.col_check(4)
|
||||
@ModelChecks.column_check("fcu_status")
|
||||
def check_sys_status(item):
|
||||
return item == "STANDBY"
|
||||
|
||||
|
||||
@ModelChecks.col_check(5)
|
||||
@ModelChecks.column_check("calibration_status")
|
||||
def check_cal_status(item):
|
||||
return item == "OK"
|
||||
|
||||
|
||||
@ModelChecks.col_check(6)
|
||||
@ModelChecks.column_check("mode")
|
||||
def check_mode(item):
|
||||
return (item != "NO_FCU") and not ("CMODE" in item)
|
||||
|
||||
|
||||
@ModelChecks.col_check(7)
|
||||
@ModelChecks.column_check("selfcheck")
|
||||
def check_selfcheck(item):
|
||||
return item == "OK"
|
||||
|
||||
|
||||
@ModelChecks.col_check(8)
|
||||
def check_pos_status(item):
|
||||
@ModelChecks.column_check("current_position")
|
||||
def check_pos(item):
|
||||
if item == 'NO_POS':
|
||||
return False
|
||||
return not math.isnan(item[0])
|
||||
|
||||
|
||||
@ModelChecks.col_check(9)
|
||||
def check_start_pos_status(item):
|
||||
return item != 'NO_POS'
|
||||
# @ModelChecks.column_check("last_task")
|
||||
# def check_task(item):
|
||||
# return True
|
||||
|
||||
|
||||
@ModelChecks.col_check(10)
|
||||
def check_selfcheck(item):
|
||||
return True
|
||||
|
||||
|
||||
@ModelChecks.col_check(11)
|
||||
@ModelChecks.column_check('time_delta')
|
||||
def check_time_delta(item):
|
||||
return abs(item) < ModelChecks.time_delta_max
|
||||
|
||||
|
||||
@ModelChecks.column_check("start_position", pass_context=True)
|
||||
def check_start_pos(item, context):
|
||||
if context.current_position is None:
|
||||
return item != 'NO_POS' # maybe should return true
|
||||
|
||||
delta = get_distance(get_position(context.current_position),
|
||||
get_position(context.start_position))
|
||||
if math.isnan(delta):
|
||||
return False
|
||||
|
||||
return delta < ModelChecks.start_pos_delta_max
|
||||
|
||||
|
||||
def get_position(position):
|
||||
if position != 'NO_POS' and position[0] != 'nan':
|
||||
return position
|
||||
return [float('nan')]*3
|
||||
|
||||
|
||||
def get_distance(pos1, pos2): # todo as common function
|
||||
if any(math.isnan(x) for x in pos1+pos2):
|
||||
return float('nan')
|
||||
return math.sqrt(sum(map(lambda p: p[0] - p[1], zip(pos1, pos2)))**2) # point distance formula
|
||||
|
||||
|
||||
class CopterData:
|
||||
def __init__(self, columns=(), **kwargs):
|
||||
self._columns = columns
|
||||
self.columns = columns
|
||||
for column in columns:
|
||||
setattr(self, column, None)
|
||||
|
||||
@@ -120,15 +146,15 @@ class CopterData:
|
||||
setattr(self, attr, value)
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key in self._columns:
|
||||
if key in self.columns:
|
||||
return getattr(self, key)
|
||||
return getattr(self, self._columns[key])
|
||||
return getattr(self, self.columns[key])
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key in self._columns:
|
||||
if key in self.columns:
|
||||
setattr(self, key, value)
|
||||
else:
|
||||
setattr(self, self._columns[key], value)
|
||||
setattr(self, self.columns[key], value)
|
||||
|
||||
|
||||
class StatedCopterData(CopterData):
|
||||
@@ -145,52 +171,18 @@ class StatedCopterData(CopterData):
|
||||
def __setattr__(self, key, value):
|
||||
self.__dict__[key] = value
|
||||
|
||||
if key in self._columns:
|
||||
if key in self.columns:
|
||||
with suppress(KeyError):
|
||||
# print(self.__dict__)
|
||||
self.states.__dict__[key] = self.checks.checks_dict[self._columns.index(key)](value)
|
||||
self.states.__dict__["all_checks"] = all([self.states[i] for i in ModelChecks.checks_dict.keys()])
|
||||
|
||||
# if key == 'start_position':
|
||||
# if (self.__dict__['current_position'] is not None) and (
|
||||
# self.__dict__['start_position'] is not None):
|
||||
# current_pos = get_position(self.__dict__['current_position'])
|
||||
# start_pos = get_position(self.__dict__['start_position'])
|
||||
# delta = get_position_delta(current_pos, start_pos)
|
||||
# if delta != 'NO_POS':
|
||||
# self.states.__dict__[key] = (delta < ModelChecks.start_pos_delta_max)
|
||||
|
||||
# update all_checks and takeoff_ready
|
||||
|
||||
# self.states.__dict__["takeoff_ready"] = all(
|
||||
# [self.states[i] for i in ModelChecks.takeoff_checklist]
|
||||
# )
|
||||
|
||||
|
||||
def get_position(pos_array):
|
||||
if pos_array[0] != 'nan' and pos_array != 'NO_POS':
|
||||
pos = []
|
||||
for i in range(3):
|
||||
pos.append(pos_array[i])
|
||||
else:
|
||||
pos = 'NO_POS'
|
||||
return pos
|
||||
|
||||
|
||||
def get_position_delta(pos1, pos2):
|
||||
if pos1 != 'NO_POS' and pos2 != 'NO_POS':
|
||||
delta_squared = 0
|
||||
for i in range(3):
|
||||
delta_squared += (pos1[i] - pos2[i]) ** 2
|
||||
return math.sqrt(delta_squared)
|
||||
return 'NO_POS'
|
||||
self.states.__dict__[key] = \
|
||||
self.checks.check(key, self) # {key: self.__dict__[key] for key in self.columns})
|
||||
self.states.__dict__["all_checks"] = all([self.states[i] for i in self.checks.checks_dict.keys()])
|
||||
|
||||
|
||||
class ModelFormatter:
|
||||
view_formatters = {}
|
||||
place_formatters = {}
|
||||
VIEW_FORMATTER = False
|
||||
PLACE_FORMATTER = True
|
||||
VIEW_FORMATTER = 1
|
||||
PLACE_FORMATTER = 2
|
||||
|
||||
@classmethod
|
||||
def format_view(cls, col, value):
|
||||
@@ -205,11 +197,11 @@ class ModelFormatter:
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def col_format(cls, col, format_type):
|
||||
def column_formatter(cls, col, formatter_type):
|
||||
def inner(f):
|
||||
if format_type:
|
||||
if formatter_type == cls.PLACE_FORMATTER:
|
||||
cls.place_formatters[col] = f
|
||||
else:
|
||||
elif formatter_type == cls.VIEW_FORMATTER:
|
||||
cls.view_formatters[col] = f
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
@@ -219,8 +211,10 @@ class ModelFormatter:
|
||||
|
||||
return inner
|
||||
|
||||
place_formatter = partialmethod(column_formatter, formatter_type=PLACE_FORMATTER)
|
||||
view_formatter = partialmethod(column_formatter, formatter_type=VIEW_FORMATTER)
|
||||
|
||||
@ModelFormatter.col_format(0, ModelFormatter.PLACE_FORMATTER)
|
||||
@ModelFormatter.place_formatter("copter_id")
|
||||
def place_id(value):
|
||||
value = str(value).strip()
|
||||
# check user hostname spelling http://man7.org/linux/man-pages/man7/hostname.7.html
|
||||
@@ -240,7 +234,7 @@ def place_id(value):
|
||||
return None
|
||||
|
||||
|
||||
@ModelFormatter.col_format(3, ModelFormatter.PLACE_FORMATTER)
|
||||
@ModelFormatter.place_formatter("battery")
|
||||
def place_battery(value):
|
||||
if isinstance(value, list):
|
||||
battery_v, battery_p = value
|
||||
@@ -248,56 +242,54 @@ def place_battery(value):
|
||||
return "NO_INFO"
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(3, ModelFormatter.VIEW_FORMATTER)
|
||||
@ModelFormatter.view_formatter("battery")
|
||||
def view_battery(value):
|
||||
if isinstance(value, list):
|
||||
battery_v, battery_p = value
|
||||
return "{:.1f}V {:d}%".format(battery_v, int(battery_p * 100))
|
||||
return f"{battery_v:4.1f}V {min(battery_p, 1):4.0%}"
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(7, ModelFormatter.VIEW_FORMATTER)
|
||||
@ModelFormatter.view_formatter("selfcheck")
|
||||
def view_selfcheck(value):
|
||||
if isinstance(value, list):
|
||||
if len(value) == 1:
|
||||
if len(value[0]) <= 8:
|
||||
return value[0]
|
||||
if len(value) == 1 and len(value[0]) <= 8:
|
||||
return value[0]
|
||||
return "ERROR"
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(8, ModelFormatter.VIEW_FORMATTER)
|
||||
@ModelFormatter.view_formatter("current_position")
|
||||
def view_selfcheck(value):
|
||||
if isinstance(value, list):
|
||||
x, y, z, yaw, frame = value
|
||||
return "{:.2f} {:.2f} {:.2f} {:d} {}".format(x, y, z, int(yaw), frame)
|
||||
return f"{x: .2f} {y: .2f} {z: .2f} {int(yaw): d} {frame}"
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(9, ModelFormatter.VIEW_FORMATTER)
|
||||
@ModelFormatter.view_formatter("start_position")
|
||||
def view_selfcheck(value):
|
||||
if isinstance(value, list):
|
||||
x, y, z = value
|
||||
return "{:.2f} {:.2f} {:.2f}".format(x, y, z)
|
||||
return f"{x: .2f} {y: .2f} {z: .2f}"
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(10, ModelFormatter.PLACE_FORMATTER)
|
||||
def view_last_task(value):
|
||||
if value is None:
|
||||
@ModelFormatter.place_formatter("last_task")
|
||||
def place_last_task(value):
|
||||
if value is None: # TODO possible behaviour deviation
|
||||
return 'No task'
|
||||
return value
|
||||
|
||||
|
||||
@ModelFormatter.col_format(11, ModelFormatter.PLACE_FORMATTER)
|
||||
@ModelFormatter.place_formatter("time_delta")
|
||||
def place_time_delta(value):
|
||||
return abs(value - time.time())
|
||||
|
||||
|
||||
@ModelFormatter.col_format(11, ModelFormatter.VIEW_FORMATTER)
|
||||
@ModelFormatter.view_formatter("time_delta")
|
||||
def view_time_delta(value):
|
||||
return "{:.3f}".format(value)
|
||||
return f"{value:.3f}"
|
||||
|
||||
|
||||
class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
@@ -366,12 +358,17 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
def is_column(cls, index, column_name):
|
||||
return index.column() == cls.columns.index(column_name)
|
||||
|
||||
def user_selected(self, contents=()):
|
||||
return self.filter(lambda x: x.states.checked == Qt.Checked, contents)
|
||||
|
||||
def filter(self, f, contents=()):
|
||||
contents = contents or self.data_contents
|
||||
return filter(f, contents)
|
||||
|
||||
def user_selected(self, contents=()):
|
||||
return self.filter(lambda x: x.states.checked == Qt.Checked, contents)
|
||||
def selected_check(self, f, selected=()):
|
||||
selected = selected or set(self.user_selected())
|
||||
print(selected and all(f(item) for item in selected))
|
||||
return bool(selected) and all(f(item) for item in selected) #selected.issubset(self.filter(f))
|
||||
|
||||
def get_row_data(self, index):
|
||||
row = index.row()
|
||||
@@ -409,7 +406,7 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
col = index.column()
|
||||
if role == Qt.DisplayRole or role == Qt.EditRole: # Separate editRole in case of editing non-text
|
||||
item = self.data_contents[row][col]
|
||||
return str(self.formatter.format_view(col, item)) if item is not None else ""
|
||||
return str(self.formatter.format_view(self.columns[col], item)) if item is not None else ""
|
||||
elif role == ModelDataRole:
|
||||
return self.data_contents[row][col]
|
||||
|
||||
@@ -436,11 +433,11 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
def update_model(self, index=QtCore.QModelIndex(), role=QtCore.Qt.EditRole):
|
||||
selected = set(self.user_selected())
|
||||
|
||||
self.selected_ready_signal.emit(selected.issubset(self.filter(lambda x: x.states.all_checks)))
|
||||
#self.selected_takeoff_ready_signal.emit(selected.issubset(self.filter(lambda x: x.states.takeoff_ready)))
|
||||
self.selected_flip_ready_signal.emit(selected.issubset(self.filter(flip_checks)))
|
||||
self.selected_calibrating_signal.emit(selected.issubset(self.filter(calibrating_check)))
|
||||
self.selected_calibration_ready_signal.emit(selected.issubset(self.filter(calibration_ready_check)))
|
||||
self.selected_ready_signal.emit(self.selected_check(lambda x: x.states.all_checks, selected))
|
||||
self.selected_takeoff_ready_signal.emit(self.selected_check(takeoff_checks, selected))
|
||||
self.selected_flip_ready_signal.emit(self.selected_check(flip_checks, selected))
|
||||
self.selected_calibrating_signal.emit(self.selected_check(calibrating_check, selected))
|
||||
self.selected_calibration_ready_signal.emit(self.selected_check(calibration_ready_check, selected))
|
||||
|
||||
self.dataChanged.emit(index, index, (role,))
|
||||
|
||||
@@ -455,7 +452,7 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
if role == Qt.CheckStateRole:
|
||||
self.data_contents[row].states.checked = value
|
||||
elif role == Qt.EditRole: # For user/outer actions with data, place modifiers applied
|
||||
formatted_value = self.formatter.format_place(col, value)
|
||||
formatted_value = self.formatter.format_place(self.columns[col], value)
|
||||
if formatted_value is None: # todo use new := syntax
|
||||
return False
|
||||
|
||||
@@ -520,8 +517,6 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
# Thread-safe wrappers
|
||||
def add_client(self, **kwargs):
|
||||
default_states = {"checked": 0, "copter_id": True}
|
||||
# class_basic_attrs = {'client': None}
|
||||
# class_basic_states = OrderedDict([("checked", 0), ("selfchecked", None), ("takeoff_ready", None)])
|
||||
self.add_client_signal.emit(self.data_model(self.columns, default_states, **kwargs))
|
||||
|
||||
def remove_client_data(self, row_data):
|
||||
@@ -552,22 +547,28 @@ class CopterDataModel(QtCore.QAbstractTableModel):
|
||||
self.removeRows(row)
|
||||
|
||||
|
||||
def check_checklist(copter_item, checklist=()):
|
||||
return all(copter_item.states[col] for col in checklist)
|
||||
|
||||
|
||||
def takeoff_checks(copter_item):
|
||||
checklist = ("battery", "fcu_status", "mode", "selfcheck", "current_position")
|
||||
return check_checklist(copter_item, checklist)
|
||||
|
||||
def flip_checks(copter_item):
|
||||
for col in ModelChecks.takeoff_checklist:
|
||||
if col != 4 or col != 7:
|
||||
if not ModelChecks.checks_dict[col](copter_item[col]):
|
||||
return False
|
||||
elif copter_item[4] != "ACTIVE":
|
||||
return False
|
||||
checklist = ("battery", "mode", "current_position")
|
||||
if not check_checklist(copter_item, checklist):
|
||||
return False
|
||||
if copter_item["fcu_status"] != "ACTIVE":
|
||||
return True
|
||||
|
||||
# for col in checklist:
|
||||
# if not copter_item.state[col]: # ModelChecks.check(col, copter_item):
|
||||
|
||||
def calibrating_check(copter_item):
|
||||
return copter_item[5] == "CALIBRATING"
|
||||
|
||||
|
||||
def calibration_ready_check(copter_item):
|
||||
if not ModelChecks.checks_dict[4](copter_item[4]):
|
||||
return False
|
||||
return not calibrating_check(copter_item)
|
||||
|
||||
@@ -592,7 +593,6 @@ class CopterProxyModel(QtCore.QSortFilterProxyModel):
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
@@ -632,11 +632,9 @@ if __name__ == '__main__':
|
||||
#myModel._add_client(StatedCopterData(copter_id=1000, checked=0, selfcheck=msgs, time_utc=1))
|
||||
#myModel._add_client(StatedCopterData(checked=2, selfcheck="OK", time_utc=2))
|
||||
#myModel._add_client(StatedCopterData(checked=2, selfcheck="not ok", time_utc="no"))
|
||||
myModel.add_client(copter_id=1000, client=None)
|
||||
#myModel.setData(myModel.index(0, 1), "test")
|
||||
|
||||
# t = threading.Thread(target=timer, daemon=True)
|
||||
#t.start()
|
||||
print(QtCore.QT_VERSION_STR)
|
||||
|
||||
app.exec_()
|
||||
print(get_git_version())
|
||||
myModel.update_data(0, 3, [1, 2], role=Qt.EditRole)
|
||||
|
||||
Reference in New Issue
Block a user