statediff: Use a worker pool #43
@ -191,8 +191,17 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
|||||||
} else {
|
} else {
|
||||||
utils.Fatalf("Must specify client name for statediff DB output")
|
utils.Fatalf("Must specify client name for statediff DB output")
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) {
|
||||||
|
utils.Fatalf("Must pass DB parameters if enabling statediff write loop")
|
||||||
}
|
}
|
||||||
utils.RegisterStateDiffService(stack, backend, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name))
|
}
|
||||||
|
params := statediff.ServiceParams{
|
||||||
|
DBParams: dbParams,
|
||||||
|
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
|
||||||
|
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
|
||||||
|
}
|
||||||
|
utils.RegisterStateDiffService(stack, backend, params)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Configure GraphQL if requested
|
// Configure GraphQL if requested
|
||||||
|
@ -162,6 +162,7 @@ var (
|
|||||||
utils.StateDiffDBNodeIDFlag,
|
utils.StateDiffDBNodeIDFlag,
|
||||||
utils.StateDiffDBClientNameFlag,
|
utils.StateDiffDBClientNameFlag,
|
||||||
utils.StateDiffWritingFlag,
|
utils.StateDiffWritingFlag,
|
||||||
|
utils.StateDiffWorkersFlag,
|
||||||
configFileFlag,
|
configFileFlag,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -242,6 +242,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
|
|||||||
utils.StateDiffDBNodeIDFlag,
|
utils.StateDiffDBNodeIDFlag,
|
||||||
utils.StateDiffDBClientNameFlag,
|
utils.StateDiffDBClientNameFlag,
|
||||||
utils.StateDiffWritingFlag,
|
utils.StateDiffWritingFlag,
|
||||||
|
utils.StateDiffWorkersFlag,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -746,6 +746,10 @@ var (
|
|||||||
Name: "statediff.writing",
|
Name: "statediff.writing",
|
||||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||||
}
|
}
|
||||||
|
StateDiffWorkersFlag = cli.UintFlag{
|
||||||
|
Name: "statediff.workers",
|
||||||
|
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// MakeDataDir retrieves the currently requested data directory, terminating
|
// MakeDataDir retrieves the currently requested data directory, terminating
|
||||||
@ -1744,9 +1748,8 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
|
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
|
||||||
// dbParams are: Postgres connection URI, Node ID, client name
|
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, params statediff.ServiceParams) {
|
||||||
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, dbParams *statediff.DBParams, startWriteLoop bool) {
|
if err := statediff.New(stack, ethServ, params); err != nil {
|
||||||
if err := statediff.New(stack, ethServ, dbParams, startWriteLoop); err != nil {
|
|
||||||
Fatalf("Failed to register the Statediff service: %v", err)
|
Fatalf("Failed to register the Statediff service: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
indexerNamespace = "indexer"
|
namespace = "statediff"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Build a fully qualified metric name
|
// Build a fully qualified metric name
|
||||||
@ -16,9 +16,9 @@ func metricName(subsystem, name string) string {
|
|||||||
if name == "" {
|
if name == "" {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
parts := []string{indexerNamespace, name}
|
parts := []string{namespace, name}
|
||||||
if subsystem != "" {
|
if subsystem != "" {
|
||||||
parts = []string{indexerNamespace, subsystem, name}
|
parts = []string{namespace, subsystem, name}
|
||||||
}
|
}
|
||||||
// Prometheus uses _ but geth metrics uses / and replaces
|
// Prometheus uses _ but geth metrics uses / and replaces
|
||||||
return strings.Join(parts, "/")
|
return strings.Join(parts, "/")
|
||||||
@ -57,7 +57,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
|||||||
tTxAndRecProcessing: metrics.NewTimer(),
|
tTxAndRecProcessing: metrics.NewTimer(),
|
||||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||||
}
|
}
|
||||||
subsys := "" // todo
|
subsys := "indexer"
|
||||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||||
|
54
statediff/metrics.go
Normal file
54
statediff/metrics.go
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package statediff
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
namespace = "statediff"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Build a fully qualified metric name
|
||||||
|
func metricName(subsystem, name string) string {
|
||||||
|
if name == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
parts := []string{namespace, name}
|
||||||
|
if subsystem != "" {
|
||||||
|
parts = []string{namespace, subsystem, name}
|
||||||
|
}
|
||||||
|
// Prometheus uses _ but geth metrics uses / and replaces
|
||||||
|
return strings.Join(parts, "/")
|
||||||
|
}
|
||||||
|
|
||||||
|
type statediffMetricsHandles struct {
|
||||||
|
// Height of latest synced by core.BlockChain
|
||||||
|
// FIXME
|
||||||
|
lastSyncHeight metrics.Gauge
|
||||||
|
// Height of the latest block received from chainEvent channel
|
||||||
|
lastEventHeight metrics.Gauge
|
||||||
|
// Height of latest state diff
|
||||||
|
lastStatediffHeight metrics.Gauge
|
||||||
|
// Current length of chainEvent channels
|
||||||
|
serviceLoopChannelLen metrics.Gauge
|
||||||
|
writeLoopChannelLen metrics.Gauge
|
||||||
|
}
|
||||||
|
|
||||||
|
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
|
||||||
|
ctx := statediffMetricsHandles{
|
||||||
|
lastSyncHeight: metrics.NewGauge(),
|
||||||
|
lastEventHeight: metrics.NewGauge(),
|
||||||
|
lastStatediffHeight: metrics.NewGauge(),
|
||||||
|
serviceLoopChannelLen: metrics.NewGauge(),
|
||||||
|
writeLoopChannelLen: metrics.NewGauge(),
|
||||||
|
}
|
||||||
|
subsys := "service"
|
||||||
|
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
|
||||||
|
reg.Register(metricName(subsys, "last_event_height"), ctx.lastEventHeight)
|
||||||
|
reg.Register(metricName(subsys, "last_statediff_height"), ctx.lastStatediffHeight)
|
||||||
|
reg.Register(metricName(subsys, "service_loop_channel_len"), ctx.serviceLoopChannelLen)
|
||||||
|
reg.Register(metricName(subsys, "write_loop_channel_len"), ctx.writeLoopChannelLen)
|
||||||
|
return ctx
|
||||||
|
}
|
@ -22,7 +22,6 @@ import (
|
|||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
@ -32,6 +31,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/eth"
|
"github.com/ethereum/go-ethereum/eth"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
@ -55,6 +55,8 @@ var writeLoopParams = Params{
|
|||||||
IncludeCode: true,
|
IncludeCode: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
|
||||||
|
|
||||||
type blockChain interface {
|
type blockChain interface {
|
||||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||||
GetBlockByHash(hash common.Hash) *types.Block
|
GetBlockByHash(hash common.Hash) *types.Block
|
||||||
@ -89,6 +91,15 @@ type IService interface {
|
|||||||
WriteLoop(chainEventCh chan core.ChainEvent)
|
WriteLoop(chainEventCh chan core.ChainEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wraps consructor parameters
|
||||||
|
type ServiceParams struct {
|
||||||
|
DBParams *DBParams
|
||||||
|
// Whether to enable writing state diffs directly to track blochain head
|
||||||
|
EnableWriteLoop bool
|
||||||
|
// Size of the worker pool
|
||||||
|
NumWorkers uint
|
||||||
|
}
|
||||||
|
|
||||||
// Service is the underlying struct for the state diffing service
|
// Service is the underlying struct for the state diffing service
|
||||||
type Service struct {
|
type Service struct {
|
||||||
// Used to sync access to the Subscriptions
|
// Used to sync access to the Subscriptions
|
||||||
@ -104,41 +115,56 @@ type Service struct {
|
|||||||
// A mapping of subscription params rlp hash to the corresponding subscription params
|
// A mapping of subscription params rlp hash to the corresponding subscription params
|
||||||
SubscriptionTypes map[common.Hash]Params
|
SubscriptionTypes map[common.Hash]Params
|
||||||
// Cache the last block so that we can avoid having to lookup the next block's parent
|
// Cache the last block so that we can avoid having to lookup the next block's parent
|
||||||
lastBlock lastBlockCache
|
BlockCache blockCache
|
||||||
|
|||||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||||
subscribers int32
|
subscribers int32
|
||||||
// Interface for publishing statediffs as PG-IPLD objects
|
// Interface for publishing statediffs as PG-IPLD objects
|
||||||
indexer ind.Indexer
|
indexer ind.Indexer
|
||||||
// Whether to enable writing state diffs directly to track blochain head
|
// Whether to enable writing state diffs directly to track blochain head
|
||||||
enableWriteLoop bool
|
enableWriteLoop bool
|
||||||
|
// Size of the worker pool
|
||||||
|
numWorkers uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrap the cached last block for safe access from different service loops
|
// Wrap the cached last block for safe access from different service loops
|
||||||
type lastBlockCache struct {
|
type blockCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
block *types.Block
|
blocks map[common.Hash]*types.Block
|
||||||
|
maxSize uint
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBlockCache(max uint) blockCache {
|
||||||
|
return blockCache{
|
||||||
|
blocks: make(map[common.Hash]*types.Block),
|
||||||
|
maxSize: max,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new statediff.Service
|
// New creates a new statediff.Service
|
||||||
func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
// func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWriteLoop bool) error {
|
||||||
|
func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error {
|
||||||
blockChain := ethServ.BlockChain()
|
blockChain := ethServ.BlockChain()
|
||||||
var indexer ind.Indexer
|
var indexer ind.Indexer
|
||||||
if dbParams != nil {
|
if params.DBParams != nil {
|
||||||
info := nodeinfo.Info{
|
info := nodeinfo.Info{
|
||||||
GenesisBlock: blockChain.Genesis().Hash().Hex(),
|
GenesisBlock: blockChain.Genesis().Hash().Hex(),
|
||||||
NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10),
|
NetworkID: strconv.FormatUint(ethServ.NetVersion(), 10),
|
||||||
ChainID: blockChain.Config().ChainID.Uint64(),
|
ChainID: blockChain.Config().ChainID.Uint64(),
|
||||||
ID: dbParams.ID,
|
ID: params.DBParams.ID,
|
||||||
ClientName: dbParams.ClientName,
|
ClientName: params.DBParams.ClientName,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: pass max idle, open, lifetime?
|
// TODO: pass max idle, open, lifetime?
|
||||||
db, err := postgres.NewDB(dbParams.ConnectionURL, postgres.ConnectionConfig{}, info)
|
db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||||
}
|
}
|
||||||
|
workers := params.NumWorkers
|
||||||
|
if workers == 0 {
|
||||||
|
workers = 1
|
||||||
|
}
|
||||||
sds := &Service{
|
sds := &Service{
|
||||||
Mutex: sync.Mutex{},
|
Mutex: sync.Mutex{},
|
||||||
BlockChain: blockChain,
|
BlockChain: blockChain,
|
||||||
@ -146,8 +172,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, dbParams *DBParams, enableWrit
|
|||||||
QuitChan: make(chan bool),
|
QuitChan: make(chan bool),
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]Params),
|
SubscriptionTypes: make(map[common.Hash]Params),
|
||||||
|
BlockCache: NewBlockCache(workers),
|
||||||
indexer: indexer,
|
indexer: indexer,
|
||||||
enableWriteLoop: enableWriteLoop,
|
enableWriteLoop: params.EnableWriteLoop,
|
||||||
|
numWorkers: workers,
|
||||||
}
|
}
|
||||||
stack.RegisterLifecycle(sds)
|
stack.RegisterLifecycle(sds)
|
||||||
stack.RegisterAPIs(sds.APIs())
|
stack.RegisterAPIs(sds.APIs())
|
||||||
@ -171,46 +199,88 @@ func (sds *Service) APIs() []rpc.API {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lbc *lastBlockCache) replace(currentBlock *types.Block, bc blockChain) *types.Block {
|
// Return the parent block of currentBlock, using the cached block if available;
|
||||||
|
// and cache the passed block
|
||||||
|
func (lbc *blockCache) getParentBlock(currentBlock *types.Block, bc blockChain) *types.Block {
|
||||||
lbc.Lock()
|
lbc.Lock()
|
||||||
parentHash := currentBlock.ParentHash()
|
parentHash := currentBlock.ParentHash()
|
||||||
var parentBlock *types.Block
|
var parentBlock *types.Block
|
||||||
if lbc.block != nil && bytes.Equal(lbc.block.Hash().Bytes(), parentHash.Bytes()) {
|
if block, ok := lbc.blocks[parentHash]; ok {
|
||||||
parentBlock = lbc.block
|
parentBlock = block
|
||||||
|
if len(lbc.blocks) > int(lbc.maxSize) {
|
||||||
|
delete(lbc.blocks, parentHash)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
parentBlock = bc.GetBlockByHash(parentHash)
|
parentBlock = bc.GetBlockByHash(parentHash)
|
||||||
}
|
}
|
||||||
lbc.block = currentBlock
|
lbc.blocks[currentBlock.Hash()] = currentBlock
|
||||||
lbc.Unlock()
|
lbc.Unlock()
|
||||||
return parentBlock
|
return parentBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type workerParams struct {
|
||||||
|
chainEventCh <-chan core.ChainEvent
|
||||||
|
errCh <-chan error
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
id uint
|
||||||
|
}
|
||||||
|
|
||||||
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||||
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
||||||
defer chainEventSub.Unsubscribe()
|
defer chainEventSub.Unsubscribe()
|
||||||
errCh := chainEventSub.Err()
|
errCh := chainEventSub.Err()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
// Process metrics for chain events, then forward to workers
|
||||||
|
chainEventFwd := make(chan core.ChainEvent, chainEventChanSize)
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case chainEvent := <-chainEventCh:
|
||||||
|
statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
|
||||||
|
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
|
chainEventFwd <- chainEvent
|
||||||
|
case <-sds.QuitChan:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
wg.Add(int(sds.numWorkers))
|
||||||
|
for worker := uint(0); worker < sds.numWorkers; worker++ {
|
||||||
|
params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker}
|
||||||
|
go sds.writeLoopWorker(params)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sds *Service) writeLoopWorker(params workerParams) {
|
||||||
|
defer params.wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
//Notify chain event channel of events
|
//Notify chain event channel of events
|
||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-params.chainEventCh:
|
||||||
log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent)
|
log.Debug("WriteLoop(): chain event received", "event", chainEvent)
|
||||||
currentBlock := chainEvent.Block
|
currentBlock := chainEvent.Block
|
||||||
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain)
|
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
|
||||||
if parentBlock == nil {
|
if parentBlock == nil {
|
||||||
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||||
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
|
err := sds.writeStateDiff(currentBlock, parentBlock.Root(), writeLoopParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("statediff (DB write) processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error())
|
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case err := <-errCh:
|
// TODO: how to handle with concurrent workers
|
||||||
log.Warn("Error from chain event subscription", "error", err)
|
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||||
|
case err := <-params.errCh:
|
||||||
|
log.Warn("Error from chain event subscription", "error", err, "worker", params.id)
|
||||||
sds.close()
|
sds.close()
|
||||||
return
|
return
|
||||||
case <-sds.QuitChan:
|
case <-sds.QuitChan:
|
||||||
log.Info("Quitting the statediff writing process")
|
log.Info("Quitting the statediff writing process", "worker", params.id)
|
||||||
sds.close()
|
sds.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -226,16 +296,17 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
|||||||
select {
|
select {
|
||||||
//Notify chain event channel of events
|
//Notify chain event channel of events
|
||||||
case chainEvent := <-chainEventCh:
|
case chainEvent := <-chainEventCh:
|
||||||
log.Debug("Event received from chainEventCh", "event", chainEvent)
|
statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||||
|
log.Debug("Loop(): chain event received", "event", chainEvent)
|
||||||
// if we don't have any subscribers, do not process a statediff
|
// if we don't have any subscribers, do not process a statediff
|
||||||
if atomic.LoadInt32(&sds.subscribers) == 0 {
|
if atomic.LoadInt32(&sds.subscribers) == 0 {
|
||||||
log.Debug("Currently no subscribers to the statediffing service; processing is halted")
|
log.Debug("Currently no subscribers to the statediffing service; processing is halted")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
currentBlock := chainEvent.Block
|
currentBlock := chainEvent.Block
|
||||||
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain)
|
parentBlock := sds.BlockCache.getParentBlock(currentBlock, sds.BlockChain)
|
||||||
if parentBlock == nil {
|
if parentBlock == nil {
|
||||||
log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number())
|
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sds.streamStateDiff(currentBlock, parentBlock.Root())
|
sds.streamStateDiff(currentBlock, parentBlock.Root())
|
||||||
@ -414,8 +485,8 @@ func (sds *Service) Start() error {
|
|||||||
|
|
||||||
if sds.enableWriteLoop {
|
if sds.enableWriteLoop {
|
||||||
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
log.Info("Starting statediff DB write loop", "params", writeLoopParams)
|
||||||
go sds.WriteLoop(make(chan core.ChainEvent, chainEventChanSize))
|
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
|
||||||
go sds.indexer.ReportDBMetrics(5*time.Second, sds.QuitChan)
|
go sds.WriteLoop(chainEventCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -473,7 +544,7 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod
|
|||||||
log.Info("sending code and codehash", "block height", blockNumber)
|
log.Info("sending code and codehash", "block height", blockNumber)
|
||||||
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
|
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("error creating trie for block", "number", current.Number(), "err", err)
|
log.Error("error creating trie for block", "block height", current.Number(), "err", err)
|
||||||
close(quitChan)
|
close(quitChan)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -521,7 +592,7 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
|||||||
|
|
||||||
// Writes a state diff from the current block, parent state root, and provided params
|
// Writes a state diff from the current block, parent state root, and provided params
|
||||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||||
log.Info("Writing state diff", "block height", block.Number().Uint64())
|
// log.Info("Writing state diff", "block height", block.Number().Uint64())
|
||||||
var totalDifficulty *big.Int
|
var totalDifficulty *big.Int
|
||||||
var receipts types.Receipts
|
var receipts types.Receipts
|
||||||
if params.IncludeTD {
|
if params.IncludeTD {
|
||||||
|
@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) {
|
|||||||
QuitChan: serviceQuit,
|
QuitChan: serviceQuit,
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
||||||
|
BlockCache: statediff.NewBlockCache(1),
|
||||||
}
|
}
|
||||||
payloadChan := make(chan statediff.Payload, 2)
|
payloadChan := make(chan statediff.Payload, 2)
|
||||||
quitChan := make(chan bool)
|
quitChan := make(chan bool)
|
||||||
@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) {
|
|||||||
QuitChan: make(chan bool),
|
QuitChan: make(chan bool),
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
||||||
|
BlockCache: statediff.NewBlockCache(1),
|
||||||
}
|
}
|
||||||
payloadChan := make(chan statediff.Payload)
|
payloadChan := make(chan statediff.Payload)
|
||||||
quitChan := make(chan bool)
|
quitChan := make(chan bool)
|
||||||
@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) {
|
|||||||
QuitChan: make(chan bool),
|
QuitChan: make(chan bool),
|
||||||
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
|
||||||
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
SubscriptionTypes: make(map[common.Hash]statediff.Params),
|
||||||
|
BlockCache: statediff.NewBlockCache(1),
|
||||||
}
|
}
|
||||||
stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams)
|
stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user
I like the params getting passed together like this!