scheduler.lua 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. -- scheduler.lua
  2. -- Implementation of a Railway time schedule queue
  3. -- In contrast to the LuaATC interrupt queue, this one can handle many different
  4. -- event receivers. This is done by registering a callback with the scheduler
  5. local ln = advtrains.lines
  6. local sched = {}
  7. local UNITS_THRESH = 10
  8. local MAX_PER_ITER = 10
  9. local callbacks = {}
  10. -- Register a handler callback to handle scheduler items.
  11. -- e - a handler identifier (corresponds to "handler" in enqueue() )
  12. -- func - a function(evtdata) to be executed when a schedule item expires
  13. -- evtdata - arbitrary data that has been passed into enqueue()
  14. function sched.register_callback(e, func)
  15. callbacks[e] = func
  16. end
  17. --[[
  18. {
  19. t = <railway time in seconds>
  20. e = <handler callback>
  21. d = <data table>
  22. u = <unit identifier>
  23. }
  24. The "unit identifier" is there to prevent schedule overflows. It can be, for example, the position hash
  25. of a node or a train ID. If the number of schedules for a unit exceeds UNITS_THRESH, further schedules are
  26. blocked.
  27. ]]--
  28. local queue = {}
  29. local units_cnt = {}
  30. function sched.load(data)
  31. if data then
  32. for i,elem in ipairs(data) do
  33. table.insert(queue, elem)
  34. units_cnt[elem.u] = (units_cnt[elem.u] or 0) + 1
  35. end
  36. atlog("[lines][scheduler] Loaded the schedule queue,",#data,"items.")
  37. end
  38. end
  39. function sched.save()
  40. return queue
  41. end
  42. function sched.run()
  43. local ctime = ln.rwt.get_time()
  44. local cnt = 0
  45. local ucn, elem
  46. while cnt <= MAX_PER_ITER do
  47. elem = queue[1]
  48. if elem and elem.t <= ctime then
  49. table.remove(queue, 1)
  50. if callbacks[elem.e] then
  51. -- run it
  52. callbacks[elem.e](elem.d)
  53. else
  54. atwarn("[lines][scheduler] "..S("No callback to handle schedule"),elem)
  55. end
  56. cnt=cnt+1
  57. ucn = units_cnt[elem.u]
  58. if ucn and ucn>0 then
  59. units_cnt[elem.u] = ucn - 1
  60. end
  61. else
  62. break
  63. end
  64. end
  65. end
  66. -- Enqueue a new scheduled item to be executed at "rwtime"
  67. -- handler: a string identifying the handler to use (registered with sched.register_callback())
  68. -- evtdata: Arbitrary Lua data to be passed to the handler callback
  69. -- unitid: An arbitrary string uniquely identifying the thing that is issuing this enqueue.
  70. -- used to prevent expotentially growing "scheduler bombs"
  71. -- unitlim: Custom override for UNITS_THRESH (see there)
  72. function sched.enqueue(rwtime, handler, evtdata, unitid, unitlim)
  73. local qtime = ln.rwt.to_secs(rwtime)
  74. assert(type(handler)=="string")
  75. assert(type(unitid)=="string")
  76. assert(type(unitlim)=="number")
  77. local cnt=1
  78. local ucn, elem
  79. ucn = (units_cnt[unitid] or 0)
  80. local ulim=(unitlim or UNITS_THRESH)
  81. if ucn >= ulim then
  82. atlog("[lines][scheduler] discarding enqueue for",handler,"(limit",ulim,") because unit",unitid,"has already",ucn,"schedules enqueued")
  83. return false
  84. end
  85. while true do
  86. elem = queue[cnt]
  87. if not elem or elem.t > qtime then
  88. table.insert(queue, cnt, {
  89. t=qtime,
  90. e=handler,
  91. d=evtdata,
  92. u=unitid,
  93. })
  94. units_cnt[unitid] = ucn + 1
  95. return true
  96. end
  97. cnt = cnt+1
  98. end
  99. end
  100. -- See enqueue(). Same meaning, except that rwtime is relative to now.
  101. function sched.enqueue_in(rwtime, handler, evtdata, unitid, unitlim)
  102. local ctime = ln.rwt.get_time()
  103. local rwtime_s = ln.rwt.to_secs(rwtime)
  104. sched.enqueue(ctime + rwtime_s, handler, evtdata, unitid, unitlim)
  105. end
  106. -- Discards all schedules for unit "unitid" (removes them from the queue)
  107. function sched.discard_all(unitid)
  108. local i = 1
  109. while i<=#queue do
  110. if queue[i].u == unitid then
  111. table.remove(queue,i)
  112. else
  113. i=i+1
  114. end
  115. end
  116. units_cnt[unitid] = 0
  117. end
  118. ln.sched = sched