mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-05-26 07:07:58 +00:00
260 lines
8.9 KiB
Python
260 lines
8.9 KiB
Python
import heapq
|
|
import time
|
|
import logging
|
|
import threading
|
|
import collections
|
|
import itertools
|
|
|
|
logger = logging.getLogger(__name__)
|
|
Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ])
|
|
|
|
INTERRUPTER = threading.Event()
|
|
|
|
|
|
def wait(end, interrupter=INTERRUPTER, maxsleep=0.1):
|
|
# Added features to interrupter sleep and set max sleeping interval
|
|
|
|
while not interrupter.is_set(): # Basic implementation of pause module until()
|
|
now = time.time()
|
|
diff = min(end - now, maxsleep)
|
|
if diff <= 0:
|
|
break
|
|
else:
|
|
time.sleep(diff / 2)
|
|
else:
|
|
logger.warning("Waiting was interrupted!")
|
|
#print("Waiting was interrupted!")
|
|
|
|
|
|
class TaskManager(object):
|
|
def __init__(self):
|
|
self.task_queue = []
|
|
self._counter = itertools.count() # unique sequence count
|
|
|
|
self._processor_thread = threading.Thread(target=self._task_processor, name="Task processing thread")
|
|
self._processor_thread.daemon = True
|
|
self._task_queue_lock = threading.RLock()
|
|
|
|
self._running_event = threading.Event()
|
|
self._reset_event = threading.Event()
|
|
self._wait_interrupt_event = threading.Event()
|
|
self._task_interrupt_event = threading.Event()
|
|
self._shutdown_event = threading.Event()
|
|
|
|
self._last_task = None
|
|
|
|
self._timeshift = 0.0
|
|
|
|
def add_task(self, timestamp, priority, task_function,
|
|
task_args=(), task_kwargs=None, task_delayable=False):
|
|
|
|
if task_kwargs is None:
|
|
task_kwargs = {}
|
|
|
|
self._wait_interrupt_event.set()
|
|
self._running_event.clear()
|
|
|
|
task = Task(task_function, task_args, task_kwargs, task_delayable)
|
|
|
|
count = next(self._counter)
|
|
entry = (timestamp, priority, count, task)
|
|
|
|
with self._task_queue_lock:
|
|
if self.task_queue:
|
|
entry_old = self.task_queue[0]
|
|
else:
|
|
entry_old = entry
|
|
|
|
heapq.heappush(self.task_queue, entry)
|
|
|
|
if self.task_queue[0] != entry_old:
|
|
self._task_interrupt_event.set()
|
|
#print("Task queue updated with more priority task")
|
|
|
|
if self._reset_event.is_set():
|
|
self._task_interrupt_event.set()
|
|
self._reset_event.clear()
|
|
#print("Task queue updated after reset")
|
|
|
|
self._wait_interrupt_event.clear()
|
|
self._running_event.set()
|
|
|
|
# #print(self.task_queue)
|
|
|
|
def pop_task(self):
|
|
with self._task_queue_lock:
|
|
if self.task_queue:
|
|
return heapq.heappop(self.task_queue)
|
|
raise KeyError('Pop from an empty priority queue')
|
|
|
|
def get_last_task_name(self):
|
|
try:
|
|
name = self._last_task.func.__name__
|
|
if name == 'execute_frame':
|
|
return name.kwargs["frame"].action
|
|
return name
|
|
except AttributeError:
|
|
return None
|
|
|
|
def get_current_task(self):
|
|
try:
|
|
start_time, priority, count, task = self.task_queue[0]
|
|
except IndexError as e:
|
|
logger.debug("Task queue checking exception: {}".format(e))
|
|
return "No task"
|
|
else:
|
|
if self._running_event.is_set():
|
|
time_to_start = start_time - time.time()
|
|
if time_to_start > 0:
|
|
return "{} in {:.1f} s".format(task.func.__name__,time_to_start)
|
|
return task.func.__name__
|
|
else:
|
|
return "paused"
|
|
|
|
def start(self):
|
|
#print("Task manager is started")
|
|
logger.info("Task manager is started")
|
|
self._processor_thread.start()
|
|
self.resume()
|
|
|
|
def stop(self):
|
|
self._timeshift = 0.0
|
|
self.pause(interrupt=True)
|
|
with self._task_queue_lock:
|
|
del self.task_queue[:]
|
|
|
|
def shutdown(self, timeout=5.0):
|
|
self.stop()
|
|
self._shutdown_event.set()
|
|
self._processor_thread.join(timeout=timeout)
|
|
|
|
def pause(self, interrupt=True):
|
|
if interrupt:
|
|
self._wait_interrupt_event.set()
|
|
self._task_interrupt_event.set()
|
|
self._running_event.clear()
|
|
logger.info("Task queue paused")
|
|
#print("Task queue paused")
|
|
|
|
def resume(self, time_to_start_next_task=0.0):
|
|
if self.task_queue:
|
|
next_task_time = self.task_queue[0][0]
|
|
if time_to_start_next_task > next_task_time:
|
|
self._timeshift = time_to_start_next_task - next_task_time
|
|
self._wait_interrupt_event.clear()
|
|
self._task_interrupt_event.clear()
|
|
self._running_event.set()
|
|
logger.info("Task queue resumed with timeshift {}".format(self._timeshift))
|
|
#print("Task queue resumed with timeshift {}".format(self._timeshift))
|
|
|
|
def reset(self, interrupt_next_task=True):
|
|
self.stop()
|
|
self.resume()
|
|
if interrupt_next_task:
|
|
self._reset_event.set()
|
|
|
|
def execute_task(self):
|
|
delta = 0.1
|
|
|
|
with self._task_queue_lock:
|
|
try:
|
|
start_time, priority, count, task = self.task_queue[0]
|
|
except IndexError as e:
|
|
logger.debug("Task queue checking exception: {}".format(e))
|
|
self._timeshift = 0.0
|
|
self._wait_interrupt_event.clear()
|
|
self._task_interrupt_event.clear()
|
|
self._running_event.clear()
|
|
return
|
|
|
|
task_start_time = start_time + self._timeshift
|
|
logger.info("Executing task {}".format(task.func.__name__))
|
|
logger.debug("Waiting util task execution time:{}".format(task_start_time))
|
|
#print("Waiting until task execution time:{}".format(task_start_time))
|
|
wait(task_start_time, self._wait_interrupt_event)
|
|
|
|
if task_start_time - time.time() > 0.01:
|
|
logger.error("Need for waiting more")
|
|
self._wait_interrupt_event.clear()
|
|
return
|
|
|
|
if not self._wait_interrupt_event.is_set():
|
|
#logger.info("Executing task {}".format(task))
|
|
#print("{} Executing task {}".format(time.time(),task))
|
|
#print("Interrupter is set: {}".format(self._task_interrupt_event.is_set()))
|
|
try:
|
|
task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs)
|
|
|
|
except Exception as e:
|
|
try:
|
|
logger.error("Error '{}' occurred in task {}".format(e, task))
|
|
#print("Error '{}' occurred in task {}".format(e, task))
|
|
if str(e) == 'STOP':
|
|
self.reset()
|
|
logger.error("Return after STOP exception, can't arm!")
|
|
return
|
|
except (KeyError, TypeError):
|
|
logger.error(e)
|
|
else:
|
|
logger.error("Task interrupted before execution")
|
|
#print("Task interrupted before execution")
|
|
self._wait_interrupt_event.clear()
|
|
return
|
|
|
|
if time.time() > start_time:
|
|
try:
|
|
start_time_n, priority_n, count_n, task_n = self.task_queue[0]
|
|
except IndexError as e:
|
|
logger.warning("Timeout checking exception: {}".format(e))
|
|
self._timeshift = 0.0
|
|
self._wait_interrupt_event.clear()
|
|
self._task_interrupt_event.clear()
|
|
self._running_event.clear()
|
|
return
|
|
if (task_n == task) and (start_time_n == start_time):
|
|
try:
|
|
self.pop_task()
|
|
except KeyError as e:
|
|
logger.error(str(e))
|
|
self._last_task = task
|
|
#try:
|
|
#print("Pop {} function!".format(task.func.__name__))
|
|
#except Exception as e:
|
|
#print("Pop something!")
|
|
|
|
if self._task_interrupt_event.is_set():
|
|
self._task_interrupt_event.clear()
|
|
|
|
logger.info("Execution done")
|
|
#print("Execution done")
|
|
|
|
def _task_processor(self):
|
|
logger.info("Tasking thread started")
|
|
# self._update_queue() # Initial tick if tasks added before start
|
|
while not self._shutdown_event.is_set():
|
|
self._running_event.wait()
|
|
self.execute_task()
|
|
|
|
if __name__ == "__main__":
|
|
#logger.addHandler(logging.StreamHandler())
|
|
#logger.setLevel(logging.DEBUG)
|
|
|
|
def printer(stri, interrupter, *args, **kwargs):
|
|
#logger.info("String: {}, timenow: {}".format(stri, time.time()))
|
|
wait(time.time()+30, interrupter)
|
|
|
|
tasker = TaskManager() # Lower priority first!
|
|
|
|
tasker.start()
|
|
tasker.add_task(time.time(), 10, printer, ("Task1 ", ))
|
|
tasker.add_task(time.time()+10, 5, printer, ("Task2 ", ))
|
|
time.sleep(1)
|
|
tasker.add_task(time.time()+3, 1, printer, ("Task3", ))
|
|
tasker.pause()
|
|
time.sleep(5)
|
|
tasker.resume(time_to_start_next_task=time.time()+1)
|
|
tasker.add_task(time.time()+7, 0, printer, ("Task4", ))
|
|
|
|
while True:
|
|
pass
|