diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index cc90129..5042cd6 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -26,7 +26,7 @@ on: env: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}} - ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'main' }} + ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-process' }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} GOPATH: /tmp/go jobs: diff --git a/cmd/head.go b/cmd/head.go index 6d6279c..2f360bf 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -47,18 +47,18 @@ func startHeadTracking() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, + bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) if err != nil { - StopApplicationPreBoot(err, DB) + StopApplicationPreBoot(err, Db) } log.Info("The Beacon Client has booted successfully!") // Capture head blocks - go BC.CaptureHead() + go Bc.CaptureHead() // Shutdown when the time is right. - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/historic.go b/cmd/historic.go index 48cc350..c7fb93f 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" + "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" ) @@ -43,10 +44,24 @@ func startHistoricProcessing() { log.Info("Starting the application in head tracking mode.") ctx := context.Background() - _, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, + Bc, Db, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync) if err != nil { - StopApplicationPreBoot(err, DB) + StopApplicationPreBoot(err, Db) + } + errs := Bc.CaptureHistoric(2) + if errs != nil { + log.WithFields(log.Fields{ + "TotalErrors": errs, + }).Error("The historical processing service ended after receiving too many errors.") + } + + // Shutdown when the time is right. + err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + if err != nil { + loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") + } else { + log.Info("Gracefully shutdown ipld-ethcl-indexer") } } diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index f851221..d277201 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -94,11 +94,11 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), Metrics: &BeaconClientMetrics{ - HeadTrackingInserts: 0, - HeadTrackingReorgs: 0, - HeadTrackingKnownGaps: 0, - HeadError: 0, - HeadReorgError: 0, + SlotInserts: 0, + ReorgInserts: 0, + KnownGapsInserts: 0, + HeadError: 0, + HeadReorgError: 0, }, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), } diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 8a74662..6be631f 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -44,6 +44,172 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres" ) +var ( + address string = "localhost" + port int = 8080 + protocol string = "http" + dbHost string = "localhost" + dbPort int = 8076 + dbName string = "vulcanize_testing" + dbUser string = "vdbm" + dbPassword string = "password" + dbDriver string = "pgx" + dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" + knownGapsTableIncrement int = 100000 + maxRetry int = 60 + + TestEvents = map[string]Message{ + "100-dummy": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b", + State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", + MimicConfig: &MimicConfig{ + ForkVersion: "phase0", + }, + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + }, + "100-dummy-2": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa", + State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", + MimicConfig: &MimicConfig{ + ForkVersion: "phase0", + }, + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + }, + "102-wrong-ssz-1": { + HeadMessage: beaconclient.Head{ + Slot: "102", + Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42", + State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.", + BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"), + SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"), + }, + "100": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b", + State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "An easy to process Phase 0 block", + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + }, + "101": { + HeadMessage: beaconclient.Head{ + Slot: "101", + Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083", + State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "An easy to process Phase 0 block", + SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"), + }, + "2375703-dummy": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e", + State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "This is a dummy message that is used for reorgs", + MimicConfig: &MimicConfig{ + ForkVersion: "altair", + }, + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + "2375703-dummy-2": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa", + State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "This is a dummy message that is used for reorgs", + MimicConfig: &MimicConfig{ + ForkVersion: "altair", + }, + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + "2375703": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301", + State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", + CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, + TestNotes: "An easy to process Altair Block", + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + "3797056": { + HeadMessage: beaconclient.Head{ + Slot: "3797056", + Block: "", + State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", + CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, + TestNotes: "An easy to process Altair Block", + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + } + TestConfig = Config{ + protocol: protocol, + address: address, + port: port, + dummyParentRoot: dummyParentRoot, + dbHost: dbHost, + dbPort: dbPort, + dbName: dbName, + dbUser: dbUser, + dbPassword: dbPassword, + dbDriver: dbDriver, + knownGapsTableIncrement: knownGapsTableIncrement, + } + + BeaconNodeTester = TestBeaconNode{ + TestEvents: TestEvents, + TestConfig: TestConfig, + } +) + type Message struct { HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. @@ -61,177 +227,6 @@ type MimicConfig struct { var _ = Describe("Capturehead", func() { - var ( - TestConfig Config - BeaconNodeTester TestBeaconNode - address string = "localhost" - port int = 8080 - protocol string = "http" - TestEvents map[string]Message - dbHost string = "localhost" - dbPort int = 8076 - dbName string = "vulcanize_testing" - dbUser string = "vdbm" - dbPassword string = "password" - dbDriver string = "pgx" - dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" - knownGapsTableIncrement int = 100000 - maxRetry int = 60 - ) - - BeforeEach(func() { - TestEvents = map[string]Message{ - "100-dummy": { - HeadMessage: beaconclient.Head{ - Slot: "100", - Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b", - State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", - MimicConfig: &MimicConfig{ - ForkVersion: "phase0", - }, - SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), - }, - "100-dummy-2": { - HeadMessage: beaconclient.Head{ - Slot: "100", - Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa", - State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", - MimicConfig: &MimicConfig{ - ForkVersion: "phase0", - }, - SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), - }, - "102-wrong-ssz-1": { - HeadMessage: beaconclient.Head{ - Slot: "102", - Block: "0x46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42", - State: "0x9b20b114c613c1aa462e02d590b3da902b0a1377e938ed0f94dd3491d763ef67", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "A bad block that returns the wrong ssz objects, used for testing incorrect SSZ decoding.", - BeaconState: filepath.Join("ssz-data", "102", "signed-beacon-block.ssz"), - SignedBeaconBlock: filepath.Join("ssz-data", "102", "beacon-state.ssz"), - }, - "100": { - HeadMessage: beaconclient.Head{ - Slot: "100", - Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b", - State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "An easy to process Phase 0 block", - SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), - }, - "101": { - HeadMessage: beaconclient.Head{ - Slot: "101", - Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083", - State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "An easy to process Phase 0 block", - SignedBeaconBlock: filepath.Join("ssz-data", "101", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "101", "beacon-state.ssz"), - }, - "2375703-dummy": { - HeadMessage: beaconclient.Head{ - Slot: "2375703", - Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e", - State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "This is a dummy message that is used for reorgs", - MimicConfig: &MimicConfig{ - ForkVersion: "altair", - }, - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), - }, - "2375703-dummy-2": { - HeadMessage: beaconclient.Head{ - Slot: "2375703", - Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa", - State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, - }, - TestNotes: "This is a dummy message that is used for reorgs", - MimicConfig: &MimicConfig{ - ForkVersion: "altair", - }, - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), - }, - "2375703": { - HeadMessage: beaconclient.Head{ - Slot: "2375703", - Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301", - State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", - CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, - TestNotes: "An easy to process Altair Block", - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), - }, - "3797056": { - HeadMessage: beaconclient.Head{ - Slot: "3797056", - Block: "", - State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", - CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, - TestNotes: "An easy to process Altair Block", - SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), - }, - } - TestConfig = Config{ - protocol: protocol, - address: address, - port: port, - dummyParentRoot: dummyParentRoot, - dbHost: dbHost, - dbPort: dbPort, - dbName: dbName, - dbUser: dbUser, - dbPassword: dbPassword, - dbDriver: dbDriver, - knownGapsTableIncrement: knownGapsTableIncrement, - } - - BeaconNodeTester = TestBeaconNode{ - TestEvents: TestEvents, - TestConfig: TestConfig, - } - }) - Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { Context("Correctly formatted Phase0 Block", func() { It("Should turn it into a struct successfully.", func() { @@ -254,7 +249,7 @@ var _ = Describe("Capturehead", func() { validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "/blocks/QHVAEQRQPBRDMMRRGVRDKNRQGI3TGYLGGYZWKYZXMUYDCMJVG4ZGENRQMVRTCY3BGBRDAMRTGJTDQZTGGQ2GMY3EGRSWINJVMM3TKMRWMU4TMNDF") }) }) - Context("Correctly formatted Altair Test Blocks", Label("now"), func() { + Context("Correctly formatted Altair Test Blocks", func() { It("Should turn it into a struct successfully.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "2375702") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) @@ -491,7 +486,7 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR data, err := json.Marshal(head) Expect(err).ToNot(HaveOccurred()) - startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) + startInserts := atomic.LoadUint64(&bc.Metrics.SlotInserts) bc.HeadTracking.MessagesCh <- &sse.Event{ ID: []byte{}, Data: data, @@ -499,13 +494,13 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR Retry: []byte{}, } curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+expectedSuccessfulInserts { + for atomic.LoadUint64(&bc.Metrics.SlotInserts) != startInserts+expectedSuccessfulInserts { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { log.WithFields(log.Fields{ "startInsert": startInserts, - "currentValue": atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts), + "currentValue": atomic.LoadUint64(&bc.Metrics.SlotInserts), }).Error("HeadTracking Insert wasn't incremented properly.") Fail("Too many retries have occurred.") } @@ -517,7 +512,9 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` var epoch, slot int var blockRoot, stateRoot, status string + log.Debug("Starting to query the ethcl.slots table, ", querySlot, " ", queryBlockRoot) row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot) + log.Debug("Querying the ethcl.slots table complete") err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) Expect(err).ToNot(HaveOccurred()) return epoch, slot, blockRoot, stateRoot, status @@ -567,7 +564,7 @@ func queryKnownGaps(db sql.Database, queryStartGap string, QueryEndGap string) ( // A function that will remove all entries from the ethcl tables for you. func clearEthclDbTables(db sql.Database) { - deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;"} + deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.known_gaps;", "DELETE FROM ethcl.historic_process"} for _, queries := range deleteQueries { _, err := db.Exec(context.Background(), queries) Expect(err).ToNot(HaveOccurred()) @@ -670,6 +667,31 @@ func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, pro return httpmock.NewBytesResponse(200, dat), nil }, ) + blockRootUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + "/eth/v1/beacon/blocks/" + `([^/]+)` + "/root" + httpmock.RegisterResponder("GET", blockRootUrl, + func(req *http.Request) (*http.Response, error) { + // Get ID from request + slot := httpmock.MustGetSubmatch(req, 1) + dat, err := tbc.provideBlockRoot(slot) + if err != nil { + Expect(err).NotTo(HaveOccurred()) + return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find block root for %s", slot)), err + } + return httpmock.NewBytesResponse(200, dat), nil + }, + ) +} + +func (tbc TestBeaconNode) provideBlockRoot(slot string) ([]byte, error) { + + for _, val := range tbc.TestEvents { + if val.HeadMessage.Slot == slot && val.MimicConfig == nil { + block, err := hex.DecodeString(val.HeadMessage.Block[2:]) + Expect(err).ToNot(HaveOccurred()) + return block, nil + } + } + return nil, fmt.Errorf("Unable to find the Blockroot in test object.") } // A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it. @@ -779,7 +801,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs sendHeadMessage(bc, thirdHead, maxRetry, 1) curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 { + for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 2 { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { @@ -810,7 +832,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs } curRetry = 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 { + for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 3 { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { @@ -818,7 +840,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs } } - if bc.Metrics.HeadTrackingKnownGaps != 0 { + if bc.Metrics.KnownGapsInserts != 0 { Fail("We found gaps when processing a single block") } @@ -837,20 +859,20 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedKnownGaps { + for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedKnownGaps { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { - Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + Fail(fmt.Sprintf("Wrong gap metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps)) } } curRetry = 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != expectedReorgs { + for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != expectedReorgs { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { - Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.HeadTrackingKnownGaps, expectedKnownGaps)) + Fail(fmt.Sprintf("Wrong reorg metrics, got: %d, wanted %d", bc.Metrics.KnownGapsInserts, expectedKnownGaps)) } } @@ -869,7 +891,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH sendHeadMessage(bc, secondHead, maxRetry, 1) curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 { + for atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 1 { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { @@ -877,7 +899,7 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH } } - if bc.Metrics.HeadTrackingKnownGaps != 0 { + if bc.Metrics.KnownGapsInserts != 0 { Fail("We found gaps when processing a single block") } @@ -898,7 +920,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t } curRetry := 0 - for atomic.LoadUint64(&bc.Metrics.HeadTrackingKnownGaps) != expectedEntries { + for atomic.LoadUint64(&bc.Metrics.KnownGapsInserts) != expectedEntries { time.Sleep(1 * time.Second) curRetry = curRetry + 1 if curRetry == maxRetry { @@ -910,7 +932,7 @@ func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, t knownGapCount := countKnownGapsTable(bc.Db) Expect(knownGapCount).To(Equal(int(expectedEntries))) - if atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 0 { + if atomic.LoadUint64(&bc.Metrics.ReorgInserts) != 0 { Fail("We found reorgs when we didn't expect it") } } diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 3d3fe4c..f210952 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -17,12 +17,104 @@ package beaconclient -import log "github.com/sirupsen/logrus" +import ( + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "golang.org/x/sync/errgroup" +) // This function will perform all the heavy lifting for tracking the head of the chain. -func (bc *BeaconClient) CaptureHistoric() { +func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { log.Info("We are starting the historical processing service.") - go bc.handleHead() - go bc.handleReorg() - bc.captureEventTopic() + hp := historicProcessing{db: bc.Db, metrics: bc.Metrics} + errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + log.Debug("Exiting Historical") + return errs +} + +// An interface to enforce any batch processing. Currently there are two use cases for this. +// +// 1. Historic Processing +// +// 2. Known Gaps Processing +type BatchProcessing interface { + getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. + handleProcessingErrors(<-chan batchHistoricError) + removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. +} + +// A struct to pass around indicating a table entry for slots to process. +type slotsToProcess struct { + startSlot int // The start slot + endSlot int // The end slot +} + +type batchHistoricError struct { + err error // The error that occurred when attempting to a slot + errProcess string // The process that caused the error. + slot int // The slot which the error is for. +} + +// Wrapper function for the BatchProcessing interface. +// This function will take the structure that needs batch processing. +// It follows a generic format. +// Get new entries from any given table. +// 1. Add it to the slotsCh. +// +// 2. Run the maximum specified workers to handle individual slots. We need a maximum because we don't want +// To store too many SSZ objects in memory. +// +// 3. Process the slots and send the err to the ErrCh. Each structure can define how it wants its own errors handled. +// +// 4. Remove the slot entry from the DB. +// +// 5. Handle any errors. +func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { + slotsCh := make(chan slotsToProcess) + workCh := make(chan int) + processedCh := make(chan slotsToProcess) + errCh := make(chan batchHistoricError) + finishCh := make(chan []error, 1) + + // Start workers + for w := 1; w <= maxWorkers; w++ { + log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting historic processing workers") + go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics) + } + + // Process all ranges and send each individual slot to the worker. + go func() { + for slots := range slotsCh { + for i := slots.startSlot; i <= slots.endSlot; i++ { + workCh <- i + } + processedCh <- slots + } + }() + + // Remove entries, end the application if a row cannot be removed.. + go func() { + errG := new(errgroup.Group) + errG.Go(func() error { + return bp.removeTableEntry(processedCh) + }) + if err := errG.Wait(); err != nil { + finishCh <- []error{err} + } + }() + // Process errors from slot processing. + go bp.handleProcessingErrors(errCh) + + // Get slots from the DB. + go func() { + errs := bp.getSlotRange(slotsCh) // Periodically adds new entries.... + if errs != nil { + finishCh <- errs + } + finishCh <- nil + }() + + errs := <-finishCh + log.Debug("Finishing the batchProcess") + return errs } diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go new file mode 100644 index 0000000..382a182 --- /dev/null +++ b/pkg/beaconclient/capturehistoric_test.go @@ -0,0 +1,70 @@ +package beaconclient_test + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/jarcoal/httpmock" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" +) + +var _ = Describe("Capturehistoric", func() { + + Describe("Run the application in historic mode", Label("unit", "behavioral"), func() { + Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() { + It("Successfully Process the Block", func() { + bc := setUpTest(BeaconNodeTester.TestConfig, "99") + BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10) + log.SetLevel(log.DebugLevel) + BeaconNodeTester.runBatchProcess(bc, 2, 100, 101, 0, 0) + + validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") + //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") + //validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") + + //validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE") + //validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW") + + }) + }) + }) +}) + +// This function will write an even to the ethcl.historic_process table +func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) { + log.Debug("We are writing the necessary events to batch process") + insertHistoricProcessingStmt := `INSERT INTO ethcl.historic_process (start_slot, end_slot, priority) + VALUES ($1, $2, $3);` + res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority) + Expect(err) + rows, err := res.RowsAffected() + if rows != 1 { + Fail("We didnt write...") + } + Expect(err) +} + +// Start the batch processing function, and check for the correct inserted slots. +func (tbc TestBeaconNode) runBatchProcess(bc *beaconclient.BeaconClient, maxWorkers int, startSlot uint64, endSlot uint64, expectedReorgs uint64, expectedKnownGaps uint64) { + go bc.CaptureHistoric(maxWorkers) + diff := endSlot - startSlot + 1 + + curRetry := 0 + for atomic.LoadUint64(&bc.Metrics.SlotInserts) != diff { + time.Sleep(1 * time.Second) + curRetry = curRetry + 1 + if curRetry == maxRetry { + Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expects %d, the number that actually occurred, %d", atomic.LoadUint64(&bc.Metrics.SlotInserts), diff)) + } + } + + Expect(atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)).To(Equal(expectedKnownGaps)) + Expect(atomic.LoadUint64(&bc.Metrics.ReorgInserts)).To(Equal(expectedKnownGaps)) +} diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 84d160e..06fa951 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -148,8 +148,10 @@ func (dw *DatabaseWriter) writeFullSlot() error { }).Debug("Starting to write to the DB.") err := dw.writeSlots() if err != nil { + loghelper.LogSlotError(dw.DbSlots.Slot, err).Debug("We couldnt write to the ethcl.slots table...") return err } + log.Debug("We finished writing to the ethcl.slots table.") if dw.DbSlots.Status != "skipped" { err = dw.writeSignedBeaconBlocks() if err != nil { diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index ecf8da5..7c0614a 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -21,29 +21,29 @@ import ( // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. type BeaconClientMetrics struct { - HeadTrackingInserts uint64 // Number of head events we successfully wrote to the DB. - HeadTrackingReorgs uint64 // Number of reorg events we successfully wrote to the DB. - HeadTrackingKnownGaps uint64 // Number of known_gaps we successfully wrote to the DB. - HeadError uint64 // Number of errors that occurred when decoding the head message. - HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. + SlotInserts uint64 // Number of head events we successfully wrote to the DB. + ReorgInserts uint64 // Number of reorg events we successfully wrote to the DB. + KnownGapsInserts uint64 // Number of known_gaps we successfully wrote to the DB. + HeadError uint64 // Number of errors that occurred when decoding the head message. + HeadReorgError uint64 // Number of errors that occurred when decoding the reorg message. } // Wrapper function to increment inserts. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) { - atomic.AddUint64(&m.HeadTrackingInserts, inc) + atomic.AddUint64(&m.SlotInserts, inc) } // Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) { - atomic.AddUint64(&m.HeadTrackingReorgs, inc) + atomic.AddUint64(&m.ReorgInserts, inc) } // Wrapper function to increment known gaps. If we want to use mutexes later we can easily update all // occurrences here. func (m *BeaconClientMetrics) IncrementHeadTrackingKnownGaps(inc uint64) { - atomic.AddUint64(&m.HeadTrackingKnownGaps, inc) + atomic.AddUint64(&m.KnownGapsInserts, inc) } // Wrapper function to increment head errors. If we want to use mutexes later we can easily update all diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 15e4e16..0af605e 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -23,9 +23,6 @@ import ( "strconv" log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" - "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" - "golang.org/x/sync/errgroup" ) // This function will perform the necessary steps to handle a reorg. @@ -63,19 +60,7 @@ func (bc *BeaconClient) handleHead() { log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.") - go func(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { - errG := new(errgroup.Group) - errG.Go(func() error { - err = processHeadSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, metrics, knownGapsTableIncrement) - if err != nil { - return err - } - return nil - }) - if err := errG.Wait(); err != nil { - loghelper.LogSlotError(strconv.Itoa(slot), err).Error("Unable to process a slot") - } - }(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) + go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement) log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.") diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index f58688e..77e4010 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -13,6 +13,151 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// This file will call all the functions to start and stop capturing the head of the beacon chain. + +// This file contains all the code to process historic slots. package beaconclient + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/jackc/pgx/v4" + log "github.com/sirupsen/logrus" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" +) + +var ( + // Get a single highest priority and non-checked out row. + getBpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process + WHERE checked_out=false + ORDER BY priority ASC + LIMIT 1;` + lockBpEntryStmt string = `UPDATE ethcl.historic_process + SET checked_out=true + WHERE start_slot=$1 AND end_slot=$2;` + deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process + WHERE start_slot=$1 AND end_slot=$2;` +) + +type historicProcessing struct { + db sql.Database + metrics *BeaconClientMetrics +} + +// Get a single row of historical slots from the table. +func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { + return getBatchProcessRow(hp.db, getBpEntryStmt, lockBpEntryStmt, slotCh) +} + +// Remove the table entry. +func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { + return removeRowPostProcess(hp.db, processCh, deleteSlotsEntryStmt) +} + +// Remove the table entry. +func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { + for { + errMs := <-errMessages + writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) + } +} + +// Process the slot range. +func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { + for slot := range workCh { + log.Debug("Handling slot: ", slot) + err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics) + errMs := batchHistoricError{ + err: err, + errProcess: errProcess, + slot: slot, + } + if err != nil { + errCh <- errMs + } + } +} + +// A wrapper function that insert the start_slot and end_slot from a single row into a channel. +// It also locks the row by updating the checked_out column. +// The statement for getting the start_slot and end_slot must be provided. +// The statement for "locking" the row must also be provided. +func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { + errCount := make([]error, 0) + + for len(errCount) < 5 { + ctx := context.Background() + + // Setup TX + tx, err := db.Begin(ctx) + if err != nil { + errCount = append(errCount, err) + continue + } + defer tx.Rollback(ctx) + + // Query the DB for slots. + sp := slotsToProcess{} + err = tx.QueryRow(ctx, getStartEndSlotStmt).Scan(&sp.startSlot, &sp.endSlot) + if err != nil { + if err == pgx.ErrNoRows { + time.Sleep(100 * time.Millisecond) + continue + } + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") + errCount = append(errCount, err) + continue + } + + // Checkout the Row + res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot) + if err != nil { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") + errCount = append(errCount, err) + continue + } + rows, err := res.RowsAffected() + if err != nil { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) + errCount = append(errCount, err) + continue + } + if rows > 1 { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ + "rowsReturn": rows, + }).Error("We locked too many rows.....") + errCount = append(errCount, err) + continue + } + if rows != 1 { + loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ + "rowsReturn": rows, + }).Error("We did not lock a single row.") + errCount = append(errCount, err) + continue + } + err = tx.Commit(ctx) + if err != nil { + loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable commit transactions.") + errCount = append(errCount, err) + continue + } + slotCh <- sp + } + return errCount +} + +// After a row has been processed it should be removed from its appropriate table. +func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, removeStmt string) error { + for { + slots := <-processCh + _, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), slots.endSlot) + if err != nil { + return err + } + } +} diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 4e60109..3e76e51 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -78,7 +78,9 @@ type ProcessSlot struct { } // This function will do all the work to process the slot and write it to the DB. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { +// It will return the error and error process. The error process is used for providing reach detail to the +// known_gaps table. +func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) { ps := &ProcessSlot{ Slot: slot, BlockRoot: blockRoot, @@ -110,8 +112,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot }) if err := g.Wait(); err != nil { - writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics) - return err + return err, "processSlot" } if ps.HeadOrHistoric == "head" && previousSlot == 0 && previousBlockRoot == "" { @@ -122,37 +123,35 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot blockRootEndpoint := serverAddress + BcBlockRootEndpoint(strconv.Itoa(ps.Slot)) dw, err := ps.createWriteObjects(blockRootEndpoint) if err != nil { - writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "blockRoot", ps.Metrics) - return err + return err, "blockRoot" } // Write the object to the DB. err = dw.writeFullSlot() if err != nil { - writeKnownGaps(ps.Db, 1, ps.Slot, ps.Slot, err, "processSlot", ps.Metrics) - return err + return err, "processSlot" } // Handle any reorgs or skipped slots. headOrHistoric = strings.ToLower(headOrHistoric) if headOrHistoric != "head" && headOrHistoric != "historic" { - return fmt.Errorf("headOrHistoric must be either historic or head!") + return fmt.Errorf("headOrHistoric must be either historic or head!"), "" } if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" { ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement) } - return nil + return nil, "" } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) error { - return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) +func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) { + err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement) + writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics) } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. -// Commented because of the linter...... LOL -//func handleHistoricSlot(db sql.Database, serverAddress string, slot int) error { -// return handleFullSlot(db, serverAddress, slot, "", "", 0, "", "historic") -//} +func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) { + return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1) +} // Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values. func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string, vmCh <-chan *dt.VersionedUnmarshaler) error { @@ -289,10 +288,13 @@ func (ps *ProcessSlot) createWriteObjects(blockRootEndpoint string) (*DatabaseWr blockRoot = ps.BlockRoot } else { var err error - blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) + rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot() + //blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot)) if err != nil { return nil, err } + blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:]) + log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:") } eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash) } diff --git a/pkg/loghelper/logerror.go b/pkg/loghelper/logerror.go index 94a5069..f6dce63 100644 --- a/pkg/loghelper/logerror.go +++ b/pkg/loghelper/logerror.go @@ -27,9 +27,26 @@ func LogError(err error) *log.Entry { }) } +// A simple herlper function to log slot and error. func LogSlotError(slot string, err error) *log.Entry { return log.WithFields(log.Fields{ "err": err, "slot": slot, }) } + +func LogSlotRangeError(startSlot string, endSlot string, err error) *log.Entry { + return log.WithFields(log.Fields{ + "err": err, + "startSlot": startSlot, + "endSlot": endSlot, + }) +} +func LogSlotRangeStatementError(startSlot string, endSlot string, statement string, err error) *log.Entry { + return log.WithFields(log.Fields{ + "err": err, + "startSlot": startSlot, + "endSlot": endSlot, + "SqlStatement": statement, + }) +}