rc.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package rc provides remote control of a Syncthing process via the REST API.
  7. package rc
  8. import (
  9. "bufio"
  10. "bytes"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log"
  16. "net/http"
  17. "net/url"
  18. "os"
  19. "os/exec"
  20. "path/filepath"
  21. "strconv"
  22. "time"
  23. "github.com/syncthing/syncthing/lib/config"
  24. "github.com/syncthing/syncthing/lib/dialer"
  25. "github.com/syncthing/syncthing/lib/events"
  26. "github.com/syncthing/syncthing/lib/model"
  27. "github.com/syncthing/syncthing/lib/protocol"
  28. "github.com/syncthing/syncthing/lib/sync"
  29. )
  30. // APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
  31. // ensure that we have API access regardless of authentication settings.
  32. const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
  33. type Process struct {
  34. // Set at initialization
  35. addr string
  36. // Set by eventLoop()
  37. eventMut sync.Mutex
  38. id protocol.DeviceID
  39. folders []string
  40. startComplete chan struct{}
  41. stopped chan struct{}
  42. stopErr error
  43. sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
  44. done map[string]bool // Folder ID => 100%
  45. cmd *exec.Cmd
  46. logfd *os.File
  47. }
  48. // NewProcess returns a new Process talking to Syncthing at the specified address.
  49. // Example: NewProcess("127.0.0.1:8082")
  50. func NewProcess(addr string) *Process {
  51. p := &Process{
  52. addr: addr,
  53. sequence: make(map[string]map[string]int64),
  54. done: make(map[string]bool),
  55. eventMut: sync.NewMutex(),
  56. startComplete: make(chan struct{}),
  57. stopped: make(chan struct{}),
  58. }
  59. return p
  60. }
  61. func (p *Process) ID() protocol.DeviceID {
  62. return p.id
  63. }
  64. // LogTo creates the specified log file and ensures that stdout and stderr
  65. // from the Start()ed process is redirected there. Must be called before
  66. // Start().
  67. func (p *Process) LogTo(filename string) error {
  68. if p.cmd != nil {
  69. panic("logfd cannot be set with an existing cmd")
  70. }
  71. if p.logfd != nil {
  72. p.logfd.Close()
  73. }
  74. fd, err := os.Create(filename)
  75. if err != nil {
  76. return err
  77. }
  78. p.logfd = fd
  79. return nil
  80. }
  81. // Start runs the specified Syncthing binary with the given arguments.
  82. // Syncthing should be configured to provide an API on the address given to
  83. // NewProcess. Event processing is started.
  84. func (p *Process) Start(bin string, args ...string) error {
  85. cmd := exec.Command(bin, args...)
  86. if p.logfd != nil {
  87. cmd.Stdout = p.logfd
  88. cmd.Stderr = p.logfd
  89. }
  90. cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
  91. err := cmd.Start()
  92. if err != nil {
  93. return err
  94. }
  95. p.cmd = cmd
  96. go p.eventLoop()
  97. go p.wait()
  98. return nil
  99. }
  100. func (p *Process) wait() {
  101. p.cmd.Wait()
  102. if p.logfd != nil {
  103. p.stopErr = p.checkForProblems(p.logfd)
  104. }
  105. close(p.stopped)
  106. }
  107. // AwaitStartup waits for the Syncthing process to start and perform initial
  108. // scans of all folders.
  109. func (p *Process) AwaitStartup() {
  110. select {
  111. case <-p.startComplete:
  112. case <-p.stopped:
  113. }
  114. }
  115. // Stop stops the running Syncthing process. If the process was logging to a
  116. // local file (set by LogTo), the log file will be opened and checked for
  117. // panics and data races. The presence of either will be signalled in the form
  118. // of a returned error.
  119. func (p *Process) Stop() (*os.ProcessState, error) {
  120. select {
  121. case <-p.stopped:
  122. return p.cmd.ProcessState, p.stopErr
  123. default:
  124. }
  125. if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
  126. // Unexpected EOF is somewhat expected here, as we may exit before
  127. // returning something sensible.
  128. return nil, err
  129. }
  130. <-p.stopped
  131. return p.cmd.ProcessState, p.stopErr
  132. }
  133. // Stopped returns a channel that will be closed when Syncthing has stopped.
  134. func (p *Process) Stopped() chan struct{} {
  135. return p.stopped
  136. }
  137. // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
  138. // return code is returned as an error.
  139. func (p *Process) Get(path string) ([]byte, error) {
  140. client := &http.Client{
  141. Timeout: 30 * time.Second,
  142. Transport: &http.Transport{
  143. DialContext: dialer.DialContext,
  144. Proxy: http.ProxyFromEnvironment,
  145. DisableKeepAlives: true,
  146. },
  147. }
  148. url := fmt.Sprintf("http://%s%s", p.addr, path)
  149. req, err := http.NewRequest("GET", url, nil)
  150. if err != nil {
  151. return nil, err
  152. }
  153. req.Header.Add("X-API-Key", APIKey)
  154. resp, err := client.Do(req)
  155. if err != nil {
  156. return nil, err
  157. }
  158. return p.readResponse(resp)
  159. }
  160. // Post performs an HTTP POST and returns the bytes and/or an error. Any
  161. // non-200 return code is returned as an error.
  162. func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
  163. client := &http.Client{
  164. Timeout: 600 * time.Second,
  165. Transport: &http.Transport{
  166. DisableKeepAlives: true,
  167. },
  168. }
  169. url := fmt.Sprintf("http://%s%s", p.addr, path)
  170. req, err := http.NewRequest("POST", url, data)
  171. if err != nil {
  172. return nil, err
  173. }
  174. req.Header.Add("X-API-Key", APIKey)
  175. req.Header.Add("Content-Type", "application/json")
  176. resp, err := client.Do(req)
  177. if err != nil {
  178. return nil, err
  179. }
  180. return p.readResponse(resp)
  181. }
  182. type Event struct {
  183. ID int
  184. Time time.Time
  185. Type string
  186. Data interface{}
  187. }
  188. func (p *Process) Events(since int) ([]Event, error) {
  189. bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
  190. if err != nil {
  191. return nil, err
  192. }
  193. var evs []Event
  194. dec := json.NewDecoder(bytes.NewReader(bs))
  195. dec.UseNumber()
  196. err = dec.Decode(&evs)
  197. if err != nil {
  198. return nil, fmt.Errorf("events: %w in %q", err, bs)
  199. }
  200. return evs, err
  201. }
  202. func (p *Process) Rescan(folder string) error {
  203. _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
  204. return err
  205. }
  206. func (p *Process) RescanDelay(folder string, delaySeconds int) error {
  207. _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
  208. return err
  209. }
  210. func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
  211. return p.RescanSubs(folder, []string{sub}, delaySeconds)
  212. }
  213. func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
  214. data := url.Values{}
  215. data.Set("folder", folder)
  216. for _, sub := range subs {
  217. data.Add("sub", sub)
  218. }
  219. data.Set("next", strconv.Itoa(delaySeconds))
  220. _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
  221. return err
  222. }
  223. func (p *Process) ConfigInSync() (bool, error) {
  224. bs, err := p.Get("/rest/system/config/insync")
  225. if err != nil {
  226. return false, err
  227. }
  228. return bytes.Contains(bs, []byte("true")), nil
  229. }
  230. func (p *Process) GetConfig() (config.Configuration, error) {
  231. var cfg config.Configuration
  232. bs, err := p.Get("/rest/system/config")
  233. if err != nil {
  234. return cfg, err
  235. }
  236. err = json.Unmarshal(bs, &cfg)
  237. return cfg, err
  238. }
  239. func (p *Process) PostConfig(cfg config.Configuration) error {
  240. buf := new(bytes.Buffer)
  241. if err := json.NewEncoder(buf).Encode(cfg); err != nil {
  242. return err
  243. }
  244. _, err := p.Post("/rest/system/config", buf)
  245. return err
  246. }
  247. func (p *Process) PauseDevice(dev protocol.DeviceID) error {
  248. _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
  249. return err
  250. }
  251. func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
  252. _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
  253. return err
  254. }
  255. func (p *Process) PauseAll() error {
  256. _, err := p.Post("/rest/system/pause", nil)
  257. return err
  258. }
  259. func (p *Process) ResumeAll() error {
  260. _, err := p.Post("/rest/system/resume", nil)
  261. return err
  262. }
  263. func InSync(folder string, ps ...*Process) bool {
  264. for _, p := range ps {
  265. p.eventMut.Lock()
  266. }
  267. defer func() {
  268. for _, p := range ps {
  269. p.eventMut.Unlock()
  270. }
  271. }()
  272. for i := range ps {
  273. // If our latest FolderSummary didn't report 100%, then we are not done.
  274. if !ps[i].done[folder] {
  275. l.Debugf("done = ps[%d].done[%q] = false", i, folder)
  276. return false
  277. }
  278. // Check Sequence for each device. The local version seen by remote
  279. // devices should be the same as what it has locally, or the index
  280. // hasn't been sent yet.
  281. sourceID := ps[i].id.String()
  282. sourceSeq := ps[i].sequence[folder][sourceID]
  283. l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
  284. for j := range ps {
  285. if i != j {
  286. remoteSeq := ps[j].sequence[folder][sourceID]
  287. if remoteSeq != sourceSeq {
  288. l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
  289. return false
  290. }
  291. }
  292. }
  293. }
  294. return true
  295. }
  296. func AwaitSync(folder string, ps ...*Process) {
  297. for {
  298. time.Sleep(250 * time.Millisecond)
  299. if InSync(folder, ps...) {
  300. return
  301. }
  302. }
  303. }
  304. type Model struct {
  305. GlobalBytes int
  306. GlobalDeleted int
  307. GlobalFiles int
  308. InSyncBytes int
  309. InSyncFiles int
  310. Invalid string
  311. LocalBytes int
  312. LocalDeleted int
  313. LocalFiles int
  314. NeedBytes int
  315. NeedFiles int
  316. State string
  317. StateChanged time.Time
  318. Version int
  319. }
  320. func (p *Process) Model(folder string) (Model, error) {
  321. bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
  322. if err != nil {
  323. return Model{}, err
  324. }
  325. var res Model
  326. if err := json.Unmarshal(bs, &res); err != nil {
  327. return Model{}, err
  328. }
  329. l.Debugf("%+v", res)
  330. return res, nil
  331. }
  332. func (*Process) readResponse(resp *http.Response) ([]byte, error) {
  333. bs, err := io.ReadAll(resp.Body)
  334. resp.Body.Close()
  335. if err != nil {
  336. return bs, err
  337. }
  338. if resp.StatusCode != 200 {
  339. return bs, errors.New(resp.Status)
  340. }
  341. return bs, nil
  342. }
  343. func (p *Process) checkForProblems(logfd *os.File) error {
  344. fd, err := os.Open(logfd.Name())
  345. if err != nil {
  346. return err
  347. }
  348. defer fd.Close()
  349. raceConditionStart := []byte("WARNING: DATA RACE")
  350. raceConditionSep := []byte("==================")
  351. panicConditionStart := []byte("panic:")
  352. panicConditionSep := []byte("[") // fallback if we don't already know our ID
  353. if p.id.String() != "" {
  354. panicConditionSep = []byte(p.id.String()[:5])
  355. }
  356. sc := bufio.NewScanner(fd)
  357. race := false
  358. _panic := false
  359. for sc.Scan() {
  360. line := sc.Bytes()
  361. if race || _panic {
  362. if bytes.Contains(line, panicConditionSep) {
  363. _panic = false
  364. continue
  365. }
  366. fmt.Printf("%s\n", line)
  367. if bytes.Contains(line, raceConditionSep) {
  368. race = false
  369. }
  370. } else if bytes.Contains(line, raceConditionStart) {
  371. fmt.Printf("%s\n", raceConditionSep)
  372. fmt.Printf("%s\n", raceConditionStart)
  373. race = true
  374. if err == nil {
  375. err = errors.New("Race condition detected")
  376. }
  377. } else if bytes.Contains(line, panicConditionStart) {
  378. _panic = true
  379. if err == nil {
  380. err = errors.New("Panic detected")
  381. }
  382. }
  383. }
  384. return err
  385. }
  386. func (p *Process) eventLoop() {
  387. since := 0
  388. notScanned := make(map[string]struct{})
  389. start := time.Now()
  390. for {
  391. select {
  392. case <-p.stopped:
  393. return
  394. default:
  395. }
  396. evs, err := p.Events(since)
  397. if err != nil {
  398. if time.Since(start) < 5*time.Second {
  399. // The API has probably not started yet, lets give it some time.
  400. continue
  401. }
  402. // If we're stopping, no need to print the error.
  403. select {
  404. case <-p.stopped:
  405. return
  406. default:
  407. }
  408. log.Println("eventLoop: events:", err)
  409. continue
  410. }
  411. for _, ev := range evs {
  412. if ev.ID != since+1 {
  413. l.Warnln("Event ID jumped", since, "to", ev.ID)
  414. }
  415. since = ev.ID
  416. switch ev.Type {
  417. case "Starting":
  418. // The Starting event tells us where the configuration is. Load
  419. // it and populate our list of folders.
  420. data := ev.Data.(map[string]interface{})
  421. id, err := protocol.DeviceIDFromString(data["myID"].(string))
  422. if err != nil {
  423. log.Println("eventLoop: DeviceIdFromString:", err)
  424. continue
  425. }
  426. p.id = id
  427. home := data["home"].(string)
  428. w, _, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger)
  429. if err != nil {
  430. log.Println("eventLoop: Starting:", err)
  431. continue
  432. }
  433. for id := range w.Folders() {
  434. p.eventMut.Lock()
  435. p.folders = append(p.folders, id)
  436. p.eventMut.Unlock()
  437. notScanned[id] = struct{}{}
  438. }
  439. l.Debugln("Started", p.id)
  440. case "StateChanged":
  441. // When a folder changes to idle, we tick it off by removing
  442. // it from p.notScanned.
  443. if len(p.folders) == 0 {
  444. // We haven't parsed the config yet, shouldn't happen
  445. panic("race, or lost startup event")
  446. }
  447. select {
  448. case <-p.startComplete:
  449. default:
  450. data := ev.Data.(map[string]interface{})
  451. to := data["to"].(string)
  452. if to == "idle" {
  453. folder := data["folder"].(string)
  454. delete(notScanned, folder)
  455. if len(notScanned) == 0 {
  456. close(p.startComplete)
  457. }
  458. }
  459. }
  460. case "LocalIndexUpdated":
  461. data := ev.Data.(map[string]interface{})
  462. folder := data["folder"].(string)
  463. p.eventMut.Lock()
  464. m := p.updateSequenceLocked(folder, p.id.String(), data["sequence"])
  465. p.done[folder] = false
  466. l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  467. p.eventMut.Unlock()
  468. case "RemoteIndexUpdated":
  469. data := ev.Data.(map[string]interface{})
  470. device := data["device"].(string)
  471. folder := data["folder"].(string)
  472. p.eventMut.Lock()
  473. m := p.updateSequenceLocked(folder, device, data["sequence"])
  474. p.done[folder] = false
  475. l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
  476. p.eventMut.Unlock()
  477. case "FolderSummary":
  478. data := ev.Data.(map[string]interface{})
  479. folder := data["folder"].(string)
  480. summary := data["summary"].(map[string]interface{})
  481. need, _ := summary["needTotalItems"].(json.Number).Int64()
  482. done := need == 0
  483. p.eventMut.Lock()
  484. m := p.updateSequenceLocked(folder, p.id.String(), summary["sequence"])
  485. p.done[folder] = done
  486. l.Debugf("FolderSummary %v %v\n\t%+v\n\t%+v", p.id, folder, p.done, m)
  487. p.eventMut.Unlock()
  488. case "FolderCompletion":
  489. data := ev.Data.(map[string]interface{})
  490. device := data["device"].(string)
  491. folder := data["folder"].(string)
  492. p.eventMut.Lock()
  493. m := p.updateSequenceLocked(folder, device, data["sequence"])
  494. l.Debugf("FolderCompletion %v\n\t%+v", p.id, folder, m)
  495. p.eventMut.Unlock()
  496. }
  497. }
  498. }
  499. }
  500. func (p *Process) updateSequenceLocked(folder, device string, sequenceIntf interface{}) map[string]int64 {
  501. sequence, _ := sequenceIntf.(json.Number).Int64()
  502. m := p.sequence[folder]
  503. if m == nil {
  504. m = make(map[string]int64)
  505. }
  506. m[device] = sequence
  507. p.sequence[folder] = m
  508. return m
  509. }
  510. type ConnectionStats struct {
  511. Address string
  512. Type string
  513. Connected bool
  514. Paused bool
  515. ClientVersion string
  516. InBytesTotal int64
  517. OutBytesTotal int64
  518. }
  519. func (p *Process) Connections() (map[string]ConnectionStats, error) {
  520. bs, err := p.Get("/rest/system/connections")
  521. if err != nil {
  522. return nil, err
  523. }
  524. var res map[string]ConnectionStats
  525. if err := json.Unmarshal(bs, &res); err != nil {
  526. return nil, err
  527. }
  528. return res, nil
  529. }
  530. type SystemStatus struct {
  531. Alloc int64
  532. Goroutines int
  533. MyID protocol.DeviceID
  534. PathSeparator string
  535. StartTime time.Time
  536. Sys int64
  537. Themes []string
  538. Tilde string
  539. Uptime int
  540. }
  541. func (p *Process) SystemStatus() (SystemStatus, error) {
  542. bs, err := p.Get("/rest/system/status")
  543. if err != nil {
  544. return SystemStatus{}, err
  545. }
  546. var res SystemStatus
  547. if err := json.Unmarshal(bs, &res); err != nil {
  548. return SystemStatus{}, err
  549. }
  550. return res, nil
  551. }
  552. type SystemVersion struct {
  553. Arch string
  554. Codename string
  555. LongVersion string
  556. OS string
  557. Version string
  558. }
  559. func (p *Process) SystemVersion() (SystemVersion, error) {
  560. bs, err := p.Get("/rest/system/version")
  561. if err != nil {
  562. return SystemVersion{}, err
  563. }
  564. var res SystemVersion
  565. if err := json.Unmarshal(bs, &res); err != nil {
  566. return SystemVersion{}, err
  567. }
  568. return res, nil
  569. }
  570. func (p *Process) RemoteInSync(folder string, dev protocol.DeviceID) (bool, error) {
  571. bs, err := p.Get(fmt.Sprintf("/rest/db/completion?folder=%v&device=%v", url.QueryEscape(folder), dev))
  572. if err != nil {
  573. return false, err
  574. }
  575. var comp model.FolderCompletion
  576. if err := json.Unmarshal(bs, &comp); err != nil {
  577. return false, err
  578. }
  579. return comp.NeedItems+comp.NeedDeletes == 0, nil
  580. }