WorkerTaskManager.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import bisect
  2. from collections.abc import MutableSequence
  3. class CustomSortedList(MutableSequence):
  4. def __init__(self):
  5. super().__init__()
  6. self.items = [] # (priority, added index, actual value)
  7. self.logging = False
  8. def __repr__(self):
  9. return "<{0} {1}>".format(self.__class__.__name__, self.items)
  10. def __len__(self):
  11. return len(self.items)
  12. def __getitem__(self, index):
  13. if type(index) is int:
  14. return self.items[index][2]
  15. else:
  16. return [item[2] for item in self.items[index]]
  17. def __delitem__(self, index):
  18. del self.items[index]
  19. def __setitem__(self, index, value):
  20. self.items[index] = self.valueToItem(value)
  21. def __str__(self):
  22. return str(self[:])
  23. def insert(self, index, value):
  24. self.append(value)
  25. def append(self, value):
  26. bisect.insort(self.items, self.valueToItem(value))
  27. def updateItem(self, value, update_key=None, update_value=None):
  28. self.remove(value)
  29. if update_key is not None:
  30. value[update_key] = update_value
  31. self.append(value)
  32. def sort(self, *args, **kwargs):
  33. raise Exception("Sorted list can't be sorted")
  34. def valueToItem(self, value):
  35. return (self.getPriority(value), self.getId(value), value)
  36. def getPriority(self, value):
  37. return value
  38. def getId(self, value):
  39. return id(value)
  40. def indexSlow(self, value):
  41. for pos, item in enumerate(self.items):
  42. if item[2] == value:
  43. return pos
  44. return None
  45. def index(self, value):
  46. item = (self.getPriority(value), self.getId(value), value)
  47. bisect_pos = bisect.bisect(self.items, item) - 1
  48. if bisect_pos >= 0 and self.items[bisect_pos][2] == value:
  49. return bisect_pos
  50. # Item probably changed since added, switch to slow iteration
  51. pos = self.indexSlow(value)
  52. if self.logging:
  53. print("Slow index for %s in pos %s bisect: %s" % (item[2], pos, bisect_pos))
  54. if pos is None:
  55. raise ValueError("%r not in list" % value)
  56. else:
  57. return pos
  58. def __contains__(self, value):
  59. try:
  60. self.index(value)
  61. return True
  62. except ValueError:
  63. return False
  64. class WorkerTaskManager(CustomSortedList):
  65. def __init__(self):
  66. super().__init__()
  67. self.inner_paths = {}
  68. def getPriority(self, value):
  69. return 0 - (value["priority"] - value["workers_num"] * 10)
  70. def getId(self, value):
  71. return value["id"]
  72. def __contains__(self, value):
  73. return value["inner_path"] in self.inner_paths
  74. def __delitem__(self, index):
  75. # Remove from inner path cache
  76. del self.inner_paths[self.items[index][2]["inner_path"]]
  77. super().__delitem__(index)
  78. # Fast task search by inner_path
  79. def append(self, task):
  80. if task["inner_path"] in self.inner_paths:
  81. raise ValueError("File %s already has a task" % task["inner_path"])
  82. super().append(task)
  83. # Create inner path cache for faster lookup by filename
  84. self.inner_paths[task["inner_path"]] = task
  85. def remove(self, task):
  86. if task not in self:
  87. raise ValueError("%r not in list" % task)
  88. else:
  89. super().remove(task)
  90. def findTask(self, inner_path):
  91. return self.inner_paths.get(inner_path, None)