Feature/22 test handling incoming events #30

Merged
abdulrabbani00 merged 11 commits from feature/22-test-handling-incoming-events into develop 2022-05-12 13:52:14 +00:00
7 changed files with 329 additions and 225 deletions
Showing only changes of commit 36b9c1f25b - Show all commits

2
.gitignore vendored
View File

@ -4,4 +4,4 @@ ipld-ethcl-indexer.log
report.json report.json
cover.profile cover.profile
temp/* temp/*
/pkg/beaconclient/data/ pkg/beaconclient/ssz-data/

View File

@ -21,10 +21,10 @@ var (
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
) )
// A structure utilized for keeping track of various metrics. // A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct { type BeaconClientMetrics struct {
HeadTrackingInserts uint64 HeadTrackingInserts uint64 // Number of head events we wrote to the DB.
HeadTrackingReorgs uint64 HeadTrackingReorgs uint64 // The number of reorg events written to the DB.
} }
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.

View File

@ -27,12 +27,11 @@ import (
) )
type Message struct { type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
ReorgMessage beaconclient.ChainReorg // The reorg messsage that will be streamed to the BeaconClient TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. MimicConfig *MimicConfig // A configuration of parameters that you are trying to
MimicConfig *MimicConfig // A configuration of parameters that you are trying to SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock. BeaconState string // The file path output of an SSZ encoded BeaconState.
BeaconState string // The file path output of an SSZ encoded BeaconState.
} }
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly. // A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
@ -44,21 +43,23 @@ type MimicConfig struct {
var _ = Describe("Capturehead", func() { var _ = Describe("Capturehead", func() {
var ( var (
address string = "localhost" TestConfig Config
port int = 8080 BeaconNodeTester TestBeaconNode
protocol string = "http" address string = "localhost"
TestEvents map[string]*Message port int = 8080
dbHost string = "localhost" protocol string = "http"
dbPort int = 8077 TestEvents map[string]Message
dbName string = "vulcanize_testing" dbHost string = "localhost"
dbUser string = "vdbm" dbPort int = 8077
dbPassword string = "password" dbName string = "vulcanize_testing"
dbDriver string = "pgx" dbUser string = "vdbm"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
) )
BeforeEach(func() { BeforeEach(func() {
TestEvents = map[string]*Message{ TestEvents = map[string]Message{
"100-dummy": { "100-dummy": {
HeadMessage: beaconclient.Head{ HeadMessage: beaconclient.Head{
Slot: "100", Slot: "100",
@ -69,10 +70,25 @@ var _ = Describe("Capturehead", func() {
EpochTransition: false, EpochTransition: false,
ExecutionOptimistic: false, ExecutionOptimistic: false,
}, },
TestNotes: "An easy to process Phase 0 block", TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{}, MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"), SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"), BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"100-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
}, },
"100": { "100": {
HeadMessage: beaconclient.Head{ HeadMessage: beaconclient.Head{
@ -85,8 +101,8 @@ var _ = Describe("Capturehead", func() {
ExecutionOptimistic: false, ExecutionOptimistic: false,
}, },
TestNotes: "An easy to process Phase 0 block", TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"), SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"), BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
}, },
"2375703-dummy": { "2375703-dummy": {
HeadMessage: beaconclient.Head{ HeadMessage: beaconclient.Head{
@ -100,8 +116,23 @@ var _ = Describe("Capturehead", func() {
}, },
TestNotes: "This is a dummy message that is used for reorgs", TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{}, MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("data", "2375703", "signed-beacon-block.ssz"), SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "2375703", "beacon-state.ssz"), BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
}, },
"2375703": { "2375703": {
HeadMessage: beaconclient.Head{ HeadMessage: beaconclient.Head{
@ -109,190 +140,140 @@ var _ = Describe("Capturehead", func() {
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301", Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
ReorgMessage: beaconclient.ChainReorg{},
TestNotes: "An easy to process Altair Block", TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("data", "2375703", "signed-beacon-block.ssz"), SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "2375703", "beacon-state.ssz"), BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
}, },
} }
TestConfig = Config{
protocol: protocol,
address: address,
port: port,
dummyParentRoot: dummyParentRoot,
dbHost: dbHost,
dbPort: dbPort,
dbName: dbName,
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
}
BeaconNodeTester = TestBeaconNode{
TestEvents: TestEvents,
TestConfig: TestConfig,
}
}) })
// We might also want to add an integration test that will actually process a single event, then end. // We might also want to add an integration test that will actually process a single event, then end.
// This will help us know that our models match that actual data being served from the beacon node. // This will help us know that our models match that actual data being served from the beacon node.
Describe("Receiving New Head SSE messages", Label("unit"), func() { Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() { Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["100"].HeadMessage, 3)
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
sendHeadMessage(BC, TestEvents["100"].HeadMessage)
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["100"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
}) })
}) })
Context("Correctly formatted Altair Block", func() { Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() { It("Should turn it into a struct successfully.", func() {
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240)
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
sendHeadMessage(BC, TestEvents["2375703"].HeadMessage)
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["2375703"].HeadMessage.Slot, TestEvents["2375703"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
}) })
}) })
//Context("A single incorrectly formatted head message", func() { //Context("A single incorrectly formatted head message", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......") // It("Should create an error, maybe also add the projected slot to the knownGaps table......")
// If it can unmarshal the head add it to knownGaps
//}) //})
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() { //Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......") // It("Should create an error, maybe also add the projected slot to the knownGaps table......")
//}) //})
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
// })
//})
// Context("With gaps in between head slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
}) })
Describe("Receiving New Reorg SSE messages", Label("dry"), func() { Describe("ReOrg Scenario", Label("dry"), func() {
Context("Altair: Reorg slot is already in the DB", func() { Context("Altair: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240)
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
log.Info("Sending Altair Messages to BeaconClient")
sendHeadMessage(BC, TestEvents["2375703-dummy"].HeadMessage)
sendHeadMessage(BC, TestEvents["2375703"].HeadMessage)
for atomic.LoadUint64(&BC.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Altair to make sure the fork was marked properly.")
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["2375703-dummy"].HeadMessage.Slot, TestEvents["2375703-dummy"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703-dummy"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703-dummy"].HeadMessage.State))
Expect(status).To(Equal("forked"))
epoch, slot, blockRoot, stateRoot, status = queryDbSlotAndBlock(BC.Db, TestEvents["2375703"].HeadMessage.Slot, TestEvents["2375703"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
}) })
}) })
Context("Phase0: Reorg slot is already in the DB", func() { Context("Phase0: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3)
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver) })
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot) })
defer httpmock.Deactivate() Context("Phase 0: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
go BC.CaptureHead() BeaconNodeTester.testMultipleReorgs(TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3)
log.Info("Sending Phase0 Messages to BeaconClient") })
sendHeadMessage(BC, TestEvents["100-dummy"].HeadMessage) })
sendHeadMessage(BC, TestEvents["100"].HeadMessage) Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
for atomic.LoadUint64(&BC.Metrics.HeadTrackingReorgs) != 1 { BeaconNodeTester.testMultipleReorgs(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240)
time.Sleep(1 * time.Second)
}
log.Info("Checking Phase0 to make sure the fork was marked properly.")
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["100-dummy"].HeadMessage.Slot, TestEvents["100-dummy"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100-dummy"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["100-dummy"].HeadMessage.State))
Expect(status).To(Equal("forked"))
epoch, slot, blockRoot, stateRoot, status = queryDbSlotAndBlock(BC.Db, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["100"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
}) })
}) })
//Context("Multiple reorgs have occurred on this slot", func() {
// It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
//})
//Context("Reorg slot in not already in the DB", func() { //Context("Reorg slot in not already in the DB", func() {
// It("Should simply have the correct slot in the DB.") // It("Should simply have the correct slot in the DB.")
// Add to knowngaps
//}) //})
}) })
//Describe("Querying SignedBeaconBlock and Beacon State", Label("unit"), func() {
// Context("When the slot is properly served by the beacon node", func() {
// It("Should provide a successful response.")
// })
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// // Future use case.
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
// })
//})
//Describe("Receiving properly formatted Head SSE events.", Label("unit"), func() {
// Context("In sequential order", func() {
// It("Should write each event to the DB successfully.")
// })
// Context("With gaps in slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With a repeat slot", func() {
// It("Should recognize the reorg and process it.")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
// Context("With a skipped slot", func() {
// It("Should recognize the slot as skipped and continue without error.")
// // Future use case
// })
//})
}) })
// Must run before each test. We can't use the beforeEach because of the way type Config struct {
// Ginko treats race conditions. protocol string
func setUpTest(testEvents map[string]*Message, protocol string, address string, port int, dummyParentRoot string, address string
dbHost string, dbPort int, dbName string, dbUser string, dbPassword string, dbDriver string) *beaconclient.BeaconClient { port int
dummyParentRoot string
dbHost string
dbPort int
dbName string
dbUser string
dbPassword string
dbDriver string
}
BC := *beaconclient.CreateBeaconClient(context.Background(), protocol, address, port) //////////////////////////////////////////////////////
DB, err := postgres.SetupPostgresDb(dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver) // Helper functions
//////////////////////////////////////////////////////
// Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions.
func setUpTest(config Config) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
// Drop all records from the DB. // Drop all records from the DB.
clearEthclDbTables(DB) clearEthclDbTables(db)
BC.Db = DB bc.Db = db
return &BC return &bc
}
// A helper function to validate the expected output.
func validateSlot(bc *beaconclient.BeaconClient, headMessage *beaconclient.Head, correctEpoch int, correctStatus string) {
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block)
slot, err := strconv.Atoi(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(slot).To(Equal(slot))
Expect(epoch).To(Equal(correctEpoch))
Expect(blockRoot).To(Equal(headMessage.Block))
Expect(stateRoot).To(Equal(headMessage.State))
Expect(status).To(Equal(correctStatus))
} }
// Wrapper function to send a head message to the beaconclient // Wrapper function to send a head message to the beaconclient
@ -334,23 +315,22 @@ func clearEthclDbTables(db sql.Database) {
} }
// An object that is used to aggregate test functions. Test functions are needed because we need to
// run the same tests on multiple blocks for multiple forks. So they save us time.
type TestBeaconNode struct { type TestBeaconNode struct {
TestEvents map[string]*Message TestEvents map[string]Message
TestConfig Config
} }
// Create a new new mock for the beacon node. // Create a new new mock for the beacon node.
func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, address string, port int, dummyParentRoot string) { func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, protocol string, address string, port int, dummyParentRoot string) {
httpmock.Activate() httpmock.Activate()
testBC := &TestBeaconNode{
TestEvents: TestEvents,
}
stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z` stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z`
httpmock.RegisterResponder("GET", stateUrl, httpmock.RegisterResponder("GET", stateUrl,
func(req *http.Request) (*http.Response, error) { func(req *http.Request) (*http.Response, error) {
// Get ID from request // Get ID from request
id := httpmock.MustGetSubmatch(req, 1) id := httpmock.MustGetSubmatch(req, 1)
dat, err := testBC.provideSsz(id, "state", dummyParentRoot) dat, err := tbc.provideSsz(id, "state", dummyParentRoot)
if err != nil { if err != nil {
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
} }
@ -363,7 +343,7 @@ func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, addres
func(req *http.Request) (*http.Response, error) { func(req *http.Request) (*http.Response, error) {
// Get ID from request // Get ID from request
id := httpmock.MustGetSubmatch(req, 1) id := httpmock.MustGetSubmatch(req, 1)
dat, err := testBC.provideSsz(id, "block", dummyParentRoot) dat, err := tbc.provideSsz(id, "block", dummyParentRoot)
if err != nil { if err != nil {
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
} }
@ -375,7 +355,7 @@ func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, addres
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it. // A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) { func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) {
var slotFile string var slotFile string
var Message *Message var Message Message
for _, val := range tbc.TestEvents { for _, val := range tbc.TestEvents {
if sszIdentifier == "state" { if sszIdentifier == "state" {
@ -432,7 +412,6 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
state.Slot = types.Slot(slot) state.Slot = types.Slot(slot)
return state.MarshalSSZ() return state.MarshalSSZ()
} }
} }
if slotFile == "" { if slotFile == "" {
@ -445,3 +424,90 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
} }
return dat, nil return dat, nil
} }
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
sendHeadMessage(bc, firstHead)
sendHeadMessage(bc, secondHead)
sendHeadMessage(bc, thirdHead)
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Phase0 to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "forked")
validateSlot(bc, &thirdHead, epoch, "proposed")
log.Info("Send the reorg message.")
data, err := json.Marshal(&beaconclient.ChainReorg{
Slot: firstHead.Slot,
Depth: "1",
OldHeadBlock: thirdHead.Block,
NewHeadBlock: secondHead.Block,
OldHeadState: thirdHead.State,
NewHeadState: secondHead.State,
Epoch: strconv.Itoa(epoch),
ExecutionOptimistic: false,
})
Expect(err).ToNot(HaveOccurred())
bc.ReOrgTracking.MessagesCh <- &sse.Event{
Data: data,
}
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 {
time.Sleep(1 * time.Second)
}
log.Info("Make sure the forks were properly updated!")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
validateSlot(bc, &thirdHead, epoch, "forked")
}
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head)
validateSlot(bc, &head, epoch, "proposed")
}
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead)
sendHeadMessage(bc, secondHead)
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Altair to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
}

View File

@ -29,10 +29,17 @@ VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
UpsertBlocksStmt string = ` UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data) INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateReorgStmt string = `UPDATE ethcl.slots UpdateForkedStmt string = `UPDATE ethcl.slots
SET status='forked' SET status='forked'
WHERE slot=$1 AND block_root<>$2 WHERE slot=$1 AND block_root<>$2
RETURNING block_root;` RETURNING block_root;`
UpdateProposedStmt string = `UPDATE ethcl.slots
SET status='proposed'
WHERE slot=$1 AND block_root=$2
RETURNING block_root;`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
) )
// Put all functionality to prepare the write object // Put all functionality to prepare the write object
@ -140,7 +147,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
func (dw *DatabaseWriter) upsertSignedBeaconBlock() { func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey) _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
if err != nil { if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
} }
} }
@ -160,20 +167,79 @@ func (dw *DatabaseWriter) upsertBeaconState() {
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func updateReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) (int64, error) { func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
res, err := db.Exec(context.Background(), UpdateReorgStmt, slot, latestBlockRoot) forkCount, err := updateForked(db, slot, latestBlockRoot)
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with reorgs.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble processing a reorg.")
// Add to knownGaps Table
}
if forkCount > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Info("Updated rows that were forked.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Info("Updated the row that should have been marked as proposed.")
} else if proposedCount > 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
} else if proposedCount == 0 {
var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
}
if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
}
metrics.IncrementHeadTrackingReorgs(1)
}
// Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err return 0, err
} }
count, err := res.RowsAffected() count, err := res.RowsAffected()
if err != nil { if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were effected by the reorg.") loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
return 0, err return 0, err
} }
metrics.IncrementHeadTrackingReorgs(1) return count, err
}
return count, nil func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
return 0, err
}
return count, err
} }
// Dummy function for calculating the mhKey. // Dummy function for calculating the mhKey.

View File

@ -17,7 +17,7 @@ func (bc *BeaconClient) handleReorg() {
for { for {
reorg := <-bc.ReOrgTracking.ProcessCh reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
} }
} }

View File

@ -1,27 +0,0 @@
package beaconclient
import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
func processReorg(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
updatedRows, err := updateReorgs(db, slot, latestBlockRoot, metrics)
if err != nil {
// Add this slot to the knownGaps table..
// Maybe we need to rename the knownGaps table to the "batchProcess" table.
}
if updatedRows > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Info("Updated DB based on Reorgs.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Warn("There were no rows to update.")
}
}

View File

@ -81,19 +81,18 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err return err
} }
// Handle any reorgs or skipped slots.
if ps.HeadOrHistoric == "head" {
if previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
}
}
// Get this object ready to write // Get this object ready to write
dw := ps.createWriteObjects() dw := ps.createWriteObjects()
// Write the object to the DB. // Write the object to the DB.
dw.writeFullSlot() dw.writeFullSlot()
// Handle any reorgs or skipped slots.
if ps.HeadOrHistoric == "head" {
if previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
}
}
return nil return nil
} }
@ -182,7 +181,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"slot": ps.FullBeaconState.Slot, "slot": ps.FullBeaconState.Slot,
"fork": true, "fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.") }).Warn("A fork occurred! The previous slot and current slot match.")
processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) { } else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"previousSlot": previousSlot, "previousSlot": previousSlot,
@ -195,7 +194,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"previousBlockRoot": previousBlockRoot, "previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot, "currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
// Call our batch processing function. // Call our batch processing function.
// Continue with this slot. // Continue with this slot.
} else { } else {