subscription_test.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package event
  17. import (
  18. "context"
  19. "errors"
  20. "testing"
  21. "time"
  22. )
  23. var errInts = errors.New("error in subscribeInts")
  24. func subscribeInts(max, fail int, c chan<- int) Subscription {
  25. return NewSubscription(func(quit <-chan struct{}) error {
  26. for i := 0; i < max; i++ {
  27. if i >= fail {
  28. return errInts
  29. }
  30. select {
  31. case c <- i:
  32. case <-quit:
  33. return nil
  34. }
  35. }
  36. return nil
  37. })
  38. }
  39. func TestNewSubscriptionError(t *testing.T) {
  40. t.Parallel()
  41. channel := make(chan int)
  42. sub := subscribeInts(10, 2, channel)
  43. loop:
  44. for want := 0; want < 10; want++ {
  45. select {
  46. case got := <-channel:
  47. if got != want {
  48. t.Fatalf("wrong int %d, want %d", got, want)
  49. }
  50. case err := <-sub.Err():
  51. if err != errInts {
  52. t.Fatalf("wrong error: got %q, want %q", err, errInts)
  53. }
  54. if want != 2 {
  55. t.Fatalf("got errInts at int %d, should be received at 2", want)
  56. }
  57. break loop
  58. }
  59. }
  60. sub.Unsubscribe()
  61. err, ok := <-sub.Err()
  62. if err != nil {
  63. t.Fatal("got non-nil error after Unsubscribe")
  64. }
  65. if ok {
  66. t.Fatal("channel still open after Unsubscribe")
  67. }
  68. }
  69. func TestResubscribe(t *testing.T) {
  70. t.Parallel()
  71. var i int
  72. nfails := 6
  73. sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
  74. // fmt.Printf("call #%d @ %v\n", i, time.Now())
  75. i++
  76. if i == 2 {
  77. // Delay the second failure a bit to reset the resubscribe interval.
  78. time.Sleep(200 * time.Millisecond)
  79. }
  80. if i < nfails {
  81. return nil, errors.New("oops")
  82. }
  83. sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
  84. return sub, nil
  85. })
  86. <-sub.Err()
  87. if i != nfails {
  88. t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
  89. }
  90. }
  91. func TestResubscribeAbort(t *testing.T) {
  92. t.Parallel()
  93. done := make(chan error)
  94. sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
  95. select {
  96. case <-ctx.Done():
  97. done <- nil
  98. case <-time.After(2 * time.Second):
  99. done <- errors.New("context given to resubscribe function not canceled within 2s")
  100. }
  101. return nil, nil
  102. })
  103. sub.Unsubscribe()
  104. if err := <-done; err != nil {
  105. t.Fatal(err)
  106. }
  107. }