560305f601
- uses newer version of go-ethereum required for go1.11
209 lines
5.9 KiB
Go
209 lines
5.9 KiB
Go
// Copyright (c) 2016 Arista Networks, Inc.
|
|
// Use of this source code is governed by the Apache License 2.0
|
|
// that can be found in the COPYING file.
|
|
|
|
// The ocredis tool is a client for the OpenConfig gRPC interface that
|
|
// subscribes to state and pushes it to Redis, using Redis' support for hash
|
|
// maps and for publishing events that can be subscribed to.
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"strings"
|
|
|
|
"github.com/aristanetworks/goarista/gnmi"
|
|
|
|
"github.com/aristanetworks/glog"
|
|
pb "github.com/openconfig/gnmi/proto/gnmi"
|
|
redis "gopkg.in/redis.v4"
|
|
)
|
|
|
|
var clusterMode = flag.Bool("cluster", false, "Whether the redis server is a cluster")
|
|
|
|
var redisFlag = flag.String("redis", "",
|
|
"Comma separated list of Redis servers to push updates to")
|
|
|
|
var redisPassword = flag.String("redispass", "", "Password of redis server/cluster")
|
|
|
|
// baseClient allows us to represent both a redis.Client and redis.ClusterClient.
|
|
type baseClient interface {
|
|
Close() error
|
|
ClusterInfo() *redis.StringCmd
|
|
HDel(string, ...string) *redis.IntCmd
|
|
HMSet(string, map[string]string) *redis.StatusCmd
|
|
Ping() *redis.StatusCmd
|
|
Pipelined(func(*redis.Pipeline) error) ([]redis.Cmder, error)
|
|
Publish(string, string) *redis.IntCmd
|
|
}
|
|
|
|
var client baseClient
|
|
|
|
func main() {
|
|
|
|
// gNMI options
|
|
cfg := &gnmi.Config{}
|
|
flag.StringVar(&cfg.Addr, "addr", "localhost", "gNMI gRPC server `address`")
|
|
flag.StringVar(&cfg.CAFile, "cafile", "", "Path to server TLS certificate file")
|
|
flag.StringVar(&cfg.CertFile, "certfile", "", "Path to client TLS certificate file")
|
|
flag.StringVar(&cfg.KeyFile, "keyfile", "", "Path to client TLS private key file")
|
|
flag.StringVar(&cfg.Username, "username", "", "Username to authenticate with")
|
|
flag.StringVar(&cfg.Password, "password", "", "Password to authenticate with")
|
|
flag.BoolVar(&cfg.TLS, "tls", false, "Enable TLS")
|
|
subscribePaths := flag.String("subscribe", "/", "Comma-separated list of paths to subscribe to")
|
|
flag.Parse()
|
|
if *redisFlag == "" {
|
|
glog.Fatal("Specify the address of the Redis server to write to with -redis")
|
|
}
|
|
|
|
subscriptions := strings.Split(*subscribePaths, ",")
|
|
redisAddrs := strings.Split(*redisFlag, ",")
|
|
if !*clusterMode && len(redisAddrs) > 1 {
|
|
glog.Fatal("Please pass only 1 redis address in noncluster mode or enable cluster mode")
|
|
}
|
|
|
|
if *clusterMode {
|
|
client = redis.NewClusterClient(&redis.ClusterOptions{
|
|
Addrs: redisAddrs,
|
|
Password: *redisPassword,
|
|
})
|
|
} else {
|
|
client = redis.NewClient(&redis.Options{
|
|
Addr: *redisFlag,
|
|
Password: *redisPassword,
|
|
})
|
|
}
|
|
defer client.Close()
|
|
|
|
// TODO: Figure out ways to handle being in the wrong mode:
|
|
// Connecting to cluster in non cluster mode - we get a MOVED error on the first HMSET
|
|
// Connecting to a noncluster in cluster mode - we get stuck forever
|
|
_, err := client.Ping().Result()
|
|
if err != nil {
|
|
glog.Fatal("Failed to connect to client: ", err)
|
|
}
|
|
ctx := gnmi.NewContext(context.Background(), cfg)
|
|
client, err := gnmi.Dial(cfg)
|
|
if err != nil {
|
|
glog.Fatal(err)
|
|
}
|
|
respChan := make(chan *pb.SubscribeResponse)
|
|
errChan := make(chan error)
|
|
subscribeOptions := &gnmi.SubscribeOptions{
|
|
Mode: "stream",
|
|
StreamMode: "target_defined",
|
|
Paths: gnmi.SplitPaths(subscriptions),
|
|
}
|
|
go gnmi.Subscribe(ctx, client, subscribeOptions, respChan, errChan)
|
|
for {
|
|
select {
|
|
case resp := <-respChan:
|
|
bufferToRedis(cfg.Addr, resp.GetUpdate())
|
|
case err := <-errChan:
|
|
glog.Fatal(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
type redisData struct {
|
|
key string
|
|
hmset map[string]string
|
|
hdel []string
|
|
pub map[string]interface{}
|
|
}
|
|
|
|
func bufferToRedis(addr string, notif *pb.Notification) {
|
|
if notif == nil {
|
|
// possible that this should be ignored silently
|
|
glog.Error("Nil notification ignored")
|
|
return
|
|
}
|
|
path := addr + "/" + joinPath(notif.Prefix)
|
|
data := &redisData{key: path}
|
|
|
|
if len(notif.Update) != 0 {
|
|
hmset := make(map[string]string, len(notif.Update))
|
|
|
|
// Updates to publish on the pub/sub.
|
|
pub := make(map[string]interface{}, len(notif.Update))
|
|
for _, update := range notif.Update {
|
|
key := joinPath(update.Path)
|
|
value := convertUpdate(update)
|
|
pub[key] = value
|
|
marshaledValue, err := json.Marshal(value)
|
|
if err != nil {
|
|
glog.Fatalf("Failed to JSON marshal update %#v", update)
|
|
}
|
|
hmset[key] = string(marshaledValue)
|
|
}
|
|
data.hmset = hmset
|
|
data.pub = pub
|
|
}
|
|
|
|
if len(notif.Delete) != 0 {
|
|
hdel := make([]string, len(notif.Delete))
|
|
for i, del := range notif.Delete {
|
|
hdel[i] = joinPath(del)
|
|
}
|
|
data.hdel = hdel
|
|
}
|
|
pushToRedis(data)
|
|
}
|
|
|
|
func pushToRedis(data *redisData) {
|
|
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
|
|
if data.hmset != nil {
|
|
if reply := client.HMSet(data.key, data.hmset); reply.Err() != nil {
|
|
glog.Fatal("Redis HMSET error: ", reply.Err())
|
|
}
|
|
redisPublish(data.key, "updates", data.pub)
|
|
}
|
|
if data.hdel != nil {
|
|
if reply := client.HDel(data.key, data.hdel...); reply.Err() != nil {
|
|
glog.Fatal("Redis HDEL error: ", reply.Err())
|
|
}
|
|
redisPublish(data.key, "deletes", data.hdel)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
glog.Fatal("Failed to send Pipelined commands: ", err)
|
|
}
|
|
}
|
|
|
|
func redisPublish(path, kind string, payload interface{}) {
|
|
js, err := json.Marshal(map[string]interface{}{
|
|
"kind": kind,
|
|
"payload": payload,
|
|
})
|
|
if err != nil {
|
|
glog.Fatalf("JSON error: %s", err)
|
|
}
|
|
if reply := client.Publish(path, string(js)); reply.Err() != nil {
|
|
glog.Fatal("Redis PUBLISH error: ", reply.Err())
|
|
}
|
|
}
|
|
|
|
func joinPath(path *pb.Path) string {
|
|
// path.Elem is empty for some reason so using path.Element instead
|
|
return strings.Join(path.Element, "/")
|
|
}
|
|
|
|
func convertUpdate(update *pb.Update) interface{} {
|
|
switch update.Value.Type {
|
|
case pb.Encoding_JSON:
|
|
var value interface{}
|
|
err := json.Unmarshal(update.Value.Value, &value)
|
|
if err != nil {
|
|
glog.Fatalf("Malformed JSON update %q in %s", update.Value.Value, update)
|
|
}
|
|
return value
|
|
case pb.Encoding_BYTES:
|
|
return update.Value.Value
|
|
default:
|
|
glog.Fatalf("Unhandled type of value %v in %s", update.Value.Type, update)
|
|
return nil
|
|
}
|
|
}
|