diff --git a/go.mod b/go.mod index 7443e13..ae12cf8 100644 --- a/go.mod +++ b/go.mod @@ -54,6 +54,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.11.0 github.com/subosito/gotenv v1.2.0 // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/text v0.3.7 // indirect gopkg.in/ini.v1 v1.66.4 // indirect diff --git a/go.sum b/go.sum index bcab03a..54f3706 100644 --- a/go.sum +++ b/go.sum @@ -696,6 +696,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 85a4a15..78495b7 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -10,14 +10,14 @@ import ( // TODO: Use prysms config values instead of hardcoding them here. var ( - bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck - bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain - bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain - bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain - bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks - bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States - bcSlotsPerEpoch = 32 // Number of slots in a single Epoch - bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. + bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck + bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain + bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain + bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks + bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States + bcSlotsPerEpoch = 32 // Number of slots in a single Epoch + //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. + //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain ) // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. diff --git a/pkg/beaconclient/capturehead.go b/pkg/beaconclient/capturehead.go index 655f61e..33969ee 100644 --- a/pkg/beaconclient/capturehead.go +++ b/pkg/beaconclient/capturehead.go @@ -20,48 +20,16 @@ func (bc *BeaconClient) CaptureHead(db sql.Database) { bc.captureEventTopic() } -// A temporary helper function to see the output of beacon block and states. -//func (bc *BeaconClient) tempHelper() { -// slot := "3200" -// blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot -// stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot -// // Query -// log.Info("Get") -// blockSsz, _ := querySsz(blockEndpoint, slot) -// stateSsz, _ := querySsz(stateEndpoint, slot) -// // Transform -// log.Info("Tranform") -// stateObj := new(spectests.BeaconState) -// err := stateObj.UnmarshalSSZ(stateSsz) -// if err != nil { -// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") -// } -// -// blockObj := new(spectests.SignedBeaconBlock) -// err = blockObj.UnmarshalSSZ(blockSsz) -// if err != nil { -// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") -// } -// -// // Check -// log.Info("Check") -// log.Info("State Slot: ", stateObj.Slot) -// log.Info("Block Slot: ", blockObj.Block.Slot) -//} -// // Stop the head tracking service. func (bc *BeaconClient) StopHeadTracking() error { log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") chHead := make(chan bool) chReorg := make(chan bool) - //chFinal := make(chan bool) go bc.HeadTracking.finishProcessingChannel(chHead) go bc.ReOrgTracking.finishProcessingChannel(chReorg) - //go bc.FinalizationTracking.finishProcessingChannel(chFinal) <-chHead - //<-chFinal <-chReorg log.Info("Successfully stopped the head tracking service.") return nil diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index 42d9955..7e9c222 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -8,6 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" + "golang.org/x/sync/errgroup" ) var ( @@ -18,8 +19,23 @@ var ( // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { - loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages") - go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + errG := new(errgroup.Group) + errG.Go(func() error { + err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + if err != nil { + return err + } + return nil + }) + if err := errG.Wait(); err != nil { + log.WithFields(log.Fields{ + "err": err, + "endpoint": eventHandler.Endpoint, + }).Error("Unable to subscribe to the SSE endpoint.") + return + } else { + loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") + } for { select { case message := <-eventHandler.MessagesCh: @@ -62,5 +78,4 @@ func (bc *BeaconClient) captureEventTopic() { log.Info("We are capturing all SSE events") go handleIncomingSseEvent(bc.HeadTracking) go handleIncomingSseEvent(bc.ReOrgTracking) - // go handleIncomingSseEvent(bc.FinalizationTracking) } diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index a3aa232..1cdb443 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -2,7 +2,7 @@ package beaconclient // This interface captured what the events can be for processed event streams. type ProcessedEvents interface { - Head | FinalizedCheckpoint | ChainReorg + Head | ChainReorg } // This struct captures the JSON representation of the head topic diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 41d9cf0..c0344f5 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -99,9 +99,10 @@ func handleHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot s } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error { - return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic") -} +// Commented because of the linter...... LOL +//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error { +// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic") +//} // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 272d429..a249dba 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -11,70 +11,6 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) -// Attempt to use generics.. -// // These are types that append slot at the end of the URL to handle a request. -// type SlotBasedRequests interface { -// *specs.BeaconState | *specs.SignedBeaconBlock -// UnmarshalSSZ([]byte) error -// } -// -// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) { -// obj := new(R) -// rawState, err := querySlot(endpoint, slot) -// if err != nil { -// return *obj, err -// } -// -// err = &obj.UnmarshalSSZ(rawState) -// err = (*obj).UnmarshalSSZ(rawState) -// if err != nil { -// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") -// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) -// } -// return *obj, nil -// } - -// This function will query a state object based on the slot provided. -// The object is SSZ encoded. - -//type BeaconBlockResponse struct { -// version string `json: ` -//} - -// func queryState(endpoint string, slot string) (spectests.BeaconState, error) { -// obj := new(spectests.BeaconState) -// fullEndpoint := endpoint + slot -// rawState, err := querySsz(fullEndpoint, slot) -// if err != nil { -// return *obj, err -// } -// -// err = obj.UnmarshalSSZ(rawState) -// if err != nil { -// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node") -// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error()) -// } -// return *obj, nil -// } -// -// // This function will query a state object based on the slot provided. -// // The object is SSZ encoded. -// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) { -// obj := new(spectests.SignedBeaconBlock) -// fullEndpoint := endpoint + slot -// rawBlock, err := querySsz(fullEndpoint, slot) -// if err != nil { -// return *obj, err -// } -// -// err = obj.UnmarshalSSZ(rawBlock) -// if err != nil { -// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!") -// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error()) -// } -// return *obj, nil -// } - // A helper function to query endpoints that utilize slots. func querySsz(endpoint string, slot string) ([]byte, int, error) { log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")