encoder.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // Copyright (c) 2016 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. package openconfig
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "time"
  9. "notabug.org/themusicgod1/goarista/elasticsearch"
  10. "notabug.org/themusicgod1/goarista/kafka"
  11. "notabug.org/themusicgod1/goarista/openconfig"
  12. "github.com/Shopify/sarama"
  13. "notabug.org/themusicgod1/glog"
  14. "github.com/golang/protobuf/proto"
  15. pb "github.com/openconfig/reference/rpc/openconfig"
  16. )
  17. // UnhandledMessageError is used for proto messages not matching the handled types
  18. type UnhandledMessageError struct {
  19. message proto.Message
  20. }
  21. func (e UnhandledMessageError) Error() string {
  22. return fmt.Sprintf("Unexpected type %T in proto message: %#v", e.message, e.message)
  23. }
  24. // UnhandledSubscribeResponseError is used for subscribe responses not matching the handled types
  25. type UnhandledSubscribeResponseError struct {
  26. response *pb.SubscribeResponse
  27. }
  28. func (e UnhandledSubscribeResponseError) Error() string {
  29. return fmt.Sprintf("Unexpected type %T in subscribe response: %#v", e.response, e.response)
  30. }
  31. type elasticsearchMessageEncoder struct {
  32. *kafka.BaseEncoder
  33. topic string
  34. dataset string
  35. key sarama.Encoder
  36. }
  37. // NewEncoder creates and returns a new elasticsearch MessageEncoder
  38. func NewEncoder(topic string, key sarama.Encoder, dataset string) kafka.MessageEncoder {
  39. baseEncoder := kafka.NewBaseEncoder("elasticsearch")
  40. return &elasticsearchMessageEncoder{
  41. BaseEncoder: baseEncoder,
  42. topic: topic,
  43. dataset: dataset,
  44. key: key,
  45. }
  46. }
  47. func (e *elasticsearchMessageEncoder) Encode(message proto.Message) ([]*sarama.ProducerMessage,
  48. error) {
  49. response, ok := message.(*pb.SubscribeResponse)
  50. if !ok {
  51. return nil, UnhandledMessageError{message: message}
  52. }
  53. update := response.GetUpdate()
  54. if update == nil {
  55. return nil, UnhandledSubscribeResponseError{response: response}
  56. }
  57. updateMap, err := openconfig.NotificationToMap(e.dataset, update,
  58. elasticsearch.EscapeFieldName)
  59. if err != nil {
  60. return nil, err
  61. }
  62. // Convert time to ms to make elasticsearch happy
  63. updateMap["timestamp"] = updateMap["timestamp"].(int64) / 1000000
  64. updateJSON, err := json.Marshal(updateMap)
  65. if err != nil {
  66. return nil, err
  67. }
  68. glog.V(9).Infof("kafka: %s", updateJSON)
  69. return []*sarama.ProducerMessage{
  70. {
  71. Topic: e.topic,
  72. Key: e.key,
  73. Value: sarama.ByteEncoder(updateJSON),
  74. Metadata: kafka.Metadata{StartTime: time.Unix(0, update.Timestamp), NumMessages: 1},
  75. },
  76. }, nil
  77. }