//go:build !plan9 && !solaris && !js

// Package azureblob provides an interface to the Microsoft Azure blob object storage system
package azureblob

import (
	"context"
	"crypto/md5"
	"encoding/base64"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/chunksize"
	"github.com/rclone/rclone/fs/config"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/config/obscure"
	"github.com/rclone/rclone/fs/fserrors"
	"github.com/rclone/rclone/fs/fshttp"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/walk"
	"github.com/rclone/rclone/lib/bucket"
	"github.com/rclone/rclone/lib/encoder"
	"github.com/rclone/rclone/lib/env"
	"github.com/rclone/rclone/lib/multipart"
	"github.com/rclone/rclone/lib/pacer"
)
const (
	minSleep              = 10 * time.Millisecond
	maxSleep              = 10 * time.Second
	decayConstant         = 1    // bigger for slower decay, exponential
	maxListChunkSize      = 5000 // number of items to read at once
	modTimeKey            = "mtime"
	dirMetaKey            = "hdi_isfolder"
	dirMetaValue          = "true"
	timeFormatIn          = time.RFC3339
	timeFormatOut         = "2006-01-02T15:04:05.000000000Z07:00"
	storageDefaultBaseURL = "blob.core.windows.net"
	defaultChunkSize      = 4 * fs.Mebi
	defaultAccessTier     = blob.AccessTier("") // FIXME AccessTierNone

	// Default storage account, key and blob endpoint for emulator support;
	// though it is a base64 key checked in here, it is a publicly available secret.
	emulatorAccount      = "devstoreaccount1"
	emulatorAccountKey   = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
	emulatorBlobEndpoint = "http://127.0.0.1:10000/devstoreaccount1"
)
var (
	errCantUpdateArchiveTierBlobs = fserrors.NoRetryError(errors.New("can't update archive tier blob without --azureblob-archive-tier-delete"))

	// Take this when changing or reading metadata.
	//
	// It acts as a global metadata lock so we don't bloat Object
	// with an extra lock that will only very rarely be contended.
	metadataMu sync.Mutex
)
// Register with Fs
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "azureblob",
		Description: "Microsoft Azure Blob Storage",
		NewFs:       NewFs,
		Options: []fs.Option{{
			Name: "account",
			Help: `Azure Storage Account Name.

Set this to the Azure Storage Account Name in use.

Leave blank to use SAS URL or Emulator, otherwise it needs to be set.

If this is blank and if env_auth is set it will be read from the
environment variable ` + "`AZURE_STORAGE_ACCOUNT_NAME`" + ` if possible.
`,
			Sensitive: true,
		}, {
			Name: "env_auth",
			Help: `Read credentials from runtime (environment variables, CLI or MSI).

See the [authentication docs](/azureblob#authentication) for full info.`,
			Default: false,
		}, {
			Name: "key",
			Help: `Storage Account Shared Key.

Leave blank to use SAS URL or Emulator.`,
			Sensitive: true,
		}, {
			Name: "sas_url",
			Help: `SAS URL for container level access only.

Leave blank if using account/key or Emulator.`,
			Sensitive: true,
		}, {
			Name: "tenant",
			Help: `ID of the service principal's tenant. Also called its directory ID.

Set this if using
- Service principal with client secret
- Service principal with certificate
- User with username and password
`,
			Sensitive: true,
		}, {
			Name: "client_id",
			Help: `The ID of the client in use.

Set this if using
- Service principal with client secret
- Service principal with certificate
- User with username and password
`,
			Sensitive: true,
		}, {
			Name: "client_secret",
			Help: `One of the service principal's client secrets.

Set this if using
- Service principal with client secret
`,
			Sensitive: true,
		}, {
			Name: "client_certificate_path",
			Help: `Path to a PEM or PKCS12 certificate file including the private key.

Set this if using
- Service principal with certificate
`,
		}, {
			Name: "client_certificate_password",
			Help: `Password for the certificate file (optional).

Optionally set this if using
- Service principal with certificate

And the certificate has a password.
`,
			IsPassword: true,
		}, {
			Name: "client_send_certificate_chain",
			Help: `Send the certificate chain when using certificate auth.

Specifies whether an authentication request will include an x5c header
to support subject name / issuer based authentication. When set to
true, authentication requests include the x5c header.

Optionally set this if using
- Service principal with certificate
`,
			Default:  false,
			Advanced: true,
		}, {
			Name: "username",
			Help: `User name (usually an email address).

Set this if using
- User with username and password
`,
			Advanced:  true,
			Sensitive: true,
		}, {
			Name: "password",
			Help: `The user's password.

Set this if using
- User with username and password
`,
			IsPassword: true,
			Advanced:   true,
		}, {
			Name: "service_principal_file",
			Help: `Path to file containing credentials for use with a service principal.

Leave blank normally. Needed only if you want to use a service principal instead of interactive login.

    $ az ad sp create-for-rbac --name "<name>" \
      --role "Storage Blob Data Owner" \
      --scopes "/subscriptions/<subscription>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>/blobServices/default/containers/<container>" \
      > azure-principal.json

See ["Create an Azure service principal"](https://docs.microsoft.com/en-us/cli/azure/create-an-azure-service-principal-azure-cli) and ["Assign an Azure role for access to blob data"](https://docs.microsoft.com/en-us/azure/storage/common/storage-auth-aad-rbac-cli) pages for more details.

It may be more convenient to put the credentials directly into the
rclone config file under the ` + "`client_id`, `tenant` and `client_secret`" + `
keys instead of setting ` + "`service_principal_file`" + `.
`,
			Advanced: true,
		}, {
			Name: "use_msi",
			Help: `Use a managed service identity to authenticate (only works in Azure).

When true, use a [managed service identity](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/)
to authenticate to Azure Storage instead of a SAS token or account key.

If the VM(SS) on which this program is running has a system-assigned identity, it will
be used by default. If the resource has no system-assigned but exactly one user-assigned identity,
the user-assigned identity will be used by default. If the resource has multiple user-assigned
identities, the identity to use must be explicitly specified using exactly one of the msi_object_id,
msi_client_id, or msi_mi_res_id parameters.`,
			Default:  false,
			Advanced: true,
		}, {
			Name:      "msi_object_id",
			Help:      "Object ID of the user-assigned MSI to use, if any.\n\nLeave blank if msi_client_id or msi_mi_res_id specified.",
			Advanced:  true,
			Sensitive: true,
		}, {
			Name:      "msi_client_id",
			Help:      "Client ID of the user-assigned MSI to use, if any.\n\nLeave blank if msi_object_id or msi_mi_res_id specified.",
			Advanced:  true,
			Sensitive: true,
		}, {
			Name:      "msi_mi_res_id",
			Help:      "Azure resource ID of the user-assigned MSI to use, if any.\n\nLeave blank if msi_client_id or msi_object_id specified.",
			Advanced:  true,
			Sensitive: true,
		}, {
			Name:     "use_emulator",
			Help:     "Uses local storage emulator if provided as 'true'.\n\nLeave blank if using real azure storage endpoint.",
			Default:  false,
			Advanced: true,
		}, {
			Name:     "endpoint",
			Help:     "Endpoint for the service.\n\nLeave blank normally.",
			Advanced: true,
		}, {
			Name:     "upload_cutoff",
			Help:     "Cutoff for switching to chunked upload (<= 256 MiB) (deprecated).",
			Advanced: true,
		}, {
			Name: "chunk_size",
			Help: `Upload chunk size.

Note that this is stored in memory and there may be up to
"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
in memory.`,
			Default:  defaultChunkSize,
			Advanced: true,
		}, {
			Name: "upload_concurrency",
			Help: `Concurrency for multipart uploads.

This is the number of chunks of the same file that are uploaded
concurrently.

If you are uploading small numbers of large files over high-speed
links and these uploads do not fully utilize your bandwidth, then
increasing this may help to speed up the transfers.

In tests, upload speed increases almost linearly with upload
concurrency. For example to fill a gigabit pipe it may be necessary to
raise this to 64. Note that this will use more memory.

Note that chunks are stored in memory and there may be up to
"--transfers" * "--azureblob-upload-concurrency" chunks stored at once
in memory.`,
			Default:  16,
			Advanced: true,
		}, {
			Name: "list_chunk",
			Help: `Size of blob list.

This sets the number of blobs requested in each listing chunk. Default
is the maximum, 5000. "List blobs" requests are permitted 2 minutes
per megabyte to complete. If an operation is taking longer than 2
minutes per megabyte on average, it will time out (
[source](https://docs.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-blob-service-operations#exceptions-to-default-timeout-interval)
). This can be used to limit the number of blob items to return, to
avoid the time out.`,
			Default:  maxListChunkSize,
			Advanced: true,
		}, {
			Name: "access_tier",
			Help: `Access tier of blob: hot, cool, cold or archive.

Archived blobs can be restored by setting access tier to hot, cool or
cold. Leave blank if you intend to use the default access tier, which is
set at the account level.

If there is no "access tier" specified, rclone doesn't apply any tier.
rclone performs "Set Tier" operation on blobs while uploading; if objects
are not modified, specifying "access tier" to a new one will have no effect.
If blobs are in "archive tier" at remote, trying to perform data transfer
operations from remote will not be allowed. The user should first restore by
tiering the blob to "Hot", "Cool" or "Cold".`,
			Advanced: true,
		}, {
			Name:    "archive_tier_delete",
			Default: false,
			Help: fmt.Sprintf(`Delete archive tier blobs before overwriting.

Archive tier blobs cannot be updated. So without this flag, if you
attempt to update an archive tier blob, then rclone will produce the
error:

    %v

With this flag set then before rclone attempts to overwrite an archive
tier blob, it will delete the existing blob before uploading its
replacement. This has the potential for data loss if the upload fails
(unlike updating a normal blob) and also may cost more since deleting
archive tier blobs early may be chargeable.
`, errCantUpdateArchiveTierBlobs),
			Advanced: true,
		}, {
			Name: "disable_checksum",
			Help: `Don't store MD5 checksum with object metadata.

Normally rclone will calculate the MD5 checksum of the input before
uploading it so it can add it to metadata on the object. This is great
for data integrity checking but can cause long delays for large files
to start uploading.`,
			Default:  false,
			Advanced: true,
		}, {
			Name:     "memory_pool_flush_time",
			Default:  fs.Duration(time.Minute),
			Advanced: true,
			Hide:     fs.OptionHideBoth,
			Help:     `How often internal memory buffer pools will be flushed. (no longer used)`,
		}, {
			Name:     "memory_pool_use_mmap",
			Default:  false,
			Advanced: true,
			Hide:     fs.OptionHideBoth,
			Help:     `Whether to use mmap buffers in internal memory pool. (no longer used)`,
		}, {
			Name:     config.ConfigEncoding,
			Help:     config.ConfigEncodingHelp,
			Advanced: true,
			Default: (encoder.EncodeInvalidUtf8 |
				encoder.EncodeSlash |
				encoder.EncodeCtl |
				encoder.EncodeDel |
				encoder.EncodeBackSlash |
				encoder.EncodeRightPeriod),
		}, {
			Name:    "public_access",
			Help:    "Public access level of a container: blob or container.",
			Default: "",
			Examples: []fs.OptionExample{
				{
					Value: "",
					Help:  "The container and its blobs can be accessed only with an authorized request.\nIt's the default value.",
				}, {
					Value: string(container.PublicAccessTypeBlob),
					Help:  "Blob data within this container can be read via anonymous request.",
				}, {
					Value: string(container.PublicAccessTypeContainer),
					Help:  "Allow full public read access for container and blob data.",
				},
			},
			Advanced: true,
		}, {
			Name:     "directory_markers",
			Default:  false,
			Advanced: true,
			Help: `Upload an empty object with a trailing slash when a new directory is created.

Empty folders are unsupported for bucket based remotes; this option
creates an empty object ending with "/", to persist the folder.

This object also has the metadata "` + dirMetaKey + ` = ` + dirMetaValue + `" to conform to
the Microsoft standard.
`,
		}, {
			Name: "no_check_container",
			Help: `If set, don't attempt to check the container exists or create it.

This can be useful when trying to minimise the number of transactions
rclone does if you know the container exists already.
`,
			Default:  false,
			Advanced: true,
		}, {
			Name:     "no_head_object",
			Help:     `If set, do not do HEAD before GET when getting objects.`,
			Default:  false,
			Advanced: true,
		}, {
			Name: "delete_snapshots",
			Help: `Set to specify how to deal with snapshots on blob deletion.`,
			Examples: []fs.OptionExample{
				{
					Value: "",
					Help:  "By default, the delete operation fails if a blob has snapshots",
				}, {
					Value: string(blob.DeleteSnapshotsOptionTypeInclude),
					Help:  "Specify 'include' to remove the root blob and all its snapshots",
				}, {
					Value: string(blob.DeleteSnapshotsOptionTypeOnly),
					Help:  "Specify 'only' to remove only the snapshots but keep the root blob.",
				},
			},
			Default:   "",
			Exclusive: true,
			Advanced:  true,
		}},
	})
}
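
// An illustrative rclone.conf stanza wiring up the options registered
// above (the remote name and values are placeholders, not defaults):
//
//	[myazure]
//	type = azureblob
//	account = mystorageaccount
//	key = <base64 shared key>
//	access_tier = hot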

// Options defines the configuration for this backend
type Options struct {
	Account                    string               `config:"account"`
	EnvAuth                    bool                 `config:"env_auth"`
	Key                        string               `config:"key"`
	SASURL                     string               `config:"sas_url"`
	Tenant                     string               `config:"tenant"`
	ClientID                   string               `config:"client_id"`
	ClientSecret               string               `config:"client_secret"`
	ClientCertificatePath      string               `config:"client_certificate_path"`
	ClientCertificatePassword  string               `config:"client_certificate_password"`
	ClientSendCertificateChain bool                 `config:"client_send_certificate_chain"`
	Username                   string               `config:"username"`
	Password                   string               `config:"password"`
	ServicePrincipalFile       string               `config:"service_principal_file"`
	UseMSI                     bool                 `config:"use_msi"`
	MSIObjectID                string               `config:"msi_object_id"`
	MSIClientID                string               `config:"msi_client_id"`
	MSIResourceID              string               `config:"msi_mi_res_id"`
	Endpoint                   string               `config:"endpoint"`
	ChunkSize                  fs.SizeSuffix        `config:"chunk_size"`
	UploadConcurrency          int                  `config:"upload_concurrency"`
	ListChunkSize              uint                 `config:"list_chunk"`
	AccessTier                 string               `config:"access_tier"`
	ArchiveTierDelete          bool                 `config:"archive_tier_delete"`
	UseEmulator                bool                 `config:"use_emulator"`
	DisableCheckSum            bool                 `config:"disable_checksum"`
	Enc                        encoder.MultiEncoder `config:"encoding"`
	PublicAccess               string               `config:"public_access"`
	DirectoryMarkers           bool                 `config:"directory_markers"`
	NoCheckContainer           bool                 `config:"no_check_container"`
	NoHeadObject               bool                 `config:"no_head_object"`
	DeleteSnapshots            string               `config:"delete_snapshots"`
}

// Fs represents a remote azure server
type Fs struct {
	name          string                       // name of this remote
	root          string                       // the path we are working on if any
	opt           Options                      // parsed config options
	ci            *fs.ConfigInfo               // global config
	features      *fs.Features                 // optional features
	cntSVCcacheMu sync.Mutex                   // mutex to protect cntSVCcache
	cntSVCcache   map[string]*container.Client // reference to containerClient per container
	svc           *service.Client              // client to access azblob
	rootContainer string                       // container part of root (if any)
	rootDirectory string                       // directory part of root (if any)
	isLimited     bool                         // if limited to one container
	cache         *bucket.Cache                // cache for container creation status
	pacer         *fs.Pacer                    // To pace and retry the API calls
	uploadToken   *pacer.TokenDispenser        // control concurrency
	publicAccess  container.PublicAccessType   // Container Public Access Level
}

// Object describes an azure object
type Object struct {
	fs         *Fs               // what this object is part of
	remote     string            // The remote path
	modTime    time.Time         // The modified time of the object if known
	md5        string            // MD5 hash if known
	size       int64             // Size of the object
	mimeType   string            // Content-Type of the object
	accessTier blob.AccessTier   // Blob Access Tier
	meta       map[string]string // blob metadata - take metadataMu when accessing
}

// ------------------------------------------------------------

// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// String converts this Fs to a string
func (f *Fs) String() string {
	if f.rootContainer == "" {
		return "Azure root"
	}
	if f.rootDirectory == "" {
		return fmt.Sprintf("Azure container %s", f.rootContainer)
	}
	return fmt.Sprintf("Azure container %s path %s", f.rootContainer, f.rootDirectory)
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// parsePath parses a remote 'url'
func parsePath(path string) (root string) {
	root = strings.Trim(path, "/")
	return
}

// split returns container and containerPath from the rootRelativePath
// relative to f.root
func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) {
	containerName, containerPath = bucket.Split(bucket.Join(f.root, rootRelativePath))
	return f.opt.Enc.FromStandardName(containerName), f.opt.Enc.FromStandardPath(containerPath)
}

// split returns container and containerPath from the object
func (o *Object) split() (container, containerPath string) {
	return o.fs.split(o.remote)
}
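
// As an illustration, with f.root = "mycontainer/photos" (names are
// hypothetical), remote "2023/img.jpg" splits to containerName
// "mycontainer" and containerPath "photos/2023/img.jpg", with the
// configured encoding then applied to each part.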

// validateAccessTier checks if azureblob supports user supplied tier
func validateAccessTier(tier string) bool {
	return strings.EqualFold(tier, string(blob.AccessTierHot)) ||
		strings.EqualFold(tier, string(blob.AccessTierCool)) ||
		strings.EqualFold(tier, string(blob.AccessTierCold)) ||
		strings.EqualFold(tier, string(blob.AccessTierArchive))
}

// validatePublicAccess checks if azureblob supports the user supplied public access level
func validatePublicAccess(publicAccess string) bool {
	switch publicAccess {
	case "",
		string(container.PublicAccessTypeBlob),
		string(container.PublicAccessTypeContainer):
		// valid cases
		return true
	default:
		return false
	}
}

// retryErrorCodes is a slice of error codes that we will retry
var retryErrorCodes = []int{
	401, // Unauthorized (e.g. "Token has expired")
	408, // Request Timeout
	429, // Rate exceeded.
	500, // Get occasional 500 Internal Server Error
	503, // Service Unavailable
	504, // Gateway Time-out
}

// shouldRetry returns a boolean as to whether this resp and err
// deserve to be retried. It returns the err as a convenience
func (f *Fs) shouldRetry(ctx context.Context, err error) (bool, error) {
	if fserrors.ContextError(ctx, &err) {
		return false, err
	}
	// FIXME interpret special errors - more to do here
	if storageErr, ok := err.(*azcore.ResponseError); ok {
		switch storageErr.ErrorCode {
		case "InvalidBlobOrBlock":
			// These errors happen sometimes in multipart uploads
			// because of block concurrency issues
			return true, err
		}
		statusCode := storageErr.StatusCode
		for _, e := range retryErrorCodes {
			if statusCode == e {
				return true, err
			}
		}
	}
	return fserrors.ShouldRetry(err), err
}
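
// shouldRetry is designed to be driven by the pacer. A typical call site
// (the same shape as the ones in list and listContainersToFn below) is:
//
//	err := f.pacer.Call(func() (bool, error) {
//		response, err = pager.NextPage(ctx)
//		return f.shouldRetry(ctx, err)
//	})
//
// The pacer re-runs the inner function while it returns (true, err),
// backing off between attempts.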

func checkUploadChunkSize(cs fs.SizeSuffix) error {
	const minChunkSize = fs.SizeSuffixBase
	if cs < minChunkSize {
		return fmt.Errorf("%s is less than %s", cs, minChunkSize)
	}
	return nil
}

func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
	err = checkUploadChunkSize(cs)
	if err == nil {
		old, f.opt.ChunkSize = f.opt.ChunkSize, cs
	}
	return
}

type servicePrincipalCredentials struct {
	AppID    string `json:"appId"`
	Password string `json:"password"`
	Tenant   string `json:"tenant"`
}

// parseServicePrincipalCredentials unmarshals a service principal credentials JSON file as generated by az cli.
func parseServicePrincipalCredentials(ctx context.Context, credentialsData []byte) (*servicePrincipalCredentials, error) {
	var spCredentials servicePrincipalCredentials
	if err := json.Unmarshal(credentialsData, &spCredentials); err != nil {
		return nil, fmt.Errorf("error parsing credentials from JSON file: %w", err)
	}
	// TODO: support certificate credentials
	// Validate all fields present
	if spCredentials.AppID == "" || spCredentials.Password == "" || spCredentials.Tenant == "" {
		return nil, fmt.Errorf("missing fields in credentials file")
	}
	return &spCredentials, nil
}
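
// The credentials file is expected to carry the three fields of
// servicePrincipalCredentials, as produced by `az ad sp create-for-rbac`
// (values below are placeholders):
//
//	{
//		"appId": "00000000-0000-0000-0000-000000000000",
//		"password": "<client secret>",
//		"tenant": "00000000-0000-0000-0000-000000000000"
//	}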

// setRoot changes the root of the Fs
func (f *Fs) setRoot(root string) {
	f.root = parsePath(root)
	f.rootContainer, f.rootDirectory = bucket.Split(f.root)
}

// Wrap the http.Transport to satisfy the Transporter interface
type transporter struct {
	http.RoundTripper
}

// Make a new transporter
func newTransporter(ctx context.Context) transporter {
	return transporter{
		RoundTripper: fshttp.NewTransport(ctx),
	}
}

// Do sends the HTTP request and returns the HTTP response or error.
func (tr transporter) Do(req *http.Request) (*http.Response, error) {
	return tr.RoundTripper.RoundTrip(req)
}
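
// The Azure SDK accepts anything with this Do method as a
// policy.Transporter, while fshttp.NewTransport returns an
// http.RoundTripper honouring rclone's global HTTP options, so this
// adapter routes all SDK traffic through rclone's usual HTTP stack.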

// NewFs constructs an Fs from the path, container:path
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	err = checkUploadChunkSize(opt.ChunkSize)
	if err != nil {
		return nil, fmt.Errorf("chunk size: %w", err)
	}
	if opt.ListChunkSize > maxListChunkSize {
		return nil, fmt.Errorf("blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize)
	}
	if opt.AccessTier == "" {
		opt.AccessTier = string(defaultAccessTier)
	} else if !validateAccessTier(opt.AccessTier) {
		return nil, fmt.Errorf("supported access tiers are %s, %s, %s and %s",
			string(blob.AccessTierHot), string(blob.AccessTierCool), string(blob.AccessTierCold), string(blob.AccessTierArchive))
	}
	if !validatePublicAccess(opt.PublicAccess) {
		return nil, fmt.Errorf("supported public access levels are %s and %s",
			string(container.PublicAccessTypeBlob), string(container.PublicAccessTypeContainer))
	}
	ci := fs.GetConfig(ctx)
	f := &Fs{
		name:        name,
		opt:         *opt,
		ci:          ci,
		pacer:       fs.NewPacer(ctx, pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
		uploadToken: pacer.NewTokenDispenser(ci.Transfers),
		cache:       bucket.NewCache(),
		cntSVCcache: make(map[string]*container.Client, 1),
	}
	f.publicAccess = container.PublicAccessType(opt.PublicAccess)
	f.setRoot(root)
	f.features = (&fs.Features{
		ReadMimeType:      true,
		WriteMimeType:     true,
		BucketBased:       true,
		BucketBasedRootOK: true,
		SetTier:           true,
		GetTier:           true,
	}).Fill(ctx, f)
	if opt.DirectoryMarkers {
		f.features.CanHaveEmptyDirectories = true
		fs.Debugf(f, "Using directory markers")
	}

	// Client options specifying our own transport
	policyClientOptions := policy.ClientOptions{
		Transport: newTransporter(ctx),
	}
	clientOpt := service.ClientOptions{
		ClientOptions: policyClientOptions,
	}

	// Here we auth by setting one of cred, sharedKeyCred or f.svc
	var (
		cred          azcore.TokenCredential
		sharedKeyCred *service.SharedKeyCredential
	)
	switch {
	case opt.EnvAuth:
		// Read account from environment if needed
		if opt.Account == "" {
			opt.Account, _ = os.LookupEnv("AZURE_STORAGE_ACCOUNT_NAME")
		}
		// Read credentials from the environment
		options := azidentity.DefaultAzureCredentialOptions{
			ClientOptions: policyClientOptions,
		}
		cred, err = azidentity.NewDefaultAzureCredential(&options)
		if err != nil {
			return nil, fmt.Errorf("create azure environment credential failed: %w", err)
		}
	case opt.UseEmulator:
		if opt.Account == "" {
			opt.Account = emulatorAccount
		}
		if opt.Key == "" {
			opt.Key = emulatorAccountKey
		}
		if opt.Endpoint == "" {
			opt.Endpoint = emulatorBlobEndpoint
		}
		sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
		if err != nil {
			return nil, fmt.Errorf("create new shared key credential for emulator failed: %w", err)
		}
	case opt.Account != "" && opt.Key != "":
		sharedKeyCred, err = service.NewSharedKeyCredential(opt.Account, opt.Key)
		if err != nil {
			return nil, fmt.Errorf("create new shared key credential failed: %w", err)
		}
	case opt.SASURL != "":
		parts, err := sas.ParseURL(opt.SASURL)
		if err != nil {
			return nil, fmt.Errorf("failed to parse SAS URL: %w", err)
		}
		endpoint := opt.SASURL
		containerName := parts.ContainerName
		// Check if we have container level SAS or account level SAS
		if containerName != "" {
			// Container level SAS
			if f.rootContainer != "" && containerName != f.rootContainer {
				return nil, fmt.Errorf("container name in SAS URL (%q) and container provided in command (%q) do not match", containerName, f.rootContainer)
			}
			// Rewrite the endpoint string to be without the container
			parts.ContainerName = ""
			endpoint = parts.String()
		}
		f.svc, err = service.NewClientWithNoCredential(endpoint, &clientOpt)
		if err != nil {
			return nil, fmt.Errorf("unable to create SAS URL client: %w", err)
		}
		// if using Container level SAS put the container client into the cache
		if containerName != "" {
			_ = f.cntSVC(containerName)
			f.isLimited = true
		}
	case opt.ClientID != "" && opt.Tenant != "" && opt.ClientSecret != "":
		// Service principal with client secret
		options := azidentity.ClientSecretCredentialOptions{
			ClientOptions: policyClientOptions,
		}
		cred, err = azidentity.NewClientSecretCredential(opt.Tenant, opt.ClientID, opt.ClientSecret, &options)
		if err != nil {
			return nil, fmt.Errorf("error creating a client secret credential: %w", err)
		}
	case opt.ClientID != "" && opt.Tenant != "" && opt.ClientCertificatePath != "":
		// Service principal with certificate
		//
		// Read the certificate
		data, err := os.ReadFile(env.ShellExpand(opt.ClientCertificatePath))
		if err != nil {
			return nil, fmt.Errorf("error reading client certificate file: %w", err)
		}
		// NewClientCertificateCredential requires at least one *x509.Certificate, and a
		// crypto.PrivateKey.
		//
		// ParseCertificates returns these given certificate data in PEM or PKCS12 format.
		// It handles common scenarios but has limitations, for example it doesn't load PEM
		// encrypted private keys.
		var password []byte
		if opt.ClientCertificatePassword != "" {
			pw, err := obscure.Reveal(opt.ClientCertificatePassword)
			if err != nil {
				return nil, fmt.Errorf("certificate password decode failed - did you obscure it?: %w", err)
			}
			password = []byte(pw)
		}
		certs, key, err := azidentity.ParseCertificates(data, password)
		if err != nil {
			return nil, fmt.Errorf("failed to parse client certificate file: %w", err)
		}
		options := azidentity.ClientCertificateCredentialOptions{
			ClientOptions:        policyClientOptions,
			SendCertificateChain: opt.ClientSendCertificateChain,
		}
		cred, err = azidentity.NewClientCertificateCredential(
			opt.Tenant, opt.ClientID, certs, key, &options,
		)
		if err != nil {
			return nil, fmt.Errorf("create azure service principal with client certificate credential failed: %w", err)
		}
	case opt.ClientID != "" && opt.Tenant != "" && opt.Username != "" && opt.Password != "":
		// User with username and password
		options := azidentity.UsernamePasswordCredentialOptions{
			ClientOptions: policyClientOptions,
		}
		password, err := obscure.Reveal(opt.Password)
		if err != nil {
			return nil, fmt.Errorf("user password decode failed - did you obscure it?: %w", err)
		}
		cred, err = azidentity.NewUsernamePasswordCredential(
			opt.Tenant, opt.ClientID, opt.Username, password, &options,
		)
		if err != nil {
			return nil, fmt.Errorf("authenticate user with password failed: %w", err)
		}
	case opt.ServicePrincipalFile != "":
		// Loading service principal credentials from file.
		loadedCreds, err := os.ReadFile(env.ShellExpand(opt.ServicePrincipalFile))
		if err != nil {
			return nil, fmt.Errorf("error opening service principal credentials file: %w", err)
		}
		parsedCreds, err := parseServicePrincipalCredentials(ctx, loadedCreds)
		if err != nil {
			return nil, fmt.Errorf("error parsing service principal credentials file: %w", err)
		}
		options := azidentity.ClientSecretCredentialOptions{
			ClientOptions: policyClientOptions,
		}
		cred, err = azidentity.NewClientSecretCredential(parsedCreds.Tenant, parsedCreds.AppID, parsedCreds.Password, &options)
		if err != nil {
			return nil, fmt.Errorf("error creating a client secret credential: %w", err)
		}
	case opt.UseMSI:
		// Specifying a user-assigned identity. Exactly one of the above IDs must be specified.
		// Validate and ensure exactly one is set. (To do: better validation.)
		var b2i = map[bool]int{false: 0, true: 1}
		set := b2i[opt.MSIClientID != ""] + b2i[opt.MSIObjectID != ""] + b2i[opt.MSIResourceID != ""]
		if set > 1 {
			return nil, errors.New("more than one user-assigned identity ID is set")
		}
		var options azidentity.ManagedIdentityCredentialOptions
		switch {
		case opt.MSIClientID != "":
			options.ID = azidentity.ClientID(opt.MSIClientID)
		case opt.MSIObjectID != "":
			// FIXME this doesn't appear to be in the new SDK?
			return nil, fmt.Errorf("MSI object ID is currently unsupported")
		case opt.MSIResourceID != "":
			options.ID = azidentity.ResourceID(opt.MSIResourceID)
		}
		cred, err = azidentity.NewManagedIdentityCredential(&options)
		if err != nil {
			return nil, fmt.Errorf("failed to acquire MSI token: %w", err)
		}
	default:
		return nil, errors.New("no authentication method configured")
	}

	// Make the client if not already created
	if f.svc == nil {
		// Work out what the endpoint is if it is still unset
		if opt.Endpoint == "" {
			if opt.Account == "" {
				return nil, fmt.Errorf("account must be set: can't make service URL")
			}
			u, err := url.Parse(fmt.Sprintf("https://%s.%s", opt.Account, storageDefaultBaseURL))
			if err != nil {
				return nil, fmt.Errorf("failed to make azure storage URL from account: %w", err)
			}
			opt.Endpoint = u.String()
		}
		if sharedKeyCred != nil {
			// Shared key cred
			f.svc, err = service.NewClientWithSharedKeyCredential(opt.Endpoint, sharedKeyCred, &clientOpt)
			if err != nil {
				return nil, fmt.Errorf("create client with shared key failed: %w", err)
			}
		} else if cred != nil {
			// Azidentity cred
			f.svc, err = service.NewClient(opt.Endpoint, cred, &clientOpt)
			if err != nil {
				return nil, fmt.Errorf("create client failed: %w", err)
			}
		}
	}
	if f.svc == nil {
		return nil, fmt.Errorf("internal error: auth failed to make credentials or client")
	}

	if f.rootContainer != "" && f.rootDirectory != "" {
		// Check to see if the (container,directory) is actually an existing file
		oldRoot := f.root
		newRoot, leaf := path.Split(oldRoot)
		f.setRoot(newRoot)
		_, err := f.NewObject(ctx, leaf)
		if err != nil {
			if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
				// File doesn't exist or is a directory so return old f
				f.setRoot(oldRoot)
				return f, nil
			}
			return nil, err
		}
		// return an error with an fs which points to the parent
		return f, fs.ErrorIsFile
	}
	return f, nil
}
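
// As a usage sketch, a remote configured as [myremote] with type
// "azureblob" makes a command like `rclone ls myremote:container/dir`
// call NewFs(ctx, "myremote", "container/dir", m) with m holding that
// remote's config section (names here are illustrative).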

// return the container client for the container passed in
func (f *Fs) cntSVC(containerName string) (containerClient *container.Client) {
	f.cntSVCcacheMu.Lock()
	defer f.cntSVCcacheMu.Unlock()
	var ok bool
	if containerClient, ok = f.cntSVCcache[containerName]; !ok {
		containerClient = f.svc.NewContainerClient(containerName)
		f.cntSVCcache[containerName] = containerClient
	}
	return containerClient
}

// Return an Object from a path
//
// If it can't be found it returns the error fs.ErrorObjectNotFound.
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *container.BlobItem) (fs.Object, error) {
	o := &Object{
		fs:     f,
		remote: remote,
	}
	if info != nil {
		err := o.decodeMetaDataFromBlob(info)
		if err != nil {
			return nil, err
		}
	} else if !o.fs.opt.NoHeadObject {
		err := o.readMetaData(ctx) // reads info and headers, returning an error
		if err != nil {
			return nil, err
		}
	}
	return o, nil
}

// NewObject finds the Object at remote. If it can't be found
// it returns the error fs.ErrorObjectNotFound.
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	return f.newObjectWithInfo(ctx, remote, nil)
}

// getBlobSVC creates a blob client
func (f *Fs) getBlobSVC(container, containerPath string) *blob.Client {
	return f.cntSVC(container).NewBlobClient(containerPath)
}

// getBlockBlobSVC creates a block blob client
func (f *Fs) getBlockBlobSVC(container, containerPath string) *blockblob.Client {
	return f.cntSVC(container).NewBlockBlobClient(containerPath)
}

// updateMetadataWithModTime adds the modTime passed in to o.meta.
func (o *Object) updateMetadataWithModTime(modTime time.Time) {
	metadataMu.Lock()
	defer metadataMu.Unlock()

	// Make sure o.meta is not nil
	if o.meta == nil {
		o.meta = make(map[string]string, 1)
	}
	// Set modTimeKey in it
	o.meta[modTimeKey] = modTime.Format(timeFormatOut)
}
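
// Modification times therefore round trip through blob metadata: written
// under the "mtime" key using timeFormatOut (nanosecond precision) and
// read back with timeFormatIn (time.RFC3339), which accepts that output.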

// Returns whether file is a directory marker or not
func isDirectoryMarker(size int64, metadata map[string]*string, remote string) bool {
	// Directory markers are 0 length
	if size == 0 {
		endsWithSlash := strings.HasSuffix(remote, "/")
		if endsWithSlash || remote == "" {
			return true
		}
		// Note that metadata with hdi_isfolder = true seems to be a
		// defacto standard for marking blobs as directories.
		// Note also that the metadata hasn't been normalised to lower case yet
		for k, v := range metadata {
			if v != nil && strings.EqualFold(k, dirMetaKey) && *v == dirMetaValue {
				return true
			}
		}
	}
	return false
}
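
// For example, a zero-length blob named "photos/" marks the directory
// "photos", as does a zero-length blob "photos" whose metadata contains
// hdi_isfolder=true (the de facto convention noted above).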

// listFn is called from list to handle an object
type listFn func(remote string, object *container.BlobItem, isDirectory bool) error

// list lists the objects into the function supplied from
// the container and root supplied
//
// dir is the starting directory, "" for root
//
// The remote has prefix removed from it and if addContainer is set then
// it adds the container to the start.
func (f *Fs) list(ctx context.Context, containerName, directory, prefix string, addContainer bool, recurse bool, maxResults int32, fn listFn) error {
	if f.cache.IsDeleted(containerName) {
		return fs.ErrorDirNotFound
	}
	if prefix != "" {
		prefix += "/"
	}
	if directory != "" {
		directory += "/"
	}
	delimiter := ""
	if !recurse {
		delimiter = "/"
	}
	pager := f.cntSVC(containerName).NewListBlobsHierarchyPager(delimiter, &container.ListBlobsHierarchyOptions{
		// Copy, Metadata, Snapshots, UncommittedBlobs, Deleted, Tags, Versions, LegalHold, ImmutabilityPolicy, DeletedWithVersions bool
		Include: container.ListBlobsInclude{
			Copy:             false,
			Metadata:         true,
			Snapshots:        false,
			UncommittedBlobs: false,
			Deleted:          false,
		},
		Prefix:     &directory,
		MaxResults: &maxResults,
	})
	foundItems := 0
	for pager.More() {
		var response container.ListBlobsHierarchyResponse
		err := f.pacer.Call(func() (bool, error) {
			var err error
			response, err = pager.NextPage(ctx)
			//response, err = f.srv.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
			return f.shouldRetry(ctx, err)
		})
		if err != nil {
			// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
			if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.ContainerNotFound) || storageErr.StatusCode == http.StatusNotFound) {
				return fs.ErrorDirNotFound
			}
			return err
		}
		// Advance marker to next
		// marker = response.NextMarker
		foundItems += len(response.Segment.BlobItems)
		for i := range response.Segment.BlobItems {
			file := response.Segment.BlobItems[i]
			// Finish if file name no longer has prefix
			// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
			//	return nil
			// }
			if file.Name == nil {
				fs.Debugf(f, "Nil name received")
				continue
			}
			remote := f.opt.Enc.ToStandardPath(*file.Name)
			if !strings.HasPrefix(remote, prefix) {
				fs.Debugf(f, "Odd name received %q", remote)
				continue
			}
			isDirectory := isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote)
			if isDirectory {
				// Don't insert the root directory
				if remote == directory {
					continue
				}
				// process directory markers as directories
				remote = strings.TrimRight(remote, "/")
			}
			remote = remote[len(prefix):]
			if addContainer {
				remote = path.Join(containerName, remote)
			}
			// Send object
			err = fn(remote, file, isDirectory)
			if err != nil {
				return err
			}
		}
		// Send the subdirectories
		foundItems += len(response.Segment.BlobPrefixes)
		for _, remote := range response.Segment.BlobPrefixes {
			if remote.Name == nil {
				fs.Debugf(f, "Nil prefix received")
				continue
			}
			remote := strings.TrimRight(*remote.Name, "/")
			remote = f.opt.Enc.ToStandardPath(remote)
			if !strings.HasPrefix(remote, prefix) {
				fs.Debugf(f, "Odd directory name received %q", remote)
				continue
			}
			remote = remote[len(prefix):]
			if addContainer {
				remote = path.Join(containerName, remote)
			}
			// Send object
			err = fn(remote, nil, true)
			if err != nil {
				return err
			}
		}
	}
	if f.opt.DirectoryMarkers && foundItems == 0 && directory != "" {
		// Determine whether the directory exists or not by whether it has a marker
		_, err := f.readMetaData(ctx, containerName, directory)
		if err != nil {
			if err == fs.ErrorObjectNotFound {
				return fs.ErrorDirNotFound
			}
			return err
		}
	}
	return nil
}
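
// With delimiter "/" the service collapses everything below the first
// "/" after the prefix into BlobPrefixes, so a non-recursive listing of
// "dir/" over blobs "dir/a.txt" and "dir/sub/b.txt" yields the item
// "dir/a.txt" plus the prefix "dir/sub/"; with an empty delimiter
// (recurse=true) every blob comes back in one flat listing.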

// Convert a list item into a DirEntry
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *container.BlobItem, isDirectory bool) (fs.DirEntry, error) {
	if isDirectory {
		d := fs.NewDir(remote, time.Time{})
		return d, nil
	}
	o, err := f.newObjectWithInfo(ctx, remote, object)
	if err != nil {
		return nil, err
	}
	return o, nil
}

// Check to see if this is a limited container and the container is not found
func (f *Fs) containerOK(container string) bool {
	if !f.isLimited {
		return true
	}
	f.cntSVCcacheMu.Lock()
	defer f.cntSVCcacheMu.Unlock()
	for limitedContainer := range f.cntSVCcache {
		if container == limitedContainer {
			return true
		}
	}
	return false
}

// listDir lists a single directory
func (f *Fs) listDir(ctx context.Context, containerName, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
	if !f.containerOK(containerName) {
		return nil, fs.ErrorDirNotFound
	}
	err = f.list(ctx, containerName, directory, prefix, addContainer, false, int32(f.opt.ListChunkSize), func(remote string, object *container.BlobItem, isDirectory bool) error {
		entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
		if err != nil {
			return err
		}
		if entry != nil {
			entries = append(entries, entry)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// container must be present if listing succeeded
	f.cache.MarkOK(containerName)
	return entries, nil
}

// listContainers returns all the containers to out
func (f *Fs) listContainers(ctx context.Context) (entries fs.DirEntries, err error) {
	if f.isLimited {
		f.cntSVCcacheMu.Lock()
		for container := range f.cntSVCcache {
			d := fs.NewDir(container, time.Time{})
			entries = append(entries, d)
		}
		f.cntSVCcacheMu.Unlock()
		return entries, nil
	}
	err = f.listContainersToFn(func(Name string, LastModified time.Time) error {
		d := fs.NewDir(f.opt.Enc.ToStandardName(Name), LastModified)
		f.cache.MarkOK(Name)
		entries = append(entries, d)
		return nil
	})
	if err != nil {
		return nil, err
	}
	return entries, nil
}

// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	container, directory := f.split(dir)
	if container == "" {
		if directory != "" {
			return nil, fs.ErrorListBucketRequired
		}
		return f.listContainers(ctx)
	}
	return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")
}

// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively than doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	containerName, directory := f.split(dir)
	list := walk.NewListRHelper(callback)
	listR := func(containerName, directory, prefix string, addContainer bool) error {
		return f.list(ctx, containerName, directory, prefix, addContainer, true, int32(f.opt.ListChunkSize), func(remote string, object *container.BlobItem, isDirectory bool) error {
			entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
			if err != nil {
				return err
			}
			return list.Add(entry)
		})
	}
	if containerName == "" {
		entries, err := f.listContainers(ctx)
		if err != nil {
			return err
		}
		for _, entry := range entries {
			err = list.Add(entry)
			if err != nil {
				return err
			}
			container := entry.Remote()
			err = listR(container, "", f.rootDirectory, true)
			if err != nil {
				return err
			}
			// container must be present if listing succeeded
			f.cache.MarkOK(container)
		}
	} else {
		if !f.containerOK(containerName) {
			return fs.ErrorDirNotFound
		}
		err = listR(containerName, directory, f.rootDirectory, f.rootContainer == "")
		if err != nil {
			return err
		}
		// container must be present if listing succeeded
		f.cache.MarkOK(containerName)
	}
	return list.Flush()
}

// listContainerFn is called from listContainersToFn to handle a container
type listContainerFn func(Name string, LastModified time.Time) error

// listContainersToFn lists the containers to the function supplied
func (f *Fs) listContainersToFn(fn listContainerFn) error {
	max := int32(f.opt.ListChunkSize)
	pager := f.svc.NewListContainersPager(&service.ListContainersOptions{
		Include:    service.ListContainersInclude{Metadata: true, Deleted: true},
		MaxResults: &max,
	})
	ctx := context.Background()
	for pager.More() {
		var response service.ListContainersResponse
		err := f.pacer.Call(func() (bool, error) {
			var err error
			response, err = pager.NextPage(ctx)
			return f.shouldRetry(ctx, err)
		})
		if err != nil {
			return err
		}
		for _, cnt := range response.ContainerItems {
			if cnt == nil || cnt.Name == nil || cnt.Properties == nil || cnt.Properties.LastModified == nil {
				fs.Debugf(f, "nil returned in container info")
				continue // skip rather than dereference a nil pointer below
			}
			err = fn(*cnt.Name, *cnt.Properties.LastModified)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// Put the object into the container
//
// Copy the reader in to the new object which is returned.
//
// The new object may have been created if an error is returned
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	// Temporary Object under construction
	o := &Object{
		fs:     f,
		remote: src.Remote(),
	}
	return o, o.Update(ctx, in, src, options...)
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.Put(ctx, in, src, options...)
}

// Create directory marker file and parents
func (f *Fs) createDirectoryMarker(ctx context.Context, container, dir string) error {
	if !f.opt.DirectoryMarkers || container == "" {
		return nil
	}
	// Object to be uploaded
	o := &Object{
		fs:      f,
		modTime: time.Now(),
		meta: map[string]string{
			dirMetaKey: dirMetaValue,
		},
	}
	for {
		_, containerPath := f.split(dir)
		// Don't create the directory marker if it is the container or at the very root
		if containerPath == "" {
			break
		}
		o.remote = dir + "/"
		// Check to see if object already exists
		_, err := f.readMetaData(ctx, container, containerPath+"/")
		if err == nil {
			return nil
		}
		// Upload it if not
		fs.Debugf(o, "Creating directory marker")
		content := io.Reader(strings.NewReader(""))
		err = o.Update(ctx, content, o)
		if err != nil {
			return fmt.Errorf("creating directory marker failed: %w", err)
		}
		// Now check parent directory exists
		dir = path.Dir(dir)
		if dir == "/" || dir == "." {
			break
		}
	}
	return nil
}

// Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	container, _ := f.split(dir)
	err := f.makeContainer(ctx, container)
	if err != nil {
		return err
	}
	return f.createDirectoryMarker(ctx, container, dir)
}

// mkdirParent creates the parent container/directory if it doesn't exist
func (f *Fs) mkdirParent(ctx context.Context, remote string) error {
	remote = strings.TrimRight(remote, "/")
	dir := path.Dir(remote)
	if dir == "/" || dir == "." {
		dir = ""
	}
	return f.Mkdir(ctx, dir)
}

// makeContainer creates the container if it doesn't exist
func (f *Fs) makeContainer(ctx context.Context, container string) error {
	if f.opt.NoCheckContainer {
		return nil
	}
	return f.cache.Create(container, func() error {
		// If this is a SAS URL limited to a container then assume it is already created
		if f.isLimited {
			return nil
		}
		opt := service.CreateContainerOptions{
			// Optional. Specifies a user-defined name-value pair associated with the blob.
			//Metadata map[string]string

			// Optional. Specifies the encryption scope settings to set on the container.
			//CpkScopeInfo *CpkScopeInfo
		}
		if f.publicAccess != "" {
			// Specifies whether data in the container may be accessed publicly and the level of access
			opt.Access = &f.publicAccess
		}
		// now try to create the container
		return f.pacer.Call(func() (bool, error) {
			_, err := f.svc.CreateContainer(ctx, container, &opt)
			if err != nil {
				if storageErr, ok := err.(*azcore.ResponseError); ok {
					switch bloberror.Code(storageErr.ErrorCode) {
					case bloberror.ContainerAlreadyExists:
						return false, nil
					case bloberror.ContainerBeingDeleted:
						// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
						// When a container is deleted, a container with the same name cannot be created
						// for at least 30 seconds; the container may not be available for more than 30
						// seconds if the service is still processing the request.
						time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
						f.cache.MarkDeleted(container)
						return true, err
					case bloberror.AuthorizationFailure:
						// Assume that the user does not have permission to
						// create the container and carry on anyway.
						fs.Debugf(f, "Tried to create container but got %s error - carrying on assuming container exists. Use no_check_container to stop this check.", storageErr.ErrorCode)
						return false, nil
					}
				}
			}
			return f.shouldRetry(ctx, err)
		})
	}, nil)
}

// isEmpty checks to see if a given (container, directory) is empty and returns an error if not
func (f *Fs) isEmpty(ctx context.Context, containerName, directory string) (err error) {
	empty := true
	err = f.list(ctx, containerName, directory, f.rootDirectory, f.rootContainer == "", true, 1, func(remote string, object *container.BlobItem, isDirectory bool) error {
		empty = false
		return nil
	})
	if err != nil {
		return err
	}
	if !empty {
		return fs.ErrorDirectoryNotEmpty
	}
	return nil
}

// deleteContainer deletes the container. It can delete a full
// container so use isEmpty if you don't want that.
func (f *Fs) deleteContainer(ctx context.Context, containerName string) error {
	return f.cache.Remove(containerName, func() error {
		getOptions := container.GetPropertiesOptions{}
		delOptions := container.DeleteOptions{}
		return f.pacer.Call(func() (bool, error) {
			_, err := f.cntSVC(containerName).GetProperties(ctx, &getOptions)
			if err == nil {
				_, err = f.cntSVC(containerName).Delete(ctx, &delOptions)
			}
			if err != nil {
				// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
				if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.ContainerNotFound) || storageErr.StatusCode == http.StatusNotFound) {
					return false, fs.ErrorDirNotFound
				}
				return f.shouldRetry(ctx, err)
			}
			return f.shouldRetry(ctx, err)
		})
	})
}

// Rmdir deletes the container if the fs is at the root
//
// Returns an error if it isn't empty
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	container, directory := f.split(dir)
	// Remove directory marker file
	if f.opt.DirectoryMarkers && container != "" && directory != "" {
		o := &Object{
			fs:     f,
			remote: dir + "/",
		}
		fs.Debugf(o, "Removing directory marker")
		err := o.Remove(ctx)
		if err != nil {
			return fmt.Errorf("removing directory marker failed: %w", err)
		}
	}
	if container == "" || directory != "" {
		return nil
	}
	err := f.isEmpty(ctx, container, directory)
	if err != nil {
		return err
	}
	return f.deleteContainer(ctx, container)
}

// Precision of the remote
func (f *Fs) Precision() time.Duration {
	return time.Nanosecond
}

// Hashes returns the supported hash sets.
func (f *Fs) Hashes() hash.Set {
	return hash.Set(hash.MD5)
}

// Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context, dir string) error {
	container, directory := f.split(dir)
	if container == "" {
		return errors.New("can't purge from root")
	}
	if directory != "" {
		// Delegate to caller if not root of a container
		return fs.ErrorCantPurge
	}
	return f.deleteContainer(ctx, container)
}

// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	dstContainer, dstPath := f.split(remote)
	err := f.mkdirParent(ctx, remote)
	if err != nil {
		return nil, err
	}
	srcObj, ok := src.(*Object)
	if !ok {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}
	dstBlobSVC := f.getBlobSVC(dstContainer, dstPath)
	srcBlobSVC := srcObj.getBlobSVC()
	srcURL := srcBlobSVC.URL()
	options := blob.StartCopyFromURLOptions{
		Tier: parseTier(f.opt.AccessTier),
	}
	var startCopy blob.StartCopyFromURLResponse
	err = f.pacer.Call(func() (bool, error) {
		startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
		return f.shouldRetry(ctx, err)
	})
	if err != nil {
		return nil, err
	}
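	// The server-side copy is asynchronous, so poll the destination
	// blob's properties until the copy is no longer pending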
	copyStatus := startCopy.CopyStatus
	getOptions := blob.GetPropertiesOptions{}
	for copyStatus != nil && string(*copyStatus) == string(container.CopyStatusTypePending) {
		time.Sleep(1 * time.Second)
		getMetadata, err := dstBlobSVC.GetProperties(ctx, &getOptions)
		if err != nil {
			return nil, err
		}
		copyStatus = getMetadata.CopyStatus
	}
	return f.NewObject(ctx, remote)
}

// ------------------------------------------------------------

// Fs returns the parent Fs
func (o *Object) Fs() fs.Info {
	return o.fs
}

// Return a string version
func (o *Object) String() string {
	if o == nil {
		return "<nil>"
	}
	return o.remote
}

// Remote returns the remote path
func (o *Object) Remote() string {
	return o.remote
}

// Hash returns the MD5 of an object returning a lowercase hex string
func (o *Object) Hash(ctx context.Context, t hash.Type) (string, error) {
	if t != hash.MD5 {
		return "", hash.ErrUnsupported
	}
	// Convert base64 encoded md5 into lower case hex
	if o.md5 == "" {
		return "", nil
	}
	data, err := base64.StdEncoding.DecodeString(o.md5)
	if err != nil {
		return "", fmt.Errorf("failed to decode Content-MD5: %q: %w", o.md5, err)
	}
	return hex.EncodeToString(data), nil
}

// Size returns the size of an object in bytes
func (o *Object) Size() int64 {
	return o.size
}

// Set o.metadata from metadata
func (o *Object) setMetadata(metadata map[string]*string) {
	metadataMu.Lock()
	defer metadataMu.Unlock()
	if len(metadata) > 0 {
		// Lower case the metadata
		o.meta = make(map[string]string, len(metadata))
		for k, v := range metadata {
			if v != nil {
				o.meta[strings.ToLower(k)] = *v
			}
		}
		// Set o.modTime from metadata if it exists and
		// UseServerModTime isn't in use.
		if modTime, ok := o.meta[modTimeKey]; !o.fs.ci.UseServerModTime && ok {
			when, err := time.Parse(timeFormatIn, modTime)
			if err != nil {
				fs.Debugf(o, "Couldn't parse %v = %q: %v", modTimeKey, modTime, err)
			}
			o.modTime = when
		}
	} else {
		o.meta = nil
	}
}

// Get metadata from o.meta
func (o *Object) getMetadata() (metadata map[string]*string) {
	metadataMu.Lock()
	defer metadataMu.Unlock()
	if len(o.meta) == 0 {
		return nil
	}
	metadata = make(map[string]*string, len(o.meta))
	for k, v := range o.meta {
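		// take a copy of v since we take its address below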
		v := v
		metadata[k] = &v
	}
	return metadata
}

// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
//
// Sets
//
//	o.id
//	o.modTime
//	o.size
//	o.md5
//	o.meta
func (o *Object) decodeMetaDataFromPropertiesResponse(info *blob.GetPropertiesResponse) (err error) {
	metadata := info.Metadata
	var size int64
	if info.ContentLength == nil {
		size = -1
	} else {
		size = *info.ContentLength
	}
	if isDirectoryMarker(size, metadata, o.remote) {
		return fs.ErrorNotAFile
	}
	// NOTE - the client library returns the MD5 as raw bytes (base64 decoded),
	// but the Object needs to maintain it as a base64 encoded string.
	o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5)
	if info.ContentType == nil {
		o.mimeType = ""
	} else {
		o.mimeType = *info.ContentType
	}
	o.size = size
	if info.LastModified == nil {
		o.modTime = time.Now()
	} else {
		o.modTime = *info.LastModified
	}
	if info.AccessTier == nil {
		o.accessTier = blob.AccessTier("")
	} else {
		o.accessTier = blob.AccessTier(*info.AccessTier)
	}
	o.setMetadata(metadata)
	return nil
}
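
// decodeMetaDataFromDownloadResponse sets o.size, o.modTime, o.md5, o.mimeType
// and o.meta from a DownloadStream response, correcting the size if the
// response was to a Range request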
func (o *Object) decodeMetaDataFromDownloadResponse(info *blob.DownloadStreamResponse) (err error) {
	metadata := info.Metadata
	var size int64
	if info.ContentLength == nil {
		size = -1
	} else {
		size = *info.ContentLength
	}
	if isDirectoryMarker(size, metadata, o.remote) {
		return fs.ErrorNotAFile
	}
	// NOTE - the client library returns the MD5 as raw bytes (base64 decoded),
	// but the Object needs to maintain it as a base64 encoded string.
	o.md5 = base64.StdEncoding.EncodeToString(info.ContentMD5)
	if info.ContentType == nil {
		o.mimeType = ""
	} else {
		o.mimeType = *info.ContentType
	}
	o.size = size
	if info.LastModified == nil {
		o.modTime = time.Now()
	} else {
		o.modTime = *info.LastModified
	}
	// FIXME response doesn't appear to have AccessTier in?
	// if info.AccessTier == nil {
	// 	o.accessTier = blob.AccessTier("")
	// } else {
	// 	o.accessTier = blob.AccessTier(*info.AccessTier)
	// }
	o.setMetadata(metadata)
	// If it was a Range request, the size is wrong, so correct it
	if info.ContentRange != nil {
		contentRange := *info.ContentRange
		slash := strings.IndexRune(contentRange, '/')
		if slash >= 0 {
			i, err := strconv.ParseInt(contentRange[slash+1:], 10, 64)
			if err == nil {
				o.size = i
			} else {
				fs.Debugf(o, "Failed to parse length from %q: %v", contentRange, err)
			}
		} else {
			fs.Debugf(o, "Failed to find length in %q", contentRange)
		}
	}
	return nil
}
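
// decodeMetaDataFromBlob sets o.size, o.modTime, o.md5, o.mimeType,
// o.accessTier and o.meta from a BlobItem returned by a listing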
func (o *Object) decodeMetaDataFromBlob(info *container.BlobItem) (err error) {
	if info.Properties == nil {
		return errors.New("nil Properties in decodeMetaDataFromBlob")
	}
	metadata := info.Metadata
	var size int64
	if info.Properties.ContentLength == nil {
		size = -1
	} else {
		size = *info.Properties.ContentLength
	}
	if isDirectoryMarker(size, metadata, o.remote) {
		return fs.ErrorNotAFile
	}
	// NOTE - the client library returns the MD5 as raw bytes (base64 decoded),
	// but the Object needs to maintain it as a base64 encoded string.
	o.md5 = base64.StdEncoding.EncodeToString(info.Properties.ContentMD5)
	if info.Properties.ContentType == nil {
		o.mimeType = ""
	} else {
		o.mimeType = *info.Properties.ContentType
	}
	o.size = size
	if info.Properties.LastModified == nil {
		o.modTime = time.Now()
	} else {
		o.modTime = *info.Properties.LastModified
	}
	if info.Properties.AccessTier == nil {
		o.accessTier = blob.AccessTier("")
	} else {
		o.accessTier = *info.Properties.AccessTier
	}
	o.setMetadata(metadata)
	return nil
}

// getBlobSVC creates a blob client
func (o *Object) getBlobSVC() *blob.Client {
	container, directory := o.split()
	return o.fs.getBlobSVC(container, directory)
}

// clearMetaData clears enough metadata so readMetaData will re-read it
func (o *Object) clearMetaData() {
	o.modTime = time.Time{}
}

// readMetaData gets the metadata if it hasn't already been fetched
func (f *Fs) readMetaData(ctx context.Context, container, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
	if !f.containerOK(container) {
		return blobProperties, fs.ErrorObjectNotFound
	}
	blb := f.getBlobSVC(container, containerPath)
	// Read the blob properties (this includes the metadata)
	options := blob.GetPropertiesOptions{}
	err = f.pacer.Call(func() (bool, error) {
		blobProperties, err = blb.GetProperties(ctx, &options)
		return f.shouldRetry(ctx, err)
	})
	if err != nil {
		// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
		if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.BlobNotFound) || storageErr.StatusCode == http.StatusNotFound) {
			return blobProperties, fs.ErrorObjectNotFound
		}
		return blobProperties, err
	}
	return blobProperties, nil
}

// readMetaData gets the metadata if it hasn't already been fetched
//
// Sets
//
//	o.id
//	o.modTime
//	o.size
//	o.md5
func (o *Object) readMetaData(ctx context.Context) (err error) {
	if !o.modTime.IsZero() {
		return nil
	}
	container, containerPath := o.split()
	blobProperties, err := o.fs.readMetaData(ctx, container, containerPath)
	if err != nil {
		return err
	}
	return o.decodeMetaDataFromPropertiesResponse(&blobProperties)
}

// ModTime returns the modification time of the object
//
// It attempts to read the object's mtime and if that isn't present the
// LastModified returned in the http headers
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
	// The error is logged in readMetaData
	_ = o.readMetaData(ctx)
	return o.modTime
}

// SetModTime sets the modification time of the local fs object
func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
	o.updateMetadataWithModTime(modTime)
	blb := o.getBlobSVC()
	opt := blob.SetMetadataOptions{}
	err := o.fs.pacer.Call(func() (bool, error) {
		_, err := blb.SetMetadata(ctx, o.getMetadata(), &opt)
		return o.fs.shouldRetry(ctx, err)
	})
	if err != nil {
		return err
	}
	o.modTime = modTime
	return nil
}

// Storable returns if this object is storable
func (o *Object) Storable() bool {
	return true
}

// Open an object for read
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
	// Offset and Count for range download
	var offset int64
	var count int64
	if o.AccessTier() == blob.AccessTierArchive {
		return nil, fmt.Errorf("blob in archive tier, you need to set tier to hot, cool or cold first")
	}
	fs.FixRangeOption(options, o.size)
	for _, option := range options {
		switch x := option.(type) {
		case *fs.RangeOption:
			offset, count = x.Decode(o.size)
			if count < 0 {
				count = o.size - offset
			}
		case *fs.SeekOption:
			offset = x.Offset
		default:
			if option.Mandatory() {
				fs.Logf(o, "Unsupported mandatory option: %v", option)
			}
		}
	}
	blb := o.getBlobSVC()
	opt := blob.DownloadStreamOptions{
		// When set to true and specified together with the Range, the service returns the MD5 hash for the range, as long as the
		// range is less than or equal to 4 MB in size.
		//RangeGetContentMD5 *bool

		// Range specifies a range of bytes. The default value is all bytes.
		//Range HTTPRange
		Range: blob.HTTPRange{
			Offset: offset,
			Count:  count,
		},

		// AccessConditions *AccessConditions
		// CpkInfo          *CpkInfo
		// CpkScopeInfo     *CpkScopeInfo
	}
	var downloadResponse blob.DownloadStreamResponse
	err = o.fs.pacer.Call(func() (bool, error) {
		downloadResponse, err = blb.DownloadStream(ctx, &opt)
		return o.fs.shouldRetry(ctx, err)
	})
	if err != nil {
		return nil, fmt.Errorf("failed to open for download: %w", err)
	}
	err = o.decodeMetaDataFromDownloadResponse(&downloadResponse)
	if err != nil {
		return nil, fmt.Errorf("failed to decode metadata for download: %w", err)
	}
	return downloadResponse.Body, nil
}

// pString converts a string into a pointer to a string
func pString(s string) *string {
	return &s
}

// readSeekCloser joins an io.Reader and an io.Seeker and provides a no-op io.Closer
type readSeekCloser struct {
	io.Reader
	io.Seeker
}

// Close does nothing
func (rs *readSeekCloser) Close() error {
	return nil
}

// azBlock records a chunk number and block ID for Close
type azBlock struct {
	chunkNumber uint64
	id          string
}

// azChunkWriter implements the fs.ChunkWriter interface
type azChunkWriter struct {
	chunkSize int64
	size      int64
	f         *Fs
	ui        uploadInfo
	blocksMu  sync.Mutex // protects the below
	blocks    []azBlock  // list of blocks for finalize
	o         *Object
}

// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
	// Temporary Object under construction
	o := &Object{
		fs:     f,
		remote: remote,
	}
	ui, err := o.prepareUpload(ctx, src, options)
	if err != nil {
		return info, nil, fmt.Errorf("failed to prepare upload: %w", err)
	}
	// Calculate correct partSize
	partSize := f.opt.ChunkSize
	totalParts := -1
	size := src.Size()
	// Note that the max size of file is 4.75 TB (100 MB X 50,000
	// blocks) and this is smaller than the max uncommitted block
	// size (9.52 TB) so we do not need to part commit block lists
	// or garbage collect uncommitted blocks.
	//
	// See: https://docs.microsoft.com/en-gb/rest/api/storageservices/put-block
	//
	// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
	// buffers here (default 4MB). With a maximum number of parts (50,000) this will be a file of
	// 195GB which seems a reasonable limit.
	if size == -1 {
		warnStreamUpload.Do(func() {
			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
				f.opt.ChunkSize, partSize*fs.SizeSuffix(blockblob.MaxBlocks))
		})
	} else {
		partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize)
		if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) {
			return info, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes))
		}
		totalParts = int(fs.SizeSuffix(size) / partSize)
		if fs.SizeSuffix(size)%partSize != 0 {
			totalParts++
		}
	}
	fs.Debugf(o, "Multipart upload session started for %d parts of size %v", totalParts, partSize)
	chunkWriter := &azChunkWriter{
		chunkSize: int64(partSize),
		size:      size,
		f:         f,
		ui:        ui,
		o:         o,
	}
	info = fs.ChunkWriterInfo{
		ChunkSize:   int64(partSize),
		Concurrency: o.fs.opt.UploadConcurrency,
		//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
	}
	fs.Debugf(o, "open chunk writer: started multipart upload")
	return info, chunkWriter, nil
}

// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (w *azChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
	if chunkNumber < 0 {
		err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
		return -1, err
	}
	// Upload the block, with MD5 for check
	m := md5.New()
	currentChunkSize, err := io.Copy(m, reader)
	if err != nil {
		return -1, err
	}
	// If no data read, don't write the chunk
	if currentChunkSize == 0 {
		return 0, nil
	}
	md5sum := m.Sum(nil)
	transactionalMD5 := md5sum[:]
	// increment the blockID and save the blocks for finalize
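	//
	// Azure requires all block IDs within a blob to be base64 strings
	// of the same length, so encode the chunk number as a fixed size
	// 8 byte little-endian value before base64 encoding it.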
	var binaryBlockID [8]byte // block counter as LSB first 8 bytes
	binary.LittleEndian.PutUint64(binaryBlockID[:], uint64(chunkNumber))
	blockID := base64.StdEncoding.EncodeToString(binaryBlockID[:])
	// Save the blockID for the commit
	w.blocksMu.Lock()
	w.blocks = append(w.blocks, azBlock{
		chunkNumber: uint64(chunkNumber),
		id:          blockID,
	})
	w.blocksMu.Unlock()
	err = w.f.pacer.Call(func() (bool, error) {
		// rewind the reader on retry and after reading md5
		_, err = reader.Seek(0, io.SeekStart)
		if err != nil {
			return false, err
		}
		options := blockblob.StageBlockOptions{
			// Specify the transactional md5 for the body, to be validated by the service.
			TransactionalValidation: blob.TransferValidationTypeMD5(transactionalMD5),
		}
		_, err = w.ui.blb.StageBlock(ctx, blockID, &readSeekCloser{Reader: reader, Seeker: reader}, &options)
		if err != nil {
			if chunkNumber <= 8 {
				return w.f.shouldRetry(ctx, err)
			}
			// retry all chunks once we have done the first few
			return true, err
		}
		return false, nil
	})
	if err != nil {
		return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err)
	}
	fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes", chunkNumber+1, currentChunkSize)
	return currentChunkSize, err
}

// Abort the multipart upload.
//
// FIXME it would be nice to delete uncommitted blocks.
//
// See: https://github.com/rclone/rclone/issues/5583
//
// However there doesn't seem to be an easy way of doing this other than
// by deleting the target.
//
// This means that a failed upload deletes the target which isn't ideal.
//
// Uploading a zero length blob and deleting it will remove the
// uncommitted blocks I think.
//
// Could check to see if a file exists already and if it doesn't then
// create a 0 length file and delete it to flush the uncommitted
// blocks.
//
// This is what azcopy does
// https://github.com/MicrosoftDocs/azure-docs/issues/36347#issuecomment-541457962
func (w *azChunkWriter) Abort(ctx context.Context) error {
	fs.Debugf(w.o, "multipart upload aborted (did nothing - see issue #5583)")
	return nil
}

// Close and finalise the multipart upload
func (w *azChunkWriter) Close(ctx context.Context) (err error) {
	// sort the completed parts by part number
	sort.Slice(w.blocks, func(i, j int) bool {
		return w.blocks[i].chunkNumber < w.blocks[j].chunkNumber
	})
	// Create and check a list of block IDs
	blockIDs := make([]string, len(w.blocks))
	for i := range w.blocks {
		if w.blocks[i].chunkNumber != uint64(i) {
			return fmt.Errorf("internal error: expecting chunkNumber %d but got %d", i, w.blocks[i].chunkNumber)
		}
		chunkBytes, err := base64.StdEncoding.DecodeString(w.blocks[i].id)
		if err != nil {
			return fmt.Errorf("internal error: bad block ID: %w", err)
		}
		chunkNumber := binary.LittleEndian.Uint64(chunkBytes)
		if w.blocks[i].chunkNumber != chunkNumber {
			return fmt.Errorf("internal error: expecting decoded chunkNumber %d but got %d", w.blocks[i].chunkNumber, chunkNumber)
		}
		blockIDs[i] = w.blocks[i].id
	}
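	// Committing the block list is what makes the blob visible -
	// until then the staged blocks remain uncommitted and unreadable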
	options := blockblob.CommitBlockListOptions{
		Metadata:    w.o.getMetadata(),
		Tier:        parseTier(w.f.opt.AccessTier),
		HTTPHeaders: &w.ui.httpHeaders,
	}
	// Finalise the upload session
	err = w.f.pacer.Call(func() (bool, error) {
		_, err := w.ui.blb.CommitBlockList(ctx, blockIDs, &options)
		return w.f.shouldRetry(ctx, err)
	})
	if err != nil {
		return fmt.Errorf("failed to complete multipart upload: %w", err)
	}
	fs.Debugf(w.o, "multipart upload finished")
	return err
}
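
// warnStreamUpload ensures the streaming upload size warning is only logged once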
var warnStreamUpload sync.Once

// uploadMultipart uploads a file using multipart upload
//
// Write a larger blob, using CreateBlockBlob, PutBlock, and PutBlockList.
func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) {
	chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
		Open:        o.fs,
		OpenOptions: options,
	})
	if err != nil {
		return ui, err
	}
	return chunkWriter.(*azChunkWriter).ui, nil
}

// uploadSinglepart uploads a short blob using a single part upload
func (o *Object) uploadSinglepart(ctx context.Context, in io.Reader, size int64, ui uploadInfo) (err error) {
	chunkSize := int64(o.fs.opt.ChunkSize)
	// fs.Debugf(o, "Single part upload starting of object %d bytes", size)
	if size > chunkSize || size < 0 {
		return fmt.Errorf("internal error: single part upload size too big %d > %d", size, chunkSize)
	}
	rw := multipart.NewRW()
	defer fs.CheckClose(rw, &err)
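	// Read size+1 bytes so that if the source has more than size
	// bytes, n below will be size+1 and the n != size check catches it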
	n, err := io.CopyN(rw, in, size+1)
	if err != nil && err != io.EOF {
		return fmt.Errorf("single part upload read failed: %w", err)
	}
	if n != size {
		return fmt.Errorf("single part upload: expecting to read %d bytes but read %d", size, n)
	}
	rs := &readSeekCloser{Reader: rw, Seeker: rw}
	options := blockblob.UploadOptions{
		Metadata:    o.getMetadata(),
		Tier:        parseTier(o.fs.opt.AccessTier),
		HTTPHeaders: &ui.httpHeaders,
	}
	return o.fs.pacer.Call(func() (bool, error) {
		// rewind the reader on retry
		_, err = rs.Seek(0, io.SeekStart)
		if err != nil {
			return false, err
		}
		_, err = ui.blb.Upload(ctx, rs, &options)
		return o.fs.shouldRetry(ctx, err)
	})
}

// uploadInfo holds the info needed for an upload
type uploadInfo struct {
	blb         *blockblob.Client
	httpHeaders blob.HTTPHeaders
	isDirMarker bool
}

// prepareUpload prepares the object for upload
func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
	container, containerPath := o.split()
	if container == "" || containerPath == "" {
		return ui, fmt.Errorf("can't upload to root - need a container")
	}
	// Create parent dir/container if not saving directory marker
	metadataMu.Lock()
	_, ui.isDirMarker = o.meta[dirMetaKey]
	metadataMu.Unlock()
	if !ui.isDirMarker {
		err = o.fs.mkdirParent(ctx, o.remote)
		if err != nil {
			return ui, err
		}
	}
	// Update Mod time
	o.updateMetadataWithModTime(src.ModTime(ctx))
	// Create the HTTP headers for the upload
	ui.httpHeaders = blob.HTTPHeaders{
		BlobContentType: pString(fs.MimeType(ctx, src)),
	}
	// Compute the Content-MD5 of the file. As we stream all uploads it
	// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
	if !o.fs.opt.DisableCheckSum {
		if sourceMD5, _ := src.Hash(ctx, hash.MD5); sourceMD5 != "" {
			sourceMD5bytes, err := hex.DecodeString(sourceMD5)
			if err == nil {
				ui.httpHeaders.BlobContentMD5 = sourceMD5bytes
			} else {
				fs.Debugf(o, "Failed to decode %q as MD5: %v", sourceMD5, err)
			}
		}
	}
	// Apply upload options (also allows one to overwrite content-type)
	for _, option := range options {
		key, value := option.Header()
		lowerKey := strings.ToLower(key)
		switch lowerKey {
		case "":
			// ignore
		case "cache-control":
			ui.httpHeaders.BlobCacheControl = pString(value)
		case "content-disposition":
			ui.httpHeaders.BlobContentDisposition = pString(value)
		case "content-encoding":
			ui.httpHeaders.BlobContentEncoding = pString(value)
		case "content-language":
			ui.httpHeaders.BlobContentLanguage = pString(value)
		case "content-type":
			ui.httpHeaders.BlobContentType = pString(value)
		}
	}
	ui.blb = o.fs.getBlockBlobSVC(container, containerPath)
	return ui, nil
}

// Update the object with the contents of the io.Reader, modTime and size
//
// The new object may have been created if an error is returned
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
	if o.accessTier == blob.AccessTierArchive {
		if o.fs.opt.ArchiveTierDelete {
			fs.Debugf(o, "deleting archive tier blob before updating")
			err = o.Remove(ctx)
			if err != nil {
				return fmt.Errorf("failed to delete archive blob before updating: %w", err)
			}
		} else {
			return errCantUpdateArchiveTierBlobs
		}
	}
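	// Use a multipart upload if the size is unknown or bigger than one chunk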
	size := src.Size()
	multipartUpload := size < 0 || size > int64(o.fs.opt.ChunkSize)
	var ui uploadInfo
	if multipartUpload {
		ui, err = o.uploadMultipart(ctx, in, src, options...)
	} else {
		ui, err = o.prepareUpload(ctx, src, options)
		if err != nil {
			return fmt.Errorf("failed to prepare upload: %w", err)
		}
		err = o.uploadSinglepart(ctx, in, size, ui)
	}
	if err != nil {
		return err
	}
	// Refresh metadata on object
	if !ui.isDirMarker {
		o.clearMetaData()
		err = o.readMetaData(ctx)
		if err != nil {
			return err
		}
	}
	// If tier is not changed or not specified, do not attempt to invoke `SetBlobTier` operation
	if o.fs.opt.AccessTier == string(defaultAccessTier) || o.fs.opt.AccessTier == string(o.AccessTier()) {
		return nil
	}
	// Now, set blob tier based on configured access tier
	return o.SetTier(o.fs.opt.AccessTier)
}

// Remove an object
func (o *Object) Remove(ctx context.Context) error {
	blb := o.getBlobSVC()
	opt := blob.DeleteOptions{}
	if o.fs.opt.DeleteSnapshots != "" {
		action := blob.DeleteSnapshotsOptionType(o.fs.opt.DeleteSnapshots)
		opt.DeleteSnapshots = &action
	}
	return o.fs.pacer.Call(func() (bool, error) {
		_, err := blb.Delete(ctx, &opt)
		return o.fs.shouldRetry(ctx, err)
	})
}

// MimeType of an Object if known, "" otherwise
func (o *Object) MimeType(ctx context.Context) string {
	return o.mimeType
}

// AccessTier of an object, default is of type none
func (o *Object) AccessTier() blob.AccessTier {
	return o.accessTier
}

// SetTier changes the tier of the object
func (o *Object) SetTier(tier string) error {
	if !validateAccessTier(tier) {
		return fmt.Errorf("tier %s not supported by Azure Blob Storage", tier)
	}
	// Check if current tier already matches with desired tier
	if o.GetTier() == tier {
		return nil
	}
	desiredAccessTier := blob.AccessTier(tier)
	blb := o.getBlobSVC()
	ctx := context.Background()
	priority := blob.RehydratePriorityStandard
	opt := blob.SetTierOptions{
		RehydratePriority: &priority,
	}
	err := o.fs.pacer.Call(func() (bool, error) {
		_, err := blb.SetTier(ctx, desiredAccessTier, &opt)
		return o.fs.shouldRetry(ctx, err)
	})
	if err != nil {
		return fmt.Errorf("failed to set Blob Tier: %w", err)
	}
	// Set access tier on local object also, this typically
	// gets updated on get blob properties
	o.accessTier = desiredAccessTier
	fs.Debugf(o, "Successfully changed object tier to %s", tier)
	return nil
}

// GetTier returns object tier in azure as string
func (o *Object) GetTier() string {
	return string(o.accessTier)
}
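
// parseTier converts a tier string into a pointer to a blob.AccessTier,
// returning nil for the empty string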
func parseTier(tier string) *blob.AccessTier {
	if tier == "" {
		return nil
	}
	msTier := blob.AccessTier(tier)
	return &msTier
}

// Check the interfaces are satisfied
var (
	_ fs.Fs              = &Fs{}
	_ fs.Copier          = &Fs{}
	_ fs.PutStreamer     = &Fs{}
	_ fs.Purger          = &Fs{}
	_ fs.ListRer         = &Fs{}
	_ fs.OpenChunkWriter = &Fs{}
	_ fs.Object          = &Object{}
	_ fs.MimeTyper       = &Object{}
	_ fs.GetTierer       = &Object{}
	_ fs.SetTierer       = &Object{}
)