Update to latest SSE dependency. Add idle/reconnect logic for SSE.

This commit is contained in:
Thomas E Lackey 2022-09-22 17:15:48 -05:00
parent a74df084c4
commit 3626029c11
6 changed files with 52 additions and 25 deletions

6
go.mod
View File

@ -9,10 +9,11 @@ require (
github.com/multiformats/go-multihash v0.2.1
github.com/onsi/ginkgo/v2 v2.1.4
github.com/onsi/gomega v1.19.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/protolambda/zrnt v0.28.0
github.com/protolambda/ztyp v0.2.2
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/r3labs/sse/v2 v2.8.1
github.com/sirupsen/logrus v1.9.0
)
@ -22,7 +23,6 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/holiman/uint256 v1.2.0 // indirect
@ -59,7 +59,6 @@ require (
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/protolambda/bls12-381-util v0.0.0-20210720105258-a772f2aac13e // indirect
github.com/prysmaticlabs/gohashtree v0.0.2-alpha // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
@ -73,7 +72,6 @@ require (
)
require (
github.com/ferranbt/fastssz v0.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/georgysavva/scany v1.2.0
github.com/hashicorp/hcl v1.0.0 // indirect

12
go.sum
View File

@ -79,8 +79,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ferranbt/fastssz v0.1.2 h1:Dky6dXlngF6Qjc+EfDipAkE83N5I5DE68bY6O0VLNPk=
github.com/ferranbt/fastssz v0.1.2/go.mod h1:X5UPrE2u1UJjxHA8X54u04SBwdAQjG2sFtWs39YxyWs=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
@ -137,8 +135,6 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -438,10 +434,8 @@ github.com/protolambda/zrnt v0.28.0 h1:vdEL8JDqJ3wdzgqgh6Fhz1Wr3+AMGbUZ2nqoNt6QV
github.com/protolambda/zrnt v0.28.0/go.mod h1:qcdX9CXFeVNCQK/q0nswpzhd+31RHMk2Ax/2lMsJ4Jw=
github.com/protolambda/ztyp v0.2.2 h1:rVcL3vBu9W/aV646zF6caLS/dyn9BN8NYiuJzicLNyY=
github.com/protolambda/ztyp v0.2.2/go.mod h1:9bYgKGqg3wJqT9ac1gI2hnVb0STQq7p/1lapqrqY1dU=
github.com/prysmaticlabs/gohashtree v0.0.2-alpha h1:hk5ZsDQuSkyUMhTd55qB396P1+dtyIKiSwMmYE/hyEU=
github.com/prysmaticlabs/gohashtree v0.0.2-alpha/go.mod h1:4pWaT30XoEx1j8KNJf3TV+E3mQkaufn7mf+jRNb/Fuk=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o=
github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
@ -489,8 +483,6 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs=
github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/umbracle/go-eth-consensus v0.1.1-0.20220824100056-6a6a6f5dbe76 h1:kdbVPrfvG4L3tZBlj8gUNiTalvh0KuAdG6e8erVjfcY=
github.com/umbracle/go-eth-consensus v0.1.1-0.20220824100056-6a6a6f5dbe76/go.mod h1:qEdbWIH3Ud7mRhUlOxNfScXX1JjcqHOqP355Ei14G3s=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

View File

@ -26,7 +26,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/r3labs/sse"
"github.com/r3labs/sse/v2"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/internal/boot"
"github.com/vulcanize/ipld-eth-beacon-indexer/internal/shutdown"

View File

@ -18,11 +18,11 @@ package beaconclient
import (
"context"
"fmt"
"math/rand"
"github.com/r3labs/sse"
"github.com/r3labs/sse/v2"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
"math/rand"
"time"
)
// TODO: Use prysms config values instead of hardcoding them here.
@ -128,7 +128,14 @@ func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEve
ProcessCh: make(chan *P),
SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
return sse.NewClient(endpoint)
client := sse.NewClient(endpoint)
client.ReconnectNotify = func(err error, duration time.Duration) {
log.WithFields(log.Fields{"endpoint": endpoint}).Warn("Reconnecting SSE client")
}
client.OnDisconnect(func(c *sse.Client) {
log.WithFields(log.Fields{"endpoint": endpoint}).Warn("SSE client disconnected")
})
return client
}(endpoint),
}
return sseEvents

View File

@ -30,7 +30,7 @@ import (
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
"github.com/r3labs/sse"
"github.com/r3labs/sse/v2"
log "github.com/sirupsen/logrus"
. "github.com/onsi/gomega"
@ -594,7 +594,7 @@ func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head,
// A helper function to validate the expected output from the eth_beacon.signed_block table.
func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beaconclient.Head,
correctParentRoot string, correctEth1DataBlockHash string, correctMhKey string,
correctExecutionPayloadheader *beaconclient.DbExecutionPayloadHeader) {
correctExecutionPayloadHeader *beaconclient.DbExecutionPayloadHeader) {
dbSignedBlock := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block)
log.Info("validateSignedBeaconBlock: ", headMessage)
baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
@ -604,7 +604,7 @@ func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beacon
Expect(dbSignedBlock.ParentBlock).To(Equal(correctParentRoot))
Expect(dbSignedBlock.Eth1DataBlockHash).To(Equal(correctEth1DataBlockHash))
Expect(dbSignedBlock.MhKey).To(Equal(correctMhKey))
Expect(dbSignedBlock.ExecutionPayloadHeader).To(Equal(correctExecutionPayloadheader))
Expect(dbSignedBlock.ExecutionPayloadHeader).To(Equal(correctExecutionPayloadHeader))
}
// A helper function to validate the expected output from the eth_beacon.state table.

View File

@ -19,6 +19,7 @@ package beaconclient
import (
"encoding/json"
"github.com/pkg/errors"
"time"
log "github.com/sirupsen/logrus"
@ -33,7 +34,7 @@ var (
// This function will capture all the SSE events for a given SseEvents object.
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64)) {
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMetricInc func(uint64), idleTimeout time.Duration) {
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
@ -55,8 +56,18 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
}()
for {
var idleTimer *time.Timer = nil
var idleTimerC <-chan time.Time = nil
if idleTimeout > 0 {
idleTimer = time.NewTimer(idleTimeout)
idleTimerC = idleTimer.C
}
select {
case message := <-eventHandler.MessagesCh:
if nil != idleTimer {
idleTimer.Stop()
}
// Message can be nil if its a keep-alive message
if len(message.Data) != 0 {
log.WithFields(log.Fields{"msg": string(message.Data)}).Debug("We are going to send the following message to be processed.")
@ -64,6 +75,9 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
}
case headErr := <-eventHandler.ErrorCh:
if nil != idleTimer {
idleTimer.Stop()
}
log.WithFields(log.Fields{
"endpoint": eventHandler.Endpoint,
"err": headErr.err,
@ -71,6 +85,22 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P], errMe
},
).Error("Unable to handle event.")
errMetricInc(1)
case <-idleTimerC:
err := errors.New("SSE idle timeout")
log.WithFields(log.Fields{
"endpoint": eventHandler.Endpoint,
"err": err,
"msg": err.Error(),
},
).Error("TIMEOUT - Attempting to resubscribe")
errMetricInc(1)
eventHandler.SseClient.Unsubscribe(eventHandler.MessagesCh)
eventHandler.SseClient.Connection.CloseIdleConnections()
err = eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil {
log.Error("Unable to re-subscribe.", err)
}
}
}
}
@ -93,6 +123,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
// Capture all of the event topics.
func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError)
go handleIncomingSseEvent(bc.HeadTracking, bc.Metrics.IncrementHeadError, time.Second*15)
go handleIncomingSseEvent(bc.ReOrgTracking, bc.Metrics.IncrementReorgError, 0)
}