// gfcp_updater.go
// Copyright © 2021 Jeffrey H. Johnson <trnsz@pobox.com>.
// Copyright © 2015 Daniel Fu <daniel820313@gmail.com>.
// Copyright © 2019 Loki 'l0k18' Verloren <stalker.loki@protonmail.ch>.
// Copyright © 2021 Gridfinity, LLC. <admin@gridfinity.com>.
//
// All rights reserved.
//
// All use of this code is governed by the MIT license.
// The complete license is available in the LICENSE file.
  10. package gfcp
  11. import (
  12. "container/heap"
  13. "sync"
  14. "time"
  15. )
  16. var updater updateHeap
  17. func init() {
  18. updater.init()
  19. go updater.updateTask()
  20. }
  21. type entry struct {
  22. ts time.Time
  23. s *UDPSession
  24. }
  25. type updateHeap struct {
  26. entries []entry
  27. mu sync.Mutex
  28. chWakeUp chan struct{}
  29. }
  30. func (
  31. h *updateHeap,
  32. ) Len() int {
  33. return len(
  34. h.entries,
  35. )
  36. }
  37. func (
  38. h *updateHeap,
  39. ) Less(
  40. i,
  41. j int,
  42. ) bool {
  43. return h.entries[i].ts.Before(
  44. h.entries[j].ts,
  45. )
  46. }
  47. func (
  48. h *updateHeap,
  49. ) Swap(
  50. i,
  51. j int,
  52. ) {
  53. h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
  54. h.entries[i].s.updaterIdx = i
  55. h.entries[j].s.updaterIdx = j
  56. }
  57. func (
  58. h *updateHeap,
  59. ) Push(
  60. x interface{},
  61. ) {
  62. h.entries = append(
  63. h.entries,
  64. x.(entry),
  65. )
  66. n := len(
  67. h.entries,
  68. )
  69. h.entries[n-1].s.updaterIdx = n - 1
  70. }
  71. func (
  72. h *updateHeap,
  73. ) Pop() interface{} {
  74. n := len(
  75. h.entries,
  76. )
  77. x := h.entries[n-1]
  78. h.entries[n-1].s.updaterIdx = -1
  79. h.entries[n-1] = entry{}
  80. h.entries = h.entries[0 : n-1]
  81. return x
  82. }
  83. func (
  84. h *updateHeap,
  85. ) init() {
  86. h.chWakeUp = make(
  87. chan struct{},
  88. 1,
  89. )
  90. }
  91. func (
  92. h *updateHeap,
  93. ) addSession(
  94. s *UDPSession,
  95. ) {
  96. h.mu.Lock()
  97. heap.Push(
  98. h,
  99. entry{
  100. time.Now(),
  101. s,
  102. },
  103. )
  104. h.mu.Unlock()
  105. h.wakeup()
  106. }
  107. func (
  108. h *updateHeap,
  109. ) removeSession(
  110. s *UDPSession,
  111. ) {
  112. h.mu.Lock()
  113. if s.updaterIdx != -1 {
  114. heap.Remove(
  115. h,
  116. s.updaterIdx,
  117. )
  118. }
  119. h.mu.Unlock()
  120. }
  121. func (
  122. h *updateHeap,
  123. ) wakeup() {
  124. select {
  125. case h.chWakeUp <- struct{}{}:
  126. default:
  127. }
  128. }
  129. func (
  130. h *updateHeap,
  131. ) updateTask() {
  132. timer := time.NewTimer(0)
  133. for {
  134. select {
  135. case <-timer.C:
  136. case <-h.chWakeUp:
  137. }
  138. h.mu.Lock()
  139. hlen := h.Len()
  140. for i := 0; i < hlen; i++ {
  141. entry := &h.entries[0]
  142. if !time.Now().Before(
  143. entry.ts,
  144. ) {
  145. interval := entry.s.update()
  146. entry.ts = time.Now().Add(
  147. interval,
  148. )
  149. heap.Fix(
  150. h,
  151. 0,
  152. )
  153. } else {
  154. break
  155. }
  156. }
  157. if hlen > 0 {
  158. timer.Reset(
  159. time.Until(
  160. h.entries[0].ts,
  161. ),
  162. )
  163. }
  164. h.mu.Unlock()
  165. }
  166. }