# threadTimeWheel.py (2.0 KB)
  1. import threading
  2. import time
  3. from concurrent.futures.thread import ThreadPoolExecutor
  class TimeLoop(threading.Thread):
      """Timer-wheel thread that runs callables a number of seconds after insertion.

      Tasks are bucketed into "slots" counted in whole seconds from the start
      of the current wheel revolution.  The loop thread wakes periodically,
      runs every slot that has come due, and begins a new revolution once its
      position passes the largest slot that has been needed so far.

      Tasks run either on an internal ``ThreadPoolExecutor`` (default) or
      inline on the loop thread, as selected by :meth:`start`.

      Attribute and helper names are deliberately double-underscore
      (name-mangled): ``threading.Thread`` owns single-underscore names such
      as ``_stop`` and ``_started`` which must not be shadowed.
      """

      # Seconds between wake-ups of the loop thread.  Kept well below the
      # one-second slot width so a due slot is serviced promptly.
      __tick_interval = 0.05

      def __init__(self, *args, **kwargs):
          """Create an idle wheel; arguments are forwarded to ``threading.Thread``."""
          super().__init__(*args, **kwargs)
          # Mutable state lives on the instance so that separate wheels never
          # share slots, counters or error lists.
          self.__lock = threading.Lock()          # guards wheel state and counters
          self.__stop_event = threading.Event()   # set by stop() to end run()
          # Monotonic clock: wall-clock adjustments must not shift the schedule.
          self.__oninit = int(time.monotonic())   # start of the current revolution
          self.__loop = {}                        # slot -> list of pending callables
          self.__max = 10                         # highest slot of the revolution
          self.__task = 0                         # inserted but not yet dispatched
          self.__old = 0                          # already dispatched
          self.__errors = []                      # str(exception) awaiting printing
          self.__pool = None
          self.__isPool = True

      def run(self):
          """Service due slots until :meth:`stop` is called."""
          while not self.__stop_event.is_set():
              self.__tick()
              # Interruptible sleep: avoids spinning a CPU core and returns
              # immediately once stop() sets the event.
              self.__stop_event.wait(self.__tick_interval)

      def __tick(self):
          """Dispatch every task whose slot is due, then print recorded errors."""
          now = int(time.monotonic())
          with self.__lock:
              probe = now - self.__oninit
              due = []
              # Take every slot at or before the current position, not only
              # the exact one, so a slot the loop did not reach in time is
              # not postponed by a whole revolution.
              for key in sorted(k for k in self.__loop if k <= probe):
                  due.extend(self.__loop.pop(key))
              if probe > self.__max:
                  # Every slot is <= __max and has just been drained, so it
                  # is safe to start a new revolution from "now".
                  self.__oninit = now
          # Tasks are dispatched outside the lock so that a task may itself
          # call insertTask() or stop() without deadlocking.
          for task in due:
              self.__dispatch(task)
          while self.__errors:
              print(self.__errors.pop())

      def __dispatch(self, task):
          """Run one task (pooled or inline) and update the counters."""
          try:
              if self.__isPool:
                  future = self.__pool.submit(task)
                  # A pooled task's exception is stored on its future; record
                  # it instead of letting it vanish with the future.
                  future.add_done_callback(self.__record_failure)
              else:
                  task()
          except Exception as exc:
              # Boundary of user code: one failing task must not kill the loop.
              self.__errors.append(str(exc))
          finally:
              with self.__lock:
                  self.__task -= 1
                  self.__old += 1

      def __record_failure(self, future):
          """Done-callback: remember the exception of a failed pooled task."""
          if future.cancelled():
              return
          exc = future.exception()
          if exc is not None:
              self.__errors.append(str(exc))

      def start(self, isPool=True, size=3):
          """Start the loop thread.

          Args:
              isPool: When true, tasks are submitted to a thread pool;
                  otherwise they run inline on the loop thread.
              size: Maximum number of pool workers (used only when ``isPool``).
          """
          self.__isPool = isPool
          if self.__isPool:
              self.__pool = ThreadPoolExecutor(
                  thread_name_prefix="TimeWheelRunLoad", max_workers=size
              )
          super().start()

      def stop(self):
          """Ask the loop to exit and shut the worker pool down without waiting.

          Tasks still waiting in the wheel are not run.
          """
          self.__stop_event.set()
          pool = self.__pool
          if pool is not None:
              pool.shutdown(wait=False)

      def __str__(self):
          """Return the textual form of the slot -> pending tasks mapping."""
          # Snapshot under the lock: formatting a dict while another thread
          # resizes it can raise RuntimeError.
          with self.__lock:
              return f"{self.__loop}"

      def insertTask(self, task, ttl):
          """Schedule ``task`` (a zero-argument callable) to run in ``ttl`` seconds.

          Resolution is one second.  May be called from any thread, before or
          after :meth:`start`.
          """
          with self.__lock:
              # Position is computed from the clock at insertion time so the
              # delay is measured from now, even if the loop has not ticked
              # recently or has not been started yet.
              key = int(time.monotonic()) - self.__oninit + ttl
              if key > self.__max:
                  # Grow the revolution so the wheel does not wrap before
                  # this slot is reached.
                  self.__max = key
              self.__loop.setdefault(key, []).append(task)
              self.__task += 1

      def getOldTaskSize(self):
          """Return the number of tasks that have already been dispatched."""
          return self.__old

      def getAllTaskSize(self):
          """Return the number of tasks inserted and not yet dispatched."""
          return self.__task

      def getIsPool(self):
          """Return whether tasks are dispatched to the thread pool."""
          return self.__isPool