123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534 |
- package storage
- import (
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "sync"
- "time"
- "github.com/ethereum/go-ethereum/metrics"
- )
// errAppendOppNotSuported is the error returned by TreeChunker.Append, which
// does not implement appending.
var errAppendOppNotSuported = errors.New("Append operation not supported")

// errOperationTimedOut is returned by TreeChunker.Split when splitTimeout
// elapses before the split has finished.
var errOperationTimedOut = errors.New("operation timed out")
- var (
- newChunkCounter = metrics.NewRegisteredCounter("storage.chunks.new", nil)
- )
- type TreeChunker struct {
- branches int64
- hashFunc SwarmHasher
-
- hashSize int64
- chunkSize int64
- workerCount int64
- workerLock sync.RWMutex
- }
- func NewTreeChunker(params *ChunkerParams) (self *TreeChunker) {
- self = &TreeChunker{}
- self.hashFunc = MakeHashFunc(params.Hash)
- self.branches = params.Branches
- self.hashSize = int64(self.hashFunc().Size())
- self.chunkSize = self.hashSize * self.branches
- self.workerCount = 0
- return
- }
- func (self *Chunk) String() string {
- return fmt.Sprintf("Key: %v TreeSize: %v Chunksize: %v", self.Key.Log(), self.Size, len(self.SData))
- }
// hashJob is one unit of work for a hashWorker: an assembled chunk whose hash
// still has to be computed.
//
// NOTE: split constructs hashJob values with positional composite literals, so
// the order of the fields below must not be changed.
type hashJob struct {
	// key is the destination slice; hashChunk copies the computed hash into it.
	key Key
	// chunk is the chunk content: an 8-byte little-endian size prefix followed
	// by the payload (raw data for leaves, child hashes for intermediate chunks).
	chunk []byte
	// size is the data size covered by the chunk; it is also written to the
	// prefix of chunk and ends up in Chunk.Size.
	size int64
	// parentWg is signalled with Done once the chunk has been hashed.
	parentWg *sync.WaitGroup
}
- func (self *TreeChunker) incrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount += 1
- }
- func (self *TreeChunker) getWorkerCount() int64 {
- self.workerLock.RLock()
- defer self.workerLock.RUnlock()
- return self.workerCount
- }
- func (self *TreeChunker) decrementWorkerCount() {
- self.workerLock.Lock()
- defer self.workerLock.Unlock()
- self.workerCount -= 1
- }
- func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
- if self.chunkSize <= 0 {
- panic("chunker must be initialised")
- }
- jobC := make(chan *hashJob, 2*ChunkProcessors)
- wg := &sync.WaitGroup{}
- errC := make(chan error)
- quitC := make(chan bool)
-
- if wwg != nil {
- wwg.Add(1)
- }
- self.incrementWorkerCount()
- go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
- depth := 0
- treeSize := self.chunkSize
-
-
- for ; treeSize < size; treeSize *= self.branches {
- depth++
- }
- key := make([]byte, self.hashFunc().Size())
-
- wg.Add(1)
-
- go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg)
-
- go func() {
-
- wg.Wait()
-
- if swg != nil {
- swg.Wait()
- }
- close(errC)
- }()
- defer close(quitC)
- select {
- case err := <-errC:
- if err != nil {
- return nil, err
- }
- case <-time.NewTimer(splitTimeout).C:
- return nil, errOperationTimedOut
- }
- return key, nil
- }
- func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
-
- for depth > 0 && size < treeSize {
- treeSize /= self.branches
- depth--
- }
- if depth == 0 {
-
- chunkData := make([]byte, size+8)
- binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size))
- var readBytes int64
- for readBytes < size {
- n, err := data.Read(chunkData[8+readBytes:])
- readBytes += int64(n)
- if err != nil && !(err == io.EOF && readBytes == size) {
- errC <- err
- return
- }
- }
- select {
- case jobC <- &hashJob{key, chunkData, size, parentWg}:
- case <-quitC:
- }
- return
- }
-
-
- branchCnt := (size + treeSize - 1) / treeSize
- var chunk = make([]byte, branchCnt*self.hashSize+8)
- var pos, i int64
- binary.LittleEndian.PutUint64(chunk[0:8], uint64(size))
- childrenWg := &sync.WaitGroup{}
- var secSize int64
- for i < branchCnt {
-
- if size-pos < treeSize {
- secSize = size - pos
- } else {
- secSize = treeSize
- }
-
- subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize]
- childrenWg.Add(1)
- self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg)
- i++
- pos += treeSize
- }
-
-
-
- childrenWg.Wait()
- worker := self.getWorkerCount()
- if int64(len(jobC)) > worker && worker < ChunkProcessors {
- if wwg != nil {
- wwg.Add(1)
- }
- self.incrementWorkerCount()
- go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
- }
- select {
- case jobC <- &hashJob{key, chunk, size, parentWg}:
- case <-quitC:
- }
- }
- func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
- defer self.decrementWorkerCount()
- hasher := self.hashFunc()
- if wwg != nil {
- defer wwg.Done()
- }
- for {
- select {
- case job, ok := <-jobC:
- if !ok {
- return
- }
-
- self.hashChunk(hasher, job, chunkC, swg)
- case <-quitC:
- return
- }
- }
- }
- func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan *Chunk, swg *sync.WaitGroup) {
- hasher.ResetWithLength(job.chunk[:8])
- hasher.Write(job.chunk[8:])
- h := hasher.Sum(nil)
- newChunk := &Chunk{
- Key: h,
- SData: job.chunk,
- Size: job.size,
- wg: swg,
- }
-
- copy(job.key, h)
-
- if chunkC != nil {
- if swg != nil {
- swg.Add(1)
- }
- }
- job.parentWg.Done()
- if chunkC != nil {
-
-
-
-
-
-
- newChunkCounter.Inc(1)
- chunkC <- newChunk
- }
- }
- func (self *TreeChunker) Append(key Key, data io.Reader, chunkC chan *Chunk, swg, wwg *sync.WaitGroup) (Key, error) {
- return nil, errAppendOppNotSuported
- }
- type LazyChunkReader struct {
- key Key
- chunkC chan *Chunk
- chunk *Chunk
- off int64
- chunkSize int64
- branches int64
- hashSize int64
- }
- func (self *TreeChunker) Join(key Key, chunkC chan *Chunk) LazySectionReader {
- return &LazyChunkReader{
- key: key,
- chunkC: chunkC,
- chunkSize: self.chunkSize,
- branches: self.branches,
- hashSize: self.hashSize,
- }
- }
- func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
- if self.chunk != nil {
- return self.chunk.Size, nil
- }
- chunk := retrieve(self.key, self.chunkC, quitC)
- if chunk == nil {
- select {
- case <-quitC:
- return 0, errors.New("aborted")
- default:
- return 0, fmt.Errorf("root chunk not found for %v", self.key.Hex())
- }
- }
- self.chunk = chunk
- return chunk.Size, nil
- }
- func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
-
- if len(b) == 0 {
- return 0, nil
- }
- quitC := make(chan bool)
- size, err := self.Size(quitC)
- if err != nil {
- return 0, err
- }
- errC := make(chan error)
-
- var treeSize int64
- var depth int
-
- treeSize = self.chunkSize
- for ; treeSize < size; treeSize *= self.branches {
- depth++
- }
- wg := sync.WaitGroup{}
- wg.Add(1)
- go self.join(b, off, off+int64(len(b)), depth, treeSize/self.branches, self.chunk, &wg, errC, quitC)
- go func() {
- wg.Wait()
- close(errC)
- }()
- err = <-errC
- if err != nil {
- close(quitC)
- return 0, err
- }
- if off+int64(len(b)) >= size {
- return len(b), io.EOF
- }
- return len(b), nil
- }
- func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
- defer parentWg.Done()
-
-
-
- for chunk.Size < treeSize && depth > 0 {
- treeSize /= self.branches
- depth--
- }
-
- if depth == 0 {
- extra := 8 + eoff - int64(len(chunk.SData))
- if extra > 0 {
- eoff -= extra
- }
- copy(b, chunk.SData[8+off:8+eoff])
- return
- }
-
- start := off / treeSize
- end := (eoff + treeSize - 1) / treeSize
- wg := &sync.WaitGroup{}
- defer wg.Wait()
- for i := start; i < end; i++ {
- soff := i * treeSize
- roff := soff
- seoff := soff + treeSize
- if soff < off {
- soff = off
- }
- if seoff > eoff {
- seoff = eoff
- }
- if depth > 1 {
- wg.Wait()
- }
- wg.Add(1)
- go func(j int64) {
- childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize]
- chunk := retrieve(childKey, self.chunkC, quitC)
- if chunk == nil {
- select {
- case errC <- fmt.Errorf("chunk %v-%v not found", off, off+treeSize):
- case <-quitC:
- }
- return
- }
- if soff < off {
- soff = off
- }
- self.join(b[soff-off:seoff-off], soff-roff, seoff-roff, depth-1, treeSize/self.branches, chunk, wg, errC, quitC)
- }(i)
- }
- }
- func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
- chunk := &Chunk{
- Key: key,
- C: make(chan bool),
- }
-
- select {
- case chunkC <- chunk:
- case <-quitC:
- return nil
- }
-
- select {
- case <-quitC:
-
- return nil
- case <-chunk.C:
- }
- if len(chunk.SData) == 0 {
- return nil
- }
- return chunk
- }
- func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
- read, err = self.ReadAt(b, self.off)
- self.off += int64(read)
- return
- }
// Errors returned by LazyChunkReader.Seek.
var (
	// errWhence reports a whence value other than 0, 1 or 2.
	errWhence = errors.New("Seek: invalid whence")
	// errOffset reports a resulting position that would be negative.
	errOffset = errors.New("Seek: invalid offset")
)
- func (s *LazyChunkReader) Seek(offset int64, whence int) (int64, error) {
- switch whence {
- default:
- return 0, errWhence
- case 0:
- offset += 0
- case 1:
- offset += s.off
- case 2:
- if s.chunk == nil {
- _, err := s.Size(nil)
- if err != nil {
- return 0, fmt.Errorf("can't get size: %v", err)
- }
- }
- offset += s.chunk.Size
- }
- if offset < 0 {
- return 0, errOffset
- }
- s.off = offset
- return offset, nil
- }
|