12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229 |
- // Package mega provides an interface to the Mega
- // object storage system.
- package mega
- /*
- Open questions
- * Does mega support a content hash - what exactly are the mega hashes?
- * Can mega support setting modification times?
- Improvements:
- * Uploads could be done in parallel
- * Downloads would be more efficient done in one go
- * Uploads would be more efficient with bigger chunks
- * Looks like mega can support server-side copy, but it isn't implemented in go-mega
- * Upload can set modtime... - set as int64_t - can set ctime and mtime?
- */
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "path"
- "strings"
- "sync"
- "time"
- "github.com/rclone/rclone/fs"
- "github.com/rclone/rclone/fs/config"
- "github.com/rclone/rclone/fs/config/configmap"
- "github.com/rclone/rclone/fs/config/configstruct"
- "github.com/rclone/rclone/fs/config/obscure"
- "github.com/rclone/rclone/fs/fserrors"
- "github.com/rclone/rclone/fs/fshttp"
- "github.com/rclone/rclone/fs/hash"
- "github.com/rclone/rclone/lib/encoder"
- "github.com/rclone/rclone/lib/pacer"
- "github.com/rclone/rclone/lib/readers"
- mega "github.com/t3rm1n4l/go-mega"
- )
- const (
- minSleep = 10 * time.Millisecond
- maxSleep = 2 * time.Second
- eventWaitTime = 500 * time.Millisecond
- decayConstant = 2 // bigger for slower decay, exponential
- )
- var (
- megaCacheMu sync.Mutex // mutex for the below
- megaCache = map[string]*mega.Mega{} // cache logged in Mega's by user
- )
- // Register with Fs
- func init() {
- fs.Register(&fs.RegInfo{
- Name: "mega",
- Description: "Mega",
- NewFs: NewFs,
- Options: []fs.Option{{
- Name: "user",
- Help: "User name.",
- Required: true,
- Sensitive: true,
- }, {
- Name: "pass",
- Help: "Password.",
- Required: true,
- IsPassword: true,
- }, {
- Name: "debug",
- Help: `Output more debug from Mega.
- If this flag is set (along with -vv) it will print further debugging
- information from the mega backend.`,
- Default: false,
- Advanced: true,
- }, {
- Name: "hard_delete",
- Help: `Delete files permanently rather than putting them into the trash.
- Normally the mega backend will put all deletions into the trash rather
- than permanently deleting them. If you specify this then rclone will
- permanently delete objects instead.`,
- Default: false,
- Advanced: true,
- }, {
- Name: "use_https",
- Help: `Use HTTPS for transfers.
- MEGA uses plain text HTTP connections by default.
- Some ISPs throttle HTTP connections, this causes transfers to become very slow.
- Enabling this will force MEGA to use HTTPS for all transfers.
- HTTPS is normally not necessary since all data is already encrypted anyway.
- Enabling it will increase CPU usage and add network overhead.`,
- Default: false,
- Advanced: true,
- }, {
- Name: config.ConfigEncoding,
- Help: config.ConfigEncodingHelp,
- Advanced: true,
- // Encode invalid UTF-8 bytes as json doesn't handle them properly.
- Default: (encoder.Base |
- encoder.EncodeInvalidUtf8),
- }},
- })
- }
- // Options defines the configuration for this backend
- type Options struct {
- User string `config:"user"`
- Pass string `config:"pass"`
- Debug bool `config:"debug"`
- HardDelete bool `config:"hard_delete"`
- UseHTTPS bool `config:"use_https"`
- Enc encoder.MultiEncoder `config:"encoding"`
- }
- // Fs represents a remote mega
- type Fs struct {
- name string // name of this remote
- root string // the path we are working on
- opt Options // parsed config options
- features *fs.Features // optional features
- srv *mega.Mega // the connection to the server
- pacer *fs.Pacer // pacer for API calls
- rootNodeMu sync.Mutex // mutex for _rootNode
- _rootNode *mega.Node // root node - call findRoot to use this
- mkdirMu sync.Mutex // used to serialize calls to mkdir / rmdir
- }
- // Object describes a mega object
- //
- // Will definitely have info but maybe not meta.
- //
- // Normally rclone would just store an ID here but go-mega and mega.nz
- // expect you to build an entire tree of all the objects in memory.
- // In this case we just store a pointer to the object.
- type Object struct {
- fs *Fs // what this object is part of
- remote string // The remote path
- info *mega.Node // pointer to the mega node
- }
- // ------------------------------------------------------------
- // Name of the remote (as passed into NewFs)
- func (f *Fs) Name() string {
- return f.name
- }
- // Root of the remote (as passed into NewFs)
- func (f *Fs) Root() string {
- return f.root
- }
- // String converts this Fs to a string
- func (f *Fs) String() string {
- return fmt.Sprintf("mega root '%s'", f.root)
- }
- // Features returns the optional features of this Fs
- func (f *Fs) Features() *fs.Features {
- return f.features
- }
- // parsePath parses a mega 'url'
- func parsePath(path string) (root string) {
- root = strings.Trim(path, "/")
- return
- }
- // shouldRetry returns a boolean as to whether this err deserves to be
- // retried. It returns the err as a convenience
- func shouldRetry(ctx context.Context, err error) (bool, error) {
- if fserrors.ContextError(ctx, &err) {
- return false, err
- }
- // Let the mega library handle the low level retries
- return false, err
- }
- // readMetaDataForPath reads the metadata from the path
- func (f *Fs) readMetaDataForPath(ctx context.Context, remote string) (info *mega.Node, err error) {
- rootNode, err := f.findRoot(ctx, false)
- if err != nil {
- return nil, err
- }
- return f.findObject(rootNode, remote)
- }
- // NewFs constructs an Fs from the path, container:path
- func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
- // Parse config into Options struct
- opt := new(Options)
- err := configstruct.Set(m, opt)
- if err != nil {
- return nil, err
- }
- if opt.Pass != "" {
- var err error
- opt.Pass, err = obscure.Reveal(opt.Pass)
- if err != nil {
- return nil, fmt.Errorf("couldn't decrypt password: %w", err)
- }
- }
- ci := fs.GetConfig(ctx)
- // cache *mega.Mega on username so we can reuse and share
- // them between remotes. They are expensive to make as they
- // contain all the objects and sharing the objects makes the
- // move code easier as we don't have to worry about mixing
- // them up between different remotes.
- megaCacheMu.Lock()
- defer megaCacheMu.Unlock()
- srv := megaCache[opt.User]
- if srv == nil {
- srv = mega.New().SetClient(fshttp.NewClient(ctx))
- srv.SetRetries(ci.LowLevelRetries) // let mega do the low level retries
- srv.SetHTTPS(opt.UseHTTPS)
- srv.SetLogger(func(format string, v ...interface{}) {
- fs.Infof("*go-mega*", format, v...)
- })
- if opt.Debug {
- srv.SetDebugger(func(format string, v ...interface{}) {
- fs.Debugf("*go-mega*", format, v...)
- })
- }
- err := srv.Login(opt.User, opt.Pass)
- if err != nil {
- return nil, fmt.Errorf("couldn't login: %w", err)
- }
- megaCache[opt.User] = srv
- }
- root = parsePath(root)
- f := &Fs{
- name: name,
- root: root,
- opt: *opt,
- srv: srv,
- pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
- }
- f.features = (&fs.Features{
- DuplicateFiles: true,
- CanHaveEmptyDirectories: true,
- }).Fill(ctx, f)
- // Find the root node and check if it is a file or not
- _, err = f.findRoot(ctx, false)
- switch err {
- case nil:
- // root node found and is a directory
- case fs.ErrorDirNotFound:
- // root node not found, so can't be a file
- case fs.ErrorIsFile:
- // root node is a file so point to parent directory
- root = path.Dir(root)
- if root == "." {
- root = ""
- }
- f.root = root
- return f, err
- }
- return f, nil
- }
- // splitNodePath splits nodePath into / separated parts, returning nil if it
- // should refer to the root.
- // It also encodes the parts into backend-specific encoding
- func (f *Fs) splitNodePath(nodePath string) (parts []string) {
- nodePath = path.Clean(nodePath)
- if nodePath == "." || nodePath == "/" {
- return nil
- }
- nodePath = f.opt.Enc.FromStandardPath(nodePath)
- return strings.Split(nodePath, "/")
- }
- // findNode looks up the node for the path of the name given from the root given
- //
- // It returns mega.ENOENT if it wasn't found
- func (f *Fs) findNode(rootNode *mega.Node, nodePath string) (*mega.Node, error) {
- parts := f.splitNodePath(nodePath)
- if parts == nil {
- return rootNode, nil
- }
- nodes, err := f.srv.FS.PathLookup(rootNode, parts)
- if err != nil {
- return nil, err
- }
- return nodes[len(nodes)-1], nil
- }
- // findDir finds the directory rooted from the node passed in
- func (f *Fs) findDir(rootNode *mega.Node, dir string) (node *mega.Node, err error) {
- node, err = f.findNode(rootNode, dir)
- if err == mega.ENOENT {
- return nil, fs.ErrorDirNotFound
- } else if err == nil && node.GetType() == mega.FILE {
- return nil, fs.ErrorIsFile
- }
- return node, err
- }
- // findObject looks up the node for the object of the name given
- func (f *Fs) findObject(rootNode *mega.Node, file string) (node *mega.Node, err error) {
- node, err = f.findNode(rootNode, file)
- if err == mega.ENOENT {
- return nil, fs.ErrorObjectNotFound
- } else if err == nil && node.GetType() != mega.FILE {
- return nil, fs.ErrorIsDir // all other node types are directories
- }
- return node, err
- }
- // lookupDir looks up the node for the directory of the name given
- //
- // if create is true it tries to create the root directory if not found
- func (f *Fs) lookupDir(ctx context.Context, dir string) (*mega.Node, error) {
- rootNode, err := f.findRoot(ctx, false)
- if err != nil {
- return nil, err
- }
- return f.findDir(rootNode, dir)
- }
- // lookupParentDir finds the parent node for the remote passed in
- func (f *Fs) lookupParentDir(ctx context.Context, remote string) (dirNode *mega.Node, leaf string, err error) {
- parent, leaf := path.Split(remote)
- dirNode, err = f.lookupDir(ctx, parent)
- return dirNode, leaf, err
- }
- // mkdir makes the directory and any parent directories for the
- // directory of the name given
- func (f *Fs) mkdir(ctx context.Context, rootNode *mega.Node, dir string) (node *mega.Node, err error) {
- f.mkdirMu.Lock()
- defer f.mkdirMu.Unlock()
- parts := f.splitNodePath(dir)
- if parts == nil {
- return rootNode, nil
- }
- var i int
- // look up until we find a directory which exists
- for i = 0; i <= len(parts); i++ {
- var nodes []*mega.Node
- nodes, err = f.srv.FS.PathLookup(rootNode, parts[:len(parts)-i])
- if err == nil {
- if len(nodes) == 0 {
- node = rootNode
- } else {
- node = nodes[len(nodes)-1]
- }
- break
- }
- if err != mega.ENOENT {
- return nil, fmt.Errorf("mkdir lookup failed: %w", err)
- }
- }
- if err != nil {
- return nil, fmt.Errorf("internal error: mkdir called with nonexistent root node: %w", err)
- }
- // i is number of directories to create (may be 0)
- // node is directory to create them from
- for _, name := range parts[len(parts)-i:] {
- // create directory called name in node
- err = f.pacer.Call(func() (bool, error) {
- node, err = f.srv.CreateDir(name, node)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return nil, fmt.Errorf("mkdir create node failed: %w", err)
- }
- }
- return node, nil
- }
- // mkdirParent creates the parent directory of remote
- func (f *Fs) mkdirParent(ctx context.Context, remote string) (dirNode *mega.Node, leaf string, err error) {
- rootNode, err := f.findRoot(ctx, true)
- if err != nil {
- return nil, "", err
- }
- parent, leaf := path.Split(remote)
- dirNode, err = f.mkdir(ctx, rootNode, parent)
- return dirNode, leaf, err
- }
- // findRoot looks up the root directory node and returns it.
- //
- // if create is true it tries to create the root directory if not found
- func (f *Fs) findRoot(ctx context.Context, create bool) (*mega.Node, error) {
- f.rootNodeMu.Lock()
- defer f.rootNodeMu.Unlock()
- // Check if we haven't found it already
- if f._rootNode != nil {
- return f._rootNode, nil
- }
- // Check for preexisting root
- absRoot := f.srv.FS.GetRoot()
- node, err := f.findDir(absRoot, f.root)
- //log.Printf("findRoot findDir %p %v", node, err)
- if err == nil {
- f._rootNode = node
- return node, nil
- }
- if !create || err != fs.ErrorDirNotFound {
- return nil, err
- }
- //..not found so create the root directory
- f._rootNode, err = f.mkdir(ctx, absRoot, f.root)
- return f._rootNode, err
- }
- // clearRoot unsets the root directory
- func (f *Fs) clearRoot() {
- f.rootNodeMu.Lock()
- f._rootNode = nil
- f.rootNodeMu.Unlock()
- //log.Printf("cleared root directory")
- }
- // CleanUp deletes all files currently in trash
- func (f *Fs) CleanUp(ctx context.Context) (err error) {
- trash := f.srv.FS.GetTrash()
- items := []*mega.Node{}
- _, err = f.list(ctx, trash, func(item *mega.Node) bool {
- items = append(items, item)
- return false
- })
- if err != nil {
- return fmt.Errorf("CleanUp failed to list items in trash: %w", err)
- }
- fs.Infof(f, "Deleting %d items from the trash", len(items))
- errors := 0
- // similar to f.deleteNode(trash) but with HardDelete as true
- for _, item := range items {
- fs.Debugf(f, "Deleting trash %q", f.opt.Enc.ToStandardName(item.GetName()))
- deleteErr := f.pacer.Call(func() (bool, error) {
- err := f.srv.Delete(item, true)
- return shouldRetry(ctx, err)
- })
- if deleteErr != nil {
- err = deleteErr
- errors++
- }
- }
- fs.Infof(f, "Deleted %d items from the trash with %d errors", len(items), errors)
- return err
- }
- // Return an Object from a path
- //
- // If it can't be found it returns the error fs.ErrorObjectNotFound.
- func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *mega.Node) (fs.Object, error) {
- o := &Object{
- fs: f,
- remote: remote,
- }
- var err error
- if info != nil {
- // Set info
- err = o.setMetaData(info)
- } else {
- err = o.readMetaData(ctx) // reads info and meta, returning an error
- }
- if err != nil {
- return nil, err
- }
- return o, nil
- }
- // NewObject finds the Object at remote. If it can't be found
- // it returns the error fs.ErrorObjectNotFound.
- func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
- return f.newObjectWithInfo(ctx, remote, nil)
- }
- // list the objects into the function supplied
- //
- // If directories is set it only sends directories
- // User function to process a File item from listAll
- //
- // Should return true to finish processing
- type listFn func(*mega.Node) bool
- // Lists the directory required calling the user function on each item found
- //
- // If the user fn ever returns true then it early exits with found = true
- func (f *Fs) list(ctx context.Context, dir *mega.Node, fn listFn) (found bool, err error) {
- nodes, err := f.srv.FS.GetChildren(dir)
- if err != nil {
- return false, fmt.Errorf("list failed: %w", err)
- }
- for _, item := range nodes {
- if fn(item) {
- found = true
- break
- }
- }
- return
- }
- // List the objects and directories in dir into entries. The
- // entries can be returned in any order but should be for a
- // complete directory.
- //
- // dir should be "" to list the root, and should not have
- // trailing slashes.
- //
- // This should return ErrDirNotFound if the directory isn't
- // found.
- func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
- dirNode, err := f.lookupDir(ctx, dir)
- if err != nil {
- return nil, err
- }
- var iErr error
- _, err = f.list(ctx, dirNode, func(info *mega.Node) bool {
- remote := path.Join(dir, f.opt.Enc.ToStandardName(info.GetName()))
- switch info.GetType() {
- case mega.FOLDER, mega.ROOT, mega.INBOX, mega.TRASH:
- d := fs.NewDir(remote, info.GetTimeStamp()).SetID(info.GetHash())
- entries = append(entries, d)
- case mega.FILE:
- o, err := f.newObjectWithInfo(ctx, remote, info)
- if err != nil {
- iErr = err
- return true
- }
- entries = append(entries, o)
- }
- return false
- })
- if err != nil {
- return nil, err
- }
- if iErr != nil {
- return nil, iErr
- }
- return entries, nil
- }
- // Creates from the parameters passed in a half finished Object which
- // must have setMetaData called on it
- //
- // Returns the dirNode, object, leaf and error.
- //
- // Used to create new objects
- func (f *Fs) createObject(ctx context.Context, remote string, modTime time.Time, size int64) (o *Object, dirNode *mega.Node, leaf string, err error) {
- dirNode, leaf, err = f.mkdirParent(ctx, remote)
- if err != nil {
- return nil, nil, leaf, err
- }
- // Temporary Object under construction
- o = &Object{
- fs: f,
- remote: remote,
- }
- return o, dirNode, leaf, nil
- }
- // Put the object
- //
- // Copy the reader in to the new object which is returned.
- //
- // The new object may have been created if an error is returned
- // PutUnchecked uploads the object
- //
- // This will create a duplicate if we upload a new file without
- // checking to see if there is one already - use Put() for that.
- func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
- existingObj, err := f.newObjectWithInfo(ctx, src.Remote(), nil)
- switch err {
- case nil:
- return existingObj, existingObj.Update(ctx, in, src, options...)
- case fs.ErrorObjectNotFound:
- // Not found so create it
- return f.PutUnchecked(ctx, in, src)
- default:
- return nil, err
- }
- }
- // PutUnchecked the object
- //
- // Copy the reader in to the new object which is returned.
- //
- // The new object may have been created if an error is returned
- // PutUnchecked uploads the object
- //
- // This will create a duplicate if we upload a new file without
- // checking to see if there is one already - use Put() for that.
- func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
- remote := src.Remote()
- size := src.Size()
- modTime := src.ModTime(ctx)
- o, _, _, err := f.createObject(ctx, remote, modTime, size)
- if err != nil {
- return nil, err
- }
- return o, o.Update(ctx, in, src, options...)
- }
- // Mkdir creates the directory if it doesn't exist
- func (f *Fs) Mkdir(ctx context.Context, dir string) error {
- rootNode, err := f.findRoot(ctx, true)
- if err != nil {
- return err
- }
- _, err = f.mkdir(ctx, rootNode, dir)
- if err != nil {
- return fmt.Errorf("Mkdir failed: %w", err)
- }
- return nil
- }
- // deleteNode removes a file or directory, observing useTrash
- func (f *Fs) deleteNode(ctx context.Context, node *mega.Node) (err error) {
- err = f.pacer.Call(func() (bool, error) {
- err = f.srv.Delete(node, f.opt.HardDelete)
- return shouldRetry(ctx, err)
- })
- return err
- }
- // purgeCheck removes the directory dir, if check is set then it
- // refuses to do so if it has anything in
- func (f *Fs) purgeCheck(ctx context.Context, dir string, check bool) error {
- f.mkdirMu.Lock()
- defer f.mkdirMu.Unlock()
- rootNode, err := f.findRoot(ctx, false)
- if err != nil {
- return err
- }
- dirNode, err := f.findDir(rootNode, dir)
- if err != nil {
- return err
- }
- if check {
- children, err := f.srv.FS.GetChildren(dirNode)
- if err != nil {
- return fmt.Errorf("purgeCheck GetChildren failed: %w", err)
- }
- if len(children) > 0 {
- return fs.ErrorDirectoryNotEmpty
- }
- }
- waitEvent := f.srv.WaitEventsStart()
- err = f.deleteNode(ctx, dirNode)
- if err != nil {
- return fmt.Errorf("delete directory node failed: %w", err)
- }
- // Remove the root node if we just deleted it
- if dirNode == rootNode {
- f.clearRoot()
- }
- f.srv.WaitEvents(waitEvent, eventWaitTime)
- return nil
- }
- // Rmdir deletes the root folder
- //
- // Returns an error if it isn't empty
- func (f *Fs) Rmdir(ctx context.Context, dir string) error {
- return f.purgeCheck(ctx, dir, true)
- }
- // Precision return the precision of this Fs
- func (f *Fs) Precision() time.Duration {
- return fs.ModTimeNotSupported
- }
- // Purge deletes all the files in the directory
- //
- // Optional interface: Only implement this if you have a way of
- // deleting all the files quicker than just running Remove() on the
- // result of List()
- func (f *Fs) Purge(ctx context.Context, dir string) error {
- return f.purgeCheck(ctx, dir, false)
- }
- // move a file or folder (srcFs, srcRemote, info) to (f, dstRemote)
- //
- // info will be updates
- func (f *Fs) move(ctx context.Context, dstRemote string, srcFs *Fs, srcRemote string, info *mega.Node) (err error) {
- var (
- dstFs = f
- srcDirNode, dstDirNode *mega.Node
- srcParent, dstParent string
- srcLeaf, dstLeaf string
- )
- if dstRemote != "" {
- // lookup or create the destination parent directory
- dstDirNode, dstLeaf, err = dstFs.mkdirParent(ctx, dstRemote)
- } else {
- // find or create the parent of the root directory
- absRoot := dstFs.srv.FS.GetRoot()
- dstParent, dstLeaf = path.Split(dstFs.root)
- dstDirNode, err = dstFs.mkdir(ctx, absRoot, dstParent)
- }
- if err != nil {
- return fmt.Errorf("server-side move failed to make dst parent dir: %w", err)
- }
- if srcRemote != "" {
- // lookup the existing parent directory
- srcDirNode, srcLeaf, err = srcFs.lookupParentDir(ctx, srcRemote)
- } else {
- // lookup the existing root parent
- absRoot := srcFs.srv.FS.GetRoot()
- srcParent, srcLeaf = path.Split(srcFs.root)
- srcDirNode, err = f.findDir(absRoot, srcParent)
- }
- if err != nil {
- return fmt.Errorf("server-side move failed to lookup src parent dir: %w", err)
- }
- // move the object into its new directory if required
- if srcDirNode != dstDirNode && srcDirNode.GetHash() != dstDirNode.GetHash() {
- //log.Printf("move src %p %q dst %p %q", srcDirNode, srcDirNode.GetName(), dstDirNode, dstDirNode.GetName())
- err = f.pacer.Call(func() (bool, error) {
- err = f.srv.Move(info, dstDirNode)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("server-side move failed: %w", err)
- }
- }
- waitEvent := f.srv.WaitEventsStart()
- // rename the object if required
- if srcLeaf != dstLeaf {
- //log.Printf("rename %q to %q", srcLeaf, dstLeaf)
- err = f.pacer.Call(func() (bool, error) {
- err = f.srv.Rename(info, f.opt.Enc.FromStandardName(dstLeaf))
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("server-side rename failed: %w", err)
- }
- }
- f.srv.WaitEvents(waitEvent, eventWaitTime)
- return nil
- }
- // Move src to this remote using server-side move operations.
- //
- // This is stored with the remote path given.
- //
- // It returns the destination Object and a possible error.
- //
- // Will only be called if src.Fs().Name() == f.Name()
- //
- // If it isn't possible then return fs.ErrorCantMove
- func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
- dstFs := f
- //log.Printf("Move %q -> %q", src.Remote(), remote)
- srcObj, ok := src.(*Object)
- if !ok {
- fs.Debugf(src, "Can't move - not same remote type")
- return nil, fs.ErrorCantMove
- }
- // Do the move
- err := f.move(ctx, remote, srcObj.fs, srcObj.remote, srcObj.info)
- if err != nil {
- return nil, err
- }
- // Create a destination object
- dstObj := &Object{
- fs: dstFs,
- remote: remote,
- info: srcObj.info,
- }
- return dstObj, nil
- }
- // DirMove moves src, srcRemote to this remote at dstRemote
- // using server-side move operations.
- //
- // Will only be called if src.Fs().Name() == f.Name()
- //
- // If it isn't possible then return fs.ErrorCantDirMove
- //
- // If destination exists then return fs.ErrorDirExists
- func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
- dstFs := f
- srcFs, ok := src.(*Fs)
- if !ok {
- fs.Debugf(srcFs, "Can't move directory - not same remote type")
- return fs.ErrorCantDirMove
- }
- // find the source
- info, err := srcFs.lookupDir(ctx, srcRemote)
- if err != nil {
- return err
- }
- // check the destination doesn't exist
- _, err = dstFs.lookupDir(ctx, dstRemote)
- if err == nil {
- return fs.ErrorDirExists
- } else if err != fs.ErrorDirNotFound {
- return fmt.Errorf("DirMove error while checking dest directory: %w", err)
- }
- // Do the move
- err = f.move(ctx, dstRemote, srcFs, srcRemote, info)
- if err != nil {
- return err
- }
- // Clear src if it was the root
- if srcRemote == "" {
- srcFs.clearRoot()
- }
- return nil
- }
- // DirCacheFlush an optional interface to flush internal directory cache
- func (f *Fs) DirCacheFlush() {
- // f.dirCache.ResetRoot()
- // FIXME Flush the mega somehow?
- }
- // Hashes returns the supported hash sets.
- func (f *Fs) Hashes() hash.Set {
- return hash.Set(hash.None)
- }
- // PublicLink generates a public link to the remote path (usually readable by anyone)
- func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (link string, err error) {
- root, err := f.findRoot(ctx, false)
- if err != nil {
- return "", fmt.Errorf("PublicLink failed to find root node: %w", err)
- }
- node, err := f.findNode(root, remote)
- if err != nil {
- return "", fmt.Errorf("PublicLink failed to find path: %w", err)
- }
- link, err = f.srv.Link(node, true)
- if err != nil {
- return "", fmt.Errorf("PublicLink failed to create link: %w", err)
- }
- return link, nil
- }
- // MergeDirs merges the contents of all the directories passed
- // in into the first one and rmdirs the other directories.
- func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
- if len(dirs) < 2 {
- return nil
- }
- // find dst directory
- dstDir := dirs[0]
- dstDirNode := f.srv.FS.HashLookup(dstDir.ID())
- if dstDirNode == nil {
- return fmt.Errorf("MergeDirs failed to find node for: %v", dstDir)
- }
- for _, srcDir := range dirs[1:] {
- // find src directory
- srcDirNode := f.srv.FS.HashLookup(srcDir.ID())
- if srcDirNode == nil {
- return fmt.Errorf("MergeDirs failed to find node for: %v", srcDir)
- }
- // list the objects
- infos := []*mega.Node{}
- _, err := f.list(ctx, srcDirNode, func(info *mega.Node) bool {
- infos = append(infos, info)
- return false
- })
- if err != nil {
- return fmt.Errorf("MergeDirs list failed on %v: %w", srcDir, err)
- }
- // move them into place
- for _, info := range infos {
- fs.Infof(srcDir, "merging %q", f.opt.Enc.ToStandardName(info.GetName()))
- err = f.pacer.Call(func() (bool, error) {
- err = f.srv.Move(info, dstDirNode)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("MergeDirs move failed on %q in %v: %w", f.opt.Enc.ToStandardName(info.GetName()), srcDir, err)
- }
- }
- // rmdir (into trash) the now empty source directory
- fs.Infof(srcDir, "removing empty directory")
- err = f.deleteNode(ctx, srcDirNode)
- if err != nil {
- return fmt.Errorf("MergeDirs move failed to rmdir %q: %w", srcDir, err)
- }
- }
- return nil
- }
- // About gets quota information
- func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
- var q mega.QuotaResp
- var err error
- err = f.pacer.Call(func() (bool, error) {
- q, err = f.srv.GetQuota()
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return nil, fmt.Errorf("failed to get Mega Quota: %w", err)
- }
- usage := &fs.Usage{
- Total: fs.NewUsageValue(int64(q.Mstrg)), // quota of bytes that can be used
- Used: fs.NewUsageValue(int64(q.Cstrg)), // bytes in use
- Free: fs.NewUsageValue(int64(q.Mstrg - q.Cstrg)), // bytes which can be uploaded before reaching the quota
- }
- return usage, nil
- }
- // ------------------------------------------------------------
- // Fs returns the parent Fs
- func (o *Object) Fs() fs.Info {
- return o.fs
- }
- // Return a string version
- func (o *Object) String() string {
- if o == nil {
- return "<nil>"
- }
- return o.remote
- }
- // Remote returns the remote path
- func (o *Object) Remote() string {
- return o.remote
- }
- // Hash returns the hashes of an object
- func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
- return "", hash.ErrUnsupported
- }
- // Size returns the size of an object in bytes
- func (o *Object) Size() int64 {
- return o.info.GetSize()
- }
- // setMetaData sets the metadata from info
- func (o *Object) setMetaData(info *mega.Node) (err error) {
- if info.GetType() != mega.FILE {
- return fs.ErrorIsDir // all other node types are directories
- }
- o.info = info
- return nil
- }
- // readMetaData gets the metadata if it hasn't already been fetched
- //
- // it also sets the info
- func (o *Object) readMetaData(ctx context.Context) (err error) {
- if o.info != nil {
- return nil
- }
- info, err := o.fs.readMetaDataForPath(ctx, o.remote)
- if err != nil {
- if err == fs.ErrorDirNotFound {
- err = fs.ErrorObjectNotFound
- }
- return err
- }
- return o.setMetaData(info)
- }
- // ModTime returns the modification time of the object
- //
- // It attempts to read the objects mtime and if that isn't present the
- // LastModified returned in the http headers
- func (o *Object) ModTime(ctx context.Context) time.Time {
- return o.info.GetTimeStamp()
- }
- // SetModTime sets the modification time of the local fs object
- func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
- return fs.ErrorCantSetModTime
- }
- // Storable returns a boolean showing whether this object storable
- func (o *Object) Storable() bool {
- return true
- }
- // openObject represents a download in progress
- type openObject struct {
- ctx context.Context
- mu sync.Mutex
- o *Object
- d *mega.Download
- id int
- skip int64
- chunk []byte
- closed bool
- }
- // get the next chunk
- func (oo *openObject) getChunk(ctx context.Context) (err error) {
- if oo.id >= oo.d.Chunks() {
- return io.EOF
- }
- var chunk []byte
- err = oo.o.fs.pacer.Call(func() (bool, error) {
- chunk, err = oo.d.DownloadChunk(oo.id)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return err
- }
- oo.id++
- oo.chunk = chunk
- return nil
- }
- // Read reads up to len(p) bytes into p.
- func (oo *openObject) Read(p []byte) (n int, err error) {
- oo.mu.Lock()
- defer oo.mu.Unlock()
- if oo.closed {
- return 0, errors.New("read on closed file")
- }
- // Skip data at the start if requested
- for oo.skip > 0 {
- _, size, err := oo.d.ChunkLocation(oo.id)
- if err != nil {
- return 0, err
- }
- if oo.skip < int64(size) {
- break
- }
- oo.id++
- oo.skip -= int64(size)
- }
- if len(oo.chunk) == 0 {
- err = oo.getChunk(oo.ctx)
- if err != nil {
- return 0, err
- }
- if oo.skip > 0 {
- oo.chunk = oo.chunk[oo.skip:]
- oo.skip = 0
- }
- }
- n = copy(p, oo.chunk)
- oo.chunk = oo.chunk[n:]
- return n, nil
- }
- // Close closed the file - MAC errors are reported here
- func (oo *openObject) Close() (err error) {
- oo.mu.Lock()
- defer oo.mu.Unlock()
- if oo.closed {
- return nil
- }
- err = oo.o.fs.pacer.Call(func() (bool, error) {
- err = oo.d.Finish()
- return shouldRetry(oo.ctx, err)
- })
- if err != nil {
- return fmt.Errorf("failed to finish download: %w", err)
- }
- oo.closed = true
- return nil
- }
- // Open an object for read
- func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
- var offset, limit int64 = 0, -1
- for _, option := range options {
- switch x := option.(type) {
- case *fs.SeekOption:
- offset = x.Offset
- case *fs.RangeOption:
- offset, limit = x.Decode(o.Size())
- default:
- if option.Mandatory() {
- fs.Logf(o, "Unsupported mandatory option: %v", option)
- }
- }
- }
- var d *mega.Download
- err = o.fs.pacer.Call(func() (bool, error) {
- d, err = o.fs.srv.NewDownload(o.info)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return nil, fmt.Errorf("open download file failed: %w", err)
- }
- oo := &openObject{
- ctx: ctx,
- o: o,
- d: d,
- skip: offset,
- }
- return readers.NewLimitedReadCloser(oo, limit), nil
- }
- // Update the object with the contents of the io.Reader, modTime and size
- //
- // If existing is set then it updates the object rather than creating a new one.
- //
- // The new object may have been created if an error is returned
- func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
- size := src.Size()
- if size < 0 {
- return errors.New("mega backend can't upload a file of unknown length")
- }
- //modTime := src.ModTime(ctx)
- remote := o.Remote()
- // Create the parent directory
- dirNode, leaf, err := o.fs.mkdirParent(ctx, remote)
- if err != nil {
- return fmt.Errorf("update make parent dir failed: %w", err)
- }
- var u *mega.Upload
- err = o.fs.pacer.Call(func() (bool, error) {
- u, err = o.fs.srv.NewUpload(dirNode, o.fs.opt.Enc.FromStandardName(leaf), size)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("upload file failed to create session: %w", err)
- }
- // Upload the chunks
- // FIXME do this in parallel
- for id := 0; id < u.Chunks(); id++ {
- _, chunkSize, err := u.ChunkLocation(id)
- if err != nil {
- return fmt.Errorf("upload failed to read chunk location: %w", err)
- }
- chunk := make([]byte, chunkSize)
- _, err = io.ReadFull(in, chunk)
- if err != nil {
- return fmt.Errorf("upload failed to read data: %w", err)
- }
- err = o.fs.pacer.Call(func() (bool, error) {
- err = u.UploadChunk(id, chunk)
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("upload file failed to upload chunk: %w", err)
- }
- }
- // Finish the upload
- var info *mega.Node
- err = o.fs.pacer.Call(func() (bool, error) {
- info, err = u.Finish()
- return shouldRetry(ctx, err)
- })
- if err != nil {
- return fmt.Errorf("failed to finish upload: %w", err)
- }
- // If the upload succeeded and the original object existed, then delete it
- if o.info != nil {
- err = o.fs.deleteNode(ctx, o.info)
- if err != nil {
- return fmt.Errorf("upload failed to remove old version: %w", err)
- }
- o.info = nil
- }
- return o.setMetaData(info)
- }
- // Remove an object
- func (o *Object) Remove(ctx context.Context) error {
- err := o.fs.deleteNode(ctx, o.info)
- if err != nil {
- return fmt.Errorf("Remove object failed: %w", err)
- }
- return nil
- }
- // ID returns the ID of the Object if known, or "" if not
- func (o *Object) ID() string {
- return o.info.GetHash()
- }
- // Check the interfaces are satisfied
- var (
- _ fs.Fs = (*Fs)(nil)
- _ fs.Purger = (*Fs)(nil)
- _ fs.Mover = (*Fs)(nil)
- _ fs.PutUncheckeder = (*Fs)(nil)
- _ fs.DirMover = (*Fs)(nil)
- _ fs.DirCacheFlusher = (*Fs)(nil)
- _ fs.PublicLinker = (*Fs)(nil)
- _ fs.MergeDirser = (*Fs)(nil)
- _ fs.Abouter = (*Fs)(nil)
- _ fs.Object = (*Object)(nil)
- _ fs.IDer = (*Object)(nil)
- )
|