Merge pull request #2712 from filecoin-project/frrist/sector-events
refactor(chainwatch): update to work with latest specs-actors miner changes
This commit is contained in:
commit
7c32d49ec6
File diff suppressed because it is too large
Load Diff
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -19,6 +20,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
|
||||
"github.com/filecoin-project/lotus/lib/parmap"
|
||||
)
|
||||
|
||||
@ -27,7 +29,8 @@ var log = logging.Logger("processor")
|
||||
type Processor struct {
|
||||
db *sql.DB
|
||||
|
||||
node api.FullNode
|
||||
node api.FullNode
|
||||
ctxStore *cw_util.APIIpldStore
|
||||
|
||||
genesisTs *types.TipSet
|
||||
|
||||
@ -50,11 +53,13 @@ type actorInfo struct {
|
||||
state string
|
||||
}
|
||||
|
||||
func NewProcessor(db *sql.DB, node api.FullNode, batch int) *Processor {
|
||||
func NewProcessor(ctx context.Context, db *sql.DB, node api.FullNode, batch int) *Processor {
|
||||
ctxStore := cw_util.NewAPIIpldStore(ctx, node)
|
||||
return &Processor{
|
||||
db: db,
|
||||
node: node,
|
||||
batch: batch,
|
||||
db: db,
|
||||
ctxStore: ctxStore,
|
||||
node: node,
|
||||
batch: batch,
|
||||
}
|
||||
}
|
||||
|
||||
@ -102,17 +107,18 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Debugw("Stopping Processor...")
|
||||
log.Info("Stopping Processor...")
|
||||
return
|
||||
default:
|
||||
loopStart := time.Now()
|
||||
toProcess, err := p.unprocessedBlocks(ctx, p.batch)
|
||||
if err != nil {
|
||||
log.Fatalw("Failed to get unprocessed blocks", "error", err)
|
||||
}
|
||||
|
||||
if len(toProcess) == 0 {
|
||||
log.Debugw("No unprocessed blocks. Wait then try again...")
|
||||
time.Sleep(time.Second * 10)
|
||||
log.Info("No unprocessed blocks. Wait then try again...")
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -123,6 +129,12 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err != nil {
|
||||
log.Fatalw("Failed to collect actor changes", "error", err)
|
||||
}
|
||||
log.Infow("Collected Actor Changes",
|
||||
"MarketChanges", len(actorChanges[builtin.StorageMarketActorCodeID]),
|
||||
"MinerChanges", len(actorChanges[builtin.StorageMinerActorCodeID]),
|
||||
"RewardChanges", len(actorChanges[builtin.RewardActorCodeID]),
|
||||
"AccountChanges", len(actorChanges[builtin.AccountActorCodeID]),
|
||||
"nullRounds", len(nullRounds))
|
||||
|
||||
grp, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
@ -130,6 +142,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil {
|
||||
return xerrors.Errorf("Failed to handle market changes: %w", err)
|
||||
}
|
||||
log.Info("Processed Market Changes")
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -137,6 +150,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil {
|
||||
return xerrors.Errorf("Failed to handle miner changes: %w", err)
|
||||
}
|
||||
log.Info("Processed Miner Changes")
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -144,6 +158,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil {
|
||||
return xerrors.Errorf("Failed to handle reward changes: %w", err)
|
||||
}
|
||||
log.Info("Processed Reward Changes")
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -151,6 +166,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
||||
return xerrors.Errorf("Failed to handle message changes: %w", err)
|
||||
}
|
||||
log.Info("Processed Message Changes")
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -158,6 +174,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
|
||||
return xerrors.Errorf("Failed to handle common actor changes: %w", err)
|
||||
}
|
||||
log.Info("Processed CommonActor Changes")
|
||||
return nil
|
||||
})
|
||||
|
||||
@ -173,6 +190,7 @@ func (p *Processor) Start(ctx context.Context) {
|
||||
if err := p.refreshViews(); err != nil {
|
||||
log.Errorw("Failed to refresh views", "error", err)
|
||||
}
|
||||
log.Infow("Processed Batch", "duration", time.Since(loopStart).String())
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -184,10 +202,6 @@ func (p *Processor) refreshViews() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := p.db.Exec(`refresh materialized view miner_sectors_view`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -239,6 +253,15 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C
|
||||
act := act
|
||||
a := a
|
||||
|
||||
// ignore actors that were deleted.
|
||||
has, err := p.node.ChainHasObj(ctx, act.Head)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if !has {
|
||||
continue
|
||||
}
|
||||
|
||||
addr, err := address.NewFromString(a)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
@ -300,6 +323,8 @@ where rnk <= $1
|
||||
}
|
||||
out := map[cid.Cid]*types.BlockHeader{}
|
||||
|
||||
minBlock := abi.ChainEpoch(math.MaxInt64)
|
||||
maxBlock := abi.ChainEpoch(0)
|
||||
// TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages()
|
||||
for rows.Next() {
|
||||
if rows.Err() != nil {
|
||||
@ -319,14 +344,23 @@ where rnk <= $1
|
||||
return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err)
|
||||
}
|
||||
out[ci] = bh
|
||||
if bh.Height < minBlock {
|
||||
minBlock = bh.Height
|
||||
}
|
||||
if bh.Height > maxBlock {
|
||||
maxBlock = bh.Height
|
||||
}
|
||||
}
|
||||
log.Infow("Gathered Blocks to Process", "start", minBlock, "end", maxBlock)
|
||||
return out, rows.Close()
|
||||
}
|
||||
|
||||
func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error {
|
||||
start := time.Now()
|
||||
processedHeight := abi.ChainEpoch(0)
|
||||
defer func() {
|
||||
log.Debugw("Marked blocks as Processed", "duration", time.Since(start).String())
|
||||
log.Infow("Processed Blocks", "height", processedHeight)
|
||||
}()
|
||||
tx, err := p.db.Begin()
|
||||
if err != nil {
|
||||
@ -339,7 +373,10 @@ func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.C
|
||||
return err
|
||||
}
|
||||
|
||||
for c := range processed {
|
||||
for c, bh := range processed {
|
||||
if bh.Height > processedHeight {
|
||||
processedHeight = bh.Height
|
||||
}
|
||||
if _, err := stmt.Exec(processedAt, c.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -73,7 +73,7 @@ var runCmd = &cli.Command{
|
||||
sync := syncer.NewSyncer(db, api)
|
||||
sync.Start(ctx)
|
||||
|
||||
proc := processor.NewProcessor(db, api, maxBatch)
|
||||
proc := processor.NewProcessor(ctx, db, api, maxBatch)
|
||||
proc.Start(ctx)
|
||||
|
||||
sched := scheduler.PrepareScheduler(db)
|
||||
|
Loading…
Reference in New Issue
Block a user