Capture the head block in the DB entirely. #27

Merged
abdulrabbani00 merged 13 commits from feature/20-write-sse-events-to-the-db into develop 2022-05-06 15:03:16 +00:00
8 changed files with 33 additions and 111 deletions
Showing only changes of commit 7f1e402218 - Show all commits

1
go.mod
View File

@ -54,6 +54,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.11.0 github.com/spf13/viper v1.11.0
github.com/subosito/gotenv v1.2.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect
golang.org/x/text v0.3.7 // indirect golang.org/x/text v0.3.7 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect gopkg.in/ini.v1 v1.66.4 // indirect

1
go.sum
View File

@ -696,6 +696,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -13,11 +13,11 @@ var (
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain bcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks bcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States bcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch bcSlotsPerEpoch = 32 // Number of slots in a single Epoch
bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
) )
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.

View File

@ -20,48 +20,16 @@ func (bc *BeaconClient) CaptureHead(db sql.Database) {
bc.captureEventTopic() bc.captureEventTopic()
} }
// A temporary helper function to see the output of beacon block and states.
//func (bc *BeaconClient) tempHelper() {
// slot := "3200"
// blockEndpoint := bc.ServerEndpoint + bcBlockQueryEndpoint + slot
// stateEndpoint := bc.ServerEndpoint + bcStateQueryEndpoint + slot
// // Query
// log.Info("Get")
// blockSsz, _ := querySsz(blockEndpoint, slot)
// stateSsz, _ := querySsz(stateEndpoint, slot)
// // Transform
// log.Info("Tranform")
// stateObj := new(spectests.BeaconState)
// err := stateObj.UnmarshalSSZ(stateSsz)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// }
//
// blockObj := new(spectests.SignedBeaconBlock)
// err = blockObj.UnmarshalSSZ(blockSsz)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// }
//
// // Check
// log.Info("Check")
// log.Info("State Slot: ", stateObj.Slot)
// log.Info("Block Slot: ", blockObj.Block.Slot)
//}
//
// Stop the head tracking service. // Stop the head tracking service.
func (bc *BeaconClient) StopHeadTracking() error { func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool) chHead := make(chan bool)
chReorg := make(chan bool) chReorg := make(chan bool)
//chFinal := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead) go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg) go bc.ReOrgTracking.finishProcessingChannel(chReorg)
//go bc.FinalizationTracking.finishProcessingChannel(chFinal)
<-chHead <-chHead
//<-chFinal
<-chReorg <-chReorg
log.Info("Successfully stopped the head tracking service.") log.Info("Successfully stopped the head tracking service.")
return nil return nil

View File

@ -8,6 +8,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -18,8 +19,23 @@ var (
// When new messages come in, it will ensure that they are decoded into JSON. // When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information. // If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Subscribing to Messages") errG := new(errgroup.Group)
go eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil {
return err
}
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
}
for { for {
select { select {
case message := <-eventHandler.MessagesCh: case message := <-eventHandler.MessagesCh:
@ -62,5 +78,4 @@ func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking) go handleIncomingSseEvent(bc.HeadTracking)
go handleIncomingSseEvent(bc.ReOrgTracking) go handleIncomingSseEvent(bc.ReOrgTracking)
// go handleIncomingSseEvent(bc.FinalizationTracking)
} }

View File

@ -2,7 +2,7 @@ package beaconclient
// This interface captured what the events can be for processed event streams. // This interface captured what the events can be for processed event streams.
type ProcessedEvents interface { type ProcessedEvents interface {
Head | FinalizedCheckpoint | ChainReorg Head | ChainReorg
} }
// This struct captures the JSON representation of the head topic // This struct captures the JSON representation of the head topic

View File

@ -99,9 +99,10 @@ func handleHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot s
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error { // Commented because of the linter...... LOL
return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic") //func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error {
} // return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic")
//}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {

View File

@ -11,70 +11,6 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
) )
// Attempt to use generics..
// // These are types that append slot at the end of the URL to handle a request.
// type SlotBasedRequests interface {
// *specs.BeaconState | *specs.SignedBeaconBlock
// UnmarshalSSZ([]byte) error
// }
//
// func queryState[R SlotBasedRequests](endpoint string, slot string) (R, error) {
// obj := new(R)
// rawState, err := querySlot(endpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = &obj.UnmarshalSSZ(rawState)
// err = (*obj).UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// This function will query a state object based on the slot provided.
// The object is SSZ encoded.
//type BeaconBlockResponse struct {
// version string `json: `
//}
// func queryState(endpoint string, slot string) (spectests.BeaconState, error) {
// obj := new(spectests.BeaconState)
// fullEndpoint := endpoint + slot
// rawState, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawState)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node: %s", err.Error())
// }
// return *obj, nil
// }
//
// // This function will query a state object based on the slot provided.
// // The object is SSZ encoded.
// func queryBlock(endpoint string, slot string) (spectests.SignedBeaconBlock, error) {
// obj := new(spectests.SignedBeaconBlock)
// fullEndpoint := endpoint + slot
// rawBlock, err := querySsz(fullEndpoint, slot)
// if err != nil {
// return *obj, err
// }
//
// err = obj.UnmarshalSSZ(rawBlock)
// if err != nil {
// loghelper.LogSlotError(slot, err).Error("Unable to unmarshal the SSZ response from the Beacon Node Successfully!")
// return *obj, fmt.Errorf("Unable to unmarshal the SSZ response from the Beacon Node Successfully!: %s", err.Error())
// }
// return *obj, nil
// }
// A helper function to query endpoints that utilize slots. // A helper function to query endpoints that utilize slots.
func querySsz(endpoint string, slot string) ([]byte, int, error) { func querySsz(endpoint string, slot string) ([]byte, int, error) {
log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint") log.WithFields(log.Fields{"endpoint": endpoint}).Info("Querying endpoint")