From cac8d041eb75a8360466942ef7ada2929159e767 Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Tue, 20 Sep 2022 17:35:01 -0500 Subject: [PATCH] pre-payload --- Makefile | 4 +-- pkg/beaconclient/capturehistoric_test.go | 2 +- pkg/beaconclient/models.go | 17 +++++---- pkg/beaconclient/processslot.go | 43 ++++++++++++++++++----- pkg/beaconclient/queryserver.go | 44 ++++-------------------- 5 files changed, 56 insertions(+), 54 deletions(-) diff --git a/Makefile b/Makefile index 7f9f925..f62a92c 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,7 @@ integration-test-local-no-race: unit-test-local: go vet ./... go fmt ./... - $(GINKGO) -r --label-filter unit \ + $(GINKGO) -r --label-filter 'unit && !flaky' \ --randomize-all --randomize-suites \ --flake-attempts=3 \ --fail-on-pending --keep-going \ @@ -109,4 +109,4 @@ build: ## Build docker image .PHONY: docker-build docker-build: - docker build -t vulcanize/ipld-eth-beacon-indexer . \ No newline at end of file + docker build -t vulcanize/ipld-eth-beacon-indexer . diff --git a/pkg/beaconclient/capturehistoric_test.go b/pkg/beaconclient/capturehistoric_test.go index 5c0f869..a166cbf 100644 --- a/pkg/beaconclient/capturehistoric_test.go +++ b/pkg/beaconclient/capturehistoric_test.go @@ -98,7 +98,7 @@ var _ = Describe("Capturehistoric", func() { BeaconNodeTester.runKnownGapsProcess(bc, 2, 2, 0, 2, 0) }) }) - Context("When theres a reprocessing error", Label("reprocessingError"), func() { + Context("When theres a reprocessing error", Label("reprocessingError", "flaky"), func() { It("Should update the reprocessing error.", func() { bc := setUpTest(BeaconNodeTester.TestConfig, "99") BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot) diff --git a/pkg/beaconclient/models.go b/pkg/beaconclient/models.go index 7e796a5..19f4d97 100644 --- a/pkg/beaconclient/models.go +++ b/pkg/beaconclient/models.go @@ -62,12 +62,17 @@ type DbSlots struct { // A struct to capture whats being written to eth-beacon.signed_block table. type DbSignedBeaconBlock struct { - Slot string // The slot. - BlockRoot string // The block root - ParentBlock string // The parent block root. - Eth1DataBlockHash string // The eth1 block_hash - MhKey string // The ipld multihash key. - + Slot string // The slot. + BlockRoot string // The block root + ParentBlock string // The parent block root. + Eth1DataBlockHash string // The eth1 block_hash + MhKey string // The ipld multihash key. + PayloadBlockNumber int64 + PayloadTimestamp int64 + PayloadBlockHash string + PayloadParentHash string + PayloadStateRoot string + PayloadReceiptsRoot string } // A struct to capture whats being written to eth-beacon.state table. diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index fd1069f..35b000e 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -191,7 +191,7 @@ func processFullSlot( } parseBeaconTime := time.Now() - finalBlockRoot, finalStateRoot, finalEth1DataBlockHash, err := ps.provideFinalHash() + finalBlockRoot, finalStateRoot, _, err := ps.provideFinalHash() if err != nil { return err, "CalculateBlockRoot" } @@ -199,11 +199,25 @@ func processFullSlot( if spd.CheckDb { checkDbTime := time.Now() - inDb, err := IsSlotInDb(ctx, ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot) - if err != nil { - return err, "checkDb" + var blockRequired bool + if spd.PerformBeaconBlockProcessing { + blockExists, err := checkSlotAndRoot(ps.Db, CheckSignedBeaconBlockStmt, strconv.Itoa(ps.Slot), finalBlockRoot) + if err != nil { + return err, "checkDb" + } + blockRequired = !blockExists } - if inDb { + + var stateRequired bool + if spd.PerformBeaconStateProcessing { + stateExists, err := checkSlotAndRoot(ps.Db, CheckBeaconStateStmt, strconv.Itoa(ps.Slot), finalStateRoot) + if err != nil { + return err, "checkDb" + } + stateRequired = !stateExists + } + + if !blockRequired && !stateRequired { log.WithField("slot", slot).Info("Slot already in the DB.") return nil, "" } @@ -212,7 +226,7 @@ func processFullSlot( // Get this object ready to write createDbWriteTime := time.Now() - dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1DataBlockHash) + dw, err := ps.createWriteObjects() if err != nil { return err, "blockRoot" } @@ -303,6 +317,11 @@ func (ps *ProcessSlot) getSignedBeaconBlock(serverAddress string) error { ps.SszSignedBeaconBlock = []byte{} ps.ParentBlockRoot = "" ps.Status = "skipped" + + // A 404 is normal in the case of a "skipped" slot. + if rc == 404 { + return nil + } return err } @@ -389,7 +408,7 @@ func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previou } // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1DataBlockHash string) (*DatabaseWriter, error) { +func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) { var status string if ps.Status != "" { status = ps.Status @@ -397,7 +416,15 @@ func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1DataBlockHas status = "proposed" } - dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1DataBlockHash, status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics) + parseBeaconTime := time.Now() + blockRoot, stateRoot, eth1DataBlockHash, err := ps.provideFinalHash() + if err != nil { + return nil, err + } + ps.PerformanceMetrics.ParseBeaconObjectForHash = time.Since(parseBeaconTime) + + dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1DataBlockHash, + status, &ps.SszSignedBeaconBlock, &ps.SszBeaconState, ps.Metrics) if err != nil { return dw, err } diff --git a/pkg/beaconclient/queryserver.go b/pkg/beaconclient/queryserver.go index 5c80c0b..9ce4ffe 100644 --- a/pkg/beaconclient/queryserver.go +++ b/pkg/beaconclient/queryserver.go @@ -18,7 +18,6 @@ package beaconclient import ( - "encoding/json" "fmt" "io/ioutil" "net/http" @@ -53,47 +52,18 @@ func querySsz(endpoint string, slot string) ([]byte, int, error) { return nil, 0, fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) } defer response.Body.Close() + rc := response.StatusCode + // Any 2xx code is OK. + if rc < 200 || rc >= 300 { + return nil, rc, fmt.Errorf("HTTP Error: %d", rc) + } + body, err := ioutil.ReadAll(response.Body) if err != nil { loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") return nil, rc, fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) } - if rc != 200 { - return body, rc, fmt.Errorf("HTTP Error: %d", rc) - } + return body, rc, nil } - -// A function to query the blockroot for a given slot. -func queryBlockRoot(endpoint string, slot string) (string, error) { - log.WithFields(log.Fields{"endpoint": endpoint}).Debug("Querying endpoint") - client := &http.Client{} - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to create a request!") - return "", fmt.Errorf("Unable to create a request!: %s", err.Error()) - } - req.Header.Set("Accept", "application/json") - response, err := client.Do(req) - if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to query Beacon Node!") - return "", fmt.Errorf("Unable to query Beacon Node: %s", err.Error()) - } - defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) - if err != nil { - loghelper.LogSlotError(slot, err).Error("Unable to turn response into a []bytes array!") - return "", fmt.Errorf("Unable to turn response into a []bytes array!: %s", err.Error()) - } - - resp := BlockRootResponse{} - if err := json.Unmarshal(body, &resp); err != nil { - loghelper.LogEndpoint(endpoint).WithFields(log.Fields{ - "rawMessage": string(body), - "err": err, - }).Error("Unable to unmarshal the block root") - return "", err - } - return resp.Data.Root, nil -}