2022-05-18 16:12:54 +00:00
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2022-04-26 17:57:01 +00:00
package beaconclient
import (
"context"
"fmt"
2022-09-29 01:39:56 +00:00
"github.com/r3labs/sse/v2"
2022-04-26 17:57:01 +00:00
log "github.com/sirupsen/logrus"
2022-06-09 21:32:46 +00:00
"github.com/vulcanize/ipld-eth-beacon-indexer/pkg/database/sql"
2022-09-29 01:39:56 +00:00
"math/rand"
"time"
2022-04-26 17:57:01 +00:00
)
2022-05-06 15:03:15 +00:00
// TODO: Use prysms config values instead of hardcoding them here.
2022-04-26 17:57:01 +00:00
var (
2022-05-06 15:03:15 +00:00
bcHealthEndpoint = "/eth/v1/node/health" // Endpoint used for the healthcheck
2022-05-09 18:44:27 +00:00
BcHeadTopicEndpoint = "/eth/v1/events?topics=head" // Endpoint used to subscribe to the head of the chain
2022-05-06 15:03:15 +00:00
bcReorgTopicEndpoint = "/eth/v1/events?topics=chain_reorg" // Endpoint used to subscribe to the head of the chain
2022-05-09 18:44:27 +00:00
BcBlockQueryEndpoint = "/eth/v2/beacon/blocks/" // Endpoint to query individual Blocks
BcStateQueryEndpoint = "/eth/v2/debug/beacon/states/" // Endpoint to query individual States
2022-05-19 13:46:38 +00:00
BcSyncStatusEndpoint = "/eth/v1/node/syncing" // The endpoint to check to see if the beacon server is still trying to sync to head.
LhDbInfoEndpoint = "/lighthouse/database/info" // The endpoint for the LIGHTHOUSE server to get the database information.
2022-05-13 12:48:31 +00:00
BcBlockRootEndpoint = func ( slot string ) string {
return "/eth/v1/beacon/blocks/" + slot + "/root"
}
2022-09-29 01:39:56 +00:00
bcSlotsPerEpoch uint64 = 32 // Number of slots in a single Epoch
2022-05-06 15:03:15 +00:00
//bcSlotPerHistoricalVector = 8192 // The number of slots in a historic vector.
//bcFinalizedTopicEndpoint = "/eth/v1/events?topics=finalized_checkpoint" // Endpoint used to subscribe to the head of the chain
2022-04-26 17:57:01 +00:00
)
// A struct that capture the Beacon Server that the Beacon Client will be interacting with and querying.
type BeaconClient struct {
2022-09-29 01:39:56 +00:00
Context context . Context // A context generic context with multiple uses.
ServerEndpoint string // What is the endpoint of the beacon server.
Db sql . Database // Database object used for reads and writes.
Metrics * BeaconClientMetrics // An object used to keep track of certain BeaconClient Metrics.
KnownGapTableIncrement int // The max number of slots within a single known_gaps table entry.
UniqueNodeIdentifier int // The unique identifier within the cluster of this individual node.
KnownGapsProcess KnownGapsProcessing // object keeping track of knowngaps processing
CheckDb bool // Should we check the DB to see if the slot exists before processing it?
PerformBeaconStateProcessing bool // Should we process BeaconStates?
PerformBeaconBlockProcessing bool // Should we process BeaconBlocks?
2022-05-06 15:03:15 +00:00
// Used for Head Tracking
2022-05-19 13:46:38 +00:00
2022-05-06 15:03:15 +00:00
PerformHeadTracking bool // Should we track head?
2022-09-29 01:39:56 +00:00
StartingSlot Slot // If we're performing head tracking. What is the first slot we processed.
PreviousSlot Slot // Whats the previous slot we processed
2022-05-06 15:03:15 +00:00
PreviousBlockRoot string // Whats the previous block root, used to check the next blocks parent.
HeadTracking * SseEvents [ Head ] // Track the head block
ReOrgTracking * SseEvents [ ChainReorg ] // Track all Reorgs
//FinalizationTracking *SseEvents[FinalizedCheckpoint] // Track all finalization checkpoints
2022-05-19 13:46:38 +00:00
// Used for Historical Processing
// The latest available slot within the Beacon Server. We can't query any slot greater than this.
// This value is lazily updated. Therefore at times it will be outdated.
2022-05-24 20:18:55 +00:00
LatestSlotInBeaconServer int64
2022-06-03 16:47:13 +00:00
PerformHistoricalProcessing bool // Should we perform historical processing?
2022-06-09 21:32:46 +00:00
HistoricalProcess HistoricProcessing // object keeping track of historical processing
2022-04-26 17:57:01 +00:00
}
// A struct to keep track of relevant the head event topic.
type SseEvents [ P ProcessedEvents ] struct {
2022-04-27 18:01:59 +00:00
Endpoint string // The endpoint for the subscription. Primarily used for logging
2022-04-26 17:57:01 +00:00
MessagesCh chan * sse . Event // Contains all the messages from the SSE Channel
ErrorCh chan * SseError // Contains any errors while SSE streaming occurred
ProcessCh chan * P // Used to capture processed data in its proper struct.
2022-09-29 01:39:56 +00:00
sseClient * sse . Client // sse.Client object that is used to interact with the SSE stream
2022-04-26 17:57:01 +00:00
}
// An object to capture any errors when turning an SSE message to JSON.
type SseError struct {
err error
msg [ ] byte
}
// A Function to create the BeaconClient.
2022-09-29 01:39:56 +00:00
func CreateBeaconClient ( ctx context . Context , connectionProtocol string , bcAddress string , bcPort int ,
bcKgTableIncrement int , uniqueNodeIdentifier int , checkDb bool , performBeaconBlockProcessing bool , performBeaconStateProcessing bool ) ( * BeaconClient , error ) {
2022-06-03 16:47:13 +00:00
if uniqueNodeIdentifier == 0 {
uniqueNodeIdentifier := rand . Int ( )
log . WithField ( "randomUniqueNodeIdentifier" , uniqueNodeIdentifier ) . Warn ( "No uniqueNodeIdentifier provided, we are going to use a randomly generated one." )
}
metrics , err := CreateBeaconClientMetrics ( )
if err != nil {
return nil , err
}
2022-04-27 18:01:59 +00:00
endpoint := fmt . Sprintf ( "%s://%s:%d" , connectionProtocol , bcAddress , bcPort )
2022-04-26 17:57:01 +00:00
log . Info ( "Creating the BeaconClient" )
return & BeaconClient {
2022-09-29 01:39:56 +00:00
Context : ctx ,
ServerEndpoint : endpoint ,
KnownGapTableIncrement : bcKgTableIncrement ,
HeadTracking : createSseEvent [ Head ] ( endpoint , BcHeadTopicEndpoint ) ,
ReOrgTracking : createSseEvent [ ChainReorg ] ( endpoint , bcReorgTopicEndpoint ) ,
Metrics : metrics ,
UniqueNodeIdentifier : uniqueNodeIdentifier ,
CheckDb : checkDb ,
PerformBeaconBlockProcessing : performBeaconBlockProcessing ,
PerformBeaconStateProcessing : performBeaconStateProcessing ,
2022-05-06 15:03:15 +00:00
//FinalizationTracking: createSseEvent[FinalizedCheckpoint](endpoint, bcFinalizedTopicEndpoint),
2022-06-03 16:47:13 +00:00
} , nil
2022-04-26 17:57:01 +00:00
}
// Create all the channels to handle a SSE events
2022-04-27 18:01:59 +00:00
func createSseEvent [ P ProcessedEvents ] ( baseEndpoint string , path string ) * SseEvents [ P ] {
endpoint := baseEndpoint + path
2022-04-26 17:57:01 +00:00
sseEvents := & SseEvents [ P ] {
2022-04-27 18:01:59 +00:00
Endpoint : endpoint ,
2022-04-28 18:50:29 +00:00
MessagesCh : make ( chan * sse . Event , 1 ) ,
2022-04-26 17:57:01 +00:00
ErrorCh : make ( chan * SseError ) ,
ProcessCh : make ( chan * P ) ,
}
return sseEvents
}
2022-09-29 01:39:56 +00:00
func ( se * SseEvents [ P ] ) Connect ( ) error {
if nil == se . sseClient {
se . initClient ( )
}
return se . sseClient . SubscribeChanRaw ( se . MessagesCh )
}
func ( se * SseEvents [ P ] ) Disconnect ( ) {
if nil == se . sseClient {
return
}
log . WithFields ( log . Fields { "endpoint" : se . Endpoint } ) . Info ( "Disconnecting and destroying SSE client" )
se . sseClient . Unsubscribe ( se . MessagesCh )
se . sseClient . Connection . CloseIdleConnections ( )
se . sseClient = nil
}
func ( se * SseEvents [ P ] ) initClient ( ) {
if nil != se . sseClient {
se . Disconnect ( )
}
log . WithFields ( log . Fields { "endpoint" : se . Endpoint } ) . Info ( "Creating SSE client" )
client := sse . NewClient ( se . Endpoint )
client . ReconnectNotify = func ( err error , duration time . Duration ) {
log . WithFields ( log . Fields { "endpoint" : se . Endpoint } ) . Debug ( "Reconnecting SSE client" )
}
client . OnDisconnect ( func ( c * sse . Client ) {
log . WithFields ( log . Fields { "endpoint" : se . Endpoint } ) . Debug ( "SSE client disconnected" )
} )
se . sseClient = client
}