Use a transaction for writing, knowngaps, and reorgs #53
@ -47,7 +47,7 @@ func bootApp() {
|
|||||||
|
|
||||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||||
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
|
viper.GetInt("kg.increment"), "boot", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
StopApplicationPreBoot(err, Db)
|
StopApplicationPreBoot(err, Db)
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ var (
|
|||||||
bcType string
|
bcType string
|
||||||
bcMaxHistoricProcessWorker int
|
bcMaxHistoricProcessWorker int
|
||||||
bcUniqueNodeIdentifier int
|
bcUniqueNodeIdentifier int
|
||||||
|
bcCheckDb bool
|
||||||
kgMaxWorker int
|
kgMaxWorker int
|
||||||
kgTableIncrement int
|
kgTableIncrement int
|
||||||
kgProcessGaps bool
|
kgProcessGaps bool
|
||||||
@ -95,6 +96,7 @@ func init() {
|
|||||||
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
|
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
|
||||||
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
|
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
|
||||||
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
|
captureCmd.PersistentFlags().IntVarP(&bcUniqueNodeIdentifier, "bc.uniqueNodeIdentifier", "", 0, "The unique identifier of this application. Each application connecting to the DB should have a unique identifier.")
|
||||||
|
captureCmd.PersistentFlags().BoolVarP(&bcCheckDb, "bc.checkDb", "", true, "Should we check to see if the slot exists in the DB before writing it?")
|
||||||
// err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
// err = captureCmd.MarkPersistentFlagRequired("bc.address")
|
||||||
// exitErr(err)
|
// exitErr(err)
|
||||||
// err = captureCmd.MarkPersistentFlagRequired("bc.port")
|
// err = captureCmd.MarkPersistentFlagRequired("bc.port")
|
||||||
@ -149,6 +151,8 @@ func init() {
|
|||||||
exitErr(err)
|
exitErr(err)
|
||||||
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
|
err = viper.BindPFlag("bc.uniqueNodeIdentifier", captureCmd.PersistentFlags().Lookup("bc.uniqueNodeIdentifier"))
|
||||||
exitErr(err)
|
exitErr(err)
|
||||||
|
err = viper.BindPFlag("bc.checkDb", captureCmd.PersistentFlags().Lookup("bc.checkDb"))
|
||||||
|
exitErr(err)
|
||||||
// Here you will define your flags and configuration settings.
|
// Here you will define your flags and configuration settings.
|
||||||
|
|
||||||
//// Known Gap Specific
|
//// Known Gap Specific
|
||||||
|
@ -50,7 +50,7 @@ func startHeadTracking() {
|
|||||||
|
|
||||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||||
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
|
viper.GetInt("kg.increment"), "head", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
StopApplicationPreBoot(err, Db)
|
StopApplicationPreBoot(err, Db)
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ func startHistoricProcessing() {
|
|||||||
|
|
||||||
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
Bc, Db, err := boot.BootApplicationWithRetry(ctx, viper.GetString("db.address"), viper.GetInt("db.port"), viper.GetString("db.name"), viper.GetString("db.username"), viper.GetString("db.password"), viper.GetString("db.driver"),
|
||||||
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
viper.GetString("bc.address"), viper.GetInt("bc.port"), viper.GetString("bc.connectionProtocol"), viper.GetString("bc.type"), viper.GetInt("bc.bootRetryInterval"), viper.GetInt("bc.bootMaxRetry"),
|
||||||
viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"))
|
viper.GetInt("kg.increment"), "historic", viper.GetBool("t.skipSync"), viper.GetInt("bc.uniqueNodeIdentifier"), viper.GetBool("bc.checkDb"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
StopApplicationPreBoot(err, Db)
|
StopApplicationPreBoot(err, Db)
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,8 @@
|
|||||||
"bootMaxRetry": 5,
|
"bootMaxRetry": 5,
|
||||||
"maxHistoricProcessWorker": 2,
|
"maxHistoricProcessWorker": 2,
|
||||||
"connectionProtocol": "http",
|
"connectionProtocol": "http",
|
||||||
"uniqueNodeIdentifier": 100
|
"uniqueNodeIdentifier": 100,
|
||||||
|
"checkDb": true
|
||||||
},
|
},
|
||||||
"t": {
|
"t": {
|
||||||
"skipSync": true
|
"skipSync": true
|
||||||
|
@ -42,11 +42,11 @@ var (
|
|||||||
//
|
//
|
||||||
// 3. Make sure the node is synced, unless disregardSync is true.
|
// 3. Make sure the node is synced, unless disregardSync is true.
|
||||||
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
|
func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
|
||||||
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
|
bcAddress string, bcPort int, bcConnectionProtocol string, bcKgTableIncrement int, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||||
log.Info("Booting the Application")
|
log.Info("Booting the Application")
|
||||||
|
|
||||||
log.Debug("Creating the Beacon Client")
|
log.Debug("Creating the Beacon Client")
|
||||||
Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier)
|
Bc, err := beaconclient.CreateBeaconClient(ctx, bcConnectionProtocol, bcAddress, bcPort, bcKgTableIncrement, uniqueNodeIdentifier, checkDb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Bc, nil, err
|
return Bc, nil, err
|
||||||
}
|
}
|
||||||
@ -86,14 +86,14 @@ func BootApplication(ctx context.Context, dbHostname string, dbPort int, dbName
|
|||||||
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
|
// Add retry logic to ensure that we are give the Beacon Client and the DB time to start.
|
||||||
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
|
func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int, dbName string, dbUsername string, dbPassword string, driverName string,
|
||||||
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
|
bcAddress string, bcPort int, bcConnectionProtocol string, bcType string, bcRetryInterval int, bcMaxRetry int, bcKgTableIncrement int,
|
||||||
startUpMode string, disregardSync bool, uniqueNodeIdentifier int) (*beaconclient.BeaconClient, sql.Database, error) {
|
startUpMode string, disregardSync bool, uniqueNodeIdentifier int, checkDb bool) (*beaconclient.BeaconClient, sql.Database, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if bcMaxRetry < 0 {
|
if bcMaxRetry < 0 {
|
||||||
i := 0
|
i := 0
|
||||||
for {
|
for {
|
||||||
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
|
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
|
||||||
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
|
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"retryNumber": i,
|
"retryNumber": i,
|
||||||
@ -108,7 +108,7 @@ func BootApplicationWithRetry(ctx context.Context, dbHostname string, dbPort int
|
|||||||
} else {
|
} else {
|
||||||
for i := 0; i < bcMaxRetry; i++ {
|
for i := 0; i < bcMaxRetry; i++ {
|
||||||
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
|
BC, DB, err = BootApplication(ctx, dbHostname, dbPort, dbName, dbUsername, dbPassword, driverName,
|
||||||
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier)
|
bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, disregardSync, uniqueNodeIdentifier, checkDb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"retryNumber": i,
|
"retryNumber": i,
|
||||||
|
@ -39,51 +39,52 @@ var _ = Describe("Boot", func() {
|
|||||||
bcBootMaxRetry int = 5
|
bcBootMaxRetry int = 5
|
||||||
bcKgTableIncrement int = 10
|
bcKgTableIncrement int = 10
|
||||||
bcUniqueIdentifier int = 100
|
bcUniqueIdentifier int = 100
|
||||||
|
bcCheckDb bool = false
|
||||||
)
|
)
|
||||||
Describe("Booting the application", Label("integration"), func() {
|
Describe("Booting the application", Label("integration"), func() {
|
||||||
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
|
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing head", func() {
|
||||||
It("Should connect successfully", func() {
|
It("Should connect successfully", func() {
|
||||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
|
Context("When the DB and BC are both up and running, we skip checking for a synced head, and we are processing historic ", func() {
|
||||||
It("Should connect successfully", func() {
|
It("Should connect successfully", func() {
|
||||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "historic", true, bcUniqueIdentifier, bcCheckDb)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
|
Context("When the DB and BC are both up and running, and we check for a synced head", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, bcUniqueIdentifier, bcCheckDb)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
|
Context("When the DB and BC are both up and running, we skip checking for a synced head, but the unique identifier is 0", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0)
|
_, db, err := boot.BootApplicationWithRetry(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", false, 0, bcCheckDb)
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the DB is running but not the BC", func() {
|
Context("When the DB is running but not the BC", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
|
_, _, err := boot.BootApplication(context.Background(), dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When the BC is running but not the DB", func() {
|
Context("When the BC is running but not the DB", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, bcAddress, bcPort, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
Context("When neither the BC or DB are running", func() {
|
Context("When neither the BC or DB are running", func() {
|
||||||
It("Should not connect successfully", func() {
|
It("Should not connect successfully", func() {
|
||||||
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier)
|
_, _, err := boot.BootApplication(context.Background(), "hi", 10, dbName, dbUsername, dbPassword, dbDriver, "hi", 100, bcConnectionProtocol, bcKgTableIncrement, true, bcUniqueIdentifier, bcCheckDb)
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -50,6 +50,7 @@ var (
|
|||||||
bcBootMaxRetry int = 5
|
bcBootMaxRetry int = 5
|
||||||
bcKgTableIncrement int = 10
|
bcKgTableIncrement int = 10
|
||||||
bcUniqueIdentifier int = 100
|
bcUniqueIdentifier int = 100
|
||||||
|
bcCheckDb bool = false
|
||||||
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
|
maxWaitSecondsShutdown time.Duration = time.Duration(1) * time.Second
|
||||||
DB sql.Database
|
DB sql.Database
|
||||||
BC *beaconclient.BeaconClient
|
BC *beaconclient.BeaconClient
|
||||||
@ -62,7 +63,7 @@ var _ = Describe("Shutdown", func() {
|
|||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
|
BC, DB, err = boot.BootApplicationWithRetry(ctx, dbAddress, dbPort, dbName, dbUsername, dbPassword, dbDriver, bcAddress,
|
||||||
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier)
|
bcPort, bcConnectionProtocol, bcType, bcBootRetryInterval, bcBootMaxRetry, bcKgTableIncrement, "head", true, bcUniqueIdentifier, bcCheckDb)
|
||||||
notifierCh = make(chan os.Signal, 1)
|
notifierCh = make(chan os.Signal, 1)
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
})
|
})
|
||||||
|
@ -51,6 +51,7 @@ type BeaconClient struct {
|
|||||||
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
|
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
|
||||||
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
|
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
|
||||||
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
|
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
|
||||||
|
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
|
||||||
|
|
||||||
// Used for Head Tracking
|
// Used for Head Tracking
|
||||||
|
|
||||||
@ -88,7 +89,7 @@ type SseError struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// A Function to create the BeaconClient.
|
// A Function to create the BeaconClient.
|
||||||
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int) (*BeaconClient, error) {
|
func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddress string, bcPort int, bcKgTableIncrement int, uniqueNodeIdentifier int, checkDb bool) (*BeaconClient, error) {
|
||||||
if uniqueNodeIdentifier == 0 {
|
if uniqueNodeIdentifier == 0 {
|
||||||
uniqueNodeIdentifier := rand.Int()
|
uniqueNodeIdentifier := rand.Int()
|
||||||
log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
|
log.WithField("randomUniqueNodeIdentifier", uniqueNodeIdentifier).Warn("No uniqueNodeIdentifier provided, we are going to use a randomly generated one.")
|
||||||
@ -109,6 +110,7 @@ func CreateBeaconClient(ctx context.Context, connectionProtocol string, bcAddres
|
|||||||
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
|
ReOrgTracking: createSseEvent[ChainReorg](endpoint, bcReorgTopicEndpoint),
|
||||||
Metrics: metrics,
|
Metrics: metrics,
|
||||||
UniqueNodeIdentifier: uniqueNodeIdentifier,
|
UniqueNodeIdentifier: uniqueNodeIdentifier,
|
||||||
|
CheckDb: checkDb,
|
||||||
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
|
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -204,6 +204,7 @@ var (
|
|||||||
dbDriver: dbDriver,
|
dbDriver: dbDriver,
|
||||||
knownGapsTableIncrement: knownGapsTableIncrement,
|
knownGapsTableIncrement: knownGapsTableIncrement,
|
||||||
bcUniqueIdentifier: bcUniqueIdentifier,
|
bcUniqueIdentifier: bcUniqueIdentifier,
|
||||||
|
checkDb: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
BeaconNodeTester = TestBeaconNode{
|
BeaconNodeTester = TestBeaconNode{
|
||||||
@ -424,6 +425,7 @@ type Config struct {
|
|||||||
dbDriver string
|
dbDriver string
|
||||||
knownGapsTableIncrement int
|
knownGapsTableIncrement int
|
||||||
bcUniqueIdentifier int
|
bcUniqueIdentifier int
|
||||||
|
checkDb bool
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
@ -433,7 +435,7 @@ type Config struct {
|
|||||||
// Must run before each test. We can't use the beforeEach because of the way
|
// Must run before each test. We can't use the beforeEach because of the way
|
||||||
// Gingko treats race conditions.
|
// Gingko treats race conditions.
|
||||||
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
|
func setUpTest(config Config, maxSlot string) *beaconclient.BeaconClient {
|
||||||
bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier)
|
bc, err := beaconclient.CreateBeaconClient(context.Background(), config.protocol, config.address, config.port, config.knownGapsTableIncrement, config.bcUniqueIdentifier, config.checkDb)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
|
db, err := postgres.SetupPostgresDb(config.dbHost, config.dbPort, config.dbName, config.dbUser, config.dbPassword, config.dbDriver)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
|
func (bc *BeaconClient) CaptureHistoric(maxWorkers int) []error {
|
||||||
log.Info("We are starting the historical processing service.")
|
log.Info("We are starting the historical processing service.")
|
||||||
bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics}
|
bc.HistoricalProcess = historicProcessing{db: bc.Db, metrics: bc.Metrics}
|
||||||
errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics)
|
errs := handleBatchProcess(maxWorkers, bc.HistoricalProcess, bc.HistoricalProcess.finishProcessing, bc.HistoricalProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
|
||||||
log.Debug("Exiting Historical")
|
log.Debug("Exiting Historical")
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
@ -89,7 +89,7 @@ type batchHistoricError struct {
|
|||||||
// 4. Remove the slot entry from the DB.
|
// 4. Remove the slot entry from the DB.
|
||||||
//
|
//
|
||||||
// 5. Handle any errors.
|
// 5. Handle any errors.
|
||||||
func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics) []error {
|
func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, db sql.Database, serverEndpoint string, metrics *BeaconClientMetrics, checkDb bool) []error {
|
||||||
slotsCh := make(chan slotsToProcess)
|
slotsCh := make(chan slotsToProcess)
|
||||||
workCh := make(chan int)
|
workCh := make(chan int)
|
||||||
processedCh := make(chan slotsToProcess)
|
processedCh := make(chan slotsToProcess)
|
||||||
@ -99,7 +99,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, finishCh chan int, d
|
|||||||
// Start workers
|
// Start workers
|
||||||
for w := 1; w <= maxWorkers; w++ {
|
for w := 1; w <= maxWorkers; w++ {
|
||||||
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
|
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
|
||||||
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics)
|
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics, checkDb)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process all ranges and send each individual slot to the worker.
|
// Process all ranges and send each individual slot to the worker.
|
||||||
|
@ -16,7 +16,7 @@ import (
|
|||||||
var _ = Describe("Capturehistoric", func() {
|
var _ = Describe("Capturehistoric", func() {
|
||||||
|
|
||||||
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
|
Describe("Run the application in historic mode", Label("unit", "behavioral"), func() {
|
||||||
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", Label("now"), func() {
|
Context("Phase0: When we need to process a single block in the ethcl.historic_process table.", func() {
|
||||||
It("Successfully Process the Block", func() {
|
It("Successfully Process the Block", func() {
|
||||||
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
bc := setUpTest(BeaconNodeTester.TestConfig, "99")
|
||||||
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
BeaconNodeTester.SetupBeaconNodeMock(BeaconNodeTester.TestEvents, BeaconNodeTester.TestConfig.protocol, BeaconNodeTester.TestConfig.address, BeaconNodeTester.TestConfig.port, BeaconNodeTester.TestConfig.dummyParentRoot)
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
|
||||||
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
|
||||||
@ -54,6 +55,14 @@ VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
|
|||||||
CheckProposedStmt string = `SELECT slot, block_root
|
CheckProposedStmt string = `SELECT slot, block_root
|
||||||
FROM ethcl.slots
|
FROM ethcl.slots
|
||||||
WHERE slot=$1 AND block_root=$2;`
|
WHERE slot=$1 AND block_root=$2;`
|
||||||
|
// Check to see if the slot and block_root exist in ethcl.signed_beacon_block
|
||||||
|
CheckSignedBeaconBlockStmt string = `SELECT slot, block_root
|
||||||
|
FROM ethcl.signed_beacon_block
|
||||||
|
WHERE slot=$1 AND block_root=$2`
|
||||||
|
// Check to see if the slot and state_root exist in ethcl.beacon_state
|
||||||
|
CheckBeaconStateStmt string = `SELECT slot, state_root
|
||||||
|
FROM ethcl.beacon_state
|
||||||
|
WHERE slot=$1 AND state_root=$2`
|
||||||
// Used to get a single slot from the table if it exists
|
// Used to get a single slot from the table if it exists
|
||||||
QueryBySlotStmt string = `SELECT slot
|
QueryBySlotStmt string = `SELECT slot
|
||||||
FROM ethcl.slots
|
FROM ethcl.slots
|
||||||
@ -75,6 +84,8 @@ VALUES ($1, $2, $3, $4, $5, $6) on CONFLICT (start_slot, end_slot) DO NOTHING`
|
|||||||
// Remove any of it from the processslot file.
|
// Remove any of it from the processslot file.
|
||||||
type DatabaseWriter struct {
|
type DatabaseWriter struct {
|
||||||
Db sql.Database
|
Db sql.Database
|
||||||
|
Tx sql.Tx
|
||||||
|
Ctx context.Context
|
||||||
Metrics *BeaconClientMetrics
|
Metrics *BeaconClientMetrics
|
||||||
DbSlots *DbSlots
|
DbSlots *DbSlots
|
||||||
DbSignedBeaconBlock *DbSignedBeaconBlock
|
DbSignedBeaconBlock *DbSignedBeaconBlock
|
||||||
@ -85,14 +96,21 @@ type DatabaseWriter struct {
|
|||||||
|
|
||||||
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
|
func CreateDatabaseWrite(db sql.Database, slot int, stateRoot string, blockRoot string, parentBlockRoot string,
|
||||||
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
|
eth1BlockHash string, status string, rawSignedBeaconBlock []byte, rawBeaconState []byte, metrics *BeaconClientMetrics) (*DatabaseWriter, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tx, err := db.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("We are unable to Begin a SQL transaction")
|
||||||
|
}
|
||||||
dw := &DatabaseWriter{
|
dw := &DatabaseWriter{
|
||||||
Db: db,
|
Db: db,
|
||||||
|
Tx: tx,
|
||||||
|
Ctx: ctx,
|
||||||
rawBeaconState: rawBeaconState,
|
rawBeaconState: rawBeaconState,
|
||||||
rawSignedBeaconBlock: rawSignedBeaconBlock,
|
rawSignedBeaconBlock: rawSignedBeaconBlock,
|
||||||
Metrics: metrics,
|
Metrics: metrics,
|
||||||
}
|
}
|
||||||
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
dw.prepareSlotsModel(slot, stateRoot, blockRoot, status)
|
||||||
err := dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
|
err = dw.prepareSignedBeaconBlockModel(slot, blockRoot, parentBlockRoot, eth1BlockHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -150,28 +168,40 @@ func (dw *DatabaseWriter) prepareBeaconStateModel(slot int, stateRoot string) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write all the data for a given slot.
|
// Add all the data for a given slot to a SQL transaction.
|
||||||
func (dw *DatabaseWriter) writeFullSlot() error {
|
// Originally it wrote to each table individually.
|
||||||
|
func (dw *DatabaseWriter) transactFullSlot() error {
|
||||||
// If an error occurs, write to knownGaps table.
|
// If an error occurs, write to knownGaps table.
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"slot": dw.DbSlots.Slot,
|
"slot": dw.DbSlots.Slot,
|
||||||
}).Debug("Starting to write to the DB.")
|
}).Debug("Starting to write to the DB.")
|
||||||
err := dw.writeSlots()
|
err := dw.transactSlots()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl.slots table...")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug("We finished writing to the ethcl.slots table.")
|
log.Debug("We finished writing to the ethcl.slots table.")
|
||||||
if dw.DbSlots.Status != "skipped" {
|
if dw.DbSlots.Status != "skipped" {
|
||||||
errG, _ := errgroup.WithContext(context.Background())
|
//errG, _ := errgroup.WithContext(context.Background())
|
||||||
errG.Go(func() error {
|
//errG.Go(func() error {
|
||||||
return dw.writeSignedBeaconBlocks()
|
// return dw.transactSignedBeaconBlocks()
|
||||||
})
|
//})
|
||||||
errG.Go(func() error {
|
//errG.Go(func() error {
|
||||||
return dw.writeBeaconState()
|
// return dw.transactBeaconState()
|
||||||
})
|
//})
|
||||||
if err := errG.Wait(); err != nil {
|
//if err := errG.Wait(); err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
|
// loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block or state table...")
|
||||||
|
// return err
|
||||||
|
//}
|
||||||
|
// Might want to seperate writing to public.blocks so we can do this concurrently...
|
||||||
|
err := dw.transactSignedBeaconBlocks()
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl block table...")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = dw.transactBeaconState()
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("We couldn't write to the ethcl state table...")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -179,16 +209,16 @@ func (dw *DatabaseWriter) writeFullSlot() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the generic slots table. For now this is only one function.
|
// Add data for the ethcl.slots table to a transaction. For now this is only one function.
|
||||||
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
// But in the future if we need to incorporate any FK's or perform any actions to write to the
|
||||||
// slots table we can do it all here.
|
// slots table we can do it all here.
|
||||||
func (dw *DatabaseWriter) writeSlots() error {
|
func (dw *DatabaseWriter) transactSlots() error {
|
||||||
return dw.upsertSlots()
|
return dw.upsertSlots()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upsert to the ethcl.slots table.
|
// Upsert to the ethcl.slots table.
|
||||||
func (dw *DatabaseWriter) upsertSlots() error {
|
func (dw *DatabaseWriter) upsertSlots() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
_, err := dw.Tx.Exec(dw.Ctx, UpsertSlotsStmt, dw.DbSlots.Epoch, dw.DbSlots.Slot, dw.DbSlots.BlockRoot, dw.DbSlots.StateRoot, dw.DbSlots.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.slots table")
|
||||||
return err
|
return err
|
||||||
@ -196,8 +226,8 @@ func (dw *DatabaseWriter) upsertSlots() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the signed_beacon_block.
|
// Add the information for the signed_beacon_block to a transaction.
|
||||||
func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
|
func (dw *DatabaseWriter) transactSignedBeaconBlocks() error {
|
||||||
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
err := dw.upsertPublicBlocks(dw.DbSignedBeaconBlock.MhKey, dw.rawSignedBeaconBlock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -211,7 +241,7 @@ func (dw *DatabaseWriter) writeSignedBeaconBlocks() error {
|
|||||||
|
|
||||||
// Upsert to public.blocks.
|
// Upsert to public.blocks.
|
||||||
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertBlocksStmt, key, data)
|
_, err := dw.Tx.Exec(dw.Ctx, UpsertBlocksStmt, key, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the public.blocks table")
|
||||||
return err
|
return err
|
||||||
@ -221,7 +251,7 @@ func (dw *DatabaseWriter) upsertPublicBlocks(key string, data []byte) error {
|
|||||||
|
|
||||||
// Upsert to the ethcl.signed_beacon_block table.
|
// Upsert to the ethcl.signed_beacon_block table.
|
||||||
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
|
_, err := dw.Tx.Exec(dw.Ctx, UpsertSignedBeaconBlockStmt, dw.DbSignedBeaconBlock.Slot, dw.DbSignedBeaconBlock.BlockRoot, dw.DbSignedBeaconBlock.ParentBlock, dw.DbSignedBeaconBlock.Eth1BlockHash, dw.DbSignedBeaconBlock.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).WithFields(log.Fields{"block_root": dw.DbSignedBeaconBlock.BlockRoot}).Error("Unable to write to the slot to the ethcl.signed_beacon_block table")
|
||||||
return err
|
return err
|
||||||
@ -229,8 +259,8 @@ func (dw *DatabaseWriter) upsertSignedBeaconBlock() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the information for the beacon_state.
|
// Add the information for the beacon_state to a transaction.
|
||||||
func (dw *DatabaseWriter) writeBeaconState() error {
|
func (dw *DatabaseWriter) transactBeaconState() error {
|
||||||
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
err := dw.upsertPublicBlocks(dw.DbBeaconState.MhKey, dw.rawBeaconState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -244,7 +274,7 @@ func (dw *DatabaseWriter) writeBeaconState() error {
|
|||||||
|
|
||||||
// Upsert to the ethcl.beacon_state table.
|
// Upsert to the ethcl.beacon_state table.
|
||||||
func (dw *DatabaseWriter) upsertBeaconState() error {
|
func (dw *DatabaseWriter) upsertBeaconState() error {
|
||||||
_, err := dw.Db.Exec(context.Background(), UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
_, err := dw.Tx.Exec(dw.Ctx, UpsertBeaconState, dw.DbBeaconState.Slot, dw.DbBeaconState.StateRoot, dw.DbBeaconState.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
loghelper.LogSlotError(dw.DbSlots.Slot, err).Error("Unable to write to the slot to the ethcl.beacon_state table")
|
||||||
return err
|
return err
|
||||||
@ -252,23 +282,23 @@ func (dw *DatabaseWriter) upsertBeaconState() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update a given slot to be marked as forked. Provide the slot and the latest latestBlockRoot.
|
// Update a given slot to be marked as forked within a transaction. Provide the slot and the latest latestBlockRoot.
|
||||||
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
// We will mark all entries for the given slot that don't match the provided latestBlockRoot as forked.
|
||||||
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
func transactReorgs(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
||||||
slotNum, strErr := strconv.Atoi(slot)
|
slotNum, strErr := strconv.Atoi(slot)
|
||||||
if strErr != nil {
|
if strErr != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
|
loghelper.LogReorgError(slot, latestBlockRoot, strErr).Error("We can't convert the slot to an int...")
|
||||||
}
|
}
|
||||||
|
|
||||||
forkCount, err := updateForked(db, slot, latestBlockRoot)
|
forkCount, err := updateForked(tx, ctx, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while updating all forks.")
|
||||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
|
||||||
}
|
}
|
||||||
proposedCount, err := updateProposed(db, slot, latestBlockRoot)
|
proposedCount, err := updateProposed(tx, ctx, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We ran into some trouble while trying to update the proposed slot.")
|
||||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, err, "reorg", metrics)
|
||||||
}
|
}
|
||||||
|
|
||||||
if forkCount > 0 {
|
if forkCount > 0 {
|
||||||
@ -289,29 +319,37 @@ func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *
|
|||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
||||||
"proposedCount": proposedCount,
|
"proposedCount": proposedCount,
|
||||||
}).Error("Too many rows were marked as proposed!")
|
}).Error("Too many rows were marked as proposed!")
|
||||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Too many rows were marked as unproposed."), "reorg", metrics)
|
||||||
} else if proposedCount == 0 {
|
} else if proposedCount == 0 {
|
||||||
var count int
|
transactKnownGaps(tx, ctx, 1, slotNum, slotNum, fmt.Errorf("Unable to find properly proposed row in DB"), "reorg", metrics)
|
||||||
err := db.QueryRow(context.Background(), CheckProposedStmt, slot, latestBlockRoot).Scan(count)
|
|
||||||
if err != nil {
|
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("Unable to query proposed rows after reorg.")
|
|
||||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
|
|
||||||
} else if count != 1 {
|
|
||||||
loghelper.LogReorg(slot, latestBlockRoot).WithFields(log.Fields{
|
|
||||||
"proposedCount": count,
|
|
||||||
}).Warn("The proposed block was not marked as proposed...")
|
|
||||||
writeKnownGaps(db, 1, slotNum, slotNum, err, "reorg", metrics)
|
|
||||||
} else {
|
|
||||||
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
loghelper.LogReorg(slot, latestBlockRoot).Info("Updated the row that should have been marked as proposed.")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
metrics.IncrementReorgsInsert(1)
|
metrics.IncrementReorgsInsert(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wrapper function that will create a transaction and execute the function.
|
||||||
|
func writeReorgs(db sql.Database, slot string, latestBlockRoot string, metrics *BeaconClientMetrics) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tx, err := db.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to create a new transaction for reorgs")
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := tx.Rollback(ctx)
|
||||||
|
if err != nil && err != pgx.ErrTxClosed {
|
||||||
|
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
transactReorgs(tx, ctx, slot, latestBlockRoot, metrics)
|
||||||
|
if err = tx.Commit(ctx); err != nil {
|
||||||
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Fatal("Unable to execute the transaction for reorgs")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Update the slots table by marking the old slot's as forked.
|
// Update the slots table by marking the old slot's as forked.
|
||||||
func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
|
func updateForked(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
|
||||||
res, err := db.Exec(context.Background(), UpdateForkedStmt, slot, latestBlockRoot)
|
res, err := tx.Exec(ctx, UpdateForkedStmt, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the forked slots")
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -324,8 +362,9 @@ func updateForked(db sql.Database, slot string, latestBlockRoot string) (int64,
|
|||||||
return count, err
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64, error) {
|
// Mark a slot as proposed.
|
||||||
res, err := db.Exec(context.Background(), UpdateProposedStmt, slot, latestBlockRoot)
|
func updateProposed(tx sql.Tx, ctx context.Context, slot string, latestBlockRoot string) (int64, error) {
|
||||||
|
res, err := tx.Exec(ctx, UpdateProposedStmt, slot, latestBlockRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
|
loghelper.LogReorgError(slot, latestBlockRoot, err).Error("We are unable to update the ethcl.slots table with the proposed slot.")
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -339,20 +378,26 @@ func updateProposed(db sql.Database, slot string, latestBlockRoot string) (int64
|
|||||||
return count, err
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaos into
|
// A wrapper function to call upsertKnownGaps. This function will break down the range of known_gaps into
|
||||||
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
// smaller chunks. For example, instead of having an entry of 1-101, if we increment the entries by 10 slots, we would
|
||||||
// have 10 entries as follows: 1-10, 11-20, etc...
|
// have 10 entries as follows: 1-10, 11-20, etc...
|
||||||
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
|
func transactKnownGaps(tx sql.Tx, ctx context.Context, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
|
||||||
|
var entryErrorMsg string
|
||||||
|
if entryError == nil {
|
||||||
|
entryErrorMsg = ""
|
||||||
|
} else {
|
||||||
|
entryErrorMsg = entryError.Error()
|
||||||
|
}
|
||||||
if endSlot-startSlot <= tableIncrement {
|
if endSlot-startSlot <= tableIncrement {
|
||||||
kgModel := DbKnownGaps{
|
kgModel := DbKnownGaps{
|
||||||
StartSlot: strconv.Itoa(startSlot),
|
StartSlot: strconv.Itoa(startSlot),
|
||||||
EndSlot: strconv.Itoa(endSlot),
|
EndSlot: strconv.Itoa(endSlot),
|
||||||
CheckedOut: false,
|
CheckedOut: false,
|
||||||
ReprocessingError: "",
|
ReprocessingError: "",
|
||||||
EntryError: entryError.Error(),
|
EntryError: entryErrorMsg,
|
||||||
EntryProcess: entryProcess,
|
EntryProcess: entryProcess,
|
||||||
}
|
}
|
||||||
upsertKnownGaps(db, kgModel, metric)
|
upsertKnownGaps(tx, ctx, kgModel, metric)
|
||||||
} else {
|
} else {
|
||||||
totalSlots := endSlot - startSlot
|
totalSlots := endSlot - startSlot
|
||||||
var chunks int
|
var chunks int
|
||||||
@ -374,18 +419,37 @@ func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot
|
|||||||
EndSlot: strconv.Itoa(tempEnd),
|
EndSlot: strconv.Itoa(tempEnd),
|
||||||
CheckedOut: false,
|
CheckedOut: false,
|
||||||
ReprocessingError: "",
|
ReprocessingError: "",
|
||||||
EntryError: entryError.Error(),
|
EntryError: entryErrorMsg,
|
||||||
EntryProcess: entryProcess,
|
EntryProcess: entryProcess,
|
||||||
}
|
}
|
||||||
upsertKnownGaps(db, kgModel, metric)
|
upsertKnownGaps(tx, ctx, kgModel, metric)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wrapper function, instead of adding the knownGaps entries to a transaction, it will
|
||||||
|
// create the transaction and write it.
|
||||||
|
func writeKnownGaps(db sql.Database, tableIncrement int, startSlot int, endSlot int, entryError error, entryProcess string, metric *BeaconClientMetrics) {
|
||||||
|
ctx := context.Background()
|
||||||
|
tx, err := db.Begin(ctx)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to create a new transaction for knownGaps")
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
err := tx.Rollback(ctx)
|
||||||
|
if err != nil && err != pgx.ErrTxClosed {
|
||||||
|
loghelper.LogError(err).Error("We were unable to Rollback a transaction for reorgs")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
transactKnownGaps(tx, ctx, tableIncrement, startSlot, endSlot, entryError, entryProcess, metric)
|
||||||
|
if err = tx.Commit(ctx); err != nil {
|
||||||
|
loghelper.LogSlotRangeError(strconv.Itoa(startSlot), strconv.Itoa(endSlot), err).Fatal("Unable to execute the transaction for knownGaps")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A function to upsert a single entry to the ethcl.known_gaps table.
|
// A function to upsert a single entry to the ethcl.known_gaps table.
|
||||||
func upsertKnownGaps(db sql.Database, knModel DbKnownGaps, metric *BeaconClientMetrics) {
|
func upsertKnownGaps(tx sql.Tx, ctx context.Context, knModel DbKnownGaps, metric *BeaconClientMetrics) {
|
||||||
_, err := db.Exec(context.Background(), UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
|
_, err := tx.Exec(ctx, UpsertKnownGapsStmt, knModel.StartSlot, knModel.EndSlot,
|
||||||
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
|
knModel.CheckedOut, knModel.ReprocessingError, knModel.EntryError, knModel.EntryProcess)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
@ -460,7 +524,55 @@ func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (boo
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
if row > 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check to see if this slot is in the DB. Check ethcl.slots, ethcl.signed_beacon_block
|
||||||
|
// and ethcl.beacon_state. If the slot exists, return true
|
||||||
|
func IsSlotInDb(db sql.Database, slot string, blockRoot string, stateRoot string) (bool, error) {
|
||||||
|
var (
|
||||||
|
isInBeaconState bool
|
||||||
|
isInSignedBeaconBlock bool
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
errG, _ := errgroup.WithContext(context.Background())
|
||||||
|
errG.Go(func() error {
|
||||||
|
isInBeaconState, err = checkSlotAndRoot(db, CheckBeaconStateStmt, slot, stateRoot)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to check if the slot and stateroot exist in ethcl.beacon_state")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
errG.Go(func() error {
|
||||||
|
isInSignedBeaconBlock, err = checkSlotAndRoot(db, CheckSignedBeaconBlockStmt, slot, blockRoot)
|
||||||
|
if err != nil {
|
||||||
|
loghelper.LogError(err).Error("Unable to check if the slot and block_root exist in ethcl.signed_beacon_block")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err := errG.Wait(); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if isInBeaconState && isInSignedBeaconBlock {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide a statement, slot, and root, and this function will check to see
|
||||||
|
// if the slot and root exist in the table.
|
||||||
|
func checkSlotAndRoot(db sql.Database, statement, slot, root string) (bool, error) {
|
||||||
|
processRow, err := db.Exec(context.Background(), statement, slot, root)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
row, err := processRow.RowsAffected()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
if row > 0 {
|
if row > 0 {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
@ -31,9 +31,9 @@ var _ = Describe("Healthcheck", func() {
|
|||||||
|
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
var err error
|
var err error
|
||||||
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier)
|
Bc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "localhost", 5052, 10, bcUniqueIdentifier, false)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier)
|
errBc, err = beaconclient.CreateBeaconClient(context.Background(), "http", "blah-blah", 1010, 10, bcUniqueIdentifier, false)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
})
|
})
|
||||||
|
@ -60,7 +60,7 @@ func (bc *BeaconClient) handleHead() {
|
|||||||
|
|
||||||
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
|
log.WithFields(log.Fields{"head": head}).Debug("We are going to start processing the slot.")
|
||||||
|
|
||||||
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement)
|
go processHeadSlot(bc.Db, bc.ServerEndpoint, slot, head.Block, head.State, bc.PreviousSlot, bc.PreviousBlockRoot, bc.Metrics, bc.KnownGapTableIncrement, bc.CheckDb)
|
||||||
|
|
||||||
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
|
log.WithFields(log.Fields{"head": head.Slot}).Debug("We finished calling processHeadSlot.")
|
||||||
|
|
||||||
|
@ -94,16 +94,16 @@ func (hp historicProcessing) releaseDbLocks() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process the slot range.
|
// Process the slot range.
|
||||||
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics) {
|
func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError, db sql.Database, serverAddress string, metrics *BeaconClientMetrics, checkDb bool) {
|
||||||
for slot := range workCh {
|
for slot := range workCh {
|
||||||
log.Debug("Handling slot: ", slot)
|
log.Debug("Handling slot: ", slot)
|
||||||
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics)
|
err, errProcess := handleHistoricSlot(db, serverAddress, slot, metrics, checkDb)
|
||||||
|
if err != nil {
|
||||||
errMs := batchHistoricError{
|
errMs := batchHistoricError{
|
||||||
err: err,
|
err: err,
|
||||||
errProcess: errProcess,
|
errProcess: errProcess,
|
||||||
slot: slot,
|
slot: slot,
|
||||||
}
|
}
|
||||||
if err != nil {
|
|
||||||
errCh <- errMs
|
errCh <- errMs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -150,7 +150,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
err := tx.Rollback(ctx)
|
err := tx.Rollback(ctx)
|
||||||
if err != nil {
|
if err != nil && err != pgx.ErrTxClosed {
|
||||||
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
|
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
|
||||||
errCount = append(errCount, err)
|
errCount = append(errCount, err)
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,7 @@ type KnownGapsProcessing struct {
|
|||||||
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
|
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
|
||||||
log.Info("We are starting the known gaps processing service.")
|
log.Info("We are starting the known gaps processing service.")
|
||||||
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
|
bc.KnownGapsProcess = KnownGapsProcessing{db: bc.Db, uniqueNodeIdentifier: bc.UniqueNodeIdentifier, metrics: bc.Metrics, finishProcessing: make(chan int)}
|
||||||
errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics)
|
errs := handleBatchProcess(maxWorkers, bc.KnownGapsProcess, bc.KnownGapsProcess.finishProcessing, bc.KnownGapsProcess.db, bc.ServerEndpoint, bc.Metrics, bc.CheckDb)
|
||||||
log.Debug("Exiting known gaps processing service")
|
log.Debug("Exiting known gaps processing service")
|
||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v4"
|
||||||
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
|
si "github.com/prysmaticlabs/prysm/consensus-types/interfaces"
|
||||||
"github.com/prysmaticlabs/prysm/consensus-types/wrapper"
|
"github.com/prysmaticlabs/prysm/consensus-types/wrapper"
|
||||||
dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect"
|
dt "github.com/prysmaticlabs/prysm/encoding/ssz/detect"
|
||||||
@ -79,7 +80,7 @@ type ProcessSlot struct {
|
|||||||
// This function will do all the work to process the slot and write it to the DB.
|
// This function will do all the work to process the slot and write it to the DB.
|
||||||
// It will return the error and error process. The error process is used for providing reach detail to the
|
// It will return the error and error process. The error process is used for providing reach detail to the
|
||||||
// known_gaps table.
|
// known_gaps table.
|
||||||
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) (error, string) {
|
func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, headOrHistoric string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) (error, string) {
|
||||||
ps := &ProcessSlot{
|
ps := &ProcessSlot{
|
||||||
Slot: slot,
|
Slot: slot,
|
||||||
BlockRoot: blockRoot,
|
BlockRoot: blockRoot,
|
||||||
@ -114,13 +115,34 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
|||||||
return err, "processSlot"
|
return err, "processSlot"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
finalBlockRoot, finalStateRoot, finalEth1BlockHash, err := ps.provideFinalHash()
|
||||||
|
if err != nil {
|
||||||
|
return err, "CalculateBlockRoot"
|
||||||
|
}
|
||||||
|
if checkDb {
|
||||||
|
inDb, err := IsSlotInDb(ps.Db, strconv.Itoa(ps.Slot), finalBlockRoot, finalStateRoot)
|
||||||
|
if err != nil {
|
||||||
|
return err, "checkDb"
|
||||||
|
}
|
||||||
|
if inDb {
|
||||||
|
log.WithField("slot", slot).Info("Slot already in the DB.")
|
||||||
|
return nil, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Get this object ready to write
|
// Get this object ready to write
|
||||||
dw, err := ps.createWriteObjects()
|
dw, err := ps.createWriteObjects(finalBlockRoot, finalStateRoot, finalEth1BlockHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "blockRoot"
|
return err, "blockRoot"
|
||||||
}
|
}
|
||||||
// Write the object to the DB.
|
// Write the object to the DB.
|
||||||
err = dw.writeFullSlot()
|
defer func() {
|
||||||
|
err := dw.Tx.Rollback(dw.Ctx)
|
||||||
|
if err != nil && err != pgx.ErrTxClosed {
|
||||||
|
loghelper.LogError(err).Error("We were unable to Rollback a transaction")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = dw.transactFullSlot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err, "processSlot"
|
return err, "processSlot"
|
||||||
}
|
}
|
||||||
@ -131,26 +153,32 @@ func processFullSlot(db sql.Database, serverAddress string, slot int, blockRoot
|
|||||||
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
|
return fmt.Errorf("headOrHistoric must be either historic or head!"), ""
|
||||||
}
|
}
|
||||||
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
if ps.HeadOrHistoric == "head" && previousSlot != 0 && previousBlockRoot != "" && ps.Status != "skipped" {
|
||||||
ps.checkPreviousSlot(previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
ps.checkPreviousSlot(dw.Tx, dw.Ctx, previousSlot, previousBlockRoot, knownGapsTableIncrement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Commit the transaction
|
||||||
|
if err = dw.Tx.Commit(dw.Ctx); err != nil {
|
||||||
|
return err, "transactionCommit"
|
||||||
|
}
|
||||||
|
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
// Handle a slot that is at head. A wrapper function for calling `handleFullSlot`.
|
||||||
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int) {
|
func processHeadSlot(db sql.Database, serverAddress string, slot int, blockRoot string, stateRoot string, previousSlot int, previousBlockRoot string, metrics *BeaconClientMetrics, knownGapsTableIncrement int, checkDb bool) {
|
||||||
// Get the knownGaps at startUp.
|
// Get the knownGaps at startUp.
|
||||||
if previousSlot == 0 && previousBlockRoot == "" {
|
if previousSlot == 0 && previousBlockRoot == "" {
|
||||||
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
|
writeStartUpGaps(db, knownGapsTableIncrement, slot, metrics)
|
||||||
}
|
}
|
||||||
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement)
|
err, errReason := processFullSlot(db, serverAddress, slot, blockRoot, stateRoot, previousSlot, previousBlockRoot, "head", metrics, knownGapsTableIncrement, checkDb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
|
writeKnownGaps(db, knownGapsTableIncrement, slot, slot, err, errReason, metrics)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
|
// Handle a historic slot. A wrapper function for calling `handleFullSlot`.
|
||||||
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics) (error, string) {
|
func handleHistoricSlot(db sql.Database, serverAddress string, slot int, metrics *BeaconClientMetrics, checkDb bool) (error, string) {
|
||||||
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1)
|
return processFullSlot(db, serverAddress, slot, "", "", 0, "", "historic", metrics, 1, checkDb)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
|
// Update the SszSignedBeaconBlock and FullSignedBeaconBlock object with their respective values.
|
||||||
@ -232,14 +260,14 @@ func (ps *ProcessSlot) getBeaconState(serverEndpoint string, vmCh chan<- *dt.Ver
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check to make sure that the previous block we processed is the parent of the current block.
|
// Check to make sure that the previous block we processed is the parent of the current block.
|
||||||
func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
func (ps *ProcessSlot) checkPreviousSlot(tx sql.Tx, ctx context.Context, previousSlot int, previousBlockRoot string, knownGapsTableIncrement int) {
|
||||||
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
|
parentRoot := "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().ParentRoot())
|
||||||
if previousSlot == int(ps.FullBeaconState.Slot()) {
|
if previousSlot == int(ps.FullBeaconState.Slot()) {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"slot": ps.FullBeaconState.Slot(),
|
"slot": ps.FullBeaconState.Slot(),
|
||||||
"fork": true,
|
"fork": true,
|
||||||
}).Warn("A fork occurred! The previous slot and current slot match.")
|
}).Warn("A fork occurred! The previous slot and current slot match.")
|
||||||
writeReorgs(ps.Db, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
transactReorgs(tx, ctx, strconv.Itoa(ps.Slot), ps.BlockRoot, ps.Metrics)
|
||||||
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
|
} else if previousSlot > int(ps.FullBeaconState.Slot()) {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousSlot": previousSlot,
|
"previousSlot": previousSlot,
|
||||||
@ -250,27 +278,43 @@ func (ps *ProcessSlot) checkPreviousSlot(previousSlot int, previousBlockRoot str
|
|||||||
"previousSlot": previousSlot,
|
"previousSlot": previousSlot,
|
||||||
"currentSlot": ps.FullBeaconState.Slot(),
|
"currentSlot": ps.FullBeaconState.Slot(),
|
||||||
}).Error("We skipped a few slots.")
|
}).Error("We skipped a few slots.")
|
||||||
writeKnownGaps(ps.Db, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
|
transactKnownGaps(tx, ctx, knownGapsTableIncrement, previousSlot+1, int(ps.FullBeaconState.Slot())-1, fmt.Errorf("Gaps during head processing"), "headGaps", ps.Metrics)
|
||||||
} else if previousBlockRoot != parentRoot {
|
} else if previousBlockRoot != parentRoot {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"previousBlockRoot": previousBlockRoot,
|
"previousBlockRoot": previousBlockRoot,
|
||||||
"currentBlockParent": parentRoot,
|
"currentBlockParent": parentRoot,
|
||||||
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
|
}).Error("The previousBlockRoot does not match the current blocks parent, an unprocessed fork might have occurred.")
|
||||||
writeReorgs(ps.Db, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
|
transactReorgs(tx, ctx, strconv.Itoa(previousSlot), parentRoot, ps.Metrics)
|
||||||
} else {
|
} else {
|
||||||
log.Debug("Previous Slot and Current Slot are one distance from each other.")
|
log.Debug("Previous Slot and Current Slot are one distance from each other.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transforms all the raw data into DB models that can be written to the DB.
|
// Transforms all the raw data into DB models that can be written to the DB.
|
||||||
func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
|
func (ps *ProcessSlot) createWriteObjects(blockRoot, stateRoot, eth1BlockHash string) (*DatabaseWriter, error) {
|
||||||
|
var status string
|
||||||
|
if ps.Status != "" {
|
||||||
|
status = ps.Status
|
||||||
|
} else {
|
||||||
|
status = "proposed"
|
||||||
|
}
|
||||||
|
|
||||||
|
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
|
||||||
|
if err != nil {
|
||||||
|
return dw, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function will return the final blockRoot, stateRoot, and eth1BlockHash that will be
|
||||||
|
// used to write to a DB
|
||||||
|
func (ps *ProcessSlot) provideFinalHash() (string, string, string, error) {
|
||||||
var (
|
var (
|
||||||
stateRoot string
|
stateRoot string
|
||||||
blockRoot string
|
blockRoot string
|
||||||
status string
|
|
||||||
eth1BlockHash string
|
eth1BlockHash string
|
||||||
)
|
)
|
||||||
|
|
||||||
if ps.Status == "skipped" {
|
if ps.Status == "skipped" {
|
||||||
stateRoot = ""
|
stateRoot = ""
|
||||||
blockRoot = ""
|
blockRoot = ""
|
||||||
@ -290,24 +334,12 @@ func (ps *ProcessSlot) createWriteObjects() (*DatabaseWriter, error) {
|
|||||||
rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
|
rawBlockRoot, err := ps.FullSignedBeaconBlock.Block().HashTreeRoot()
|
||||||
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
|
//blockRoot, err = queryBlockRoot(blockRootEndpoint, strconv.Itoa(ps.Slot))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return "", "", "", err
|
||||||
}
|
}
|
||||||
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
|
blockRoot = "0x" + hex.EncodeToString(rawBlockRoot[:])
|
||||||
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz:")
|
log.WithFields(log.Fields{"blockRoot": blockRoot}).Debug("Block Root from ssz")
|
||||||
}
|
}
|
||||||
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
|
eth1BlockHash = "0x" + hex.EncodeToString(ps.FullSignedBeaconBlock.Block().Body().Eth1Data().BlockHash)
|
||||||
}
|
}
|
||||||
|
return blockRoot, stateRoot, eth1BlockHash, nil
|
||||||
if ps.Status != "" {
|
|
||||||
status = ps.Status
|
|
||||||
} else {
|
|
||||||
status = "proposed"
|
|
||||||
}
|
|
||||||
|
|
||||||
dw, err := CreateDatabaseWrite(ps.Db, ps.Slot, stateRoot, blockRoot, ps.ParentBlockRoot, eth1BlockHash, status, ps.SszSignedBeaconBlock, ps.SszBeaconState, ps.Metrics)
|
|
||||||
if err != nil {
|
|
||||||
return dw, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return dw, nil
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user