Thomas E Lackey
27fa54c6dc
1. Updates or replaces outdated dependencies (eg, replacing a version of the Prysm client with the latest zrnt). 2. Add support for parsing Bellatrix-era BeaconState and BeaconBlocks 3. Adds flags for toggling the processing of BeaconBlocks and BeaconState. This is particularly important because processing and storing the BeaconState at this time would be too expensive to really do (see: Temporarily disable BeaconState indexing #75 and [Feature] Reduce the Amount of DB Space the Beacon Chain Needs #71) 4. Fixes flaky event handling. The previous code would not reconnect in the case of errors with the SSE connection. This enables automatic reconnection in the case of error (default in the updated v2 SSE library dependency), and also adds a timeout so that if no event is received in 2.5x the block time, the SSE connection is closed and re-established. 5. Other refactoring and cleanup (eg, changing the type of slot from int to Slot (uint64)).
307 lines
16 KiB
Go
307 lines
16 KiB
Go
package beaconclient_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/jarcoal/httpmock"
|
|
. "github.com/onsi/ginkgo/v2"
|
|
. "github.com/onsi/gomega"
|
|
log "github.com/sirupsen/logrus"
|
|
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/beaconclient"
|
|
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
|
|
)
|
|
|
|
var (
|
|
kgCheckCheckedOutStmt = `SELECT * FROM eth_beacon.known_gaps WHERE checked_out=true `
|
|
hpCheckCheckedOutStmt = `SELECT * FROM eth_beacon.historic_process WHERE checked_out=true `
|
|
)
|
|
|
|
var _ = Describe("Capturehistoric", func() {
|
|
|
|
Describe("Run the application in historic mode", Label("unit", "behavioral", "historical"), func() {
|
|
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.historic_process table.", Label("deb"), func() {
|
|
It("Successfully Process the Blocks", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 101, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
|
|
// Run Two seperate processes
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 2375703, 2375703, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 3, 0, 0, 0)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
validatePopularBatchBlocks(bc)
|
|
})
|
|
})
|
|
Context("When the start block is greater than the endBlock", func() {
|
|
It("Should Add two entries to the knownGaps table", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 101, 100, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 2, 0)
|
|
})
|
|
})
|
|
Context("Processing the Genesis block", Label("genesis"), func() {
|
|
It("Should Process properly", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "100")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 0, 0, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
|
|
validateSlot(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, 0, "proposed")
|
|
if bc.PerformBeaconBlockProcessing {
|
|
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectParentRoot, BeaconNodeTester.TestEvents["0"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["0"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["0"].CorrectExecutionPayloadHeader)
|
|
}
|
|
if bc.PerformBeaconStateProcessing {
|
|
validateBeaconState(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectBeaconStateMhKey)
|
|
}
|
|
})
|
|
})
|
|
Context("When there is a skipped slot", func() {
|
|
It("Should process the slot properly.", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "3797055")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 3797056, 3797056, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
|
|
validateSlot(bc, BeaconNodeTester.TestEvents["3797056"].HeadMessage, 118658, "skipped")
|
|
})
|
|
})
|
|
})
|
|
Describe("Running the Application to process Known Gaps", Label("unit", "behavioral", "knownGaps"), func() {
|
|
Context("Phase0 + Altairs: When we need to process a multiple blocks in a multiple entries in the eth_beacon.known_gaps table.", func() {
|
|
It("Successfully Process the Blocks", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 100, 101)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 0, 0)
|
|
// Run Two seperate processes
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 2375703, 2375703)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
validatePopularBatchBlocks(bc)
|
|
})
|
|
})
|
|
Context("When the start block is greater than the endBlock", func() {
|
|
It("Should Add two entries to the knownGaps table", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "104")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 100)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 2, 0)
|
|
})
|
|
})
|
|
Context("When theres a reprocessing error", Label("reprocessingError", "flaky"), func() {
|
|
It("Should update the reprocessing error.", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
// We dont have an entry in the BeaconNodeTester for this slot
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 105, 105, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 0, 0, 1, 0)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 0, 0, 1, 1)
|
|
})
|
|
})
|
|
})
|
|
Describe("Running the application in Historic, Head, and KnownGaps mode", Label("unit", "historical", "full"), func() {
|
|
Context("When it recieves a head, historic and known Gaps message (in order)", func() {
|
|
It("Should process them all successfully.", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
// Head
|
|
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
|
|
|
// Historical
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
|
|
|
|
// Known Gaps
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
validatePopularBatchBlocks(bc)
|
|
})
|
|
})
|
|
Context("When it recieves a historic, head and known Gaps message (in order)", func() {
|
|
It("Should process them all successfully.", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
// Historical
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
|
|
|
|
// Head
|
|
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
|
|
|
// Known Gaps
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 3, 0, 0, 0)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
validatePopularBatchBlocks(bc)
|
|
})
|
|
})
|
|
Context("When it recieves a known Gaps, historic and head message (in order)", func() {
|
|
It("Should process them all successfully.", func() {
|
|
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
|
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
|
defer httpmock.DeactivateAndReset()
|
|
// Known Gaps
|
|
BeaconNodeTester.writeEventToKnownGaps(bc, 101, 101)
|
|
BeaconNodeTester.runKnownGapsProcess(bc, 2, 1, 0, 0, 0)
|
|
|
|
// Historical
|
|
BeaconNodeTester.writeEventToHistoricProcess(bc, 100, 100, 10)
|
|
BeaconNodeTester.runHistoricalProcess(bc, 2, 2, 0, 0, 0)
|
|
|
|
// Head
|
|
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
|
|
|
|
time.Sleep(2 * time.Second)
|
|
validatePopularBatchBlocks(bc)
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
// This function will write an even to the eth_beacon.known_gaps table
|
|
func (tbc TestBeaconNode) writeEventToKnownGaps(bc *beaconclient.BeaconClient, startSlot, endSlot int) {
|
|
log.Debug("We are writing the necessary events to batch process")
|
|
insertKnownGapsStmt := `INSERT INTO eth_beacon.known_gaps (start_slot, end_slot)
|
|
VALUES ($1, $2);`
|
|
res, err := bc.Db.Exec(context.Background(), insertKnownGapsStmt, startSlot, endSlot)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
rows, err := res.RowsAffected()
|
|
if rows != 1 {
|
|
Fail("We didnt write...")
|
|
}
|
|
Expect(err).ToNot(HaveOccurred())
|
|
}
|
|
|
|
// This function will write an even to the eth_beacon.known_gaps table
|
|
func (tbc TestBeaconNode) writeEventToHistoricProcess(bc *beaconclient.BeaconClient, startSlot, endSlot, priority int) {
|
|
log.Debug("We are writing the necessary events to batch process")
|
|
insertHistoricProcessingStmt := `INSERT INTO eth_beacon.historic_process (start_slot, end_slot, priority)
|
|
VALUES ($1, $2, $3);`
|
|
res, err := bc.Db.Exec(context.Background(), insertHistoricProcessingStmt, startSlot, endSlot, priority)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
rows, err := res.RowsAffected()
|
|
if rows != 1 {
|
|
Fail("We didnt write...")
|
|
}
|
|
Expect(err).ToNot(HaveOccurred())
|
|
}
|
|
|
|
// Start the CaptureHistoric function, and check for the correct inserted slots.
|
|
func (tbc TestBeaconNode) runHistoricalProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
go bc.CaptureHistoric(ctx, maxWorkers, 0)
|
|
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
|
|
log.Debug("Calling the stop function for historical processing..")
|
|
err := bc.StopHistoric(cancel)
|
|
time.Sleep(5 * time.Second)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
validateAllRowsCheckedOut(bc.Db, hpCheckCheckedOutStmt)
|
|
}
|
|
|
|
// Wrapper function that processes knownGaps
|
|
func (tbc TestBeaconNode) runKnownGapsProcess(bc *beaconclient.BeaconClient, maxWorkers int, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
go bc.ProcessKnownGaps(ctx, maxWorkers, 0)
|
|
validateMetrics(bc, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError)
|
|
err := bc.StopKnownGapsProcessing(cancel)
|
|
time.Sleep(5 * time.Second)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
validateAllRowsCheckedOut(bc.Db, kgCheckCheckedOutStmt)
|
|
}
|
|
|
|
func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReorgs, expectedKnownGaps, expectedKnownGapsReprocessError uint64) {
|
|
curRetry := 0
|
|
value := atomic.LoadUint64(&bc.Metrics.SlotInserts)
|
|
for value != expectedInserts {
|
|
time.Sleep(1 * time.Second)
|
|
curRetry = curRetry + 1
|
|
if curRetry == maxRetry {
|
|
Fail(fmt.Sprintf("Too many retries have occurred. The number of inserts expected %d, the number that actually occurred, %d", expectedInserts, atomic.LoadUint64(&bc.Metrics.SlotInserts)))
|
|
}
|
|
value = atomic.LoadUint64(&bc.Metrics.SlotInserts)
|
|
}
|
|
curRetry = 0
|
|
value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)
|
|
for value != expectedKnownGaps {
|
|
time.Sleep(1 * time.Second)
|
|
curRetry = curRetry + 1
|
|
if curRetry == maxRetry {
|
|
Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGaps expected %d, the number that actually occurred, %d", expectedKnownGaps, atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)))
|
|
}
|
|
value = atomic.LoadUint64(&bc.Metrics.KnownGapsInserts)
|
|
}
|
|
curRetry = 0
|
|
value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError)
|
|
for value != expectedKnownGapsReprocessError {
|
|
time.Sleep(1 * time.Second)
|
|
curRetry = curRetry + 1
|
|
if curRetry == maxRetry {
|
|
Fail(fmt.Sprintf("Too many retries have occurred. The number of knownGapsReprocessingErrors expected %d, the number that actually occurred, %d", expectedKnownGapsReprocessError, value))
|
|
}
|
|
log.Debug("&bc.Metrics.KnownGapsReprocessError: ", &bc.Metrics.KnownGapsReprocessError)
|
|
value = atomic.LoadUint64(&bc.Metrics.KnownGapsReprocessError)
|
|
}
|
|
curRetry = 0
|
|
value = atomic.LoadUint64(&bc.Metrics.ReorgInserts)
|
|
for value != expectedReorgs {
|
|
time.Sleep(1 * time.Second)
|
|
curRetry = curRetry + 1
|
|
if curRetry == maxRetry {
|
|
Fail(fmt.Sprintf("Too many retries have occurred. The number of Reorgs expected %d, the number that actually occurred, %d", expectedReorgs, atomic.LoadUint64(&bc.Metrics.ReorgInserts)))
|
|
}
|
|
value = atomic.LoadUint64(&bc.Metrics.ReorgInserts)
|
|
}
|
|
}
|
|
|
|
// A wrapper function to validate a few popular blocks
|
|
func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) {
|
|
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
|
|
if bc.PerformBeaconBlockProcessing {
|
|
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["100"].CorrectExecutionPayloadHeader)
|
|
}
|
|
if bc.PerformBeaconStateProcessing {
|
|
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
|
|
}
|
|
|
|
validateSlot(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, "proposed")
|
|
if bc.PerformBeaconBlockProcessing {
|
|
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["100"].HeadMessage.Block, BeaconNodeTester.TestEvents["101"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["101"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["101"].CorrectExecutionPayloadHeader)
|
|
}
|
|
if bc.PerformBeaconStateProcessing {
|
|
validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["101"].CorrectBeaconStateMhKey)
|
|
}
|
|
|
|
validateSlot(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, "proposed")
|
|
if bc.PerformBeaconBlockProcessing {
|
|
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["2375703"].CorrectExecutionPayloadHeader)
|
|
}
|
|
if bc.PerformBeaconStateProcessing {
|
|
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
|
|
}
|
|
}
|
|
|
|
// Make sure all rows have checked_out as false.
|
|
func validateAllRowsCheckedOut(db sql.Database, checkStmt string) {
|
|
res, err := db.Exec(context.Background(), checkStmt)
|
|
Expect(err).ToNot(HaveOccurred())
|
|
rows, err := res.RowsAffected()
|
|
Expect(err).ToNot(HaveOccurred())
|
|
Expect(rows).To(Equal(int64(0)))
|
|
}
|