Feature/44 read write historic slots #46

Merged
abdulrabbani00 merged 10 commits from feature/44-read-write-historic-slots into develop 2022-05-24 20:18:55 +00:00
10 changed files with 202 additions and 61 deletions
Showing only changes of commit 771c7969f6 - Show all commits

View File

@ -206,4 +206,4 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
with:
args: --timeout 90s
args: --timeout 90s --disable deadcode

View File

@ -37,7 +37,7 @@ To run the application, do as follows:
2. Run the start up command.
```
go run -race main.go capture head --db.address localhost \
go run -race main.go capture historic --db.address localhost \
--db.password password \
--db.port 8076 \
--db.username vdbm \
@ -45,11 +45,14 @@ go run -race main.go capture head --db.address localhost \
--db.driver PGX \
--bc.address localhost \
--bc.port 5052 \
--bc.maxHistoricProcessWorker 2 \
--bc.maxKnownGapsWorker 2 \
--bc.knownGapsProcess=true \
--bc.connectionProtocol http \
--t.skipSync=true \
--log.level info \
--t.skipSync=false \
--log.level debug \
--log.output=true \
--kg.increment 100
--kg.increment 1000000
```
## Running Tests

View File

@ -25,21 +25,24 @@ import (
)
var (
dbUsername string
dbPassword string
dbName string
dbAddress string
dbDriver string
dbPort int
bcAddress string
bcPort int
bcBootRetryInterval int
bcBootMaxRetry int
bcConnectionProtocol string
bcType string
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
dbUsername string
dbPassword string
dbName string
dbAddress string
dbDriver string
dbPort int
bcAddress string
bcPort int
bcBootRetryInterval int
bcBootMaxRetry int
bcConnectionProtocol string
bcType string
bcIsProcessKnownGaps bool
bcMaxHistoricProcessWorker int
bcMaxKnownGapsWorker int
maxWaitSecondsShutdown time.Duration = time.Duration(5) * time.Second
notifierCh chan os.Signal = make(chan os.Signal, 1)
testDisregardSync bool
)
// captureCmd represents the capture command
@ -84,6 +87,9 @@ func init() {
captureCmd.PersistentFlags().StringVarP(&bcConnectionProtocol, "bc.connectionProtocol", "", "http", "protocol for connecting to the beacon node.")
captureCmd.PersistentFlags().IntVarP(&bcBootRetryInterval, "bc.bootRetryInterval", "", 30, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcBootMaxRetry, "bc.bootMaxRetry", "", 5, "The amount of time to wait between retries while booting the application")
captureCmd.PersistentFlags().IntVarP(&bcMaxHistoricProcessWorker, "bc.maxHistoricProcessWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().IntVarP(&bcMaxKnownGapsWorker, "bc.maxKnownGapsWorker", "", 30, "The number of workers that should be actively processing slots from the ethcl.historic_process table. Be careful of system memory.")
captureCmd.PersistentFlags().BoolVar(&bcIsProcessKnownGaps, "bc.knownGapsProcess", false, "Should we process entries from the knownGaps table as they occur?")
err = captureCmd.MarkPersistentFlagRequired("bc.address")
exitErr(err)
err = captureCmd.MarkPersistentFlagRequired("bc.port")
@ -104,10 +110,10 @@ func init() {
exitErr(err)
err = viper.BindPFlag("db.name", captureCmd.PersistentFlags().Lookup("db.name"))
exitErr(err)
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
// Testing Specific
err = viper.BindPFlag("t.skipSync", captureCmd.PersistentFlags().Lookup("t.skipSync"))
exitErr(err)
err = viper.BindPFlag("t.driver", captureCmd.PersistentFlags().Lookup("db.driver"))
exitErr(err)
@ -124,6 +130,14 @@ func init() {
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
err = viper.BindPFlag("bc.bootMaxRetry", captureCmd.PersistentFlags().Lookup("bc.bootMaxRetry"))
exitErr(err)
err = viper.BindPFlag("bc.knownGapsProcess", captureCmd.PersistentFlags().Lookup("bc.knownGapsProcess"))
exitErr(err)
err = viper.BindPFlag("bc.maxHistoricProcessWorker", captureCmd.PersistentFlags().Lookup("bc.maxHistoricProcessWorker"))
exitErr(err)
err = viper.BindPFlag("bc.maxKnownGapsWorker", captureCmd.PersistentFlags().Lookup("bc.maxKnownGapsWorker"))
exitErr(err)
// Here you will define your flags and configuration settings.
}

View File

@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -25,6 +26,7 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/internal/boot"
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
var (
@ -56,6 +58,20 @@ func startHeadTracking() {
log.Info("The Beacon Client has booted successfully!")
// Capture head blocks
go Bc.CaptureHead()
if bcIsProcessKnownGaps {
errG := new(errgroup.Group)
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker)
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
}
return nil
})
if err := errG.Wait(); err != nil {
loghelper.LogError(err).Error("Error with knownGaps processing")
}
}
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)

View File

@ -18,6 +18,7 @@ package cmd
import (
"context"
"fmt"
"os"
log "github.com/sirupsen/logrus"
@ -26,6 +27,7 @@ import (
"github.com/vulcanize/ipld-ethcl-indexer/internal/shutdown"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
"golang.org/x/sync/errgroup"
)
// historicCmd represents the historic command
@ -49,12 +51,35 @@ func startHistoricProcessing() {
if err != nil {
StopApplicationPreBoot(err, Db)
}
errs := Bc.CaptureHistoric(2)
if errs != nil {
log.WithFields(log.Fields{
"TotalErrors": errs,
}).Error("The historical processing service ended after receiving too many errors.")
errG, _ := errgroup.WithContext(context.Background())
errG.Go(func() error {
errs := Bc.CaptureHistoric(bcMaxHistoricProcessWorker)
if len(errs) != 0 {
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing historic events")
return fmt.Errorf("Application ended because there were too many error when attempting to process historic")
}
}
return nil
})
if bcIsProcessKnownGaps {
errG.Go(func() error {
errs := Bc.ProcessKnownGaps(bcMaxKnownGapsWorker)
if len(errs) != 0 {
log.WithFields(log.Fields{"errs": errs}).Error("All errors when processing knownGaps")
return fmt.Errorf("Application ended because there were too many error when attempting to process knownGaps")
}
return nil
})
}
if err := errG.Wait(); err != nil {
loghelper.LogError(err).Error("Error with knownGaps processing")
}
log.Debug("WE ARE AT CHECKPOINT")
// Shutdown when the time is right.
err = shutdown.ShutdownServices(ctx, notifierCh, maxWaitSecondsShutdown, Db, Bc)

View File

@ -78,7 +78,7 @@ func handleBatchProcess(maxWorkers int, bp BatchProcessing, db sql.Database, ser
// Start workers
for w := 1; w <= maxWorkers; w++ {
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting historic processing workers")
log.WithFields(log.Fields{"maxWorkers": maxWorkers}).Debug("Starting batch processing workers")
go processSlotRangeWorker(workCh, errCh, db, serverEndpoint, metrics)
}

View File

@ -26,8 +26,8 @@ var _ = Describe("Capturehistoric", func() {
time.Sleep(2 * time.Second)
validateSlot(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, 3, "proposed")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
validateBeaconState(bc, BeaconNodeTester.TestEvents["100"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")
//validateSignedBeaconBlock(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "0x629ae1587895043076500f4f5dcb202a47c2fc95d5b5c548cb83bc97bd2dbfe1", "0x8d3f027beef5cbd4f8b29fc831aba67a5d74768edca529f5596f07fd207865e1", "/blocks/QHVAEQBQGQ4TKNJUGAYDGNZRGM2DOZJSGZTDMMLEG5QTIYTCMRQTKYRSGNTGCMDCGI2WINLGMM2DMNJRGYYGMMTBHEZGINJSME3DGYRZGE4WE")
//validateBeaconState(bc, BeaconNodeTester.TestEvents["101"].HeadMessage, "/blocks/QHVAEQRQPBTDEOBWMEYDGNZZMMYDGOBWMEZWGN3CMUZDQZBQGVSDQMRZMY4GKYRXMIZDQMDDMM4WKZDFGE2TINBZMFTDEMDFMJRWIMBWME3WCNJW")

View File

@ -419,3 +419,20 @@ func calculateEpoch(slot int, slotPerEpoch int) string {
epoch := slot / slotPerEpoch
return strconv.Itoa(epoch)
}
// A helper function to check to see if the slot is processed.
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 {
return true, nil
}
return false, nil
}

View File

@ -31,18 +31,19 @@ import (
)
var (
// Get a single highest priority and non-checked out row.
// Get a single highest priority and non-checked out row row from ethcl.historical_process
getHpEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.historic_process
WHERE checked_out=false
ORDER BY priority ASC
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the ethcl_historic_process table.
checkHpEntryStmt string = `INSERT INTO ethcl.historic_process (start_slot, end_slot) VALUES ($1, $2) ON CONFLICT (start_slot, end_slot) DO NOTHING;`
// Used to get the highest priority row that is not checked out, and to check it out within the ethcl.historic_process table.
// Used to periodically check to see if there is a new entry in the ethcl.historic_process table.
checkHpEntryStmt string = `SELECT * FROM ethcl.historic_process WHERE checked_out=false;`
// Used to checkout a row from the ethcl.historic_process table
lockHpEntryStmt string = `UPDATE ethcl.historic_process
SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;`
deleteSlotsEntryStmt string = `DELETE FROM ethcl.historic_process
// Used to delete an entry from the knownGaps table
deleteHpEntryStmt string = `DELETE FROM ethcl.historic_process
WHERE start_slot=$1 AND end_slot=$2;`
)
@ -58,7 +59,7 @@ func (hp historicProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error
// Remove the table entry.
func (hp historicProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteSlotsEntryStmt)
return removeRowPostProcess(hp.db, processCh, QueryBySlotStmt, deleteHpEntryStmt)
}
// Remove the table entry.
@ -93,7 +94,15 @@ func processSlotRangeWorker(workCh <-chan int, errCh chan<- batchHistoricError,
func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRowsStmt string, checkOutRowStmt string, slotCh chan<- slotsToProcess) []error {
errCount := make([]error, 0)
// 5 is an arbitrary number. It allows us to retry a few times before
// ending the application.
prevErrCount := 0
for len(errCount) < 5 {
if len(errCount) != prevErrCount {
log.WithFields(log.Fields{
"errCount": errCount,
}).Error("New error entry added")
}
processRow, err := db.Exec(context.Background(), checkNewRowsStmt)
if err != nil {
errCount = append(errCount, err)
@ -103,13 +112,11 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
errCount = append(errCount, err)
}
if row < 1 {
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
log.Debug("We are checking rows, be patient")
continue
}
log.Debug("Found a row, going to start processing.")
log.WithFields(log.Fields{
"ErrCount": errCount,
}).Debug("The ErrCounter")
log.Debug("We found a new row")
ctx := context.Background()
// Setup TX
@ -160,7 +167,7 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
errCount = append(errCount, err)
continue
}
if rows != 1 {
if rows == 0 {
loghelper.LogSlotRangeStatementError(strconv.Itoa(sp.startSlot), strconv.Itoa(sp.endSlot), checkOutRowStmt, err).WithFields(log.Fields{
"rowsReturn": rows,
}).Error("We did not lock a single row.")
@ -183,13 +190,13 @@ func getBatchProcessRow(db sql.Database, getStartEndSlotStmt string, checkNewRow
// After a row has been processed it should be removed from its appropriate table.
func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, checkProcessedStmt, removeStmt string) error {
errCh := make(chan error, 0)
errCh := make(chan error)
for {
slots := <-processCh
// Make sure the start and end slot exist in the slots table.
go func() {
finishedProcess := false
for finishedProcess == false {
for !finishedProcess {
isStartProcess, err := isSlotProcessed(db, checkProcessedStmt, strconv.Itoa(slots.startSlot))
if err != nil {
errCh <- err
@ -214,20 +221,3 @@ func removeRowPostProcess(db sql.Database, processCh <-chan slotsToProcess, chec
}
}
}
// A helper function to check to see if the slot is processed.
func isSlotProcessed(db sql.Database, checkProcessStmt string, slot string) (bool, error) {
processRow, err := db.Exec(context.Background(), checkProcessStmt, slot)
if err != nil {
return false, err
}
row, err := processRow.RowsAffected()
if err != nil {
return false, err
}
if row > 0 {
return true, nil
}
return false, nil
}

View File

@ -0,0 +1,76 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains all the code to process historic slots.
package beaconclient
import (
"strconv"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-ethcl-indexer/pkg/loghelper"
)
var (
// Get a single non-checked out row row from ethcl.known_gaps.
getKgEntryStmt string = `SELECT start_slot, end_slot FROM ethcl.known_gaps
WHERE checked_out=false
LIMIT 1;`
// Used to periodically check to see if there is a new entry in the ethcl.known_gaps table.
checkKgEntryStmt string = `SELECT * FROM ethcl.known_gaps WHERE checked_out=false;`
// Used to checkout a row from the ethcl.known_gaps table
lockKgEntryStmt string = `UPDATE ethcl.known_gaps
SET checked_out=true
WHERE start_slot=$1 AND end_slot=$2;`
// Used to delete an entry from the knownGaps table
deleteKgEntryStmt string = `DELETE FROM ethcl.known_gaps
WHERE start_slot=$1 AND end_slot=$2;`
)
type knownGapsProcessing struct {
db sql.Database
metrics *BeaconClientMetrics
}
// This function will perform all the heavy lifting for tracking the head of the chain.
func (bc *BeaconClient) ProcessKnownGaps(maxWorkers int) []error {
log.Info("We are starting the known gaps processing service.")
hp := knownGapsProcessing{db: bc.Db, metrics: bc.Metrics}
errs := handleBatchProcess(maxWorkers, hp, hp.db, bc.ServerEndpoint, bc.Metrics)
log.Debug("Exiting known gaps processing service")
return errs
}
// Get a single row of historical slots from the table.
func (kgp knownGapsProcessing) getSlotRange(slotCh chan<- slotsToProcess) []error {
return getBatchProcessRow(kgp.db, getKgEntryStmt, checkKgEntryStmt, lockKgEntryStmt, slotCh)
}
// Remove the table entry.
func (kgp knownGapsProcessing) removeTableEntry(processCh <-chan slotsToProcess) error {
return removeRowPostProcess(kgp.db, processCh, QueryBySlotStmt, deleteKgEntryStmt)
}
// Remove the table entry.
func (kgp knownGapsProcessing) handleProcessingErrors(errMessages <-chan batchHistoricError) {
for {
errMs := <-errMessages
loghelper.LogSlotError(strconv.Itoa(errMs.slot), errMs.err)
writeKnownGaps(kgp.db, 1, errMs.slot, errMs.slot, errMs.err, errMs.errProcess, kgp.metrics)
}
}