main.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. // Copyright (c) 2016 Arista Networks, Inc.
  2. // Use of this source code is governed by the Apache License 2.0
  3. // that can be found in the COPYING file.
  4. // This tool subscribes to OpenConfig updates from one or more network devices
  5. // over gRPC and publishes the received messages to Kafka.
  6. package main
  7. import (
  8. "flag"
  9. "fmt"
  10. "strings"
  11. "sync"
  12. "notabug.org/themusicgod1/goarista/kafka"
  13. "notabug.org/themusicgod1/goarista/kafka/openconfig"
  14. "notabug.org/themusicgod1/goarista/kafka/producer"
  15. "notabug.org/themusicgod1/goarista/openconfig/client"
  16. "github.com/Shopify/sarama"
  17. "notabug.org/themusicgod1/glog"
  18. "github.com/golang/protobuf/proto"
  19. )
  20. var keysFlag = flag.String("kafkakeys", "",
  21. "Keys for kafka messages (comma-separated, default: the value of -addrs")
  22. func newProducer(addresses []string, topic, key, dataset string) (producer.Producer, error) {
  23. encodedKey := sarama.StringEncoder(key)
  24. p, err := producer.New(openconfig.NewEncoder(topic, encodedKey, dataset), addresses, nil)
  25. if err != nil {
  26. return nil, fmt.Errorf("Failed to create Kafka brokers: %s", err)
  27. }
  28. glog.Infof("Connected to Kafka brokers at %s", addresses)
  29. return p, nil
  30. }
  31. func main() {
  32. username, password, subscriptions, grpcAddrs, opts := client.ParseFlags()
  33. if *keysFlag == "" {
  34. *keysFlag = strings.Join(grpcAddrs, ",")
  35. }
  36. keys := strings.Split(*keysFlag, ",")
  37. if len(grpcAddrs) != len(keys) {
  38. glog.Fatal("Please provide the same number of addresses and Kafka keys")
  39. }
  40. addresses := strings.Split(*kafka.Addresses, ",")
  41. wg := new(sync.WaitGroup)
  42. for i, grpcAddr := range grpcAddrs {
  43. key := keys[i]
  44. p, err := newProducer(addresses, *kafka.Topic, key, grpcAddr)
  45. if err != nil {
  46. glog.Fatal(err)
  47. } else {
  48. glog.Infof("Initialized Kafka producer for %s", grpcAddr)
  49. }
  50. publish := func(addr string, message proto.Message) {
  51. p.Write(message)
  52. }
  53. wg.Add(1)
  54. p.Start()
  55. defer p.Stop()
  56. c := client.New(username, password, grpcAddr, opts)
  57. go c.Subscribe(wg, subscriptions, publish)
  58. }
  59. wg.Wait()
  60. }