main.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package main
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "fmt"
  6. "net"
  7. "os"
  8. "os/user"
  9. "path/filepath"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "cryptoscope.co/go/binpath"
  14. "cryptoscope.co/go/specialκ"
  15. "cryptoscope.co/go/specialκ/persistent"
  16. "github.com/cryptix/go/logging"
  17. "github.com/cryptix/secretstream"
  18. "github.com/cryptix/secretstream/secrethandshake"
  19. "github.com/dgraph-io/badger"
  20. kitlog "github.com/go-kit/kit/log"
  21. "github.com/pkg/errors"
  22. "gopkg.in/urfave/cli.v2"
  23. "scuttlebot.io/go/muxrpc"
  24. "scuttlebot.io/go/muxrpc/codec"
  25. )
  26. var (
  27. sbotAppKey []byte
  28. defaultKeyFile string
  29. client *muxrpc.Client
  30. log logging.Interface
  31. check = logging.CheckFatal
  32. bdb *badger.DB
  33. pstore specialκ.MFR
  34. Revision = "unset"
  35. )
  36. func init() {
  37. var err error
  38. sbotAppKey, err = base64.StdEncoding.DecodeString("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=")
  39. check(err)
  40. u, err := user.Current()
  41. check(err)
  42. defaultKeyFile = filepath.Join(u.HomeDir, ".ssb", "secret")
  43. }
  44. func initClient(ctx *cli.Context) error {
  45. localKey, err := secrethandshake.LoadSSBKeyPair(ctx.String("key"))
  46. if err != nil {
  47. return err
  48. }
  49. var conn net.Conn
  50. c, err := secretstream.NewClient(*localKey, sbotAppKey)
  51. if err != nil {
  52. return err
  53. }
  54. var remotPubKey = localKey.Public
  55. if rk := ctx.String("remoteKey"); rk != "" {
  56. rk = strings.TrimSuffix(rk, ".ed25519")
  57. rk = strings.TrimPrefix(rk, "@")
  58. rpk, err := base64.StdEncoding.DecodeString(rk)
  59. if err != nil {
  60. return errors.Wrapf(err, "ssb-gophbot: base64 decode of --remoteKey failed")
  61. }
  62. copy(remotPubKey[:], rpk)
  63. }
  64. d, err := c.NewDialer(remotPubKey)
  65. if err != nil {
  66. return err
  67. }
  68. conn, err = d("tcp", ctx.String("addr"))
  69. if err != nil {
  70. return err
  71. }
  72. if ctx.Bool("verbose") {
  73. client = muxrpc.NewClient(log, codec.Wrap(log, conn))
  74. } else {
  75. client = muxrpc.NewClient(log, conn)
  76. }
  77. go func() {
  78. client.Handle()
  79. log.Log("warning", "muxrpc disconnected")
  80. }()
  81. opts := badger.DefaultOptions
  82. opts.Dir = ctx.String("db")
  83. opts.ValueDir = ctx.String("db")
  84. bdb, err = badger.Open(opts)
  85. check(err)
  86. pstore = persistent.New(persistent.JSONCodec, bdb, log)
  87. return nil
  88. }
  89. func main() {
  90. logging.SetupLogging(nil)
  91. log = logging.Logger("gophermit")
  92. app := cli.App{
  93. Name: "ssb-gophermit",
  94. Usage: "a panoptical hermit in go",
  95. Version: "alpha1",
  96. }
  97. cli.VersionPrinter = func(c *cli.Context) {
  98. // go install -ldflags="-X main.Revision=$(git rev-parse HEAD)"
  99. fmt.Printf("%s ( rev: %s )\n", c.App.Version, Revision)
  100. }
  101. app.Flags = []cli.Flag{
  102. &cli.StringFlag{Name: "db", Value: "./db"},
  103. &cli.StringFlag{Name: "addr", Value: "localhost:8008", Usage: "tcp address of the sbot to connect to (or listen on)"},
  104. &cli.StringFlag{Name: "remoteKey", Value: "", Usage: "the remote pubkey you are connecting to (by default the local key)"},
  105. &cli.StringFlag{Name: "key,k", Value: defaultKeyFile},
  106. &cli.BoolFlag{Name: "verbose,vv", Usage: "print muxrpc packets"},
  107. }
  108. app.Before = initClient
  109. app.Commands = []*cli.Command{
  110. {
  111. Name: "ls",
  112. Action: lsCmd,
  113. },
  114. {
  115. Name: "slurp",
  116. Action: slurpCmd,
  117. Flags: []cli.Flag{
  118. &cli.StringFlag{Name: "id", Value: "@p13zSAiOpguI9nsawkGijsnMfWmFd5rlUNpzekEE+vI=.ed25519"},
  119. &cli.IntFlag{Name: "limit", Value: -1},
  120. &cli.IntFlag{Name: "seq", Value: 0},
  121. },
  122. },
  123. {
  124. Name: "purge",
  125. Action: purgeCmd,
  126. },
  127. }
  128. check(app.Run(os.Args))
  129. }
  130. func purgeCmd(ctx *cli.Context) error {
  131. check(bdb.PurgeOlderVersions())
  132. r, err := strconv.ParseFloat(ctx.Args().Get(0), 10)
  133. check(err)
  134. check(bdb.RunValueLogGC(r))
  135. return bdb.Close()
  136. }
  137. func lsCmd(ctx *cli.Context) error {
  138. pref, err := binpath.ParseString(ctx.Args().First())
  139. if err != nil {
  140. return err
  141. }
  142. opt := badger.DefaultIteratorOptions
  143. opt.PrefetchSize = 50
  144. return bdb.View(func(txn *badger.Txn) error {
  145. it := txn.NewIterator(opt)
  146. for it.Seek(pref); it.ValidForPrefix(pref); it.Next() {
  147. i := it.Item()
  148. v, err := i.Value()
  149. if err != nil {
  150. return err
  151. }
  152. fmt.Printf("%s: %d %x\n", binpath.Path(i.Key()), len(v), i.UserMeta())
  153. }
  154. return nil
  155. })
  156. }
  157. type ssbMsg map[string]interface{}
  158. func slurpCmd(c *cli.Context) error {
  159. start := time.Now()
  160. emitter, src := pstore.Pair(ssbMsg{})
  161. ctx := context.TODO()
  162. specialκ.Then(ctx, src, map[string]specialκ.Sink{
  163. "author": pstore.Map(ssbMsg{}, func(_ context.Context, e specialκ.Entry) specialκ.Entry {
  164. msg, ok := e.Value.(ssbMsg)
  165. if ok {
  166. author := c2m(msg, "value")["author"].(string)
  167. seq := c2m(msg, "value")["sequence"].(float64)
  168. e.Prefix = binpath.JoinStrings("author", author)
  169. e.Key = binpath.FromUint64(uint64(seq))
  170. e.Value = e.Seq
  171. }
  172. return e
  173. }),
  174. "type": pstore.Map(ssbMsg{}, func(_ context.Context, e specialκ.Entry) specialκ.Entry {
  175. msg, ok := e.Value.(ssbMsg)
  176. if ok {
  177. content := c2m(msg, "value", "content")
  178. var t string
  179. if content == nil {
  180. t = "string"
  181. } else {
  182. t = content["type"].(string)
  183. }
  184. e.Prefix = binpath.JoinStrings("type", t)
  185. e.Key = binpath.FromString(msg["key"].(string))
  186. // TODO: reduce to ssb-host struct
  187. }
  188. return e
  189. }),
  190. "pub": pstore.Filter(ssbMsg{}, func(_ context.Context, e specialκ.Entry) bool {
  191. msg, ok := e.Value.(ssbMsg)
  192. if ok {
  193. content := c2m(msg, "value", "content")
  194. if content == nil {
  195. return false
  196. }
  197. if t := content["type"].(string); t == "pub" {
  198. return true
  199. }
  200. }
  201. return false
  202. }),
  203. }, kitlog.NewNopLogger())
  204. var i uint64
  205. msgs := make(chan ssbMsg)
  206. wait := make(chan bool)
  207. go func() {
  208. last := time.Now()
  209. for r := range msgs {
  210. emitter.Emit(ctx, specialκ.Entry{
  211. Seq: i,
  212. Value: r,
  213. Key: binpath.FromString(r["key"].(string)),
  214. })
  215. i++
  216. if i%1000 == 0 {
  217. log.Log("msg", "processed", "i", i, "took", fmt.Sprintf("%v", time.Since(last)))
  218. last = time.Now()
  219. }
  220. }
  221. wait <- true
  222. }()
  223. opts := map[string]interface{}{
  224. "id": c.String("id"),
  225. "limit": c.Int("limit"),
  226. "seq": c.Int("seq"),
  227. }
  228. if err := client.Source("createHistoryStream", msgs, opts); err != nil {
  229. log.Log("warning", errors.Wrap(err, "source stream call failed"))
  230. }
  231. close(msgs)
  232. log.Log("done", "slurp", "msgs", i-1, "id", c.String("id"), "took", fmt.Sprintf("%v", time.Since(start)))
  233. <-wait
  234. check(bdb.Close())
  235. return client.Close()
  236. }
  237. func c2m(v map[string]interface{}, fields ...string) map[string]interface{} {
  238. var ok bool
  239. for _, f := range fields {
  240. v, ok = v[f].(map[string]interface{})
  241. if !ok {
  242. return nil
  243. }
  244. }
  245. return v
  246. }