2022-05-24 20:18:55 +00:00
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This file contains the functions to start and stop historical processing of the beacon chain, along with the generic batch processing logic they rely on.
package beaconclient
import (
2022-06-09 21:32:46 +00:00
"context"
2022-05-24 20:18:55 +00:00
"fmt"
log "github.com/sirupsen/logrus"
2022-06-09 21:32:46 +00:00
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/loghelper"
2022-05-24 20:18:55 +00:00
"golang.org/x/sync/errgroup"
)
// This function will perform all the heavy lifting for tracking the head of the chain.
2022-06-09 21:32:46 +00:00
func ( bc * BeaconClient ) CaptureHistoric ( ctx context . Context , maxWorkers int ) [ ] error {
2022-05-24 20:18:55 +00:00
log . Info ( "We are starting the historical processing service." )
2022-06-09 21:32:46 +00:00
bc . HistoricalProcess = HistoricProcessing { db : bc . Db , metrics : bc . Metrics , uniqueNodeIdentifier : bc . UniqueNodeIdentifier }
errs := handleBatchProcess ( ctx , maxWorkers , bc . HistoricalProcess , bc . HistoricalProcess . db , bc . ServerEndpoint , bc . Metrics , bc . CheckDb )
2022-05-24 20:18:55 +00:00
log . Debug ( "Exiting Historical" )
return errs
}
2022-06-03 16:47:13 +00:00
// This function will perform all the necessary clean up tasks for stopping historical processing.
2022-06-09 21:32:46 +00:00
func ( bc * BeaconClient ) StopHistoric ( cancel context . CancelFunc ) error {
2022-06-03 16:47:13 +00:00
log . Info ( "We are stopping the historical processing service." )
2022-06-09 21:32:46 +00:00
err := bc . HistoricalProcess . releaseDbLocks ( cancel )
2022-06-03 16:47:13 +00:00
if err != nil {
2022-06-09 21:32:46 +00:00
loghelper . LogError ( err ) . WithField ( "uniqueIdentifier" , bc . UniqueNodeIdentifier ) . Error ( "We were unable to remove the locks from the eth_beacon.historic_processing table. Manual Intervention is needed!" )
2022-06-03 16:47:13 +00:00
}
return nil
}
2022-05-24 20:18:55 +00:00
// An interface to enforce any batch processing. Currently there are two use cases for this.
//
// 1. Historic Processing
//
// 2. Known Gaps Processing
type BatchProcessing interface {
2022-06-09 21:32:46 +00:00
getSlotRange ( context . Context , chan <- slotsToProcess ) [ ] error // Write the slots to process in a channel, return an error if you cant get the next slots to write.
handleProcessingErrors ( context . Context , <- chan batchHistoricError ) // Custom logic to handle errors.
removeTableEntry ( context . Context , <- chan slotsToProcess ) error // With the provided start and end slot, remove the entry from the database.
releaseDbLocks ( context . CancelFunc ) error // Update the checked_out column to false for whatever table is being updated.
2022-05-24 20:18:55 +00:00
}
2022-06-03 16:47:13 +00:00
// NOTE (regarding the BatchProcessing interface above):
// It might be better to remove the interface and create a single struct that both historicalProcessing
// and knownGapsProcessing can use. The struct would contain all the SQL strings that they need,
// and the only difference in processing logic would be the error handling,
// which could be a function that we pass into handleBatchProcess().
2022-05-24 20:18:55 +00:00
// A struct to pass around indicating a table entry for slots to process.
// handleBatchProcess treats the range as inclusive: it dispatches every slot
// from startSlot up to and including endSlot.
type slotsToProcess struct {
	startSlot int // The start slot
	endSlot   int // The end slot
}
// batchHistoricError describes a failure tied to a single slot during batch processing.
// Values of this type travel over the error channel that is consumed by
// BatchProcessing.handleProcessingErrors.
type batchHistoricError struct {
	err        error  // The error that occurred when attempting to process a slot.
	errProcess string // The process that caused the error.
	slot       int    // The slot which the error is for.
}
// Wrapper function for the BatchProcessing interface.
// This function will take the structure that needs batch processing.
// It follows a generic format.
// Get new entries from any given table.
// 1. Add it to the slotsCh.
//
// 2. Run the maximum specified workers to handle individual slots. We need a maximum because we don't want
// To store too many SSZ objects in memory.
//
// 3. Process the slots and send the err to the ErrCh. Each structure can define how it wants its own errors handled.
//
// 4. Remove the slot entry from the DB.
//
// 5. Handle any errors.
2022-06-09 21:32:46 +00:00
func handleBatchProcess ( ctx context . Context , maxWorkers int , bp BatchProcessing , db sql . Database , serverEndpoint string , metrics * BeaconClientMetrics , checkDb bool ) [ ] error {
2022-05-24 20:18:55 +00:00
slotsCh := make ( chan slotsToProcess )
workCh := make ( chan int )
processedCh := make ( chan slotsToProcess )
errCh := make ( chan batchHistoricError )
2022-06-03 16:47:13 +00:00
finalErrCh := make ( chan [ ] error , 1 )
2022-05-24 20:18:55 +00:00
// Start workers
for w := 1 ; w <= maxWorkers ; w ++ {
log . WithFields ( log . Fields { "maxWorkers" : maxWorkers } ) . Debug ( "Starting batch processing workers" )
2022-06-09 21:32:46 +00:00
go processSlotRangeWorker ( ctx , workCh , errCh , db , serverEndpoint , metrics , checkDb )
2022-05-24 20:18:55 +00:00
}
// Process all ranges and send each individual slot to the worker.
go func ( ) {
2022-06-09 21:32:46 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case slots := <- slotsCh :
if slots . startSlot > slots . endSlot {
log . Error ( "We received a batch process request where the startSlot is greater than the end slot." )
errCh <- batchHistoricError {
err : fmt . Errorf ( "We received a startSlot where the start was greater than the end." ) ,
errProcess : "RangeOrder" ,
slot : slots . startSlot ,
}
errCh <- batchHistoricError {
err : fmt . Errorf ( "We received a endSlot where the start was greater than the end." ) ,
errProcess : "RangeOrder" ,
slot : slots . endSlot ,
}
} else if slots . startSlot == slots . endSlot {
log . WithField ( "slot" , slots . startSlot ) . Debug ( "Added new slot to workCh" )
workCh <- slots . startSlot
} else {
for i := slots . startSlot ; i <= slots . endSlot ; i ++ {
workCh <- i
log . WithField ( "slot" , i ) . Debug ( "Added new slot to workCh" )
}
processedCh <- slots
2022-05-24 20:18:55 +00:00
}
}
2022-06-09 21:32:46 +00:00
2022-05-24 20:18:55 +00:00
}
} ( )
// Remove entries, end the application if a row cannot be removed..
go func ( ) {
errG := new ( errgroup . Group )
errG . Go ( func ( ) error {
2022-06-09 21:32:46 +00:00
return bp . removeTableEntry ( ctx , processedCh )
2022-05-24 20:18:55 +00:00
} )
if err := errG . Wait ( ) ; err != nil {
2022-06-03 16:47:13 +00:00
finalErrCh <- [ ] error { err }
2022-05-24 20:18:55 +00:00
}
} ( )
// Process errors from slot processing.
2022-06-09 21:32:46 +00:00
go bp . handleProcessingErrors ( ctx , errCh )
2022-05-24 20:18:55 +00:00
// Get slots from the DB.
go func ( ) {
2022-06-09 21:32:46 +00:00
errs := bp . getSlotRange ( ctx , slotsCh ) // Periodically adds new entries....
2022-05-24 20:18:55 +00:00
if errs != nil {
2022-06-03 16:47:13 +00:00
finalErrCh <- errs
2022-05-24 20:18:55 +00:00
}
2022-06-03 16:47:13 +00:00
finalErrCh <- nil
2022-06-09 21:32:46 +00:00
log . Debug ( "We are stopping the processing of adding new entries" )
2022-05-24 20:18:55 +00:00
} ( )
2022-06-03 16:47:13 +00:00
log . Debug ( "Waiting for shutdown signal from channel" )
select {
2022-06-09 21:32:46 +00:00
case <- ctx . Done ( ) :
2022-06-03 16:47:13 +00:00
log . Debug ( "Received shutdown signal from channel" )
return nil
case errs := <- finalErrCh :
log . Debug ( "Finishing the batchProcess" )
return errs
}
2022-05-24 20:18:55 +00:00
}