Source code for draugr.threading_utilities.skipper

import queue
import threading

__all__ = ["Skipper"]


[docs]class Skipper: """description"""
[docs] def __init__(self, fun): self.Q = queue.Queue(1) self.fun = fun self._stop = False self.thread = threading.Thread(target=self.loop) self.thread.start()
[docs] def loop(self): """description""" while True: if self._stop: break (args, kwargs) = self.Q.get() if self._stop: break self.fun(*args, **kwargs)
[docs] def stop(self): """description""" self._stop = True try: self.Q.put_nowait((None, None)) except: pass self.thread.join()
def __call__(self, *args, **kwargs): while not self.Q.empty(): self.Q.get_nowait() self.Q.put((args, kwargs))
if __name__ == "__main__": def yhhgsady(): """description""" import time def worker(wid): """ :param wid: :type wid: """ print(f"Worker {wid} Start") time.sleep(1) print(f"Worker {wid} END") S = Skipper(worker) S(1) time.sleep(0.1) S(2) time.sleep(0.1) S(3) time.sleep(4) S(4) time.sleep(0.2) S(5) time.sleep(0.2) S(6) time.sleep(0.2) S(7) time.sleep(4) S.stop() def yhhgsad2y(): """description""" import time def worker(wid): """ :param wid: :type wid: """ print(f"Worker {wid} Start") time.sleep(1) print(f"Worker {wid} END") S = Skipper(worker) S(1) time.sleep(4) S.stop() # yhhgsady() yhhgsad2y()