From 132040dcb4aa851385dcb2b40eaeb9c33fb82fe1 Mon Sep 17 00:00:00 2001 From: Arthur Golubtsov Date: Thu, 30 May 2019 10:15:32 +0000 Subject: [PATCH] Drone: Simplify tasking_lib, correct some bugs --- Drone/tasking_lib.py | 143 ++++++++++++++++--------------------------- 1 file changed, 53 insertions(+), 90 deletions(-) diff --git a/Drone/tasking_lib.py b/Drone/tasking_lib.py index 365061e..3aa70ae 100644 --- a/Drone/tasking_lib.py +++ b/Drone/tasking_lib.py @@ -3,77 +3,81 @@ import time import logging import threading import collections +import itertools logger = logging.getLogger(__name__) Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ]) +def wait(end, interrupter=None, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval -def wait(end, interrupter=None, maxsleep=1): # Added features to interrupter sleep and set max sleeping interval - def interrupted(): - if interrupter is None: - return False - else: - return interrupter.is_set() + interrupted = False - while not interrupted(): # Basic implementation of pause module until() + while not interrupted: # Basic implementation of pause module until() now = time.time() diff = min(end - now, maxsleep) + if interrupter is None: + interrupted = False + else: + interrupted = interrupter.is_set() if diff <= 0: break else: time.sleep(diff / 2) + + if interrupted: + logger.warning("Waiting was interrupted!") class TaskManager(object): def __init__(self): self.task_queue = [] - self._active_task = None + self._counter = itertools.count() # unique sequence count self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread") self._processor_thread.daemon = True - - self._timeout_thread = threading.Thread(target=self._task_time_interrupter, name="Task timeouts thread") - self._processor_thread.daemon = True - - self._task_lock = threading.RLock() - self._queue_lock = threading.RLock() + self._task_queue_lock = threading.RLock() self._running_event = threading.Event() self._interrupt_event = threading.Event() - self.shutdown_event = threading.Event() + self._shutdown_event = threading.Event() def add_task(self, timestamp, priority, task_function, task_args=(), task_kwargs=None, task_delayable=False): + + self._interrupt_event.set() + if task_kwargs is None: task_kwargs = {} - heapq.heappush(self.task_queue, (timestamp, priority, - Task(task_function, task_args, task_kwargs, task_delayable) - )) - logger.debug(self.task_queue) - if self._processor_thread.is_alive(): - self._update_queue() + task = Task(task_function, task_args, task_kwargs, task_delayable) - def _remove_task(self, task): - with self._queue_lock: - self.task_queue.remove(task) - heapq.heapify(self.task_queue) + count = next(self._counter) + entry = (timestamp, priority, count, task) + + with self._task_queue_lock: + heapq.heappush(self.task_queue, entry) + + print(self.task_queue) + + def pop_task(self): + with self._task_queue_lock: + if self.task_queue: + return heapq.heappop(self.task_queue) + raise KeyError('Pop from an empty priority queue') def start(self, timeouts=False): - logger.info("Starting") + logger.info("Task manager is started") self._processor_thread.start() - if timeouts: - self._timeout_thread.start() self.resume() def stop(self): self.pause(interrupt=True) - with self._queue_lock: - self.task_queue = [] + with self._task_queue_lock: + del self.task_queue[:] def shutdown(self): self.stop() - self.shutdown_event.set() + self._shutdown_event.set() self._running_event.clear() self._processor_thread.join(timeout=5) @@ -91,36 +95,18 @@ class TaskManager(object): self.stop() self.resume() - def _update_queue(self): - logger.info("Queue updated") - with self._queue_lock, self._task_lock: - if self.task_queue: - if self._active_task is None: - self._active_task = self.task_queue[0] - elif self.task_queue[0] is not self._active_task: - if self.task_queue[0] < self._active_task: - self.change_active_task(self.task_queue[0]) - def interrupt(self): self._interrupt_event.set() while self._interrupt_event.is_set(): pass logger.info("Task execution successfully interrupted") - def change_active_task(self, task): - self.pause(interrupt=True) - - with self._task_lock: - if not self._active_task[2].delayable: - self._remove_task(self._active_task) - self._active_task = task - - self.resume() - def execute_task(self): - with self._task_lock: - task = self._active_task[2] - start_time = self._active_task[0] + with self._task_queue_lock: + if self.task_queue: + start_time, priority, count, task = self.task_queue[0] + else: + return logger.info("Waiting util task execution time:{}".format(start_time)) wait(start_time, self._interrupt_event, 1) @@ -133,42 +119,20 @@ class TaskManager(object): logger.error("Error '{}' occurred in task {}".format(e, task)) else: logger.warning("Task interrupted before execution") + self._interrupt_event.clear() + return - self._interrupt_event.clear() + if time.time() > start_time: + self.pop_task() - try: - logger.debug("Removing task") - self._remove_task(self._active_task) - except ValueError: - logger.warning("Task already removed, probably task changed") - else: - with self._task_lock: - self._active_task = None - - self._update_queue() - logger.info("Execution done") + logger.info("Execution done") def _task_processor(self): logger.info("Tasking thread started") - self._update_queue() # Initial tick if tasks added before start - while not self.shutdown_event.is_set(): + # self._update_queue() # Initial tick if tasks added before start + while not self._shutdown_event.is_set(): self._running_event.wait() - if self._active_task is not None: - self.execute_task() - - def _task_time_interrupter(self): # TODO revork; temporary disabled - raise NotImplementedError - logger.info("Timeouts thread started") - while not self.shutdown_event.is_set(): - self._running_event.wait() - try: - if self.task_queue[1] is not self._active_task: # If pending task is more important than current - if self.task_queue[1] < self._active_task: # TODO look at timeout time - logger.warning("Changing low-priority task due timeout") - self.change_active_task(self.task_queue[1]) - except IndexError: - time.sleep(0.01) - + self.execute_task() if __name__ == "__main__": logger.addHandler(logging.StreamHandler()) @@ -179,15 +143,14 @@ if __name__ == "__main__": wait(time.time()+30, interrupter) tasker = TaskManager() # Lower priority first! - tasker.start() - tasker.add_task(0, 10, printer, ("Task1 ", )) + tasker.start() + tasker.add_task(time.time(), 10, printer, ("Task1 ", )) + tasker.add_task(time.time()+10, 5, printer, ("Task2 ", )) time.sleep(1) - tasker.add_task(time.time(), 10, printer, ("Lol ", )) - tasker.add_task(time.time()+10, 5, printer, ("Kek ", )) - tasker.add_task(time.time()+7, 1, printer, ("Dededededee", )) + tasker.add_task(time.time()+7, 1, printer, ("Task3", )) time.sleep(3) - tasker.add_task(time.time()+7, 0, printer, ("Iiiiiii", )) + tasker.add_task(time.time()+7, 0, printer, ("Task4", )) while True: pass