Update boot to set processing type.

This commit is contained in:
Abdul Rabbani 2022-05-19 13:37:38 -04:00
parent 041276da81
commit af1cf89910
13 changed files with 102 additions and 48 deletions

View File

@ -26,7 +26,7 @@ on:
env:
stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/schema-ipld-ethcl-indexer' }}
ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'main' }}
ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }}
GOPATH: /tmp/go
jobs:

View File

@ -44,7 +44,8 @@ func bootApp() {
log.Info("Starting the application in boot mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", testDisregardSync)
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
}

View File

@ -18,7 +18,6 @@ package cmd
import (
"context"
"os"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -48,18 +47,15 @@ func startHeadTracking() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync)
if err != nil {
loghelper.LogError(err).Error("Unable to Start application")
if DB != nil {
DB.Close()
}
os.Exit(1)
StopApplicationPreBoot(err, DB)
}
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go BC.CaptureHead(kgTableIncrement)
go BC.CaptureHead()
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC)

View File

@ -43,7 +43,8 @@ func startHistoricProcessing() {
log.Info("Starting the application in head tracking mode.")
ctx := context.Background()
_, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", testDisregardSync)
_, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver,
bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "historic", testDisregardSync)
if err != nil {
StopApplicationPreBoot(err, DB)
}

View File

@ -41,11 +41,12 @@ var (
// 2. Connect to the database.
//
// 3. Make sure the node is synced, unless disregardSync is true.
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, bcAddress string, bcPort int, bcConnectionProtocol string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
log.Info("Booting the Application")
log.Debug("Creating the Beacon Client")
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort)
BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement)
log.Debug("Checking Beacon Client")
err := BC.CheckBeaconClient()
@ -81,10 +82,11 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) {
var err error
for i := 0; i < bcMaxRetry; i++ {
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, bcAddress, bcPort, bcConnectionProtocol, disregardSync)
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync)
if err != nil {
log.WithFields(log.Fields{
"retryNumber": i,
@ -98,15 +100,21 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
switch strings.ToLower(startUpMode) {
case "head":
log.Debug("No further actions needed to boot the application at this phase.")
BC.PerformHeadTracking = true
case "historic":
log.Debug("Performing additional boot steps for historical processing")
BC.PerformHistoricalProcessing = true
headSlot, err := BC.GetLatestSlotInBeaconServer(bcType)
if err != nil {
return BC, DB, err
}
BC.UpdateLatestSlotInBeaconServer(int64(headSlot))
// Add another switch case for bcType if its ever needed.
default:
log.WithFields(log.Fields{
"startUpMode": startUpMode,
}).Error("The startUpMode provided is not handled.")
}
return BC, DB, err
}

View File

@ -37,44 +37,45 @@ var _ = Describe("Boot", func() {
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
bcKgTableIncrement int = 10
)
Describe("Booting the application", Label("integration"), func() {
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
It("Should connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "historic", true)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true)
defer db.Close()
Expect(err).ToNot(HaveOccurred())
})
})
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
It("Should not connect successfully", func() {
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", false)
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false)
defer db.Close()
Expect(err).To(HaveOccurred())
})
})
Context("When the DB is running but not the BC", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})
Context("When the BC is running but not the DB", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, true)
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})
Context("When neither the BC or DB are running", func() {
It("Should not connect successfully", func() {
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, true)
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true)
Expect(err).To(HaveOccurred())
})
})

View File

@ -49,6 +49,7 @@ var _ = Describe("Shutdown", func() {
bcType string = "lighthouse"
bcBootRetryInterval int = 1
bcBootMaxRetry int = 5
bcKgTableIncrement int = 10
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
DB sql.Database
BC *beaconclient.BeaconClient
@ -58,7 +59,8 @@ var _ = Describe("Shutdown", func() {
)
BeforeEach(func() {
ctx = context.Background()
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, "head", true)
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true)
notifierCh = make(chan os.Signal, 1)
Expect(err).To(BeNil())
})

View File

@ -43,12 +43,11 @@ var (
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
PerformHistoricalProcessing bool // Should we perform historical processing?
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
Context context.Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
Db sql.Database // Database object used for reads and writes.
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
// Used for Head Tracking
@ -65,7 +64,8 @@ type BeaconClient struct {
// The latest available slot within the Beacon Server. We can't query any slot greater than this.
// This value is lazily updated. Therefore at times it will be outdated.
LatestSlotInBeaconServer int64
LatestSlotInBeaconServer int64
PerformHistoricalProcessing bool // Should we perform historical processing?
}
// A struct to keep track of relevant the head event topic.
@ -84,14 +84,15 @@ type SseError struct {
}
// A Function to create the BeaconClient.
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int) *BeaconClient {
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient {
endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort)
log.Info("Creating the BeaconClient")
return &BeaconClient{
Context: ctx,
ServerEndpoint: endpoint,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Context: ctx,
ServerEndpoint: endpoint,
KnownGapTableIncrement: bcKgTableIncrement,
HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint),
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
Metrics: &BeaconClientMetrics{
HeadTrackingInserts: 0,
HeadTrackingReorgs: 0,

View File

@ -25,12 +25,9 @@ import (
)
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHead(knownGapsTableIncrement int) {
bc.KnownGapTableIncrement = knownGapsTableIncrement
func (bc *BeaconClient) CaptureHead() {
log.Info("We are tracking the head of the chain.")
//bc.tempHelper()
go bc.handleHead()
//go bc.handleFinalizedCheckpoint()
go bc.handleReorg()
bc.captureEventTopic()
}

View File

@ -254,7 +254,7 @@ var _ = Describe("Capturehead", func() {
validateBeaconState(bc, BeaconNodeTester.TestEvents["2375703"].HeadMessage, "/blocks/QHVAEQRQPBRDMMRRGVRDKNRQGI3TGYLGGYZWKYZXMUYDCMJVG4ZGENRQMVRTCY3BGBRDAMRTGJTDQZTGGQ2GMY3EGRSWINJVMM3TKMRWMU4TMNDF")
})
})
Context("Correctly formatted Altair Test Blocks", func() {
Context("Correctly formatted Altair Test Blocks", Label("now"), func() {
It("Should turn it into a struct successfully.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "2375702")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@ -315,7 +315,7 @@ var _ = Describe("Capturehead", func() {
// })
//})
Context("When the proper SSZ objects are not served", Label("now"), func() {
Context("When the proper SSZ objects are not served", func() {
It("Should return an error, and add the slot to the knownGaps table.", func() {
bc := setUpTest(BeaconNodeTester.TestConfig, "101")
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
@ -435,7 +435,7 @@ type Config struct {
// Must run before each test. We can't use the beforeEach because of the way
// Gingko treats race conditions.
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port)
bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement)
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
Expect(err).ToNot(HaveOccurred())
@ -770,7 +770,7 @@ func (tbc TestBeaconNode) provideSsz(slotIdentifier string, sszIdentifier string
// Helper function to test three reorg messages. There are going to be many functions like this,
// Because we need to test the same logic for multiple phases.
func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, thirdHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
go bc.CaptureHead()
time.Sleep(1 * time.Second)
log.Info("Sending Phase0 Messages to BeaconClient")
@ -832,7 +832,7 @@ func (tbc TestBeaconNode) testMultipleReorgs(bc *beaconclient.BeaconClient, firs
// A test to validate a single block was processed correctly
func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head beaconclient.Head, epoch int, maxRetry int, expectedSuccessInsert uint64, expectedKnownGaps uint64, expectedReorgs uint64) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, head, maxRetry, expectedSuccessInsert)
@ -862,7 +862,7 @@ func (tbc TestBeaconNode) testProcessBlock(bc *beaconclient.BeaconClient, head b
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstHead beaconclient.Head, secondHead beaconclient.Head, epoch int, maxRetry int) {
go bc.CaptureHead(tbc.TestConfig.knownGapsTableIncrement)
go bc.CaptureHead()
time.Sleep(1 * time.Second)
sendHeadMessage(bc, firstHead, maxRetry, 1)
@ -889,7 +889,8 @@ func (tbc TestBeaconNode) testMultipleHead(bc *beaconclient.BeaconClient, firstH
// A test that ensures that if two HeadMessages occur for a single slot they are marked
// as proposed and forked correctly.
func (tbc TestBeaconNode) testKnownGapsMessages(bc *beaconclient.BeaconClient, tableIncrement int, expectedEntries uint64, maxRetry int, msg ...beaconclient.Head) {
go bc.CaptureHead(tableIncrement)
bc.KnownGapTableIncrement = tableIncrement
go bc.CaptureHead()
time.Sleep(1 * time.Second)
for _, headMsg := range msg {

View File

@ -0,0 +1,28 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file will call all the functions to start and stop capturing the head of the beacon chain.
package beaconclient
import log "github.com/sirupsen/logrus"
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) CaptureHistoric() {
log.Info("We are starting the historical processing service.")
go bc.handleHead()
go bc.handleReorg()
bc.captureEventTopic()
}

View File

@ -25,8 +25,8 @@ import (
var _ = Describe("Healthcheck", func() {
var (
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010)
BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 10, 5052)
errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 10, 1010)
)
Describe("Connecting to the lighthouse client", Label("integration"), func() {
Context("When the client is running", func() {

View File

@ -0,0 +1,18 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file will call all the functions to start and stop capturing the head of the beacon chain.
package beaconclient