76: Add indexing of ExecutionPayloads (and other Merge-related updates). #73

Merged
telackey merged 30 commits from telackey/the_merge into main 2022-09-29 01:39:56 +00:00
10 changed files with 163 additions and 117 deletions
Showing only changes of commit a74df084c4 - Show all commits

View File

@ -37,7 +37,7 @@ var (
BcBlockRootEndpoint = func(slot string) string { BcBlockRootEndpoint = func(slot string) string {
return "/eth/v1/beacon/blocks/" + slot + "/root" return "/eth/v1/beacon/blocks/" + slot + "/root"
} }
bcSlotsPerEpoch = 32 // Number of slots in a single Epoch bcSlotsPerEpoch uint64 = 32 // Number of slots in a single Epoch
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector. //bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
) )
@ -58,8 +58,8 @@ type BeaconClient struct {
// Used for Head Tracking // Used for Head Tracking
PerformHeadTracking bool // Should we track head? PerformHeadTracking bool // Should we track head?
StartingSlot int // If we're performing head tracking. What is the first slot we processed. StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed PreviousSlot uint64 // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent. PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
HeadTracking *SseEvents[Head] // Track the head block HeadTracking *SseEvents[Head] // Track the head block
ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs ReOrgTracking *SseEvents[ChainReorg] // Track all Reorgs

View File

@ -276,6 +276,15 @@ var (
CorrectParentRoot: "0x60e751f7d2cf0ae24b195bda37e9add56a7d8c4b75469c018c0f912518c3bae8", CorrectParentRoot: "0x60e751f7d2cf0ae24b195bda37e9add56a7d8c4b75469c018c0f912518c3bae8",
CorrectSignedBeaconBlockMhKey: "/blocks/QLVAEQRQPA4DCMDBGAYDIMBQME4DAY3EMZTGGMJRMZTGIY3GGE3WCYZUGA2GCYZUMRRGCMRRGVRDSNJSGIYTSNJVME4WIZTEMRTDCNRTMQYGEMDE", CorrectSignedBeaconBlockMhKey: "/blocks/QLVAEQRQPA4DCMDBGAYDIMBQME4DAY3EMZTGGMJRMZTGIY3GGE3WCYZUGA2GCYZUMRRGCMRRGVRDSNJSGIYTSNJVME4WIZTEMRTDCNRTMQYGEMDE",
CorrectBeaconStateMhKey: "", CorrectBeaconStateMhKey: "",
CorrectExecutionPayloadHeader: &beaconclient.DbExecutionPayloadHeader{
BlockNumber: 15537394,
Timestamp: 1663224179,
BlockHash: "0x56a9bb0302da44b8c0b3df540781424684c3af04d0b7a38d72842b762076a664",
ParentHash: "0x55b11b918355b1ef9c5db810302ebad0bf2544255b530cdce90674d5887bb286",
StateRoot: "0x40c07091e16263270f3579385090fea02dd5f061ba6750228fcc082ff762fda7",
ReceiptsRoot: "0x928073fb98ce316265ea35d95ab7e2e1206cecd85242eb841dbbcc4f568fca4b",
TransactionsRoot: "0xf9ef008aaf996dccd1c871c7e937f25d66e057e52773fbe2497090c114231acf",
},
}, },
} }
TestConfig = Config{ TestConfig = Config{
@ -310,6 +319,7 @@ type Message struct {
CorrectBeaconStateMhKey string // The correct MhKey beaconState CorrectBeaconStateMhKey string // The correct MhKey beaconState
CorrectParentRoot string // The correct parent root CorrectParentRoot string // The correct parent root
CorrectEth1DataBlockHash string // The correct eth1blockHash CorrectEth1DataBlockHash string // The correct eth1blockHash
CorrectExecutionPayloadHeader *beaconclient.DbExecutionPayloadHeader // The correct ExecutionPayload details.
} }
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly. // A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
@ -329,7 +339,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, maxRetry, 1, 0, 0)
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["100"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
@ -344,7 +354,7 @@ var _ = Describe("Capturehead", Label("head"), func() {
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, maxRetry, 1, 0, 0)
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["2375703"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)
@ -371,12 +381,18 @@ var _ = Describe("Capturehead", Label("head"), func() {
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["4636672"].HeadMessage, 144896, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["4636672"].HeadMessage, 144896, maxRetry, 1, 0, 0)
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["4636672"].HeadMessage, BeaconNodeTester.TestEvents["4636672"].CorrectParentRoot, BeaconNodeTester.TestEvents["4636672"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["4636672"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["4636672"].CorrectExecutionPayloadHeader)
}
}) })
It("Should turn it into a struct successfully (post-Merge).", func() { It("Should turn it into a struct successfully (post-Merge).", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "4700013") bc := setUpTest(BeaconNodeTester.TestConfig, "4700013")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset() defer httpmock.DeactivateAndReset()
BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["4700013"].HeadMessage, 146875, maxRetry, 1, 0, 0) BeaconNodeTester.testProcessBlock(bc, BeaconNodeTester.TestEvents["4700013"].HeadMessage, 146875, maxRetry, 1, 0, 0)
if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["4700013"].HeadMessage, BeaconNodeTester.TestEvents["4700013"].CorrectParentRoot, BeaconNodeTester.TestEvents["4700013"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["4700013"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["4700013"].CorrectExecutionPayloadHeader)
}
}) })
}) })
Context("Correctly formatted Phase0 Test Blocks", func() { Context("Correctly formatted Phase0 Test Blocks", func() {
@ -563,10 +579,10 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
} }
// A helper function to validate the expected output from the eth_beacon.slots table. // A helper function to validate the expected output from the eth_beacon.slots table.
func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch int, correctStatus string) { func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctEpoch uint64, correctStatus string) {
epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block) epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block)
log.Info("validateSlot: ", headMessage) log.Info("validateSlot: ", headMessage)
baseSlot, err := strconv.Atoi(headMessage.Slot) baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot)) Expect(dbSlot).To(Equal(baseSlot))
Expect(epoch).To(Equal(correctEpoch)) Expect(epoch).To(Equal(correctEpoch))
@ -576,17 +592,19 @@ func validateSlot(bc *beaconclient.BeaconClient, headMessage beaconclient.Head,
} }
// A helper function to validate the expected output from the eth_beacon.signed_block table. // A helper function to validate the expected output from the eth_beacon.signed_block table.
func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beaconclient.Head, correctParentRoot string, correctEth1DataBlockHash string, correctMhKey string) { func validateSignedBeaconBlock(bc *beaconclient.BeaconClient, headMessage beaconclient.Head,
dbSlot, blockRoot, parentRoot, eth1DataBlockHash, mhKey := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block) correctParentRoot string, correctEth1DataBlockHash string, correctMhKey string,
correctExecutionPayloadheader *beaconclient.DbExecutionPayloadHeader) {
dbSignedBlock := queryDbSignedBeaconBlock(bc.Db, headMessage.Slot, headMessage.Block)
log.Info("validateSignedBeaconBlock: ", headMessage) log.Info("validateSignedBeaconBlock: ", headMessage)
baseSlot, err := strconv.Atoi(headMessage.Slot) baseSlot, err := strconv.ParseUint(headMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot)) Expect(dbSignedBlock.Slot).To(Equal(baseSlot))
Expect(blockRoot).To(Equal(headMessage.Block)) Expect(dbSignedBlock.BlockRoot).To(Equal(headMessage.Block))
Expect(parentRoot).To(Equal(correctParentRoot)) Expect(dbSignedBlock.ParentBlock).To(Equal(correctParentRoot))
Expect(eth1DataBlockHash).To(Equal(correctEth1DataBlockHash)) Expect(dbSignedBlock.Eth1DataBlockHash).To(Equal(correctEth1DataBlockHash))
Expect(mhKey).To(Equal(correctMhKey)) Expect(dbSignedBlock.MhKey).To(Equal(correctMhKey))
Expect(dbSignedBlock.ExecutionPayloadHeader).To(Equal(correctExecutionPayloadheader))
} }
// A helper function to validate the expected output from the eth_beacon.state table. // A helper function to validate the expected output from the eth_beacon.state table.
@ -630,9 +648,9 @@ func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head, maxR
} }
// A helper function to query the eth_beacon.slots table based on the slot and block_root // A helper function to query the eth_beacon.slots table based on the slot and block_root
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, int, string, string, string) { func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (uint64, uint64, string, string, string) {
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM eth_beacon.slots WHERE slot=$1 AND block_root=$2;` sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM eth_beacon.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot int var epoch, slot uint64
var blockRoot, stateRoot, status string var blockRoot, stateRoot, status string
log.Debug("Starting to query the eth_beacon.slots table, ", querySlot, " ", queryBlockRoot) log.Debug("Starting to query the eth_beacon.slots table, ", querySlot, " ", queryBlockRoot)
err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) err := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot).Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
@ -642,24 +660,51 @@ func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot strin
} }
// A helper function to query the eth_beacon.signed_block table based on the slot and block_root. // A helper function to query the eth_beacon.signed_block table based on the slot and block_root.
func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, string, string, string, string) { func queryDbSignedBeaconBlock(db sql.Database, querySlot string, queryBlockRoot string) beaconclient.DbSignedBeaconBlock {
sqlStatement := `SELECT slot, block_root, parent_block_root, eth1_data_block_hash, mh_key, sqlStatement := `SELECT slot, block_root, parent_block_root, eth1_data_block_hash, mh_key,
payload_block_number, payload_timestamp, payload_block_hash, payload_block_number, payload_timestamp, payload_block_hash,
payload_parent_hash, payload_state_root payload_receipts_root FROM eth_beacon.signed_block WHERE slot=$1 AND block_root=$2;` payload_parent_hash, payload_state_root, payload_receipts_root,
var slot, payloadBlockNumber, payloadTimestamp int payload_transactions_root FROM eth_beacon.signed_block WHERE slot=$1 AND block_root=$2;`
var blockRoot, parentBlockRoot, eth1DataBlockHash, mhKey, payloadBlockHash, payloadParentHash, payloadStateRoot, payloadReceiptsRoot string
var slot uint64
var payloadBlockNumber, payloadTimestamp *uint64
var blockRoot, parentBlockRoot, eth1DataBlockHash, mhKey string
var payloadBlockHash, payloadParentHash, payloadStateRoot, payloadReceiptsRoot, payloadTransactionsRoot *string
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot) row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
err := row.Scan(&slot, &blockRoot, &parentBlockRoot, &eth1DataBlockHash, &mhKey, err := row.Scan(&slot, &blockRoot, &parentBlockRoot, &eth1DataBlockHash, &mhKey,
&payloadBlockNumber, &payloadTimestamp, &payloadBlockHash, &payloadParentHash, &payloadBlockNumber, &payloadTimestamp, &payloadBlockHash,
&payloadStateRoot, &payloadReceiptsRoot) &payloadParentHash, &payloadStateRoot, &payloadReceiptsRoot, &payloadTransactionsRoot)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
return slot, blockRoot, parentBlockRoot, eth1DataBlockHash, mhKey
signedBlock := beaconclient.DbSignedBeaconBlock{
Slot: slot,
BlockRoot: blockRoot,
ParentBlock: parentBlockRoot,
Eth1DataBlockHash: eth1DataBlockHash,
MhKey: mhKey,
ExecutionPayloadHeader: nil,
}
if nil != payloadBlockNumber {
signedBlock.ExecutionPayloadHeader = &beaconclient.DbExecutionPayloadHeader{
BlockNumber: *payloadBlockNumber,
Timestamp: *payloadTimestamp,
BlockHash: *payloadBlockHash,
ParentHash: *payloadParentHash,
StateRoot: *payloadStateRoot,
ReceiptsRoot: *payloadReceiptsRoot,
TransactionsRoot: *payloadTransactionsRoot,
}
}
return signedBlock
} }
// A helper function to query the eth_beacon.signed_block table based on the slot and block_root. // A helper function to query the eth_beacon.signed_block table based on the slot and block_root.
func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (int, string, string) { func queryDbBeaconState(db sql.Database, querySlot string, queryStateRoot string) (uint64, string, string) {
sqlStatement := `SELECT slot, state_root, mh_key FROM eth_beacon.state WHERE slot=$1 AND state_root=$2;` sqlStatement := `SELECT slot, state_root, mh_key FROM eth_beacon.state WHERE slot=$1 AND state_root=$2;`
var slot int var slot uint64
var stateRoot, mhKey string var stateRoot, mhKey string
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryStateRoot) row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryStateRoot)
err := row.Scan(&slot, &stateRoot, &mhKey) err := row.Scan(&slot, &stateRoot, &mhKey)
@ -896,7 +941,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this, // Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases. // Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch uint64, maxRetry int) {
go bc.CaptureHead() go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -928,7 +973,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
NewHeadBlock: secondHead.Block, NewHeadBlock: secondHead.Block,
OldHeadState: thirdHead.State, OldHeadState: thirdHead.State,
NewHeadState: secondHead.State, NewHeadState: secondHead.State,
Epoch: strconv.Itoa(epoch), Epoch: strconv.FormatUint(epoch, 10),
ExecutionOptimistic: false, ExecutionOptimistic: false,
}) })
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -958,7 +1003,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
} }
// A test to validate a single block was processed correctly // A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) { func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch uint64, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead() go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert) sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -988,7 +1033,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked // A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly. // as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) { func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch uint64, maxRetry int) {
go bc.CaptureHead() go bc.CaptureHead()
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)

View File

@ -66,14 +66,14 @@ type BatchProcessing interface {
// A struct to pass around indicating a table entry for slots to process. // A struct to pass around indicating a table entry for slots to process.
type slotsToProcess struct { type slotsToProcess struct {
startSlot int // The start slot startSlot uint64 // The start slot
endSlot int // The end slot endSlot uint64 // The end slot
} }
type batchHistoricError struct { type batchHistoricError struct {
err error // The error that occurred when attempting to a slot err error // The error that occurred when attempting to a slot
errProcess string // The process that caused the error. errProcess string // The process that caused the error.
slot int // The slot which the error is for. slot uint64 // The slot which the error is for.
} }
// Wrapper function for the BatchProcessing interface. // Wrapper function for the BatchProcessing interface.
@ -92,7 +92,7 @@ type batchHistoricError struct {
// 5. Handle any errors. // 5. Handle any errors.
func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error { func handleBatchProcess(ctx context.Context, maxWorkers int, bp BatchProcessing, spd SlotProcessingDetails, incrementTracker func(uint64)) []error {
slotsCh := make(chan slotsToProcess) slotsCh := make(chan slotsToProcess)
workCh := make(chan int) workCh := make(chan uint64)
processedCh := make(chan slotsToProcess) processedCh := make(chan slotsToProcess)
errCh := make(chan batchHistoricError) errCh := make(chan batchHistoricError)
finalErrCh := make(chan []error, 1) finalErrCh := make(chan []error, 1)

View File

@ -55,7 +55,7 @@ var _ = Describe("Capturehistoric", func() {
BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0) BeaconNodeTester.runHistoricalProcess(bc, 2, 1, 0, 0, 0)
validateSlot(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, 0, "proposed") validateSlot(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, 0, "proposed")
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectParentRoot, BeaconNodeTester.TestEvents["0"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["0"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectParentRoot, BeaconNodeTester.TestEvents["0"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["0"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["0"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["0"].HeadMessage, BeaconNodeTester.TestEvents["0"].CorrectBeaconStateMhKey)
@ -273,7 +273,7 @@ func validateMetrics(bc *beaconclient.BeaconClient, expectedInserts, expectedReo
func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) { func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) {
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed") validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectParentRoot, BeaconNodeTester.TestEvents["100"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["100"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["100"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, BeaconNodeTester.TestEvents["100"].CorrectBeaconStateMhKey)
@ -281,7 +281,7 @@ func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) {
validateSlot(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, "proposed") validateSlot(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, 3, "proposed")
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["100"].HeadMessage.Block, BeaconNodeTester.TestEvents["101"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["101"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["100"].HeadMessage.Block, BeaconNodeTester.TestEvents["101"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["101"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["101"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["101"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, BeaconNodeTester.TestEvents["101"].CorrectBeaconStateMhKey)
@ -289,7 +289,7 @@ func validatePopularBatchBlocks(bc *beaconclient.BeaconClient) {
validateSlot(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, "proposed") validateSlot(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240, "proposed")
if bc.PerformBeaconBlockProcessing { if bc.PerformBeaconBlockProcessing {
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey) validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectParentRoot, BeaconNodeTester.TestEvents["2375703"].CorrectEth1DataBlockHash, BeaconNodeTester.TestEvents["2375703"].CorrectSignedBeaconBlockMhKey, BeaconNodeTester.TestEvents["2375703"].CorrectExecutionPayloadHeader)
} }
if bc.PerformBeaconStateProcessing { if bc.PerformBeaconStateProcessing {
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey) validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, BeaconNodeTester.TestEvents["2375703"].CorrectBeaconStateMhKey)

View File

@ -100,7 +100,7 @@ type DatabaseWriter struct {
rawSignedBeaconBlock *[]byte rawSignedBeaconBlock *[]byte
} }
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, func CreateDatabaseWrite(db sql.Database, slot uint64, stateRoot string, blockRoot string, parentBlockRoot string,
eth1DataBlockHash string, payloadHeader *ExecutionPayloadHeader, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) { eth1DataBlockHash string, payloadHeader *ExecutionPayloadHeader, status string, rawSignedBeaconBlock *[]byte, rawBeaconState *[]byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
ctx := context.Background() ctx := context.Background()
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
@ -130,10 +130,10 @@ func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot
// Write functions to write each all together... // Write functions to write each all together...
// Should I do one atomic write? // Should I do one atomic write?
// Create the model for the eth_beacon.slots table // Create the model for the eth_beacon.slots table
func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoot string, status string) { func (dw *DatabaseWriter) prepareSlotsModel(slot uint64, stateRoot string, blockRoot string, status string) {
dw.DbSlots = &DbSlots{ dw.DbSlots = &DbSlots{
Epoch: calculateEpoch(slot, bcSlotsPerEpoch), Epoch: calculateEpoch(slot, bcSlotsPerEpoch),
Slot: strconv.Itoa(slot), Slot: strconv.FormatUint((slot), 10),
StateRoot: stateRoot, StateRoot: stateRoot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
Status: status, Status: status,
@ -143,14 +143,14 @@ func (dw *DatabaseWriter) prepareSlotsModel(slot int, stateRoot string, blockRoo
} }
// Create the model for the eth_beacon.signed_block table. // Create the model for the eth_beacon.signed_block table.
func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot string, parentBlockRoot string, eth1DataBlockHash string, func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot uint64, blockRoot string, parentBlockRoot string, eth1DataBlockHash string,
payloadHeader *ExecutionPayloadHeader) error { payloadHeader *ExecutionPayloadHeader) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot)) mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.BlockRoot))
if err != nil { if err != nil {
return err return err
} }
dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{ dw.DbSignedBeaconBlock = &DbSignedBeaconBlock{
Slot: strconv.Itoa(slot), Slot: slot,
BlockRoot: blockRoot, BlockRoot: blockRoot,
ParentBlock: parentBlockRoot, ParentBlock: parentBlockRoot,
Eth1DataBlockHash: eth1DataBlockHash, Eth1DataBlockHash: eth1DataBlockHash,
@ -175,13 +175,13 @@ func (dw *DatabaseWriter) prepareSignedBeaconBlockModel(slot int, blockRoot stri
} }
// Create the model for the eth_beacon.state table. // Create the model for the eth_beacon.state table.
func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) error { func (dw *DatabaseWriter) prepareBeaconStateModel(slot uint64, stateRoot string) error {
mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot)) mhKey, err := MultihashKeyFromSSZRoot([]byte(dw.DbSlots.StateRoot))
if err != nil { if err != nil {
return err return err
} }
dw.DbBeaconState = &DbBeaconState{ dw.DbBeaconState = &DbBeaconState{
Slot: strconv.Itoa(slot), Slot: slot,
StateRoot: stateRoot, StateRoot: stateRoot,
MhKey: mhKey, MhKey: mhKey,
} }
@ -344,7 +344,7 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
slotNum, strErr := strconv.Atoi(slot) slotNum, strErr := strconv.ParseUint(slot, 10, 64)
if strErr != nil { if strErr != nil {
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...") loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
} }
@ -440,17 +440,17 @@ func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into // A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would // smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
// have 10 entries as follows: 1-10, 11-20, etc... // have 10 entries as follows: 1-10, 11-20, etc...
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) {
var entryErrorMsg string var entryErrorMsg string
if entryError == nil { if entryError == nil {
entryErrorMsg = "" entryErrorMsg = ""
} else { } else {
entryErrorMsg = entryError.Error() entryErrorMsg = entryError.Error()
} }
if endSlot-startSlot <= tableIncrement { if endSlot-startSlot <= uint64(tableIncrement) {
kgModel := DbKnownGaps{ kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(startSlot), StartSlot: strconv.FormatUint(startSlot, 10),
EndSlot: strconv.Itoa(endSlot), EndSlot: strconv.FormatUint(endSlot, 10),
CheckedOut: false, CheckedOut: false,
ReprocessingError: "", ReprocessingError: "",
EntryError: entryErrorMsg, EntryError: entryErrorMsg,
@ -460,22 +460,22 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
} else { } else {
totalSlots := endSlot - startSlot totalSlots := endSlot - startSlot
var chunks int var chunks int
chunks = totalSlots / tableIncrement chunks = int(totalSlots / uint64(tableIncrement))
if totalSlots%tableIncrement != 0 { if totalSlots%uint64(tableIncrement) != 0 {
chunks = chunks + 1 chunks = chunks + 1
} }
for i := 0; i < chunks; i++ { for i := 0; i < chunks; i++ {
var tempStart, tempEnd int var tempStart, tempEnd uint64
tempStart = startSlot + (i * tableIncrement) tempStart = startSlot + (uint64(i * tableIncrement))
if i+1 == chunks { if i+1 == chunks {
tempEnd = endSlot tempEnd = endSlot
} else { } else {
tempEnd = startSlot + ((i + 1) * tableIncrement) tempEnd = startSlot + uint64((i+1)*tableIncrement)
} }
kgModel := DbKnownGaps{ kgModel := DbKnownGaps{
StartSlot: strconv.Itoa(tempStart), StartSlot: strconv.FormatUint(tempStart, 10),
EndSlot: strconv.Itoa(tempEnd), EndSlot: strconv.FormatUint(tempEnd, 10),
CheckedOut: false, CheckedOut: false,
ReprocessingError: "", ReprocessingError: "",
EntryError: entryErrorMsg, EntryError: entryErrorMsg,
@ -488,11 +488,11 @@ func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, start
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will // Wrapper function, instead of adding the knownGaps entries to a transaction, it will
// create the transaction and write it. // create the transaction and write it.
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) { func writeKnownGaps(db sql.Database, tableIncrement int, startSlot uint64, endSlot uint64, entryError error, entryProcess string, metric *BeaconClientMetrics) {
ctx := context.Background() ctx := context.Background()
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
if err != nil { if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps") loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to create a new transaction for knownGaps")
} }
defer func() { defer func() {
err := tx.Rollback(ctx) err := tx.Rollback(ctx)
@ -502,7 +502,8 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
}() }()
transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric) transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric)
if err = tx.Commit(ctx); err != nil { if err = tx.Commit(ctx); err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps") loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Fatal("Unable to execute the transaction for knownGaps")
} }
} }
@ -525,8 +526,8 @@ func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric
} }
// A function to write the gap between the highest slot in the DB and the first processed slot. // A function to write the gap between the highest slot in the DB and the first processed slot.
func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric *BeaconClientMetrics) { func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot uint64, metric *BeaconClientMetrics) {
var maxSlot int var maxSlot uint64
err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot) err := db.QueryRow(context.Background(), QueryHighestSlotStmt).Scan(&maxSlot)
if err != nil { if err != nil {
loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.") loghelper.LogError(err).Fatal("Unable to get the max block from the DB. We must close the application or we might have undetected gaps.")
@ -554,19 +555,19 @@ func writeStartUpGaps(db sql.Database, tableIncrement int, firstSlot int, metric
} }
// A function to update a knownGap range with a reprocessing error. // A function to update a knownGap range with a reprocessing error.
func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocessingErr error, metric *BeaconClientMetrics) error { func updateKnownGapErrors(db sql.Database, startSlot uint64, endSlot uint64, reprocessingErr error, metric *BeaconClientMetrics) error {
res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error()) res, err := db.Exec(context.Background(), UpsertKnownGapsErrorStmt, startSlot, endSlot, reprocessingErr.Error())
if err != nil { if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to update reprocessing_error") loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to update reprocessing_error")
return err return err
} }
row, err := res.RowsAffected() row, err := res.RowsAffected()
if err != nil { if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Error("Unable to count rows affected when trying to update reprocessing_error.") loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).Error("Unable to count rows affected when trying to update reprocessing_error.")
return err return err
} }
if row != 1 { if row != 1 {
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).WithFields(log.Fields{ loghelper.LogSlotRangeError(strconv.FormatUint(startSlot, 10), strconv.FormatUint(endSlot, 10), err).WithFields(log.Fields{
"rowCount": row, "rowCount": row,
}).Error("The rows affected by the upsert for reprocessing_error is not 1.") }).Error("The rows affected by the upsert for reprocessing_error is not 1.")
metric.IncrementKnownGapsReprocessError(1) metric.IncrementKnownGapsReprocessError(1)
@ -577,9 +578,9 @@ func updateKnownGapErrors(db sql.Database, startSlot int, endSlot int, reprocess
} }
// A quick helper function to calculate the epoch. // A quick helper function to calculate the epoch.
func calculateEpoch(slot int, slotPerEpoch int) string { func calculateEpoch(slot uint64, slotPerEpoch uint64) string {
epoch := slot / slotPerEpoch epoch := slot / slotPerEpoch
return strconv.Itoa(epoch) return strconv.FormatUint(epoch, 10)
} }
// A helper function to check to see if the slot is processed. // A helper function to check to see if the slot is processed.

View File

@ -73,7 +73,7 @@ type DbExecutionPayloadHeader struct {
// A struct to capture whats being written to eth-beacon.signed_block table. // A struct to capture whats being written to eth-beacon.signed_block table.
type DbSignedBeaconBlock struct { type DbSignedBeaconBlock struct {
Slot string // The slot. Slot uint64 // The slot.
BlockRoot string // The block root BlockRoot string // The block root
ParentBlock string // The parent block root. ParentBlock string // The parent block root.
Eth1DataBlockHash string // The eth1 block_hash Eth1DataBlockHash string // The eth1 block_hash
@ -83,7 +83,7 @@ type DbSignedBeaconBlock struct {
// A struct to capture whats being written to eth-beacon.state table. // A struct to capture whats being written to eth-beacon.state table.
type DbBeaconState struct { type DbBeaconState struct {
Slot string // The slot. Slot uint64 // The slot.
StateRoot string // The state root StateRoot string // The state root
MhKey string // The ipld multihash key. MhKey string // The ipld multihash key.
} }

View File

@ -42,7 +42,7 @@ func (bc *BeaconClient) handleHead() {
for { for {
head := <-bc.HeadTracking.ProcessCh head := <-bc.HeadTracking.ProcessCh
// Process all the work here. // Process all the work here.
slot, err := strconv.Atoi(head.Slot) slot, err := strconv.ParseUint(head.Slot, 10, 64)
if err != nil { if err != nil {
bc.HeadTracking.ErrorCh <- &SseError{ bc.HeadTracking.ErrorCh <- &SseError{
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),

View File

@ -74,7 +74,7 @@ func (hp HistoricProcessing) handleProcessingErrors(ctx context.Context, errMess
case <-ctx.Done(): case <-ctx.Done():
return return
case errMs := <-errMessages: case errMs := <-errMessages:
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err) loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err)
writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics) writeKnownGaps(hp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, hp.metrics)
} }
} }
@ -97,7 +97,7 @@ func (hp HistoricProcessing) releaseDbLocks() error {
} }
// Process the slot range. // Process the slot range.
func processSlotRangeWorker(ctx context.Context, workCh <-chan int, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) { func processSlotRangeWorker(ctx context.Context, workCh <-chan uint64, errCh chan<- batchHistoricError, spd SlotProcessingDetails, incrementTracker func(uint64)) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -178,7 +178,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
break break
} }
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), getStartEndSlotStmt, err).Error("Unable to get a row") loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), getStartEndSlotStmt, err).Error("Unable to get a row")
errCount = append(errCount, err) errCount = append(errCount, err)
break break
} }
@ -186,25 +186,25 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
// Checkout the Row // Checkout the Row
res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) res, err := tx.Exec(dbCtx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier)
if err != nil { if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).Error("Unable to checkout the row")
errCount = append(errCount, err) errCount = append(errCount, err)
break break
} }
rows, err := res.RowsAffected() rows, err := res.RowsAffected()
if err != nil { if err != nil {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row.")) loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, fmt.Errorf("Unable to determine the rows affected when trying to checkout a row."))
errCount = append(errCount, err) errCount = append(errCount, err)
break break
} }
if rows > 1 { if rows > 1 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows, "rowsReturn": rows,
}).Error("We locked too many rows.....") }).Error("We locked too many rows.....")
errCount = append(errCount, err) errCount = append(errCount, err)
break break
} }
if rows == 0 { if rows == 0 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{ loghelper.LogSlotRangeStatementError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows, "rowsReturn": rows,
}).Error("We did not lock a single row.") }).Error("We did not lock a single row.")
errCount = append(errCount, err) errCount = append(errCount, err)
@ -212,7 +212,7 @@ func getBatchProcessRow(ctx context.Context, db sql.Database, getStartEndSlotStm
} }
err = tx.Commit(dbCtx) err = tx.Commit(dbCtx)
if err != nil { if err != nil {
loghelper.LogSlotRangeError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), err).Error("Unable commit transactions.") loghelper.LogSlotRangeError(strconv.FormatUint(sp.startSlot, 10), strconv.FormatUint(sp.endSlot, 10), err).Error("Unable commit transactions.")
errCount = append(errCount, err) errCount = append(errCount, err)
break break
} }
@ -241,11 +241,11 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
"endSlot": slots.endSlot, "endSlot": slots.endSlot,
}).Debug("Starting to check to see if the following slots have been processed") }).Debug("Starting to check to see if the following slots have been processed")
for { for {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot)) isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.startSlot, 10))
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.endSlot)) isEndProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.FormatUint(slots.endSlot, 10))
if err != nil { if err != nil {
errCh <- err errCh <- err
} }
@ -255,7 +255,7 @@ func removeRowPostProcess(ctx context.Context, db sql.Database, processCh <-chan
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
_, err := db.Exec(context.Background(), removeStmt, strconv.Itoa(slots.startSlot), strconv.Itoa(slots.endSlot)) _, err := db.Exec(context.Background(), removeStmt, strconv.FormatUint(slots.startSlot, 10), strconv.FormatUint(slots.endSlot, 10))
if err != nil { if err != nil {
errCh <- err errCh <- err
} }

View File

@ -97,21 +97,21 @@ func (kgp KnownGapsProcessing) handleProcessingErrors(ctx context.Context, errMe
// Check to see if this if this entry already exists. // Check to see if this if this entry already exists.
res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot) res, err := kgp.db.Exec(context.Background(), checkKgSingleSlotStmt, errMs.slot, errMs.slot)
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table") loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Unable to see if this slot is in the eth_beacon.known_gaps table")
} }
rows, err := res.RowsAffected() rows, err := res.RowsAffected()
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).WithFields(log.Fields{ loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).WithFields(log.Fields{
"queryStatement": checkKgSingleSlotStmt, "queryStatement": checkKgSingleSlotStmt,
}).Error("Unable to get the number of rows affected by this statement.") }).Error("Unable to get the number of rows affected by this statement.")
} }
if rows > 0 { if rows > 0 {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err).Error("We received an error when processing a knownGap") loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), errMs.err).Error("We received an error when processing a knownGap")
err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics) err = updateKnownGapErrors(kgp.db, errMs.slot, errMs.slot, errMs.err, kgp.metrics)
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(errMs.slot), err).Error("Error processing known gap") loghelper.LogSlotError(strconv.FormatUint(errMs.slot, 10), err).Error("Error processing known gap")
} }
} else { } else {
writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics) writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)

View File

@ -45,8 +45,8 @@ type SlotProcessingDetails struct {
PerformBeaconStateProcessing bool // Should we process BeaconStates? PerformBeaconStateProcessing bool // Should we process BeaconStates?
PerformBeaconBlockProcessing bool // Should we process BeaconBlocks? PerformBeaconBlockProcessing bool // Should we process BeaconBlocks?
StartingSlot int // If we're performing head tracking. What is the first slot we processed. StartingSlot uint64 // If we're performing head tracking. What is the first slot we processed.
PreviousSlot int // Whats the previous slot we processed PreviousSlot uint64 // Whats the previous slot we processed
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent. PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
} }
@ -71,8 +71,8 @@ func (bc *BeaconClient) SlotProcessingDetails() SlotProcessingDetails {
type ProcessSlot struct { type ProcessSlot struct {
// Generic // Generic
Slot int // The slot number. Slot uint64 // The slot number.
Epoch int // The epoch number. Epoch uint64 // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot. BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot. StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block. ParentBlockRoot string // The hex encoded string of the parent block.
@ -114,10 +114,10 @@ type PerformanceMetrics struct {
// known_gaps table. // known_gaps table.
func processFullSlot( func processFullSlot(
ctx context.Context, ctx context.Context,
slot int, slot uint64,
blockRoot string, blockRoot string,
stateRoot string, stateRoot string,
previousSlot int, previousSlot uint64,
previousBlockRoot string, previousBlockRoot string,
knownGapsTableIncrement int, knownGapsTableIncrement int,
headOrHistoric string, headOrHistoric string,
@ -201,7 +201,7 @@ func processFullSlot(
checkDbTime := time.Now() checkDbTime := time.Now()
var blockRequired bool var blockRequired bool
if spd.PerformBeaconBlockProcessing { if spd.PerformBeaconBlockProcessing {
blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, strconv.Itoa(ps.Slot), finalBlockRoot) blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, strconv.FormatUint(ps.Slot, 10), finalBlockRoot)
if err != nil { if err != nil {
return err, "checkDb" return err, "checkDb"
} }
@ -210,7 +210,7 @@ func processFullSlot(
var stateRequired bool var stateRequired bool
if spd.PerformBeaconStateProcessing { if spd.PerformBeaconStateProcessing {
stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, strconv.Itoa(ps.Slot), finalStateRoot) stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, strconv.FormatUint(ps.Slot, 10), finalStateRoot)
if err != nil { if err != nil {
return err, "checkDb" return err, "checkDb"
} }
@ -280,7 +280,7 @@ func processFullSlot(
} }
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(slot int, blockRoot string, stateRoot string, spd SlotProcessingDetails) { func processHeadSlot(slot uint64, blockRoot string, stateRoot string, spd SlotProcessingDetails) {
// Get the knownGaps at startUp // Get the knownGaps at startUp
if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" { if spd.PreviousSlot == 0 && spd.PreviousBlockRoot == "" {
writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics) writeStartUpGaps(spd.Db, spd.KnownGapTableIncrement, slot, spd.Metrics)
@ -294,7 +294,7 @@ func processHeadSlot(slot int, blockRoot string, stateRoot string, spd SlotProce
} }
// Handle a historic slot. A wrapper function for calling `handleFullSlot`. // Handle a historic slot. A wrapper function for calling `handleFullSlot`.
func handleHistoricSlot(ctx context.Context, slot int, spd SlotProcessingDetails) (error, string) { func handleHistoricSlot(ctx context.Context, slot uint64, spd SlotProcessingDetails) (error, string) {
return processFullSlot(ctx, slot, "", "", 0, "", return processFullSlot(ctx, slot, "", "", 0, "",
1, "historic", &spd) 1, "historic", &spd)
} }
@ -305,14 +305,14 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
if ps.BlockRoot != "" { if ps.BlockRoot != "" {
blockIdentifier = ps.BlockRoot blockIdentifier = ps.BlockRoot
} else { } else {
blockIdentifier = strconv.Itoa(ps.Slot) blockIdentifier = strconv.FormatUint(ps.Slot, 10)
} }
blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier blockEndpoint := serverAddress + BcBlockQueryEndpoint + blockIdentifier
sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, strconv.Itoa(ps.Slot)) sszSignedBeaconBlock, rc, err := querySsz(blockEndpoint, strconv.FormatUint(ps.Slot, 10))
if err != nil || rc != 200 { if err != nil || rc != 200 {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the slot.") loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the slot.")
ps.FullSignedBeaconBlock = nil ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{} ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = "" ps.ParentBlockRoot = ""
@ -328,7 +328,7 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error {
var signedBeaconBlock SignedBeaconBlock var signedBeaconBlock SignedBeaconBlock
err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock) err = signedBeaconBlock.UnmarshalSSZ(sszSignedBeaconBlock)
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to unmarshal SignedBeaconBlock for slot.") loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal SignedBeaconBlock for slot.")
ps.FullSignedBeaconBlock = nil ps.FullSignedBeaconBlock = nil
ps.SszSignedBeaconBlock = []byte{} ps.SszSignedBeaconBlock = []byte{}
ps.ParentBlockRoot = "" ps.ParentBlockRoot = ""
@ -349,20 +349,20 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
if ps.StateRoot != "" { if ps.StateRoot != "" {
stateIdentifier = ps.StateRoot stateIdentifier = ps.StateRoot
} else { } else {
stateIdentifier = strconv.Itoa(ps.Slot) stateIdentifier = strconv.FormatUint(ps.Slot, 10)
} }
stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier stateEndpoint := serverEndpoint + BcStateQueryEndpoint + stateIdentifier
sszBeaconState, _, err := querySsz(stateEndpoint, strconv.Itoa(ps.Slot)) sszBeaconState, _, err := querySsz(stateEndpoint, strconv.FormatUint(ps.Slot, 10))
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to properly query the BeaconState.") loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to properly query the BeaconState.")
return err return err
} }
var beaconState BeaconState var beaconState BeaconState
err = beaconState.UnmarshalSSZ(sszBeaconState) err = beaconState.UnmarshalSSZ(sszBeaconState)
if err != nil { if err != nil {
loghelper.LogSlotError(strconv.Itoa(ps.Slot), err).Error("Unable to unmarshal the BeaconState.") loghelper.LogSlotError(strconv.FormatUint(ps.Slot, 10), err).Error("Unable to unmarshal the BeaconState.")
return err return err
} }
@ -372,7 +372,7 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string) error {
} }
// Check to make sure that the previous block we processed is the parent of the current block. // Check to make sure that the previous block we processed is the parent of the current block.
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) { func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot uint64, previousBlockRoot string, knownGapsTableIncrement int) {
if nil == ps.FullSignedBeaconBlock { if nil == ps.FullSignedBeaconBlock {
log.Debug("Can't check block root, no current block.") log.Debug("Can't check block root, no current block.")
return return
@ -384,7 +384,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"slot": slot, "slot": slot,
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.FormatUint(ps.Slot, 10), ps.BlockRoot, ps.Metrics)
} else if previousSlot > slot { } else if previousSlot > slot {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -401,7 +401,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) transactReorgs(tx, ctx, strconv.FormatUint(previousSlot, 10), parentRoot, ps.Metrics)
} else { } else {
log.Debug("Previous Slot and Current Slot are one distance from each other.") log.Debug("Previous Slot and Current Slot are one distance from each other.")
} }