From 4a85428b5311475256d4d11f02ac0a2de8a214a4 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Wed, 25 May 2022 10:59:25 -0400 Subject: [PATCH 1/8] Capture the unique identifier everywhere it's needed. --- cmd/boot.go | 8 +++-- cmd/capture.go | 4 +++ cmd/head.go | 2 +- cmd/historic.go | 2 +- example.ipld-ethcl-indexer-config.json | 3 +- go.mod | 2 +- internal/boot/boot.go | 28 ++++++++++------ internal/boot/boot_test.go | 20 +++++++---- internal/shutdown/shutdown_test.go | 46 ++++++++++++++------------ pkg/beaconclient/beaconclient.go | 16 +++++++-- pkg/beaconclient/capturehead_test.go | 8 +++-- pkg/beaconclient/healthcheck_test.go | 11 +++--- pkg/beaconclient/metrics.go | 43 ++++++++++++++++++------ 13 files changed, 128 insertions(+), 65 deletions(-) diff --git a/cmd/boot.go b/cmd/boot.go index 9fc3532..bb543e6 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -23,6 +23,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/spf13/viper" "github.com/vulcanize/ipld-ethcl-indexer/internal/boot" "github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown" "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" @@ -44,8 +45,9 @@ func bootApp() { log.Info("Starting the application in boot mode.") ctx := context.Background() - BC, DB, err := boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, - bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, kgTableIncrement, "head", testDisregardSync) + Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), + viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), + viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { loghelper.LogError(err).Error("Unable to Start application") } @@ -58,7 +60,7 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/capture.go b/cmd/capture.go index e1b46c8..93bbe66 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -39,6 +39,7 @@ var ( bcConnectionProtocol string bcType string bcMaxHistoricProcessWorker int + bcUniqueNodeIdentifier int kgMaxWorker int kgTableIncrement int kgProcessGaps bool @@ -93,6 +94,7 @@ func init() { captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The maximum number of retries while booting the application") captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.") + captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. 
Each application connecting to the DB should have a unique identifier.") // err = captureCmd.MarkPersistentFlagRequired("bc.address") // exitErr(err) // err = captureCmd.MarkPersistentFlagRequired("bc.port") @@ -143,6 +145,8 @@ func init() { exitErr(err) err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker")) exitErr(err) + err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier")) + exitErr(err) // Here you will define your flags and configuration settings. //// Known Gap Specific diff --git a/cmd/head.go b/cmd/head.go index 1006fe1..3af009f 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -50,7 +50,7 @@ func startHeadTracking() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/cmd/historic.go b/cmd/historic.go index cfd9d03..2d89deb 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -50,7 +50,7 @@ func startHistoricProcessing() { Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"), viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), - viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync")) + viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { StopApplicationPreBoot(err, Db) } diff --git a/example.ipld-ethcl-indexer-config.json b/example.ipld-ethcl-indexer-config.json index 0929773..69746ca 100644 --- a/example.ipld-ethcl-indexer-config.json +++ b/example.ipld-ethcl-indexer-config.json @@ -14,7 +14,8 @@ "bootRetryInterval": 30, "bootMaxRetry": 5, "maxHistoricProcessWorker": 2, - "connectionProtocol": "http" + "connectionProtocol": "http", + "uniqueNodeIdentifier": 100 }, "t": { "skipSync": true diff --git a/go.mod b/go.mod index 5bb8589..3c844c8 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/multiformats/go-multihash v0.1.0 github.com/onsi/ginkgo/v2 v2.1.4 github.com/onsi/gomega v1.19.0 + github.com/prometheus/client_golang v1.12.1 github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc github.com/sirupsen/logrus v1.8.1 ) @@ -57,7 +58,6 @@ require ( github.com/multiformats/go-varint v0.0.6 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/internal/boot/boot.go b/internal/boot/boot.go index e726cf8..b1e476b 100644 --- a/internal/boot/boot.go +++ 
b/internal/boot/boot.go @@ -42,14 +42,17 @@ var ( // // 3. Make sure the node is synced, unless disregardSync is true. func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { log.Info("Booting the Application") log.Debug("Creating the Beacon Client") - BC = beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement) + Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier) + if err != nil { + return Bc, nil, err + } log.Debug("Checking Beacon Client") - err := BC.CheckBeaconClient() + err = Bc.CheckBeaconClient() if err != nil { return nil, nil, err } @@ -60,36 +63,37 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName return nil, nil, err } - BC.Db = DB + Bc.Db = DB var status bool if !disregardSync { - status, err = BC.CheckHeadSync() + status, err = Bc.CheckHeadSync() if err != nil { log.Error("Unable to get the nodes sync status") - return BC, DB, err + return Bc, DB, err } if status { log.Error("The node is still syncing.") err = fmt.Errorf("The node is still syncing.") - return BC, DB, err + return Bc, DB, err } } else { log.Warn("We are not checking to see if the node has synced to head.") } - return BC, DB, nil + return Bc, DB, nil } // Add retry logic to ensure that we give the Beacon Client and the DB time to start. func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string, - bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, startUpMode string, disregardSync bool) (*beaconclient.BeaconClient, sql.Database, error) { + bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int, + startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) { var err error if bcMaxRetry < 0 { i := 0 for { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -104,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } else { for i := 0; i < bcMaxRetry; i++ { BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName, - bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync) + bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier) if err != nil { log.WithFields(log.Fields{ "retryNumber": i, @@ -136,6 +140,8 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int } BC.UpdateLatestSlotInBeaconServer(int64(headSlot)) // Add another switch case for bcType if it's ever needed. + case "boot": + log.Debug("Running application in boot mode.") default: log.WithFields(log.Fields{ "startUpMode": startUpMode,
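For context on the updated signature, here is a minimal sketch of how a caller might invoke BootApplicationWithRetry after this change. Every literal below is a placeholder (the real commands pull these values from viper-bound flags), and passing a negative bcMaxRetry selects the retry-forever branch shown above.

```go
package main

import (
	"context"

	log "github.com/sirupsen/logrus"

	"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
)

func main() {
	// All values are illustrative; real callers read them from viper.
	Bc, Db, err := boot.BootApplicationWithRetry(context.Background(),
		"localhost", 8076, "vulcanize_testing", "vdbm", "password", "PGX", // DB parameters
		"localhost", 5052, "http", "lighthouse", // beacon client parameters
		30, -1, // retry interval in seconds; a negative max retry means retry forever
		100000, // known gaps table increment
		"head", false, // startUpMode and disregardSync
		100) // the new uniqueNodeIdentifier argument
	if err != nil {
		log.Fatal("Unable to boot the application: ", err)
	}
	defer Db.Close()
	_ = Bc
}
```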
+ case "boot": + log.Debug("Running application in boot mode.") default: log.WithFields(log.Fields{ "startUpMode": startUpMode, diff --git a/internal/boot/boot_test.go b/internal/boot/boot_test.go index aa77e56..ef621ae 100644 --- a/internal/boot/boot_test.go +++ b/internal/boot/boot_test.go @@ -38,44 +38,52 @@ var _ = Describe("Boot", func() { bcBootRetryInterval int = 1 bcBootMaxRetry int = 5 bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 ) Describe("Booting the application", Label("integration"), func() { Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() { It("Should connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier) defer db.Close() Expect(err).ToNot(HaveOccurred()) }) }) Context("When the DB and BC are both up and running, and we check for a synced head", func() { It("Should not connect successfully", func() { - _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false) + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier) + defer db.Close() + Expect(err).To(HaveOccurred()) + }) + }) + Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() { + It("Should not connect successfully", func() { + _, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0) defer db.Close() Expect(err).To(HaveOccurred()) }) }) Context("When the DB is running but not the BC", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := 
boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When the BC is running but not the DB", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) Context("When neither the BC or DB are running", func() { It("Should not connect successfully", func() { - _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true) + _, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier) Expect(err).To(HaveOccurred()) }) }) diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 0bbf38d..8c75506 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -35,32 +35,34 @@ import ( "github.com/vulcanize/ipld-ethcl-indexer/pkg/gracefulshutdown" ) +var ( + dbAddress string = "localhost" + dbPort int = 8076 + dbName string = "vulcanize_testing" + dbUsername string = "vdbm" + dbPassword string = "password" + dbDriver string = "PGX" + bcAddress string = "localhost" + bcPort int = 5052 + bcConnectionProtocol string = "http" + bcType string = "lighthouse" + bcBootRetryInterval int = 1 + bcBootMaxRetry int = 5 + bcKgTableIncrement int = 10 + bcUniqueIdentifier int = 100 + maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second + DB sql.Database + BC *beaconclient.BeaconClient + err error + ctx context.Context + notifierCh chan os.Signal +) + var _ = Describe("Shutdown", func() { - var ( - dbAddress string = "localhost" - dbPort int = 8076 - dbName string = "vulcanize_testing" - dbUsername string = "vdbm" - dbPassword string = "password" - dbDriver string = "PGX" - bcAddress string = "localhost" - bcPort int = 5052 - bcConnectionProtocol string = "http" - bcType string = "lighthouse" - bcBootRetryInterval int = 1 - bcBootMaxRetry int = 5 - bcKgTableIncrement int = 10 - maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second - DB sql.Database - BC *beaconclient.BeaconClient - err error - ctx context.Context - notifierCh chan os.Signal - ) BeforeEach(func() { ctx = context.Background() BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, - bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true) + bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier) notifierCh = make(chan os.Signal, 1) Expect(err).To(BeNil()) }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 15f24b2..33d3cac 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -48,6 +48,7 @@ type BeaconClient struct { Db sql.Database // Database object used for reads and writes. 
Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. + UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. // Used for Head Tracking @@ -84,7 +85,16 @@ type SseError struct { } // A function to create the BeaconClient. -func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int) *BeaconClient { +func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { + if uniqueNodeIdentifier == 0 { + return nil, fmt.Errorf("the unique node identifier provided is 0, it must be a non-zero value") + } + + metrics, err := CreateBeaconClientMetrics() + if err != nil { + return nil, err + } + endpoint := fmt.Sprintf("%s://%s:%d", connectionProtocol, bcAddress, bcPort) log.Info("Creating the BeaconClient") return &BeaconClient{ @@ -93,9 +103,9 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres KnownGapTableIncrement: bcKgTableIncrement, HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), - Metrics: CreateBeaconClientMetrics(), + Metrics: metrics, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), - } + }, nil } // Create all the channels to handle SSE events diff --git a/pkg/beaconclient/capturehead_test.go b/pkg/beaconclient/capturehead_test.go index 459b2af..0856cbe 100644 --- a/pkg/beaconclient/capturehead_test.go +++ b/pkg/beaconclient/capturehead_test.go @@ -54,6 +54,7 @@ var ( dbUser string = "vdbm" dbPassword string = "password" dbDriver string = "pgx" + bcUniqueIdentifier int = 100 dummyParentRoot string = "46f98c08b54a71dfda4d56e29ec3952b8300cd8d6b67a9b6c562ae96a7a25a42" knownGapsTableIncrement int = 100000 maxRetry int = 120 @@ -202,6 +203,7 @@ var ( dbPassword: dbPassword, dbDriver: dbDriver, knownGapsTableIncrement: knownGapsTableIncrement, + bcUniqueIdentifier: bcUniqueIdentifier, } BeaconNodeTester = TestBeaconNode{ @@ -421,6 +423,7 @@ type Config struct { dbPassword string dbDriver string knownGapsTableIncrement int + bcUniqueIdentifier int } ////////////////////////////////////////////////////// @@ -430,7 +433,8 @@ type Config struct { // Must run before each test. We can't use the beforeEach because of the way // Ginkgo treats race conditions. func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { - bc := *beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement) + bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver) Expect(err).ToNot(HaveOccurred()) @@ -441,7 +445,7 @@ func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient { writeSlot(db, maxSlot) bc.Db = db - return &bc + return bc } // A helper function to validate the expected output from the ethcl.slots table. 
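Since CreateBeaconClient now returns (client, error), every call site has to check the error before touching the client. A short usage sketch, under the patch-1 rule that a zero identifier is rejected; the literals are placeholders and this would run inside a test or main:

```go
// Illustrative only: with patch 1 applied, an identifier of 0 yields an
// error and a nil client, so the error check must come first.
bc, err := beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, 100)
if err != nil {
	log.Fatal("Unable to create the beacon client: ", err)
}
bc.Db = db // attach a previously created sql.Database, as setUpTest does above
```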
diff --git a/pkg/beaconclient/healthcheck_test.go b/pkg/beaconclient/healthcheck_test.go index df92398..c09e3d8 100644 --- a/pkg/beaconclient/healthcheck_test.go +++ b/pkg/beaconclient/healthcheck_test.go @@ -25,19 +25,22 @@ import ( var _ = Describe("Healthcheck", func() { var ( - BC *beaconclient.BeaconClient + Bc *beaconclient.BeaconClient errBc *beaconclient.BeaconClient ) BeforeEach(func() { - BC = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10) - errBc = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10) + var err error + Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) + errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier) + Expect(err).ToNot(HaveOccurred()) }) Describe("Connecting to the lighthouse client", Label("integration"), func() { Context("When the client is running", func() { It("We should connect successfully", func() { - err := BC.CheckBeaconClient() + err := Bc.CheckBeaconClient() Expect(err).To(BeNil()) }) }) diff --git a/pkg/beaconclient/metrics.go b/pkg/beaconclient/metrics.go index 6156e2d..086d7a7 100644 --- a/pkg/beaconclient/metrics.go +++ b/pkg/beaconclient/metrics.go @@ -24,7 +24,7 @@ import ( ) //Create a metric struct and register each channel with prometheus -func CreateBeaconClientMetrics() *BeaconClientMetrics { +func CreateBeaconClientMetrics() (*BeaconClientMetrics, error) { metrics := &BeaconClientMetrics{ SlotInserts: 0, ReorgInserts: 0, @@ -34,17 +34,38 @@ func CreateBeaconClientMetrics() *BeaconClientMetrics { HeadError: 0, HeadReorgError: 0, } - prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) - prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) - prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) - prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) - prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors processing.", &metrics.KnownGapsProcessingError) - prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) - prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) - return metrics + err := prometheusRegisterHelper("slot_inserts", "Keeps track of the number of slots we have inserted.", &metrics.SlotInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("reorg_inserts", "Keeps track of the number of reorgs we have inserted.", &metrics.ReorgInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_inserts", "Keeps track of the number of known gaps we have inserted.", &metrics.KnownGapsInserts) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processed", "Keeps track of the number of known gaps we processed.", &metrics.knownGapsProcessed) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("known_gaps_processing_error", "Keeps track of the number of known gaps we had errors 
processing.", &metrics.KnownGapsProcessingError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_error", "Keeps track of the number of errors we had processing head messages.", &metrics.HeadError) + if err != nil { + return nil, err + } + err = prometheusRegisterHelper("head_reorg_error", "Keeps track of the number of errors we had processing reorg messages.", &metrics.HeadReorgError) + if err != nil { + return nil, err + } + return metrics, nil } -func prometheusRegisterHelper(name string, help string, varPointer *uint64) { +func prometheusRegisterHelper(name string, help string, varPointer *uint64) error { err := prometheus.Register(prometheus.NewCounterFunc( prometheus.CounterOpts{ Namespace: "beacon_client", @@ -58,7 +79,9 @@ func prometheusRegisterHelper(name string, help string, varPointer *uint64) { })) if err != nil && err.Error() != "duplicate metrics collector registration attempted" { loghelper.LogError(err).WithField("name", name).Error("Unable to register counter.") + return err } + return nil } // A structure utilized for keeping track of various metrics. Currently, mostly used in testing. -- 2.45.2 From 0da384517a1d39dad6d71be2ce483a24b5756b8b Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Thu, 2 Jun 2022 17:04:27 -0400 Subject: [PATCH 2/8] Handle graceful shutdown for historical processing --- cmd/boot.go | 2 +- cmd/head.go | 2 +- cmd/historic.go | 2 +- internal/shutdown/shutdown.go | 53 +++++++++++++++++++++++----- internal/shutdown/shutdown_test.go | 8 ++--- pkg/beaconclient/beaconclient.go | 7 +++- pkg/beaconclient/capturehistoric.go | 45 +++++++++++++++-------- pkg/beaconclient/processhistoric.go | 34 ++++++++++++++---- pkg/beaconclient/processknowngaps.go | 43 ++++++++++++++++++---- 9 files changed, 153 insertions(+), 43 deletions(-) diff --git a/cmd/boot.go b/cmd/boot.go index bb543e6..e5e7b3b 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -60,7 +60,7 @@ func bootApp() { notifierCh <- syscall.SIGTERM }() - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownBoot(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/head.go b/cmd/head.go index 3af009f..9ef4d63 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -81,7 +81,7 @@ func startHeadTracking() { } // Shutdown when the time is right. - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/cmd/historic.go b/cmd/historic.go index 2d89deb..e9d8d95 100644 --- a/cmd/historic.go +++ b/cmd/historic.go @@ -91,7 +91,7 @@ func startHistoricProcessing() { } // Shutdown when the time is right. - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) + err = shutdown.ShutdownHistoricProcessing(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc) if err != nil { loghelper.LogError(err).Error("Ungracefully Shutdown ipld-ethcl-indexer!") } else { diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 22c7310..9c8d71e 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -27,8 +27,21 @@ import ( ) // Shutdown all the internal services for the application. 
-func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { - successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, map[string]gracefulshutdown.Operation{ +func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient, shutdownOperations map[string]gracefulshutdown.Operation) error { + successCh, errCh := gracefulshutdown.Shutdown(ctx, notifierCh, waitTime, shutdownOperations) + + select { + case <-successCh: + return nil + case err := <-errCh: + return err + } +} + +// Wrapper function for shutting down the head tracking process. +func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. "beaconClient": func(ctx context.Context) error { defer DB.Close() @@ -39,11 +52,33 @@ func ShutdownServices(ctx context.Context, notifierCh chan os.Signal, waitTime t return err }, }) - - select { - case <-successCh: - return nil - case err := <-errCh: - return err - } +} + +// Wrapper function for shutting down historic processing. +func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // Combining DB shutdown with BC because BC needs DB open to cleanly shutdown. + "beaconClient": func(ctx context.Context) error { + defer DB.Close() + err := BC.StopHistoric() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing historic") + } + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + return err + }, + }) +} + +// Wrapper function for shutting down the application in boot mode. +func ShutdownBoot(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration, DB sql.Database, BC *beaconclient.BeaconClient) error { + return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{ + // In boot mode we only need to close the database connection. + "Database": func(ctx context.Context) error { + return DB.Close() + }, + }) }
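The refactor turns ShutdownServices into a generic runner: each mode supplies its own named gracefulshutdown.Operation map. A hedged sketch of how a future mode could reuse it from inside internal/shutdown/shutdown.go; the wrapper name, operation name, and body are illustrative, not part of this patch:

```go
// Hypothetical wrapper following the same pattern as the functions above.
func ShutdownKnownGapsOnly(ctx context.Context, notifierCh chan os.Signal, waitTime time.Duration,
	DB sql.Database, BC *beaconclient.BeaconClient) error {
	return ShutdownServices(ctx, notifierCh, waitTime, DB, BC, map[string]gracefulshutdown.Operation{
		// DB shutdown stays combined with BC because BC needs the DB open to release its locks.
		"beaconClient": func(ctx context.Context) error {
			defer DB.Close()
			return BC.StopKnownGapsProcessing()
		},
	})
}
```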
+ "Database": func(ctx context.Context) error { + return DB.Close() + }, + }) } diff --git a/internal/shutdown/shutdown_test.go b/internal/shutdown/shutdown_test.go index 8c75506..c6fa00e 100644 --- a/internal/shutdown/shutdown_test.go +++ b/internal/shutdown/shutdown_test.go @@ -67,12 +67,12 @@ var _ = Describe("Shutdown", func() { Expect(err).To(BeNil()) }) - Describe("Run Shutdown Function,", Label("integration"), func() { + Describe("Run Shutdown Function for head tracking,", Label("integration"), func() { Context("When Channels are empty,", func() { It("Should Shutdown Successfully.", func() { go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) }() @@ -84,7 +84,7 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).ToNot(HaveOccurred()) shutdownCh <- true @@ -117,7 +117,7 @@ var _ = Describe("Shutdown", func() { //log.SetLevel(log.DebugLevel) go func() { log.Debug("Starting shutdown chan") - err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) + err = shutdown.ShutdownHeadTracking(ctx, notifierCh, maxWaitSecondsShutdown, DB, BC) log.Debug("We have completed the shutdown...") Expect(err).To(MatchError(gracefulshutdown.TimeoutErr(maxWaitSecondsShutdown.String()))) shutdownCh <- true diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index 33d3cac..caa3a20 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -18,6 +18,7 @@ package beaconclient import ( "context" "fmt" + "math/rand" "github.com/r3labs/sse" log "github.com/sirupsen/logrus" @@ -67,6 +68,8 @@ type BeaconClient struct { // This value is lazily updated. Therefore at times it will be outdated. LatestSlotInBeaconServer int64 PerformHistoricalProcessing bool // Should we perform historical processing? + HistoricalProcess historicProcessing + KnownGapsProcess knownGapsProcessing } // A struct to keep track of relevant the head event topic. @@ -87,7 +90,8 @@ type SseError struct { // A Function to create the BeaconClient. 
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) { if uniqueNodeIdentifier == 0 { - return nil, fmt.Errorf("the unique node identifier provided is 0, it must be a non-zero value") + uniqueNodeIdentifier = rand.Int() + log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.") } metrics, err := CreateBeaconClientMetrics() @@ -104,6 +108,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres HeadTracking: createSseEvent[Head](endpoint, BcHeadTopicEndpoint), ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint), Metrics: metrics, + UniqueNodeIdentifier: uniqueNodeIdentifier, //FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint), }, nil } diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 3c4d29b..1b0631a 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -22,27 +22,39 @@ import ( log "github.com/sirupsen/logrus" "github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql" + "github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper" "golang.org/x/sync/errgroup" ) // This function will perform all the heavy lifting for the historical processing service. func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error { log.Info("We are starting the historical processing service.") - hp := historicProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, finishProcessing: make(chan int)} + errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting Historical") return errs } +// This function will perform all the necessary clean up tasks for stopping historical processing. +func (bc *BeaconClient) StopHistoric() error { + log.Info("We are stopping the historical processing service.") + err := bc.HistoricalProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.historic_process table. Manual intervention is needed!") + } + return nil +} + // An interface to enforce any batch processing. Currently there are two use cases for this. // // 1. Historic Processing // // 2. Known Gaps Processing type BatchProcessing interface { - getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you cant get the next slots to write. - handleProcessingErrors(<-chan batchHistoricError) - removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + getSlotRange(chan<- slotsToProcess) []error // Write the slots to process in a channel, return an error if you can't get the next slots to write. + handleProcessingErrors(<-chan batchHistoricError) // Custom logic to handle errors. + removeTableEntry(<-chan slotsToProcess) error // With the provided start and end slot, remove the entry from the database. + releaseDbLocks() error // Update the checked_out column to false for whatever table is being updated. }
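To make the enlarged BatchProcessing contract concrete, here is a toy implementation sketch; nopProcessing is hypothetical and exists only to show what each method is expected to do:

```go
// A do-nothing BatchProcessing implementation (illustrative only).
type nopProcessing struct {
	finishProcessing chan int
}

func (n nopProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
	close(slotCh) // no slots to hand out
	return nil
}

func (n nopProcessing) handleProcessingErrors(errCh <-chan batchHistoricError) {
	for range errCh {
		// drain and discard errors
	}
}

func (n nopProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
	for range processCh {
		// nothing to delete
	}
	return nil
}

func (n nopProcessing) releaseDbLocks() error {
	go func() { n.finishProcessing <- 1 }() // signal handleBatchProcess to stop
	return nil
}
```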
// A struct to pass around indicating a table entry for slots to process. @@ -71,12 +83,12 @@ type batchHistoricError struct { // 4. Remove the slot entry from the DB. // // 5. Handle any errors. -func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { +func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error { slotsCh := make(chan slotsToProcess) workCh := make(chan int) processedCh := make(chan slotsToProcess) errCh := make(chan batchHistoricError) - finishCh := make(chan []error, 1) + finalErrCh := make(chan []error, 1) // Start workers for w := 1; w <= maxWorkers; w++ { @@ -115,7 +127,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser return bp.removeTableEntry(processedCh) }) if err := errG.Wait(); err != nil { - finishCh <- []error{err} + finalErrCh <- []error{err} } }() // Process errors from slot processing. @@ -125,12 +137,17 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser go func() { errs := bp.getSlotRange(slotsCh) // Periodically adds new entries.... if errs != nil { - finishCh <- errs + finalErrCh <- errs } - finishCh <- nil + finalErrCh <- nil }() - - errs := <-finishCh - log.Debug("Finishing the batchProcess") - return errs + log.Debug("Waiting for shutdown signal from channel") + select { + case <-finishCh: + log.Debug("Received shutdown signal from channel") + return nil + case errs := <-finalErrCh: + log.Debug("Finishing the batchProcess") + return errs + } } diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 2ba3bd4..006544c 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -42,19 +42,25 @@ var ( lockHpEntryStmt string = `UPDATE ethcl.historic_process SET checked_out=true WHERE start_slot=$1 AND end_slot=$2;` - // Used to delete an entry from the knownGaps table + // Used to delete an entry from the ethcl.historic_process table deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process WHERE start_slot=$1 AND end_slot=$2;` + // Used to update every single row that this node has checked out. + releaseHpLockStmt string = `UPDATE ethcl.historic_process + SET checked_out=false + WHERE checked_out_by=$1` ) type historicProcessing struct { - db sql.Database - metrics *BeaconClientMetrics + db sql.Database // db connection + metrics *BeaconClientMetrics // metrics for beaconclient + uniqueNodeIdentifier int // node unique identifier. + finishProcessing chan int // A channel which indicates to the handleBatchProcess function that it's time to end. } // Get a single row of historical slots from the table. func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { return getBatchProcessRow(hp.db, getHpEntryStmt, checkHpEntryStmt, lockHpEntryStmt, slotCh, strconv.Itoa(hp.uniqueNodeIdentifier)) } // Remove the table entry. 
@@ -71,6 +77,22 @@ func (hp historicProcessing) handleProcessingErrors(errMessages <-chan batchHist } } +func (hp historicProcessing) releaseDbLocks() error { + go func() { hp.finishProcessing <- 1 }() + log.Debug("Releasing locks held by this node in the ethcl.historic_process table.") + res, err := hp.db.Exec(context.Background(), releaseHpLockStmt, hp.uniqueNodeIdentifier) + if err != nil { + return fmt.Errorf("Unable to remove lock from the ethcl.historic_process table for node %d, error is %v", hp.uniqueNodeIdentifier, err) + } + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("Unable to calculate the number of rows affected by releasing locks from the ethcl.historic_process table for node %d, error is %v", hp.uniqueNodeIdentifier, err) + } + log.WithField("rowCount", rows).Info("Released historicalProcess locks for specified rows.") + return nil +} + // Process the slot range. func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) { for slot := range workCh { @@ -91,7 +113,7 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, // It also locks the row by updating the checked_out column. // The statement for getting the start_slot and end_slot must be provided. // The statement for "locking" the row must also be provided. -func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error { +func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess, uniqueNodeIdentifier string) []error { errCount := make([]error, 0) // 5 is an arbitrary number. It allows us to retry a few times before @@ -148,7 +170,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow } // Checkout the Row - res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot) + res, err := tx.Exec(ctx, checkOutRowStmt, sp.startSlot, sp.endSlot, uniqueNodeIdentifier) if err != nil { loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).Error("Unable to checkout the row") errCount = append(errCount, err)
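The locking scheme now stamps each checked-out row with the node that owns it, so a stopping node can release exactly its own rows. A sketch of the round trip, using the known-gaps statements defined in the next file, which already take the checked_out_by parameter (the historic checkout statement only gains its $3 parameter in patch 4 below); the helper itself is hypothetical:

```go
// Illustrative lock lifecycle for a single node.
func lockAndReleaseSketch(db sql.Database, startSlot, endSlot int, nodeID string) error {
	ctx := context.Background()
	// 1. Check out the row, recording ownership in checked_out_by.
	if _, err := db.Exec(ctx, lockKgEntryStmt, startSlot, endSlot, nodeID); err != nil {
		return err
	}
	// ... the slot range is processed here ...
	// 2. On shutdown, release every row still held by this node.
	_, err := db.Exec(ctx, releaseKgLockStmt, nodeID)
	return err
}
```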
diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 6c07c02..5f3603e 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -36,7 +36,7 @@ var ( checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;` // Used to checkout a row from the ethcl.known_gaps table lockKgEntryStmt string = `UPDATE ethcl.known_gaps - SET checked_out=true + SET checked_out=true, checked_out_by=$3 WHERE start_slot=$1 AND end_slot=$2;` // Used to delete an entry from the knownGaps table deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps @@ -44,25 +44,41 @@ var ( // Used to check to see if a single slot exists in the known_gaps table. checkKgSingleSlotStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps WHERE start_slot=$1 AND end_slot=$2;` + // Used to update every single row that this node has checked out. + releaseKgLockStmt string = `UPDATE ethcl.known_gaps + SET checked_out=false + WHERE checked_out_by=$1` ) type knownGapsProcessing struct { - db sql.Database - metrics *BeaconClientMetrics + db sql.Database // db connection + metrics *BeaconClientMetrics // metrics for beaconclient + uniqueNodeIdentifier int // node unique identifier. + finishProcessing chan int // A channel which indicates to the handleBatchProcess function that it's time to end. } // This function will perform all the heavy lifting for the known gaps processing service. func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") - hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics} - errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics) + bc.KnownGapsProcess = knownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} + errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting known gaps processing service") return errs } +// This function will perform all the necessary clean up tasks for stopping known gaps processing. +func (bc *BeaconClient) StopKnownGapsProcessing() error { + log.Info("We are stopping the known gaps processing service.") + err := bc.KnownGapsProcess.releaseDbLocks() + if err != nil { + loghelper.LogError(err).WithField("uniqueIdentifier", bc.UniqueNodeIdentifier).Error("We were unable to remove the locks from the ethcl.known_gaps table. Manual intervention is needed!") + } + return nil +} + // Get a single row of known gaps from the table. func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { - return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh) + return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) } // Remove the table entry. @@ -99,3 +115,18 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi } } } + +// Update the checked_out column for the uniqueNodeIdentifier. 
+func (kgp knownGapsProcessing) releaseDbLocks() error { + go func() { kgp.finishProcessing <- 1 }() + res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) + if err != nil { + return err + } + rows, err := res.RowsAffected() + if err != nil { + return err + } + log.WithField("rowCount", rows).Info("Released knownGaps locks for specified rows.") + return nil +} -- 2.45.2
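Before patch 3, it is worth distilling patch 2's shutdown signalling: releaseDbLocks sends on the finishProcessing channel, and handleBatchProcess selects between that signal and the aggregated error channel. A condensed sketch of the pattern; the names mirror the patch, and the standalone function is illustrative only:

```go
// Condensed from handleBatchProcess: a send on finishCh (from releaseDbLocks)
// requests a clean shutdown; otherwise the first batch of errors is returned.
func waitForFinishOrErrors(finishCh <-chan int, finalErrCh <-chan []error) []error {
	select {
	case <-finishCh:
		return nil // clean stop requested by StopHistoric/StopKnownGapsProcessing
	case errs := <-finalErrCh:
		return errs // the pipeline ended with errors
	}
}
```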
From 3da8094e60173029388d398197b3402d378f2737 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 11:12:54 -0400 Subject: [PATCH 3/8] Update flags and minor shutdown updates --- cmd/boot.go | 2 +- cmd/capture.go | 2 ++ cmd/head.go | 1 + internal/shutdown/shutdown.go | 14 +++++++++++--- pkg/beaconclient/beaconclient.go | 6 +++--- pkg/beaconclient/processknowngaps.go | 12 ++++++------ pkg/database/sql/postgres/database.go | 2 +- 7 files changed, 25 insertions(+), 14 deletions(-) diff --git a/cmd/boot.go b/cmd/boot.go index e5e7b3b..6f98729 100644 --- a/cmd/boot.go +++ b/cmd/boot.go @@ -49,7 +49,7 @@ func bootApp() { viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"), viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier")) if err != nil { - loghelper.LogError(err).Error("Unable to Start application") + StopApplicationPreBoot(err, Db) } log.Info("Boot complete, we are going to shutdown.") diff --git a/cmd/capture.go b/cmd/capture.go index 93bbe66..d4ad6fc 100644 --- a/cmd/capture.go +++ b/cmd/capture.go @@ -125,6 +125,8 @@ func init() { exitErr(err) err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name")) exitErr(err) + err = viper.BindPFlag("db.driver", captureCmd.PersistentFlags().Lookup("db.driver")) + exitErr(err) //// Testing Specific err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync")) diff --git a/cmd/head.go b/cmd/head.go index 9ef4d63..f2dd0e5 100644 --- a/cmd/head.go +++ b/cmd/head.go @@ -94,6 +94,7 @@ func init() { captureCmd.AddCommand(headCmd) } +// Start the prometheus server. func serveProm(addr string) { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.Handler()) diff --git a/internal/shutdown/shutdown.go b/internal/shutdown/shutdown.go index 9c8d71e..228b760 100644 --- a/internal/shutdown/shutdown.go +++ b/internal/shutdown/shutdown.go @@ -49,6 +49,12 @@ func ShutdownHeadTracking(ctx context.Context, notifierCh chan os.Signal, waitTi if err != nil { loghelper.LogError(err).Error("Unable to trigger shutdown of head tracking") } + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } + } return err }, }) @@ -64,9 +70,11 @@ func ShutdownHistoricProcessing(ctx context.Context, notifierCh chan os.Signal, if err != nil { loghelper.LogError(err).Error("Unable to stop processing historic") } - err = BC.StopKnownGapsProcessing() - if err != nil { - loghelper.LogError(err).Error("Unable to stop processing known gaps") + if BC.KnownGapsProcess != (beaconclient.KnownGapsProcessing{}) { + err = BC.StopKnownGapsProcessing() + if err != nil { + loghelper.LogError(err).Error("Unable to stop processing known gaps") + } } return err }, }) diff --git a/pkg/beaconclient/beaconclient.go b/pkg/beaconclient/beaconclient.go index caa3a20..e9fd5c0 100644 --- a/pkg/beaconclient/beaconclient.go +++ b/pkg/beaconclient/beaconclient.go @@ -50,6 +50,7 @@ type BeaconClient struct { Metrics *BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics. KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry. UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node. + KnownGapsProcess KnownGapsProcessing // object keeping track of known gaps processing // Used for Head Tracking @@ -67,9 +68,8 @@ type BeaconClient struct { // The latest available slot within the Beacon Server. We can't query any slot greater than this. // This value is lazily updated. Therefore at times it will be outdated. LatestSlotInBeaconServer int64 - PerformHistoricalProcessing bool // Should we perform historical processing? - HistoricalProcess historicProcessing - KnownGapsProcess knownGapsProcessing + PerformHistoricalProcessing bool // Should we perform historical processing? + HistoricalProcess historicProcessing // object keeping track of historical processing } // A struct to keep track of the relevant head event topic. diff --git a/pkg/beaconclient/processknowngaps.go b/pkg/beaconclient/processknowngaps.go index 5f3603e..01762da 100644 --- a/pkg/beaconclient/processknowngaps.go +++ b/pkg/beaconclient/processknowngaps.go @@ -50,7 +50,7 @@ var ( WHERE checked_out_by=$1` ) -type knownGapsProcessing struct { +type KnownGapsProcessing struct { db sql.Database // db connection metrics *BeaconClientMetrics // metrics for beaconclient uniqueNodeIdentifier int // node unique identifier. @@ -60,7 +60,7 @@ type knownGapsProcessing struct { // This function will perform all the heavy lifting for the known gaps processing service. func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error { log.Info("We are starting the known gaps processing service.") - bc.KnownGapsProcess = knownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} + bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)} errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics) log.Debug("Exiting known gaps processing service") return errs @@ -77,17 +77,17 @@ func (bc *BeaconClient) StopKnownGapsProcessing() error { } // Get a single row of known gaps from the table. -func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { +func (kgp KnownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error { return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh, strconv.Itoa(kgp.uniqueNodeIdentifier)) } // Remove the table entry. -func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { +func (kgp KnownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error { return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt) } // Handle processing errors. -func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { +func (kgp KnownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) { for { errMs := <-errMessages @@ -117,7 +117,7 @@ func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHi } // Update the checked_out column for the uniqueNodeIdentifier. 
-func (kgp knownGapsProcessing) releaseDbLocks() error { +func (kgp KnownGapsProcessing) releaseDbLocks() error { go func() { kgp.finishProcessing <- 1 }() res, err := kgp.db.Exec(context.Background(), releaseKgLockStmt, kgp.uniqueNodeIdentifier) if err != nil { diff --git a/pkg/database/sql/postgres/database.go b/pkg/database/sql/postgres/database.go index c8451f8..16cb936 100644 --- a/pkg/database/sql/postgres/database.go +++ b/pkg/database/sql/postgres/database.go @@ -49,7 +49,7 @@ func SetupPostgresDb(dbHostname string, dbPort int, dbName string, dbUsername st "driver_name_provided": driverName, }).Error("Can't resolve driver type") } - log.Info("Using Driver:", DbDriver) + log.Info("Using Driver: ", DbDriver) postgresConfig := Config{ Hostname: dbHostname, -- 2.45.2 From ceace514d36d017ab4bfb64e8b0f5bc40f2ec2a6 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 11:30:21 -0400 Subject: [PATCH 4/8] Update checkout statement for historic --- pkg/beaconclient/processhistoric.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/beaconclient/processhistoric.go b/pkg/beaconclient/processhistoric.go index 006544c..eb3e71b 100644 --- a/pkg/beaconclient/processhistoric.go +++ b/pkg/beaconclient/processhistoric.go @@ -40,7 +40,7 @@ var ( checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;` // Used to checkout a row from the ethcl.historic_process table lockHpEntryStmt string = `UPDATE ethcl.historic_process - SET checked_out=true + SET checked_out=true, checked_out_by=$3 WHERE start_slot=$1 AND end_slot=$2;` // Used to delete an entry from the ethcl.historic_process table deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process -- 2.45.2 From 095dd3a39318c9f09c2c896402023232e032f611 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 11:56:43 -0400 Subject: [PATCH 5/8] update cicd pipeline --- .github/workflows/on-pr.yml | 190 +------------------------- .github/workflows/on-publish.yml | 7 + .github/workflows/tests.yml | 198 ++++++++++++++++++++++++++++ pkg/beaconclient/capturehistoric.go | 6 + 4 files changed, 218 insertions(+), 183 deletions(-) create mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index 1e7f347..c570dd7 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -22,189 +22,13 @@ on: - "!LICENSE" - "!.github/workflows/**" - ".github/workflows/on-pr.yml" + - ".github/workflows/tests.yml" - "**" -env: - stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref || 'main'}} - ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }} - ssz-data-ref: ${{ github.event.inputs.ssz-data-ref || 'main' }} - GOPATH: /tmp/go jobs: - build: - name: Run Docker Build - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - echo ethcl_capture_mode=boot >> ./config.sh 
- echo ethcl_skip_sync=true >> ./config.sh - echo ethcl_known_gap_increment=1000000 >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \ - --env-file ./config.sh \ - up -d --build - - - name: Check to make sure HEALTH file is present - shell: bash - run: | - until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done - cat ./HEALTH - if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . && cat ipld-ethcl-indexer.log && (exit 1); fi - - unit-test: - name: Run Unit Tests - runs-on: ubuntu-latest - ## IF you want to update the default branch for `pull_request runs, do it after the ||` - steps: - - name: Create GOPATH - run: mkdir -p /tmp/go - - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ssz-data-ref }} - repository: vulcanize/ssz-data - path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data" - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - --env-file ./config.sh \ - up -d --build - - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - check-latest: true - - - name: Install packages - run: | - go install github.com/onsi/ginkgo/v2/ginkgo@latest - which ginkgo - - - name: Run the tests using Make - run: | - cd ipld-ethcl-indexer - make unit-test-ci - - integration-test: - name: Run Integration Tests - runs-on: ubuntu-latest - steps: - - name: Create GOPATH - run: mkdir -p /tmp/go - - - uses: actions/checkout@v2 - with: - path: "./ipld-ethcl-indexer" - - - uses: actions/checkout@v3 - with: - ref: ${{ env.stack-orchestrator-ref }} - path: "./stack-orchestrator/" - repository: vulcanize/stack-orchestrator - fetch-depth: 0 - - - uses: actions/checkout@v3 - with: - ref: ${{ env.ipld-ethcl-db-ref }} - repository: vulcanize/ipld-ethcl-db - path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} - fetch-depth: 0 - - - name: Create config file - run: | - echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh - echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh - echo ethcl_capture_mode=boot >> ./config.sh - cat ./config.sh - - - name: Run docker compose - run: | - docker-compose \ - -f 
"$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ - -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ - --env-file ./config.sh \ - up -d --build - - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - check-latest: true - - - name: Install packages - run: | - go install github.com/onsi/ginkgo/v2/ginkgo@latest - which ginkgo - - - name: Run the tests using Make - run: | - cd ipld-ethcl-indexer - make integration-test-ci - - golangci: - runs-on: ubuntu-latest - steps: - - uses: actions/setup-go@v3 - with: - go-version: ">=1.17.0" - - uses: actions/checkout@v3 - - name: golangci-lint - uses: golangci/golangci-lint-action@v3 - with: - args: --timeout 90s --disable deadcode,unused -# args: --timeout 90s --disable deadcode, + trigger-tests: + uses: ./.github/workflows/tests.yml + with: + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} + ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} + ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} diff --git a/.github/workflows/on-publish.yml b/.github/workflows/on-publish.yml index 801a1c9..348b76b 100644 --- a/.github/workflows/on-publish.yml +++ b/.github/workflows/on-publish.yml @@ -3,9 +3,16 @@ on: release: types: [published, edited] jobs: + trigger-tests: + uses: ./.github/workflows/tests.yml + with: + stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} + ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} + ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} build: name: Run docker build runs-on: ubuntu-latest + needs: trigger-tests steps: - uses: actions/checkout@v2 - name: Get the version diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..85af609 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,198 @@ +name: Test the stack. 
+on: + workflow_call: + inputs: + stack-orchestrator-ref: + required: false + type: string + ipld-ethcl-db-ref: + required: false + type: string + ssz-data-ref: + required: false + type: string + +env: + stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || 'develop' }} + ipld-ethcl-db-ref: ${{ inputs.ipld-ethcl-db-ref || 'feature/historic-processing' }} + ssz-data-ref: ${{ inputs.ssz-data-ref || 'main' }} + GOPATH: /tmp/go +jobs: + build: + name: Run Docker Build + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + token: ${{ secrets.GH_PAT }} + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + echo ethcl_capture_mode=boot >> ./config.sh + echo ethcl_skip_sync=true >> ./config.sh + echo ethcl_known_gap_increment=1000000 >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" \ + --env-file ./config.sh \ + up -d --build + + - name: Check to make sure HEALTH file is present + shell: bash + run: | + until $(docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/HEALTH ./HEALTH) ; do sleep 10; done + cat ./HEALTH + if [[ "$(cat ./HEALTH)" -eq "0" ]]; then echo "Application boot successful" && (exit 0); else docker compose -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ipld-ethcl-indexer.yml" cp ipld-ethcl-indexer:/root/ipld-ethcl-indexer.log . 
&& cat ipld-ethcl-indexer.log && (exit 1); fi + + unit-test: + name: Run Unit Tests + runs-on: ubuntu-latest + ## IF you want to update the default branch for `pull_request runs, do it after the ||` + steps: + - name: Create GOPATH + run: mkdir -p /tmp/go + + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + token: ${{ secrets.GH_PAT }} + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ssz-data-ref }} + repository: vulcanize/ssz-data + path: "./ipld-ethcl-indexer/pkg/beaconclient/ssz-data" + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + --env-file ./config.sh \ + up -d --build + + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + check-latest: true + + - name: Install packages + run: | + go install github.com/onsi/ginkgo/v2/ginkgo@latest + which ginkgo + + - name: Run the tests using Make + run: | + cd ipld-ethcl-indexer + make unit-test-ci + + integration-test: + name: Run Integration Tests + runs-on: ubuntu-latest + steps: + - name: Create GOPATH + run: mkdir -p /tmp/go + + - uses: actions/checkout@v2 + with: + path: "./ipld-ethcl-indexer" + + - uses: actions/checkout@v3 + with: + ref: ${{ env.stack-orchestrator-ref }} + path: "./stack-orchestrator/" + repository: vulcanize/stack-orchestrator + fetch-depth: 0 + + - uses: actions/checkout@v3 + with: + ref: ${{ env.ipld-ethcl-db-ref }} + repository: vulcanize/ipld-ethcl-db + path: "./ipld-ethcl-db/" + token: ${{ secrets.GH_PAT }} + fetch-depth: 0 + + - name: Create config file + run: | + echo vulcanize_ipld_ethcl_db=$GITHUB_WORKSPACE/ipld-ethcl-db/ > ./config.sh + echo vulcanize_ipld_ethcl_indexer=$GITHUB_WORKSPACE/ipld-ethcl-indexer >> ./config.sh + echo ethcl_capture_mode=boot >> ./config.sh + cat ./config.sh + + - name: Run docker compose + run: | + docker-compose \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-ethcl-db.yml" \ + -f "$GITHUB_WORKSPACE/stack-orchestrator/docker/latest/docker-compose-lighthouse.yml" \ + --env-file ./config.sh \ + up -d --build + + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + check-latest: true + + - name: Install packages + run: | + go install github.com/onsi/ginkgo/v2/ginkgo@latest + which ginkgo + + - name: Run the tests using Make + run: | + cd ipld-ethcl-indexer + make integration-test-ci + + golangci: + runs-on: ubuntu-latest + steps: + - uses: actions/setup-go@v3 + with: + go-version: ">=1.17.0" + - uses: actions/checkout@v3 + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + args: --timeout 90s --disable deadcode,unused +# args: --timeout 90s --disable deadcode, diff --git a/pkg/beaconclient/capturehistoric.go b/pkg/beaconclient/capturehistoric.go index 1b0631a..cd88c90 100644 --- a/pkg/beaconclient/capturehistoric.go +++ b/pkg/beaconclient/capturehistoric.go @@ -57,6 +57,12 @@ type BatchProcessing interface { releaseDbLocks() error // 
Update the checked_out column to false for whatever table is being updated. } +/// ^^^ +// Might be better to remove the interface and create a single struct that historicalProcessing +// and knownGapsProcessing can use. The struct would contain all the SQL strings that they need. +// And the only difference in logic for processing would be within the error handling. +// Which can be a function we pass into handleBatchProcess() + // A struct to pass around indicating a table entry for slots to process. type slotsToProcess struct { startSlot int // The start slot -- 2.45.2 From c64df8903ced38c66acea97c711355fb8b2ecd43 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 11:59:11 -0400 Subject: [PATCH 6/8] Add secret --- .github/workflows/on-pr.yml | 2 ++ .github/workflows/on-publish.yml | 2 ++ .github/workflows/tests.yml | 3 +++ 3 files changed, 7 insertions(+) diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index c570dd7..809b9b2 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -32,3 +32,5 @@ jobs: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} + secrets: + GH_PAT: ${{secrets.GH_PAT}} diff --git a/.github/workflows/on-publish.yml b/.github/workflows/on-publish.yml index 348b76b..32bc92e 100644 --- a/.github/workflows/on-publish.yml +++ b/.github/workflows/on-publish.yml @@ -9,6 +9,8 @@ jobs: stack-orchestrator-ref: ${{ github.event.inputs.stack-orchestrator-ref }} ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} + secrets: + GH_PAT: ${{secrets.GH_PAT}} build: name: Run docker build runs-on: ubuntu-latest diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 85af609..3977299 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -11,6 +11,9 @@ on: ssz-data-ref: required: false type: string + secrets: + GH_PAT: + required: true env: stack-orchestrator-ref: ${{ inputs.stack-orchestrator-ref || 'develop' }} -- 2.45.2 From 90cf03509f8cfd7b9ad3b911fe9bfed72d46b880 Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 12:13:12 -0400 Subject: [PATCH 7/8] Use deploy-key instead of personal PAT --- .github/workflows/on-pr.yml | 2 +- .github/workflows/on-publish.yml | 2 +- .github/workflows/tests.yml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/on-pr.yml b/.github/workflows/on-pr.yml index 809b9b2..ac5b146 100644 --- a/.github/workflows/on-pr.yml +++ b/.github/workflows/on-pr.yml @@ -33,4 +33,4 @@ jobs: ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} secrets: - GH_PAT: ${{secrets.GH_PAT}} + GHA_KEY: ${{secrets.GHA_KEY}} diff --git a/.github/workflows/on-publish.yml b/.github/workflows/on-publish.yml index 32bc92e..11ffbaa 100644 --- a/.github/workflows/on-publish.yml +++ b/.github/workflows/on-publish.yml @@ -10,7 +10,7 @@ jobs: ipld-ethcl-db-ref: ${{ github.event.inputs.ipld-ethcl-db-ref }} ssz-data-ref: ${{ github.event.inputs.ssz-data-ref }} secrets: - GH_PAT: ${{secrets.GH_PAT}} + GHA_KEY: ${{secrets.GHA_KEY}} build: name: Run docker build runs-on: ubuntu-latest diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3977299..059393c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,7 +12,7 @@ on: 
required: false type: string secrets: - GH_PAT: + GHA_KEY: required: true env: @@ -41,7 +41,7 @@ jobs: ref: ${{ env.ipld-ethcl-db-ref }} repository: vulcanize/ipld-ethcl-db path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} + ssh-key: ${{secrets.GHA_KEY}} fetch-depth: 0 - name: Create config file -- 2.45.2 From 250d0b538da6220f72fd27da158d48f49bdaf42f Mon Sep 17 00:00:00 2001 From: Abdul Rabbani Date: Fri, 3 Jun 2022 12:17:14 -0400 Subject: [PATCH 8/8] update variables --- .github/workflows/tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 059393c..8077c67 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -93,7 +93,7 @@ jobs: ref: ${{ env.ipld-ethcl-db-ref }} repository: vulcanize/ipld-ethcl-db path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} + ssh-key: ${{ secrets.GHA_KEY }} fetch-depth: 0 - uses: actions/checkout@v3 @@ -154,7 +154,7 @@ jobs: ref: ${{ env.ipld-ethcl-db-ref }} repository: vulcanize/ipld-ethcl-db path: "./ipld-ethcl-db/" - token: ${{ secrets.GH_PAT }} + ssh-key: ${{secrets.GHA_KEY}} fetch-depth: 0 - name: Create config file -- 2.45.2
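
Patches 3 and 4 above give every row checkout a recorded owner: the lock statement now writes the node's identifier into checked_out_by, and releaseDbLocks hands rows back keyed on that same identifier. Below is a minimal Go sketch of the pattern, assuming a pgx-style Exec. The lock statement mirrors lockHpEntryStmt from patch 4; the release statement's exact text, the Database stand-in, and the helper names are assumptions, not the project's API.

```go
package beaconclient

import "context"

// Database is a hypothetical stand-in for the project's DB handle;
// only Exec is needed for this sketch.
type Database interface {
	Exec(ctx context.Context, sql string, arguments ...interface{}) (interface{}, error)
}

var (
	// Mirrors lockHpEntryStmt from patch 4: checking a row out now also
	// records which node took it.
	lockEntryStmt = `UPDATE ethcl.historic_process
	SET checked_out=true, checked_out_by=$3
	WHERE start_slot=$1 AND end_slot=$2;`
	// Assumed shape of the release statement: only rows owned by this
	// node's identifier are handed back, so two indexers running with
	// distinct identifiers never release each other's work.
	releaseEntriesStmt = `UPDATE ethcl.historic_process
	SET checked_out=false
	WHERE checked_out_by=$1;`
)

// checkOutSlotRange claims a slot range for this node by recording its
// unique identifier at checkout time.
func checkOutSlotRange(db Database, startSlot, endSlot, uniqueNodeIdentifier int) error {
	_, err := db.Exec(context.Background(), lockEntryStmt, startSlot, endSlot, uniqueNodeIdentifier)
	return err
}

// releaseOwnedRows returns every row this node checked out, mirroring the
// releaseDbLocks flow in patch 3: signal the workers to finish, then clear
// only the locks that carry this node's identifier.
func releaseOwnedRows(db Database, uniqueNodeIdentifier int) error {
	_, err := db.Exec(context.Background(), releaseEntriesStmt, uniqueNodeIdentifier)
	return err
}
```

Note that the check/lock pair (checkHpEntryStmt followed by lockHpEntryStmt) is not atomic by itself; recording checked_out_by does not remove that race, but it does make ownership explicit, so a stale checkout can always be traced back to a node identifier and released.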
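
The comment added to capturehistoric.go in patch 5 floats replacing the BatchProcessing interface with a single struct that the historic and known-gaps processors share, with only the error handling differing. A sketch of that idea follows; every field name is hypothetical, and the handleBatchProcess signature is assumed since the patches only reference the function by name.

```go
package beaconclient

// batchConfig bundles the per-table SQL that the historic and known-gaps
// processors would otherwise each carry, per the comment above.
type batchConfig struct {
	checkEntryStmt   string // find rows that are not checked out
	lockEntryStmt    string // check a row out (and record checked_out_by)
	deleteEntryStmt  string // remove a finished row
	releaseLocksStmt string // hand back this node's rows on shutdown
	// The one behavioral difference between the two tables lives here.
	onBatchError func(startSlot int, endSlot int, err error)
}

// handleBatchProcess would then be the single shared loop (signature
// assumed): check out a batch, process it, delete it, and defer to
// cfg.onBatchError at the point where the historic and known-gaps
// paths currently diverge.
func handleBatchProcess(cfg batchConfig, maxWorkers int) error {
	// ... shared checkout/process/delete loop ...
	return nil
}
```

Passing the error handler in as a function keeps the SQL declarative and the control flow in one place, at the cost of a slightly less discoverable call site than the current interface.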