From 3626029c11a2b9969d4d70fe35c9f7c3f7ddd8d0 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Thu, 22 Sep 2022 17:15:48 -0500 Subject: [PATCH] Update to latest SSE dependency. Add idle/reconnect logic for SSE. --- go.mod | 6 ++--- go.sum | 12 ++-------- internal/shutdown/shutdown_test.go | 2 +- pkg/beaconclient/beaconclient.go | 15 ++++++++---- pkg/beaconclient/capturehead_test.go | 6 ++--- pkg/beaconclient/incomingsse.go | 36 +++++++++++++++++++++++++--- 6 files changed, 52 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 91c9641..07b84fc 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,11 @@ require ( github.com/multiformats/go-multihash v0.2.1 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 + github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 github.com/protolambda/zrnt v0.28.0 github.com/protolambda/ztyp v0.2.2 - github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc + github.com/r3labs/sse/v2 v2.8.1 github.com/sirupsen/logrus v1.9.0 ) @@ -22,7 +23,6 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/holiman/uint256 v1.2.0 // indirect @@ -59,7 +59,6 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e // indirect - github.com/prysmaticlabs/gohashtree v0.0.2-alpha // indirect github.com/rogpeppe/go-internal v1.8.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect go.uber.org/atomic v1.10.0 // indirect @@ -73,7 +72,6 @@ require ( ) require ( - github.com/ferranbt/fastssz v0.1.2 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/georgysavva/scany v1.2.0 github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/go.sum b/go.sum index 41a2c68..4e7ac56 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/ferranbt/fastssz v0.1.2 h1:Dky6dXlngF6Qjc+EfDipAkE83N5I5DE68bY6O0VLNPk= -github.com/ferranbt/fastssz v0.1.2/go.mod h1:X5UPrE2u1UJjxHA8X54u04SBwdAQjG2sFtWs39YxyWs= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= @@ -137,8 +135,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -438,10 +434,8 @@ github.com/protolambda/zrnt v0.28.0 h1:vdEL8JDqJ3wdzgqgh6Fhz1Wr3+AMGbUZ2nqoNt6QV github.com/protolambda/zrnt v0.28.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw= github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY= github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU= -github.com/prysmaticlabs/gohashtree v0.0.2-alpha h1:hk5ZsDQuSkyUMhTd55qB396P1+dtyIKiSwMmYE/hyEU= -github.com/prysmaticlabs/gohashtree v0.0.2-alpha/go.mod h1:4pWaT30XoEx1j8KNJf3TV+E3mQkaufn7mf+jRNb/Fuk= -github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= -github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= +github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o= +github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= @@ -489,8 +483,6 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= -github.com/umbracle/go-eth-consensus v0.1.1-0.20220824100056-6a6a6f5dbe76 h1:kdbVPrfvG4L3tZBlj8gUNiTalvh0KuAdG6e8erVjfcY= -github.com/umbracle/go-eth-consensus v0.1.1-0.20220824100056-6a6a6f5dbe76/go.mod h1:qEdbWIH3Ud7mRhUlOxNfScXX1JjcqHOqP355Ei14G3s= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 97d83af..cd86500 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -26,7 +26,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/r3labs/sse" + "github.com/r3labs/sse/v2" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-eth-beacon-indexer/internal/boot" "github.com/vulcanize/ipld-eth-beacon-indexer/internal/shutdown" diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 63fd3b4..053a03c 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -18,11 +18,11 @@ package beaconclient import ( "context" "fmt" - "math/rand" - - "github.com/r3labs/sse" + "github.com/r3labs/sse/v2" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql" + "math/rand" + "time" ) // TODO: Use prysms config values instead of hardcoding them here. @@ -128,7 +128,14 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve ProcessCh: make(chan *P), SseClient: func(endpoint string) *sse.Client { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client") - return sse.NewClient(endpoint) + client := sse.NewClient(endpoint) + client.ReconnectNotify = func(err error, duration time.Duration) { + log.WithFields(log.Fields{"endpoint": endpoint}).Warn("Reconnecting SSE client") + } + client.OnDisconnect(func(c *sse.Client) { + log.WithFields(log.Fields{"endpoint": endpoint}).Warn("SSE client disconnected") + }) + return client }(endpoint), } return sseEvents diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index a83339f..ec1fead 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -30,7 +30,7 @@ import ( "github.com/jarcoal/httpmock" . "github.com/onsi/ginkgo/v2" - "github.com/r3labs/sse" + "github.com/r3labs/sse/v2" log "github.com/sirupsen/logrus" . "github.com/onsi/gomega" @@ -594,7 +594,7 @@ func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, // A helper function to validate the expected output from the eth_beacon.signed_block table. func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctParentRoot string, correctEth1DataBlockHash string, correctMhKey string, - correctExecutionPayloadheader *beaconclient.DbExecutionPayloadHeader) { + correctExecutionPayloadHeader *beaconclient.DbExecutionPayloadHeader) { dbSignedBlock := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block) log.Info("validateSignedBeaconBlock: ", headMessage) baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64) @@ -604,7 +604,7 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon Expect(dbSignedBlock.ParentBlock).To(Equal(correctParentRoot)) Expect(dbSignedBlock.Eth1DataBlockHash).To(Equal(correctEth1DataBlockHash)) Expect(dbSignedBlock.MhKey).To(Equal(correctMhKey)) - Expect(dbSignedBlock.ExecutionPayloadHeader).To(Equal(correctExecutionPayloadheader)) + Expect(dbSignedBlock.ExecutionPayloadHeader).To(Equal(correctExecutionPayloadHeader)) } // A helper function to validate the expected output from the eth_beacon.state table. diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index cdb4891..36cf2fe 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -19,6 +19,7 @@ package beaconclient import ( "encoding/json" + "github.com/pkg/errors" "time" log "github.com/sirupsen/logrus" @@ -33,7 +34,7 @@ var ( // This function will capture all the SSE events for a given SseEvents object. // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. -func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) { +func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64), idleTimeout time.Duration) { go func() { errG := new(errgroup.Group) errG.Go(func() error { @@ -55,8 +56,18 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe }() for { + var idleTimer *time.Timer = nil + var idleTimerC <-chan time.Time = nil + if idleTimeout > 0 { + idleTimer = time.NewTimer(idleTimeout) + idleTimerC = idleTimer.C + } + select { case message := <-eventHandler.MessagesCh: + if nil != idleTimer { + idleTimer.Stop() + } // Message can be nil if its a keep-alive message if len(message.Data) != 0 { log.WithFields(log.Fields{"msg": string(message.Data)}).Debug("We are going to send the following message to be processed.") @@ -64,6 +75,9 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe } case headErr := <-eventHandler.ErrorCh: + if nil != idleTimer { + idleTimer.Stop() + } log.WithFields(log.Fields{ "endpoint": eventHandler.Endpoint, "err": headErr.err, @@ -71,6 +85,22 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe }, ).Error("Unable to handle event.") errMetricInc(1) + + case <-idleTimerC: + err := errors.New("SSE idle timeout") + log.WithFields(log.Fields{ + "endpoint": eventHandler.Endpoint, + "err": err, + "msg": err.Error(), + }, + ).Error("TIMEOUT - Attempting to resubscribe") + errMetricInc(1) + eventHandler.SseClient.Unsubscribe(eventHandler.MessagesCh) + eventHandler.SseClient.Connection.CloseIdleConnections() + err = eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + if err != nil { + log.Error("Unable to re-subscribe.", err) + } } } } @@ -93,6 +123,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan // Capture all of the event topics. func (bc *BeaconClient) captureEventTopic() { log.Info("We are capturing all SSE events") - go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError) - go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError) + go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*15) + go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError, 0) }