Feature/22 test handling incoming events (#30)

* Checkpoint before the weekend

* Update location for SetupPostgresDB

* Include first functioning tests for processing head

* Fix gitignore

* Test CaptureHead | Add Metrics | Handle Test Race Conditions

This Commit allows us to:

* Test the `CaptureHead` function.
* Test parsing a single Head message.
* Test a Reorg condition.
* Add Metrics. This is primarily used for testing but can have future use cases.
* Rearrange the test due to race conditions introduced by reusing a variable. `BeforeEach` can't be used to update `BC`.

* Update and finalize testing at this stage

* Update code and CI/CD

* Fix lint errors

* Update CICD and fail when file not found.

* Update test to have failed as expected.
This commit is contained in:
Abdul Rabbani 2022-05-12 09:52:13 -04:00 committed by Abdul Rabbani
parent bf38c83a4c
commit bea9ce2920
23 changed files with 715 additions and 251 deletions

View File

@ -11,6 +11,10 @@ on:
description: "The branch, commit or sha from ipld-eth-db to checkout"
required: false
default: "main"
ssz-data-ref:
description: "The branch, commit or sha from ssz-data to checkout"
required: false
default: "main"
pull_request:
paths:
- "!**.md"
@ -22,7 +26,8 @@ on:
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }}
ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:
build:
@ -96,12 +101,26 @@ jobs:
path: "./ipld-eth-db/"
fetch-depth: 0
- uses: actions/checkout@v3
with:
ref: ${{ env.ssz-data-ref }}
repository: vulcanize/ssz-data
path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data"
fetch-depth: 0
- name: Create config file
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh
echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh
cat ./config.sh
- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \
--env-file ./config.sh \
up -d --build
- uses: actions/setup-go@v3
with:
go-version: ">=1.17.0"

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ ipld-ethcl-indexer.log
report.json
cover.profile
temp/*
pkg/beaconclient/ssz-data/

View File

@ -66,21 +66,19 @@ unit-test-local:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--race --trace
--trace
.PHONY: unit-test-ci
unit-test-ci:
go vet ./...
go fmt ./...
$(GINKGO) -r --label-filter unit \
--procs=4 --compilers=4 \
--randomize-all --randomize-suites \
--fail-on-pending --keep-going \
--cover --coverprofile=cover.profile \
--race --trace --json-report=report.json
--trace --json-report=report.json
.PHONY: build

View File

@ -14,9 +14,27 @@ This application will capture all the `BeaconState`'s and `SignedBeaconBlock`'s
To learn more about the applications individual components, please read the [application components](/application_component.md).
# Running the Application
# Quick Start
To run the application, utilize the following command, and update the values as needed.
## Running the Application
To run the application, do as follows:
1. Setup the prerequisite applications.
a. Run a beacon client (such as lighthouse).
b. Run a postgres DB.
c. You can utilize the `stack-orchestrator` [repository](https://github.com/vulcanize/stack-orchestrato).
```
./wrapper.sh -e skip \
-d ../docker/local/docker-compose-db.yml \
-d ../docker/latest/docker-compose-lighthouse.yml \
-v remove \
-p ../local-config.sh
```
2. Run the start up command.
```
go run main.go capture head --db.address localhost \
@ -27,9 +45,19 @@ go run main.go capture head --db.address localhost \
--db.driver PGX \
--bc.address localhost \
--bc.port 5052 \
--log.level info
--bc.connectionProtocol http \
--log.level info \
--log.output=true
```
## Running Tests
To run tests, you will need to clone another repository which contains all the ssz files.
1. `git clone git@github.com:vulcanize/ssz-data.git pkg/beaconclient/ssz-data`
2. To run unit tests, make sure you have a DB running: `make unit-test-local`
3. To run integration tests, make sure you have a lighthouse client and a DB running: `make integration-test-local-no-race` .
# Development Patterns
This section will cover some generic development patterns utilizes.
@ -58,9 +86,18 @@ This project utilizes `ginkgo` for testing. A few notes on testing:
- All test packages are named `{base_package}_test`. This ensures we only test the public methods.
- If there is a need to test a private method, please include why in the testing file.
- Unit tests must contain the `Label("unit")`.
- Unit tests should not rely on any running service. If a running service is needed. Utilize an integration test.
- Unit tests should not rely on any running service (except for a postgres DB). If a running service is needed. Utilize an integration test.
- Integration tests must contain the `Label("integration")`.
#### Understanding Testing Components
A few notes about the testing components.
- The `TestEvents` map contains several events for testers to leverage when testing.
- Any object ending in `-dummy` is not a real object. You will also notice it has a present field called `MimicConfig`. This object will use an existing SSZ object, and update the parameters from the `Head` and `MimicConfig`.
- This is done because creating an empty or minimal `SignedBeaconBlock` and `BeaconState` is fairly challenging.
- By slightly modifying an existing object, we can test re-org, malformed objects, and other negative conditions.
# Contribution
If you want to contribute please make sure you do the following:

View File

@ -25,3 +25,15 @@ This package will contain code to interact with the beacon client.
## `pkg/version`
A generic package which can be utilized to easily version our applications.
## `pkg/gracefulshutdown`
A generic package that can be used to shutdown various services within an application.
## `pkg/loghelper`
This package contains useful functions for logging.
## `internal/shutdown`
This package is used to shutdown the `ipld-ethcl-indexer`. It calls the `pkg/gracefulshutdown` package.

1
go.mod
View File

@ -83,6 +83,7 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/pgx/v4 v4.16.0
github.com/jarcoal/httpmock v1.2.0
github.com/julienschmidt/httprouter v1.3.0
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect

2
go.sum
View File

@ -387,6 +387,8 @@ github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5D
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=
github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=

View File

@ -26,7 +26,7 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t
})
select {
case _ = <-successCh:
case <-successCh:
return nil
case err := <-errCh:
return err

View File

@ -21,12 +21,19 @@ var (
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
)
// A structure utilized for keeping track of various metrics. Currently, mostly used in testing.
type BeaconClientMetrics struct {
HeadTrackingInserts uint64 // Number of head events we wrote to the DB.
HeadTrackingReorgs uint64 // The number of reorg events written to the DB.
}
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
// Used for Head Tracking
PerformHeadTracking bool // Should we track head?
@ -63,6 +70,10 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0,
HeadTrackingReorgs: 0,
},
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
}
}

Binary file not shown.

View File

@ -1,198 +1,517 @@
package beaconclient_test
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"strconv"
"sync/atomic"
"time"
"github.com/jarcoal/httpmock"
. "github.com/onsi/ginkgo/v2"
"github.com/sirupsen/logrus"
types "github.com/prysmaticlabs/prysm/consensus-types/primitives"
st "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"
. "github.com/onsi/gomega"
// . "github.com/onsi/gomega"
"github.com/r3labs/sse/v2"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres"
)
type Message struct {
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
ReorgMessage beaconclient.ChainReorg // The reorg messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
SuccessfulDBQuery string // A string that indicates what a query to the DB should output to pass the test.
HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient
TestNotes string // A small explanation of the purpose this structure plays in the testing landscape.
MimicConfig *MimicConfig // A configuration of parameters that you are trying to
SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock.
BeaconState string // The file path output of an SSZ encoded BeaconState.
}
var TestEvents map[string]*Message
// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly.
// This is used because creating your own SSZ object is a headache.
type MimicConfig struct {
ParentRoot string // The parent root, leave it empty if you want a to use the universal
}
var _ = Describe("Capturehead", func() {
TestEvents = map[string]*Message{
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
var (
TestConfig Config
BeaconNodeTester TestBeaconNode
address string = "localhost"
port int = 8080
protocol string = "http"
TestEvents map[string]Message
dbHost string = "localhost"
dbPort int = 8077
dbName string = "vulcanize_testing"
dbUser string = "vdbm"
dbPassword string = "password"
dbDriver string = "pgx"
dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42"
)
BeforeEach(func() {
TestEvents = map[string]Message{
"100-dummy": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
TestNotes: "This is a simple, easy to process block.",
SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "100", "beacon-state.ssz"),
},
"101": {
HeadMessage: beaconclient.Head{
Slot: "101",
Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083",
State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
"100-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa",
State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
TestNotes: "This is a simple, easy to process block.",
SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("data", "101", "beacon-state.ssz"),
},
}
"100": {
HeadMessage: beaconclient.Head{
Slot: "100",
Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b",
State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "An easy to process Phase 0 block",
SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"),
},
"2375703-dummy": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703-dummy-2": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa",
State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb",
CurrentDutyDependentRoot: "",
PreviousDutyDependentRoot: "",
EpochTransition: false,
ExecutionOptimistic: false,
},
TestNotes: "This is a dummy message that is used for reorgs",
MimicConfig: &MimicConfig{},
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
"2375703": {
HeadMessage: beaconclient.Head{
Slot: "2375703",
Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301",
State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e",
CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false},
TestNotes: "An easy to process Altair Block",
SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"),
BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"),
},
}
TestConfig = Config{
protocol: protocol,
address: address,
port: port,
dummyParentRoot: dummyParentRoot,
dbHost: dbHost,
dbPort: dbPort,
dbName: dbName,
dbUser: dbUser,
dbPassword: dbPassword,
dbDriver: dbDriver,
}
BeaconNodeTester = TestBeaconNode{
TestEvents: TestEvents,
TestConfig: TestConfig,
}
})
// We might also want to add an integration test that will actually process a single event, then end.
// This will help us know that our models match that actual data being served from the beacon node.
Describe("Receiving New Head SSE messages", Label("unit"), func() {
Context("Correctly formatted", Label("dry"), func() {
Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() {
Context("Correctly formatted Phase0 Block", func() {
It("Should turn it into a struct successfully.", func() {
server := createSseServer()
logrus.Info("DONE!")
client := sse.NewClient("http://localhost:8080" + beaconclient.BcHeadTopicEndpoint)
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["100"].HeadMessage, 3)
logrus.Info("DONE!")
ch := make(chan *sse.Event)
go client.SubscribeChanRaw(ch)
time.Sleep(2 * time.Second)
logrus.Info("DONE!")
sendMessageToStream(server, []byte("hello"))
client.Unsubscribe(ch)
val := <-ch
logrus.Info("DONE!")
logrus.Info(val)
})
})
//Context("A single incorrectly formatted", func() {
Context("Correctly formatted Altair Block", func() {
It("Should turn it into a struct successfully.", func() {
BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240)
})
})
//Context("A single incorrectly formatted head message", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
// If it can unmarshal the head add it to knownGaps
//})
//Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() {
// It("Should create an error, maybe also add the projected slot to the knownGaps table......")
//})
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
// })
//})
// Context("With gaps in between head slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
})
//Describe("Receiving New Reorg SSE messages", Label("unit"), func() {
// Context("Reorg slot is already in the DB", func() {
// It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
// })
// Context("Multiple reorgs have occurred on this slot", func() {
// It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.")
// })
// Context("Reorg slot in not already in the DB", func() {
// It("Should simply have the correct slot in the DB.")
// })
Describe("ReOrg Scenario", Label("unit", "behavioral"), func() {
Context("Altair: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240)
})
})
Context("Phase0: Multiple head messages for the same slot.", func() {
It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleHead(TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3)
})
})
Context("Phase 0: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleReorgs(TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3)
})
})
Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() {
It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() {
BeaconNodeTester.testMultipleReorgs(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240)
})
})
//Context("Reorg slot in not already in the DB", func() {
// It("Should simply have the correct slot in the DB.")
// Add to knowngaps
//})
//})
//Describe("Querying SignedBeaconBlock and Beacon State", Label("unit"), func() {
// Context("When the slot is properly served by the beacon node", func() {
// It("Should provide a successful response.")
// })
// Context("When there is a skipped slot", func() {
// It("Should indicate that the slot was skipped")
// // Future use case.
// })
// Context("When the slot is not properly served", func() {
// It("Should return an error, and add the slot to the knownGaps table.")
// })
//})
//Describe("Receiving properly formatted Head SSE events.", Label("unit"), func() {
// Context("In sequential order", func() {
// It("Should write each event to the DB successfully.")
// })
// Context("With gaps in slots", func() {
// It("Should add the slots in between to the knownGaps table")
// })
// Context("With a repeat slot", func() {
// It("Should recognize the reorg and process it.")
// })
// Context("With the previousBlockHash not matching the parentBlockHash", func() {
// It("Should recognize the reorg and add the previous slot to knownGaps table.")
// })
// Context("Out of order", func() {
// It("Not sure what it should do....")
// })
// Context("With a skipped slot", func() {
// It("Should recognize the slot as skipped and continue without error.")
// // Future use case
// })
//})
})
})
// Create a new Sse.Server.
func createSseServer() *sse.Server {
// server := sse.New()
// server.CreateStream("")
mux := http.NewServeMux()
//mux.HandleFunc(beaconclient.BcHeadTopicEndpoint, func(w http.ResponseWriter, r *http.Request) {
// go func() {
// // Received Browser Disconnection
// <-r.Context().Done()
// println("The client is disconnected here")
// return
// }()
// server.ServeHTTP(w, r)
//})
mux.HandleFunc(beaconclient.BcStateQueryEndpoint, provideState)
mux.HandleFunc(beaconclient.BcBlockQueryEndpoint, provideBlock)
go http.ListenAndServe(":8080", mux)
return server
type Config struct {
protocol string
address string
port int
dummyParentRoot string
dbHost string
dbPort int
dbName string
dbUser string
dbPassword string
dbDriver string
}
// Send messages to the stream.
func sendMessageToStream(server *sse.Server, data []byte) {
server.Publish("", &sse.Event{
Data: data,
//////////////////////////////////////////////////////
// Helper functions
//////////////////////////////////////////////////////
// Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions.
func setUpTest(config Config) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred())
// Drop all records from the DB.
clearEthclDbTables(db)
bc.Db = db
return &bc
}
// A helper function to validate the expected output.
func validateSlot(bc *beaconclient.BeaconClient, headMessage *beaconclient.Head, correctEpoch int, correctStatus string) {
epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block)
baseSlot, err := strconv.Atoi(headMessage.Slot)
Expect(err).ToNot(HaveOccurred())
Expect(dbSlot).To(Equal(baseSlot))
Expect(epoch).To(Equal(correctEpoch))
Expect(blockRoot).To(Equal(headMessage.Block))
Expect(stateRoot).To(Equal(headMessage.State))
Expect(status).To(Equal(correctStatus))
}
// Wrapper function to send a head message to the beaconclient
func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head) {
data, err := json.Marshal(head)
Expect(err).ToNot(HaveOccurred())
startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts)
bc.HeadTracking.MessagesCh <- &sse.Event{
ID: []byte{},
Data: data,
Event: []byte{},
Retry: []byte{},
}
for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 {
time.Sleep(1 * time.Second)
}
}
// A helper function to query the ethcl.slots table based on the slot and block_root
func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, int, string, string, string) {
sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;`
var epoch, slot int
var blockRoot, stateRoot, status string
row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot)
err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status)
Expect(err).ToNot(HaveOccurred())
return epoch, slot, blockRoot, stateRoot, status
}
// A function that will remove all entries from the ethcl tables for you.
func clearEthclDbTables(db sql.Database) {
deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"}
for _, queries := range deleteQueries {
_, err := db.Exec(context.Background(), queries)
Expect(err).ToNot(HaveOccurred())
}
}
// An object that is used to aggregate test functions. Test functions are needed because we need to
// run the same tests on multiple blocks for multiple forks. So they save us time.
type TestBeaconNode struct {
TestEvents map[string]Message
TestConfig Config
}
// Create a new new mock for the beacon node.
func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, protocol string, address string, port int, dummyParentRoot string) {
httpmock.Activate()
stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z`
httpmock.RegisterResponder("GET", stateUrl,
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := tbc.provideSsz(id, "state", dummyParentRoot)
if err != nil {
Expect(err).NotTo(HaveOccurred())
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
return httpmock.NewBytesResponse(200, dat), nil
},
)
blockUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcBlockQueryEndpoint + `([^/]+)\z`
httpmock.RegisterResponder("GET", blockUrl,
func(req *http.Request) (*http.Response, error) {
// Get ID from request
id := httpmock.MustGetSubmatch(req, 1)
dat, err := tbc.provideSsz(id, "block", dummyParentRoot)
if err != nil {
Expect(err).NotTo(HaveOccurred())
return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err
}
return httpmock.NewBytesResponse(200, dat), nil
},
)
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) {
var slotFile string
var Message Message
for _, val := range tbc.TestEvents {
if sszIdentifier == "state" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier {
slotFile = val.BeaconState
Message = val
}
} else if sszIdentifier == "block" {
if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier {
slotFile = val.SignedBeaconBlock
Message = val
}
}
}
if Message.MimicConfig != nil {
log.Info("We are going to create a custom SSZ object for testing purposes.")
if sszIdentifier == "block" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
block := &st.SignedBeaconBlock{}
err = block.UnmarshalSSZ(dat)
if err != nil {
log.Error("Error unmarshalling: ", err)
}
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
block.Block.Slot = types.Slot(slot)
block.Block.StateRoot, err = hex.DecodeString(Message.HeadMessage.State)
Expect(err).ToNot(HaveOccurred())
if Message.MimicConfig.ParentRoot == "" {
block.Block.ParentRoot, err = hex.DecodeString(dummyParentRoot)
Expect(err).ToNot(HaveOccurred())
} else {
block.Block.ParentRoot, err = hex.DecodeString(Message.MimicConfig.ParentRoot)
Expect(err).ToNot(HaveOccurred())
}
return block.MarshalSSZ()
}
if sszIdentifier == "state" {
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
state := st.BeaconState{}
err = state.UnmarshalSSZ(dat)
Expect(err)
slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64)
Expect(err).ToNot(HaveOccurred())
state.Slot = types.Slot(slot)
return state.MarshalSSZ()
}
}
if slotFile == "" {
return nil, fmt.Errorf("We couldn't find the slot file for %s", slotIdentifier)
}
dat, err := os.ReadFile(slotFile)
if err != nil {
return nil, fmt.Errorf("Can't find the slot file, %s", slotFile)
}
return dat, nil
}
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
sendHeadMessage(bc, firstHead)
sendHeadMessage(bc, secondHead)
sendHeadMessage(bc, thirdHead)
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 {
time.Sleep(1 * time.Second)
}
log.Info("Checking Phase0 to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "forked")
validateSlot(bc, &thirdHead, epoch, "proposed")
log.Info("Send the reorg message.")
data, err := json.Marshal(&beaconclient.ChainReorg{
Slot: firstHead.Slot,
Depth: "1",
OldHeadBlock: thirdHead.Block,
NewHeadBlock: secondHead.Block,
OldHeadState: thirdHead.State,
NewHeadState: secondHead.State,
Epoch: strconv.Itoa(epoch),
ExecutionOptimistic: false,
})
logrus.Info("publish complete")
Expect(err).ToNot(HaveOccurred())
bc.ReOrgTracking.MessagesCh <- &sse.Event{
Data: data,
}
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 {
time.Sleep(1 * time.Second)
}
log.Info("Make sure the forks were properly updated!")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
validateSlot(bc, &thirdHead, epoch, "forked")
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func provideState(w http.ResponseWriter, req *http.Request) {
path := strings.Split(req.URL.Path, "/")
slot := path[len(path)-1]
slotFile := "data/" + slot + "/beacon-state.ssz"
dat, err := os.ReadFile(slotFile)
if err != nil {
fmt.Fprintf(w, "Can't find the slot file, %s", slotFile)
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(dat)
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head)
validateSlot(bc, &head, epoch, "proposed")
}
// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it.
func provideBlock(w http.ResponseWriter, req *http.Request) {
path := strings.Split(req.URL.Path, "/")
slot := path[len(path)-1]
slotFile := "data/" + slot + "/signed-beacon-block.ssz"
dat, err := os.ReadFile(slotFile)
if err != nil {
fmt.Fprintf(w, "Can't find the slot file, %s", slotFile)
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int) {
bc := setUpTest(tbc.TestConfig)
tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot)
defer httpmock.DeactivateAndReset()
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead)
sendHeadMessage(bc, secondHead)
for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 {
time.Sleep(1 * time.Second)
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(dat)
log.Info("Checking Altair to make sure the fork was marked properly.")
validateSlot(bc, &firstHead, epoch, "forked")
validateSlot(bc, &secondHead, epoch, "proposed")
}

View File

@ -29,10 +29,17 @@ VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING`
UpsertBlocksStmt string = `
INSERT INTO public.blocks (key, data)
VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
UpdateReorgStmt string = `UPDATE ethcl.slots
SET status=forked
WHERE slot=$1 AND block_hash<>$2
RETURNING block_hash;`
UpdateForkedStmt string = `UPDATE ethcl.slots
SET status='forked'
WHERE slot=$1 AND block_root<>$2
RETURNING block_root;`
UpdateProposedStmt string = `UPDATE ethcl.slots
SET status='proposed'
WHERE slot=$1 AND block_root=$2
RETURNING block_root;`
CheckProposedStmt string = `SELECT slot, block_root
FROM ethcl.slots
WHERE slot=$1 AND block_root=$2;`
)
// Put all functionality to prepare the write object
@ -40,6 +47,7 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
// Remove any of it from the processslot file.
type DatabaseWriter struct {
Db sql.Database
Metrics *BeaconClientMetrics
DbSlots *DbSlots
DbSignedBeaconBlock *DbSignedBeaconBlock
DbBeaconState *DbBeaconState
@ -47,9 +55,10 @@ type DatabaseWriter struct {
rawSignedBeaconBlock []byte
}
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string) *DatabaseWriter {
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter {
dw := &DatabaseWriter{
Db: db,
Db: db,
Metrics: metrics,
}
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot)
@ -101,6 +110,7 @@ func (dw *DatabaseWriter) writeFullSlot() {
dw.writeSlots()
dw.writeSignedBeaconBlocks()
dw.writeBeaconState()
dw.Metrics.IncrementHeadTrackingInserts(1)
}
// Write the information for the generic slots table. For now this is only one function.
@ -137,7 +147,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) {
func (dw *DatabaseWriter) upsertSignedBeaconBlock() {
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
}
}
@ -151,25 +161,89 @@ func (dw *DatabaseWriter) writeBeaconState() {
func (dw *DatabaseWriter) upsertBeaconState() {
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
if err != nil {
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
}
}
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
func updateReorgs(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateReorgStmt, slot, latestBlockRoot)
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
forkCount, err := updateForked(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with reorgs.")
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
// Add to knownGaps Table
}
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
// Add to knownGaps Table
}
if forkCount > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Info("Updated rows that were forked.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"forkCount": forkCount,
}).Warn("There were no forked rows to update.")
}
if proposedCount == 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Info("Updated the row that should have been marked as proposed.")
} else if proposedCount > 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": proposedCount,
}).Error("Too many rows were marked as proposed!")
} else if proposedCount == 0 {
var count int
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
}
if count != 1 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"proposedCount": count,
}).Warn("The proposed block was not marked as proposed...")
} else {
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
}
}
metrics.IncrementHeadTrackingReorgs(1)
}
// Update the slots table by marking the old slot's as forked.
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were effected by the reorg.")
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.")
return 0, err
}
return count, err
}
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
return 0, err
}
count, err := res.RowsAffected()
if err != nil {
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed")
return 0, err
}
return count, nil
return count, err
}
// Dummy function for calculating the mhKey.

View File

@ -19,23 +19,26 @@ var (
// When new messages come in, it will ensure that they are decoded into JSON.
// If any errors occur, it log the error information.
func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) {
errG := new(errgroup.Group)
errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil {
return err
go func() {
errG := new(errgroup.Group)
errG.Go(func() error {
err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh)
if err != nil {
return err
}
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
}
return nil
})
if err := errG.Wait(); err != nil {
log.WithFields(log.Fields{
"err": err,
"endpoint": eventHandler.Endpoint,
}).Error("Unable to subscribe to the SSE endpoint.")
return
} else {
loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.")
}
}()
for {
select {
case message := <-eventHandler.MessagesCh:

View File

@ -0,0 +1,15 @@
package beaconclient
import "sync/atomic"
// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) {
atomic.AddUint64(&m.HeadTrackingInserts, inc)
}
// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all
// occurrences here.
func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) {
atomic.AddUint64(&m.HeadTrackingReorgs, inc)
}

View File

@ -17,7 +17,7 @@ func (bc *BeaconClient) handleReorg() {
for {
reorg := <-bc.ReOrgTracking.ProcessCh
log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.")
processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock)
writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics)
}
}
@ -33,7 +33,7 @@ func (bc *BeaconClient) handleHead() {
err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot),
}
}
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot)
err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics)
if err != nil {
loghelper.LogSlotError(head.Slot, err)
}

View File

@ -1,31 +0,0 @@
package beaconclient
import (
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
func processReorg(db sql.Database, slot string, latestBlockRoot string) {
// Check to see if there are slots in the DB with the given slot.
// Update them ALL to forked
// Upsert the new slot into the DB, mark the status to proposed.
// Query at the end to make sure that you have handled the reorg properly.
updatedRows, err := updateReorgs(db, slot, latestBlockRoot)
if err != nil {
// Add this slot to the knownGaps table..
// Maybe we need to rename the knownGaps table to the "batchProcess" table.
}
if updatedRows > 0 {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Info("Updated DB based on Reorgs.")
} else {
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
"updatedRows": updatedRows,
}).Warn("There were no rows to update.")
}
}

View File

@ -30,14 +30,15 @@ var (
type ProcessSlot struct {
// Generic
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
Db sql.Database // The DB object used to write to the DB.
Slot int // The slot number.
Epoch int // The epoch number.
BlockRoot string // The hex encoded string of the BlockRoot.
StateRoot string // The hex encoded string of the StateRoot.
ParentBlockRoot string // The hex encoded string of the parent block.
Status string // The status of the block
HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots.
Db sql.Database // The DB object used to write to the DB.
Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics
// BeaconBlock
SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock
@ -54,7 +55,7 @@ type ProcessSlot struct {
}
// This function will do all the work to process the slot and write it to the DB.
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string) error {
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error {
headOrHistoric = strings.ToLower(headOrHistoric)
if headOrHistoric != "head" && headOrHistoric != "historic" {
return fmt.Errorf("headOrBatch must be either historic or head!")
@ -64,6 +65,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
BlockRoot: blockRoot,
StateRoot: stateRoot,
HeadOrHistoric: headOrHistoric,
Db: db,
Metrics: metrics,
}
// Get the SignedBeaconBlock.
@ -78,25 +81,24 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
return err
}
// Get this object ready to write
dw := ps.createWriteObjects()
// Write the object to the DB.
dw.writeFullSlot()
// Handle any reorgs or skipped slots.
if ps.HeadOrHistoric == "head" {
if previousSlot != 0 && previousBlockRoot != "" {
ps.checkPreviousSlot(previousSlot, previousBlockRoot)
}
}
// Get this object ready to write
dw := ps.createWriteObjects(db)
// Write the object to the DB.
dw.writeFullSlot()
return nil
}
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head")
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error {
return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics)
}
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
@ -179,7 +181,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"slot": ps.FullBeaconState.Slot,
"fork": true,
}).Warn("A fork occurred! The previous slot and current slot match.")
processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot)
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
} else if previousSlot-1 != int(ps.FullBeaconState.Slot) {
log.WithFields(log.Fields{
"previousSlot": previousSlot,
@ -192,7 +194,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
"previousBlockRoot": previousBlockRoot,
"currentBlockParent": parentRoot,
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot)
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
// Call our batch processing function.
// Continue with this slot.
} else {
@ -216,7 +218,7 @@ func (ps *ProcessSlot) checkMissedSlot() {
}
// Transforms all the raw data into DB models that can be written to the DB.
func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter {
func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter {
var (
stateRoot string
blockRoot string
@ -234,6 +236,7 @@ func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter {
if ps.BlockRoot != "" {
blockRoot = ps.BlockRoot
} else {
log.Info("We need to add logic")
// We need to get the state of Slot + 1, then we can run the below.
// WE can query it for each run, or we can leave it blank, and update it.
// I just want to avoid getting the same state twice, especially since the state can get heavy.
@ -251,7 +254,7 @@ func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter {
status = "proposed"
}
dw := CreateDatabaseWrite(db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status)
dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics)
dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock
dw.rawBeaconState = ps.SszBeaconState

View File

@ -29,17 +29,18 @@ var _ = Describe("Pgx", func() {
_, err := postgres.NewPostgresDB(postgres.Config{
Driver: "PGX",
})
Expect(err).NotTo(BeNil())
Expect(err).To(HaveOccurred())
present, err := doesContainsSubstring(err.Error(), sql.DbConnectionFailedMsg)
Expect(present).To(BeTrue())
Expect(err).NotTo(HaveOccurred())
})
})
Context("The connection is successful", func() {
It("Should create a DB object", func() {
db, err := postgres.NewPostgresDB(postgres.DefaultConfig)
defer db.Close()
Expect(err).To(BeNil())
defer db.Close()
})
})
})

View File

@ -38,7 +38,6 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat
timeoutFunc := time.AfterFunc(timeout, func() {
log.Warnf(TimeoutErr(timeout.String()).Error())
errCh <- TimeoutErr(timeout.String())
return
})
defer timeoutFunc.Stop()