diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index b60ac30..75a38d7 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -11,6 +11,10 @@ on: description: "The branch, commit or sha from ipld-eth-db to checkout" required: false default: "main" + ssz-data-ref: + description: "The branch, commit or sha from ssz-data to checkout" + required: false + default: "main" pull_request: paths: - "!**.md" @@ -22,7 +26,8 @@ on: env: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || '21d076268730e3f25fcec6371c1aca1bf48040d8'}} - ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || '05600e51d2163e1c5e2a872cb54606bc0a380d12' }} + ipld-eth-db-ref: ${{ github.event.inputs.ipld-eth-db-ref || 'minimal-beacon-chain-schema' }} + ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} GOPATH: /tmp/go jobs: build: @@ -96,12 +101,26 @@ jobs: path: "./ipld-eth-db/" fetch-depth: 0 + - uses: actions/checkout@v3 + with: + ref: ${{ env.ssz-data-ref }} + repository: vulcanize/ssz-data + path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data" + fetch-depth: 0 + - name: Create config file run: | echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > ./config.sh echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh cat ./config.sh + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db.yml" \ + --env-file ./config.sh \ + up -d --build + - uses: actions/setup-go@v3 with: go-version: ">=1.17.0" diff --git a/.gitignore b/.gitignore index e4900de..826080f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ ipld-ethcl-indexer.log report.json cover.profile temp/* +pkg/beaconclient/ssz-data/ diff --git a/Makefile b/Makefile index d3bbae0..e9ed9b4 100644 --- a/Makefile +++ b/Makefile @@ -66,21 +66,19 @@ unit-test-local: go vet ./... go fmt ./... $(GINKGO) -r --label-filter unit \ - --procs=4 --compilers=4 \ --randomize-all --randomize-suites \ --fail-on-pending --keep-going \ - --race --trace + --trace .PHONY: unit-test-ci unit-test-ci: go vet ./... go fmt ./... $(GINKGO) -r --label-filter unit \ - --procs=4 --compilers=4 \ --randomize-all --randomize-suites \ --fail-on-pending --keep-going \ --cover --coverprofile=cover.profile \ - --race --trace --json-report=report.json + --trace --json-report=report.json .PHONY: build diff --git a/README.md b/README.md index f24ade2..52b433b 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,27 @@ This application will capture all the `BeaconState`'s and `SignedBeaconBlock`'s To learn more about the applications individual components, please read the [application components](/application_component.md). -# Running the Application +# Quick Start -To run the application, utilize the following command, and update the values as needed. +## Running the Application + +To run the application, do as follows: + +1. Setup the prerequisite applications. + a. Run a beacon client (such as lighthouse). + b. Run a postgres DB. + c. You can utilize the `stack-orchestrator` [repository](https://github.com/vulcanize/stack-orchestrato). + + ``` + ./wrapper.sh -e skip \ + -d ../docker/local/docker-compose-db.yml \ + -d ../docker/latest/docker-compose-lighthouse.yml \ + -v remove \ + -p ../local-config.sh + + ``` + +2. Run the start up command. ``` go run main.go capture head --db.address localhost \ @@ -27,9 +45,19 @@ go run main.go capture head --db.address localhost \ --db.driver PGX \ --bc.address localhost \ --bc.port 5052 \ - --log.level info + --bc.connectionProtocol http \ + --log.level info \ + --log.output=true ``` +## Running Tests + +To run tests, you will need to clone another repository which contains all the ssz files. + +1. `git clone git@github.com:vulcanize/ssz-data.git pkg/beaconclient/ssz-data` +2. To run unit tests, make sure you have a DB running: `make unit-test-local` +3. To run integration tests, make sure you have a lighthouse client and a DB running: `make integration-test-local-no-race` . + # Development Patterns This section will cover some generic development patterns utilizes. @@ -58,9 +86,18 @@ This project utilizes `ginkgo` for testing. A few notes on testing: - All test packages are named `{base_package}_test`. This ensures we only test the public methods. - If there is a need to test a private method, please include why in the testing file. - Unit tests must contain the `Label("unit")`. -- Unit tests should not rely on any running service. If a running service is needed. Utilize an integration test. +- Unit tests should not rely on any running service (except for a postgres DB). If a running service is needed. Utilize an integration test. - Integration tests must contain the `Label("integration")`. +#### Understanding Testing Components + +A few notes about the testing components. + +- The `TestEvents` map contains several events for testers to leverage when testing. +- Any object ending in `-dummy` is not a real object. You will also notice it has a present field called `MimicConfig`. This object will use an existing SSZ object, and update the parameters from the `Head` and `MimicConfig`. + - This is done because creating an empty or minimal `SignedBeaconBlock` and `BeaconState` is fairly challenging. + - By slightly modifying an existing object, we can test re-org, malformed objects, and other negative conditions. + # Contribution If you want to contribute please make sure you do the following: diff --git a/application_component.md b/application_component.md index cfd9e4b..d8bd3ec 100644 --- a/application_component.md +++ b/application_component.md @@ -25,3 +25,15 @@ This package will contain code to interact with the beacon client. ## `pkg/version` A generic package which can be utilized to easily version our applications. + +## `pkg/gracefulshutdown` + +A generic package that can be used to shutdown various services within an application. + +## `pkg/loghelper` + +This package contains useful functions for logging. + +## `internal/shutdown` + +This package is used to shutdown the `ipld-ethcl-indexer`. It calls the `pkg/gracefulshutdown` package. diff --git a/go.mod b/go.mod index 7fc22da..d2ffb6a 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jackc/pgx/v4 v4.16.0 + github.com/jarcoal/httpmock v1.2.0 github.com/julienschmidt/httprouter v1.3.0 github.com/magiconair/properties v1.8.6 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect diff --git a/go.sum b/go.sum index 3d6d492..46ae469 100644 --- a/go.sum +++ b/go.sum @@ -346,6 +346,8 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv github.com/jackc/puddle v1.2.1 h1:gI8os0wpRXFd4FiAY2dWiqRK037tjj3t7rKFeO4X5iw= github.com/jackc/puddle v1.2.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc= +github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 299a027..ca296b0 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -26,7 +26,7 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t }) select { - case _ = <-successCh: + case <-successCh: return nil case err := <-errCh: return err diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 00b7439..abd1953 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -21,12 +21,19 @@ var ( //bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain ) +// A structure utilized for keeping track of various metrics. Currently, mostly used in testing. +type BeaconClientMetrics struct { + HeadTrackingInserts uint64 // Number of head events we wrote to the DB. + HeadTrackingReorgs uint64 // The number of reorg events written to the DB. +} + // A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying. type BeaconClient struct { - Context context.Context // A context generic context with multiple uses. - ServerEndpoint string // What is the endpoint of the beacon server. - PerformHistoricalProcessing bool // Should we perform historical processing? - Db sql.Database // Database object used for reads and writes. + Context context.Context // A context generic context with multiple uses. + ServerEndpoint string // What is the endpoint of the beacon server. + PerformHistoricalProcessing bool // Should we perform historical processing? + Db sql.Database // Database object used for reads and writes. + Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. // Used for Head Tracking PerformHeadTracking bool // Should we track head? @@ -63,6 +70,10 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres ServerEndpoint: endpoint, HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), + Metrics: &BeaconClientMetrics{ + HeadTrackingInserts: 0, + HeadTrackingReorgs: 0, + }, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), } } diff --git a/pkg/beaconclient/beaconclient.test b/pkg/beaconclient/beaconclient.test new file mode 100755 index 0000000..6ba7bcc Binary files /dev/null and b/pkg/beaconclient/beaconclient.test differ diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index f846912..2896645 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -1,198 +1,517 @@ package beaconclient_test import ( + "context" + "encoding/hex" + "encoding/json" "fmt" "net/http" "os" "path/filepath" - "strings" + "strconv" + "sync/atomic" "time" + "github.com/jarcoal/httpmock" . "github.com/onsi/ginkgo/v2" - "github.com/sirupsen/logrus" + types "github.com/prysmaticlabs/prysm/consensus-types/primitives" + st "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/r3labs/sse" + log "github.com/sirupsen/logrus" + + . "github.com/onsi/gomega" - // . "github.com/onsi/gomega" - "github.com/r3labs/sse/v2" "github.com/vulcanize/ipld-ethcl-indexer/pkg/beaconclient" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql/postgres" ) type Message struct { - HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient - ReorgMessage beaconclient.ChainReorg // The reorg messsage that will be streamed to the BeaconClient - TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. - SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock. - BeaconState string // The file path output of an SSZ encoded BeaconState. - SuccessfulDBQuery string // A string that indicates what a query to the DB should output to pass the test. + HeadMessage beaconclient.Head // The head messsage that will be streamed to the BeaconClient + TestNotes string // A small explanation of the purpose this structure plays in the testing landscape. + MimicConfig *MimicConfig // A configuration of parameters that you are trying to + SignedBeaconBlock string // The file path output of an SSZ encoded SignedBeaconBlock. + BeaconState string // The file path output of an SSZ encoded BeaconState. } -var TestEvents map[string]*Message +// A structure that can be utilized to mimic and existing SSZ object but change it ever so slightly. +// This is used because creating your own SSZ object is a headache. +type MimicConfig struct { + ParentRoot string // The parent root, leave it empty if you want a to use the universal +} var _ = Describe("Capturehead", func() { - TestEvents = map[string]*Message{ - "100": { - HeadMessage: beaconclient.Head{ - Slot: "100", - Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b", - State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, + + var ( + TestConfig Config + BeaconNodeTester TestBeaconNode + address string = "localhost" + port int = 8080 + protocol string = "http" + TestEvents map[string]Message + dbHost string = "localhost" + dbPort int = 8077 + dbName string = "vulcanize_testing" + dbUser string = "vdbm" + dbPassword string = "password" + dbDriver string = "pgx" + dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" + ) + + BeforeEach(func() { + TestEvents = map[string]Message{ + "100-dummy": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a92d52a63b919b", + State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c0723c92c1782bfa", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", + MimicConfig: &MimicConfig{}, + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), }, - TestNotes: "This is a simple, easy to process block.", - SignedBeaconBlock: filepath.Join("data", "100", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("data", "100", "beacon-state.ssz"), - }, - "101": { - HeadMessage: beaconclient.Head{ - Slot: "101", - Block: "0xabe1a972e512182d04f0d4a5c9c25f9ee57c2e9d0ff3f4c4c82fd42d13d31083", - State: "0xcb04aa2edbf13c7bb7e7bd9b621ced6832e0075e89147352eac3019a824ce847", - CurrentDutyDependentRoot: "", - PreviousDutyDependentRoot: "", - EpochTransition: false, - ExecutionOptimistic: false, + "100-dummy-2": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "04955400371347e26f61d7a4bbda5b23fa0b25d5fc465160f2a9aaaaaaaaaaaa", + State: "36d5c9a129979b4502bd9a06e57a742810ecbc3fa55a0361c072bbbbbbbbbbbb", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "A block that is supposed to replicate slot 100, but contains some dummy test information.", + MimicConfig: &MimicConfig{}, + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), }, - TestNotes: "This is a simple, easy to process block.", - SignedBeaconBlock: filepath.Join("data", "101", "signed-beacon-block.ssz"), - BeaconState: filepath.Join("data", "101", "beacon-state.ssz"), - }, - } + "100": { + HeadMessage: beaconclient.Head{ + Slot: "100", + Block: "0x582187e97f7520bb69eea014c3834c964c45259372a0eaaea3f032013797996b", + State: "0xf286a0379c0386a3c7be28d05d829f8eb7b280cc9ede15449af20ebcd06a7a56", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "An easy to process Phase 0 block", + SignedBeaconBlock: filepath.Join("ssz-data", "100", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "100", "beacon-state.ssz"), + }, + "2375703-dummy": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508a8e648e", + State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d265aef5f", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "This is a dummy message that is used for reorgs", + MimicConfig: &MimicConfig{}, + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + "2375703-dummy-2": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "c9fb337b62e2a0dae4f27ab49913132570f7f2cab3f23ad99f4d07508aaaaaaa", + State: "0299a145bcda2c8f5e7d2e068ee101861edbee2ec1db2d5e1d850b0d2bbbbbbb", + CurrentDutyDependentRoot: "", + PreviousDutyDependentRoot: "", + EpochTransition: false, + ExecutionOptimistic: false, + }, + TestNotes: "This is a dummy message that is used for reorgs", + MimicConfig: &MimicConfig{}, + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + "2375703": { + HeadMessage: beaconclient.Head{ + Slot: "2375703", + Block: "0x4392372c5f6e39499e31bf924388b5815639103149f0f54f8a453773b1802301", + State: "0xb6215b560273af63ec7e011572b60ec1ca0b0232f8ff44fcd4ed55c7526e964e", + CurrentDutyDependentRoot: "", PreviousDutyDependentRoot: "", EpochTransition: false, ExecutionOptimistic: false}, + TestNotes: "An easy to process Altair Block", + SignedBeaconBlock: filepath.Join("ssz-data", "2375703", "signed-beacon-block.ssz"), + BeaconState: filepath.Join("ssz-data", "2375703", "beacon-state.ssz"), + }, + } + TestConfig = Config{ + protocol: protocol, + address: address, + port: port, + dummyParentRoot: dummyParentRoot, + dbHost: dbHost, + dbPort: dbPort, + dbName: dbName, + dbUser: dbUser, + dbPassword: dbPassword, + dbDriver: dbDriver, + } + + BeaconNodeTester = TestBeaconNode{ + TestEvents: TestEvents, + TestConfig: TestConfig, + } + }) // We might also want to add an integration test that will actually process a single event, then end. // This will help us know that our models match that actual data being served from the beacon node. - Describe("Receiving New Head SSE messages", Label("unit"), func() { - Context("Correctly formatted", Label("dry"), func() { + Describe("Receiving New Head SSE messages", Label("unit", "behavioral"), func() { + Context("Correctly formatted Phase0 Block", func() { It("Should turn it into a struct successfully.", func() { - server := createSseServer() - logrus.Info("DONE!") - client := sse.NewClient("http://localhost:8080" + beaconclient.BcHeadTopicEndpoint) + BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["100"].HeadMessage, 3) - logrus.Info("DONE!") - ch := make(chan *sse.Event) - go client.SubscribeChanRaw(ch) - - time.Sleep(2 * time.Second) - logrus.Info("DONE!") - sendMessageToStream(server, []byte("hello")) - client.Unsubscribe(ch) - val := <-ch - - logrus.Info("DONE!") - logrus.Info(val) }) }) - //Context("A single incorrectly formatted", func() { + Context("Correctly formatted Altair Block", func() { + It("Should turn it into a struct successfully.", func() { + BeaconNodeTester.testProcessBlock(BeaconNodeTester.TestEvents["2375703"].HeadMessage, 74240) + }) + }) + //Context("A single incorrectly formatted head message", func() { // It("Should create an error, maybe also add the projected slot to the knownGaps table......") + // If it can unmarshal the head add it to knownGaps //}) //Context("An incorrectly formatted message sandwiched between correctly formatted messages", func() { // It("Should create an error, maybe also add the projected slot to the knownGaps table......") //}) + // Context("When there is a skipped slot", func() { + // It("Should indicate that the slot was skipped") + // }) + // Context("When the slot is not properly served", func() { + // It("Should return an error, and add the slot to the knownGaps table.") + // }) + //}) + // Context("With gaps in between head slots", func() { + // It("Should add the slots in between to the knownGaps table") + // }) + // Context("With the previousBlockHash not matching the parentBlockHash", func() { + // It("Should recognize the reorg and add the previous slot to knownGaps table.") + // }) + // Context("Out of order", func() { + // It("Not sure what it should do....") + // }) }) - //Describe("Receiving New Reorg SSE messages", Label("unit"), func() { - // Context("Reorg slot is already in the DB", func() { - // It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.") - // }) - // Context("Multiple reorgs have occurred on this slot", func() { - // It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.") - // }) - // Context("Reorg slot in not already in the DB", func() { - // It("Should simply have the correct slot in the DB.") - // }) + Describe("ReOrg Scenario", Label("unit", "behavioral"), func() { + Context("Altair: Multiple head messages for the same slot.", func() { + It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { + BeaconNodeTester.testMultipleHead(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240) + }) + }) + Context("Phase0: Multiple head messages for the same slot.", func() { + It("The previous block should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { + BeaconNodeTester.testMultipleHead(TestEvents["100-dummy"].HeadMessage, TestEvents["100"].HeadMessage, 3) + }) + }) + Context("Phase 0: Multiple reorgs have occurred on this slot", Label("new"), func() { + It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { + BeaconNodeTester.testMultipleReorgs(TestEvents["100-dummy"].HeadMessage, TestEvents["100-dummy-2"].HeadMessage, TestEvents["100"].HeadMessage, 3) + }) + }) + Context("Altair: Multiple reorgs have occurred on this slot", Label("new"), func() { + It("The previous blocks should be marked as 'forked', the new block should be the only one marked as 'proposed'.", func() { + BeaconNodeTester.testMultipleReorgs(TestEvents["2375703-dummy"].HeadMessage, TestEvents["2375703-dummy-2"].HeadMessage, TestEvents["2375703"].HeadMessage, 74240) + }) + }) + //Context("Reorg slot in not already in the DB", func() { + // It("Should simply have the correct slot in the DB.") + // Add to knowngaps + //}) - //}) - - //Describe("Querying SignedBeaconBlock and Beacon State", Label("unit"), func() { - // Context("When the slot is properly served by the beacon node", func() { - // It("Should provide a successful response.") - // }) - // Context("When there is a skipped slot", func() { - // It("Should indicate that the slot was skipped") - // // Future use case. - - // }) - // Context("When the slot is not properly served", func() { - // It("Should return an error, and add the slot to the knownGaps table.") - // }) - //}) - - //Describe("Receiving properly formatted Head SSE events.", Label("unit"), func() { - // Context("In sequential order", func() { - // It("Should write each event to the DB successfully.") - // }) - // Context("With gaps in slots", func() { - // It("Should add the slots in between to the knownGaps table") - // }) - // Context("With a repeat slot", func() { - // It("Should recognize the reorg and process it.") - // }) - // Context("With the previousBlockHash not matching the parentBlockHash", func() { - // It("Should recognize the reorg and add the previous slot to knownGaps table.") - // }) - // Context("Out of order", func() { - // It("Not sure what it should do....") - // }) - // Context("With a skipped slot", func() { - // It("Should recognize the slot as skipped and continue without error.") - // // Future use case - // }) - //}) + }) }) -// Create a new Sse.Server. -func createSseServer() *sse.Server { - // server := sse.New() - // server.CreateStream("") - - mux := http.NewServeMux() - //mux.HandleFunc(beaconclient.BcHeadTopicEndpoint, func(w http.ResponseWriter, r *http.Request) { - // go func() { - // // Received Browser Disconnection - // <-r.Context().Done() - // println("The client is disconnected here") - // return - // }() - - // server.ServeHTTP(w, r) - //}) - mux.HandleFunc(beaconclient.BcStateQueryEndpoint, provideState) - mux.HandleFunc(beaconclient.BcBlockQueryEndpoint, provideBlock) - go http.ListenAndServe(":8080", mux) - return server +type Config struct { + protocol string + address string + port int + dummyParentRoot string + dbHost string + dbPort int + dbName string + dbUser string + dbPassword string + dbDriver string } -// Send messages to the stream. -func sendMessageToStream(server *sse.Server, data []byte) { - server.Publish("", &sse.Event{ - Data: data, +////////////////////////////////////////////////////// +// Helper functions +////////////////////////////////////////////////////// + +// Must run before each test. We can't use the beforeEach because of the way +// Gingko treats race conditions. +func setUpTest(config Config) *beaconclient.BeaconClient { + bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port) + db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) + Expect(err).ToNot(HaveOccurred()) + + // Drop all records from the DB. + clearEthclDbTables(db) + bc.Db = db + + return &bc +} + +// A helper function to validate the expected output. +func validateSlot(bc *beaconclient.BeaconClient, headMessage *beaconclient.Head, correctEpoch int, correctStatus string) { + epoch, dbSlot, blockRoot, stateRoot, status := queryDbSlotAndBlock(bc.Db, headMessage.Slot, headMessage.Block) + baseSlot, err := strconv.Atoi(headMessage.Slot) + Expect(err).ToNot(HaveOccurred()) + Expect(dbSlot).To(Equal(baseSlot)) + Expect(epoch).To(Equal(correctEpoch)) + Expect(blockRoot).To(Equal(headMessage.Block)) + Expect(stateRoot).To(Equal(headMessage.State)) + Expect(status).To(Equal(correctStatus)) +} + +// Wrapper function to send a head message to the beaconclient +func sendHeadMessage(bc *beaconclient.BeaconClient, head beaconclient.Head) { + + data, err := json.Marshal(head) + Expect(err).ToNot(HaveOccurred()) + + startInserts := atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) + bc.HeadTracking.MessagesCh <- &sse.Event{ + ID: []byte{}, + Data: data, + Event: []byte{}, + Retry: []byte{}, + } + for atomic.LoadUint64(&bc.Metrics.HeadTrackingInserts) != startInserts+1 { + time.Sleep(1 * time.Second) + } +} + +// A helper function to query the ethcl.slots table based on the slot and block_root +func queryDbSlotAndBlock(db sql.Database, querySlot string, queryBlockRoot string) (int, int, string, string, string) { + sqlStatement := `SELECT epoch, slot, block_root, state_root, status FROM ethcl.slots WHERE slot=$1 AND block_root=$2;` + var epoch, slot int + var blockRoot, stateRoot, status string + row := db.QueryRow(context.Background(), sqlStatement, querySlot, queryBlockRoot) + err := row.Scan(&epoch, &slot, &blockRoot, &stateRoot, &status) + Expect(err).ToNot(HaveOccurred()) + return epoch, slot, blockRoot, stateRoot, status +} + +// A function that will remove all entries from the ethcl tables for you. +func clearEthclDbTables(db sql.Database) { + deleteQueries := []string{"DELETE FROM ethcl.slots;", "DELETE FROM ethcl.signed_beacon_block;", "DELETE FROM ethcl.beacon_state;", "DELETE FROM ethcl.batch_processing;"} + for _, queries := range deleteQueries { + _, err := db.Exec(context.Background(), queries) + Expect(err).ToNot(HaveOccurred()) + } + +} + +// An object that is used to aggregate test functions. Test functions are needed because we need to +// run the same tests on multiple blocks for multiple forks. So they save us time. +type TestBeaconNode struct { + TestEvents map[string]Message + TestConfig Config +} + +// Create a new new mock for the beacon node. +func (tbc TestBeaconNode) SetupBeaconNodeMock(TestEvents map[string]Message, protocol string, address string, port int, dummyParentRoot string) { + httpmock.Activate() + stateUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcStateQueryEndpoint + `([^/]+)\z` + httpmock.RegisterResponder("GET", stateUrl, + func(req *http.Request) (*http.Response, error) { + // Get ID from request + id := httpmock.MustGetSubmatch(req, 1) + dat, err := tbc.provideSsz(id, "state", dummyParentRoot) + if err != nil { + Expect(err).NotTo(HaveOccurred()) + return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err + } + return httpmock.NewBytesResponse(200, dat), nil + }, + ) + + blockUrl := `=~^` + protocol + "://" + address + ":" + strconv.Itoa(port) + beaconclient.BcBlockQueryEndpoint + `([^/]+)\z` + httpmock.RegisterResponder("GET", blockUrl, + func(req *http.Request) (*http.Response, error) { + // Get ID from request + id := httpmock.MustGetSubmatch(req, 1) + dat, err := tbc.provideSsz(id, "block", dummyParentRoot) + if err != nil { + Expect(err).NotTo(HaveOccurred()) + return httpmock.NewStringResponse(404, fmt.Sprintf("Unable to find file for %s", id)), err + } + return httpmock.NewBytesResponse(200, dat), nil + }, + ) +} + +// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it. +func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string, dummyParentRoot string) ([]byte, error) { + var slotFile string + var Message Message + + for _, val := range tbc.TestEvents { + if sszIdentifier == "state" { + if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.State == slotIdentifier { + slotFile = val.BeaconState + Message = val + } + } else if sszIdentifier == "block" { + if val.HeadMessage.Slot == slotIdentifier || val.HeadMessage.Block == slotIdentifier { + slotFile = val.SignedBeaconBlock + Message = val + } + } + } + + if Message.MimicConfig != nil { + log.Info("We are going to create a custom SSZ object for testing purposes.") + if sszIdentifier == "block" { + dat, err := os.ReadFile(slotFile) + if err != nil { + return nil, fmt.Errorf("Can't find the slot file, %s", slotFile) + } + + block := &st.SignedBeaconBlock{} + err = block.UnmarshalSSZ(dat) + if err != nil { + log.Error("Error unmarshalling: ", err) + } + + slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64) + Expect(err).ToNot(HaveOccurred()) + block.Block.Slot = types.Slot(slot) + + block.Block.StateRoot, err = hex.DecodeString(Message.HeadMessage.State) + Expect(err).ToNot(HaveOccurred()) + + if Message.MimicConfig.ParentRoot == "" { + block.Block.ParentRoot, err = hex.DecodeString(dummyParentRoot) + Expect(err).ToNot(HaveOccurred()) + } else { + block.Block.ParentRoot, err = hex.DecodeString(Message.MimicConfig.ParentRoot) + Expect(err).ToNot(HaveOccurred()) + } + return block.MarshalSSZ() + } + if sszIdentifier == "state" { + dat, err := os.ReadFile(slotFile) + if err != nil { + return nil, fmt.Errorf("Can't find the slot file, %s", slotFile) + } + state := st.BeaconState{} + err = state.UnmarshalSSZ(dat) + Expect(err) + slot, err := strconv.ParseUint(Message.HeadMessage.Slot, 10, 64) + Expect(err).ToNot(HaveOccurred()) + state.Slot = types.Slot(slot) + return state.MarshalSSZ() + } + } + + if slotFile == "" { + return nil, fmt.Errorf("We couldn't find the slot file for %s", slotIdentifier) + } + + dat, err := os.ReadFile(slotFile) + if err != nil { + return nil, fmt.Errorf("Can't find the slot file, %s", slotFile) + } + return dat, nil +} + +// Helper function to test three reorg messages. There are going to be many functions like this, +// Because we need to test the same logic for multiple phases. +func (tbc TestBeaconNode) testMultipleReorgs(firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int) { + bc := setUpTest(tbc.TestConfig) + tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + + go bc.CaptureHead() + time.Sleep(1 * time.Second) + + log.Info("Sending Phase0 Messages to BeaconClient") + sendHeadMessage(bc, firstHead) + sendHeadMessage(bc, secondHead) + sendHeadMessage(bc, thirdHead) + + for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 2 { + time.Sleep(1 * time.Second) + } + + log.Info("Checking Phase0 to make sure the fork was marked properly.") + validateSlot(bc, &firstHead, epoch, "forked") + validateSlot(bc, &secondHead, epoch, "forked") + validateSlot(bc, &thirdHead, epoch, "proposed") + + log.Info("Send the reorg message.") + + data, err := json.Marshal(&beaconclient.ChainReorg{ + Slot: firstHead.Slot, + Depth: "1", + OldHeadBlock: thirdHead.Block, + NewHeadBlock: secondHead.Block, + OldHeadState: thirdHead.State, + NewHeadState: secondHead.State, + Epoch: strconv.Itoa(epoch), + ExecutionOptimistic: false, }) - logrus.Info("publish complete") + Expect(err).ToNot(HaveOccurred()) + bc.ReOrgTracking.MessagesCh <- &sse.Event{ + Data: data, + } + + for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 3 { + time.Sleep(1 * time.Second) + } + + log.Info("Make sure the forks were properly updated!") + + validateSlot(bc, &firstHead, epoch, "forked") + validateSlot(bc, &secondHead, epoch, "proposed") + validateSlot(bc, &thirdHead, epoch, "forked") + } -// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it. -func provideState(w http.ResponseWriter, req *http.Request) { - path := strings.Split(req.URL.Path, "/") - slot := path[len(path)-1] - slotFile := "data/" + slot + "/beacon-state.ssz" - dat, err := os.ReadFile(slotFile) - if err != nil { - fmt.Fprintf(w, "Can't find the slot file, %s", slotFile) - } - w.Header().Set("Content-Type", "application/octet-stream") - w.Write(dat) +// A test to validate a single block was processed correctly +func (tbc TestBeaconNode) testProcessBlock(head beaconclient.Head, epoch int) { + bc := setUpTest(tbc.TestConfig) + tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + + go bc.CaptureHead() + time.Sleep(1 * time.Second) + sendHeadMessage(bc, head) + validateSlot(bc, &head, epoch, "proposed") } -// A function to mimic querying the state from the beacon node. We simply get the SSZ file are return it. -func provideBlock(w http.ResponseWriter, req *http.Request) { - path := strings.Split(req.URL.Path, "/") - slot := path[len(path)-1] - slotFile := "data/" + slot + "/signed-beacon-block.ssz" - dat, err := os.ReadFile(slotFile) - if err != nil { - fmt.Fprintf(w, "Can't find the slot file, %s", slotFile) +// A test that ensures that if two HeadMessages occur for a single slot they are marked +// as proposed and forked correctly. +func (tbc TestBeaconNode) testMultipleHead(firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int) { + bc := setUpTest(tbc.TestConfig) + tbc.SetupBeaconNodeMock(tbc.TestEvents, tbc.TestConfig.protocol, tbc.TestConfig.address, tbc.TestConfig.port, tbc.TestConfig.dummyParentRoot) + defer httpmock.DeactivateAndReset() + + go bc.CaptureHead() + time.Sleep(1 * time.Second) + + sendHeadMessage(bc, firstHead) + sendHeadMessage(bc, secondHead) + + for atomic.LoadUint64(&bc.Metrics.HeadTrackingReorgs) != 1 { + time.Sleep(1 * time.Second) } - w.Header().Set("Content-Type", "application/octet-stream") - w.Write(dat) + + log.Info("Checking Altair to make sure the fork was marked properly.") + validateSlot(bc, &firstHead, epoch, "forked") + validateSlot(bc, &secondHead, epoch, "proposed") } diff --git a/pkg/beaconclient/data/100/beacon-state.ssz b/pkg/beaconclient/data/100/beacon-state.ssz deleted file mode 100644 index 3de0338..0000000 Binary files a/pkg/beaconclient/data/100/beacon-state.ssz and /dev/null differ diff --git a/pkg/beaconclient/data/100/signed-beacon-block.ssz b/pkg/beaconclient/data/100/signed-beacon-block.ssz deleted file mode 100644 index 12269ee..0000000 Binary files a/pkg/beaconclient/data/100/signed-beacon-block.ssz and /dev/null differ diff --git a/pkg/beaconclient/data/101/beacon-state.ssz b/pkg/beaconclient/data/101/beacon-state.ssz deleted file mode 100644 index e9a8f0a..0000000 Binary files a/pkg/beaconclient/data/101/beacon-state.ssz and /dev/null differ diff --git a/pkg/beaconclient/data/101/signed-beacon-block.ssz b/pkg/beaconclient/data/101/signed-beacon-block.ssz deleted file mode 100644 index 768ed7f..0000000 Binary files a/pkg/beaconclient/data/101/signed-beacon-block.ssz and /dev/null differ diff --git a/pkg/beaconclient/databasewrite.go b/pkg/beaconclient/databasewrite.go index 8e9e076..c6ae8a0 100644 --- a/pkg/beaconclient/databasewrite.go +++ b/pkg/beaconclient/databasewrite.go @@ -29,10 +29,17 @@ VALUES ($1, $2, $3) ON CONFLICT (slot, state_root) DO NOTHING` UpsertBlocksStmt string = ` INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` - UpdateReorgStmt string = `UPDATE ethcl.slots - SET status=forked - WHERE slot=$1 AND block_hash<>$2 - RETURNING block_hash;` + UpdateForkedStmt string = `UPDATE ethcl.slots + SET status='forked' + WHERE slot=$1 AND block_root<>$2 + RETURNING block_root;` + UpdateProposedStmt string = `UPDATE ethcl.slots + SET status='proposed' + WHERE slot=$1 AND block_root=$2 + RETURNING block_root;` + CheckProposedStmt string = `SELECT slot, block_root + FROM ethcl.slots + WHERE slot=$1 AND block_root=$2;` ) // Put all functionality to prepare the write object @@ -40,6 +47,7 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING` // Remove any of it from the processslot file. type DatabaseWriter struct { Db sql.Database + Metrics *BeaconClientMetrics DbSlots *DbSlots DbSignedBeaconBlock *DbSignedBeaconBlock DbBeaconState *DbBeaconState @@ -47,9 +55,10 @@ type DatabaseWriter struct { rawSignedBeaconBlock []byte } -func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string) *DatabaseWriter { +func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string, status string, metrics *BeaconClientMetrics) *DatabaseWriter { dw := &DatabaseWriter{ - Db: db, + Db: db, + Metrics: metrics, } dw.prepareSlotsModel(slot, stateRoot, blockRoot, status) dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot) @@ -101,6 +110,7 @@ func (dw *DatabaseWriter) writeFullSlot() { dw.writeSlots() dw.writeSignedBeaconBlocks() dw.writeBeaconState() + dw.Metrics.IncrementHeadTrackingInserts(1) } // Write the information for the generic slots table. For now this is only one function. @@ -137,7 +147,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) { func (dw *DatabaseWriter) upsertSignedBeaconBlock() { _, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.MhKey) if err != nil { - loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") + loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") } } @@ -151,25 +161,89 @@ func (dw *DatabaseWriter) writeBeaconState() { func (dw *DatabaseWriter) upsertBeaconState() { _, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey) if err != nil { - loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.signed_beacon_block table") + loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table") } } // Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot. // We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked. -func updateReorgs(db sql.Database, slot string, latestBlockRoot string) (int64, error) { - res, err := db.Exec(context.Background(), UpdateReorgStmt, slot, latestBlockRoot) +func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) { + forkCount, err := updateForked(db, slot, latestBlockRoot) if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with reorgs.") + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.") + // Add to knownGaps Table + } + proposedCount, err := updateProposed(db, slot, latestBlockRoot) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.") + // Add to knownGaps Table + } + + if forkCount > 0 { + loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + "forkCount": forkCount, + }).Info("Updated rows that were forked.") + } else { + loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + "forkCount": forkCount, + }).Warn("There were no forked rows to update.") + + } + + if proposedCount == 1 { + loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + "proposedCount": proposedCount, + }).Info("Updated the row that should have been marked as proposed.") + } else if proposedCount > 1 { + loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + "proposedCount": proposedCount, + }).Error("Too many rows were marked as proposed!") + } else if proposedCount == 0 { + var count int + err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.") + } + if count != 1 { + loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ + "proposedCount": count, + }).Warn("The proposed block was not marked as proposed...") + } else { + loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.") + } + } + + metrics.IncrementHeadTrackingReorgs(1) +} + +// Update the slots table by marking the old slot's as forked. +func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) { + res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots") return 0, err } count, err := res.RowsAffected() if err != nil { - loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were effected by the reorg.") + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as forked.") + return 0, err + } + return count, err +} + +func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) { + res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot) + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.") + return 0, err + } + count, err := res.RowsAffected() + if err != nil { + loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to figure out how many entries were marked as proposed") return 0, err } - return count, nil + return count, err } // Dummy function for calculating the mhKey. diff --git a/pkg/beaconclient/incomingsse.go b/pkg/beaconclient/incomingsse.go index 7e9c222..e8c5e34 100644 --- a/pkg/beaconclient/incomingsse.go +++ b/pkg/beaconclient/incomingsse.go @@ -19,23 +19,26 @@ var ( // When new messages come in, it will ensure that they are decoded into JSON. // If any errors occur, it log the error information. func handleIncomingSseEvent[P ProcessedEvents](eventHandler *SseEvents[P]) { - errG := new(errgroup.Group) - errG.Go(func() error { - err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) - if err != nil { - return err + go func() { + errG := new(errgroup.Group) + errG.Go(func() error { + err := eventHandler.SseClient.SubscribeChanRaw(eventHandler.MessagesCh) + if err != nil { + return err + } + return nil + }) + if err := errG.Wait(); err != nil { + log.WithFields(log.Fields{ + "err": err, + "endpoint": eventHandler.Endpoint, + }).Error("Unable to subscribe to the SSE endpoint.") + return + } else { + loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") } - return nil - }) - if err := errG.Wait(); err != nil { - log.WithFields(log.Fields{ - "err": err, - "endpoint": eventHandler.Endpoint, - }).Error("Unable to subscribe to the SSE endpoint.") - return - } else { - loghelper.LogEndpoint(eventHandler.Endpoint).Info("Successfully subscribed to the event stream.") - } + + }() for { select { case message := <-eventHandler.MessagesCh: diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go new file mode 100644 index 0000000..3a924f7 --- /dev/null +++ b/pkg/beaconclient/metrics.go @@ -0,0 +1,15 @@ +package beaconclient + +import "sync/atomic" + +// Wrapper function to increment inserts. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementHeadTrackingInserts(inc uint64) { + atomic.AddUint64(&m.HeadTrackingInserts, inc) +} + +// Wrapper function to increment reorgs. If we want to use mutexes later we can easily update all +// occurrences here. +func (m *BeaconClientMetrics) IncrementHeadTrackingReorgs(inc uint64) { + atomic.AddUint64(&m.HeadTrackingReorgs, inc) +} diff --git a/pkg/beaconclient/processevents.go b/pkg/beaconclient/processevents.go index 2b26556..14713e6 100644 --- a/pkg/beaconclient/processevents.go +++ b/pkg/beaconclient/processevents.go @@ -17,7 +17,7 @@ func (bc *BeaconClient) handleReorg() { for { reorg := <-bc.ReOrgTracking.ProcessCh log.WithFields(log.Fields{"reorg": reorg}).Debug("Received a new reorg message.") - processReorg(bc.Db, reorg.Slot, reorg.NewHeadBlock) + writeReorgs(bc.Db, reorg.Slot, reorg.NewHeadBlock, bc.Metrics) } } @@ -33,7 +33,7 @@ func (bc *BeaconClient) handleHead() { err: fmt.Errorf("Unable to turn the slot from string to int: %s", head.Slot), } } - err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot) + err = processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics) if err != nil { loghelper.LogSlotError(head.Slot, err) } diff --git a/pkg/beaconclient/processreorgs.go b/pkg/beaconclient/processreorgs.go deleted file mode 100644 index 2591f29..0000000 --- a/pkg/beaconclient/processreorgs.go +++ /dev/null @@ -1,31 +0,0 @@ -package beaconclient - -import ( - log "github.com/sirupsen/logrus" - "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" - "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" -) - -func processReorg(db sql.Database, slot string, latestBlockRoot string) { - // Check to see if there are slots in the DB with the given slot. - // Update them ALL to forked - // Upsert the new slot into the DB, mark the status to proposed. - // Query at the end to make sure that you have handled the reorg properly. - updatedRows, err := updateReorgs(db, slot, latestBlockRoot) - - if err != nil { - // Add this slot to the knownGaps table.. - // Maybe we need to rename the knownGaps table to the "batchProcess" table. - } - - if updatedRows > 0 { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ - "updatedRows": updatedRows, - }).Info("Updated DB based on Reorgs.") - } else { - loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{ - "updatedRows": updatedRows, - }).Warn("There were no rows to update.") - - } -} diff --git a/pkg/beaconclient/processslot.go b/pkg/beaconclient/processslot.go index 394b43c..61dfd85 100644 --- a/pkg/beaconclient/processslot.go +++ b/pkg/beaconclient/processslot.go @@ -30,14 +30,15 @@ var ( type ProcessSlot struct { // Generic - Slot int // The slot number. - Epoch int // The epoch number. - BlockRoot string // The hex encoded string of the BlockRoot. - StateRoot string // The hex encoded string of the StateRoot. - ParentBlockRoot string // The hex encoded string of the parent block. - Status string // The status of the block - HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. - Db sql.Database // The DB object used to write to the DB. + Slot int // The slot number. + Epoch int // The epoch number. + BlockRoot string // The hex encoded string of the BlockRoot. + StateRoot string // The hex encoded string of the StateRoot. + ParentBlockRoot string // The hex encoded string of the parent block. + Status string // The status of the block + HeadOrHistoric string // Is this the head or a historic slot. This is critical when trying to analyze errors and missed slots. + Db sql.Database // The DB object used to write to the DB. + Metrics *BeaconClientMetrics // An object to keep track of the beaconclient metrics // BeaconBlock SszSignedBeaconBlock []byte // The entire SSZ encoded SignedBeaconBlock @@ -54,7 +55,7 @@ type ProcessSlot struct { } // This function will do all the work to process the slot and write it to the DB. -func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string) error { +func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics) error { headOrHistoric = strings.ToLower(headOrHistoric) if headOrHistoric != "head" && headOrHistoric != "historic" { return fmt.Errorf("headOrBatch must be either historic or head!") @@ -64,6 +65,8 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot BlockRoot: blockRoot, StateRoot: stateRoot, HeadOrHistoric: headOrHistoric, + Db: db, + Metrics: metrics, } // Get the SignedBeaconBlock. @@ -78,25 +81,24 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot return err } + // Get this object ready to write + dw := ps.createWriteObjects() + + // Write the object to the DB. + dw.writeFullSlot() + // Handle any reorgs or skipped slots. if ps.HeadOrHistoric == "head" { if previousSlot != 0 && previousBlockRoot != "" { ps.checkPreviousSlot(previousSlot, previousBlockRoot) } } - - // Get this object ready to write - dw := ps.createWriteObjects(db) - - // Write the object to the DB. - dw.writeFullSlot() - return nil } // Handle a slot that is at head. A wrapper function for calling `handleFullSlot`. -func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string) error { - return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head") +func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics) error { + return processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics) } // Handle a historic slot. A wrapper function for calling `handleFullSlot`. @@ -179,7 +181,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "slot": ps.FullBeaconState.Slot, "fork": true, }).Warn("A fork occurred! The previous slot and current slot match.") - processReorg(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot) + writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics) } else if previousSlot-1 != int(ps.FullBeaconState.Slot) { log.WithFields(log.Fields{ "previousSlot": previousSlot, @@ -192,7 +194,7 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str "previousBlockRoot": previousBlockRoot, "currentBlockParent": parentRoot, }).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.") - processReorg(ps.Db, strconv.Itoa(previousSlot), parentRoot) + writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics) // Call our batch processing function. // Continue with this slot. } else { @@ -216,7 +218,7 @@ func (ps *ProcessSlot) checkMissedSlot() { } // Transforms all the raw data into DB models that can be written to the DB. -func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter { +func (ps *ProcessSlot) createWriteObjects() *DatabaseWriter { var ( stateRoot string blockRoot string @@ -234,6 +236,7 @@ func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter { if ps.BlockRoot != "" { blockRoot = ps.BlockRoot } else { + log.Info("We need to add logic") // We need to get the state of Slot + 1, then we can run the below. // WE can query it for each run, or we can leave it blank, and update it. // I just want to avoid getting the same state twice, especially since the state can get heavy. @@ -251,7 +254,7 @@ func (ps *ProcessSlot) createWriteObjects(db sql.Database) *DatabaseWriter { status = "proposed" } - dw := CreateDatabaseWrite(db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status) + dw := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, status, ps.Metrics) dw.rawSignedBeaconBlock = ps.SszSignedBeaconBlock dw.rawBeaconState = ps.SszBeaconState diff --git a/pkg/database/sql/postgres/pgx_test.go b/pkg/database/sql/postgres/pgx_test.go index c5c5422..21a76a8 100644 --- a/pkg/database/sql/postgres/pgx_test.go +++ b/pkg/database/sql/postgres/pgx_test.go @@ -29,17 +29,18 @@ var _ = Describe("Pgx", func() { _, err := postgres.NewPostgresDB(postgres.Config{ Driver: "PGX", }) - Expect(err).NotTo(BeNil()) + Expect(err).To(HaveOccurred()) present, err := doesContainsSubstring(err.Error(), sql.DbConnectionFailedMsg) Expect(present).To(BeTrue()) + Expect(err).NotTo(HaveOccurred()) }) }) Context("The connection is successful", func() { It("Should create a DB object", func() { db, err := postgres.NewPostgresDB(postgres.DefaultConfig) - defer db.Close() Expect(err).To(BeNil()) + defer db.Close() }) }) }) diff --git a/pkg/gracefulshutdown/gracefulshutdown.go b/pkg/gracefulshutdown/gracefulshutdown.go index a8ba972..8230a38 100644 --- a/pkg/gracefulshutdown/gracefulshutdown.go +++ b/pkg/gracefulshutdown/gracefulshutdown.go @@ -38,7 +38,6 @@ func Shutdown(ctx context.Context, notifierCh chan os.Signal, timeout time.Durat timeoutFunc := time.AfterFunc(timeout, func() { log.Warnf(TimeoutErr(timeout.String()).Error()) errCh <- TimeoutErr(timeout.String()) - return }) defer timeoutFunc.Stop()