sbotHandler.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "cryptoscope.co/go/luigi"
  9. "cryptoscope.co/go/muxrpc"
  10. "cryptoscope.co/go/ssb"
  11. "github.com/pkg/errors"
  12. )
  // sbotHandler provides the HandleCall and HandleConnect callbacks that are
  // invoked with muxrpc requests and endpoints.
  type sbotHandler struct {
  	// remoteID is the textual reference of the remote peer. HandleConnect
  	// parses it with ssb.ParseRef and requests that identity's history stream.
  	remoteID string
  }
  // retWhoami is the value HandleCall returns for the "whoami" method.
  type retWhoami struct {
  	// ID is encoded under the JSON key "id".
  	ID string `json:"id"`
  }
  // createHistArgs is the argument object sent with a createHistoryStream
  // request (see HandleConnect).
  //
  // Sample argument map, kept from the original note:
  //
  //	map[keys:false id:@Bqm7bG4qvlnWh3BEBFSj2kDr+ 30+mUU3hRgrikE2+xc=.ed25519 seq:20 live:true
  //
  // The field order is significant: the type is also built with a positional
  // composite literal, so fields must not be reordered.
  type createHistArgs struct {
  	Keys bool   `json:"keys"`
  	Live bool   `json:"live"`
  	Id   string `json:"id"`
  	Seq  int    `json:"seq"`
  }
  26. func (h sbotHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
  27. // TODO: push manifest check into muxrpc
  28. if req.Type == "" {
  29. req.Type = "async"
  30. }
  31. switch m := strings.Join(req.Method, "."); m {
  32. case "whoami":
  33. err := req.Return(ctx, retWhoami{"heinbloed"})
  34. if err != nil {
  35. log.Log("call", "whoami", "err", err)
  36. }
  37. case "gossip.ping":
  38. //todo: read args
  39. go func() {
  40. for i := 0; i < 3; i++ {
  41. err := req.Stream.Pour(ctx, time.Now().Unix())
  42. if err != nil {
  43. log.Log("call", "gossip.ping", "err", err)
  44. req.Stream.CloseWithError(errors.Wrap(err, "failed gossiping"))
  45. return
  46. }
  47. log.Log("call", "gossip.ping", "pong", i)
  48. time.Sleep(1 * time.Second)
  49. }
  50. req.Stream.Close()
  51. }()
  52. for {
  53. v, err := req.Stream.Next(ctx)
  54. if err != nil {
  55. log.Log("call", "gossip.ping", "err", err)
  56. req.Stream.CloseWithError(errors.Wrap(err, "failed gossiping"))
  57. return
  58. }
  59. log.Log("call", "gossip.ping", "ping", v)
  60. }
  61. case "gossip.connect":
  62. if len(req.Args) != 1 {
  63. req.Stream.CloseWithError(errors.Errorf("bad request"))
  64. return
  65. }
  66. addr := req.Args[0].(string)
  67. ret := make(map[string]interface{})
  68. ret["addr"] = addr
  69. err := ssbTryGossip(ctx, addr)
  70. if err != nil {
  71. log.Log("try", "gossip.connect", "err", err)
  72. req.Stream.CloseWithError(errors.Wrap(err, "failed gossiping"))
  73. return
  74. } else {
  75. ret["worked"] = true
  76. }
  77. err = req.Return(ctx, ret)
  78. if err != nil {
  79. log.Log("call", "gossip.connect", "err", err)
  80. }
  81. case "createHistoryStream":
  82. if len(req.Args) != 1 {
  83. req.Stream.CloseWithError(errors.Errorf("bad request"))
  84. return
  85. }
  86. _, ok := req.Args[0].(map[string]interface{})
  87. if !ok {
  88. log.Log("call", "createHistoryStream", "err", "bad call", "tipe", fmt.Sprintf("%T", req.Args[0]))
  89. req.Stream.CloseWithError(errors.Errorf("bad args"))
  90. return
  91. }
  92. //var qargs createHistArgs
  93. //qargs.Keys = qmap["keys"].(bool)
  94. //qargs.Live = qmap["live"].(bool)
  95. //qargs.Seq = int(qmap["seq"].(float64))
  96. //qargs.Id = qmap["id"].(string)
  97. //fmt.Println("createHist", qargs)
  98. req.Stream.Close()
  99. default:
  100. log.Log("warning", "unhandled call", "method", m, "args", fmt.Sprintf("%+v", req.Args))
  101. err := errors.Errorf("unhandled call: %s", m)
  102. // TODO: illegal for async calls to close with Stream
  103. req.Stream.CloseWithError(err)
  104. }
  105. }
  // RawSignedMessage wraps a json.RawMessage so that a message received from a
  // stream keeps its exact JSON bytes. HandleConnect passes those bytes to
  // ssb.EncodePreserveOrder.
  type RawSignedMessage struct {
  	json.RawMessage
  }
  109. func (h sbotHandler) HandleConnect(ctx context.Context, e muxrpc.Endpoint) {
  110. var q = createHistArgs{false, false, h.remoteID, 0}
  111. source, err := e.Source(ctx, RawSignedMessage{}, []string{"createHistoryStream"}, q)
  112. if err != nil {
  113. log.Log("handleConnect", "createHistoryStream", "err", err)
  114. return
  115. }
  116. i := 0
  117. ref, err := ssb.ParseRef(h.remoteID)
  118. if err != nil {
  119. log.Log("handleConnect", "ssb.ParseRef", "err", err)
  120. return
  121. }
  122. for {
  123. start := time.Now()
  124. v, err := source.Next(ctx)
  125. if luigi.IsEOS(err) {
  126. break
  127. }
  128. if err != nil {
  129. log.Log("handleConnect", "createHistoryStream", "i", i, "err", err)
  130. break
  131. }
  132. rmsg := v.(RawSignedMessage)
  133. buf, sig, err := ssb.EncodePreserveOrder(rmsg.RawMessage)
  134. if err != nil {
  135. err = errors.Wrap(err, "simple Encode failed")
  136. log.Log("handleConnect", "createHistoryStream", "i", i, "err", err)
  137. break
  138. }
  139. if err := sig.Verify(buf, *ref); err != nil {
  140. err = errors.Wrap(err, "msg verify failed")
  141. log.Log("handleConnect", "createHistoryStream", "i", i, "err", err)
  142. break
  143. }
  144. fmt.Printf("\n####\nverified hist%d (took %v):\n%s\n", i, time.Since(start), buf)
  145. i++
  146. }
  147. log.Log("handle", "connect", "Hello", h.remoteID)
  148. }
  // retPing is the value returned by sbotHandler.GossipPing.
  type retPing struct {
  	// Pong holds the reply text.
  	Pong string
  }
  152. func (h sbotHandler) GossipPing(timout int) retPing {
  153. return retPing{"test"}
  154. }