diff --git a/Drone/tasking_lib.py b/Drone/tasking_lib.py index 47f7a57..42bfbf4 100644 --- a/Drone/tasking_lib.py +++ b/Drone/tasking_lib.py @@ -8,24 +8,20 @@ import itertools logger = logging.getLogger(__name__) Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ]) -def wait(end, interrupter=None, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval +INTERRUPTER = threading.Event() - interrupted = False +def wait(end, interrupter=INTERRUPTER, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval - while not interrupted: # Basic implementation of pause module until() + while not interrupter.is_set(): # Basic implementation of pause module until() now = time.time() diff = min(end - now, maxsleep) - if interrupter is None: - interrupted = False - else: - interrupted = interrupter.is_set() if diff <= 0: break else: time.sleep(diff / 2) - - if interrupted: + else: logger.warning("Waiting was interrupted!") + print("Waiting was interrupted!") class TaskManager(object): @@ -38,24 +34,41 @@ class TaskManager(object): self._task_queue_lock = threading.RLock() self._running_event = threading.Event() - self._interrupt_event = threading.Event() + self._reset_event = threading.Event() + self._wait_interrupt_event = threading.Event() + self._task_interrupt_event = threading.Event() self._shutdown_event = threading.Event() def add_task(self, timestamp, priority, task_function, - task_args=(), task_kwargs=None, task_delayable=False): + task_args=(), task_kwargs={}, task_delayable=False): - self._interrupt_event.set() + self._wait_interrupt_event.set() + self._running_event.clear() - if task_kwargs is None: - task_kwargs = {} - - task = Task(task_function, task_args, task_kwargs, task_delayable) + task = Task(task_function, task_args, task_kwargs, task_delayable) count = next(self._counter) entry = (timestamp, priority, count, task) - + with self._task_queue_lock: + if self.task_queue: + entry_old = self.task_queue[0] + else: + entry_old = entry + heapq.heappush(self.task_queue, entry) + + if self.task_queue[0] != entry_old: + self._task_interrupt_event.set() + print("Task queue updated with more priority task") + + if self._reset_event.is_set(): + self._task_interrupt_event.set() + self._reset_event.clear() + print("Task queue updated after reset") + + self._wait_interrupt_event.clear() + self._running_event.set() print(self.task_queue) @@ -72,29 +85,38 @@ class TaskManager(object): self.resume() def stop(self): - self.pause() + self.pause(interrupt=True) with self._task_queue_lock: del self.task_queue[:] + print self.task_queue def shutdown(self): self.stop() self._shutdown_event.set() + self._wait_interrupt_event.set() + self._task_interrupt_event.set() self._running_event.clear() self._processor_thread.join(timeout=5) - def pause(self, interrupt=False): -# if interrupt: -# self.interrupt() + def pause(self, interrupt=True): + if interrupt: + self._wait_interrupt_event.set() + self._task_interrupt_event.set() self._running_event.clear() logger.info("Task queue paused") + print("Task queue paused") def resume(self): self._running_event.set() + self._wait_interrupt_event.clear() + self._task_interrupt_event.clear() logger.info("Task queue resumed") + print("Task queue resumed") def reset(self): self.stop() self.resume() + self._reset_event.set() # def interrupt(self): # self._interrupt_event.set() @@ -110,23 +132,37 @@ class TaskManager(object): return logger.info("Waiting util task execution time:{}".format(start_time)) - wait(start_time, self._interrupt_event, 1) + print("Waiting util task execution time:{}".format(start_time)) + wait(start_time, self._wait_interrupt_event) - if not self._interrupt_event.is_set(): + if not self._wait_interrupt_event.is_set(): logger.info("Executing task {}".format(task)) + print("Executing task {}".format(task)) try: - task.func(*task.args, interrupter=self._interrupt_event, **task.kwargs) + task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs) except Exception as e: logger.error("Error '{}' occurred in task {}".format(e, task)) + print("Error '{}' occurred in task {}".format(e, task)) else: logger.warning("Task interrupted before execution") - self._interrupt_event.clear() + print("Task interrupted before execution") + self._wait_interrupt_event.clear() return if time.time() > start_time: - self.pop_task() + start_time_n, priority_n, count_n, task_n = self.task_queue[0] + if (task_n == task) and (start_time_n == start_time): + self.pop_task() + try: + print("Pop {} function!".format(task.func.__name__)) + except Exception as e: + print("Pop something!") + + if self._task_interrupt_event.is_set(): + self._task_interrupt_event.clear() logger.info("Execution done") + print("Execution done") def _task_processor(self): logger.info("Tasking thread started")