Files
clever-show/Drone/tasking_lib.py

194 lines
6.4 KiB
Python

import heapq
import time
import logging
import threading
import collections
import itertools
logger = logging.getLogger(__name__)
Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ])
INTERRUPTER = threading.Event()
def wait(end, interrupter=INTERRUPTER, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval
while not interrupter.is_set(): # Basic implementation of pause module until()
now = time.time()
diff = min(end - now, maxsleep)
if diff <= 0:
break
else:
time.sleep(diff / 2)
else:
logger.warning("Waiting was interrupted!")
print("Waiting was interrupted!")
class TaskManager(object):
def __init__(self):
self.task_queue = []
self._counter = itertools.count() # unique sequence count
self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread")
self._processor_thread.daemon = True
self._task_queue_lock = threading.RLock()
self._running_event = threading.Event()
self._reset_event = threading.Event()
self._wait_interrupt_event = threading.Event()
self._task_interrupt_event = threading.Event()
self._shutdown_event = threading.Event()
def add_task(self, timestamp, priority, task_function,
task_args=(), task_kwargs={}, task_delayable=False):
self._wait_interrupt_event.set()
self._running_event.clear()
task = Task(task_function, task_args, task_kwargs, task_delayable)
count = next(self._counter)
entry = (timestamp, priority, count, task)
with self._task_queue_lock:
if self.task_queue:
entry_old = self.task_queue[0]
else:
entry_old = entry
heapq.heappush(self.task_queue, entry)
if self.task_queue[0] != entry_old:
self._task_interrupt_event.set()
print("Task queue updated with more priority task")
if self._reset_event.is_set():
self._task_interrupt_event.set()
self._reset_event.clear()
print("Task queue updated after reset")
self._wait_interrupt_event.clear()
self._running_event.set()
print(self.task_queue)
def pop_task(self):
with self._task_queue_lock:
if self.task_queue:
return heapq.heappop(self.task_queue)
raise KeyError('Pop from an empty priority queue')
def start(self, timeouts=False):
print("Task manager is started")
logger.info("Task manager is started")
self._processor_thread.start()
self.resume()
def stop(self):
self.pause(interrupt=True)
with self._task_queue_lock:
del self.task_queue[:]
print self.task_queue
def shutdown(self):
self.stop()
self._shutdown_event.set()
self._wait_interrupt_event.set()
self._task_interrupt_event.set()
self._running_event.clear()
self._processor_thread.join(timeout=5)
def pause(self, interrupt=True):
if interrupt:
self._wait_interrupt_event.set()
self._task_interrupt_event.set()
self._running_event.clear()
logger.info("Task queue paused")
print("Task queue paused")
def resume(self):
self._running_event.set()
self._wait_interrupt_event.clear()
self._task_interrupt_event.clear()
logger.info("Task queue resumed")
print("Task queue resumed")
def reset(self):
self.stop()
self.resume()
self._reset_event.set()
# def interrupt(self):
# self._interrupt_event.set()
# while self._interrupt_event.is_set():
# pass
# logger.info("Task execution successfully interrupted")
def execute_task(self):
with self._task_queue_lock:
if self.task_queue:
start_time, priority, count, task = self.task_queue[0]
else:
return
logger.info("Waiting util task execution time:{}".format(start_time))
print("Waiting util task execution time:{}".format(start_time))
wait(start_time, self._wait_interrupt_event)
if not self._wait_interrupt_event.is_set():
logger.info("Executing task {}".format(task))
print("Executing task {}".format(task))
try:
task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs)
except Exception as e:
logger.error("Error '{}' occurred in task {}".format(e, task))
print("Error '{}' occurred in task {}".format(e, task))
else:
logger.warning("Task interrupted before execution")
print("Task interrupted before execution")
self._wait_interrupt_event.clear()
return
if time.time() > start_time:
start_time_n, priority_n, count_n, task_n = self.task_queue[0]
if (task_n == task) and (start_time_n == start_time):
self.pop_task()
try:
print("Pop {} function!".format(task.func.__name__))
except Exception as e:
print("Pop something!")
if self._task_interrupt_event.is_set():
self._task_interrupt_event.clear()
logger.info("Execution done")
print("Execution done")
def _task_processor(self):
logger.info("Tasking thread started")
# self._update_queue() # Initial tick if tasks added before start
while not self._shutdown_event.is_set():
self._running_event.wait()
self.execute_task()
if __name__ == "__main__":
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)
def printer(stri, interrupter, *args, **kwargs):
logger.info("String: {}, timenow: {}".format(stri, time.time()))
wait(time.time()+30, interrupter)
tasker = TaskManager() # Lower priority first!
tasker.start()
tasker.add_task(time.time(), 10, printer, ("Task1 ", ))
tasker.add_task(time.time()+10, 5, printer, ("Task2 ", ))
time.sleep(1)
tasker.add_task(time.time()+7, 1, printer, ("Task3", ))
time.sleep(3)
tasker.add_task(time.time()+7, 0, printer, ("Task4", ))
while True:
pass