mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-26 23:19:33 +00:00
Drone: Simplify tasking_lib, correct some bugs
This commit is contained in:
@@ -3,77 +3,81 @@ import time
|
||||
import logging
|
||||
import threading
|
||||
import collections
|
||||
import itertools
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ])
|
||||
|
||||
def wait(end, interrupter=None, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval
|
||||
|
||||
def wait(end, interrupter=None, maxsleep=1): # Added features to interrupter sleep and set max sleeping interval
|
||||
def interrupted():
|
||||
if interrupter is None:
|
||||
return False
|
||||
else:
|
||||
return interrupter.is_set()
|
||||
interrupted = False
|
||||
|
||||
while not interrupted(): # Basic implementation of pause module until()
|
||||
while not interrupted: # Basic implementation of pause module until()
|
||||
now = time.time()
|
||||
diff = min(end - now, maxsleep)
|
||||
if interrupter is None:
|
||||
interrupted = False
|
||||
else:
|
||||
interrupted = interrupter.is_set()
|
||||
if diff <= 0:
|
||||
break
|
||||
else:
|
||||
time.sleep(diff / 2)
|
||||
|
||||
if interrupted:
|
||||
logger.warning("Waiting was interrupted!")
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
def __init__(self):
|
||||
self.task_queue = []
|
||||
self._active_task = None
|
||||
self._counter = itertools.count() # unique sequence count
|
||||
|
||||
self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread")
|
||||
self._processor_thread.daemon = True
|
||||
|
||||
self._timeout_thread = threading.Thread(target=self._task_time_interrupter, name="Task timeouts thread")
|
||||
self._processor_thread.daemon = True
|
||||
|
||||
self._task_lock = threading.RLock()
|
||||
self._queue_lock = threading.RLock()
|
||||
self._task_queue_lock = threading.RLock()
|
||||
|
||||
self._running_event = threading.Event()
|
||||
self._interrupt_event = threading.Event()
|
||||
self.shutdown_event = threading.Event()
|
||||
self._shutdown_event = threading.Event()
|
||||
|
||||
def add_task(self, timestamp, priority, task_function,
|
||||
task_args=(), task_kwargs=None, task_delayable=False):
|
||||
|
||||
self._interrupt_event.set()
|
||||
|
||||
if task_kwargs is None:
|
||||
task_kwargs = {}
|
||||
|
||||
heapq.heappush(self.task_queue, (timestamp, priority,
|
||||
Task(task_function, task_args, task_kwargs, task_delayable)
|
||||
))
|
||||
logger.debug(self.task_queue)
|
||||
if self._processor_thread.is_alive():
|
||||
self._update_queue()
|
||||
task = Task(task_function, task_args, task_kwargs, task_delayable)
|
||||
|
||||
def _remove_task(self, task):
|
||||
with self._queue_lock:
|
||||
self.task_queue.remove(task)
|
||||
heapq.heapify(self.task_queue)
|
||||
count = next(self._counter)
|
||||
entry = (timestamp, priority, count, task)
|
||||
|
||||
with self._task_queue_lock:
|
||||
heapq.heappush(self.task_queue, entry)
|
||||
|
||||
print(self.task_queue)
|
||||
|
||||
def pop_task(self):
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
return heapq.heappop(self.task_queue)
|
||||
raise KeyError('Pop from an empty priority queue')
|
||||
|
||||
def start(self, timeouts=False):
|
||||
logger.info("Starting")
|
||||
logger.info("Task manager is started")
|
||||
self._processor_thread.start()
|
||||
if timeouts:
|
||||
self._timeout_thread.start()
|
||||
self.resume()
|
||||
|
||||
def stop(self):
|
||||
self.pause(interrupt=True)
|
||||
with self._queue_lock:
|
||||
self.task_queue = []
|
||||
with self._task_queue_lock:
|
||||
del self.task_queue[:]
|
||||
|
||||
def shutdown(self):
|
||||
self.stop()
|
||||
self.shutdown_event.set()
|
||||
self._shutdown_event.set()
|
||||
self._running_event.clear()
|
||||
self._processor_thread.join(timeout=5)
|
||||
|
||||
@@ -91,36 +95,18 @@ class TaskManager(object):
|
||||
self.stop()
|
||||
self.resume()
|
||||
|
||||
def _update_queue(self):
|
||||
logger.info("Queue updated")
|
||||
with self._queue_lock, self._task_lock:
|
||||
if self.task_queue:
|
||||
if self._active_task is None:
|
||||
self._active_task = self.task_queue[0]
|
||||
elif self.task_queue[0] is not self._active_task:
|
||||
if self.task_queue[0] < self._active_task:
|
||||
self.change_active_task(self.task_queue[0])
|
||||
|
||||
def interrupt(self):
|
||||
self._interrupt_event.set()
|
||||
while self._interrupt_event.is_set():
|
||||
pass
|
||||
logger.info("Task execution successfully interrupted")
|
||||
|
||||
def change_active_task(self, task):
|
||||
self.pause(interrupt=True)
|
||||
|
||||
with self._task_lock:
|
||||
if not self._active_task[2].delayable:
|
||||
self._remove_task(self._active_task)
|
||||
self._active_task = task
|
||||
|
||||
self.resume()
|
||||
|
||||
def execute_task(self):
|
||||
with self._task_lock:
|
||||
task = self._active_task[2]
|
||||
start_time = self._active_task[0]
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
start_time, priority, count, task = self.task_queue[0]
|
||||
else:
|
||||
return
|
||||
|
||||
logger.info("Waiting util task execution time:{}".format(start_time))
|
||||
wait(start_time, self._interrupt_event, 1)
|
||||
@@ -133,42 +119,20 @@ class TaskManager(object):
|
||||
logger.error("Error '{}' occurred in task {}".format(e, task))
|
||||
else:
|
||||
logger.warning("Task interrupted before execution")
|
||||
self._interrupt_event.clear()
|
||||
return
|
||||
|
||||
self._interrupt_event.clear()
|
||||
if time.time() > start_time:
|
||||
self.pop_task()
|
||||
|
||||
try:
|
||||
logger.debug("Removing task")
|
||||
self._remove_task(self._active_task)
|
||||
except ValueError:
|
||||
logger.warning("Task already removed, probably task changed")
|
||||
else:
|
||||
with self._task_lock:
|
||||
self._active_task = None
|
||||
|
||||
self._update_queue()
|
||||
logger.info("Execution done")
|
||||
logger.info("Execution done")
|
||||
|
||||
def _task_processor(self):
|
||||
logger.info("Tasking thread started")
|
||||
self._update_queue() # Initial tick if tasks added before start
|
||||
while not self.shutdown_event.is_set():
|
||||
# self._update_queue() # Initial tick if tasks added before start
|
||||
while not self._shutdown_event.is_set():
|
||||
self._running_event.wait()
|
||||
if self._active_task is not None:
|
||||
self.execute_task()
|
||||
|
||||
def _task_time_interrupter(self): # TODO revork; temporary disabled
|
||||
raise NotImplementedError
|
||||
logger.info("Timeouts thread started")
|
||||
while not self.shutdown_event.is_set():
|
||||
self._running_event.wait()
|
||||
try:
|
||||
if self.task_queue[1] is not self._active_task: # If pending task is more important than current
|
||||
if self.task_queue[1] < self._active_task: # TODO look at timeout time
|
||||
logger.warning("Changing low-priority task due timeout")
|
||||
self.change_active_task(self.task_queue[1])
|
||||
except IndexError:
|
||||
time.sleep(0.01)
|
||||
|
||||
self.execute_task()
|
||||
|
||||
if __name__ == "__main__":
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
@@ -179,15 +143,14 @@ if __name__ == "__main__":
|
||||
wait(time.time()+30, interrupter)
|
||||
|
||||
tasker = TaskManager() # Lower priority first!
|
||||
tasker.start()
|
||||
|
||||
tasker.add_task(0, 10, printer, ("Task1 ", ))
|
||||
tasker.start()
|
||||
tasker.add_task(time.time(), 10, printer, ("Task1 ", ))
|
||||
tasker.add_task(time.time()+10, 5, printer, ("Task2 ", ))
|
||||
time.sleep(1)
|
||||
tasker.add_task(time.time(), 10, printer, ("Lol ", ))
|
||||
tasker.add_task(time.time()+10, 5, printer, ("Kek ", ))
|
||||
tasker.add_task(time.time()+7, 1, printer, ("Dededededee", ))
|
||||
tasker.add_task(time.time()+7, 1, printer, ("Task3", ))
|
||||
time.sleep(3)
|
||||
tasker.add_task(time.time()+7, 0, printer, ("Iiiiiii", ))
|
||||
tasker.add_task(time.time()+7, 0, printer, ("Task4", ))
|
||||
|
||||
while True:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user