events.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package fn
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. const (
  9. // DefaultQueueSize is the default size to use for concurrent queues.
  10. DefaultQueueSize = 10
  11. )
  12. var (
  13. // nextID is the next subscription ID that will be used for a new event
  14. // receiver. This MUST be used atomically.
  15. nextID uint64
  16. )
  17. // EventReceiver is a struct type that holds two queues for new and removed
  18. // items respectively.
  19. type EventReceiver[T any] struct {
  20. // id is the internal process-unique ID of the subscription.
  21. id uint64
  22. // NewItemCreated is sent to when a new item was created successfully.
  23. NewItemCreated *ConcurrentQueue[T]
  24. // ItemRemoved is sent to when an existing item was removed.
  25. ItemRemoved *ConcurrentQueue[T]
  26. }
  27. // ID returns the internal process-unique ID of the subscription.
  28. func (e *EventReceiver[T]) ID() uint64 {
  29. return e.id
  30. }
  31. // Stop stops the receiver from processing events.
  32. func (e *EventReceiver[T]) Stop() {
  33. e.NewItemCreated.Stop()
  34. e.ItemRemoved.Stop()
  35. }
  36. // NewEventReceiver creates a new event receiver with concurrent queues of the
  37. // given size.
  38. func NewEventReceiver[T any](queueSize int) *EventReceiver[T] {
  39. created := NewConcurrentQueue[T](queueSize)
  40. created.Start()
  41. removed := NewConcurrentQueue[T](queueSize)
  42. removed.Start()
  43. id := atomic.AddUint64(&nextID, 1)
  44. return &EventReceiver[T]{
  45. id: id,
  46. NewItemCreated: created,
  47. ItemRemoved: removed,
  48. }
  49. }
  50. // EventPublisher is an interface type for a component that offers event based
  51. // subscriptions for publishing events.
  52. type EventPublisher[T any, Q any] interface {
  53. // RegisterSubscriber adds a new subscriber for receiving events. The
  54. // deliverExisting boolean indicates whether already existing items
  55. // should be sent to the NewItemCreated channel when the subscription is
  56. // started. An optional deliverFrom can be specified to indicate from
  57. // which timestamp/index/marker onward existing items should be
  58. // delivered on startup. If deliverFrom is nil/zero/empty then all
  59. // existing items will be delivered.
  60. RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool,
  61. deliverFrom Q) error
  62. // RemoveSubscriber removes the given subscriber and also stops it from
  63. // processing events.
  64. RemoveSubscriber(subscriber *EventReceiver[T]) error
  65. }
  66. // Event is a generic event that can be sent to a subscriber.
  67. type Event interface {
  68. Timestamp() time.Time
  69. }
  70. // EventDistributor is a struct type that helps to distribute events to multiple
  71. // subscribers.
  72. type EventDistributor[T any] struct {
  73. // subscribers is a map of components that want to be notified on new
  74. // events, keyed by their subscription ID.
  75. subscribers map[uint64]*EventReceiver[T]
  76. // subscriberMtx guards the subscribers map and access to the
  77. // subscriptionID.
  78. subscriberMtx sync.Mutex
  79. }
  80. // NewEventDistributor creates a new event distributor of the declared type.
  81. func NewEventDistributor[T any]() *EventDistributor[T] {
  82. return &EventDistributor[T]{
  83. subscribers: make(map[uint64]*EventReceiver[T]),
  84. }
  85. }
  86. // RegisterSubscriber adds a new subscriber for receiving events.
  87. func (d *EventDistributor[T]) RegisterSubscriber(subscriber *EventReceiver[T]) {
  88. d.subscriberMtx.Lock()
  89. defer d.subscriberMtx.Unlock()
  90. d.subscribers[subscriber.ID()] = subscriber
  91. }
  92. // RemoveSubscriber removes the given subscriber and also stops it from
  93. // processing events.
  94. func (d *EventDistributor[T]) RemoveSubscriber(
  95. subscriber *EventReceiver[T]) error {
  96. d.subscriberMtx.Lock()
  97. defer d.subscriberMtx.Unlock()
  98. _, ok := d.subscribers[subscriber.ID()]
  99. if !ok {
  100. return fmt.Errorf("subscriber with ID %d not found",
  101. subscriber.ID())
  102. }
  103. subscriber.Stop()
  104. delete(d.subscribers, subscriber.ID())
  105. return nil
  106. }
  107. // NotifySubscribers sends the given events to all subscribers.
  108. func (d *EventDistributor[T]) NotifySubscribers(events ...T) {
  109. d.subscriberMtx.Lock()
  110. for i := range events {
  111. event := events[i]
  112. for id := range d.subscribers {
  113. d.subscribers[id].NewItemCreated.ChanIn() <- event
  114. }
  115. }
  116. d.subscriberMtx.Unlock()
  117. }