- When chainwatch is run, it first starts a Syncer that continuously collects blocks from the ChainNotify channel and persists them to the blocks_synced table. Once the Syncer has caught the blocks_synced table up to the lotus daemon's current head, a Processor is started. The Processor selects a batch of contiguous blocks, then extracts and stores their data, doing as much of that work as it can in parallel. When a batch has been processed, the corresponding processed_at and is_processed fields in the blocks_synced table are filled in (a rough wiring sketch follows below).
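A minimal sketch of how these two stages might be wired together. `NewProcessor` and `Start` match the processor source that follows; the `syncer.NewSyncer` constructor, the package paths, and the Postgres driver are assumptions made for illustration, not the exact chainwatch entry point.

```go
package main

import (
	"context"
	"database/sql"

	_ "github.com/lib/pq" // assumed Postgres driver; chainwatch persists to a SQL database

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor" // assumed package path
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"    // assumed package path
)

// run wires the two stages together: the Syncer fills blocks_synced, and the
// Processor drains it in batches. The syncer-side names are illustrative only.
func run(ctx context.Context, node api.FullNode, dbConnString string) error {
	db, err := sql.Open("postgres", dbConnString)
	if err != nil {
		return err
	}

	// Stage 1: follow ChainNotify and persist block headers to blocks / blocks_synced.
	s := syncer.NewSyncer(db, node) // assumed constructor
	go s.Start(ctx)                 // assumed entry point

	// Stage 2: repeatedly select up to 50 contiguous unprocessed blocks and extract their state.
	p := processor.NewProcessor(db, node, 50)
	p.Start(ctx) // spawns the main processing loop and returns

	<-ctx.Done()
	return db.Close()
}
```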
package processor

import (
	"context"
	"database/sql"
	"encoding/json"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"

	"github.com/filecoin-project/specs-actors/actors/abi"
	"github.com/filecoin-project/specs-actors/actors/builtin"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/lib/parmap"
)

var log = logging.Logger("processor")

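// Processor reads batches of unprocessed block headers recorded by the Syncer,
// extracts the chain state they reference from the lotus node, and persists it
// to the database.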
type Processor struct {
	db *sql.DB

	node api.FullNode

	// number of blocks processed at a time
	batch int
}

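// ActorTips indexes the actors that changed in each tipset, keyed by tipset key.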
type ActorTips map[types.TipSetKey][]actorInfo

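// actorInfo captures a single actor's state at a given height, along with the
// tipset and state root it was observed under.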
type actorInfo struct {
	act types.Actor

	stateroot cid.Cid
	height    abi.ChainEpoch // so that we can walk the actor changes in chronological order.

	tsKey       types.TipSetKey
	parentTsKey types.TipSetKey

	addr  address.Address
	state string
}

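// NewProcessor returns a Processor that reads from node and writes to db,
// handling `batch` blocks per iteration of its main loop.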
func NewProcessor(db *sql.DB, node api.FullNode, batch int) *Processor {
	return &Processor{
		db:    db,
		node:  node,
		batch: batch,
	}
}

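// setupSchemas creates the database tables and views that each handler writes to.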
func (p *Processor) setupSchemas() error {
	if err := p.setupMarket(); err != nil {
		return err
	}

	if err := p.setupMiners(); err != nil {
		return err
	}

	if err := p.setupRewards(); err != nil {
		return err
	}

	if err := p.setupMessages(); err != nil {
		return err
	}

	if err := p.setupCommonActors(); err != nil {
		return err
	}

	return nil
}

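// Start sets up the schemas, subscribes to the message pool, and launches the
// main processing loop, which repeatedly selects a batch of unprocessed blocks,
// fans their actor changes out to the per-actor handlers in parallel, and marks
// the batch as processed. Start returns immediately; the loop runs until ctx is done.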
func (p *Processor) Start(ctx context.Context) {
	log.Info("Starting Processor")

	if err := p.setupSchemas(); err != nil {
		log.Fatalw("Failed to setup processor", "error", err)
	}

	go p.subMpool(ctx)

	// main processor loop
	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Infow("Stopping Processor...")
				return
			default:
				toProcess, err := p.unprocessedBlocks(ctx, p.batch)
				if err != nil {
					log.Fatalw("Failed to get unprocessed blocks", "error", err)
				}

				// TODO special case genesis state handling here to avoid all the special cases that will be needed for it elsewhere
				// before doing "normal" processing.

				actorChanges, err := p.collectActorChanges(ctx, toProcess)
				if err != nil {
					log.Fatalw("Failed to collect actor changes", "error", err)
				}

				grp, ctx := errgroup.WithContext(ctx)

				grp.Go(func() error {
					if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil {
						return xerrors.Errorf("Failed to handle market changes: %w", err)
					}
					return nil
				})

				grp.Go(func() error {
					if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil {
						return xerrors.Errorf("Failed to handle miner changes: %w", err)
					}
					return nil
				})

				grp.Go(func() error {
					if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID]); err != nil {
						return xerrors.Errorf("Failed to handle reward changes: %w", err)
					}
					return nil
				})

				grp.Go(func() error {
					if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
						return xerrors.Errorf("Failed to handle message changes: %w", err)
					}
					return nil
				})

				grp.Go(func() error {
					if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
						return xerrors.Errorf("Failed to handle common actor changes: %w", err)
					}
					return nil
				})

				if err := grp.Wait(); err != nil {
					log.Errorw("Failed to handle actor changes...retrying", "error", err)
					continue
				}

				if err := p.markBlocksProcessed(ctx, toProcess); err != nil {
					log.Fatalw("Failed to mark blocks as processed", "error", err)
				}

				if err := p.refreshViews(); err != nil {
					log.Errorw("Failed to refresh views", "error", err)
				}
			}
		}
	}()
}

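// refreshViews refreshes materialized views derived from the newly processed
// data; currently just state_heights.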
func (p *Processor) refreshViews() error {
	if _, err := p.db.Exec(`refresh materialized view state_heights`); err != nil {
		return err
	}

	return nil
}

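// collectActorChanges diffs the parent state of every block in toProcess against
// its grandparent state and returns, per actor code and per tipset, the actors
// whose state changed, together with their JSON-serialized state.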
func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, error) {
	start := time.Now()
	defer func() {
		log.Infow("Collected Actor Changes", "duration", time.Since(start).String())
	}()

	// ActorCode -> tipset -> []actorInfo
	out := map[cid.Cid]ActorTips{}
	var outMu sync.Mutex

	actorsSeen := map[cid.Cid]struct{}{}

	// collect all actor state that has changed between block headers
	paDone := 0
	parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
		paDone++
		if paDone%100 == 0 {
			log.Infow("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess))
		}

		pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
		if err != nil {
			panic(err)
		}

		// collect all actors that had state changes between the block header's parent state and its grandparent state.
		// TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider
		// a separate strategy for deleted actors
		// changes maps addresses to changed actors; declared locally so concurrent workers don't race on it.
		changes, err := p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
		if err != nil {
			panic(err)
		}

		// record the state of all actors that have changed
		for a, act := range changes {
			act := act
			a := a

			addr, err := address.NewFromString(a)
			if err != nil {
				panic(err)
			}

			ast, err := p.node.StateReadState(ctx, addr, pts.Key())
			if err != nil {
				panic(err)
			}

			// TODO look here for an empty state, maybe that's a sign the actor was deleted?

			state, err := json.Marshal(ast.State)
			if err != nil {
				panic(err)
			}

			outMu.Lock()
			if _, ok := actorsSeen[act.Head]; !ok {
				_, ok := out[act.Code]
				if !ok {
					out[act.Code] = map[types.TipSetKey][]actorInfo{}
				}
				out[act.Code][pts.Key()] = append(out[act.Code][pts.Key()], actorInfo{
					act:         act,
					stateroot:   bh.ParentStateRoot,
					height:      bh.Height,
					tsKey:       pts.Key(),
					parentTsKey: pts.Parents(),
					addr:        addr,
					state:       string(state),
				})
			}
			actorsSeen[act.Head] = struct{}{}
			outMu.Unlock()
		}
	})
	return out, nil
}

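// unprocessedBlocks returns up to batch block headers, in height order, that
// exist in the blocks table but have no processed_at timestamp in blocks_synced.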
func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
	start := time.Now()
	defer func() {
		log.Infow("Gathered Blocks to process", "duration", time.Since(start).String())
	}()
	rows, err := p.db.Query(`
with toProcess as (
    select blocks.cid, blocks.height, rank() over (order by height) as rnk
    from blocks
    left join blocks_synced bs on blocks.cid = bs.cid
    where bs.processed_at is null and blocks.height > 0
)
select cid
from toProcess
where rnk <= $1
`, batch)
	if err != nil {
		return nil, xerrors.Errorf("Failed to query for unprocessed blocks: %w", err)
	}
	out := map[cid.Cid]*types.BlockHeader{}

	// TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages()
	for rows.Next() {
		if rows.Err() != nil {
			return nil, rows.Err()
		}
		var c string
		if err := rows.Scan(&c); err != nil {
			return nil, xerrors.Errorf("Failed to scan unprocessed blocks: %w", err)
		}
		ci, err := cid.Parse(c)
		if err != nil {
			return nil, xerrors.Errorf("Failed to parse unprocessed blocks: %w", err)
		}
		bh, err := p.node.ChainGetBlock(ctx, ci)
		if err != nil {
			// this is a pretty serious issue.
			return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err)
		}
		out[ci] = bh
	}
	return out, rows.Close()
}

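// markBlocksProcessed stamps processed_at on the blocks_synced row for every
// block in the batch, within a single transaction.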
func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error {
	start := time.Now()
	defer func() {
		log.Infow("Marked blocks as Processed", "duration", time.Since(start).String())
	}()
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	processedAt := time.Now().Unix()
	stmt, err := tx.Prepare(`update blocks_synced set processed_at=$1 where cid=$2`)
	if err != nil {
		return err
	}

	for c := range processed {
		if _, err := stmt.Exec(processedAt, c.String()); err != nil {
			return err
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	return tx.Commit()
}