feed_test.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "fmt"
  19. "reflect"
  20. "sync"
  21. "testing"
  22. "time"
  23. )
  24. func TestFeedPanics(t *testing.T) {
  25. {
  26. var f Feed
  27. f.Send(int(2))
  28. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  29. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  30. t.Error(err)
  31. }
  32. }
  33. {
  34. var f Feed
  35. ch := make(chan int)
  36. f.Subscribe(ch)
  37. want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
  38. if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
  39. t.Error(err)
  40. }
  41. }
  42. {
  43. var f Feed
  44. f.Send(int(2))
  45. want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
  46. if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
  47. t.Error(err)
  48. }
  49. }
  50. {
  51. var f Feed
  52. if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
  53. t.Error(err)
  54. }
  55. }
  56. {
  57. var f Feed
  58. if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
  59. t.Error(err)
  60. }
  61. }
  62. }
  63. func checkPanic(want error, fn func()) (err error) {
  64. defer func() {
  65. panic := recover()
  66. if panic == nil {
  67. err = fmt.Errorf("didn't panic")
  68. } else if !reflect.DeepEqual(panic, want) {
  69. err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
  70. }
  71. }()
  72. fn()
  73. return nil
  74. }
  75. func TestFeed(t *testing.T) {
  76. var feed Feed
  77. var done, subscribed sync.WaitGroup
  78. subscriber := func(i int) {
  79. defer done.Done()
  80. subchan := make(chan int)
  81. sub := feed.Subscribe(subchan)
  82. timeout := time.NewTimer(2 * time.Second)
  83. subscribed.Done()
  84. select {
  85. case v := <-subchan:
  86. if v != 1 {
  87. t.Errorf("%d: received value %d, want 1", i, v)
  88. }
  89. case <-timeout.C:
  90. t.Errorf("%d: receive timeout", i)
  91. }
  92. sub.Unsubscribe()
  93. select {
  94. case _, ok := <-sub.Err():
  95. if ok {
  96. t.Errorf("%d: error channel not closed after unsubscribe", i)
  97. }
  98. case <-timeout.C:
  99. t.Errorf("%d: unsubscribe timeout", i)
  100. }
  101. }
  102. const n = 1000
  103. done.Add(n)
  104. subscribed.Add(n)
  105. for i := 0; i < n; i++ {
  106. go subscriber(i)
  107. }
  108. subscribed.Wait()
  109. if nsent := feed.Send(1); nsent != n {
  110. t.Errorf("first send delivered %d times, want %d", nsent, n)
  111. }
  112. if nsent := feed.Send(2); nsent != 0 {
  113. t.Errorf("second send delivered %d times, want 0", nsent)
  114. }
  115. done.Wait()
  116. }
  117. func TestFeedSubscribeSameChannel(t *testing.T) {
  118. var (
  119. feed Feed
  120. done sync.WaitGroup
  121. ch = make(chan int)
  122. sub1 = feed.Subscribe(ch)
  123. sub2 = feed.Subscribe(ch)
  124. _ = feed.Subscribe(ch)
  125. )
  126. expectSends := func(value, n int) {
  127. if nsent := feed.Send(value); nsent != n {
  128. t.Errorf("send delivered %d times, want %d", nsent, n)
  129. }
  130. done.Done()
  131. }
  132. expectRecv := func(wantValue, n int) {
  133. for i := 0; i < n; i++ {
  134. if v := <-ch; v != wantValue {
  135. t.Errorf("received %d, want %d", v, wantValue)
  136. }
  137. }
  138. }
  139. done.Add(1)
  140. go expectSends(1, 3)
  141. expectRecv(1, 3)
  142. done.Wait()
  143. sub1.Unsubscribe()
  144. done.Add(1)
  145. go expectSends(2, 2)
  146. expectRecv(2, 2)
  147. done.Wait()
  148. sub2.Unsubscribe()
  149. done.Add(1)
  150. go expectSends(3, 1)
  151. expectRecv(3, 1)
  152. done.Wait()
  153. }
  154. func TestFeedSubscribeBlockedPost(t *testing.T) {
  155. var (
  156. feed Feed
  157. nsends = 2000
  158. ch1 = make(chan int)
  159. ch2 = make(chan int)
  160. wg sync.WaitGroup
  161. )
  162. defer wg.Wait()
  163. feed.Subscribe(ch1)
  164. wg.Add(nsends)
  165. for i := 0; i < nsends; i++ {
  166. go func() {
  167. feed.Send(99)
  168. wg.Done()
  169. }()
  170. }
  171. sub2 := feed.Subscribe(ch2)
  172. defer sub2.Unsubscribe()
  173. // We're done when ch1 has received N times.
  174. // The number of receives on ch2 depends on scheduling.
  175. for i := 0; i < nsends; {
  176. select {
  177. case <-ch1:
  178. i++
  179. case <-ch2:
  180. }
  181. }
  182. }
  183. func TestFeedUnsubscribeBlockedPost(t *testing.T) {
  184. var (
  185. feed Feed
  186. nsends = 200
  187. chans = make([]chan int, 2000)
  188. subs = make([]Subscription, len(chans))
  189. bchan = make(chan int)
  190. bsub = feed.Subscribe(bchan)
  191. wg sync.WaitGroup
  192. )
  193. for i := range chans {
  194. chans[i] = make(chan int, nsends)
  195. }
  196. // Queue up some Sends. None of these can make progress while bchan isn't read.
  197. wg.Add(nsends)
  198. for i := 0; i < nsends; i++ {
  199. go func() {
  200. feed.Send(99)
  201. wg.Done()
  202. }()
  203. }
  204. // Subscribe the other channels.
  205. for i, ch := range chans {
  206. subs[i] = feed.Subscribe(ch)
  207. }
  208. // Unsubscribe them again.
  209. for _, sub := range subs {
  210. sub.Unsubscribe()
  211. }
  212. // Unblock the Sends.
  213. bsub.Unsubscribe()
  214. wg.Wait()
  215. }
  216. // Checks that unsubscribing a channel during Send works even if that
  217. // channel has already been sent on.
  218. func TestFeedUnsubscribeSentChan(t *testing.T) {
  219. var (
  220. feed Feed
  221. ch1 = make(chan int)
  222. ch2 = make(chan int)
  223. sub1 = feed.Subscribe(ch1)
  224. sub2 = feed.Subscribe(ch2)
  225. wg sync.WaitGroup
  226. )
  227. defer sub2.Unsubscribe()
  228. wg.Add(1)
  229. go func() {
  230. feed.Send(0)
  231. wg.Done()
  232. }()
  233. // Wait for the value on ch1.
  234. <-ch1
  235. // Unsubscribe ch1, removing it from the send cases.
  236. sub1.Unsubscribe()
  237. // Receive ch2, finishing Send.
  238. <-ch2
  239. wg.Wait()
  240. // Send again. This should send to ch2 only, so the wait group will unblock
  241. // as soon as a value is received on ch2.
  242. wg.Add(1)
  243. go func() {
  244. feed.Send(0)
  245. wg.Done()
  246. }()
  247. <-ch2
  248. wg.Wait()
  249. }
  250. func TestFeedUnsubscribeFromInbox(t *testing.T) {
  251. var (
  252. feed Feed
  253. ch1 = make(chan int)
  254. ch2 = make(chan int)
  255. sub1 = feed.Subscribe(ch1)
  256. sub2 = feed.Subscribe(ch1)
  257. sub3 = feed.Subscribe(ch2)
  258. )
  259. if len(feed.inbox) != 3 {
  260. t.Errorf("inbox length != 3 after subscribe")
  261. }
  262. if len(feed.sendCases) != 1 {
  263. t.Errorf("sendCases is non-empty after unsubscribe")
  264. }
  265. sub1.Unsubscribe()
  266. sub2.Unsubscribe()
  267. sub3.Unsubscribe()
  268. if len(feed.inbox) != 0 {
  269. t.Errorf("inbox is non-empty after unsubscribe")
  270. }
  271. if len(feed.sendCases) != 1 {
  272. t.Errorf("sendCases is non-empty after unsubscribe")
  273. }
  274. }
  275. func BenchmarkFeedSend1000(b *testing.B) {
  276. var (
  277. done sync.WaitGroup
  278. feed Feed
  279. nsubs = 1000
  280. )
  281. subscriber := func(ch <-chan int) {
  282. for i := 0; i < b.N; i++ {
  283. <-ch
  284. }
  285. done.Done()
  286. }
  287. done.Add(nsubs)
  288. for i := 0; i < nsubs; i++ {
  289. ch := make(chan int, 200)
  290. feed.Subscribe(ch)
  291. go subscriber(ch)
  292. }
  293. // The actual benchmark.
  294. b.ResetTimer()
  295. for i := 0; i < b.N; i++ {
  296. if feed.Send(i) != nsubs {
  297. panic("wrong number of sends")
  298. }
  299. }
  300. b.StopTimer()
  301. done.Wait()
  302. }