ipld-eth-beacon-indexer/pkg/beaconclient/capturehistoric_test.go
Thomas E Lackey 27fa54c6dc
76: Add indexing of ExecutionPayloads (and other Merge-related updates). (#73)
1. Updates or replaces outdated dependencies (eg, replacing a version of the Prysm client with the latest zrnt).

2. Add support for parsing Bellatrix-era BeaconState and BeaconBlocks

3. Adds flags for toggling the processing of BeaconBlocks and BeaconState. This is particularly important because processing and storing the BeaconState at this time would be too expensive to really do (see: Temporarily disable BeaconState indexing #75 and [Feature] Reduce the Amount of DB Space the Beacon Chain Needs #71)

4. Fixes flaky event handling. The previous code would not reconnect in the case of errors with the SSE connection. This enables automatic reconnection in the case of error (default in the updated v2 SSE library dependency), and also adds a timeout so that if no event is received in 2.5x the block time, the SSE connection is closed and re-established.

5. Other refactoring and cleanup (eg, changing the type of slot from int to Slot (uint64)).
2022-09-28 20:39:56 -05:00

307 lines
16 KiB
Go

package beaconclient_test
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
)
var (
kgCheckCheckedOutStmt = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=true `
hpCheckCheckedOutStmt = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=true `
)
var _ = Describe("Capturehistoric", func() {
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes
BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When the start block is greater than the endBlock", func() {
It("Should Add two entries to the knownGaps table", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 101, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 2, 0)
})
})
Context("Processing the Genesis block", Label("genesis"), func() {
It("Should Process properly", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "100")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 0, 0, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
validateSlot(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, 0, "proposed")
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectParentRoot, BeaconNodeTester.TestEvents["0"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["0"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["0"].CorrectExecutionPayloadHeader)
}
if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectBeaconStateMhKey)
}
})
})
Context("When there is a skipped slot", func() {
It("Should process the slot properly.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "3797055")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToHistoricProcess(bc, 3797056, 3797056, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
validateSlot(bc, BeaconNodeTester.TestEvents["3797056"].HeadMessage, 118658, "skipped")
})
})
})
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() {
It("Successfully Process the Blocks", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
// Run Two seperate processes
BeaconNodeTester.writeEventToKnownGaps(bc, 2375703, 2375703)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When the start block is greater than the endBlock", func() {
It("Should Add two entries to the knownGaps table", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "104")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 100)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 2, 0)
})
})
Context("When theres a reprocessing error", Label("reprocessingError", "flaky"), func() {
It("Should update the reprocessing error.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
// We dont have an entry in the BeaconNodeTester for this slot
BeaconNodeTester.writeEventToHistoricProcess(bc, 105, 105, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 1, 0)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 0, 0, 1, 1)
})
})
})
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
Context("When it recieves a head, historic and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
// Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When it recieves a historic, head and known Gaps message (in order)", func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
// Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
// Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
Context("When it recieves a known Gaps, historic and head message (in order)", func() {
It("Should process them all successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
// Known Gaps
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
// Historical
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
// Head
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
time.Sleep(2 * time.Second)
validatePopularBatchBlocks(bc)
})
})
})
})
// This function will write an even to the eth_beacon.known_gaps table
func (tbc TestBeaconNode) writeEventToKnownGaps(bc *beaconclient.BeaconClient, startSlot, endSlot int) {
log.Debug("We are writing the necessary events to batch process")
insertKnownGapsStmt := `INSERT INTO eth_beacon.known_gaps (start_slot, end_slot)
VALUES ($1, $2);`
res, err := bc.Db.Exec(context.Background(), insertKnownGapsStmt, startSlot, endSlot)
Expect(err).ToNot(HaveOccurred())
rows, err := res.RowsAffected()
if rows != 1 {
Fail("We didnt write...")
}
Expect(err).ToNot(HaveOccurred())
}
// This function will write an even to the eth_beacon.known_gaps table
func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) {
log.Debug("We are writing the necessary events to batch process")
insertHistoricProcessingStmt := `INSERT INTO eth_beacon.historic_process (start_slot, end_slot, priority)
VALUES ($1, $2, $3);`
res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority)
Expect(err).ToNot(HaveOccurred())
rows, err := res.RowsAffected()
if rows != 1 {
Fail("We didnt write...")
}
Expect(err).ToNot(HaveOccurred())
}
// Start the CaptureHistoric function, and check for the correct inserted slots.
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
ctx, cancel := context.WithCancel(context.Background())
go bc.CaptureHistoric(ctx, maxWorkers, 0)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
log.Debug("Calling the stop function for historical processing..")
err := bc.StopHistoric(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
}
// Wrapper function that processes knownGaps
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
ctx, cancel := context.WithCancel(context.Background())
go bc.ProcessKnownGaps(ctx, maxWorkers, 0)
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
err := bc.StopKnownGapsProcessing(cancel)
time.Sleep(5 * time.Second)
Expect(err).ToNot(HaveOccurred())
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
}
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
curRetry := 0
value := atomic.LoadUint64(&bc.Metrics.SlotInserts)
for value != expectedInserts {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expected %d, the number that actually occurred, %d", expectedInserts, atomic.LoadUint64(&bc.Metrics.SlotInserts)))
}
value = atomic.LoadUint64(&bc.Metrics.SlotInserts)
}
curRetry = 0
value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)
for value != expectedKnownGaps {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGaps expected %d, the number that actually occurred, %d", expectedKnownGaps, atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)))
}
value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)
}
curRetry = 0
value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError)
for value != expectedKnownGapsReprocessError {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGapsReprocessingErrors expected %d, the number that actually occurred, %d", expectedKnownGapsReprocessError, value))
}
log.Debug("&bc.Metrics.KnownGapsReprocessError: ", &bc.Metrics.KnownGapsReprocessError)
value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError)
}
curRetry = 0
value = atomic.LoadUint64(&bc.Metrics.ReorgInserts)
for value != expectedReorgs {
time.Sleep(1 * time.Second)
curRetry = curRetry + 1
if curRetry == maxRetry {
Fail(fmt.Sprintf("Too many retries have occurred. The number of Reorgs expected %d, the number that actually occurred, %d", expectedReorgs, atomic.LoadUint64(&bc.Metrics.ReorgInserts)))
}
value = atomic.LoadUint64(&bc.Metrics.ReorgInserts)
}
}
// A wrapper function to validate a few popular blocks
func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) {
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["100"].CorrectExecutionPayloadHeader)
}
if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
}
validateSlot(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, "proposed")
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["100"].HeadMessage.Block, BeaconNodeTester.TestEvents["101"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["101"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["101"].CorrectExecutionPayloadHeader)
}
if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["101"].CorrectBeaconStateMhKey)
}
validateSlot(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, "proposed")
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["2375703"].CorrectExecutionPayloadHeader)
}
if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
}
}
// Make sure all rows have checked_out as false.
func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
res, err := db.Exec(context.Background(), checkStmt)
Expect(err).ToNot(HaveOccurred())
rows, err := res.RowsAffected()
Expect(err).ToNot(HaveOccurred())
Expect(rows).To(Equal(int64(0)))
}