backend.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. Package wslog implements a (github.com/op/go-logging).Backend and a (net/http).Handler.
  3. The Handler upgrades requests to websocket connections and registers them with the backend.
  4. The Backend copies log records to all registered clients.
  5. */
  6. package wslog
  7. import (
  8. "fmt"
  9. "net/http"
  10. "os"
  11. "github.com/gorilla/websocket"
  12. "github.com/op/go-logging"
  13. )
  14. // formattedRec extends a record with it's formatted Message
  15. type formattedRec struct {
  16. logging.Record
  17. Msg string
  18. }
  19. // WebsocketBackend is a Hub-like Backend that copies log messages to listening connections
  20. type WebsocketBackend struct {
  21. connections map[*websocketConn]bool // registered connections
  22. broadcast chan *formattedRec // sends on this
  23. register chan *websocketConn // register requests from the connections.
  24. unregister chan *websocketConn // unregister requests from connections.
  25. upgrader *websocket.Upgrader // elevates a normal http request to a websocket connection
  26. }
  27. // NewBackend creates a new WebsocketBackend.
  28. func NewBackend() *WebsocketBackend {
  29. wb := &WebsocketBackend{
  30. broadcast: make(chan *formattedRec),
  31. register: make(chan *websocketConn),
  32. unregister: make(chan *websocketConn),
  33. connections: make(map[*websocketConn]bool),
  34. // TODO(cryptix): configure sizes?
  35. upgrader: &websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024},
  36. }
  37. go wb.run()
  38. return wb
  39. }
  40. // run does the event handling
  41. func (b *WebsocketBackend) run() {
  42. for {
  43. select {
  44. // add new connections to the pool
  45. case c := <-b.register:
  46. b.connections[c] = true
  47. // remove connections from the pool
  48. case c := <-b.unregister:
  49. if _, ok := b.connections[c]; ok {
  50. delete(b.connections, c)
  51. close(c.send)
  52. }
  53. // copy records to every connected client
  54. case rec := <-b.broadcast:
  55. for c := range b.connections {
  56. select {
  57. case c.send <- rec:
  58. default: // remove connection if send fails
  59. delete(b.connections, c)
  60. close(c.send)
  61. }
  62. }
  63. }
  64. }
  65. }
  66. // Log implements the logging.Backend interface. It broadcasts Records to the registerd connections
  67. func (b *WebsocketBackend) Log(level logging.Level, calldepth int, rec *logging.Record) error {
  68. // format here to not loose original calling information
  69. b.broadcast <- &formattedRec{
  70. Record: *rec,
  71. Msg: rec.Formatted(calldepth + 1),
  72. }
  73. return nil
  74. }
  75. // ServeHTTP implements net/http.Handler.
  76. // upgrades the request to a websocket connection
  77. // if successfull, registers it with the backend
  78. func (b *WebsocketBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  79. ws, err := b.upgrader.Upgrade(w, r, nil)
  80. if err != nil {
  81. fmt.Fprintf(os.Stderr, "Upgrade Error: %v\n", err)
  82. return
  83. }
  84. // TODO(cryptix): configure sizes?
  85. c := &websocketConn{
  86. send: make(chan *formattedRec, 10),
  87. ws: ws,
  88. }
  89. // (un)register onto the hub
  90. b.register <- c
  91. defer func() { b.unregister <- c }()
  92. // do the i/o
  93. go c.writer()
  94. c.reader()
  95. }