scheduler-locality.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import sim
  2. def getScoreMetricTime(thread_id):
  3. return long(sim.stats.get('thread', thread_id, 'nonidle_elapsed_time'))
  4. def getScoreMetricInstructions(thread_id):
  5. return long(sim.stats.get('thread', thread_id, 'instruction_count'))
  6. class Thread:
  7. def __init__(self, thread_id, getScoreMetric):
  8. self.thread_id = thread_id
  9. self.getScoreMetric = lambda: getScoreMetric(thread_id)
  10. self.core = None
  11. self.runnable = False
  12. self.unscheduled = False
  13. self.score = 0 # Accumulated score
  14. self.metric_last = 0 # State at start of last interval
  15. sim.thread.set_thread_affinity(self.thread_id, ())
  16. def updateScore(self):
  17. metric_now = self.getScoreMetric()
  18. self.score += metric_now - self.metric_last
  19. self.metric_last = metric_now
  20. def setScore(self, score):
  21. self.score = score
  22. self.metric_last = self.getScoreMetric()
  23. def setCore(self, core_id, time = -1):
  24. self.core = core_id
  25. if core_id is None:
  26. self.updateScore()
  27. self.last_scheduled_out = time
  28. sim.thread.set_thread_affinity(self.thread_id, ())
  29. else:
  30. self.last_scheduled_in = time
  31. sim.thread.set_thread_affinity(self.thread_id, [ c == core_id for c in range(sim.config.ncores) ])
  32. def __repr__(self):
  33. return 'Thread(%d, %s, score = %d)' % (self.thread_id, 'core = %d' % self.core if self.core is not None else 'no core', self.score)
  34. class SchedulerLocality:
  35. def setup(self, args):
  36. args = dict(enumerate((args or '').split(':')))
  37. interval_ns = long(args.get(0, None) or 10000000)
  38. scheduler_type = args.get(1, 'equal_time')
  39. core_mask = args.get(2, '')
  40. if scheduler_type == 'equal_time':
  41. self.getScoreMetric = getScoreMetricTime
  42. elif scheduler_type == 'equal_instructions':
  43. self.getScoreMetric = getScoreMetricInstructions
  44. else:
  45. raise ValueError('Invalid scheduler type %s' % scheduler_type)
  46. if core_mask:
  47. core_mask = map(int, core_mask.split(',')) + [0]*sim.config.ncores
  48. self.cores = [ core for core in range(sim.config.ncores) if core_mask[core] ]
  49. else:
  50. self.cores = range(sim.config.ncores)
  51. sim.util.Every(interval_ns * sim.util.Time.NS, self.periodic)
  52. self.threads = {}
  53. self.last_core = 0
  54. def hook_thread_start(self, thread_id, time):
  55. self.threads[thread_id] = Thread(thread_id, self.getScoreMetric)
  56. self.threads[thread_id].runnable = True
  57. # Initial assignment: one thread per core until cores are exhausted
  58. if self.last_core < len(self.cores):
  59. self.threads[thread_id].setCore(self.cores[self.last_core], sim.stats.time())
  60. self.last_core += 1
  61. else:
  62. self.threads[thread_id].setCore(None, sim.stats.time())
  63. def hook_thread_exit(self, thread_id, time):
  64. self.hook_thread_stall(thread_id, 'exit', time)
  65. def hook_thread_stall(self, thread_id, reason, time):
  66. if reason == 'unscheduled':
  67. # Ignore calls due to the thread being scheduled out
  68. self.threads[thread_id].unscheduled = True
  69. else:
  70. core = self.threads[thread_id].core
  71. self.threads[thread_id].setCore(None, time)
  72. self.threads[thread_id].runnable = False
  73. # Schedule a new thread (runnable, but not running) on this free core
  74. threads = [ thread for thread in self.threads.values() if thread.runnable and thread.core is None ]
  75. if threads:
  76. # Order by score
  77. threads.sort(key = lambda thread: thread.score)
  78. threads[0].setCore(core, time)
  79. def hook_thread_resume(self, thread_id, woken_by, time):
  80. if self.threads[thread_id].unscheduled:
  81. # Ignore calls due to the thread being scheduled back in
  82. self.threads[thread_id].unscheduled = False
  83. else:
  84. self.threads[thread_id].setScore(min([ thread.score for thread in self.threads.values() ]))
  85. self.threads[thread_id].runnable = True
  86. # If there is a free core, move us there now
  87. used_cores = set([ thread.core for thread in self.threads.values() if thread.core is not None ])
  88. free_cores = set(self.cores) - used_cores
  89. if len(free_cores):
  90. self.threads[thread_id].setCore(list(free_cores)[0], time)
  91. def periodic(self, time, time_delta):
  92. # Update thread scores
  93. [ thread.updateScore() for thread in self.threads.values() if thread.core is not None ]
  94. # Get a list of all runnable threads
  95. threads = [ thread for thread in self.threads.values() if thread.runnable ]
  96. # Order by score
  97. threads.sort(key = lambda thread: thread.score)
  98. # Select threads to run now, one per core
  99. threads = threads[:len(self.cores)]
  100. #print ', '.join(map(repr, threads))
  101. # Filter out threads that are already running, and keep them on their current core
  102. keep_threads = [ thread for thread in threads if thread.core is not None ]
  103. used_cores = set([ thread.core for thread in keep_threads ])
  104. # Move new threads to free cores
  105. free_cores = set(self.cores) - used_cores
  106. threads = [ thread for thread in threads if thread.core is None ]
  107. assert(len(free_cores) >= len(threads))
  108. for thread, core in zip(threads, sorted(free_cores)):
  109. current_thread = [ t for t in self.threads.values() if t.core == core ]
  110. if current_thread:
  111. current_thread[0].setCore(None)
  112. thread.setCore(core, time)
  113. assert thread.runnable
  114. sim.util.register(SchedulerLocality())