box.go 50 KB


  1. // Package box provides an interface to the Box
  2. // object storage system.
  3. package box
  4. // FIXME Box only supports file names of 255 characters or less. Names
  5. // that will not be supported are those that contain non-printable
  6. // ascii, / or \, names with trailing spaces, and the special names
  7. // “.” and “..”.
  8. // FIXME box can copy a directory
  9. import (
  10. "context"
  11. "crypto/rsa"
  12. "encoding/json"
  13. "encoding/pem"
  14. "errors"
  15. "fmt"
  16. "io"
  17. "net/http"
  18. "net/url"
  19. "os"
  20. "path"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/golang-jwt/jwt/v4"
  27. "github.com/rclone/rclone/backend/box/api"
  28. "github.com/rclone/rclone/fs"
  29. "github.com/rclone/rclone/fs/config"
  30. "github.com/rclone/rclone/fs/config/configmap"
  31. "github.com/rclone/rclone/fs/config/configstruct"
  32. "github.com/rclone/rclone/fs/config/obscure"
  33. "github.com/rclone/rclone/fs/fserrors"
  34. "github.com/rclone/rclone/fs/fshttp"
  35. "github.com/rclone/rclone/fs/hash"
  36. "github.com/rclone/rclone/lib/dircache"
  37. "github.com/rclone/rclone/lib/encoder"
  38. "github.com/rclone/rclone/lib/env"
  39. "github.com/rclone/rclone/lib/jwtutil"
  40. "github.com/rclone/rclone/lib/oauthutil"
  41. "github.com/rclone/rclone/lib/pacer"
  42. "github.com/rclone/rclone/lib/rest"
  43. "github.com/youmark/pkcs8"
  44. "golang.org/x/oauth2"
  45. )
// Backend-wide constants.
const (
	// rcloneClientID is the OAuth2 client ID placed in oauthConfig.
	rcloneClientID = "d0374ba6pgmaguie02ge15sv1mllndho"
	// rcloneEncryptedClientSecret is the obscured client secret; it is
	// revealed with obscure.MustReveal when oauthConfig is built.
	rcloneEncryptedClientSecret = "sYbJYm99WB8jzeaLPU0OPDMJKIkZvD2qOn3SyEMfiJr03RdtDt3xcZEIudRhbIDL"
	// minSleep, maxSleep and decayConstant are the pacer settings passed to
	// pacer.NewDefault in NewFs.
	minSleep = 10 * time.Millisecond
	maxSleep = 2 * time.Second
	decayConstant = 2 // bigger for slower decay, exponential
	// rootURL is set as the root of the REST client in NewFs.
	rootURL = "https://api.box.com/2.0"
	// uploadURL is not referenced in this part of the file; presumably the
	// base URL for upload requests - confirm against the upload code.
	uploadURL = "https://upload.box.com/api/2.0"
	// NOTE(review): this limit is 50,000,000 bytes (decimal) whereas the
	// default below and the upload_cutoff help text are expressed in MiB.
	minUploadCutoff = 50000000 // upload cutoff can be no lower than this
	// defaultUploadCutoff is the default value of the upload_cutoff option.
	defaultUploadCutoff = 50 * 1024 * 1024
	// tokenURL is the token endpoint passed to jwtutil.Config and also used
	// as the audience of the JWT claims.
	tokenURL = "https://api.box.com/oauth2/token"
)
// Globals
var (
	// Description of how to auth for this app
	//
	// oauthConfig is used both by the interactive Config callback and by
	// NewFs when no access token has been configured. No scopes are
	// requested.
	oauthConfig = &oauth2.Config{
		Scopes: nil,
		Endpoint: oauth2.Endpoint{
			AuthURL: "https://app.box.com/api/oauth2/authorize",
			TokenURL: "https://app.box.com/api/oauth2/token",
		},
		ClientID: rcloneClientID,
		// The secret is stored obscured in the source and revealed here.
		ClientSecret: obscure.MustReveal(rcloneEncryptedClientSecret),
		RedirectURL: oauthutil.RedirectURL,
	}
)
// boxCustomClaims is the JWT claim set built by getClaims: the standard JWT
// claims plus the box_sub_type claim, which is omitted from the JSON when
// empty.
type boxCustomClaims struct {
	jwt.StandardClaims
	BoxSubType string `json:"box_sub_type,omitempty"` // value of the box_sub_type option
}
// Register with Fs
//
// init registers the "box" backend: its constructor (NewFs), the
// configuration callback that chooses the authentication method, and the
// list of backend options (appended to the shared OAuth options).
func init() {
	fs.Register(&fs.RegInfo{
		Name: "box",
		Description: "Box",
		NewFs: NewFs,
		// Config picks the authentication method from the stored settings:
		// JWT when both box_config_file and box_sub_type are non-empty,
		// otherwise OAuth2 unless an access_token has been set, in which
		// case nothing needs configuring.
		Config: func(ctx context.Context, name string, m configmap.Mapper, config fs.ConfigIn) (*fs.ConfigOut, error) {
			jsonFile, ok := m.Get("box_config_file")
			boxSubType, boxSubTypeOk := m.Get("box_sub_type")
			boxAccessToken, boxAccessTokenOk := m.Get("access_token")
			var err error
			// If using box config.json, use JWT auth
			if ok && boxSubTypeOk && jsonFile != "" && boxSubType != "" {
				err = refreshJWTToken(ctx, jsonFile, boxSubType, name, m)
				if err != nil {
					return nil, fmt.Errorf("failed to configure token with jwt authentication: %w", err)
				}
				// Else, if not using an access token, use oauth2
			} else if boxAccessToken == "" || !boxAccessTokenOk {
				return oauthutil.ConfigOut("", &oauthutil.Options{
					OAuth2Config: oauthConfig,
				})
			}
			return nil, nil
		},
		Options: append(oauthutil.SharedOptions, []fs.Option{{
			Name: "root_folder_id",
			Help: "Fill in for rclone to use a non root folder as its starting point.",
			Default: "0",
			Advanced: true,
			Sensitive: true,
		}, {
			Name: "box_config_file",
			Help: "Box App config.json location\n\nLeave blank normally." + env.ShellExpandHelp,
		}, {
			Name: "access_token",
			Help: "Box App Primary Access Token\n\nLeave blank normally.",
			Sensitive: true,
		}, {
			// NOTE(review): this option has no Help text.
			Name: "box_sub_type",
			Default: "user",
			Examples: []fs.OptionExample{{
				Value: "user",
				Help: "Rclone should act on behalf of a user.",
			}, {
				Value: "enterprise",
				Help: "Rclone should act on behalf of a service account.",
			}},
		}, {
			Name: "upload_cutoff",
			Help: "Cutoff for switching to multipart upload (>= 50 MiB).",
			Default: fs.SizeSuffix(defaultUploadCutoff),
			Advanced: true,
		}, {
			Name: "commit_retries",
			Help: "Max number of times to try committing a multipart file.",
			Default: 100,
			Advanced: true,
		}, {
			Name: "list_chunk",
			Default: 1000,
			Help: "Size of listing chunk 1-1000.",
			Advanced: true,
		}, {
			Name: "owned_by",
			Default: "",
			Help: "Only show items owned by the login (email address) passed in.",
			Advanced: true,
		}, {
			Name: "impersonate",
			Default: "",
			Help: `Impersonate this user ID when using a service account.
Setting this flag allows rclone, when using a JWT service account, to
act on behalf of another user by setting the as-user header.
The user ID is the Box identifier for a user. User IDs can found for
any user via the GET /users endpoint, which is only available to
admins, or by calling the GET /users/me endpoint with an authenticated
user session.
See: https://developer.box.com/guides/authentication/jwt/as-user/
`,
			Advanced: true,
			Sensitive: true,
		}, {
			Name: config.ConfigEncoding,
			Help: config.ConfigEncodingHelp,
			Advanced: true,
			// From https://developer.box.com/docs/error-codes#section-400-bad-request :
			// > Box only supports file or folder names that are 255 characters or less.
			// > File names containing non-printable ascii, "/" or "\", names with leading
			// > or trailing spaces, and the special names “.” and “..” are also unsupported.
			//
			// Testing revealed names with leading spaces work fine.
			// Also encode invalid UTF-8 bytes as json doesn't handle them properly.
			Default: (encoder.Display |
				encoder.EncodeBackSlash |
				encoder.EncodeRightSpace |
				encoder.EncodeInvalidUtf8),
		}}...),
	})
}
  176. func refreshJWTToken(ctx context.Context, jsonFile string, boxSubType string, name string, m configmap.Mapper) error {
  177. jsonFile = env.ShellExpand(jsonFile)
  178. boxConfig, err := getBoxConfig(jsonFile)
  179. if err != nil {
  180. return fmt.Errorf("get box config: %w", err)
  181. }
  182. privateKey, err := getDecryptedPrivateKey(boxConfig)
  183. if err != nil {
  184. return fmt.Errorf("get decrypted private key: %w", err)
  185. }
  186. claims, err := getClaims(boxConfig, boxSubType)
  187. if err != nil {
  188. return fmt.Errorf("get claims: %w", err)
  189. }
  190. signingHeaders := getSigningHeaders(boxConfig)
  191. queryParams := getQueryParams(boxConfig)
  192. client := fshttp.NewClient(ctx)
  193. err = jwtutil.Config("box", name, tokenURL, *claims, signingHeaders, queryParams, privateKey, m, client)
  194. return err
  195. }
  196. func getBoxConfig(configFile string) (boxConfig *api.ConfigJSON, err error) {
  197. file, err := os.ReadFile(configFile)
  198. if err != nil {
  199. return nil, fmt.Errorf("box: failed to read Box config: %w", err)
  200. }
  201. err = json.Unmarshal(file, &boxConfig)
  202. if err != nil {
  203. return nil, fmt.Errorf("box: failed to parse Box config: %w", err)
  204. }
  205. return boxConfig, nil
  206. }
  207. func getClaims(boxConfig *api.ConfigJSON, boxSubType string) (claims *boxCustomClaims, err error) {
  208. val, err := jwtutil.RandomHex(20)
  209. if err != nil {
  210. return nil, fmt.Errorf("box: failed to generate random string for jti: %w", err)
  211. }
  212. claims = &boxCustomClaims{
  213. //lint:ignore SA1019 since we need to use jwt.StandardClaims even if deprecated in jwt-go v4 until a more permanent solution is ready in time before jwt-go v5 where it is removed entirely
  214. //nolint:staticcheck // Don't include staticcheck when running golangci-lint to avoid SA1019
  215. StandardClaims: jwt.StandardClaims{
  216. Id: val,
  217. Issuer: boxConfig.BoxAppSettings.ClientID,
  218. Subject: boxConfig.EnterpriseID,
  219. Audience: tokenURL,
  220. ExpiresAt: time.Now().Add(time.Second * 45).Unix(),
  221. },
  222. BoxSubType: boxSubType,
  223. }
  224. return claims, nil
  225. }
  226. func getSigningHeaders(boxConfig *api.ConfigJSON) map[string]interface{} {
  227. signingHeaders := map[string]interface{}{
  228. "kid": boxConfig.BoxAppSettings.AppAuth.PublicKeyID,
  229. }
  230. return signingHeaders
  231. }
  232. func getQueryParams(boxConfig *api.ConfigJSON) map[string]string {
  233. queryParams := map[string]string{
  234. "client_id": boxConfig.BoxAppSettings.ClientID,
  235. "client_secret": boxConfig.BoxAppSettings.ClientSecret,
  236. }
  237. return queryParams
  238. }
  239. func getDecryptedPrivateKey(boxConfig *api.ConfigJSON) (key *rsa.PrivateKey, err error) {
  240. block, rest := pem.Decode([]byte(boxConfig.BoxAppSettings.AppAuth.PrivateKey))
  241. if len(rest) > 0 {
  242. return nil, fmt.Errorf("box: extra data included in private key: %w", err)
  243. }
  244. rsaKey, err := pkcs8.ParsePKCS8PrivateKey(block.Bytes, []byte(boxConfig.BoxAppSettings.AppAuth.Passphrase))
  245. if err != nil {
  246. return nil, fmt.Errorf("box: failed to decrypt private key: %w", err)
  247. }
  248. return rsaKey.(*rsa.PrivateKey), nil
  249. }
// Options defines the configuration for this backend
//
// Each field is filled from the config key named in its tag by
// configstruct.Set in NewFs.
type Options struct {
	UploadCutoff fs.SizeSuffix `config:"upload_cutoff"` // cutoff for switching to multipart upload
	CommitRetries int `config:"commit_retries"` // max number of times to try committing a multipart file
	Enc encoder.MultiEncoder `config:"encoding"` // file name encoding applied to and from Box names
	RootFolderID string `config:"root_folder_id"` // ID of the folder used as the starting point
	AccessToken string `config:"access_token"` // if set, sent as a Bearer token instead of using OAuth
	ListChunk int `config:"list_chunk"` // "limit" parameter sent with each listing request
	OwnedBy string `config:"owned_by"` // if set, listings skip items whose owner login differs
	Impersonate string `config:"impersonate"` // if set, sent as the as-user header
}
// ItemMeta defines metadata we cache for each Item ID
//
// Entries are stored in Fs.itemMetaCache while listing directories.
type ItemMeta struct {
	SequenceID int64 // the most recent event processed for this item
	ParentID string // ID of the parent directory of this item
	Name string // leaf name of this item
}
// Fs represents a remote box
//
// Instances are built by NewFs.
type Fs struct {
	name string // name of this remote
	root string // the path we are working on
	opt Options // parsed options
	features *fs.Features // optional features
	srv *rest.Client // the connection to the server
	dirCache *dircache.DirCache // Map of directory path to directory id
	pacer *fs.Pacer // pacer for API calls
	tokenRenewer *oauthutil.Renew // renew the token on expiry
	uploadToken *pacer.TokenDispenser // control concurrency
	itemMetaCacheMu *sync.Mutex // protects itemMetaCache
	itemMetaCache map[string]ItemMeta // map of Item ID to selected metadata
}
// Object describes a box object
//
// Will definitely have info but maybe not meta
type Object struct {
	fs *Fs // what this object is part of
	remote string // The remote path
	hasMetaData bool // whether info below has been set
	size int64 // size of the object
	modTime time.Time // modification time of the object
	id string // ID of the object
	publicLink string // Public Link for the object
	sha1 string // SHA-1 of the object content
}
  294. // ------------------------------------------------------------
// Name returns the name of the remote (as passed into NewFs).
func (f *Fs) Name() string {
	return f.name
}
// Root returns the root of the remote. NewFs stores the path it was given
// with leading and trailing slashes trimmed.
func (f *Fs) Root() string {
	return f.root
}
  303. // String converts this Fs to a string
  304. func (f *Fs) String() string {
  305. return fmt.Sprintf("box root '%s'", f.root)
  306. }
// Features returns the optional features of this Fs, as filled in by NewFs.
func (f *Fs) Features() *fs.Features {
	return f.features
}
  311. // parsePath parses a box 'url'
  312. func parsePath(path string) (root string) {
  313. root = strings.Trim(path, "/")
  314. return
  315. }
// retryErrorCodes is a slice of error codes that we will retry
//
// shouldRetry passes it to fserrors.ShouldRetryHTTP.
var retryErrorCodes = []int{
	429, // Too Many Requests.
	500, // Internal Server Error
	502, // Bad Gateway
	503, // Service Unavailable
	504, // Gateway Timeout
	509, // Bandwidth Limit Exceeded
}
  325. // shouldRetry returns a boolean as to whether this resp and err
  326. // deserve to be retried. It returns the err as a convenience
  327. func shouldRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
  328. if fserrors.ContextError(ctx, &err) {
  329. return false, err
  330. }
  331. authRetry := false
  332. if resp != nil && resp.StatusCode == 401 && strings.Contains(resp.Header.Get("Www-Authenticate"), "expired_token") {
  333. authRetry = true
  334. fs.Debugf(nil, "Should retry: %v", err)
  335. }
  336. // Box API errors which should be retries
  337. if apiErr, ok := err.(*api.Error); ok && apiErr.Code == "operation_blocked_temporary" {
  338. fs.Debugf(nil, "Retrying API error %v", err)
  339. return true, err
  340. }
  341. return authRetry || fserrors.ShouldRetry(err) || fserrors.ShouldRetryHTTP(resp, retryErrorCodes), err
  342. }
  343. // readMetaDataForPath reads the metadata from the path
  344. func (f *Fs) readMetaDataForPath(ctx context.Context, path string) (info *api.Item, err error) {
  345. // defer log.Trace(f, "path=%q", path)("info=%+v, err=%v", &info, &err)
  346. leaf, directoryID, err := f.dirCache.FindPath(ctx, path, false)
  347. if err != nil {
  348. if err == fs.ErrorDirNotFound {
  349. return nil, fs.ErrorObjectNotFound
  350. }
  351. return nil, err
  352. }
  353. // Use preupload to find the ID
  354. itemMini, err := f.preUploadCheck(ctx, leaf, directoryID, -1)
  355. if err != nil {
  356. return nil, err
  357. }
  358. if itemMini == nil {
  359. return nil, fs.ErrorObjectNotFound
  360. }
  361. // Now we have the ID we can look up the object proper
  362. opts := rest.Opts{
  363. Method: "GET",
  364. Path: "/files/" + itemMini.ID,
  365. Parameters: fieldsValue(),
  366. }
  367. var item api.Item
  368. err = f.pacer.Call(func() (bool, error) {
  369. resp, err := f.srv.CallJSON(ctx, &opts, nil, &item)
  370. return shouldRetry(ctx, resp, err)
  371. })
  372. if err != nil {
  373. return nil, err
  374. }
  375. return &item, nil
  376. }
  377. // errorHandler parses a non 2xx error response into an error
  378. func errorHandler(resp *http.Response) error {
  379. // Decode error response
  380. errResponse := new(api.Error)
  381. err := rest.DecodeJSON(resp, &errResponse)
  382. if err != nil {
  383. fs.Debugf(nil, "Couldn't decode error response: %v", err)
  384. }
  385. if errResponse.Code == "" {
  386. errResponse.Code = resp.Status
  387. }
  388. if errResponse.Status == 0 {
  389. errResponse.Status = resp.StatusCode
  390. }
  391. return errResponse
  392. }
// NewFs constructs an Fs from the path, container:path
//
// name is the remote name, root the path within it and m the config mapper
// the options are read from. When root turns out to be a file, the returned
// Fs points at the file's parent directory and the error is fs.ErrorIsFile.
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	if opt.UploadCutoff < minUploadCutoff {
		return nil, fmt.Errorf("box: upload cutoff (%v) must be greater than equal to %v", opt.UploadCutoff, fs.SizeSuffix(minUploadCutoff))
	}
	root = parsePath(root)
	// Start with a plain HTTP client; it is replaced by an OAuth client
	// below unless an access token is configured.
	client := fshttp.NewClient(ctx)
	var ts *oauthutil.TokenSource
	// If not using an accessToken, create an oauth client and tokensource
	if opt.AccessToken == "" {
		client, ts, err = oauthutil.NewClient(ctx, name, m, oauthConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to configure Box: %w", err)
		}
	}
	ci := fs.GetConfig(ctx)
	f := &Fs{
		name: name,
		root: root,
		opt: *opt,
		srv: rest.NewClient(client).SetRoot(rootURL),
		pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
		uploadToken: pacer.NewTokenDispenser(ci.Transfers),
		itemMetaCacheMu: new(sync.Mutex),
		itemMetaCache: make(map[string]ItemMeta),
	}
	f.features = (&fs.Features{
		CaseInsensitive: true,
		CanHaveEmptyDirectories: true,
	}).Fill(ctx, f)
	f.srv.SetErrorHandler(errorHandler)
	// If using an accessToken, set the Authorization header
	if f.opt.AccessToken != "" {
		f.srv.SetHeader("Authorization", "Bearer "+f.opt.AccessToken)
	}
	// If using impersonate set an as-user header
	if f.opt.Impersonate != "" {
		f.srv.SetHeader("as-user", f.opt.Impersonate)
	}
	jsonFile, ok := m.Get("box_config_file")
	boxSubType, boxSubTypeOk := m.Get("box_sub_type")
	// ts is only non-nil when the OAuth client was created above.
	if ts != nil {
		// If using box config.json and JWT, renewing should just refresh the token and
		// should do so whether there are uploads pending or not.
		if ok && boxSubTypeOk && jsonFile != "" && boxSubType != "" {
			f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
				err := refreshJWTToken(ctx, jsonFile, boxSubType, name, m)
				return err
			})
			f.tokenRenewer.Start()
		} else {
			// Renew the token in the background
			//
			// Note that the renewer is only created here; it is not
			// started in this function.
			f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error {
				_, err := f.readMetaDataForPath(ctx, "")
				return err
			})
		}
	}
	// Get rootFolderID
	rootID := f.opt.RootFolderID
	f.dirCache = dircache.New(root, rootID, f)
	// Find the current root
	err = f.dirCache.FindRoot(ctx, false)
	if err != nil {
		// Assume it is a file
		newRoot, remote := dircache.SplitPath(root)
		// Work on a copy of f rooted at the parent directory
		tempF := *f
		tempF.dirCache = dircache.New(newRoot, rootID, &tempF)
		tempF.root = newRoot
		// Make new Fs which is the parent
		err = tempF.dirCache.FindRoot(ctx, false)
		if err != nil {
			// No root so return old f
			return f, nil
		}
		_, err := tempF.newObjectWithInfo(ctx, remote, nil)
		if err != nil {
			if err == fs.ErrorObjectNotFound {
				// File doesn't exist so return old f
				return f, nil
			}
			return nil, err
		}
		f.features.Fill(ctx, &tempF)
		// XXX: update the old f here instead of returning tempF, since
		// `features` were already filled with functions having *f as a receiver.
		// See https://github.com/rclone/rclone/issues/2182
		f.dirCache = tempF.dirCache
		f.root = tempF.root
		// return an error with an fs which points to the parent
		return f, fs.ErrorIsFile
	}
	return f, nil
}
  493. // rootSlash returns root with a slash on if it is empty, otherwise empty string
  494. func (f *Fs) rootSlash() string {
  495. if f.root == "" {
  496. return f.root
  497. }
  498. return f.root + "/"
  499. }
  500. // Return an Object from a path
  501. //
  502. // If it can't be found it returns the error fs.ErrorObjectNotFound.
  503. func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *api.Item) (fs.Object, error) {
  504. o := &Object{
  505. fs: f,
  506. remote: remote,
  507. }
  508. var err error
  509. if info != nil {
  510. // Set info
  511. err = o.setMetaData(info)
  512. } else {
  513. err = o.readMetaData(ctx) // reads info and meta, returning an error
  514. }
  515. if err != nil {
  516. return nil, err
  517. }
  518. return o, nil
  519. }
// NewObject finds the Object at remote. If it can't be found
// it returns the error fs.ErrorObjectNotFound.
//
// It calls newObjectWithInfo with no item info, so the metadata is read
// rather than supplied.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	return f.newObjectWithInfo(ctx, remote, nil)
}
  525. // FindLeaf finds a directory of name leaf in the folder with ID pathID
  526. func (f *Fs) FindLeaf(ctx context.Context, pathID, leaf string) (pathIDOut string, found bool, err error) {
  527. // Find the leaf in pathID
  528. found, err = f.listAll(ctx, pathID, true, false, true, func(item *api.Item) bool {
  529. if strings.EqualFold(item.Name, leaf) {
  530. pathIDOut = item.ID
  531. return true
  532. }
  533. return false
  534. })
  535. return pathIDOut, found, err
  536. }
  537. // fieldsValue creates a url.Values with fields set to those in api.Item
  538. func fieldsValue() url.Values {
  539. values := url.Values{}
  540. values.Set("fields", api.ItemFields)
  541. return values
  542. }
  543. // CreateDir makes a directory with pathID as parent and name leaf
  544. func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (newID string, err error) {
  545. // fs.Debugf(f, "CreateDir(%q, %q)\n", pathID, leaf)
  546. var resp *http.Response
  547. var info *api.Item
  548. opts := rest.Opts{
  549. Method: "POST",
  550. Path: "/folders",
  551. Parameters: fieldsValue(),
  552. }
  553. mkdir := api.CreateFolder{
  554. Name: f.opt.Enc.FromStandardName(leaf),
  555. Parent: api.Parent{
  556. ID: pathID,
  557. },
  558. }
  559. err = f.pacer.Call(func() (bool, error) {
  560. resp, err = f.srv.CallJSON(ctx, &opts, &mkdir, &info)
  561. return shouldRetry(ctx, resp, err)
  562. })
  563. if err != nil {
  564. //fmt.Printf("...Error %v\n", err)
  565. return "", err
  566. }
  567. // fmt.Printf("...Id %q\n", *info.Id)
  568. return info.ID, nil
  569. }
// listAllFn is the user function listAll calls to process each item it
// finds.
//
// Should return true to finish processing
type listAllFn func(*api.Item) bool
  577. // Lists the directory required calling the user function on each item found
  578. //
  579. // If the user fn ever returns true then it early exits with found = true
  580. func (f *Fs) listAll(ctx context.Context, dirID string, directoriesOnly bool, filesOnly bool, activeOnly bool, fn listAllFn) (found bool, err error) {
  581. opts := rest.Opts{
  582. Method: "GET",
  583. Path: "/folders/" + dirID + "/items",
  584. Parameters: fieldsValue(),
  585. }
  586. opts.Parameters.Set("limit", strconv.Itoa(f.opt.ListChunk))
  587. opts.Parameters.Set("usemarker", "true")
  588. var marker *string
  589. OUTER:
  590. for {
  591. if marker != nil {
  592. opts.Parameters.Set("marker", *marker)
  593. }
  594. var result api.FolderItems
  595. var resp *http.Response
  596. err = f.pacer.Call(func() (bool, error) {
  597. resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
  598. return shouldRetry(ctx, resp, err)
  599. })
  600. if err != nil {
  601. return found, fmt.Errorf("couldn't list files: %w", err)
  602. }
  603. for i := range result.Entries {
  604. item := &result.Entries[i]
  605. if item.Type == api.ItemTypeFolder {
  606. if filesOnly {
  607. continue
  608. }
  609. } else if item.Type == api.ItemTypeFile {
  610. if directoriesOnly {
  611. continue
  612. }
  613. } else {
  614. fs.Debugf(f, "Ignoring %q - unknown type %q", item.Name, item.Type)
  615. continue
  616. }
  617. if activeOnly && item.ItemStatus != api.ItemStatusActive {
  618. continue
  619. }
  620. if f.opt.OwnedBy != "" && f.opt.OwnedBy != item.OwnedBy.Login {
  621. continue
  622. }
  623. item.Name = f.opt.Enc.ToStandardName(item.Name)
  624. if fn(item) {
  625. found = true
  626. break OUTER
  627. }
  628. }
  629. marker = result.NextMarker
  630. if marker == nil {
  631. break
  632. }
  633. }
  634. return
  635. }
  636. // List the objects and directories in dir into entries. The
  637. // entries can be returned in any order but should be for a
  638. // complete directory.
  639. //
  640. // dir should be "" to list the root, and should not have
  641. // trailing slashes.
  642. //
  643. // This should return ErrDirNotFound if the directory isn't
  644. // found.
  645. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
  646. directoryID, err := f.dirCache.FindDir(ctx, dir, false)
  647. if err != nil {
  648. return nil, err
  649. }
  650. var iErr error
  651. _, err = f.listAll(ctx, directoryID, false, false, true, func(info *api.Item) bool {
  652. remote := path.Join(dir, info.Name)
  653. if info.Type == api.ItemTypeFolder {
  654. // cache the directory ID for later lookups
  655. f.dirCache.Put(remote, info.ID)
  656. d := fs.NewDir(remote, info.ModTime()).SetID(info.ID)
  657. // FIXME more info from dir?
  658. entries = append(entries, d)
  659. } else if info.Type == api.ItemTypeFile {
  660. o, err := f.newObjectWithInfo(ctx, remote, info)
  661. if err != nil {
  662. iErr = err
  663. return true
  664. }
  665. entries = append(entries, o)
  666. }
  667. // Cache some metadata for this Item to help us process events later
  668. // on. In particular, the box event API does not provide the old path
  669. // of the Item when it is renamed/deleted/moved/etc.
  670. f.itemMetaCacheMu.Lock()
  671. cachedItemMeta, found := f.itemMetaCache[info.ID]
  672. if !found || cachedItemMeta.SequenceID < info.SequenceID {
  673. f.itemMetaCache[info.ID] = ItemMeta{SequenceID: info.SequenceID, ParentID: directoryID, Name: info.Name}
  674. }
  675. f.itemMetaCacheMu.Unlock()
  676. return false
  677. })
  678. if err != nil {
  679. return nil, err
  680. }
  681. if iErr != nil {
  682. return nil, iErr
  683. }
  684. return entries, nil
  685. }
  686. // Creates from the parameters passed in a half finished Object which
  687. // must have setMetaData called on it
  688. //
  689. // Returns the object, leaf, directoryID and error.
  690. //
  691. // Used to create new objects
  692. func (f *Fs) createObject(ctx context.Context, remote string, modTime time.Time, size int64) (o *Object, leaf string, directoryID string, err error) {
  693. // Create the directory for the object if it doesn't exist
  694. leaf, directoryID, err = f.dirCache.FindPath(ctx, remote, true)
  695. if err != nil {
  696. return
  697. }
  698. // Temporary Object under construction
  699. o = &Object{
  700. fs: f,
  701. remote: remote,
  702. }
  703. return o, leaf, directoryID, nil
  704. }
  705. // preUploadCheck checks to see if a file can be uploaded
  706. //
  707. // It returns "", nil if the file is good to go
  708. // It returns "ID", nil if the file must be updated
  709. func (f *Fs) preUploadCheck(ctx context.Context, leaf, directoryID string, size int64) (item *api.ItemMini, err error) {
  710. check := api.PreUploadCheck{
  711. Name: f.opt.Enc.FromStandardName(leaf),
  712. Parent: api.Parent{
  713. ID: directoryID,
  714. },
  715. }
  716. if size >= 0 {
  717. check.Size = &size
  718. }
  719. opts := rest.Opts{
  720. Method: "OPTIONS",
  721. Path: "/files/content/",
  722. }
  723. var result api.PreUploadCheckResponse
  724. var resp *http.Response
  725. err = f.pacer.Call(func() (bool, error) {
  726. resp, err = f.srv.CallJSON(ctx, &opts, &check, &result)
  727. return shouldRetry(ctx, resp, err)
  728. })
  729. if err != nil {
  730. if apiErr, ok := err.(*api.Error); ok && apiErr.Code == "item_name_in_use" {
  731. var conflict api.PreUploadCheckConflict
  732. err = json.Unmarshal(apiErr.ContextInfo, &conflict)
  733. if err != nil {
  734. return nil, fmt.Errorf("pre-upload check: JSON decode failed: %w", err)
  735. }
  736. if conflict.Conflicts.Type != api.ItemTypeFile {
  737. return nil, fs.ErrorIsDir
  738. }
  739. return &conflict.Conflicts, nil
  740. }
  741. return nil, fmt.Errorf("pre-upload check: %w", err)
  742. }
  743. return nil, nil
  744. }
  745. // Put the object
  746. //
  747. // Copy the reader in to the new object which is returned.
  748. //
  749. // The new object may have been created if an error is returned
  750. func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
  751. // If directory doesn't exist, file doesn't exist so can upload
  752. remote := src.Remote()
  753. leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, false)
  754. if err != nil {
  755. if err == fs.ErrorDirNotFound {
  756. return f.PutUnchecked(ctx, in, src, options...)
  757. }
  758. return nil, err
  759. }
  760. // Preflight check the upload, which returns the ID if the
  761. // object already exists
  762. item, err := f.preUploadCheck(ctx, leaf, directoryID, src.Size())
  763. if err != nil {
  764. return nil, err
  765. }
  766. if item == nil {
  767. return f.PutUnchecked(ctx, in, src, options...)
  768. }
  769. // If object exists then create a skeleton one with just id
  770. o := &Object{
  771. fs: f,
  772. remote: remote,
  773. id: item.ID,
  774. }
  775. return o, o.Update(ctx, in, src, options...)
  776. }
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// It simply delegates to Put.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.Put(ctx, in, src, options...)
}
  781. // PutUnchecked the object into the container
  782. //
  783. // This will produce an error if the object already exists.
  784. //
  785. // Copy the reader in to the new object which is returned.
  786. //
  787. // The new object may have been created if an error is returned
  788. func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
  789. remote := src.Remote()
  790. size := src.Size()
  791. modTime := src.ModTime(ctx)
  792. o, _, _, err := f.createObject(ctx, remote, modTime, size)
  793. if err != nil {
  794. return nil, err
  795. }
  796. return o, o.Update(ctx, in, src, options...)
  797. }
// Mkdir creates the directory dir if it doesn't exist
//
// It resolves dir through the directory cache with create set to true and
// discards the resulting directory ID.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	_, err := f.dirCache.FindDir(ctx, dir, true)
	return err
}
  803. // deleteObject removes an object by ID
  804. func (f *Fs) deleteObject(ctx context.Context, id string) error {
  805. opts := rest.Opts{
  806. Method: "DELETE",
  807. Path: "/files/" + id,
  808. NoResponse: true,
  809. }
  810. return f.pacer.Call(func() (bool, error) {
  811. resp, err := f.srv.Call(ctx, &opts)
  812. return shouldRetry(ctx, resp, err)
  813. })
  814. }
  815. // purgeCheck removes the root directory, if check is set then it
  816. // refuses to do so if it has anything in
  817. func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error {
  818. root := path.Join(f.root, dir)
  819. if root == "" {
  820. return errors.New("can't purge root directory")
  821. }
  822. dc := f.dirCache
  823. rootID, err := dc.FindDir(ctx, dir, false)
  824. if err != nil {
  825. return err
  826. }
  827. opts := rest.Opts{
  828. Method: "DELETE",
  829. Path: "/folders/" + rootID,
  830. Parameters: url.Values{},
  831. NoResponse: true,
  832. }
  833. opts.Parameters.Set("recursive", strconv.FormatBool(!check))
  834. var resp *http.Response
  835. err = f.pacer.Call(func() (bool, error) {
  836. resp, err = f.srv.Call(ctx, &opts)
  837. return shouldRetry(ctx, resp, err)
  838. })
  839. if err != nil {
  840. return fmt.Errorf("rmdir failed: %w", err)
  841. }
  842. f.dirCache.FlushDir(dir)
  843. if err != nil {
  844. return err
  845. }
  846. return nil
  847. }
  848. // Rmdir deletes the root folder
  849. //
  850. // Returns an error if it isn't empty
  851. func (f *Fs) Rmdir(ctx context.Context, dir string) error {
  852. return f.purgeCheck(ctx, dir, true)
  853. }
  854. // Precision return the precision of this Fs
  855. func (f *Fs) Precision() time.Duration {
  856. return time.Second
  857. }
  858. // Copy src to this remote using server-side copy operations.
  859. //
  860. // This is stored with the remote path given.
  861. //
  862. // It returns the destination Object and a possible error.
  863. //
  864. // Will only be called if src.Fs().Name() == f.Name()
  865. //
  866. // If it isn't possible then return fs.ErrorCantCopy
  867. func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
  868. srcObj, ok := src.(*Object)
  869. if !ok {
  870. fs.Debugf(src, "Can't copy - not same remote type")
  871. return nil, fs.ErrorCantCopy
  872. }
  873. err := srcObj.readMetaData(ctx)
  874. if err != nil {
  875. return nil, err
  876. }
  877. srcPath := srcObj.fs.rootSlash() + srcObj.remote
  878. dstPath := f.rootSlash() + remote
  879. if strings.EqualFold(srcPath, dstPath) {
  880. return nil, fmt.Errorf("can't copy %q -> %q as are same name when lowercase", srcPath, dstPath)
  881. }
  882. // Create temporary object
  883. dstObj, leaf, directoryID, err := f.createObject(ctx, remote, srcObj.modTime, srcObj.size)
  884. if err != nil {
  885. return nil, err
  886. }
  887. // Copy the object
  888. opts := rest.Opts{
  889. Method: "POST",
  890. Path: "/files/" + srcObj.id + "/copy",
  891. Parameters: fieldsValue(),
  892. }
  893. copyFile := api.CopyFile{
  894. Name: f.opt.Enc.FromStandardName(leaf),
  895. Parent: api.Parent{
  896. ID: directoryID,
  897. },
  898. }
  899. var resp *http.Response
  900. var info *api.Item
  901. err = f.pacer.Call(func() (bool, error) {
  902. resp, err = f.srv.CallJSON(ctx, &opts, &copyFile, &info)
  903. return shouldRetry(ctx, resp, err)
  904. })
  905. if err != nil {
  906. return nil, err
  907. }
  908. err = dstObj.setMetaData(info)
  909. if err != nil {
  910. return nil, err
  911. }
  912. return dstObj, nil
  913. }
  914. // Purge deletes all the files and the container
  915. //
  916. // Optional interface: Only implement this if you have a way of
  917. // deleting all the files quicker than just running Remove() on the
  918. // result of List()
  919. func (f *Fs) Purge(ctx context.Context, dir string) error {
  920. return f.purgeCheck(ctx, dir, false)
  921. }
  922. // move a file or folder
  923. func (f *Fs) move(ctx context.Context, endpoint, id, leaf, directoryID string) (info *api.Item, err error) {
  924. // Move the object
  925. opts := rest.Opts{
  926. Method: "PUT",
  927. Path: endpoint + id,
  928. Parameters: fieldsValue(),
  929. }
  930. move := api.UpdateFileMove{
  931. Name: f.opt.Enc.FromStandardName(leaf),
  932. Parent: api.Parent{
  933. ID: directoryID,
  934. },
  935. }
  936. var resp *http.Response
  937. err = f.pacer.Call(func() (bool, error) {
  938. resp, err = f.srv.CallJSON(ctx, &opts, &move, &info)
  939. return shouldRetry(ctx, resp, err)
  940. })
  941. if err != nil {
  942. return nil, err
  943. }
  944. return info, nil
  945. }
  946. // About gets quota information
  947. func (f *Fs) About(ctx context.Context) (usage *fs.Usage, err error) {
  948. opts := rest.Opts{
  949. Method: "GET",
  950. Path: "/users/me",
  951. }
  952. var user api.User
  953. var resp *http.Response
  954. err = f.pacer.Call(func() (bool, error) {
  955. resp, err = f.srv.CallJSON(ctx, &opts, nil, &user)
  956. return shouldRetry(ctx, resp, err)
  957. })
  958. if err != nil {
  959. return nil, fmt.Errorf("failed to read user info: %w", err)
  960. }
  961. // FIXME max upload size would be useful to use in Update
  962. usage = &fs.Usage{
  963. Used: fs.NewUsageValue(user.SpaceUsed), // bytes in use
  964. Total: fs.NewUsageValue(user.SpaceAmount), // bytes total
  965. Free: fs.NewUsageValue(user.SpaceAmount - user.SpaceUsed), // bytes free
  966. }
  967. return usage, nil
  968. }
  969. // Move src to this remote using server-side move operations.
  970. //
  971. // This is stored with the remote path given.
  972. //
  973. // It returns the destination Object and a possible error.
  974. //
  975. // Will only be called if src.Fs().Name() == f.Name()
  976. //
  977. // If it isn't possible then return fs.ErrorCantMove
  978. func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
  979. srcObj, ok := src.(*Object)
  980. if !ok {
  981. fs.Debugf(src, "Can't move - not same remote type")
  982. return nil, fs.ErrorCantMove
  983. }
  984. // Create temporary object
  985. dstObj, leaf, directoryID, err := f.createObject(ctx, remote, srcObj.modTime, srcObj.size)
  986. if err != nil {
  987. return nil, err
  988. }
  989. // Do the move
  990. info, err := f.move(ctx, "/files/", srcObj.id, leaf, directoryID)
  991. if err != nil {
  992. return nil, err
  993. }
  994. err = dstObj.setMetaData(info)
  995. if err != nil {
  996. return nil, err
  997. }
  998. return dstObj, nil
  999. }
  1000. // DirMove moves src, srcRemote to this remote at dstRemote
  1001. // using server-side move operations.
  1002. //
  1003. // Will only be called if src.Fs().Name() == f.Name()
  1004. //
  1005. // If it isn't possible then return fs.ErrorCantDirMove
  1006. //
  1007. // If destination exists then return fs.ErrorDirExists
  1008. func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
  1009. srcFs, ok := src.(*Fs)
  1010. if !ok {
  1011. fs.Debugf(srcFs, "Can't move directory - not same remote type")
  1012. return fs.ErrorCantDirMove
  1013. }
  1014. srcID, _, _, dstDirectoryID, dstLeaf, err := f.dirCache.DirMove(ctx, srcFs.dirCache, srcFs.root, srcRemote, f.root, dstRemote)
  1015. if err != nil {
  1016. return err
  1017. }
  1018. // Do the move
  1019. _, err = f.move(ctx, "/folders/", srcID, dstLeaf, dstDirectoryID)
  1020. if err != nil {
  1021. return err
  1022. }
  1023. srcFs.dirCache.FlushDir(srcRemote)
  1024. return nil
  1025. }
  1026. // PublicLink adds a "readable by anyone with link" permission on the given file or folder.
  1027. func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) {
  1028. id, err := f.dirCache.FindDir(ctx, remote, false)
  1029. var opts rest.Opts
  1030. if err == nil {
  1031. fs.Debugf(f, "attempting to share directory '%s'", remote)
  1032. opts = rest.Opts{
  1033. Method: "PUT",
  1034. Path: "/folders/" + id,
  1035. Parameters: fieldsValue(),
  1036. }
  1037. } else {
  1038. fs.Debugf(f, "attempting to share single file '%s'", remote)
  1039. o, err := f.NewObject(ctx, remote)
  1040. if err != nil {
  1041. return "", err
  1042. }
  1043. if o.(*Object).publicLink != "" {
  1044. return o.(*Object).publicLink, nil
  1045. }
  1046. opts = rest.Opts{
  1047. Method: "PUT",
  1048. Path: "/files/" + o.(*Object).id,
  1049. Parameters: fieldsValue(),
  1050. }
  1051. }
  1052. shareLink := api.CreateSharedLink{}
  1053. var info api.Item
  1054. var resp *http.Response
  1055. err = f.pacer.Call(func() (bool, error) {
  1056. resp, err = f.srv.CallJSON(ctx, &opts, &shareLink, &info)
  1057. return shouldRetry(ctx, resp, err)
  1058. })
  1059. return info.SharedLink.URL, err
  1060. }
  1061. // deletePermanently permanently deletes a trashed file
  1062. func (f *Fs) deletePermanently(ctx context.Context, itemType, id string) error {
  1063. opts := rest.Opts{
  1064. Method: "DELETE",
  1065. NoResponse: true,
  1066. }
  1067. if itemType == api.ItemTypeFile {
  1068. opts.Path = "/files/" + id + "/trash"
  1069. } else {
  1070. opts.Path = "/folders/" + id + "/trash"
  1071. }
  1072. return f.pacer.Call(func() (bool, error) {
  1073. resp, err := f.srv.Call(ctx, &opts)
  1074. return shouldRetry(ctx, resp, err)
  1075. })
  1076. }
  1077. // CleanUp empties the trash
  1078. func (f *Fs) CleanUp(ctx context.Context) (err error) {
  1079. var (
  1080. deleteErrors atomic.Uint64
  1081. concurrencyControl = make(chan struct{}, fs.GetConfig(ctx).Checkers)
  1082. wg sync.WaitGroup
  1083. )
  1084. _, err = f.listAll(ctx, "trash", false, false, false, func(item *api.Item) bool {
  1085. if item.Type == api.ItemTypeFolder || item.Type == api.ItemTypeFile {
  1086. wg.Add(1)
  1087. concurrencyControl <- struct{}{}
  1088. go func() {
  1089. defer func() {
  1090. <-concurrencyControl
  1091. wg.Done()
  1092. }()
  1093. err := f.deletePermanently(ctx, item.Type, item.ID)
  1094. if err != nil {
  1095. fs.Errorf(f, "failed to delete trash item %q (%q): %v", item.Name, item.ID, err)
  1096. deleteErrors.Add(1)
  1097. }
  1098. }()
  1099. } else {
  1100. fs.Debugf(f, "Ignoring %q - unknown type %q", item.Name, item.Type)
  1101. }
  1102. return false
  1103. })
  1104. wg.Wait()
  1105. if deleteErrors.Load() != 0 {
  1106. return fmt.Errorf("failed to delete %d trash items", deleteErrors.Load())
  1107. }
  1108. return err
  1109. }
  1110. // Shutdown shutdown the fs
  1111. func (f *Fs) Shutdown(ctx context.Context) error {
  1112. f.tokenRenewer.Shutdown()
  1113. return nil
  1114. }
  1115. // ChangeNotify calls the passed function with a path that has had changes.
  1116. // If the implementation uses polling, it should adhere to the given interval.
  1117. //
  1118. // Automatically restarts itself in case of unexpected behavior of the remote.
  1119. //
  1120. // Close the returned channel to stop being notified.
  1121. func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
  1122. go func() {
  1123. // get the `stream_position` early so all changes from now on get processed
  1124. streamPosition, err := f.changeNotifyStreamPosition(ctx)
  1125. if err != nil {
  1126. fs.Infof(f, "Failed to get StreamPosition: %s", err)
  1127. }
  1128. // box can send duplicate Event IDs. Use this map to track and filter
  1129. // the ones we've already processed.
  1130. processedEventIDs := make(map[string]time.Time)
  1131. var ticker *time.Ticker
  1132. var tickerC <-chan time.Time
  1133. for {
  1134. select {
  1135. case pollInterval, ok := <-pollIntervalChan:
  1136. if !ok {
  1137. if ticker != nil {
  1138. ticker.Stop()
  1139. }
  1140. return
  1141. }
  1142. if ticker != nil {
  1143. ticker.Stop()
  1144. ticker, tickerC = nil, nil
  1145. }
  1146. if pollInterval != 0 {
  1147. ticker = time.NewTicker(pollInterval)
  1148. tickerC = ticker.C
  1149. }
  1150. case <-tickerC:
  1151. if streamPosition == "" {
  1152. streamPosition, err = f.changeNotifyStreamPosition(ctx)
  1153. if err != nil {
  1154. fs.Infof(f, "Failed to get StreamPosition: %s", err)
  1155. continue
  1156. }
  1157. }
  1158. // Garbage collect EventIDs older than 1 minute
  1159. for eventID, timestamp := range processedEventIDs {
  1160. if time.Since(timestamp) > time.Minute {
  1161. delete(processedEventIDs, eventID)
  1162. }
  1163. }
  1164. streamPosition, err = f.changeNotifyRunner(ctx, notifyFunc, streamPosition, processedEventIDs)
  1165. if err != nil {
  1166. fs.Infof(f, "Change notify listener failure: %s", err)
  1167. }
  1168. }
  1169. }
  1170. }()
  1171. }
  1172. func (f *Fs) changeNotifyStreamPosition(ctx context.Context) (streamPosition string, err error) {
  1173. opts := rest.Opts{
  1174. Method: "GET",
  1175. Path: "/events",
  1176. Parameters: fieldsValue(),
  1177. }
  1178. opts.Parameters.Set("stream_position", "now")
  1179. opts.Parameters.Set("stream_type", "changes")
  1180. var result api.Events
  1181. var resp *http.Response
  1182. err = f.pacer.Call(func() (bool, error) {
  1183. resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
  1184. return shouldRetry(ctx, resp, err)
  1185. })
  1186. if err != nil {
  1187. return "", err
  1188. }
  1189. return strconv.FormatInt(result.NextStreamPosition, 10), nil
  1190. }
  1191. // Attempts to construct the full path for an object, given the ID of its
  1192. // parent directory and the name of the object.
  1193. //
  1194. // Can return "" if the parentID is not currently in the directory cache.
  1195. func (f *Fs) getFullPath(parentID string, childName string) (fullPath string) {
  1196. fullPath = ""
  1197. name := f.opt.Enc.ToStandardName(childName)
  1198. if parentID != "" {
  1199. if parentDir, ok := f.dirCache.GetInv(parentID); ok {
  1200. if len(parentDir) > 0 {
  1201. fullPath = parentDir + "/" + name
  1202. } else {
  1203. fullPath = name
  1204. }
  1205. }
  1206. } else {
  1207. // No parent, this object is at the root
  1208. fullPath = name
  1209. }
  1210. return fullPath
  1211. }
  1212. func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), streamPosition string, processedEventIDs map[string]time.Time) (nextStreamPosition string, err error) {
  1213. nextStreamPosition = streamPosition
  1214. for {
  1215. limit := f.opt.ListChunk
  1216. // box only allows a max of 500 events
  1217. if limit > 500 {
  1218. limit = 500
  1219. }
  1220. opts := rest.Opts{
  1221. Method: "GET",
  1222. Path: "/events",
  1223. Parameters: fieldsValue(),
  1224. }
  1225. opts.Parameters.Set("stream_position", nextStreamPosition)
  1226. opts.Parameters.Set("stream_type", "changes")
  1227. opts.Parameters.Set("limit", strconv.Itoa(limit))
  1228. var result api.Events
  1229. var resp *http.Response
  1230. fs.Debugf(f, "Checking for changes on remote (next_stream_position: %q)", nextStreamPosition)
  1231. err = f.pacer.Call(func() (bool, error) {
  1232. resp, err = f.srv.CallJSON(ctx, &opts, nil, &result)
  1233. return shouldRetry(ctx, resp, err)
  1234. })
  1235. if err != nil {
  1236. return "", err
  1237. }
  1238. if result.ChunkSize != int64(len(result.Entries)) {
  1239. return "", fmt.Errorf("invalid response to event request, chunk_size (%v) not equal to number of entries (%v)", result.ChunkSize, len(result.Entries))
  1240. }
  1241. nextStreamPosition = strconv.FormatInt(result.NextStreamPosition, 10)
  1242. if result.ChunkSize == 0 {
  1243. return nextStreamPosition, nil
  1244. }
  1245. type pathToClear struct {
  1246. path string
  1247. entryType fs.EntryType
  1248. }
  1249. var pathsToClear []pathToClear
  1250. newEventIDs := 0
  1251. for _, entry := range result.Entries {
  1252. eventDetails := fmt.Sprintf("[%q(%d)|%s|%s|%s|%s]", entry.Source.Name, entry.Source.SequenceID,
  1253. entry.Source.Type, entry.EventType, entry.Source.ID, entry.EventID)
  1254. if entry.EventID == "" {
  1255. fs.Debugf(f, "%s ignored due to missing EventID", eventDetails)
  1256. continue
  1257. }
  1258. if _, ok := processedEventIDs[entry.EventID]; ok {
  1259. fs.Debugf(f, "%s ignored due to duplicate EventID", eventDetails)
  1260. continue
  1261. }
  1262. processedEventIDs[entry.EventID] = time.Now()
  1263. newEventIDs++
  1264. if entry.Source.ID == "" { // missing File or Folder ID
  1265. fs.Debugf(f, "%s ignored due to missing SourceID", eventDetails)
  1266. continue
  1267. }
  1268. if entry.Source.Type != api.ItemTypeFile && entry.Source.Type != api.ItemTypeFolder { // event is not for a file or folder
  1269. fs.Debugf(f, "%s ignored due to unsupported SourceType", eventDetails)
  1270. continue
  1271. }
  1272. // Only interested in event types that result in a file tree change
  1273. if _, found := api.FileTreeChangeEventTypes[entry.EventType]; !found {
  1274. fs.Debugf(f, "%s ignored due to unsupported EventType", eventDetails)
  1275. continue
  1276. }
  1277. f.itemMetaCacheMu.Lock()
  1278. itemMeta, cachedItemMetaFound := f.itemMetaCache[entry.Source.ID]
  1279. if cachedItemMetaFound {
  1280. if itemMeta.SequenceID >= entry.Source.SequenceID {
  1281. // Item in the cache has the same or newer SequenceID than
  1282. // this event. Ignore this event, it must be old.
  1283. f.itemMetaCacheMu.Unlock()
  1284. fs.Debugf(f, "%s ignored due to old SequenceID (%q)", eventDetails, itemMeta.SequenceID)
  1285. continue
  1286. }
  1287. // This event is newer. Delete its entry from the cache,
  1288. // we'll notify about its change below, then it's up to a
  1289. // future list operation to repopulate the cache.
  1290. delete(f.itemMetaCache, entry.Source.ID)
  1291. }
  1292. f.itemMetaCacheMu.Unlock()
  1293. entryType := fs.EntryDirectory
  1294. if entry.Source.Type == api.ItemTypeFile {
  1295. entryType = fs.EntryObject
  1296. }
  1297. // The box event only includes the new path for the object (e.g.
  1298. // the path after the object was moved). If there was an old path
  1299. // saved in our cache, it must be cleared.
  1300. if cachedItemMetaFound {
  1301. path := f.getFullPath(itemMeta.ParentID, itemMeta.Name)
  1302. if path != "" {
  1303. fs.Debugf(f, "%s added old path (%q) for notify", eventDetails, path)
  1304. pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType})
  1305. } else {
  1306. fs.Debugf(f, "%s old parent not cached", eventDetails)
  1307. }
  1308. // If this is a directory, also delete it from the dir cache.
  1309. // This will effectively invalidate the item metadata cache
  1310. // entries for all descendents of this directory, since we
  1311. // will no longer be able to construct a full path for them.
  1312. // This is exactly what we want, since we don't want to notify
  1313. // on the paths of these descendents if one of their ancestors
  1314. // has been renamed/deleted.
  1315. if entry.Source.Type == api.ItemTypeFolder {
  1316. f.dirCache.FlushDir(path)
  1317. }
  1318. }
  1319. // If the item is "active", then it is not trashed or deleted, so
  1320. // it potentially has a valid parent.
  1321. //
  1322. // Construct the new path of the object, based on the Parent ID
  1323. // and its name. If we get an empty result, it means we don't
  1324. // currently know about this object so notification is unnecessary.
  1325. if entry.Source.ItemStatus == api.ItemStatusActive {
  1326. path := f.getFullPath(entry.Source.Parent.ID, entry.Source.Name)
  1327. if path != "" {
  1328. fs.Debugf(f, "%s added new path (%q) for notify", eventDetails, path)
  1329. pathsToClear = append(pathsToClear, pathToClear{path: path, entryType: entryType})
  1330. } else {
  1331. fs.Debugf(f, "%s new parent not found", eventDetails)
  1332. }
  1333. }
  1334. }
  1335. // box can sometimes repeatedly return the same Event IDs within a
  1336. // short period of time. If it stops giving us new ones, treat it
  1337. // the same as if it returned us none at all.
  1338. if newEventIDs == 0 {
  1339. return nextStreamPosition, nil
  1340. }
  1341. notifiedPaths := make(map[string]bool)
  1342. for _, p := range pathsToClear {
  1343. if _, ok := notifiedPaths[p.path]; ok {
  1344. continue
  1345. }
  1346. notifiedPaths[p.path] = true
  1347. notifyFunc(p.path, p.entryType)
  1348. }
  1349. fs.Debugf(f, "Received %v events, resulting in %v paths and %v notifications", len(result.Entries), len(pathsToClear), len(notifiedPaths))
  1350. }
  1351. }
  1352. // DirCacheFlush resets the directory cache - used in testing as an
  1353. // optional interface
  1354. func (f *Fs) DirCacheFlush() {
  1355. f.dirCache.ResetRoot()
  1356. }
  1357. // Hashes returns the supported hash sets.
  1358. func (f *Fs) Hashes() hash.Set {
  1359. return hash.Set(hash.SHA1)
  1360. }
  1361. // ------------------------------------------------------------
  1362. // Fs returns the parent Fs
  1363. func (o *Object) Fs() fs.Info {
  1364. return o.fs
  1365. }
  1366. // Return a string version
  1367. func (o *Object) String() string {
  1368. if o == nil {
  1369. return "<nil>"
  1370. }
  1371. return o.remote
  1372. }
  1373. // Remote returns the remote path
  1374. func (o *Object) Remote() string {
  1375. return o.remote
  1376. }
  1377. // Hash returns the SHA-1 of an object returning a lowercase hex string
  1378. func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
  1379. if t != hash.SHA1 {
  1380. return "", hash.ErrUnsupported
  1381. }
  1382. return o.sha1, nil
  1383. }
  1384. // Size returns the size of an object in bytes
  1385. func (o *Object) Size() int64 {
  1386. err := o.readMetaData(context.TODO())
  1387. if err != nil {
  1388. fs.Logf(o, "Failed to read metadata: %v", err)
  1389. return 0
  1390. }
  1391. return o.size
  1392. }
  1393. // setMetaData sets the metadata from info
  1394. func (o *Object) setMetaData(info *api.Item) (err error) {
  1395. if info.Type == api.ItemTypeFolder {
  1396. return fs.ErrorIsDir
  1397. }
  1398. if info.Type != api.ItemTypeFile {
  1399. return fmt.Errorf("%q is %q: %w", o.remote, info.Type, fs.ErrorNotAFile)
  1400. }
  1401. o.hasMetaData = true
  1402. o.size = int64(info.Size)
  1403. o.sha1 = info.SHA1
  1404. o.modTime = info.ModTime()
  1405. o.id = info.ID
  1406. o.publicLink = info.SharedLink.URL
  1407. return nil
  1408. }
  1409. // readMetaData gets the metadata if it hasn't already been fetched
  1410. //
  1411. // it also sets the info
  1412. func (o *Object) readMetaData(ctx context.Context) (err error) {
  1413. if o.hasMetaData {
  1414. return nil
  1415. }
  1416. info, err := o.fs.readMetaDataForPath(ctx, o.remote)
  1417. if err != nil {
  1418. if apiErr, ok := err.(*api.Error); ok {
  1419. if apiErr.Code == "not_found" || apiErr.Code == "trashed" {
  1420. return fs.ErrorObjectNotFound
  1421. }
  1422. }
  1423. return err
  1424. }
  1425. return o.setMetaData(info)
  1426. }
  1427. // ModTime returns the modification time of the object
  1428. //
  1429. // It attempts to read the objects mtime and if that isn't present the
  1430. // LastModified returned in the http headers
  1431. func (o *Object) ModTime(ctx context.Context) time.Time {
  1432. err := o.readMetaData(ctx)
  1433. if err != nil {
  1434. fs.Logf(o, "Failed to read metadata: %v", err)
  1435. return time.Now()
  1436. }
  1437. return o.modTime
  1438. }
  1439. // setModTime sets the modification time of the local fs object
  1440. func (o *Object) setModTime(ctx context.Context, modTime time.Time) (*api.Item, error) {
  1441. opts := rest.Opts{
  1442. Method: "PUT",
  1443. Path: "/files/" + o.id,
  1444. Parameters: fieldsValue(),
  1445. }
  1446. update := api.UpdateFileModTime{
  1447. ContentModifiedAt: api.Time(modTime),
  1448. }
  1449. var info *api.Item
  1450. err := o.fs.pacer.Call(func() (bool, error) {
  1451. resp, err := o.fs.srv.CallJSON(ctx, &opts, &update, &info)
  1452. return shouldRetry(ctx, resp, err)
  1453. })
  1454. return info, err
  1455. }
  1456. // SetModTime sets the modification time of the local fs object
  1457. func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
  1458. info, err := o.setModTime(ctx, modTime)
  1459. if err != nil {
  1460. return err
  1461. }
  1462. return o.setMetaData(info)
  1463. }
  1464. // Storable returns a boolean showing whether this object storable
  1465. func (o *Object) Storable() bool {
  1466. return true
  1467. }
  1468. // Open an object for read
  1469. func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
  1470. if o.id == "" {
  1471. return nil, errors.New("can't download - no id")
  1472. }
  1473. fs.FixRangeOption(options, o.size)
  1474. var resp *http.Response
  1475. opts := rest.Opts{
  1476. Method: "GET",
  1477. Path: "/files/" + o.id + "/content",
  1478. Options: options,
  1479. }
  1480. err = o.fs.pacer.Call(func() (bool, error) {
  1481. resp, err = o.fs.srv.Call(ctx, &opts)
  1482. return shouldRetry(ctx, resp, err)
  1483. })
  1484. if err != nil {
  1485. return nil, err
  1486. }
  1487. return resp.Body, err
  1488. }
  1489. // upload does a single non-multipart upload
  1490. //
  1491. // This is recommended for less than 50 MiB of content
  1492. func (o *Object) upload(ctx context.Context, in io.Reader, leaf, directoryID string, modTime time.Time, options ...fs.OpenOption) (err error) {
  1493. upload := api.UploadFile{
  1494. Name: o.fs.opt.Enc.FromStandardName(leaf),
  1495. ContentModifiedAt: api.Time(modTime),
  1496. ContentCreatedAt: api.Time(modTime),
  1497. Parent: api.Parent{
  1498. ID: directoryID,
  1499. },
  1500. }
  1501. var resp *http.Response
  1502. var result api.FolderItems
  1503. opts := rest.Opts{
  1504. Method: "POST",
  1505. Body: in,
  1506. MultipartMetadataName: "attributes",
  1507. MultipartContentName: "contents",
  1508. MultipartFileName: upload.Name,
  1509. RootURL: uploadURL,
  1510. Options: options,
  1511. }
  1512. // If object has an ID then it is existing so create a new version
  1513. if o.id != "" {
  1514. opts.Path = "/files/" + o.id + "/content"
  1515. } else {
  1516. opts.Path = "/files/content"
  1517. }
  1518. err = o.fs.pacer.CallNoRetry(func() (bool, error) {
  1519. resp, err = o.fs.srv.CallJSON(ctx, &opts, &upload, &result)
  1520. return shouldRetry(ctx, resp, err)
  1521. })
  1522. if err != nil {
  1523. return err
  1524. }
  1525. if result.TotalCount != 1 || len(result.Entries) != 1 {
  1526. return fmt.Errorf("failed to upload %v - not sure why", o)
  1527. }
  1528. return o.setMetaData(&result.Entries[0])
  1529. }
  1530. // Update the object with the contents of the io.Reader, modTime and size
  1531. //
  1532. // If existing is set then it updates the object rather than creating a new one.
  1533. //
  1534. // The new object may have been created if an error is returned.
  1535. func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
  1536. if o.fs.tokenRenewer != nil {
  1537. o.fs.tokenRenewer.Start()
  1538. defer o.fs.tokenRenewer.Stop()
  1539. }
  1540. size := src.Size()
  1541. modTime := src.ModTime(ctx)
  1542. remote := o.Remote()
  1543. // Create the directory for the object if it doesn't exist
  1544. leaf, directoryID, err := o.fs.dirCache.FindPath(ctx, remote, true)
  1545. if err != nil {
  1546. return err
  1547. }
  1548. // Upload with simple or multipart
  1549. if size <= int64(o.fs.opt.UploadCutoff) {
  1550. err = o.upload(ctx, in, leaf, directoryID, modTime, options...)
  1551. } else {
  1552. err = o.uploadMultipart(ctx, in, leaf, directoryID, size, modTime, options...)
  1553. }
  1554. return err
  1555. }
  1556. // Remove an object
  1557. func (o *Object) Remove(ctx context.Context) error {
  1558. return o.fs.deleteObject(ctx, o.id)
  1559. }
  1560. // ID returns the ID of the Object if known, or "" if not
  1561. func (o *Object) ID() string {
  1562. return o.id
  1563. }
  1564. // Check the interfaces are satisfied
  1565. var (
  1566. _ fs.Fs = (*Fs)(nil)
  1567. _ fs.Purger = (*Fs)(nil)
  1568. _ fs.PutStreamer = (*Fs)(nil)
  1569. _ fs.Copier = (*Fs)(nil)
  1570. _ fs.Abouter = (*Fs)(nil)
  1571. _ fs.Mover = (*Fs)(nil)
  1572. _ fs.DirMover = (*Fs)(nil)
  1573. _ fs.DirCacheFlusher = (*Fs)(nil)
  1574. _ fs.PublicLinker = (*Fs)(nil)
  1575. _ fs.CleanUpper = (*Fs)(nil)
  1576. _ fs.Shutdowner = (*Fs)(nil)
  1577. _ fs.Object = (*Object)(nil)
  1578. _ fs.IDer = (*Object)(nil)
  1579. )