Feature/44 read write historic slots #46

Merged
abdulrabbani00 merged 10 commits from feature/44-read-write-historic-slots into develop 2022-05-24 20:18:55 +00:00
13 changed files with 596 additions and 246 deletions
Showing only changes of commit 88a553efbd - Show all commits

View File

@ -26,7 +26,7 @@ on:
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'main' }}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-process' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:

View File

@ -47,18 +47,18 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync)
Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync)
if err != nil {
StopApplicationPreBoot(err, DB)
StopApplicationPreBoot(err, Db)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go BC.CaptureHead()
go Bc.CaptureHead()
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {

View File

@ -23,6 +23,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
@ -43,10 +44,24 @@ func startHistoricProcessing() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
_, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync)
if err != nil {
StopApplicationPreBoot(err, DB)
StopApplicationPreBoot(err, Db)
}
errs := Bc.CaptureHistoric(2)
if errs != nil {
log.WithFields(log.Fields{
"TotalErrors": errs,
}).Error("The historical processing service ended after receiving too many errors.")
}
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)
if err != nil {
loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!")
} else {
log.Info("Gracefully shutdown ipld-ethcl-indexer")
}
}

View File

@ -94,9 +94,9 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0,
HeadTrackingReorgs: 0,
HeadTrackingKnownGaps: 0,
SlotInserts: 0,
ReorgInserts: 0,
KnownGapsInserts: 0,
HeadError: 0,
HeadReorgError: 0,
},

View File

@ -44,30 +44,10 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
)
type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
MimicConfig *MimicConfig // A configuration of parameters that you are trying to
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
}
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
// This is used because creating your own SSZ object is a headache.
type MimicConfig struct {
ParentRoot string // The parent root, leave it empty if you want a to use the universal
ForkVersion string // Specify the fork version. This is needed as a workaround to create dummy SignedBeaconBlocks.
}
var _ = Describe("Capturehead", func() {
var (
TestConfig Config
BeaconNodeTester TestBeaconNode
address string = "localhost"
port int = 8080
protocol string = "http"
TestEvents map[string]Message
dbHost string = "localhost"
dbPort int = 8076
dbName string = "vulcanize_testing"
@ -77,9 +57,7 @@ var _ = Describe("Capturehead", func() {
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
knownGapsTableIncrement int = 100000
maxRetry int = 60
)
BeforeEach(func() {
TestEvents = map[string]Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
@ -230,7 +208,24 @@ var _ = Describe("Capturehead", func() {
TestEvents: TestEvents,
TestConfig: TestConfig,
}
})
)
type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
MimicConfig *MimicConfig // A configuration of parameters that you are trying to
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
}
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
// This is used because creating your own SSZ object is a headache.
type MimicConfig struct {
ParentRoot string // The parent root, leave it empty if you want a to use the universal
ForkVersion string // Specify the fork version. This is needed as a workaround to create dummy SignedBeaconBlocks.
}
var _ = Describe("Capturehead", func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
@ -254,7 +249,7 @@ var _ = Describe("Capturehead", func() {
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "/blocks/QHVAEQRQPBRDMMRRGVRDKNRQGI3TGYLGGYZWKYZXMUYDCMJVG4ZGENRQMVRTCY3BGBRDAMRTGJTDQZTGGQ2GMY3EGRSWINJVMM3TKMRWMU4TMNDF")
})
})
Context("Correctly formatted Altair Test Blocks", Label("now"), func() {
Context("Correctly formatted Altair Test Blocks", func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@ -491,7 +486,7 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
@ -499,13 +494,13 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
Retry: []byte{},
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts {
for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
log.WithFields(log.Fields{
"startInsert": startInserts,
"currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts),
"currentValue": atomic.LoadUint64(&bc.Metrics.SlotInserts),
}).Error("HeadTracking Insert wasn't incremented properly.")
Fail("Too many retries have occurred.")
}
@ -517,7 +512,9 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot int
var blockRoot, stateRoot, status string
log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot)
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
log.Debug("Querying the ethcl.slots table complete")
err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
Expect(err).ToNot(HaveOccurred())
return epoch, slot, blockRoot, stateRoot, status
@ -567,7 +564,7 @@ func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) (
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"}
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;", "DELETE FROM ethcl.historic_process"}
for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred())
@ -670,6 +667,31 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro
return httpmock.NewBytesResponse(200, dat), nil
},
)
blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root"
httpmock.RegisterResponder("GET", blockRootUrl,
func(req *http.Request) (*http.Response, error) {
// Get ID from request
slot := httpmock.MustGetSubmatch(req, 1)
dat, err := tbc.provideBlockRoot(slot)
if err != nil {
Expect(err).NotTo(HaveOccurred())
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find block root for %s", slot)), err
}
return httpmock.NewBytesResponse(200, dat), nil
},
)
}
func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) {
for _, val := range tbc.TestEvents {
if val.HeadMessage.Slot == slot && val.MimicConfig == nil {
block, err := hex.DecodeString(val.HeadMessage.Block[2:])
Expect(err).ToNot(HaveOccurred())
return block, nil
}
}
return nil, fmt.Errorf("Unable to find the Blockroot in test object.")
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
@ -779,7 +801,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
sendHeadMessage(bc, thirdHead, maxRetry, 1)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@ -810,7 +832,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 {
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 3 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@ -818,7 +840,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
}
}
if bc.Metrics.HeadTrackingKnownGaps != 0 {
if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block")
}
@ -837,20 +859,20 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps {
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
}
}
curRetry = 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs {
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != expectedReorgs {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps))
Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps))
}
}
@ -869,7 +891,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
sendHeadMessage(bc, secondHead, maxRetry, 1)
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@ -877,7 +899,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
}
}
if bc.Metrics.HeadTrackingKnownGaps != 0 {
if bc.Metrics.KnownGapsInserts != 0 {
Fail("We found gaps when processing a single block")
}
@ -898,7 +920,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
}
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries {
for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
@ -910,7 +932,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t
knownGapCount := countKnownGapsTable(bc.Db)
Expect(knownGapCount).To(Equal(int(expectedEntries)))
if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 {
if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 {
Fail("We found reorgs when we didn't expect it")
}
}

View File

@ -17,12 +17,104 @@
package beaconclient
import log "github.com/sirupsen/logrus"
import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"golang.org/x/sync/errgroup"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHistoric() {
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
log.Info("We are starting the historical processing service.")
go bc.handleHead()
go bc.handleReorg()
bc.captureEventTopic()
hp := historicProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting Historical")
return errs
}
// An interface to enforce any batch processing. Currently there are two use cases for this.
//
// 1. Historic Processing
//
// 2. Known Gaps Processing
type BatchProcessing interface {
getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors(<-chan batchHistoricError)
removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database.
}
// A struct to pass around indicating a table entry for slots to process.
type slotsToProcess struct {
startSlot int // The start slot
endSlot int // The end slot
}
type batchHistoricError struct {
err error // The error that occurred when attempting to a slot
errProcess string // The process that caused the error.
slot int // The slot which the error is for.
}
// Wrapper function for the BatchProcessing interface.
// This function will take the structure that needs batch processing.
// It follows a generic format.
// Get new entries from any given table.
// 1. Add it to the slotsCh.
//
// 2. Run the maximum specified workers to handle individual slots. We need a maximum because we don't want
// To store too many SSZ objects in memory.
//
// 3. Process the slots and send the err to the ErrCh. Each structure can define how it wants its own errors handled.
//
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
slotsCh := make(chan slotsToProcess)
workCh := make(chan int)
processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError)
finishCh := make(chan []error, 1)
// Start workers
for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting historic processing workers")
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics)
}
// Process all ranges and send each individual slot to the worker.
go func() {
for slots := range slotsCh {
for i := slots.startSlot; i <= slots.endSlot; i++ {
workCh <- i
}
processedCh <- slots
}
}()
// Remove entries, end the application if a row cannot be removed..
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
return bp.removeTableEntry(processedCh)
})
if err := errG.Wait(); err != nil {
finishCh <- []error{err}
}
}()
// Process errors from slot processing.
go bp.handleProcessingErrors(errCh)
// Get slots from the DB.
go func() {
errs := bp.getSlotRange(slotsCh) // Periodically adds new entries....
if errs != nil {
finishCh <- errs
}
finishCh <- nil
}()
errs := <-finishCh
log.Debug("Finishing the batchProcess")
return errs
}

View File

@ -0,0 +1,70 @@
package beaconclient_test
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
)
var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() {
It("Successfully Process the Block", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
log.SetLevel(log.DebugLevel)
BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0)
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
})
})
})
})
// This function will write an even to the ethcl.historic_process table
func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) {
log.Debug("We are writing the necessary events to batch process")
insertHistoricProcessingStmt := `INSERT INTO ethcl.historic_process (start_slot, end_slot, priority)
VALUES ($1, $2, $3);`
res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority)
Expect(err)
rows, err := res.RowsAffected()
if rows != 1 {
Fail("We didnt write...")
}
Expect(err)
}
// Start the batch processing function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runBatchProcess(bc *beaconclient.BeaconClient, maxWorkers int, startSlot uint64, endSlot uint64, expectedReorgs uint64, expectedKnownGaps uint64) {
go bc.CaptureHistoric(maxWorkers)
diff := endSlot - startSlot + 1
curRetry := 0
for atomic.LoadUint64(&bc.Metrics.SlotInserts) != diff {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expects %d, the number that actually occurred, %d", atomic.LoadUint64(&bc.Metrics.SlotInserts), diff))
}
}
Expect(atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)).To(Equal(expectedKnownGaps))
Expect(atomic.LoadUint64(&bc.Metrics.ReorgInserts)).To(Equal(expectedKnownGaps))
}

View File

@ -148,8 +148,10 @@ func (dw *DatabaseWriter) writeFullSlot() error {
}).Debug("Starting to write to the DB.")
err := dw.writeSlots()
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Debug("We couldnt write to the ethcl.slots table...")
return err
}
log.Debug("We finished writing to the ethcl.slots table.")
if dw.DbSlots.Status != "skipped" {
err = dw.writeSignedBeaconBlocks()
if err != nil {

View File

@ -21,9 +21,9 @@ import (
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB.
HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB.
HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB.
SlotInserts uint64 // Number of head events we successfully wrote to the DB.
ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB.
KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB.
HeadError uint64 // Number of errors that occurred when decoding the head message.
HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message.
}
@ -31,19 +31,19 @@ type BeaconClientMetrics struct {
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingInserts, inc)
atomic.AddUint64(&m.SlotInserts, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc)
atomic.AddUint64(&m.ReorgInserts, inc)
}
// Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) {
atomic.AddUint64(&m.HeadTrackingKnownGaps, inc)
atomic.AddUint64(&m.KnownGapsInserts, inc)
}
// Wrapper function to increment head errors. If we want to use mutexes later we can easily update all

View File

@ -23,9 +23,6 @@ import (
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
// This function will perform the necessary steps to handle a reorg.
@ -63,19 +60,7 @@ func (bc *BeaconClient) handleHead() {
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
go func(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
errG := new(errgroup.Group)
errG.Go(func() error {
err = processHeadSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, metrics, knownGapsTableIncrement)
if err != nil {
return err
}
return nil
})
if err := errG.Wait(); err != nil {
loghelper.LogSlotError(strconv.Itoa(slot), err).Error("Unable to process a slot")
}
}(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")

View File

@ -13,6 +13,151 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file will call all the functions to start and stop capturing the head of the beacon chain.
// This file contains all the code to process historic slots.
package beaconclient
import (
"context"
"fmt"
"strconv"
"time"
"github.com/jackc/pgx/v4"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Get a single highest priority and non-checked out row.
getBpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process
WHERE checked_out=false
ORDER BY priority ASC
LIMIT 1;`
lockBpEntryStmt string = `UPDATE ethcl.historic_process
SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;`
deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process
WHERE start_slot=$1 AND end_slot=$2;`
)
type historicProcessing struct {
db sql.Database
metrics *BeaconClientMetrics
}
// Get a single row of historical slots from the table.
func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(hp.db, getBpEntryStmt, lockBpEntryStmt, slotCh)
}
// Remove the table entry.
func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(hp.db, processCh, deleteSlotsEntryStmt)
}
// Remove the table entry.
func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for {
errMs := <-errMessages
writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
}
}
// Process the slot range.
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
for slot := range workCh {
log.Debug("Handling slot: ", slot)
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics)
errMs := batchHistoricError{
err: err,
errProcess: errProcess,
slot: slot,
}
if err != nil {
errCh <- errMs
}
}
}
// A wrapper function that insert the start_slot and end_slot from a single row into a channel.
// It also locks the row by updating the checked_out column.
// The statement for getting the start_slot and end_slot must be provided.
// The statement for "locking" the row must also be provided.
func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
errCount := make([]error, 0)
for len(errCount) < 5 {
ctx := context.Background()
// Setup TX
tx, err := db.Begin(ctx)
if err != nil {
errCount = append(errCount, err)
continue
}
defer tx.Rollback(ctx)
// Query the DB for slots.
sp := slotsToProcess{}
err = tx.QueryRow(ctx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot)
if err != nil {
if err == pgx.ErrNoRows {
time.Sleep(100 * time.Millisecond)
continue
}
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row")
errCount = append(errCount, err)
continue
}
// Checkout the Row
res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot)
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row")
errCount = append(errCount, err)
continue
}
rows, err := res.RowsAffected()
if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row."))
errCount = append(errCount, err)
continue
}
if rows > 1 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We locked too many rows.....")
errCount = append(errCount, err)
continue
}
if rows != 1 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We did not lock a single row.")
errCount = append(errCount, err)
continue
}
err = tx.Commit(ctx)
if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable commit transactions.")
errCount = append(errCount, err)
continue
}
slotCh <- sp
}
return errCount
}
// After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, removeStmt string) error {
for {
slots := <-processCh
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), slots.endSlot)
if err != nil {
return err
}
}
}

View File

@ -78,7 +78,9 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
// It will return the error and error process. The error process is used for providing reach detail to the
// known_gaps table.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) {
ps := &ProcessSlot{
Slot: slot,
BlockRoot: blockRoot,
@ -110,8 +112,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
})
if err := g.Wait(); err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
return err
return err, "processSlot"
}
if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" {
@ -122,37 +123,35 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot))
dw, err := ps.createWriteObjects(blockRootEndpoint)
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot", ps.Metrics)
return err
return err, "blockRoot"
}
// Write the object to the DB.
err = dw.writeFullSlot()
if err != nil {
writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics)
return err
return err, "processSlot"
}
// Handle any reorgs or skipped slots.
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrHistoric must be either historic or head!")
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
}
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
}
return nil
return nil, ""
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
// Commented because of the linter...... LOL
//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error {
// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic")
//}
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) {
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1)
}
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *dt.VersionedUnmarshaler) error {
@ -289,10 +288,13 @@ func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWr
blockRoot = ps.BlockRoot
} else {
var err error
blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
if err != nil {
return nil, err
}
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:")
}
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
}

View File

@ -27,9 +27,26 @@ func LogError(err error) *log.Entry {
})
}
// A simple herlper function to log slot and error.
func LogSlotError(slot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"slot": slot,
})
}
func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,
"endSlot": endSlot,
})
}
func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry {
return log.WithFields(log.Fields{
"err": err,
"startSlot": startSlot,
"endSlot": endSlot,
"SqlStatement": statement,
})
}