dpa.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package storage
  17. import (
  18. "errors"
  19. "fmt"
  20. "io"
  21. "sync"
  22. "time"
  23. "github.com/ethereum/go-ethereum/log"
  24. )
/*
DPA provides the client API entrypoints Store and Retrieve to store and retrieve data.
It can store anything that has a byte slice representation, so files or serialised objects etc.

Storage: DPA calls the Chunker to segment the input datastream of any size into a merkle-hashed tree of chunks. The key of the root block is returned to the client.

Retrieval: given the key of the root block, the DPA retrieves the block chunks and reconstructs the original data and passes it back as a lazy reader. A lazy reader is a reader with on-demand delayed processing, i.e. the chunks needed to reconstruct a large file are only fetched and processed if that particular part of the document is actually read.

As the chunker produces chunks, DPA dispatches them to its own chunk store
implementation for storage or retrieval.
*/
const (
	// storeChanCapacity is the buffer size of the chunk store channel.
	storeChanCapacity = 100
	// retrieveChanCapacity is the buffer size of the chunk retrieve channel.
	retrieveChanCapacity = 100
	// singletonSwarmDbCapacity is the chunk capacity of the local db store
	// created by NewLocalDPA.
	singletonSwarmDbCapacity = 50000
	// singletonSwarmCacheCapacity is the chunk capacity of the in-memory
	// cache created by NewLocalDPA.
	singletonSwarmCacheCapacity = 500
	// maxStoreProcesses is the number of concurrent store workers.
	maxStoreProcesses = 8
	// maxRetrieveProcesses is the number of concurrent retrieve workers.
	maxRetrieveProcesses = 8
)
var (
	// notFound is the sentinel error returned when a chunk cannot be
	// retrieved (compared with == in retrieveWorker).
	notFound = errors.New("not found")
)
// DPA is the local API for storing and retrieving documents as chunked,
// merkle-hashed trees. It dispatches chunk store/retrieve requests produced
// by its Chunker to the embedded ChunkStore via worker goroutines.
type DPA struct {
	ChunkStore             // backing store (NetStore or LocalStore); provides Get/Put
	storeC     chan *Chunk // buffered channel feeding the store workers
	retrieveC  chan *Chunk // buffered channel feeding the retrieve workers
	Chunker    Chunker     // splits/joins document data into/from chunk trees

	lock    sync.Mutex // guards running/quitC and channel (re)creation
	running bool       // true between Start and Stop
	quitC   chan bool  // closed on Stop to terminate workers
}
  53. // for testing locally
  54. func NewLocalDPA(datadir string) (*DPA, error) {
  55. hash := MakeHashFunc("SHA256")
  56. dbStore, err := NewDbStore(datadir, hash, singletonSwarmDbCapacity, 0)
  57. if err != nil {
  58. return nil, err
  59. }
  60. return NewDPA(&LocalStore{
  61. NewMemStore(dbStore, singletonSwarmCacheCapacity),
  62. dbStore,
  63. }, NewChunkerParams()), nil
  64. }
  65. func NewDPA(store ChunkStore, params *ChunkerParams) *DPA {
  66. chunker := NewTreeChunker(params)
  67. return &DPA{
  68. Chunker: chunker,
  69. ChunkStore: store,
  70. }
  71. }
  72. // Public API. Main entry point for document retrieval directly. Used by the
  73. // FS-aware API and httpaccess
  74. // Chunk retrieval blocks on netStore requests with a timeout so reader will
  75. // report error if retrieval of chunks within requested range time out.
  76. func (self *DPA) Retrieve(key Key) LazySectionReader {
  77. return self.Chunker.Join(key, self.retrieveC)
  78. }
  79. // Public API. Main entry point for document storage directly. Used by the
  80. // FS-aware API and httpaccess
  81. func (self *DPA) Store(data io.Reader, size int64, swg *sync.WaitGroup, wwg *sync.WaitGroup) (key Key, err error) {
  82. return self.Chunker.Split(data, size, self.storeC, swg, wwg)
  83. }
  84. func (self *DPA) Start() {
  85. self.lock.Lock()
  86. defer self.lock.Unlock()
  87. if self.running {
  88. return
  89. }
  90. self.running = true
  91. self.retrieveC = make(chan *Chunk, retrieveChanCapacity)
  92. self.storeC = make(chan *Chunk, storeChanCapacity)
  93. self.quitC = make(chan bool)
  94. self.storeLoop()
  95. self.retrieveLoop()
  96. }
  97. func (self *DPA) Stop() {
  98. self.lock.Lock()
  99. defer self.lock.Unlock()
  100. if !self.running {
  101. return
  102. }
  103. self.running = false
  104. close(self.quitC)
  105. }
  106. // retrieveLoop dispatches the parallel chunk retrieval requests received on the
  107. // retrieve channel to its ChunkStore (NetStore or LocalStore)
  108. func (self *DPA) retrieveLoop() {
  109. for i := 0; i < maxRetrieveProcesses; i++ {
  110. go self.retrieveWorker()
  111. }
  112. log.Trace(fmt.Sprintf("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses))
  113. }
  114. func (self *DPA) retrieveWorker() {
  115. for chunk := range self.retrieveC {
  116. log.Trace(fmt.Sprintf("dpa: retrieve loop : chunk %v", chunk.Key.Log()))
  117. storedChunk, err := self.Get(chunk.Key)
  118. if err == notFound {
  119. log.Trace(fmt.Sprintf("chunk %v not found", chunk.Key.Log()))
  120. } else if err != nil {
  121. log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err))
  122. } else {
  123. chunk.SData = storedChunk.SData
  124. chunk.Size = storedChunk.Size
  125. }
  126. close(chunk.C)
  127. select {
  128. case <-self.quitC:
  129. return
  130. default:
  131. }
  132. }
  133. }
  134. // storeLoop dispatches the parallel chunk store request processors
  135. // received on the store channel to its ChunkStore (NetStore or LocalStore)
  136. func (self *DPA) storeLoop() {
  137. for i := 0; i < maxStoreProcesses; i++ {
  138. go self.storeWorker()
  139. }
  140. log.Trace(fmt.Sprintf("dpa: store spawning %v workers", maxStoreProcesses))
  141. }
  142. func (self *DPA) storeWorker() {
  143. for chunk := range self.storeC {
  144. self.Put(chunk)
  145. if chunk.wg != nil {
  146. log.Trace(fmt.Sprintf("dpa: store processor %v", chunk.Key.Log()))
  147. chunk.wg.Done()
  148. }
  149. select {
  150. case <-self.quitC:
  151. return
  152. default:
  153. }
  154. }
  155. }
// dpaChunkStore implements the ChunkStore interface.
// This chunk access layer assumes 2 chunk stores:
// local storage eg. LocalStore and network storage eg. NetStore.
// Access by calling network is blocking with a timeout.
type dpaChunkStore struct {
	n          int        // running count of Put calls (used in trace logging)
	localStore ChunkStore // consulted first on Put to detect known/requested chunks
	netStore   ChunkStore // authoritative store; all Gets and final Puts go here
}
  165. func NewDpaChunkStore(localStore, netStore ChunkStore) *dpaChunkStore {
  166. return &dpaChunkStore{0, localStore, netStore}
  167. }
  168. // Get is the entrypoint for local retrieve requests
  169. // waits for response or times out
  170. func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
  171. chunk, err = self.netStore.Get(key)
  172. // timeout := time.Now().Add(searchTimeout)
  173. if chunk.SData != nil {
  174. log.Trace(fmt.Sprintf("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData)))
  175. return
  176. }
  177. // TODO: use self.timer time.Timer and reset with defer disableTimer
  178. timer := time.After(searchTimeout)
  179. select {
  180. case <-timer:
  181. log.Trace(fmt.Sprintf("DPA.Get: %v request time out ", key.Log()))
  182. err = notFound
  183. case <-chunk.Req.C:
  184. log.Trace(fmt.Sprintf("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk))
  185. }
  186. return
  187. }
  188. // Put is the entrypoint for local store requests coming from storeLoop
  189. func (self *dpaChunkStore) Put(entry *Chunk) {
  190. chunk, err := self.localStore.Get(entry.Key)
  191. if err != nil {
  192. log.Trace(fmt.Sprintf("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log()))
  193. chunk = entry
  194. } else if chunk.SData == nil {
  195. log.Trace(fmt.Sprintf("DPA.Put: %v request entry found", entry.Key.Log()))
  196. chunk.SData = entry.SData
  197. chunk.Size = entry.Size
  198. } else {
  199. log.Trace(fmt.Sprintf("DPA.Put: %v chunk already known", entry.Key.Log()))
  200. return
  201. }
  202. // from this point on the storage logic is the same with network storage requests
  203. log.Trace(fmt.Sprintf("DPA.Put %v: %v", self.n, chunk.Key.Log()))
  204. self.n++
  205. self.netStore.Put(chunk)
  206. }
// Close chunk store. Intentionally a no-op: the wrapped local and network
// stores are closed by their own owners.
func (self *dpaChunkStore) Close() {}