mirror of
https://github.com/CopterExpress/clever-show.git
synced 2026-06-08 13:04:31 +00:00
tasking_lib: improve interruption, now it looks like working
This commit is contained in:
@@ -8,24 +8,20 @@ import itertools
|
||||
logger = logging.getLogger(__name__)
|
||||
Task = collections.namedtuple("Task", ["func", "args", "kwargs", "delayable", ])
|
||||
|
||||
def wait(end, interrupter=None, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval
|
||||
INTERRUPTER = threading.Event()
|
||||
|
||||
interrupted = False
|
||||
def wait(end, interrupter=INTERRUPTER, maxsleep=0.1): # Added features to interrupter sleep and set max sleeping interval
|
||||
|
||||
while not interrupted: # Basic implementation of pause module until()
|
||||
while not interrupter.is_set(): # Basic implementation of pause module until()
|
||||
now = time.time()
|
||||
diff = min(end - now, maxsleep)
|
||||
if interrupter is None:
|
||||
interrupted = False
|
||||
else:
|
||||
interrupted = interrupter.is_set()
|
||||
if diff <= 0:
|
||||
break
|
||||
else:
|
||||
time.sleep(diff / 2)
|
||||
|
||||
if interrupted:
|
||||
else:
|
||||
logger.warning("Waiting was interrupted!")
|
||||
print("Waiting was interrupted!")
|
||||
|
||||
|
||||
class TaskManager(object):
|
||||
@@ -38,24 +34,41 @@ class TaskManager(object):
|
||||
self._task_queue_lock = threading.RLock()
|
||||
|
||||
self._running_event = threading.Event()
|
||||
self._interrupt_event = threading.Event()
|
||||
self._reset_event = threading.Event()
|
||||
self._wait_interrupt_event = threading.Event()
|
||||
self._task_interrupt_event = threading.Event()
|
||||
self._shutdown_event = threading.Event()
|
||||
|
||||
def add_task(self, timestamp, priority, task_function,
|
||||
task_args=(), task_kwargs=None, task_delayable=False):
|
||||
task_args=(), task_kwargs={}, task_delayable=False):
|
||||
|
||||
self._interrupt_event.set()
|
||||
self._wait_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
|
||||
if task_kwargs is None:
|
||||
task_kwargs = {}
|
||||
|
||||
task = Task(task_function, task_args, task_kwargs, task_delayable)
|
||||
task = Task(task_function, task_args, task_kwargs, task_delayable)
|
||||
|
||||
count = next(self._counter)
|
||||
entry = (timestamp, priority, count, task)
|
||||
|
||||
|
||||
with self._task_queue_lock:
|
||||
if self.task_queue:
|
||||
entry_old = self.task_queue[0]
|
||||
else:
|
||||
entry_old = entry
|
||||
|
||||
heapq.heappush(self.task_queue, entry)
|
||||
|
||||
if self.task_queue[0] != entry_old:
|
||||
self._task_interrupt_event.set()
|
||||
print("Task queue updated with more priority task")
|
||||
|
||||
if self._reset_event.is_set():
|
||||
self._task_interrupt_event.set()
|
||||
self._reset_event.clear()
|
||||
print("Task queue updated after reset")
|
||||
|
||||
self._wait_interrupt_event.clear()
|
||||
self._running_event.set()
|
||||
|
||||
print(self.task_queue)
|
||||
|
||||
@@ -72,29 +85,38 @@ class TaskManager(object):
|
||||
self.resume()
|
||||
|
||||
def stop(self):
|
||||
self.pause()
|
||||
self.pause(interrupt=True)
|
||||
with self._task_queue_lock:
|
||||
del self.task_queue[:]
|
||||
print self.task_queue
|
||||
|
||||
def shutdown(self):
|
||||
self.stop()
|
||||
self._shutdown_event.set()
|
||||
self._wait_interrupt_event.set()
|
||||
self._task_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
self._processor_thread.join(timeout=5)
|
||||
|
||||
def pause(self, interrupt=False):
|
||||
# if interrupt:
|
||||
# self.interrupt()
|
||||
def pause(self, interrupt=True):
|
||||
if interrupt:
|
||||
self._wait_interrupt_event.set()
|
||||
self._task_interrupt_event.set()
|
||||
self._running_event.clear()
|
||||
logger.info("Task queue paused")
|
||||
print("Task queue paused")
|
||||
|
||||
def resume(self):
|
||||
self._running_event.set()
|
||||
self._wait_interrupt_event.clear()
|
||||
self._task_interrupt_event.clear()
|
||||
logger.info("Task queue resumed")
|
||||
print("Task queue resumed")
|
||||
|
||||
def reset(self):
|
||||
self.stop()
|
||||
self.resume()
|
||||
self._reset_event.set()
|
||||
|
||||
# def interrupt(self):
|
||||
# self._interrupt_event.set()
|
||||
@@ -110,23 +132,37 @@ class TaskManager(object):
|
||||
return
|
||||
|
||||
logger.info("Waiting util task execution time:{}".format(start_time))
|
||||
wait(start_time, self._interrupt_event, 1)
|
||||
print("Waiting util task execution time:{}".format(start_time))
|
||||
wait(start_time, self._wait_interrupt_event)
|
||||
|
||||
if not self._interrupt_event.is_set():
|
||||
if not self._wait_interrupt_event.is_set():
|
||||
logger.info("Executing task {}".format(task))
|
||||
print("Executing task {}".format(task))
|
||||
try:
|
||||
task.func(*task.args, interrupter=self._interrupt_event, **task.kwargs)
|
||||
task.func(*task.args, interrupter=self._task_interrupt_event, **task.kwargs)
|
||||
except Exception as e:
|
||||
logger.error("Error '{}' occurred in task {}".format(e, task))
|
||||
print("Error '{}' occurred in task {}".format(e, task))
|
||||
else:
|
||||
logger.warning("Task interrupted before execution")
|
||||
self._interrupt_event.clear()
|
||||
print("Task interrupted before execution")
|
||||
self._wait_interrupt_event.clear()
|
||||
return
|
||||
|
||||
if time.time() > start_time:
|
||||
self.pop_task()
|
||||
start_time_n, priority_n, count_n, task_n = self.task_queue[0]
|
||||
if (task_n == task) and (start_time_n == start_time):
|
||||
self.pop_task()
|
||||
try:
|
||||
print("Pop {} function!".format(task.func.__name__))
|
||||
except Exception as e:
|
||||
print("Pop something!")
|
||||
|
||||
if self._task_interrupt_event.is_set():
|
||||
self._task_interrupt_event.clear()
|
||||
|
||||
logger.info("Execution done")
|
||||
print("Execution done")
|
||||
|
||||
def _task_processor(self):
|
||||
logger.info("Tasking thread started")
|
||||
|
||||
Reference in New Issue
Block a user