docker_test.go 11 KB


  1. //go:build !race
  2. package docker_test
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "net"
  10. "net/http"
  11. "os"
  12. "path/filepath"
  13. "runtime"
  14. "strings"
  15. "testing"
  16. "time"
  17. "github.com/rclone/rclone/cmd/mountlib"
  18. "github.com/rclone/rclone/cmd/serve/docker"
  19. "github.com/rclone/rclone/fs"
  20. "github.com/rclone/rclone/fs/config"
  21. "github.com/rclone/rclone/fstest"
  22. "github.com/rclone/rclone/fstest/testy"
  23. "github.com/rclone/rclone/lib/file"
  24. "github.com/stretchr/testify/assert"
  25. "github.com/stretchr/testify/require"
  26. _ "github.com/rclone/rclone/backend/local"
  27. _ "github.com/rclone/rclone/backend/memory"
  28. _ "github.com/rclone/rclone/cmd/cmount"
  29. _ "github.com/rclone/rclone/cmd/mount"
  30. )
  31. func initialise(ctx context.Context, t *testing.T) (string, fs.Fs) {
  32. fstest.Initialise()
  33. // Make test cache directory
  34. testDir, err := fstest.LocalRemote()
  35. require.NoError(t, err)
  36. err = file.MkdirAll(testDir, 0755)
  37. require.NoError(t, err)
  38. // Make test file system
  39. testFs, err := fs.NewFs(ctx, testDir)
  40. require.NoError(t, err)
  41. return testDir, testFs
  42. }
  43. func assertErrorContains(t *testing.T, err error, errString string, msgAndArgs ...interface{}) {
  44. assert.Error(t, err)
  45. if err != nil {
  46. assert.Contains(t, err.Error(), errString, msgAndArgs...)
  47. }
  48. }
  49. func assertVolumeInfo(t *testing.T, v *docker.VolInfo, name, path string) {
  50. assert.Equal(t, name, v.Name)
  51. assert.Equal(t, path, v.Mountpoint)
  52. assert.NotEmpty(t, v.CreatedAt)
  53. _, err := time.Parse(time.RFC3339, v.CreatedAt)
  54. assert.NoError(t, err)
  55. }
  56. func TestDockerPluginLogic(t *testing.T) {
  57. ctx := context.Background()
  58. oldCacheDir := config.GetCacheDir()
  59. testDir, testFs := initialise(ctx, t)
  60. err := config.SetCacheDir(testDir)
  61. require.NoError(t, err)
  62. defer func() {
  63. _ = config.SetCacheDir(oldCacheDir)
  64. if !t.Failed() {
  65. fstest.Purge(testFs)
  66. _ = os.RemoveAll(testDir)
  67. }
  68. }()
  69. // Create dummy volume driver
  70. drv, err := docker.NewDriver(ctx, testDir, nil, nil, true, true)
  71. require.NoError(t, err)
  72. require.NotNil(t, drv)
  73. // 1st volume request
  74. volReq := &docker.CreateRequest{
  75. Name: "vol1",
  76. Options: docker.VolOpts{},
  77. }
  78. assertErrorContains(t, drv.Create(volReq), "volume must have either remote or backend")
  79. volReq.Options["remote"] = testDir
  80. assert.NoError(t, drv.Create(volReq))
  81. path1 := filepath.Join(testDir, "vol1")
  82. assert.ErrorIs(t, drv.Create(volReq), docker.ErrVolumeExists)
  83. getReq := &docker.GetRequest{Name: "vol1"}
  84. getRes, err := drv.Get(getReq)
  85. assert.NoError(t, err)
  86. require.NotNil(t, getRes)
  87. assertVolumeInfo(t, getRes.Volume, "vol1", path1)
  88. // 2nd volume request
  89. volReq.Name = "vol2"
  90. assert.NoError(t, drv.Create(volReq))
  91. path2 := filepath.Join(testDir, "vol2")
  92. listRes, err := drv.List()
  93. require.NoError(t, err)
  94. require.Equal(t, 2, len(listRes.Volumes))
  95. assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1)
  96. assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2)
  97. // Try prohibited volume options
  98. volReq.Name = "vol99"
  99. volReq.Options["remote"] = testDir
  100. volReq.Options["type"] = "memory"
  101. err = drv.Create(volReq)
  102. assertErrorContains(t, err, "volume must have either remote or backend")
  103. volReq.Options["persist"] = "WrongBoolean"
  104. err = drv.Create(volReq)
  105. assertErrorContains(t, err, "cannot parse option")
  106. volReq.Options["persist"] = "true"
  107. delete(volReq.Options, "remote")
  108. err = drv.Create(volReq)
  109. assertErrorContains(t, err, "persist remotes is prohibited")
  110. volReq.Options["persist"] = "false"
  111. volReq.Options["memory-option-broken"] = "some-value"
  112. err = drv.Create(volReq)
  113. assertErrorContains(t, err, "unsupported backend option")
  114. getReq.Name = "vol99"
  115. getRes, err = drv.Get(getReq)
  116. assert.Error(t, err)
  117. assert.Nil(t, getRes)
  118. // Test mount requests
  119. mountReq := &docker.MountRequest{
  120. Name: "vol2",
  121. ID: "id1",
  122. }
  123. mountRes, err := drv.Mount(mountReq)
  124. assert.NoError(t, err)
  125. require.NotNil(t, mountRes)
  126. assert.Equal(t, path2, mountRes.Mountpoint)
  127. mountRes, err = drv.Mount(mountReq)
  128. assert.Error(t, err)
  129. assert.Nil(t, mountRes)
  130. assertErrorContains(t, err, "already mounted by this id")
  131. mountReq.ID = "id2"
  132. mountRes, err = drv.Mount(mountReq)
  133. assert.NoError(t, err)
  134. require.NotNil(t, mountRes)
  135. assert.Equal(t, path2, mountRes.Mountpoint)
  136. unmountReq := &docker.UnmountRequest{
  137. Name: "vol2",
  138. ID: "id1",
  139. }
  140. err = drv.Unmount(unmountReq)
  141. assert.NoError(t, err)
  142. err = drv.Unmount(unmountReq)
  143. assert.Error(t, err)
  144. assertErrorContains(t, err, "not mounted by this id")
  145. // Simulate plugin restart
  146. drv2, err := docker.NewDriver(ctx, testDir, nil, nil, true, false)
  147. assert.NoError(t, err)
  148. require.NotNil(t, drv2)
  149. // New plugin instance should pick up the saved state
  150. listRes, err = drv2.List()
  151. require.NoError(t, err)
  152. require.Equal(t, 2, len(listRes.Volumes))
  153. assertVolumeInfo(t, listRes.Volumes[0], "vol1", path1)
  154. assertVolumeInfo(t, listRes.Volumes[1], "vol2", path2)
  155. rmReq := &docker.RemoveRequest{Name: "vol2"}
  156. err = drv.Remove(rmReq)
  157. assertErrorContains(t, err, "volume is in use")
  158. unmountReq.ID = "id1"
  159. err = drv.Unmount(unmountReq)
  160. assert.Error(t, err)
  161. assertErrorContains(t, err, "not mounted by this id")
  162. unmountReq.ID = "id2"
  163. err = drv.Unmount(unmountReq)
  164. assert.NoError(t, err)
  165. err = drv.Unmount(unmountReq)
  166. assert.EqualError(t, err, "volume is not mounted")
  167. err = drv.Remove(rmReq)
  168. assert.NoError(t, err)
  169. }
  // Timing parameters shared by the HTTP test client and the mount test.
  const (
  	// httpTimeout is used for the client's dial, keep-alive, idle
  	// connection and overall request timeouts.
  	httpTimeout time.Duration = 2 * time.Second
  	// tempDelay is a short pause inserted after each request and around
  	// server start and stop.
  	tempDelay time.Duration = 10 * time.Millisecond
  )
  174. type APIClient struct {
  175. t *testing.T
  176. cli *http.Client
  177. host string
  178. }
  179. func newAPIClient(t *testing.T, host, unixPath string) *APIClient {
  180. tr := &http.Transport{
  181. MaxIdleConns: 1,
  182. IdleConnTimeout: httpTimeout,
  183. DisableCompression: true,
  184. }
  185. if unixPath != "" {
  186. tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
  187. return net.Dial("unix", unixPath)
  188. }
  189. } else {
  190. dialer := &net.Dialer{
  191. Timeout: httpTimeout,
  192. KeepAlive: httpTimeout,
  193. }
  194. tr.DialContext = dialer.DialContext
  195. }
  196. cli := &http.Client{
  197. Transport: tr,
  198. Timeout: httpTimeout,
  199. }
  200. return &APIClient{
  201. t: t,
  202. cli: cli,
  203. host: host,
  204. }
  205. }
  206. func (a *APIClient) request(path string, in, out interface{}, wantErr bool) {
  207. t := a.t
  208. var (
  209. dataIn []byte
  210. dataOut []byte
  211. err error
  212. )
  213. realm := "VolumeDriver"
  214. if path == "Activate" {
  215. realm = "Plugin"
  216. }
  217. url := fmt.Sprintf("http://%s/%s.%s", a.host, realm, path)
  218. if str, isString := in.(string); isString {
  219. dataIn = []byte(str)
  220. } else {
  221. dataIn, err = json.Marshal(in)
  222. require.NoError(t, err)
  223. }
  224. fs.Logf(path, "<-- %s", dataIn)
  225. req, err := http.NewRequest("POST", url, bytes.NewBuffer(dataIn))
  226. require.NoError(t, err)
  227. req.Header.Set("Content-Type", "application/json")
  228. res, err := a.cli.Do(req)
  229. require.NoError(t, err)
  230. wantStatus := http.StatusOK
  231. if wantErr {
  232. wantStatus = http.StatusInternalServerError
  233. }
  234. assert.Equal(t, wantStatus, res.StatusCode)
  235. dataOut, err = io.ReadAll(res.Body)
  236. require.NoError(t, err)
  237. err = res.Body.Close()
  238. require.NoError(t, err)
  239. if strPtr, isString := out.(*string); isString || wantErr {
  240. require.True(t, isString, "must use string for error response")
  241. if wantErr {
  242. var errRes docker.ErrorResponse
  243. err = json.Unmarshal(dataOut, &errRes)
  244. require.NoError(t, err)
  245. *strPtr = errRes.Err
  246. } else {
  247. *strPtr = strings.TrimSpace(string(dataOut))
  248. }
  249. } else {
  250. err = json.Unmarshal(dataOut, out)
  251. require.NoError(t, err)
  252. }
  253. fs.Logf(path, "--> %s", dataOut)
  254. time.Sleep(tempDelay)
  255. }
  256. func testMountAPI(t *testing.T, sockAddr string) {
  257. // Disable tests under macOS and linux in the CI since they are locking up
  258. if runtime.GOOS == "darwin" || runtime.GOOS == "linux" {
  259. testy.SkipUnreliable(t)
  260. }
  261. if _, mountFn := mountlib.ResolveMountMethod(""); mountFn == nil {
  262. t.Skip("Test requires working mount command")
  263. }
  264. ctx := context.Background()
  265. oldCacheDir := config.GetCacheDir()
  266. testDir, testFs := initialise(ctx, t)
  267. err := config.SetCacheDir(testDir)
  268. require.NoError(t, err)
  269. defer func() {
  270. _ = config.SetCacheDir(oldCacheDir)
  271. if !t.Failed() {
  272. fstest.Purge(testFs)
  273. _ = os.RemoveAll(testDir)
  274. }
  275. }()
  276. // Prepare API client
  277. var cli *APIClient
  278. var unixPath string
  279. if sockAddr != "" {
  280. cli = newAPIClient(t, sockAddr, "")
  281. } else {
  282. unixPath = filepath.Join(testDir, "rclone.sock")
  283. cli = newAPIClient(t, "localhost", unixPath)
  284. }
  285. // Create mounting volume driver and listen for requests
  286. drv, err := docker.NewDriver(ctx, testDir, nil, nil, false, true)
  287. require.NoError(t, err)
  288. require.NotNil(t, drv)
  289. defer drv.Exit()
  290. srv := docker.NewServer(drv)
  291. go func() {
  292. var errServe error
  293. if unixPath != "" {
  294. errServe = srv.ServeUnix(unixPath, os.Getgid())
  295. } else {
  296. errServe = srv.ServeTCP(sockAddr, testDir, nil, false)
  297. }
  298. assert.ErrorIs(t, errServe, http.ErrServerClosed)
  299. }()
  300. defer func() {
  301. err := srv.Shutdown(ctx)
  302. assert.NoError(t, err)
  303. fs.Logf(nil, "Server stopped")
  304. time.Sleep(tempDelay)
  305. }()
  306. time.Sleep(tempDelay) // Let server start
  307. // Run test sequence
  308. path1 := filepath.Join(testDir, "path1")
  309. require.NoError(t, file.MkdirAll(path1, 0755))
  310. mount1 := filepath.Join(testDir, "vol1")
  311. res := ""
  312. cli.request("Activate", "{}", &res, false)
  313. assert.Contains(t, res, `"VolumeDriver"`)
  314. createReq := docker.CreateRequest{
  315. Name: "vol1",
  316. Options: docker.VolOpts{"remote": path1},
  317. }
  318. cli.request("Create", createReq, &res, false)
  319. assert.Equal(t, "{}", res)
  320. cli.request("Create", createReq, &res, true)
  321. assert.Contains(t, res, "volume already exists")
  322. mountReq := docker.MountRequest{Name: "vol1", ID: "id1"}
  323. var mountRes docker.MountResponse
  324. cli.request("Mount", mountReq, &mountRes, false)
  325. assert.Equal(t, mount1, mountRes.Mountpoint)
  326. cli.request("Mount", mountReq, &res, true)
  327. assert.Contains(t, res, "already mounted by this id")
  328. removeReq := docker.RemoveRequest{Name: "vol1"}
  329. cli.request("Remove", removeReq, &res, true)
  330. assert.Contains(t, res, "volume is in use")
  331. text := []byte("banana")
  332. err = os.WriteFile(filepath.Join(mount1, "txt"), text, 0644)
  333. assert.NoError(t, err)
  334. time.Sleep(tempDelay)
  335. text2, err := os.ReadFile(filepath.Join(path1, "txt"))
  336. assert.NoError(t, err)
  337. if runtime.GOOS != "windows" {
  338. // this check sometimes fails on windows - ignore
  339. assert.Equal(t, text, text2)
  340. }
  341. unmountReq := docker.UnmountRequest{Name: "vol1", ID: "id1"}
  342. cli.request("Unmount", unmountReq, &res, false)
  343. assert.Equal(t, "{}", res)
  344. cli.request("Unmount", unmountReq, &res, true)
  345. assert.Equal(t, "volume is not mounted", res)
  346. cli.request("Remove", removeReq, &res, false)
  347. assert.Equal(t, "{}", res)
  348. cli.request("Remove", removeReq, &res, true)
  349. assert.Equal(t, "volume not found", res)
  350. var listRes docker.ListResponse
  351. cli.request("List", "{}", &listRes, false)
  352. assert.Empty(t, listRes.Volumes)
  353. }
  354. func TestDockerPluginMountTCP(t *testing.T) {
  355. testMountAPI(t, "localhost:53789")
  356. }
  357. func TestDockerPluginMountUnix(t *testing.T) {
  358. if runtime.GOOS != "linux" {
  359. t.Skip("Test is Linux-only")
  360. }
  361. testMountAPI(t, "")
  362. }