tasking_lib: add timeshift for correct work of pause function

This commit is contained in:
Arthur Golubtsov
2019-06-08 00:45:46 +03:00
parent bd186e01ee
commit 21a1ac7d3c

View File

@@ -39,6 +39,8 @@ class TaskManager(object):
self._task_interrupt_event = threading.Event()
self._shutdown_event = threading.Event()
self._timeshift = 0
def add_task(self, timestamp, priority, task_function,
task_args=(), task_kwargs={}, task_delayable=False):
@@ -85,10 +87,10 @@ class TaskManager(object):
self.resume()
def stop(self):
self._timeshift = 0
self.pause(interrupt=True)
with self._task_queue_lock:
del self.task_queue[:]
print self.task_queue
def shutdown(self):
self.stop()
@@ -106,34 +108,33 @@ class TaskManager(object):
logger.info("Task queue paused")
print("Task queue paused")
def resume(self):
def resume(self, time_to_start_next_task = 0):
if self.task_queue:
next_task_time = self.task_queue[0][0]
if time_to_start_next_task > next_task_time:
self._timeshift = time_to_start_next_task - next_task_time
self._running_event.set()
self._wait_interrupt_event.clear()
self._task_interrupt_event.clear()
logger.info("Task queue resumed")
print("Task queue resumed")
logger.info("Task queue resumed with timeshift {}".format(self._timeshift))
print("Task queue resumed with timeshift {}".format(self._timeshift))
def reset(self):
self.stop()
self.resume()
self._reset_event.set()
# def interrupt(self):
# self._interrupt_event.set()
# while self._interrupt_event.is_set():
# pass
# logger.info("Task execution successfully interrupted")
def execute_task(self):
with self._task_queue_lock:
if self.task_queue:
start_time, priority, count, task = self.task_queue[0]
else:
self._timeshift = 0
return
logger.info("Waiting util task execution time:{}".format(start_time))
print("Waiting util task execution time:{}".format(start_time))
wait(start_time, self._wait_interrupt_event)
logger.info("Waiting util task execution time:{}".format(start_time + self._timeshift))
print("Waiting util task execution time:{}".format(start_time + self._timeshift))
wait(start_time + self._timeshift, self._wait_interrupt_event)
if not self._wait_interrupt_event.is_set():
logger.info("Executing task {}".format(task))
@@ -185,8 +186,10 @@ if __name__ == "__main__":
tasker.add_task(time.time(), 10, printer, ("Task1 ", ))
tasker.add_task(time.time()+10, 5, printer, ("Task2 ", ))
time.sleep(1)
tasker.add_task(time.time()+7, 1, printer, ("Task3", ))
time.sleep(3)
tasker.add_task(time.time()+3, 1, printer, ("Task3", ))
tasker.pause()
time.sleep(5)
tasker.resume(time_to_start_next_task=time.time()+1)
tasker.add_task(time.time()+7, 0, printer, ("Task4", ))
while True: