beacon.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package beacon
  7. import (
  8. "context"
  9. "fmt"
  10. "net"
  11. "time"
  12. "github.com/thejerf/suture/v4"
  13. "github.com/syncthing/syncthing/lib/svcutil"
  14. )
  // recv is one element of the cast outbox channel: a payload together with
  // the address it is associated with. Recv hands the two fields back to the
  // caller as a pair.
  type recv struct {
  	// data is the payload bytes.
  	data []byte
  	// src is the address returned alongside data by Recv.
  	src net.Addr
  }
  19. type Interface interface {
  20. suture.Service
  21. fmt.Stringer
  22. Send(data []byte)
  23. Recv() ([]byte, net.Addr)
  24. Error() error
  25. }
  26. type cast struct {
  27. *suture.Supervisor
  28. name string
  29. reader svcutil.ServiceWithError
  30. writer svcutil.ServiceWithError
  31. outbox chan recv
  32. inbox chan []byte
  33. stopped chan struct{}
  34. }
  35. // newCast creates a base object for multi- or broadcasting. Afterwards the
  36. // caller needs to set reader and writer with the addReader and addWriter
  37. // methods to get a functional implementation of Interface.
  38. func newCast(name string) *cast {
  39. // Only log restarts in debug mode.
  40. spec := svcutil.SpecWithDebugLogger(l)
  41. // Don't retry too frenetically: an error to open a socket or
  42. // whatever is usually something that is either permanent or takes
  43. // a while to get solved...
  44. spec.FailureThreshold = 2
  45. spec.FailureBackoff = 60 * time.Second
  46. c := &cast{
  47. Supervisor: suture.New(name, spec),
  48. name: name,
  49. inbox: make(chan []byte),
  50. outbox: make(chan recv, 16),
  51. stopped: make(chan struct{}),
  52. }
  53. svcutil.OnSupervisorDone(c.Supervisor, func() { close(c.stopped) })
  54. return c
  55. }
  56. func (c *cast) addReader(svc func(context.Context) error) {
  57. c.reader = c.createService(svc, "reader")
  58. c.Add(c.reader)
  59. }
  60. func (c *cast) addWriter(svc func(ctx context.Context) error) {
  61. c.writer = c.createService(svc, "writer")
  62. c.Add(c.writer)
  63. }
  64. func (c *cast) createService(svc func(context.Context) error, suffix string) svcutil.ServiceWithError {
  65. return svcutil.AsService(svc, fmt.Sprintf("%s/%s", c, suffix))
  66. }
  67. func (c *cast) String() string {
  68. return fmt.Sprintf("%s@%p", c.name, c)
  69. }
  70. func (c *cast) Send(data []byte) {
  71. select {
  72. case c.inbox <- data:
  73. case <-c.stopped:
  74. }
  75. }
  76. func (c *cast) Recv() ([]byte, net.Addr) {
  77. select {
  78. case recv := <-c.outbox:
  79. return recv.data, recv.src
  80. case <-c.stopped:
  81. }
  82. return nil, nil
  83. }
  84. func (c *cast) Error() error {
  85. if err := c.reader.Error(); err != nil {
  86. return err
  87. }
  88. return c.writer.Error()
  89. }