Feature/22 test handling incoming events #30

Merged
abdulrabbani00 merged 11 commits from feature/22-test-handling-incoming-events into develop 2022-05-12 13:52:14 +00:00
9 changed files with 310 additions and 86 deletions
Showing only changes of commit 36e8711891 - Show all commits

View File

@ -66,7 +66,6 @@ unit-test-local:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
@ -76,7 +75,6 @@ unit-test-ci:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \

View File

@ -63,11 +63,24 @@ This project utilizes `ginkgo` for testing. A few notes on testing:
### Testing the `pkg/beaconclient`
To test the `pkg/beaconclient`, keep the following in mind.
#### Get SSZ Files
To test the `/pkg/beaconclient`, you will need to download data locally.
1. [Install Minio](https://docs.min.io/minio/baremetal/quickstart/quickstart.html), you only need the client, `mc`.
2. Run: `mc cp`
#### Understanding Testing Components
A few notes about the testing components.
- The `TestEvents` map contains several events for testers to leverage when testing.
- Any object ending in `-dummy` is not a real object. You will also notice it has a present field called `MimicConfig`. This object will use an existing SSZ object, and update the parameters from the `Head` and `MimicConfig`.
- This is done because creating an empty or minimal `SignedBeaconBlock` and `BeaconState` is fairly challenging.
- By slightly modifying an existing object, we can test re-org, malformed objects, and other negative conditions.
# Contribution
If you want to contribute please make sure you do the following:

View File

@ -21,12 +21,19 @@ var (
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
)
// A structure utilized for keeping track of various metrics.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64
HeadTrackingReorgs uint64
}
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?
@ -63,6 +70,10 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0,
HeadTrackingReorgs: 0,
},
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
}

View File

@ -2,17 +2,22 @@ package beaconclient_test
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
st "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
. "github.com/onsi/gomega"
@ -25,16 +30,20 @@ type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
ReorgMessage beaconclient.ChainReorg // The reorg messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
MimicConfig *MimicConfig // A configuration of parameters that you are trying to
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
}
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
// This is used because creating your own SSZ object is a headache.
type MimicConfig struct {
ParentRoot string // The parent root, leave it empty if you want a to use the universal
}
var _ = Describe("Capturehead", func() {
var (
BC beaconclient.BeaconClient
DB sql.Database
err error
address string = "localhost"
port int = 8080
protocol string = "http"
@ -45,10 +54,26 @@ var _ = Describe("Capturehead", func() {
dbUser string = "vdbm"
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
)
BeforeEach(func() {
TestEvents = map[string]*Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
},
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
@ -59,45 +84,54 @@ var _ = Describe("Capturehead", func() {
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a simple, easy to process block.",
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
},
"101": {
"2375703-dummy": {
HeadMessage: beaconclient.Head{
Slot: "101",
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a simple, easy to process block.",
SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "101", "beacon-state.ssz"),
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "2375703", "beacon-state.ssz"),
},
"2375703": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
ReorgMessage: beaconclient.ChainReorg{},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "2375703", "beacon-state.ssz"),
},
}
SetupBeaconNodeMock(TestEvents, protocol, address, port)
BC = *beaconclient.CreateBeaconClient(context.Background(), protocol, address, port)
DB, err = postgres.SetupPostgresDb(dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
Expect(err).ToNot(HaveOccurred())
clearEthclDbTables(DB)
// Drop all records from the DB.
BC.Db = DB
})
// We might also want to add an integration test that will actually process a single event, then end.
// This will help us know that our models match that actual data being served from the beacon node.
Describe("Receiving New Head SSE messages", Label("unit"), func() {
Context("Correctly formatted", func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
sendHeadMessage(BC, TestEvents["100"].HeadMessage)
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(DB, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100"].HeadMessage.Block))
@ -106,7 +140,27 @@ var _ = Describe("Capturehead", func() {
})
})
//Context("A single incorrectly formatted", func() {
Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() {
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
sendHeadMessage(BC, TestEvents["2375703"].HeadMessage)
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["2375703"].HeadMessage.Slot, TestEvents["2375703"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
})
})
//Context("A single incorrectly formatted head message", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
//})
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
@ -114,10 +168,71 @@ var _ = Describe("Capturehead", func() {
//})
})
//Describe("Receiving New Reorg SSE messages", Label("unit"), func() {
// Context("Reorg slot is already in the DB", func() {
// It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
// })
Describe("Receiving New Reorg SSE messages", Label("dry"), func() {
Context("Altair: Reorg slot is already in the DB", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
log.Info("Sending Altair Messages to BeaconClient")
sendHeadMessage(BC, TestEvents["2375703-dummy"].HeadMessage)
sendHeadMessage(BC, TestEvents["2375703"].HeadMessage)
for atomic.LoadUint64(&BC.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Altair to make sure the fork was marked properly.")
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["2375703-dummy"].HeadMessage.Slot, TestEvents["2375703-dummy"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703-dummy"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703-dummy"].HeadMessage.State))
Expect(status).To(Equal("forked"))
epoch, slot, blockRoot, stateRoot, status = queryDbSlotAndBlock(BC.Db, TestEvents["2375703"].HeadMessage.Slot, TestEvents["2375703"].HeadMessage.Block)
Expect(slot).To(Equal(2375703))
Expect(epoch).To(Equal(74240))
Expect(blockRoot).To(Equal(TestEvents["2375703"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["2375703"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
})
})
Context("Phase0: Reorg slot is already in the DB", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BC := setUpTest(TestEvents, protocol, address, port, dummyParentRoot, dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
SetupBeaconNodeMock(TestEvents, protocol, address, port, dummyParentRoot)
defer httpmock.Deactivate()
go BC.CaptureHead()
log.Info("Sending Phase0 Messages to BeaconClient")
sendHeadMessage(BC, TestEvents["100-dummy"].HeadMessage)
sendHeadMessage(BC, TestEvents["100"].HeadMessage)
for atomic.LoadUint64(&BC.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Phase0 to make sure the fork was marked properly.")
epoch, slot, blockRoot, stateRoot, status := queryDbSlotAndBlock(BC.Db, TestEvents["100-dummy"].HeadMessage.Slot, TestEvents["100-dummy"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100-dummy"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["100-dummy"].HeadMessage.State))
Expect(status).To(Equal("forked"))
epoch, slot, blockRoot, stateRoot, status = queryDbSlotAndBlock(BC.Db, TestEvents["100"].HeadMessage.Slot, TestEvents["100"].HeadMessage.Block)
Expect(slot).To(Equal(100))
Expect(epoch).To(Equal(3))
Expect(blockRoot).To(Equal(TestEvents["100"].HeadMessage.Block))
Expect(stateRoot).To(Equal(TestEvents["100"].HeadMessage.State))
Expect(status).To(Equal("proposed"))
})
})
//Context("Multiple reorgs have occurred on this slot", func() {
// It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
//})
@ -125,7 +240,7 @@ var _ = Describe("Capturehead", func() {
// It("Should simply have the correct slot in the DB.")
//})
//})
})
//Describe("Querying SignedBeaconBlock and Beacon State", Label("unit"), func() {
// Context("When the slot is properly served by the beacon node", func() {
@ -164,19 +279,38 @@ var _ = Describe("Capturehead", func() {
//})
})
// Must run before each test. We can't use the beforeEach because of the way
// Ginko treats race conditions.
func setUpTest(testEvents map[string]*Message, protocol string, address string, port int, dummyParentRoot string,
dbHost string, dbPort int, dbName string, dbUser string, dbPassword string, dbDriver string) *beaconclient.BeaconClient {
BC := *beaconclient.CreateBeaconClient(context.Background(), protocol, address, port)
DB, err := postgres.SetupPostgresDb(dbHost, dbPort, dbName, dbUser, dbPassword, dbDriver)
Expect(err).ToNot(HaveOccurred())
// Drop all records from the DB.
clearEthclDbTables(DB)
BC.Db = DB
return &BC
}
// Wrapper function to send a head message to the beaconclient
func sendHeadMessage(bc beaconclient.BeaconClient, head beaconclient.Head) {
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head) {
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
Event: []byte{},
Retry: []byte{},
}
time.Sleep(2 * time.Second)
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 {
time.Sleep(1 * time.Second)
}
}
// A helper function to query the ethcl.slots table based on the slot and block_root
@ -205,7 +339,7 @@ type TestBeaconNode struct {
}
// Create a new new mock for the beacon node.
func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, address string, port int) {
func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, address string, port int, dummyParentRoot string) {
httpmock.Activate()
testBC := &TestBeaconNode{
TestEvents: TestEvents,
@ -216,7 +350,7 @@ func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, addres
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := testBC.provideSsz(id, "state")
dat, err := testBC.provideSsz(id, "state", dummyParentRoot)
if err != nil {
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
@ -229,7 +363,7 @@ func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, addres
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := testBC.provideSsz(id, "block")
dat, err := testBC.provideSsz(id, "block", dummyParentRoot)
if err != nil {
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
@ -239,22 +373,72 @@ func SetupBeaconNodeMock(TestEvents map[string]*Message, protocol string, addres
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string) ([]byte, error) {
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) {
var slotFile string
var Message *Message
for _, val := range tbc.TestEvents {
if sszIdentifier == "state" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
slotFile = val.BeaconState
Message = val
}
} else if sszIdentifier == "block" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
slotFile = val.SignedBeaconBlock
Message = val
}
}
}
if Message.MimicConfig != nil {
log.Info("We are going to create a custom SSZ object for testing purposes.")
if sszIdentifier == "block" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
block := &st.SignedBeaconBlock{}
err = block.UnmarshalSSZ(dat)
if err != nil {
log.Error("Error unmarshalling: ", err)
}
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
block.Block.Slot = types.Slot(slot)
block.Block.StateRoot, err = hex.DecodeString(Message.HeadMessage.State)
Expect(err).ToNot(HaveOccurred())
if Message.MimicConfig.ParentRoot == "" {
block.Block.ParentRoot, err = hex.DecodeString(dummyParentRoot)
Expect(err).ToNot(HaveOccurred())
} else {
block.Block.ParentRoot, err = hex.DecodeString(Message.MimicConfig.ParentRoot)
}
return block.MarshalSSZ()
}
if sszIdentifier == "state" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
state := st.BeaconState{}
state.UnmarshalSSZ(dat)
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
state.Slot = types.Slot(slot)
return state.MarshalSSZ()
}
}
if slotFile == "" {
return nil, fmt.Errorf("We couldn't find the slot file for %s", slotIdentifier)
}
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)

View File

@ -30,9 +30,9 @@ VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateReorgStmt string = `UPDATE ethcl.slots
SET status=forked
WHERE slot=$1 AND block_hash<>$2
RETURNING block_hash;`
SET status='forked'
WHERE slot=$1 AND block_root<>$2
RETURNING block_root;`
)
// Put all functionality to prepare the write object
@ -40,6 +40,7 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql.Database
Metrics *BeaconClientMetrics
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
@ -47,9 +48,10 @@ type DatabaseWriter struct {
rawSignedBeaconBlock []byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string) *DatabaseWriter {
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
dw := &DatabaseWriter{
Db: db,
Metrics: metrics,
}
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
@ -101,6 +103,7 @@ func (dw *DatabaseWriter) writeFullSlot() {
dw.writeSlots()
dw.writeSignedBeaconBlocks()
dw.writeBeaconState()
dw.Metrics.IncrementHeadTrackingInserts(1)
}
// Write the information for the generic slots table. For now this is only one function.
@ -151,13 +154,13 @@ func (dw *DatabaseWriter) writeBeaconState() {
func (dw *DatabaseWriter) upsertBeaconState() {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
}
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func updateReorgs(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
func updateReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) (int64, error) {
res, err := db.Exec(context.Background(), UpdateReorgStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with reorgs.")
@ -168,6 +171,7 @@ func updateReorgs(db sql.Database, slot string, latestBlockRoot string) (int64,
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were effected by the reorg.")
return 0, err
}
metrics.IncrementHeadTrackingReorgs(1)
return count, nil
}

View File

@ -0,0 +1,15 @@
package beaconclient
import "sync/atomic"
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingInserts, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc)
}

View File

@ -17,7 +17,7 @@ func (bc *BeaconClient) handleReorg() {
for {
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock)
processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
}
@ -33,7 +33,7 @@ func (bc *BeaconClient) handleHead() {
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot)
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics)
if err != nil {
loghelper.LogSlotError(head.Slot, err)
}

View File

@ -6,12 +6,8 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
func processReorg(db sql.Database, slot string, latestBlockRoot string) {
// Check to see if there are slots in the DB with the given slot.
// Update them ALL to forked
// Upsert the new slot into the DB, mark the status to proposed.
// Query at the end to make sure that you have handled the reorg properly.
updatedRows, err := updateReorgs(db, slot, latestBlockRoot)
func processReorg(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
updatedRows, err := updateReorgs(db, slot, latestBlockRoot, metrics)
if err != nil {
// Add this slot to the knownGaps table..

View File

@ -38,6 +38,7 @@ type ProcessSlot struct {
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
@ -54,7 +55,7 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string) error {
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
@ -64,6 +65,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
BlockRoot: blockRoot,
StateRoot: stateRoot,
HeadOrHistoric: headOrHistoric,
Db: db,
Metrics: metrics,
}
// Get the SignedBeaconBlock.
@ -86,7 +89,7 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
}
// Get this object ready to write
dw := ps.createWriteObjects(db)
dw := ps.createWriteObjects()
// Write the object to the DB.
dw.writeFullSlot()
@ -95,8 +98,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head")
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics)
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
@ -179,7 +182,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot)
processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
@ -192,7 +195,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot)
processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
// Call our batch processing function.
// Continue with this slot.
} else {
@ -216,7 +219,7 @@ func (ps *ProcessSlot) checkMissedSlot() {
}
// Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter {
func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
var (
stateRoot string
blockRoot string
@ -251,7 +254,7 @@ func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter {
status = "proposed"
}
dw := CreateDatabaseWrite(db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status)
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics)
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
dw.rawBeaconState = ps.SszBeaconState