Capture the head block in the DB entirely. #27

Merged
abdulrabbani00 merged 13 commits from feature/20-write-sse-events-to-the-db into develop 2022-05-06 15:03:16 +00:00
6 changed files with 206 additions and 19 deletions
Showing only changes of commit aad2af0888 - Show all commits

View File

@ -19,13 +19,19 @@ var (
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct { type BeaconClient struct {
Context context.Context // A context generic context with multiple uses. Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server. ServerEndpoint string // What is the endpoint of the beacon server.
PerformHeadTracking bool // Should we track head? PerformHistoricalProcessing bool // Should we perform historical processing?
PerformHistoricalProcessing bool // Should we perform historical processing?
HeadTracking *SseEvents[Head] // Track the head block // Used for Head Tracking
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs PerformHeadTracking bool // Should we track head?
FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints StartingSlot int // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
CheckKnownGaps bool // Should we check for gaps at start up.
HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs
//FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
} }
// A struct to keep track of relevant the head event topic. // A struct to keep track of relevant the head event topic.
@ -48,11 +54,11 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient") log.Info("Creating the BeaconClient")
return &BeaconClient{ return &BeaconClient{
Context: ctx, Context: ctx,
ServerEndpoint: endpoint, ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint), HeadTracking: createSseEvent[Head](endpoint, bcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
} }
} }

View File

@ -54,14 +54,14 @@ func (bc *BeaconClient) StopHeadTracking() error {
log.Info("We are going to stop tracking the head of chain because of the shutdown signal.") log.Info("We are going to stop tracking the head of chain because of the shutdown signal.")
chHead := make(chan bool) chHead := make(chan bool)
chReorg := make(chan bool) chReorg := make(chan bool)
chFinal := make(chan bool) //chFinal := make(chan bool)
go bc.HeadTracking.finishProcessingChannel(chHead) go bc.HeadTracking.finishProcessingChannel(chHead)
go bc.ReOrgTracking.finishProcessingChannel(chReorg) go bc.ReOrgTracking.finishProcessingChannel(chReorg)
go bc.FinalizationTracking.finishProcessingChannel(chFinal) //go bc.FinalizationTracking.finishProcessingChannel(chFinal)
<-chHead <-chHead
<-chFinal //<-chFinal
<-chReorg <-chReorg
log.Info("Successfully stopped the head tracking service.") log.Info("Successfully stopped the head tracking service.")
return nil return nil

View File

@ -62,5 +62,5 @@ func (bc *BeaconClient) captureEventTopic() {
log.Info("We are capturing all SSE events") log.Info("We are capturing all SSE events")
go handleIncomingSseEvent(bc.HeadTracking) go handleIncomingSseEvent(bc.HeadTracking)
go handleIncomingSseEvent(bc.ReOrgTracking) go handleIncomingSseEvent(bc.ReOrgTracking)
go handleIncomingSseEvent(bc.FinalizationTracking) // go handleIncomingSseEvent(bc.FinalizationTracking)
} }

View File

@ -1,5 +1,7 @@
package beaconclient package beaconclient
import "math/big"
// This interface captured what the events can be for processed event streams. // This interface captured what the events can be for processed event streams.
type ProcessedEvents interface { type ProcessedEvents interface {
Head | FinalizedCheckpoint | ChainReorg Head | FinalizedCheckpoint | ChainReorg
@ -35,3 +37,27 @@ type ChainReorg struct {
Epoch string `json:"epoch"` Epoch string `json:"epoch"`
ExecutionOptimistic bool `json:"execution_optimistic"` ExecutionOptimistic bool `json:"execution_optimistic"`
} }
// A struct to capture whats being written to the ethcl.slots table.
type DbSlots struct {
Epoch *big.Int // The epoch.
Slot *big.Int // The slot.
BlockRoot string // The block root
StateRoot string // The state root
status string // The status, it can be proposed | forked | missed.
}
// A struct to capture whats being written to ethcl.signed_beacon_block table.
type DbSignedBeaconBlock struct {
Slot *big.Int // The slot.
BlockRoot string // The block root
ParentBlock string // The parent block root.
mh_key string // The ipld multihash key.
}
type DbBeaconState struct {
Slot *big.Int // The slot.
StateRoot string // The state root
mh_key string // The ipld multihash key.
}

View File

@ -3,7 +3,12 @@
package beaconclient package beaconclient
import log "github.com/sirupsen/logrus" import (
"fmt"
"strconv"
log "github.com/sirupsen/logrus"
)
// This function will perform the necessary steps to handle a reorg. // This function will perform the necessary steps to handle a reorg.
func (bc *BeaconClient) handleReorgs() { func (bc *BeaconClient) handleReorgs() {
@ -30,8 +35,20 @@ func (bc *BeaconClient) handleFinalizedCheckpoint() {
func (bc *BeaconClient) handleHead() { func (bc *BeaconClient) handleHead() {
log.Info("Starting to process head.") log.Info("Starting to process head.")
for { for {
// We will add real functionality later head := <-bc.HeadTracking.ProcessCh
head := <-bc.ReOrgTracking.ProcessCh // Process all the work here.
// Update the previous block if its the first message.
if bc.PreviousSlot == 0 && bc.PreviousBlockRoot == "" {
var err error
bc.PreviousSlot, err = strconv.Atoi(head.Slot)
if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
}
bc.PreviousBlockRoot = head.Block
}
log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.") log.WithFields(log.Fields{"head": head}).Debug("Received a new head event.")
} }

View File

@ -0,0 +1,138 @@
// This file will keep track of all the code needed to process a slot.
// To process a slot, it should have all the necessary data needed to write it to the DB.
// But not actually write it.
package beaconclient
import (
"encoding/hex"
"fmt"
"github.com/ferranbt/fastssz/spectests"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
SlotUnmarshalError = "Unable to properly unmarshal the Slot field in the SignedBeaconBlock."
ParentRootUnmarshalError = "Unable to properly unmarshal the ParentRoot field in the SignedBeaconBlock."
MissingIdentifiedError = "Can't Query state without a set slot or block_root"
)
type ProcessSlot struct {
Slot string // The slot number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
SszBeaconState []byte // The entire SSZ encoded BeaconState
FullBeaconState *spectests.BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors.
FullSignedBeaconBlock *spectests.SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors.
}
// This function will do all the work to process the slot at head.
func processHeadSlot(baseEndpoint string, slot string, blockRoot string, stateRoot string, parentBlockRoot string, previousSlot uint64, previousBlockRoot string) error {
pc := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
StateRoot: stateRoot,
ParentBlockRoot: parentBlockRoot,
}
err := pc.getSignedBeaconBlock(baseEndpoint)
if err != nil {
return err
}
err = pc.getBeaconState(baseEndpoint)
if err != nil {
return err
}
// Handle any reorgs or skipped slots.
if previousSlot != 0 && previousBlockRoot != "" {
pc.checkPreviousSlot(previousSlot, previousBlockRoot)
}
// Get this object ready to write
// Write the object to the DB.
return nil
}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(baseEndpoint string) error {
var blockIdentifier string // Used to query the block
if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot
} else if ps.Slot != "" {
blockIdentifier = ps.Slot
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
blockEndpoint := baseEndpoint + blockIdentifier
ps.SszSignedBeaconBlock, _ = querySsz(blockEndpoint, ps.Slot)
ps.FullSignedBeaconBlock = new(spectests.SignedBeaconBlock)
err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszSignedBeaconBlock)
if err != nil {
if ps.FullSignedBeaconBlock.Block.Slot == 0 {
loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError)
return fmt.Errorf(SlotUnmarshalError)
} else if ps.FullSignedBeaconBlock.Block.ParentRoot == nil {
loghelper.LogSlotError(ps.Slot, err).Error(ParentRootUnmarshalError)
return fmt.Errorf(ParentRootUnmarshalError)
}
}
return nil
}
// Update the SszBeaconState and FullBeaconState object with their respective values.
func (ps *ProcessSlot) getBeaconState(baseEndpoint string) error {
var stateIdentifier string // Used to query the state
if ps.StateRoot != "" {
stateIdentifier = ps.BlockRoot
} else if ps.Slot != "" {
stateIdentifier = ps.Slot
} else {
log.Error(MissingIdentifiedError)
return fmt.Errorf(MissingIdentifiedError)
}
stateEndpoint := baseEndpoint + stateIdentifier
ps.SszBeaconState, _ = querySsz(stateEndpoint, ps.Slot)
ps.FullBeaconState = new(spectests.BeaconState)
err := ps.FullSignedBeaconBlock.UnmarshalSSZ(ps.SszBeaconState)
if err != nil {
if ps.FullBeaconState.Slot == 0 {
loghelper.LogSlotError(ps.Slot, err).Error(SlotUnmarshalError)
return fmt.Errorf(SlotUnmarshalError)
}
}
return nil
}
func (ps *ProcessSlot) checkPreviousSlot(previousSlot uint64, previousBlockRoot string) {
if previousSlot == ps.FullBeaconState.Slot {
log.WithFields(log.Fields{
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
// Handle Forks
} else if previousSlot-1 != ps.FullBeaconState.Slot {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
"currentSlot": ps.FullBeaconState.Slot,
}).Error("We skipped a few slots.")
// Call our batch processing function.
} else if previousBlockRoot != "0x"+hex.EncodeToString(ps.FullSignedBeaconBlock.Block.ParentRoot) {
log.WithFields(log.Fields{
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": ps.FullSignedBeaconBlock.Block.ParentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
// Handle Forks
}
}