main.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "io/ioutil"
  7. stdlog "log"
  8. "math/rand"
  9. "net/http"
  10. "net/rpc"
  11. "os"
  12. "os/signal"
  13. "syscall"
  14. "time"
  15. kitlog "github.com/go-kit/kit/log"
  16. "github.com/go-kit/kit/metrics"
  17. "github.com/go-kit/kit/metrics/expvar"
  18. "github.com/go-kit/kit/metrics/prometheus"
  19. "github.com/go-kit/kit/metrics/statsd"
  20. httptransport "github.com/go-kit/kit/transport/http"
  21. stdprometheus "github.com/prometheus/client_golang/prometheus"
  22. "golang.org/x/net/context"
  23. "github.com/cryptix/exp/todoKitSvc/client"
  24. httpclient "github.com/cryptix/exp/todoKitSvc/client/http"
  25. "github.com/cryptix/exp/todoKitSvc/reqrep"
  26. "github.com/cryptix/exp/todoKitSvc/todosvc"
  27. )
  28. func main() {
  29. // Flag domain. Note that gRPC transitively registers flags via its import
  30. // of glog. So, we define a new flag set, to keep those domains distinct.
  31. fs := flag.NewFlagSet("", flag.ExitOnError)
  32. var (
  33. debugAddr = fs.String("debug.addr", ":8000", "Address for HTTP debug/instrumentation server")
  34. httpAddr = fs.String("http.addr", ":8001", "Address for HTTP (JSON) server")
  35. netrpcAddr = fs.String("netrpc.addr", ":8003", "Address for net/rpc server")
  36. proxyHTTPURL = fs.String("proxy.http.url", "", "if set, proxy requests over HTTP to this todosvc")
  37. )
  38. flag.Usage = fs.Usage // only show our flags
  39. fs.Parse(os.Args[1:])
  40. // `package log` domain
  41. var logger kitlog.Logger
  42. logger = kitlog.NewLogfmtLogger(os.Stderr)
  43. logger = kitlog.NewContext(logger).With("ts", kitlog.DefaultTimestampUTC)
  44. stdlog.SetOutput(kitlog.NewStdlibAdapter(logger)) // redirect stdlib logging to us
  45. stdlog.SetFlags(0) // flags are handled in our logger
  46. // `package metrics` domain
  47. requests := metrics.NewMultiCounter(
  48. expvar.NewCounter("requests"),
  49. statsd.NewCounter(ioutil.Discard, "requests_total", time.Second),
  50. prometheus.NewCounter(stdprometheus.CounterOpts{
  51. Namespace: "todosvc",
  52. Subsystem: "add",
  53. Name: "requests_total",
  54. Help: "Total number of received requests.",
  55. }, []string{}),
  56. )
  57. duration := metrics.NewTimeHistogram(time.Nanosecond, metrics.NewMultiHistogram(
  58. expvar.NewHistogram("duration_nanoseconds_total", 0, 1e9, 3, 50, 95, 99),
  59. statsd.NewHistogram(ioutil.Discard, "duration_nanoseconds_total", time.Second),
  60. prometheus.NewSummary(stdprometheus.SummaryOpts{
  61. Namespace: "todosvc",
  62. Subsystem: "add",
  63. Name: "duration_nanoseconds_total",
  64. Help: "Total nanoseconds spend serving requests.",
  65. }, []string{}),
  66. ))
  67. // Our business and operational domain
  68. var t todosvc.Todo = todosvc.NewInmemTodo()
  69. if *proxyHTTPURL != "" {
  70. e := httpclient.NewClient("GET", *proxyHTTPURL)
  71. t = client.NewClient(e)
  72. }
  73. t = NewLoggingTodo(logger, t)
  74. t = NewInstrumentedTodo(requests, duration, t)
  75. // Server domain
  76. todoEndpoints := makeTodoServerEndpoints(t)
  77. // Mechanical stuff
  78. rand.Seed(time.Now().UnixNano())
  79. root := context.Background()
  80. errc := make(chan error)
  81. go func() {
  82. errc <- interrupt()
  83. }()
  84. // Transport: HTTP (debug/instrumentation)
  85. go func() {
  86. logger.Log("addr", *debugAddr, "transport", "debug")
  87. errc <- http.ListenAndServe(*debugAddr, nil)
  88. }()
  89. // Transport: HTTP (JSON)
  90. go func() {
  91. ctx, cancel := context.WithCancel(root)
  92. defer cancel()
  93. before := []httptransport.BeforeFunc{}
  94. after := []httptransport.AfterFunc{}
  95. mux := http.NewServeMux()
  96. // Add
  97. addDecode := func(r *http.Request) (interface{}, error) {
  98. var request reqrep.AddRequest
  99. if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
  100. return nil, err
  101. }
  102. return request, r.Body.Close()
  103. }
  104. addHandler := makeHTTPBinding(ctx, todoEndpoints.Add, addDecode, before, after)
  105. mux.Handle("/add", addHandler)
  106. // List
  107. listDecode := func(r *http.Request) (interface{}, error) {
  108. var request reqrep.ListRequest
  109. if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
  110. return nil, err
  111. }
  112. return request, r.Body.Close()
  113. }
  114. listHandler := makeHTTPBinding(ctx, todoEndpoints.List, listDecode, before, after)
  115. mux.Handle("/list", listHandler)
  116. logger.Log("addr", *httpAddr, "transport", "HTTP/JSON")
  117. errc <- http.ListenAndServe(*httpAddr, mux)
  118. }()
  119. // Transport: net/rpc
  120. go func() {
  121. ctx, cancel := context.WithCancel(root)
  122. defer cancel()
  123. s := rpc.NewServer()
  124. s.RegisterName("todosvc", NetrpcBinding{ctx, todoEndpoints})
  125. s.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
  126. logger.Log("addr", *netrpcAddr, "transport", "net/rpc")
  127. errc <- http.ListenAndServe(*netrpcAddr, s)
  128. }()
  129. logger.Log("fatal", <-errc)
  130. }
  131. func interrupt() error {
  132. c := make(chan os.Signal)
  133. signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
  134. return fmt.Errorf("%s", <-c)
  135. }