import sys import re import collections import indexed from server import ConfigOption from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtCore import Qt as Qt ModelDataRole = 998 ModelStateRole = 999 class CopterData: class_basic_attrs = indexed.IndexedOrderedDict([('copter_id', None), ('git_ver', None), ('anim_id', None), ('battery', None), ('sys_status', None), ('cal_status', None), ('mode', None), ('selfcheck', None), ('position', None), ('start_pos', None), ('time_delta', None), ('client', None)]) def __init__(self, **kwargs): self.attrs_dict = self.class_basic_attrs.copy() self.attrs_dict.update(kwargs) for attr, value in self.attrs_dict.items(): setattr(self, attr, value) def __getitem__(self, key): return getattr(self, self.attrs_dict.keys()[key]) def __setitem__(self, key, value): setattr(self, self.attrs_dict.keys()[key], value) class StatedCopterData(CopterData): class_basic_states = indexed.IndexedOrderedDict([("checked", 0), ("selfchecked", None), ("takeoff_ready", None), ("copter_id", True), ]) def __init__(self, **kwargs): self.states = CopterData(**self.class_basic_states) super(StatedCopterData, self).__init__(**kwargs) def __setattr__(self, key, value): self.__dict__[key] = value if key in self.class_basic_attrs.keys(): try: self.states.__dict__[key] = \ Checks.all_checks[self.attrs_dict.keys().index(key)](value) except KeyError: # No check present for that col pass else: # update selfchecked and takeoff_ready self.states.__dict__["selfchecked"] = all( [self.states[i] for i in Checks.all_checks.keys()] ) self.states.__dict__["takeoff_ready"] = all( [self.states[i] for i in Checks.takeoff_checklist] ) class Checks: all_checks = {} takeoff_checklist = (3, 4, 6, 7, 8) class CopterDataModel(QtCore.QAbstractTableModel): selected_ready_signal = QtCore.pyqtSignal(bool) selected_takeoff_ready_signal = QtCore.pyqtSignal(bool) selected_flip_ready_signal = QtCore.pyqtSignal(bool) selected_calibrating_signal = QtCore.pyqtSignal(bool) selected_calibration_ready_signal = QtCore.pyqtSignal(bool) def __init__(self, parent=None): super(CopterDataModel, self).__init__(parent) self.headers = ('copter ID', 'version', ' animation ID ', ' battery ', ' system ', 'calibration', ' mode ', 'selfcheck', 'current x y z yaw frame_id', ' start x y z ', 'dt') self.data_contents = [] self.on_id_changed = None self.first_col_is_checked = False def insertRows(self, contents, position='last', parent=QtCore.QModelIndex()): rows = len(contents) position = len(self.data_contents) if position == 'last' else position self.beginInsertRows(parent, position, position + rows - 1) self.data_contents[position:position] = contents self.endInsertRows() def removeRows(self, position, rows=1, index=QtCore.QModelIndex()): self.beginRemoveRows(QtCore.QModelIndex(), position, position + rows - 1) self.data_contents = self.data_contents[:position] + self.data_contents[position + rows:] self.endRemoveRows() return True def user_selected(self, contents=()): contents = contents or self.data_contents return filter(lambda x: x.states.checked == Qt.Checked, contents) def selfchecked_ready(self, contents=()): contents = contents or self.data_contents return filter(lambda x: x.states.selfchecked, contents) def takeoff_ready(self, contents=()): contents = contents or self.data_contents return filter(lambda x: x.states.takeoff_ready, contents) def flip_ready(self, contents=()): contents = contents or self.data_contents return filter(lambda x: flip_checks(x), contents) # possibly change as takeoff checks def calibrating(self, contents=()): contents = contents or self.data_contents return filter(lambda x: calibrating_check(x), contents) def calibration_ready(self, contents=()): contents = contents or self.data_contents return filter(lambda x: calibration_ready_check(x), contents) def get_row_index(self, row_data): try: index = self.data_contents.index(row_data) except ValueError: return None else: return index def get_row_by_attr(self, attr, value): try: row_data = next(filter(lambda x: getattr(x, attr, None) == value, self.data_contents)) except StopIteration: return None else: return row_data def rowCount(self, n=None): return len(self.data_contents) def columnCount(self, n=None): return len(self.headers) def headerData(self, section, orientation, role=Qt.DisplayRole): if role == Qt.DisplayRole: if orientation == Qt.Horizontal: return self.headers[section] def data(self, index, role=Qt.DisplayRole): row = index.row() col = index.column() if role == Qt.DisplayRole or role == Qt.EditRole: # Separate editRole in case of editing non-text item = self.data_contents[row][col] return str(item) if item is not None else "" elif role == ModelDataRole: return self.data_contents[row][col] elif role == Qt.BackgroundRole: try: item = self.data_contents[row] result = item.states[col] except KeyError: return QtGui.QBrush(Qt.white) else: if result is None: return QtGui.QBrush(Qt.yellow) if result: return QtGui.QBrush(Qt.green) else: return QtGui.QBrush(Qt.red) elif role == Qt.CheckStateRole and col == 0: return self.data_contents[row].states.checked if role == QtCore.Qt.TextAlignmentRole and col != 0: return QtCore.Qt.AlignHCenter | QtCore.Qt.AlignVCenter def update_model(self, index=QtCore.QModelIndex(), role=QtCore.Qt.EditRole): selected = set(self.user_selected()) self.selected_ready_signal.emit(selected.issubset(self.selfchecked_ready())) self.selected_takeoff_ready_signal.emit(selected.issubset(self.takeoff_ready())) self.selected_flip_ready_signal.emit(selected.issubset(self.flip_ready())) self.selected_calibrating_signal.emit(selected.issubset(self.calibrating())) self.selected_calibration_ready_signal.emit(selected.issubset(self.calibration_ready())) self.dataChanged.emit(index, index, (role,)) @QtCore.pyqtSlot() def setData(self, index, value, role=Qt.EditRole): if not index.isValid(): return False col = index.column() row = index.row() if role == Qt.CheckStateRole: self.data_contents[row].states.checked = value elif role == Qt.EditRole: # For user actions with data if col == 0: # check user hostname spelling http://man7.org/linux/man-pages/man7/hostname.7.html if value[0] != '-' and len(value) <= 63 and re.match("^[A-Za-z0-9-]*$", value): self.data_contents[row].client.send_message("id", {"new_id": value}) self.data_contents[row].client.remove() else: msg = QtWidgets.QMessageBox() msg.setIcon(QtWidgets.QMessageBox.Critical) msg.setText("Wrong input for the copter name!\nPlease use only A-Z, a-z, 0-9, and '-' chars.\nDon't use '-' as first char.") msg.exec_() else: self.data_contents[row][col] = value elif role == ModelDataRole: # For inner setting\editing of data self.data_contents[row][col] = value elif role == ModelStateRole: self.data_contents[row].states[col] = value else: return False self.update_model(index, role) return True def select_all(self): self.first_col_is_checked = not self.first_col_is_checked for row_num, copter in enumerate(self.data_contents): copter.states.checked = int(self.first_col_is_checked)*2 self.update_model(self.index(row_num, 0), Qt.CheckStateRole) def flags(self, index): roles = Qt.ItemIsSelectable | Qt.ItemIsEnabled if index.column() == 0: roles |= Qt.ItemIsUserCheckable | Qt.ItemIsEditable return roles @QtCore.pyqtSlot(int, int, QtCore.QVariant, QtCore.QVariant) def update_item(self, row, col, value, role=Qt.EditRole): self.setData(self.index(row, col), value, role) @QtCore.pyqtSlot(object) def add_client(self, client): self.insertRows([client]) @QtCore.pyqtSlot(int) def remove_client(self, row): self.removeRows(row) def col_check(col): def inner(f): Checks.all_checks[col] = f def wrapper(*args, **kwargs): return f(*args, **kwargs) return wrapper return inner @col_check(1) def check_ver(item): if not item: return None return True @col_check(2) def check_anim(item): if not item: return None return str(item) != 'No animation' @col_check(3) def check_bat(item): if not item: return None if item == "NO_INFO": return False else: return float(item.split(' ')[1][:-1]) > 30 @col_check(4) def check_sys_status(item): if not item: return None return item == "STANDBY" @col_check(5) def check_cal_status(item): if not item: return None return item == "OK" @col_check(6) def check_mode(item): if not item: return None return (item != "NO_FCU") and not ("CMODE" in item) @col_check(7) def check_selfcheck(item): if not item: return None return item == "OK" @col_check(8) def check_pos_status(item): if not item: return None return item.split(' ')[0] != 'nan' and item.split(' ')[0] != 'NO_POS' @col_check(9) def check_start_pos_status(item): if not item: return None return str(item).split(' ')[0] != 'NO_POS' @col_check(10) def check_time_delta(item): if not item: return None return abs(float(item)) < 1 def all_checks(copter_item): for col, check in Checks.all_checks.items(): if not check(copter_item[col]): return False return True def takeoff_checks(copter_item): for col in Checks.takeoff_checklist: if not Checks.all_checks[col](copter_item[col]): return False return True def flip_checks(copter_item): for col in Checks.takeoff_checklist: if col != 4 or col != 7: if not Checks.all_checks[col](copter_item[col]): return False else: if copter_item[4] != "ACTIVE": return False return True def calibrating_check(copter_item): return copter_item[5] == "CALIBRATING" def calibration_ready_check(copter_item): if not Checks.all_checks[4](copter_item[4]): return False return not calibrating_check(copter_item) class CopterProxyModel(QtCore.QSortFilterProxyModel): def __init__(self, parent=None): super(CopterProxyModel, self).__init__(parent) @staticmethod def human_sort_prepare(item): if item: item = [int(x) if x.isdigit() else x.lower() for x in re.split('([0-9]+)', str(item))] else: item = [] return item def lessThan(self, left, right): leftData = self.sourceModel().data(left) rightData = self.sourceModel().data(right) return self.human_sort_prepare(leftData) < self.human_sort_prepare(rightData) class SignalManager(QtCore.QObject): update_data_signal = QtCore.pyqtSignal(int, int, QtCore.QVariant, QtCore.QVariant) add_client_signal = QtCore.pyqtSignal(object) remove_client_signal = QtCore.pyqtSignal(int) if __name__ == '__main__': import threading import time def timer(): idc = 1001 while True: myModel.setData(myModel.index(0, 0), idc) idc += 1 time.sleep(1) app = QtWidgets.QApplication.instance() if app is None: app = QtWidgets.QApplication(sys.argv) tableView = QtWidgets.QTableView() myModel = CopterDataModel(None) proxyModel = CopterProxyModel() proxyModel.setDynamicSortFilter(True) proxyModel.setSourceModel(myModel) tableView.setModel(proxyModel) tableView.verticalHeader().hide() tableView.setSortingEnabled(True) tableView.show() msgs = [] msg = "[{}]: Failure: {}".format("FCU connection", "Angular velocities estimation is not available") msgs.append(msg) msg = "[{}]: Failure: {}".format("FCU connection1", "Angular velocities estimation is not available") msgs.append(msg) msg = "[{}]: Failure: {}".format("FCU connection2", "Angular velocities estimation is not available") msgs.append(msg) myModel.add_client(StatedCopterData(copter_id=1000, checked=0, selfcheck=msgs, time_utc=1)) myModel.add_client(StatedCopterData(checked=2, selfcheck="OK", time_utc=2)) myModel.add_client(StatedCopterData(checked=2, selfcheck="not ok", time_utc="no")) myModel.setData(myModel.index(0, 1), "test") t = threading.Thread(target=timer, daemon=True) t.start() print(QtCore.QT_VERSION_STR) app.exec_()