Add functions to interact with the beacon client #18

Merged
abdulrabbani00 merged 7 commits from feature/17-interact-with-the-beacon-client into develop 2022-04-27 18:07:46 +00:00
18 changed files with 207 additions and 52 deletions
Showing only changes of commit f46c4410a2 - Show all commits

View File

@ -21,6 +21,7 @@ var (
dbPort int dbPort int
bcAddress string bcAddress string
bcPort int bcPort int
bcConnectionProtocol string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
) )
@ -62,6 +63,7 @@ func init() {
//// Beacon Client Specific //// Beacon Client Specific
captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().StringVarP(&bcAddress, "bc.address", "l", "", "Address to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)") captureCmd.PersistentFlags().IntVarP(&bcPort, "bc.port", "r", 0, "Port to connect to beacon node (required if username is set)")
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
err = captureCmd.MarkPersistentFlagRequired("bc.address") err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err) exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port") err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -87,6 +89,8 @@ func init() {
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port")) err = viper.BindPFlag("bc.port", captureCmd.PersistentFlags().Lookup("bc.port"))
exitErr(err) exitErr(err)
err = viper.BindPFlag("bc.connectionProtocol", captureCmd.PersistentFlags().Lookup("bc.connectionProtocol"))
exitErr(err)
// Here you will define your flags and configuration settings. // Here you will define your flags and configuration settings.
} }

View File

@ -31,7 +31,7 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.") log.Info("Starting the application in head tracking mode.")
ctx := context.Background() ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to Start application") loghelper.LogError(err).Error("Unable to Start application")
} }

2
go.mod
View File

@ -19,6 +19,7 @@ require (
github.com/jackc/puddle v1.2.1 // indirect github.com/jackc/puddle v1.2.1 // indirect
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.4 // indirect github.com/lib/pq v1.10.4 // indirect
github.com/minio/sha256-simd v0.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect golang.org/x/net v0.0.0-20220412020605-290c469a71a5 // indirect
@ -27,6 +28,7 @@ require (
) )
require ( require (
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747
github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/georgysavva/scany v0.3.0 github.com/georgysavva/scany v0.3.0
github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect

7
go.sum
View File

@ -65,6 +65,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747 h1:K2Bt7NSX8x/5MD2RiO7cPLy21dBgnQ84r9uR0QYoHrE=
github.com/ferranbt/fastssz v0.0.0-20220303160658-88bb965b6747/go.mod h1:S8yiDeAXy8f88W4Ul+0dBMPx49S05byYbmZD6Uv94K4=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@ -112,6 +114,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -251,6 +255,9 @@ github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hd
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU=
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs=
github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=

View File

@ -56,11 +56,11 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st
// //
// 2. Connect to the database. // 2. Connect to the database.
// //
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) { func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application") log.Info("Booting the Application")
log.Debug("Creating the Beacon Client") log.Debug("Creating the Beacon Client")
BC = beaconclient.CreateBeaconClient(ctx, bcAddress, bcPort) BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
log.Debug("Checking Beacon Client") log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient() err := BC.CheckBeaconClient()
@ -77,10 +77,10 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
} }
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start. // Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int) (*beaconclient.BeaconClient, sql.Database, error) { func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string) (*beaconclient.BeaconClient, sql.Database, error) {
var err error var err error
for i := 0; i < maxRetry; i++ { for i := 0; i < maxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort) BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol)
if err != nil { if err != nil {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"retryNumber": i, "retryNumber": i,

View File

@ -18,30 +18,31 @@ var _ = Describe("Boot", func() {
dbDriver string = "PGX" dbDriver string = "PGX"
bcAddress string = "localhost" bcAddress string = "localhost"
bcPort int = 5052 bcPort int = 5052
bcConnectionProtocol string = "http"
) )
Describe("Booting the application", Label("integration"), func() { Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running", func() { Context("When the DB and BC are both up and running", func() {
It("Should connect successfully", func() { It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
defer db.Close() defer db.Close()
Expect(err).To(BeNil()) Expect(err).To(BeNil())
}) })
}) })
Context("When the DB is running but not the BC", func() { Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })
Context("When the BC is running but not the DB", func() { Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })
Context("When neither the BC or DB are running", func() { Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() { It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100) _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol)
Expect(err).ToNot(BeNil()) Expect(err).ToNot(BeNil())
}) })
}) })

View File

@ -13,14 +13,14 @@ var (
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
connectionProtocol = "http" bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
) )
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct { type BeaconClient struct {
Context context.Context // A context generic context with multiple uses. Context context.Context // A context generic context with multiple uses.
ServerAddress string // Address of the Beacon Server ServerEndpoint string // What is the endpoint of the beacon server.
ServerPort int // Port of the Beacon Server
PerformHeadTracking bool // Should we track head? PerformHeadTracking bool // Should we track head?
PerformHistoricalProcessing bool // Should we perform historical processing? PerformHistoricalProcessing bool // Should we perform historical processing?
HeadTracking *SseEvents[Head] // Track the head block HeadTracking *SseEvents[Head] // Track the head block
@ -30,7 +30,7 @@ type BeaconClient struct {
// A struct to keep track of relevant the head event topic. // A struct to keep track of relevant the head event topic.
type SseEvents[P ProcessedEvents] struct { type SseEvents[P ProcessedEvents] struct {
Url string // The url for the subscription. Primarily used for logging Endpoint string // The endpoint for the subscription. Primarily used for logging
MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel MessagesCh chan *sse.Event // Contains all the messages from the SSE Channel
ErrorCh chan *SseError // Contains any errors while SSE streaming occurred ErrorCh chan *SseError // Contains any errors while SSE streaming occurred
ProcessCh chan *P // Used to capture processed data in its proper struct. ProcessCh chan *P // Used to capture processed data in its proper struct.
@ -44,30 +44,30 @@ type SseError struct {
} }
// A Function to create the BeaconClient. // A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, bcAddress string, bcPort int) *BeaconClient { func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient") log.Info("Creating the BeaconClient")
return &BeaconClient{ return &BeaconClient{
Context: ctx, Context: ctx,
ServerAddress: bcAddress, ServerEndpoint: endpoint,
ServerPort: bcPort, HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint),
HeadTracking: createSseEvent[Head](connectionProtocol, bcAddress, bcPort, bcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](connectionProtocol, bcAddress, bcPort, bcReorgTopicEndpoint), FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
FinalizationTracking: createSseEvent[FinalizedCheckpoint](connectionProtocol, bcAddress, bcPort, bcFinalizedTopicEndpoint),
} }
} }
// Create all the channels to handle a SSE events // Create all the channels to handle a SSE events
func createSseEvent[P ProcessedEvents](connectionProtocol, bcAddress string, bcPort int, endpoint string) *SseEvents[P] { func createSseEvent[P ProcessedEvents](baseEndpoint string, path string) *SseEvents[P] {
url := fmt.Sprintf("%s://%s:%d%s", connectionProtocol, bcAddress, bcPort, endpoint) endpoint := baseEndpoint + path
sseEvents := &SseEvents[P]{ sseEvents := &SseEvents[P]{
Url: url, Endpoint: endpoint,
MessagesCh: make(chan *sse.Event), MessagesCh: make(chan *sse.Event),
ErrorCh: make(chan *SseError), ErrorCh: make(chan *SseError),
ProcessCh: make(chan *P), ProcessCh: make(chan *P),
SseClient: func(url string) *sse.Client { SseClient: func(endpoint string) *sse.Client {
log.WithFields(log.Fields{"url": url}).Info("Creating SSE client") log.WithFields(log.Fields{"endpoint": endpoint}).Info("Creating SSE client")
return sse.NewClient(url) return sse.NewClient(endpoint)
}(url), }(endpoint),
} }
return sseEvents return sseEvents
} }

View File

@ -1,8 +1,11 @@
// This file will call all the functions to start and stop capturing the head of the beacon chain.
package beaconclient package beaconclient
import ( import (
"time" "time"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
@ -10,10 +13,40 @@ import (
// This function will perform all the heavy lifting for tracking the head of the chain. // This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead() { func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.") log.Info("We are tracking the head of the chain.")
go bc.handleHead() bc.tempHelper()
go bc.handleFinalizedCheckpoint() // go bc.handleHead()
go bc.handleReorgs() // go bc.handleFinalizedCheckpoint()
bc.captureEventTopic() // go bc.handleReorgs()
// bc.captureEventTopic()
}
// A temporary helper function to see the output of beacon block and states.
func (bc *BeaconClient) tempHelper() {
slot := "3200"
blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// Query
log.Info("Get")
blockSsz, _ := querySsz(blockEndpoint, slot)
stateSsz, _ := querySsz(stateEndpoint, slot)
// Transform
log.Info("Tranform")
stateObj := new(spectests.BeaconState)
err := stateObj.UnmarshalSSZ(stateSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
blockObj := new(spectests.SignedBeaconBlock)
err = blockObj.UnmarshalSSZ(blockSsz)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
}
// Check
log.Info("Check")
log.Info("State Slot: ", stateObj.Slot)
log.Info("Block Slot: ", blockObj.Block.Slot)
} }
// Stop the head tracking service. // Stop the head tracking service.
@ -36,11 +69,11 @@ func (bc *BeaconClient) StopHeadTracking() error {
// This function closes the SSE subscription, but waits until the MessagesCh is empty // This function closes the SSE subscription, but waits until the MessagesCh is empty
func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) { func (se *SseEvents[ProcessedEvents]) finishProcessingChannel(finish chan<- bool) {
loghelper.LogUrl(se.Url).Info("Received a close event.") loghelper.LogEndpoint(se.Endpoint).Info("Received a close event.")
se.SseClient.Unsubscribe(se.MessagesCh) se.SseClient.Unsubscribe(se.MessagesCh)
for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 { for len(se.MessagesCh) != 0 && len(se.ProcessCh) != 0 {
time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond) time.Sleep(time.Duration(shutdownWaitInterval) * time.Millisecond)
} }
loghelper.LogUrl(se.Url).Info("Done processing all messages, ready for shutdown") loghelper.LogEndpoint(se.Endpoint).Info("Done processing all messages, ready for shutdown")
finish <- true finish <- true
} }

View File

@ -3,7 +3,6 @@ package beaconclient
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
@ -15,7 +14,7 @@ import (
// to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security // to follow their provided guidelines. https://lighthouse-book.sigmaprime.io/api-bn.html#security
func (bc BeaconClient) CheckBeaconClient() error { func (bc BeaconClient) CheckBeaconClient() error {
log.Debug("Attempting to connect to the beacon client") log.Debug("Attempting to connect to the beacon client")
bcEndpoint := "http://" + bc.ServerAddress + ":" + strconv.Itoa(bc.ServerPort) + bcHealthEndpoint bcEndpoint := bc.ServerEndpoint + bcHealthEndpoint
resp, err := http.Get(bcEndpoint) resp, err := http.Get(bcEndpoint)
if err != nil { if err != nil {
loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint) loghelper.LogError(err).Error("Unable to get bc endpoint: ", bcEndpoint)

View File

@ -10,8 +10,8 @@ import (
var _ = Describe("Healthcheck", func() { var _ = Describe("Healthcheck", func() {
var ( var (
BC = beaconclient.CreateBeaconClient(context.Background(), "localhost", 5052) BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052)
errBc = beaconclient.CreateBeaconClient(context.Background(), "blah-blah", 1010) errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010)
) )
Describe("Connecting to the lighthouse client", Label("integration"), func() { Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() { Context("When the client is running", func() {

View File

@ -18,7 +18,7 @@ var (
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information. // If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
loghelper.LogUrl(eventHandler.Url).Info("Subscribing to Messages") loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages")
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
for { for {
select { select {
@ -30,7 +30,7 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
case headErr := <-eventHandler.ErrorCh: case headErr := <-eventHandler.ErrorCh:
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"url": eventHandler.Url, "endpoint": eventHandler.Endpoint,
"err": headErr.err, "err": headErr.err,
"msg": headErr.msg, "msg": headErr.msg,
}, },
@ -44,7 +44,6 @@ func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
// Turn the data object into a Struct. // Turn the data object into a Struct.
func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) { func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan<- *SseError) {
log.WithFields(log.Fields{"msg": msg}).Info("Processing a Message")
var msgMarshaled P var msgMarshaled P
err := json.Unmarshal(msg, &msgMarshaled) err := json.Unmarshal(msg, &msgMarshaled)
if err != nil { if err != nil {
@ -55,7 +54,6 @@ func processMsg[P ProcessedEvents](msg []byte, processCh chan<- *P, errorCh chan
} }
return return
} }
log.WithFields(log.Fields{"process": processCh}).Info("Processed")
processCh <- &msgMarshaled processCh <- &msgMarshaled
} }

View File

@ -1,3 +1,6 @@
// This file contains all the functions to handle SSE events after they have been turned
// to the structs.
package beaconclient package beaconclient
import log "github.com/sirupsen/logrus" import log "github.com/sirupsen/logrus"

View File

@ -0,0 +1,101 @@
// This file will contain functions to query the Beacon Chain Server.
package beaconclient
import (
"fmt"
"io/ioutil"
"net/http"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
// Attempt to use generics..
// // These are types that append slot at the end of the URL to handle a request.
// type SlotBasedRequests interface {
// *specs.BeaconState | *specs.SignedBeaconBlock
// UnmarshalSSZ([]byte) error
// }
//
// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) {
// obj := new(R)
// rawState, err := querySlot(endpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = &obj.UnmarshalSSZ(rawState)
// err = (*obj).UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// This function will query a state object based on the slot provided.
// The object is SSZ encoded.
//type BeaconBlockResponse struct {
// version string `json: `
//}
// func queryState(endpoint string, slot string) (spectests.BeaconState, error) {
// obj := new(spectests.BeaconState)
// fullEndpoint := endpoint + slot
// rawState, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error())
// }
// return *obj, nil
// }
//
// // This function will query a state object based on the slot provided.
// // The object is SSZ encoded.
// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) {
// obj := new(spectests.SignedBeaconBlock)
// fullEndpoint := endpoint + slot
// rawBlock, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawBlock)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to create a request!")
return nil, fmt.Errorf("Unable to create a request!: %s", err.Error())
}
// Not set correctly
req.Header.Set("Accept", "application/octet-stream")
response, err := client.Do(req)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!")
return nil, fmt.Errorf("Unable to query Beacon Node: %s", err.Error())
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!")
return nil, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error())
}
return body, nil
}

View File

@ -5,8 +5,8 @@ import (
) )
// A simple helper function that will help wrap the error message. // A simple helper function that will help wrap the error message.
func LogUrl(url string) *log.Entry { func LogEndpoint(endpoint string) *log.Entry {
return log.WithFields(log.Fields{ return log.WithFields(log.Fields{
"url": url, "endpoint": endpoint,
}) })
} }

View File

@ -11,3 +11,10 @@ func LogError(err error) *log.Entry {
"err": err, "err": err,
}) })
} }
func LogSlotError(slot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
})
}

BIN
tmp/ci/state Normal file

Binary file not shown.

BIN
tmp/code/block Normal file

Binary file not shown.

BIN
tmp/code/state Normal file

Binary file not shown.