123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679 |
- // Copyright (C) 2015 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- // Package rc provides remote control of a Syncthing process via the REST API.
- package rc
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "log"
- "net/http"
- "net/url"
- "os"
- "os/exec"
- "path/filepath"
- "strconv"
- "time"
- "github.com/syncthing/syncthing/lib/config"
- "github.com/syncthing/syncthing/lib/dialer"
- "github.com/syncthing/syncthing/lib/events"
- "github.com/syncthing/syncthing/lib/model"
- "github.com/syncthing/syncthing/lib/protocol"
- "github.com/syncthing/syncthing/lib/sync"
- )
- // APIKey is set via the STGUIAPIKEY variable when we launch the binary, to
- // ensure that we have API access regardless of authentication settings.
- const APIKey = "592A47BC-A7DF-4C2F-89E0-A80B3E5094EE"
- type Process struct {
- // Set at initialization
- addr string
- // Set by eventLoop()
- eventMut sync.Mutex
- id protocol.DeviceID
- folders []string
- startComplete chan struct{}
- stopped chan struct{}
- stopErr error
- sequence map[string]map[string]int64 // Folder ID => Device ID => Sequence
- done map[string]bool // Folder ID => 100%
- cmd *exec.Cmd
- logfd *os.File
- }
- // NewProcess returns a new Process talking to Syncthing at the specified address.
- // Example: NewProcess("127.0.0.1:8082")
- func NewProcess(addr string) *Process {
- p := &Process{
- addr: addr,
- sequence: make(map[string]map[string]int64),
- done: make(map[string]bool),
- eventMut: sync.NewMutex(),
- startComplete: make(chan struct{}),
- stopped: make(chan struct{}),
- }
- return p
- }
- func (p *Process) ID() protocol.DeviceID {
- return p.id
- }
- // LogTo creates the specified log file and ensures that stdout and stderr
- // from the Start()ed process is redirected there. Must be called before
- // Start().
- func (p *Process) LogTo(filename string) error {
- if p.cmd != nil {
- panic("logfd cannot be set with an existing cmd")
- }
- if p.logfd != nil {
- p.logfd.Close()
- }
- fd, err := os.Create(filename)
- if err != nil {
- return err
- }
- p.logfd = fd
- return nil
- }
- // Start runs the specified Syncthing binary with the given arguments.
- // Syncthing should be configured to provide an API on the address given to
- // NewProcess. Event processing is started.
- func (p *Process) Start(bin string, args ...string) error {
- cmd := exec.Command(bin, args...)
- if p.logfd != nil {
- cmd.Stdout = p.logfd
- cmd.Stderr = p.logfd
- }
- cmd.Env = append(os.Environ(), "STNORESTART=1", "STGUIAPIKEY="+APIKey)
- err := cmd.Start()
- if err != nil {
- return err
- }
- p.cmd = cmd
- go p.eventLoop()
- go p.wait()
- return nil
- }
- func (p *Process) wait() {
- p.cmd.Wait()
- if p.logfd != nil {
- p.stopErr = p.checkForProblems(p.logfd)
- }
- close(p.stopped)
- }
- // AwaitStartup waits for the Syncthing process to start and perform initial
- // scans of all folders.
- func (p *Process) AwaitStartup() {
- select {
- case <-p.startComplete:
- case <-p.stopped:
- }
- }
- // Stop stops the running Syncthing process. If the process was logging to a
- // local file (set by LogTo), the log file will be opened and checked for
- // panics and data races. The presence of either will be signalled in the form
- // of a returned error.
- func (p *Process) Stop() (*os.ProcessState, error) {
- select {
- case <-p.stopped:
- return p.cmd.ProcessState, p.stopErr
- default:
- }
- if _, err := p.Post("/rest/system/shutdown", nil); err != nil && err != io.ErrUnexpectedEOF {
- // Unexpected EOF is somewhat expected here, as we may exit before
- // returning something sensible.
- return nil, err
- }
- <-p.stopped
- return p.cmd.ProcessState, p.stopErr
- }
- // Stopped returns a channel that will be closed when Syncthing has stopped.
- func (p *Process) Stopped() chan struct{} {
- return p.stopped
- }
- // Get performs an HTTP GET and returns the bytes and/or an error. Any non-200
- // return code is returned as an error.
- func (p *Process) Get(path string) ([]byte, error) {
- client := &http.Client{
- Timeout: 30 * time.Second,
- Transport: &http.Transport{
- DialContext: dialer.DialContext,
- Proxy: http.ProxyFromEnvironment,
- DisableKeepAlives: true,
- },
- }
- url := fmt.Sprintf("http://%s%s", p.addr, path)
- req, err := http.NewRequest("GET", url, nil)
- if err != nil {
- return nil, err
- }
- req.Header.Add("X-API-Key", APIKey)
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- return p.readResponse(resp)
- }
- // Post performs an HTTP POST and returns the bytes and/or an error. Any
- // non-200 return code is returned as an error.
- func (p *Process) Post(path string, data io.Reader) ([]byte, error) {
- client := &http.Client{
- Timeout: 600 * time.Second,
- Transport: &http.Transport{
- DisableKeepAlives: true,
- },
- }
- url := fmt.Sprintf("http://%s%s", p.addr, path)
- req, err := http.NewRequest("POST", url, data)
- if err != nil {
- return nil, err
- }
- req.Header.Add("X-API-Key", APIKey)
- req.Header.Add("Content-Type", "application/json")
- resp, err := client.Do(req)
- if err != nil {
- return nil, err
- }
- return p.readResponse(resp)
- }
- type Event struct {
- ID int
- Time time.Time
- Type string
- Data interface{}
- }
- func (p *Process) Events(since int) ([]Event, error) {
- bs, err := p.Get(fmt.Sprintf("/rest/events?since=%d&timeout=10", since))
- if err != nil {
- return nil, err
- }
- var evs []Event
- dec := json.NewDecoder(bytes.NewReader(bs))
- dec.UseNumber()
- err = dec.Decode(&evs)
- if err != nil {
- return nil, fmt.Errorf("events: %w in %q", err, bs)
- }
- return evs, err
- }
- func (p *Process) Rescan(folder string) error {
- _, err := p.Post("/rest/db/scan?folder="+url.QueryEscape(folder), nil)
- return err
- }
- func (p *Process) RescanDelay(folder string, delaySeconds int) error {
- _, err := p.Post(fmt.Sprintf("/rest/db/scan?folder=%s&next=%d", url.QueryEscape(folder), delaySeconds), nil)
- return err
- }
- func (p *Process) RescanSub(folder string, sub string, delaySeconds int) error {
- return p.RescanSubs(folder, []string{sub}, delaySeconds)
- }
- func (p *Process) RescanSubs(folder string, subs []string, delaySeconds int) error {
- data := url.Values{}
- data.Set("folder", folder)
- for _, sub := range subs {
- data.Add("sub", sub)
- }
- data.Set("next", strconv.Itoa(delaySeconds))
- _, err := p.Post("/rest/db/scan?"+data.Encode(), nil)
- return err
- }
- func (p *Process) ConfigInSync() (bool, error) {
- bs, err := p.Get("/rest/system/config/insync")
- if err != nil {
- return false, err
- }
- return bytes.Contains(bs, []byte("true")), nil
- }
- func (p *Process) GetConfig() (config.Configuration, error) {
- var cfg config.Configuration
- bs, err := p.Get("/rest/system/config")
- if err != nil {
- return cfg, err
- }
- err = json.Unmarshal(bs, &cfg)
- return cfg, err
- }
- func (p *Process) PostConfig(cfg config.Configuration) error {
- buf := new(bytes.Buffer)
- if err := json.NewEncoder(buf).Encode(cfg); err != nil {
- return err
- }
- _, err := p.Post("/rest/system/config", buf)
- return err
- }
- func (p *Process) PauseDevice(dev protocol.DeviceID) error {
- _, err := p.Post("/rest/system/pause?device="+dev.String(), nil)
- return err
- }
- func (p *Process) ResumeDevice(dev protocol.DeviceID) error {
- _, err := p.Post("/rest/system/resume?device="+dev.String(), nil)
- return err
- }
- func (p *Process) PauseAll() error {
- _, err := p.Post("/rest/system/pause", nil)
- return err
- }
- func (p *Process) ResumeAll() error {
- _, err := p.Post("/rest/system/resume", nil)
- return err
- }
- func InSync(folder string, ps ...*Process) bool {
- for _, p := range ps {
- p.eventMut.Lock()
- }
- defer func() {
- for _, p := range ps {
- p.eventMut.Unlock()
- }
- }()
- for i := range ps {
- // If our latest FolderSummary didn't report 100%, then we are not done.
- if !ps[i].done[folder] {
- l.Debugf("done = ps[%d].done[%q] = false", i, folder)
- return false
- }
- // Check Sequence for each device. The local version seen by remote
- // devices should be the same as what it has locally, or the index
- // hasn't been sent yet.
- sourceID := ps[i].id.String()
- sourceSeq := ps[i].sequence[folder][sourceID]
- l.Debugf("sourceSeq = ps[%d].sequence[%q][%q] = %d", i, folder, sourceID, sourceSeq)
- for j := range ps {
- if i != j {
- remoteSeq := ps[j].sequence[folder][sourceID]
- if remoteSeq != sourceSeq {
- l.Debugf("remoteSeq = ps[%d].sequence[%q][%q] = %d", j, folder, sourceID, remoteSeq)
- return false
- }
- }
- }
- }
- return true
- }
- func AwaitSync(folder string, ps ...*Process) {
- for {
- time.Sleep(250 * time.Millisecond)
- if InSync(folder, ps...) {
- return
- }
- }
- }
- type Model struct {
- GlobalBytes int
- GlobalDeleted int
- GlobalFiles int
- InSyncBytes int
- InSyncFiles int
- Invalid string
- LocalBytes int
- LocalDeleted int
- LocalFiles int
- NeedBytes int
- NeedFiles int
- State string
- StateChanged time.Time
- Version int
- }
- func (p *Process) Model(folder string) (Model, error) {
- bs, err := p.Get("/rest/db/status?folder=" + url.QueryEscape(folder))
- if err != nil {
- return Model{}, err
- }
- var res Model
- if err := json.Unmarshal(bs, &res); err != nil {
- return Model{}, err
- }
- l.Debugf("%+v", res)
- return res, nil
- }
- func (*Process) readResponse(resp *http.Response) ([]byte, error) {
- bs, err := io.ReadAll(resp.Body)
- resp.Body.Close()
- if err != nil {
- return bs, err
- }
- if resp.StatusCode != 200 {
- return bs, errors.New(resp.Status)
- }
- return bs, nil
- }
- func (p *Process) checkForProblems(logfd *os.File) error {
- fd, err := os.Open(logfd.Name())
- if err != nil {
- return err
- }
- defer fd.Close()
- raceConditionStart := []byte("WARNING: DATA RACE")
- raceConditionSep := []byte("==================")
- panicConditionStart := []byte("panic:")
- panicConditionSep := []byte("[") // fallback if we don't already know our ID
- if p.id.String() != "" {
- panicConditionSep = []byte(p.id.String()[:5])
- }
- sc := bufio.NewScanner(fd)
- race := false
- _panic := false
- for sc.Scan() {
- line := sc.Bytes()
- if race || _panic {
- if bytes.Contains(line, panicConditionSep) {
- _panic = false
- continue
- }
- fmt.Printf("%s\n", line)
- if bytes.Contains(line, raceConditionSep) {
- race = false
- }
- } else if bytes.Contains(line, raceConditionStart) {
- fmt.Printf("%s\n", raceConditionSep)
- fmt.Printf("%s\n", raceConditionStart)
- race = true
- if err == nil {
- err = errors.New("Race condition detected")
- }
- } else if bytes.Contains(line, panicConditionStart) {
- _panic = true
- if err == nil {
- err = errors.New("Panic detected")
- }
- }
- }
- return err
- }
- func (p *Process) eventLoop() {
- since := 0
- notScanned := make(map[string]struct{})
- start := time.Now()
- for {
- select {
- case <-p.stopped:
- return
- default:
- }
- evs, err := p.Events(since)
- if err != nil {
- if time.Since(start) < 5*time.Second {
- // The API has probably not started yet, lets give it some time.
- continue
- }
- // If we're stopping, no need to print the error.
- select {
- case <-p.stopped:
- return
- default:
- }
- log.Println("eventLoop: events:", err)
- continue
- }
- for _, ev := range evs {
- if ev.ID != since+1 {
- l.Warnln("Event ID jumped", since, "to", ev.ID)
- }
- since = ev.ID
- switch ev.Type {
- case "Starting":
- // The Starting event tells us where the configuration is. Load
- // it and populate our list of folders.
- data := ev.Data.(map[string]interface{})
- id, err := protocol.DeviceIDFromString(data["myID"].(string))
- if err != nil {
- log.Println("eventLoop: DeviceIdFromString:", err)
- continue
- }
- p.id = id
- home := data["home"].(string)
- w, _, err := config.Load(filepath.Join(home, "config.xml"), protocol.LocalDeviceID, events.NoopLogger)
- if err != nil {
- log.Println("eventLoop: Starting:", err)
- continue
- }
- for id := range w.Folders() {
- p.eventMut.Lock()
- p.folders = append(p.folders, id)
- p.eventMut.Unlock()
- notScanned[id] = struct{}{}
- }
- l.Debugln("Started", p.id)
- case "StateChanged":
- // When a folder changes to idle, we tick it off by removing
- // it from p.notScanned.
- if len(p.folders) == 0 {
- // We haven't parsed the config yet, shouldn't happen
- panic("race, or lost startup event")
- }
- select {
- case <-p.startComplete:
- default:
- data := ev.Data.(map[string]interface{})
- to := data["to"].(string)
- if to == "idle" {
- folder := data["folder"].(string)
- delete(notScanned, folder)
- if len(notScanned) == 0 {
- close(p.startComplete)
- }
- }
- }
- case "LocalIndexUpdated":
- data := ev.Data.(map[string]interface{})
- folder := data["folder"].(string)
- p.eventMut.Lock()
- m := p.updateSequenceLocked(folder, p.id.String(), data["sequence"])
- p.done[folder] = false
- l.Debugf("LocalIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
- p.eventMut.Unlock()
- case "RemoteIndexUpdated":
- data := ev.Data.(map[string]interface{})
- device := data["device"].(string)
- folder := data["folder"].(string)
- p.eventMut.Lock()
- m := p.updateSequenceLocked(folder, device, data["sequence"])
- p.done[folder] = false
- l.Debugf("RemoteIndexUpdated %v %v done=false\n\t%+v", p.id, folder, m)
- p.eventMut.Unlock()
- case "FolderSummary":
- data := ev.Data.(map[string]interface{})
- folder := data["folder"].(string)
- summary := data["summary"].(map[string]interface{})
- need, _ := summary["needTotalItems"].(json.Number).Int64()
- done := need == 0
- p.eventMut.Lock()
- m := p.updateSequenceLocked(folder, p.id.String(), summary["sequence"])
- p.done[folder] = done
- l.Debugf("FolderSummary %v %v\n\t%+v\n\t%+v", p.id, folder, p.done, m)
- p.eventMut.Unlock()
- case "FolderCompletion":
- data := ev.Data.(map[string]interface{})
- device := data["device"].(string)
- folder := data["folder"].(string)
- p.eventMut.Lock()
- m := p.updateSequenceLocked(folder, device, data["sequence"])
- l.Debugf("FolderCompletion %v\n\t%+v", p.id, folder, m)
- p.eventMut.Unlock()
- }
- }
- }
- }
- func (p *Process) updateSequenceLocked(folder, device string, sequenceIntf interface{}) map[string]int64 {
- sequence, _ := sequenceIntf.(json.Number).Int64()
- m := p.sequence[folder]
- if m == nil {
- m = make(map[string]int64)
- }
- m[device] = sequence
- p.sequence[folder] = m
- return m
- }
- type ConnectionStats struct {
- Address string
- Type string
- Connected bool
- Paused bool
- ClientVersion string
- InBytesTotal int64
- OutBytesTotal int64
- }
- func (p *Process) Connections() (map[string]ConnectionStats, error) {
- bs, err := p.Get("/rest/system/connections")
- if err != nil {
- return nil, err
- }
- var res map[string]ConnectionStats
- if err := json.Unmarshal(bs, &res); err != nil {
- return nil, err
- }
- return res, nil
- }
- type SystemStatus struct {
- Alloc int64
- Goroutines int
- MyID protocol.DeviceID
- PathSeparator string
- StartTime time.Time
- Sys int64
- Themes []string
- Tilde string
- Uptime int
- }
- func (p *Process) SystemStatus() (SystemStatus, error) {
- bs, err := p.Get("/rest/system/status")
- if err != nil {
- return SystemStatus{}, err
- }
- var res SystemStatus
- if err := json.Unmarshal(bs, &res); err != nil {
- return SystemStatus{}, err
- }
- return res, nil
- }
- type SystemVersion struct {
- Arch string
- Codename string
- LongVersion string
- OS string
- Version string
- }
- func (p *Process) SystemVersion() (SystemVersion, error) {
- bs, err := p.Get("/rest/system/version")
- if err != nil {
- return SystemVersion{}, err
- }
- var res SystemVersion
- if err := json.Unmarshal(bs, &res); err != nil {
- return SystemVersion{}, err
- }
- return res, nil
- }
- func (p *Process) RemoteInSync(folder string, dev protocol.DeviceID) (bool, error) {
- bs, err := p.Get(fmt.Sprintf("/rest/db/completion?folder=%v&device=%v", url.QueryEscape(folder), dev))
- if err != nil {
- return false, err
- }
- var comp model.FolderCompletion
- if err := json.Unmarshal(bs, &comp); err != nil {
- return false, err
- }
- return comp.NeedItems+comp.NeedDeletes == 0, nil
- }
|