dashboard.go 13 KB


  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package dashboard
  17. //go:generate yarn --cwd ./assets install
  18. //go:generate yarn --cwd ./assets build
  19. //go:generate go-bindata -nometadata -o assets.go -prefix assets -nocompress -pkg dashboard assets/index.html assets/bundle.js
  20. //go:generate sh -c "sed 's#var _bundleJs#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
  21. //go:generate sh -c "sed 's#var _indexHtml#//nolint:misspell\\\n&#' assets.go > assets.go.tmp && mv assets.go.tmp assets.go"
  22. //go:generate gofmt -w -s assets.go
  23. import (
  24. "fmt"
  25. "net"
  26. "net/http"
  27. "runtime"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/elastic/gosigar"
  32. "github.com/ethereum/go-ethereum/log"
  33. "github.com/ethereum/go-ethereum/metrics"
  34. "github.com/ethereum/go-ethereum/p2p"
  35. "github.com/ethereum/go-ethereum/params"
  36. "github.com/ethereum/go-ethereum/rpc"
  37. "golang.org/x/net/websocket"
  38. )
  39. const (
  40. activeMemorySampleLimit = 200 // Maximum number of active memory data samples
  41. virtualMemorySampleLimit = 200 // Maximum number of virtual memory data samples
  42. networkIngressSampleLimit = 200 // Maximum number of network ingress data samples
  43. networkEgressSampleLimit = 200 // Maximum number of network egress data samples
  44. processCPUSampleLimit = 200 // Maximum number of process cpu data samples
  45. systemCPUSampleLimit = 200 // Maximum number of system cpu data samples
  46. diskReadSampleLimit = 200 // Maximum number of disk read data samples
  47. diskWriteSampleLimit = 200 // Maximum number of disk write data samples
  48. )
  49. var nextID uint32 // Next connection id
  50. // Dashboard contains the dashboard internals.
  51. type Dashboard struct {
  52. config *Config
  53. listener net.Listener
  54. conns map[uint32]*client // Currently live websocket connections
  55. charts *SystemMessage
  56. commit string
  57. lock sync.RWMutex // Lock protecting the dashboard's internals
  58. quit chan chan error // Channel used for graceful exit
  59. wg sync.WaitGroup
  60. }
  61. // client represents active websocket connection with a remote browser.
  62. type client struct {
  63. conn *websocket.Conn // Particular live websocket connection
  64. msg chan Message // Message queue for the update messages
  65. logger log.Logger // Logger for the particular live websocket connection
  66. }
  67. // New creates a new dashboard instance with the given configuration.
  68. func New(config *Config, commit string) (*Dashboard, error) {
  69. now := time.Now()
  70. db := &Dashboard{
  71. conns: make(map[uint32]*client),
  72. config: config,
  73. quit: make(chan chan error),
  74. charts: &SystemMessage{
  75. ActiveMemory: emptyChartEntries(now, activeMemorySampleLimit, config.Refresh),
  76. VirtualMemory: emptyChartEntries(now, virtualMemorySampleLimit, config.Refresh),
  77. NetworkIngress: emptyChartEntries(now, networkIngressSampleLimit, config.Refresh),
  78. NetworkEgress: emptyChartEntries(now, networkEgressSampleLimit, config.Refresh),
  79. ProcessCPU: emptyChartEntries(now, processCPUSampleLimit, config.Refresh),
  80. SystemCPU: emptyChartEntries(now, systemCPUSampleLimit, config.Refresh),
  81. DiskRead: emptyChartEntries(now, diskReadSampleLimit, config.Refresh),
  82. DiskWrite: emptyChartEntries(now, diskWriteSampleLimit, config.Refresh),
  83. },
  84. commit: commit,
  85. }
  86. return db, nil
  87. }
  88. // emptyChartEntries returns a ChartEntry array containing limit number of empty samples.
  89. func emptyChartEntries(t time.Time, limit int, refresh time.Duration) ChartEntries {
  90. ce := make(ChartEntries, limit)
  91. for i := 0; i < limit; i++ {
  92. ce[i] = &ChartEntry{
  93. Time: t.Add(-time.Duration(i) * refresh),
  94. }
  95. }
  96. return ce
  97. }
  98. // Protocols is a meaningless implementation of node.Service.
  99. func (db *Dashboard) Protocols() []p2p.Protocol { return nil }
  100. // APIs is a meaningless implementation of node.Service.
  101. func (db *Dashboard) APIs() []rpc.API { return nil }
  102. // Start implements node.Service, starting the data collection thread and the listening server of the dashboard.
  103. func (db *Dashboard) Start(server *p2p.Server) error {
  104. log.Info("Starting dashboard")
  105. db.wg.Add(2)
  106. go db.collectData()
  107. go db.collectLogs() // In case of removing this line change 2 back to 1 in wg.Add.
  108. http.HandleFunc("/", db.webHandler)
  109. http.Handle("/api", websocket.Handler(db.apiHandler))
  110. listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", db.config.Host, db.config.Port))
  111. if err != nil {
  112. return err
  113. }
  114. db.listener = listener
  115. go http.Serve(listener, nil)
  116. return nil
  117. }
  118. // Stop implements node.Service, stopping the data collection thread and the connection listener of the dashboard.
  119. func (db *Dashboard) Stop() error {
  120. // Close the connection listener.
  121. var errs []error
  122. if err := db.listener.Close(); err != nil {
  123. errs = append(errs, err)
  124. }
  125. // Close the collectors.
  126. errc := make(chan error, 1)
  127. for i := 0; i < 2; i++ {
  128. db.quit <- errc
  129. if err := <-errc; err != nil {
  130. errs = append(errs, err)
  131. }
  132. }
  133. // Close the connections.
  134. db.lock.Lock()
  135. for _, c := range db.conns {
  136. if err := c.conn.Close(); err != nil {
  137. c.logger.Warn("Failed to close connection", "err", err)
  138. }
  139. }
  140. db.lock.Unlock()
  141. // Wait until every goroutine terminates.
  142. db.wg.Wait()
  143. log.Info("Dashboard stopped")
  144. var err error
  145. if len(errs) > 0 {
  146. err = fmt.Errorf("%v", errs)
  147. }
  148. return err
  149. }
  150. // webHandler handles all non-api requests, simply flattening and returning the dashboard website.
  151. func (db *Dashboard) webHandler(w http.ResponseWriter, r *http.Request) {
  152. log.Debug("Request", "URL", r.URL)
  153. path := r.URL.String()
  154. if path == "/" {
  155. path = "/index.html"
  156. }
  157. blob, err := Asset(path[1:])
  158. if err != nil {
  159. log.Warn("Failed to load the asset", "path", path, "err", err)
  160. http.Error(w, "not found", http.StatusNotFound)
  161. return
  162. }
  163. w.Write(blob)
  164. }
  165. // apiHandler handles requests for the dashboard.
  166. func (db *Dashboard) apiHandler(conn *websocket.Conn) {
  167. id := atomic.AddUint32(&nextID, 1)
  168. client := &client{
  169. conn: conn,
  170. msg: make(chan Message, 128),
  171. logger: log.New("id", id),
  172. }
  173. done := make(chan struct{})
  174. // Start listening for messages to send.
  175. db.wg.Add(1)
  176. go func() {
  177. defer db.wg.Done()
  178. for {
  179. select {
  180. case <-done:
  181. return
  182. case msg := <-client.msg:
  183. if err := websocket.JSON.Send(client.conn, msg); err != nil {
  184. client.logger.Warn("Failed to send the message", "msg", msg, "err", err)
  185. client.conn.Close()
  186. return
  187. }
  188. }
  189. }
  190. }()
  191. versionMeta := ""
  192. if len(params.VersionMeta) > 0 {
  193. versionMeta = fmt.Sprintf(" (%s)", params.VersionMeta)
  194. }
  195. // Send the past data.
  196. client.msg <- Message{
  197. General: &GeneralMessage{
  198. Version: fmt.Sprintf("v%d.%d.%d%s", params.VersionMajor, params.VersionMinor, params.VersionPatch, versionMeta),
  199. Commit: db.commit,
  200. },
  201. System: &SystemMessage{
  202. ActiveMemory: db.charts.ActiveMemory,
  203. VirtualMemory: db.charts.VirtualMemory,
  204. NetworkIngress: db.charts.NetworkIngress,
  205. NetworkEgress: db.charts.NetworkEgress,
  206. ProcessCPU: db.charts.ProcessCPU,
  207. SystemCPU: db.charts.SystemCPU,
  208. DiskRead: db.charts.DiskRead,
  209. DiskWrite: db.charts.DiskWrite,
  210. },
  211. }
  212. // Start tracking the connection and drop at connection loss.
  213. db.lock.Lock()
  214. db.conns[id] = client
  215. db.lock.Unlock()
  216. defer func() {
  217. db.lock.Lock()
  218. delete(db.conns, id)
  219. db.lock.Unlock()
  220. }()
  221. for {
  222. fail := []byte{}
  223. if _, err := conn.Read(fail); err != nil {
  224. close(done)
  225. return
  226. }
  227. // Ignore all messages
  228. }
  229. }
  230. // collectData collects the required data to plot on the dashboard.
  231. func (db *Dashboard) collectData() {
  232. defer db.wg.Done()
  233. systemCPUUsage := gosigar.Cpu{}
  234. systemCPUUsage.Get()
  235. var (
  236. mem runtime.MemStats
  237. prevNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
  238. prevNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
  239. prevProcessCPUTime = getProcessCPUTime()
  240. prevSystemCPUUsage = systemCPUUsage
  241. prevDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
  242. prevDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
  243. frequency = float64(db.config.Refresh / time.Second)
  244. numCPU = float64(runtime.NumCPU())
  245. )
  246. for {
  247. select {
  248. case errc := <-db.quit:
  249. errc <- nil
  250. return
  251. case <-time.After(db.config.Refresh):
  252. systemCPUUsage.Get()
  253. var (
  254. curNetworkIngress = metrics.DefaultRegistry.Get("p2p/InboundTraffic").(metrics.Meter).Count()
  255. curNetworkEgress = metrics.DefaultRegistry.Get("p2p/OutboundTraffic").(metrics.Meter).Count()
  256. curProcessCPUTime = getProcessCPUTime()
  257. curSystemCPUUsage = systemCPUUsage
  258. curDiskRead = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/read").(metrics.Meter).Count()
  259. curDiskWrite = metrics.DefaultRegistry.Get("eth/db/chaindata/disk/write").(metrics.Meter).Count()
  260. deltaNetworkIngress = float64(curNetworkIngress - prevNetworkIngress)
  261. deltaNetworkEgress = float64(curNetworkEgress - prevNetworkEgress)
  262. deltaProcessCPUTime = curProcessCPUTime - prevProcessCPUTime
  263. deltaSystemCPUUsage = curSystemCPUUsage.Delta(prevSystemCPUUsage)
  264. deltaDiskRead = curDiskRead - prevDiskRead
  265. deltaDiskWrite = curDiskWrite - prevDiskWrite
  266. )
  267. prevNetworkIngress = curNetworkIngress
  268. prevNetworkEgress = curNetworkEgress
  269. prevProcessCPUTime = curProcessCPUTime
  270. prevSystemCPUUsage = curSystemCPUUsage
  271. prevDiskRead = curDiskRead
  272. prevDiskWrite = curDiskWrite
  273. now := time.Now()
  274. runtime.ReadMemStats(&mem)
  275. activeMemory := &ChartEntry{
  276. Time: now,
  277. Value: float64(mem.Alloc) / frequency,
  278. }
  279. virtualMemory := &ChartEntry{
  280. Time: now,
  281. Value: float64(mem.Sys) / frequency,
  282. }
  283. networkIngress := &ChartEntry{
  284. Time: now,
  285. Value: deltaNetworkIngress / frequency,
  286. }
  287. networkEgress := &ChartEntry{
  288. Time: now,
  289. Value: deltaNetworkEgress / frequency,
  290. }
  291. processCPU := &ChartEntry{
  292. Time: now,
  293. Value: deltaProcessCPUTime / frequency / numCPU * 100,
  294. }
  295. systemCPU := &ChartEntry{
  296. Time: now,
  297. Value: float64(deltaSystemCPUUsage.Sys+deltaSystemCPUUsage.User) / frequency / numCPU,
  298. }
  299. diskRead := &ChartEntry{
  300. Time: now,
  301. Value: float64(deltaDiskRead) / frequency,
  302. }
  303. diskWrite := &ChartEntry{
  304. Time: now,
  305. Value: float64(deltaDiskWrite) / frequency,
  306. }
  307. db.charts.ActiveMemory = append(db.charts.ActiveMemory[1:], activeMemory)
  308. db.charts.VirtualMemory = append(db.charts.VirtualMemory[1:], virtualMemory)
  309. db.charts.NetworkIngress = append(db.charts.NetworkIngress[1:], networkIngress)
  310. db.charts.NetworkEgress = append(db.charts.NetworkEgress[1:], networkEgress)
  311. db.charts.ProcessCPU = append(db.charts.ProcessCPU[1:], processCPU)
  312. db.charts.SystemCPU = append(db.charts.SystemCPU[1:], systemCPU)
  313. db.charts.DiskRead = append(db.charts.DiskRead[1:], diskRead)
  314. db.charts.DiskWrite = append(db.charts.DiskRead[1:], diskWrite)
  315. db.sendToAll(&Message{
  316. System: &SystemMessage{
  317. ActiveMemory: ChartEntries{activeMemory},
  318. VirtualMemory: ChartEntries{virtualMemory},
  319. NetworkIngress: ChartEntries{networkIngress},
  320. NetworkEgress: ChartEntries{networkEgress},
  321. ProcessCPU: ChartEntries{processCPU},
  322. SystemCPU: ChartEntries{systemCPU},
  323. DiskRead: ChartEntries{diskRead},
  324. DiskWrite: ChartEntries{diskWrite},
  325. },
  326. })
  327. }
  328. }
  329. }
  330. // collectLogs collects and sends the logs to the active dashboards.
  331. func (db *Dashboard) collectLogs() {
  332. defer db.wg.Done()
  333. id := 1
  334. // TODO (kurkomisi): log collection comes here.
  335. for {
  336. select {
  337. case errc := <-db.quit:
  338. errc <- nil
  339. return
  340. case <-time.After(db.config.Refresh / 2):
  341. db.sendToAll(&Message{
  342. Logs: &LogsMessage{
  343. Log: []string{fmt.Sprintf("%-4d: This is a fake log.", id)},
  344. },
  345. })
  346. id++
  347. }
  348. }
  349. }
  350. // sendToAll sends the given message to the active dashboards.
  351. func (db *Dashboard) sendToAll(msg *Message) {
  352. db.lock.Lock()
  353. for _, c := range db.conns {
  354. select {
  355. case c.msg <- *msg:
  356. default:
  357. c.conn.Close()
  358. }
  359. }
  360. db.lock.Unlock()
  361. }