gfcp_updater.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
  2. // Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
  3. // Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
  4. //
  5. // All rights reserved.
  6. //
  7. // All use of this code is governed by the MIT license.
  8. // The complete license is available in the LICENSE file.
  9. package gfcp // import "go.gridfinity.dev/gfcp"
  10. import (
  11. "container/heap"
  12. "sync"
  13. "time"
  14. )
  15. var updater updateHeap
  16. func init() {
  17. updater.init()
  18. go updater.updateTask()
  19. }
  20. type entry struct {
  21. ts time.Time
  22. s *UDPSession
  23. }
  24. type updateHeap struct {
  25. entries []entry
  26. mu sync.Mutex
  27. chWakeUp chan struct{}
  28. }
  29. func (
  30. h *updateHeap,
  31. ) Len() int {
  32. return len(
  33. h.entries,
  34. )
  35. }
  36. func (
  37. h *updateHeap,
  38. ) Less(
  39. i,
  40. j int,
  41. ) bool {
  42. return h.entries[i].ts.Before(
  43. h.entries[j].ts,
  44. )
  45. }
  46. func (
  47. h *updateHeap,
  48. ) Swap(
  49. i,
  50. j int,
  51. ) {
  52. h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
  53. h.entries[i].s.updaterIdx = i
  54. h.entries[j].s.updaterIdx = j
  55. }
  56. func (
  57. h *updateHeap,
  58. ) Push(
  59. x interface{},
  60. ) {
  61. h.entries = append(
  62. h.entries,
  63. x.(entry),
  64. )
  65. n := len(
  66. h.entries,
  67. )
  68. h.entries[n-1].s.updaterIdx = n - 1
  69. }
  70. func (
  71. h *updateHeap,
  72. ) Pop() interface{} {
  73. n := len(
  74. h.entries,
  75. )
  76. x := h.entries[n-1]
  77. h.entries[n-1].s.updaterIdx = -1
  78. h.entries[n-1] = entry{}
  79. h.entries = h.entries[0 : n-1]
  80. return x
  81. }
  82. func (
  83. h *updateHeap,
  84. ) init() {
  85. h.chWakeUp = make(
  86. chan struct{},
  87. 1,
  88. )
  89. }
  90. func (
  91. h *updateHeap,
  92. ) addSession(
  93. s *UDPSession,
  94. ) {
  95. h.mu.Lock()
  96. heap.Push(
  97. h,
  98. entry{
  99. time.Now(),
  100. s,
  101. },
  102. )
  103. h.mu.Unlock()
  104. h.wakeup()
  105. }
  106. func (
  107. h *updateHeap,
  108. ) removeSession(
  109. s *UDPSession,
  110. ) {
  111. h.mu.Lock()
  112. if s.updaterIdx != -1 {
  113. heap.Remove(
  114. h,
  115. s.updaterIdx,
  116. )
  117. }
  118. h.mu.Unlock()
  119. }
  120. func (
  121. h *updateHeap,
  122. ) wakeup() {
  123. select {
  124. case h.chWakeUp <- struct{}{}:
  125. default:
  126. }
  127. }
  128. func (
  129. h *updateHeap,
  130. ) updateTask() {
  131. timer := time.NewTimer(0)
  132. for {
  133. select {
  134. case <-timer.C:
  135. case <-h.chWakeUp:
  136. }
  137. h.mu.Lock()
  138. hlen := h.Len()
  139. for i := 0; i < hlen; i++ {
  140. entry := &h.entries[0]
  141. if !time.Now().Before(
  142. entry.ts,
  143. ) {
  144. interval := entry.s.update()
  145. entry.ts = time.Now().Add(
  146. interval,
  147. )
  148. heap.Fix(
  149. h,
  150. 0,
  151. )
  152. } else {
  153. break
  154. }
  155. }
  156. if hlen > 0 {
  157. timer.Reset(
  158. time.Until(
  159. h.entries[0].ts,
  160. ),
  161. )
  162. }
  163. h.mu.Unlock()
  164. }
  165. }