filesystem.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright 2016 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package api
  17. import (
  18. "bufio"
  19. "fmt"
  20. "io"
  21. "net/http"
  22. "os"
  23. "path"
  24. "path/filepath"
  25. "sync"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/log"
  28. "github.com/ethereum/go-ethereum/swarm/storage"
  29. )
  // maxParallelFiles bounds how many files Upload and Download process
  // concurrently; it is used as the capacity of their throttling channels.
  const maxParallelFiles = 5
  31. type FileSystem struct {
  32. api *Api
  33. }
  34. func NewFileSystem(api *Api) *FileSystem {
  35. return &FileSystem{api}
  36. }
  37. // Upload replicates a local directory as a manifest file and uploads it
  38. // using dpa store
  39. // TODO: localpath should point to a manifest
  40. //
  41. // DEPRECATED: Use the HTTP API instead
  42. func (self *FileSystem) Upload(lpath, index string) (string, error) {
  43. var list []*manifestTrieEntry
  44. localpath, err := filepath.Abs(filepath.Clean(lpath))
  45. if err != nil {
  46. return "", err
  47. }
  48. f, err := os.Open(localpath)
  49. if err != nil {
  50. return "", err
  51. }
  52. stat, err := f.Stat()
  53. if err != nil {
  54. return "", err
  55. }
  56. var start int
  57. if stat.IsDir() {
  58. start = len(localpath)
  59. log.Debug(fmt.Sprintf("uploading '%s'", localpath))
  60. err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
  61. if (err == nil) && !info.IsDir() {
  62. if len(path) <= start {
  63. return fmt.Errorf("Path is too short")
  64. }
  65. if path[:start] != localpath {
  66. return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
  67. }
  68. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
  69. list = append(list, entry)
  70. }
  71. return err
  72. })
  73. if err != nil {
  74. return "", err
  75. }
  76. } else {
  77. dir := filepath.Dir(localpath)
  78. start = len(dir)
  79. if len(localpath) <= start {
  80. return "", fmt.Errorf("Path is too short")
  81. }
  82. if localpath[:start] != dir {
  83. return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
  84. }
  85. entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
  86. list = append(list, entry)
  87. }
  88. cnt := len(list)
  89. errors := make([]error, cnt)
  90. done := make(chan bool, maxParallelFiles)
  91. dcnt := 0
  92. awg := &sync.WaitGroup{}
  93. for i, entry := range list {
  94. if i >= dcnt+maxParallelFiles {
  95. <-done
  96. dcnt++
  97. }
  98. awg.Add(1)
  99. go func(i int, entry *manifestTrieEntry, done chan bool) {
  100. f, err := os.Open(entry.Path)
  101. if err == nil {
  102. stat, _ := f.Stat()
  103. var hash storage.Key
  104. wg := &sync.WaitGroup{}
  105. hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil)
  106. if hash != nil {
  107. list[i].Hash = hash.String()
  108. }
  109. wg.Wait()
  110. awg.Done()
  111. if err == nil {
  112. first512 := make([]byte, 512)
  113. fread, _ := f.ReadAt(first512, 0)
  114. if fread > 0 {
  115. mimeType := http.DetectContentType(first512[:fread])
  116. if filepath.Ext(entry.Path) == ".css" {
  117. mimeType = "text/css"
  118. }
  119. list[i].ContentType = mimeType
  120. }
  121. }
  122. f.Close()
  123. }
  124. errors[i] = err
  125. done <- true
  126. }(i, entry, done)
  127. }
  128. for dcnt < cnt {
  129. <-done
  130. dcnt++
  131. }
  132. trie := &manifestTrie{
  133. dpa: self.api.dpa,
  134. }
  135. quitC := make(chan bool)
  136. for i, entry := range list {
  137. if errors[i] != nil {
  138. return "", errors[i]
  139. }
  140. entry.Path = RegularSlashes(entry.Path[start:])
  141. if entry.Path == index {
  142. ientry := newManifestTrieEntry(&ManifestEntry{
  143. ContentType: entry.ContentType,
  144. }, nil)
  145. ientry.Hash = entry.Hash
  146. trie.addEntry(ientry, quitC)
  147. }
  148. trie.addEntry(entry, quitC)
  149. }
  150. err2 := trie.recalcAndStore()
  151. var hs string
  152. if err2 == nil {
  153. hs = trie.hash.String()
  154. }
  155. awg.Wait()
  156. return hs, err2
  157. }
  158. // Download replicates the manifest basePath structure on the local filesystem
  159. // under localpath
  160. //
  161. // DEPRECATED: Use the HTTP API instead
  162. func (self *FileSystem) Download(bzzpath, localpath string) error {
  163. lpath, err := filepath.Abs(filepath.Clean(localpath))
  164. if err != nil {
  165. return err
  166. }
  167. err = os.MkdirAll(lpath, os.ModePerm)
  168. if err != nil {
  169. return err
  170. }
  171. //resolving host and port
  172. uri, err := Parse(path.Join("bzz:/", bzzpath))
  173. if err != nil {
  174. return err
  175. }
  176. key, err := self.api.Resolve(uri)
  177. if err != nil {
  178. return err
  179. }
  180. path := uri.Path
  181. if len(path) > 0 {
  182. path += "/"
  183. }
  184. quitC := make(chan bool)
  185. trie, err := loadManifest(self.api.dpa, key, quitC)
  186. if err != nil {
  187. log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
  188. return err
  189. }
  190. type downloadListEntry struct {
  191. key storage.Key
  192. path string
  193. }
  194. var list []*downloadListEntry
  195. var mde error
  196. prevPath := lpath
  197. err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
  198. log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
  199. key = common.Hex2Bytes(entry.Hash)
  200. path := lpath + "/" + suffix
  201. dir := filepath.Dir(path)
  202. if dir != prevPath {
  203. mde = os.MkdirAll(dir, os.ModePerm)
  204. prevPath = dir
  205. }
  206. if (mde == nil) && (path != dir+"/") {
  207. list = append(list, &downloadListEntry{key: key, path: path})
  208. }
  209. })
  210. if err != nil {
  211. return err
  212. }
  213. wg := sync.WaitGroup{}
  214. errC := make(chan error)
  215. done := make(chan bool, maxParallelFiles)
  216. for i, entry := range list {
  217. select {
  218. case done <- true:
  219. wg.Add(1)
  220. case <-quitC:
  221. return fmt.Errorf("aborted")
  222. }
  223. go func(i int, entry *downloadListEntry) {
  224. defer wg.Done()
  225. err := retrieveToFile(quitC, self.api.dpa, entry.key, entry.path)
  226. if err != nil {
  227. select {
  228. case errC <- err:
  229. case <-quitC:
  230. }
  231. return
  232. }
  233. <-done
  234. }(i, entry)
  235. }
  236. go func() {
  237. wg.Wait()
  238. close(errC)
  239. }()
  240. select {
  241. case err = <-errC:
  242. return err
  243. case <-quitC:
  244. return fmt.Errorf("aborted")
  245. }
  246. }
  247. func retrieveToFile(quitC chan bool, dpa *storage.DPA, key storage.Key, path string) error {
  248. f, err := os.Create(path) // TODO: basePath separators
  249. if err != nil {
  250. return err
  251. }
  252. reader := dpa.Retrieve(key)
  253. writer := bufio.NewWriter(f)
  254. size, err := reader.Size(quitC)
  255. if err != nil {
  256. return err
  257. }
  258. if _, err = io.CopyN(writer, reader, size); err != nil {
  259. return err
  260. }
  261. if err := writer.Flush(); err != nil {
  262. return err
  263. }
  264. return f.Close()
  265. }