diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 4f959e3..420c5d4 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -44,14 +44,16 @@ var ( // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. type BeaconClient struct { - Context context.Context // A context generic context with multiple uses. - ServerEndpoint string // What is the endpoint of the beacon server. - Db sql.Database // Database object used for reads and writes. - Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. - KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. - UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. - KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing - CheckDb bool // Should we check the DB to see if the slot exists before processing it? + Context context.Context // A context generic context with multiple uses. + ServerEndpoint string // What is the endpoint of the beacon server. + Db sql.Database // Database object used for reads and writes. + Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. + KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. + UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. + KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing + CheckDb bool // Should we check the DB to see if the slot exists before processing it? + PerformBeaconStateProcessing bool // Should we process BeaconStates? + ProcessBeaconBlockProcessing bool // Should we process BeaconBlocks? // Used for Head Tracking @@ -102,14 +104,16 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) log.Info("Creating the BeaconClient") return &BeaconClient{ - Context: ctx, - ServerEndpoint: endpoint, - KnownGapTableIncrement: bcKgTableIncrement, - HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), - ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), - Metrics: metrics, - UniqueNodeIdentifier: uniqueNodeIdentifier, - CheckDb: checkDb, + Context: ctx, + ServerEndpoint: endpoint, + KnownGapTableIncrement: bcKgTableIncrement, + HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), + ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), + Metrics: metrics, + UniqueNodeIdentifier: uniqueNodeIdentifier, + CheckDb: checkDb, + ProcessBeaconBlockProcessing: true, + PerformBeaconStateProcessing: true, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), }, nil } diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 2bf6dfc..5d1b9d8 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -22,7 +22,6 @@ import ( "fmt" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql" "github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper" "golang.org/x/sync/errgroup" ) @@ -31,7 +30,7 @@ import ( func (bc *BeaconClient) CaptureHistoric(ctx context.Context, maxWorkers int) []error { log.Info("We are starting the historical processing service.") bc.HistoricalProcess = HistoricProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier} - errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementHistoricSlotProcessed) + errs := handleBatchProcess(ctx, maxWorkers, bc.HistoricalProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementHistoricSlotProcessed) log.Debug("Exiting Historical") return errs } @@ -91,7 +90,7 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) []error { +func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) @@ -108,7 +107,7 @@ func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, for w := 1; w <= maxWorkers; w++ { log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers") - go processSlotRangeWorker(ctx, workCh, errCh, db, serverEndpoint, metrics, checkDb, incrementTracker) + go processSlotRangeWorker(ctx, workCh, errCh, spd, incrementTracker) } // Process all ranges and send each individual slot to the worker. diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 9ef1ec2..60c541c 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -229,6 +229,11 @@ func (dw *DatabaseWriter) upsertSlots() error { // Add the information for the signed_block to a transaction. func (dw *DatabaseWriter) transactSignedBeaconBlocks() error { + if nil == dw.rawSignedBeaconBlock || len(*dw.rawSignedBeaconBlock) == 0 { + log.Warn("Skipping writing of empty BeaconBlock.") + return nil + } + err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock) if err != nil { return err @@ -262,6 +267,11 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error { // Add the information for the state to a transaction. func (dw *DatabaseWriter) transactBeaconState() error { + if nil == dw.rawBeaconState || len(*dw.rawBeaconState) == 0 { + log.Warn("Skipping writing of empty BeaconState.") + return nil + } + err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState) if err != nil { return err diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index c7a642a..b11b4cf 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -31,7 +31,7 @@ var _ = Describe("Healthcheck", func() { BeforeEach(func() { var err error - Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false) + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 8005, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 8dd5520..f951f8f 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -66,7 +66,7 @@ func (bc *BeaconClient) handleHead() { bc.StartingSlot = slot } - go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb) + go processHeadSlot(slot, head.Block, head.State, bc.SlotProcessingDetails()) log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index c520e41..879a32a 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -97,14 +97,14 @@ func (hp HistoricProcessing) releaseDbLocks() error { } // Process the slot range. -func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool, incrementTracker func(uint64)) { +func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) { for { select { case <-ctx.Done(): return case slot := <-workCh: log.Debug("Handling slot: ", slot) - err, errProcess := handleHistoricSlot(ctx, db, serverAddress, slot, metrics, checkDb) + err, errProcess := handleHistoricSlot(ctx, slot, spd) if err != nil { errMs := batchHistoricError{ err: err, diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 343fc4a..2a33b5a 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -61,7 +61,7 @@ type KnownGapsProcessing struct { func (bc *BeaconClient) ProcessKnownGaps(ctx context.Context, maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics} - errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb, bc.Metrics.IncrementKnownGapsProcessed) + errs := handleBatchProcess(ctx, maxWorkers, bc.KnownGapsProcess, bc.SlotProcessingDetails(), bc.Metrics.IncrementKnownGapsProcessed) log.Debug("Exiting known gaps processing service") return errs } diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index db7f8c7..d437485 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -35,6 +35,39 @@ import ( "golang.org/x/sync/errgroup" ) +type SlotProcessingDetails struct { + Context context.Context // A context generic context with multiple uses. + ServerEndpoint string // What is the endpoint of the beacon server. + Db sql.Database // Database object used for reads and writes. + Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. + KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. + CheckDb bool // Should we check the DB to see if the slot exists before processing it? + PerformBeaconStateProcessing bool // Should we process BeaconStates? + ProcessBeaconBlockProcessing bool // Should we process BeaconBlocks? + + StartingSlot int // If we're performing head tracking. What is the first slot we processed. + PreviousSlot int // Whats the previous slot we processed + PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent. +} + +func (bc *BeaconClient) SlotProcessingDetails() SlotProcessingDetails { + return SlotProcessingDetails{ + Context: bc.Context, + ServerEndpoint: bc.ServerEndpoint, + Db: bc.Db, + Metrics: bc.Metrics, + + CheckDb: bc.CheckDb, + ProcessBeaconBlockProcessing: bc.ProcessBeaconBlockProcessing, + PerformBeaconStateProcessing: bc.PerformBeaconStateProcessing, + + KnownGapTableIncrement: bc.KnownGapTableIncrement, + StartingSlot: bc.StartingSlot, + PreviousSlot: bc.PreviousSlot, + PreviousBlockRoot: bc.PreviousBlockRoot, + } +} + type ProcessSlot struct { // Generic @@ -50,12 +83,12 @@ type ProcessSlot struct { PerformanceMetrics PerformanceMetrics // An object to keep track of performance metrics. // BeaconBlock - SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock - FullSignedBeaconBlock SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors. + SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock + FullSignedBeaconBlock *SignedBeaconBlock // The unmarshaled BeaconState object, the unmarshalling could have errors. // BeaconState - FullBeaconState BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors. - SszBeaconState []byte // The entire SSZ encoded BeaconState + FullBeaconState *BeaconState // The unmarshaled BeaconState object, the unmarshalling could have errors. + SszBeaconState []byte // The entire SSZ encoded BeaconState // DB Write objects DbSlotsModel *DbSlots // The model being written to the slots table. @@ -79,7 +112,16 @@ type PerformanceMetrics struct { // This function will do all the work to process the slot and write it to the DB. // It will return the error and error process. The error process is used for providing reach detail to the // known_gaps table. -func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) { +func processFullSlot( + ctx context.Context, + slot int, + blockRoot string, + stateRoot string, + previousSlot int, + previousBlockRoot string, + knownGapsTableIncrement int, + headOrHistoric string, + spd *SlotProcessingDetails) (error, string) { select { case <-ctx.Done(): return nil, "" @@ -90,8 +132,8 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, BlockRoot: blockRoot, StateRoot: stateRoot, HeadOrHistoric: headOrHistoric, - Db: db, - Metrics: metrics, + Db: spd.Db, + Metrics: spd.Metrics, PerformanceMetrics: PerformanceMetrics{ BeaconNodeBlockRetrievalTime: 0, BeaconNodeStateRetrievalTime: 0, @@ -108,37 +150,41 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, g, _ := errgroup.WithContext(context.Background()) - // Get the BeaconState. - g.Go(func() error { - select { - case <-ctx.Done(): - return nil - default: - start := time.Now() - err := ps.getBeaconState(serverAddress) - if err != nil { - return err + if spd.PerformBeaconStateProcessing { + // Get the BeaconState. + g.Go(func() error { + select { + case <-ctx.Done(): + return nil + default: + start := time.Now() + err := ps.getBeaconState(spd.ServerEndpoint) + if err != nil { + return err + } + ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start) + return nil } - ps.PerformanceMetrics.BeaconNodeStateRetrievalTime = time.Since(start) - return nil - } - }) + }) + } - // Get the SignedBeaconBlock. - g.Go(func() error { - select { - case <-ctx.Done(): - return nil - default: - start := time.Now() - err := ps.getSignedBeaconBlock(serverAddress) - if err != nil { - return err + if spd.ProcessBeaconBlockProcessing { + // Get the SignedBeaconBlock. + g.Go(func() error { + select { + case <-ctx.Done(): + return nil + default: + start := time.Now() + err := ps.getSignedBeaconBlock(spd.ServerEndpoint) + if err != nil { + return err + } + ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start) + return nil } - ps.PerformanceMetrics.BeaconNodeBlockRetrievalTime = time.Since(start) - return nil - } - }) + }) + } if err := g.Wait(); err != nil { return err, "processSlot" @@ -151,7 +197,7 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, } ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime) - if checkDb { + if spd.CheckDb { checkDbTime := time.Now() inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) if err != nil { @@ -220,20 +266,23 @@ func processFullSlot(ctx context.Context, db sql.Database, serverAddress string, } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) { - // Get the knownGaps at startUp. - if previousSlot == 0 && previousBlockRoot == "" { - writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics) +func processHeadSlot(slot int, blockRoot string, stateRoot string, spd SlotProcessingDetails) { + // Get the knownGaps at startUp + if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" { + writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics) } - err, errReason := processFullSlot(context.Background(), db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb) + // TODO(telackey): Why context.Background()? + err, errReason := processFullSlot(context.Background(), slot, blockRoot, stateRoot, + spd.PreviousSlot, spd.PreviousBlockRoot, spd.KnownGapTableIncrement, "head", &spd) if err != nil { - writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) + writeKnownGaps(spd.Db, spd.KnownGapTableIncrement, slot, slot, err, errReason, spd.Metrics) } } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -func handleHistoricSlot(ctx context.Context, db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) { - return processFullSlot(ctx, db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb) +func handleHistoricSlot(ctx context.Context, slot int, spd SlotProcessingDetails) (error, string) { + return processFullSlot(ctx, slot, "", "", 0, "", + 1, "historic", &spd) } // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. @@ -250,7 +299,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { if err != nil || rc != 200 { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.") - ps.FullSignedBeaconBlock = SignedBeaconBlock{} + ps.FullSignedBeaconBlock = nil ps.SszSignedBeaconBlock = []byte{} ps.ParentBlockRoot = "" ps.Status = "skipped" @@ -261,14 +310,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock) if err != nil { loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to unmarshal SignedBeaconBlock for slot.") - ps.FullSignedBeaconBlock = SignedBeaconBlock{} + ps.FullSignedBeaconBlock = nil ps.SszSignedBeaconBlock = []byte{} ps.ParentBlockRoot = "" ps.Status = "skipped" return nil } - ps.FullSignedBeaconBlock = signedBeaconBlock + ps.FullSignedBeaconBlock = &signedBeaconBlock ps.SszSignedBeaconBlock = sszSignedBeaconBlock ps.ParentBlockRoot = toHex(ps.FullSignedBeaconBlock.Block().ParentRoot()) @@ -298,13 +347,17 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error { return err } - ps.FullBeaconState = beaconState + ps.FullBeaconState = &beaconState ps.SszBeaconState = sszBeaconState return nil } // Check to make sure that the previous block we processed is the parent of the current block. func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { + if nil == ps.FullSignedBeaconBlock { + log.Debug("Can't check previous slot, no current slot.") + return + } parentRoot := toHex(ps.FullSignedBeaconBlock.Block().ParentRoot()) slot := int(ps.FullBeaconState.Slot()) if previousSlot == slot { @@ -368,18 +421,28 @@ func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) { if ps.StateRoot != "" { stateRoot = ps.StateRoot } else { - stateRoot = toHex(ps.FullSignedBeaconBlock.Block().StateRoot()) - log.Debug("StateRoot: ", stateRoot) + if nil != ps.FullSignedBeaconBlock { + stateRoot = toHex(ps.FullSignedBeaconBlock.Block().StateRoot()) + log.Debug("BeaconBlock StateRoot: ", stateRoot) + } else { + log.Debug("BeaconBlock StateRoot: ") + } } if ps.BlockRoot != "" { blockRoot = ps.BlockRoot } else { - rawBlockRoot := ps.FullSignedBeaconBlock.Block().HashTreeRoot() - blockRoot = toHex(rawBlockRoot) - log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz") + if nil != ps.FullSignedBeaconBlock { + rawBlockRoot := ps.FullSignedBeaconBlock.Block().HashTreeRoot() + blockRoot = toHex(rawBlockRoot) + log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz") + } else { + log.Debug("BeaconBlock HashTreeRoot: ") + } + } + if nil != ps.FullSignedBeaconBlock { + eth1BlockHash = toHex(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) } - eth1BlockHash = toHex(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) } return blockRoot, stateRoot, eth1BlockHash, nil }