tryGossip.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package main
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "strings"
  7. "time"
  8. "cryptoscope.co/go/luigi"
  9. "cryptoscope.co/go/muxrpc"
  10. "cryptoscope.co/go/muxrpc/codec"
  11. "cryptoscope.co/go/secretstream"
  12. "github.com/cryptix/go/debug"
  13. kitlog "github.com/go-kit/kit/log"
  14. "github.com/pkg/errors"
  15. )
  16. func ssbTryGossip(ctx context.Context, addrWithKey string) error {
  17. start := time.Now()
  18. args := strings.SplitN(addrWithKey, ":", 3)
  19. if len(args) != 3 {
  20. return errors.Errorf("tryGossip: expected host:port:key. got: %v", args)
  21. }
  22. c, err := secretstream.NewClient(*localKey, sbotAppKey)
  23. if err != nil {
  24. return err
  25. }
  26. var rk = args[2]
  27. var remotPubKey = localKey.Public
  28. rk = strings.TrimSuffix(rk, ".ed25519")
  29. rk = strings.TrimPrefix(rk, "@")
  30. rpk, err := base64.StdEncoding.DecodeString(rk)
  31. if err != nil {
  32. return errors.Wrapf(err, "tryGossip: base64 decode of remoteKey failed")
  33. }
  34. copy(remotPubKey[:], rpk)
  35. d, err := c.NewDialer(remotPubKey)
  36. if err != nil {
  37. return err
  38. }
  39. addr := strings.Join(args[:2], ":")
  40. conn, err := d("tcp", addr)
  41. if err != nil {
  42. return errors.Wrapf(err, "tryGossip: dialing %q failed", addr)
  43. }
  44. log.Log("try", "gossip", "addr", addr)
  45. counter := debug.WrapCounter(conn)
  46. p := muxrpc.NewPacker(counter)
  47. if verboseLogging {
  48. p = muxrpc.NewPacker(codec.Wrap(kitlog.With(log, "id", args[2]), counter))
  49. }
  50. gossipHandler := sbotHandler{args[2]}
  51. rpc := muxrpc.Handle(p, gossipHandler)
  52. go serveRpc(ctx, start, args[2], rpc, counter)
  53. type msg struct {
  54. Key string `json:"key"`
  55. Value struct {
  56. Author string `json:"author"`
  57. Signature string `json:"signature"`
  58. Timestamp float64 `json:"timestamp"`
  59. Seq float64 `json:"sequence"`
  60. Content json.RawMessage `json:"content"`
  61. } `json:"value"`
  62. }
  63. type histArg struct {
  64. Id string `json:"id"`
  65. Seq int `json:"seq"`
  66. Keys bool `json:"keys"`
  67. Live bool `json:"live"`
  68. }
  69. src, err := rpc.Source(ctx, msg{}, []string{"createHistoryStream"}, histArg{Id: localID, Seq: 0, Keys: true})
  70. if err != nil {
  71. return errors.Wrapf(err, "tryGossip: createHistoryStream failed")
  72. }
  73. var closed bool
  74. for !closed {
  75. v, err := src.Next(ctx)
  76. if err != nil {
  77. if luigi.IsEOS(err) {
  78. closed = true
  79. continue
  80. } else {
  81. return errors.Wrapf(err, "tryGossip: src.Next failed")
  82. }
  83. }
  84. m := v.(msg)
  85. log.Log("draining", "myHist", "seq", m.Value.Seq, "k", m.Key)
  86. }
  87. return nil
  88. }