waitingproof.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package channeldb
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "github.com/go-errors/errors"
  9. "github.com/lightningnetwork/lnd/kvdb"
  10. "github.com/lightningnetwork/lnd/lnwire"
  11. )
  var (
  	// waitingProofsBucketKey is the name of the top-level bucket in which
  	// the waiting proofs are stored.
  	waitingProofsBucketKey = []byte("waitingproofs")

  	// ErrWaitingProofNotFound is returned when a waiting proof, or the
  	// bucket holding the waiting proofs, can't be found.
  	ErrWaitingProofNotFound = errors.New(
  		"waiting proofs haven't been found",
  	)

  	// ErrWaitingProofAlreadyExist is the error for a waiting proof whose
  	// key is already present in the store.
  	ErrWaitingProofAlreadyExist = errors.New(
  		"waiting proof with such key already exist",
  	)
  )
  24. // WaitingProofStore is the bold db map-like storage for half announcement
  25. // signatures. The one responsibility of this storage is to be able to
  26. // retrieve waiting proofs after client restart.
  27. type WaitingProofStore struct {
  28. // cache is used in order to reduce the number of redundant get
  29. // calls, when object isn't stored in it.
  30. cache map[WaitingProofKey]struct{}
  31. db kvdb.Backend
  32. mu sync.RWMutex
  33. }
  34. // NewWaitingProofStore creates new instance of proofs storage.
  35. func NewWaitingProofStore(db kvdb.Backend) (*WaitingProofStore, error) {
  36. s := &WaitingProofStore{
  37. db: db,
  38. }
  39. if err := s.ForAll(func(proof *WaitingProof) error {
  40. s.cache[proof.Key()] = struct{}{}
  41. return nil
  42. }, func() {
  43. s.cache = make(map[WaitingProofKey]struct{})
  44. }); err != nil && err != ErrWaitingProofNotFound {
  45. return nil, err
  46. }
  47. return s, nil
  48. }
  49. // Add adds new waiting proof in the storage.
  50. func (s *WaitingProofStore) Add(proof *WaitingProof) error {
  51. s.mu.Lock()
  52. defer s.mu.Unlock()
  53. err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
  54. var err error
  55. var b bytes.Buffer
  56. // Get or create the bucket.
  57. bucket, err := tx.CreateTopLevelBucket(waitingProofsBucketKey)
  58. if err != nil {
  59. return err
  60. }
  61. // Encode the objects and place it in the bucket.
  62. if err := proof.Encode(&b); err != nil {
  63. return err
  64. }
  65. key := proof.Key()
  66. return bucket.Put(key[:], b.Bytes())
  67. }, func() {})
  68. if err != nil {
  69. return err
  70. }
  71. // Knowing that the write succeeded, we can now update the in-memory
  72. // cache with the proof's key.
  73. s.cache[proof.Key()] = struct{}{}
  74. return nil
  75. }
  76. // Remove removes the proof from storage by its key.
  77. func (s *WaitingProofStore) Remove(key WaitingProofKey) error {
  78. s.mu.Lock()
  79. defer s.mu.Unlock()
  80. if _, ok := s.cache[key]; !ok {
  81. return ErrWaitingProofNotFound
  82. }
  83. err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
  84. // Get or create the top bucket.
  85. bucket := tx.ReadWriteBucket(waitingProofsBucketKey)
  86. if bucket == nil {
  87. return ErrWaitingProofNotFound
  88. }
  89. return bucket.Delete(key[:])
  90. }, func() {})
  91. if err != nil {
  92. return err
  93. }
  94. // Since the proof was successfully deleted from the store, we can now
  95. // remove it from the in-memory cache.
  96. delete(s.cache, key)
  97. return nil
  98. }
  99. // ForAll iterates thought all waiting proofs and passing the waiting proof
  100. // in the given callback.
  101. func (s *WaitingProofStore) ForAll(cb func(*WaitingProof) error,
  102. reset func()) error {
  103. return kvdb.View(s.db, func(tx kvdb.RTx) error {
  104. bucket := tx.ReadBucket(waitingProofsBucketKey)
  105. if bucket == nil {
  106. return ErrWaitingProofNotFound
  107. }
  108. // Iterate over objects buckets.
  109. return bucket.ForEach(func(k, v []byte) error {
  110. // Skip buckets fields.
  111. if v == nil {
  112. return nil
  113. }
  114. r := bytes.NewReader(v)
  115. proof := &WaitingProof{}
  116. if err := proof.Decode(r); err != nil {
  117. return err
  118. }
  119. return cb(proof)
  120. })
  121. }, reset)
  122. }
  123. // Get returns the object which corresponds to the given index.
  124. func (s *WaitingProofStore) Get(key WaitingProofKey) (*WaitingProof, error) {
  125. var proof *WaitingProof
  126. s.mu.RLock()
  127. defer s.mu.RUnlock()
  128. if _, ok := s.cache[key]; !ok {
  129. return nil, ErrWaitingProofNotFound
  130. }
  131. err := kvdb.View(s.db, func(tx kvdb.RTx) error {
  132. bucket := tx.ReadBucket(waitingProofsBucketKey)
  133. if bucket == nil {
  134. return ErrWaitingProofNotFound
  135. }
  136. // Iterate over objects buckets.
  137. v := bucket.Get(key[:])
  138. if v == nil {
  139. return ErrWaitingProofNotFound
  140. }
  141. r := bytes.NewReader(v)
  142. return proof.Decode(r)
  143. }, func() {
  144. proof = &WaitingProof{}
  145. })
  146. return proof, err
  147. }
  // WaitingProofKey uniquely identifies a waiting proof object. Its purpose
  // is to tell the local and the remote proof of one and the same channel id
  // apart: WaitingProof.Key fills the first eight bytes with the short channel
  // id and uses the ninth byte as the remote flag.
  type WaitingProofKey [9]byte
  152. // WaitingProof is the storable object, which encapsulate the half proof and
  153. // the information about from which side this proof came. This structure is
  154. // needed to make channel proof exchange persistent, so that after client
  155. // restart we may receive remote/local half proof and process it.
  156. type WaitingProof struct {
  157. *lnwire.AnnounceSignatures
  158. isRemote bool
  159. }
  160. // NewWaitingProof constructs a new waiting prof instance.
  161. func NewWaitingProof(isRemote bool, proof *lnwire.AnnounceSignatures) *WaitingProof {
  162. return &WaitingProof{
  163. AnnounceSignatures: proof,
  164. isRemote: isRemote,
  165. }
  166. }
  167. // OppositeKey returns the key which uniquely identifies opposite waiting proof.
  168. func (p *WaitingProof) OppositeKey() WaitingProofKey {
  169. var key [9]byte
  170. binary.BigEndian.PutUint64(key[:8], p.ShortChannelID.ToUint64())
  171. if !p.isRemote {
  172. key[8] = 1
  173. }
  174. return key
  175. }
  176. // Key returns the key which uniquely identifies waiting proof.
  177. func (p *WaitingProof) Key() WaitingProofKey {
  178. var key [9]byte
  179. binary.BigEndian.PutUint64(key[:8], p.ShortChannelID.ToUint64())
  180. if p.isRemote {
  181. key[8] = 1
  182. }
  183. return key
  184. }
  185. // Encode writes the internal representation of waiting proof in byte stream.
  186. func (p *WaitingProof) Encode(w io.Writer) error {
  187. if err := binary.Write(w, byteOrder, p.isRemote); err != nil {
  188. return err
  189. }
  190. // TODO(yy): remove the type assertion when we finished refactoring db
  191. // into using write buffer.
  192. buf, ok := w.(*bytes.Buffer)
  193. if !ok {
  194. return fmt.Errorf("expect io.Writer to be *bytes.Buffer")
  195. }
  196. if err := p.AnnounceSignatures.Encode(buf, 0); err != nil {
  197. return err
  198. }
  199. return nil
  200. }
  201. // Decode reads the data from the byte stream and initializes the
  202. // waiting proof object with it.
  203. func (p *WaitingProof) Decode(r io.Reader) error {
  204. if err := binary.Read(r, byteOrder, &p.isRemote); err != nil {
  205. return err
  206. }
  207. msg := &lnwire.AnnounceSignatures{}
  208. if err := msg.Decode(r, 0); err != nil {
  209. return err
  210. }
  211. (*p).AnnounceSignatures = msg
  212. return nil
  213. }