lotus/cmd/lotus-chainwatch/processor/miner.go
frrist 41f4f1fd83 refactor: implement processor and syncer
- When chainwatch is ran it will first start a Syncer that continuously collects blocks from the
ChainNotify channel and persists them to the blocks_synced table. Once the Syncer has caught the
blocks_synced table up to the lotus daemons current head a Processor is started. The Processor
selects a batch of contiguous blocks and extracts and stores their data. It attempts to do as much
work as it can in parallel. When the blocks are done being processed their corresponding
processed_at and is_processed fields in the blocks_synced table are filled out.
2020-07-15 11:42:19 -07:00

641 lines
18 KiB
Go

package processor
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)
func (p *Processor) setupMiners() error {
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create table if not exists miner_sectors
(
miner_id text not null,
sector_id bigint not null,
activation_epoch bigint not null,
expiration_epoch bigint not null,
termination_epoch bigint,
deal_weight text not null,
verified_deal_weight text not null,
seal_cid text not null,
seal_rand_epoch bigint not null,
constraint miner_sectors_pk
primary key (miner_id, sector_id)
);
create index if not exists miner_sectors_miner_sectorid_index
on miner_sectors (miner_id, sector_id);
create table if not exists miner_info
(
miner_id text not null,
owner_addr text not null,
worker_addr text not null,
peer_id text,
sector_size text not null,
precommit_deposits text not null,
locked_funds text not null,
next_deadline_process_faults bigint not null,
constraint miner_info_pk
primary key (miner_id)
);
/*
* captures miner-specific power state for any given stateroot
*/
create table if not exists miner_power
(
miner_id text not null,
state_root text not null,
raw_bytes_power text not null,
quality_adjusted_power text not null,
constraint miner_power_pk
primary key (miner_id, state_root)
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
miner_id text not null,
miner_sectors_cid text not null,
state_root text not null,
constraint miner_sectors_heads_pk
primary key (miner_id,miner_sectors_cid)
);
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
CREATE TYPE miner_sector_event_type AS ENUM
(
'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED'
);
END IF;
END$$;
create table if not exists miner_sector_events
(
miner_id text not null,
sector_id bigint not null,
state_root text not null,
event miner_sector_event_type not null,
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
`); err != nil {
return err
}
return tx.Commit()
}
type minerActorInfo struct {
common actorInfo
state miner.State
// tracked by power actor
rawPower big.Int
qalPower big.Int
}
type sectorUpdate struct {
terminationEpoch abi.ChainEpoch
terminated bool
expirationEpoch abi.ChainEpoch
sectorID abi.SectorNumber
minerID address.Address
}
func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error {
minerChanges, err := p.processMiners(ctx, minerTips)
if err != nil {
log.Fatalw("Failed to process miner actors", "error", err)
}
if err := p.persistMiners(ctx, minerChanges); err != nil {
log.Fatalw("Failed to persist miner actors", "error", err)
}
if err := p.updateMiners(ctx, minerChanges); err != nil {
log.Fatalw("Failed to update miner actors", "error", err)
}
return nil
}
func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) {
start := time.Now()
defer func() {
log.Infow("Processed Miners", "duration", time.Since(start).String())
}()
var out []minerActorInfo
// TODO add parallel calls if this becomes slow
for tipset, miners := range minerTips {
// get the power actors claims map
minersClaims, err := getPowerActorClaimsMap(ctx, p.node, tipset)
if err != nil {
return nil, err
}
// Get miner raw and quality power
for _, act := range miners {
var mi minerActorInfo
mi.common = act
var claim power.Claim
// get miner claim from power actors claim map and store if found, else the miner had no claim at
// this tipset
found, err := minersClaims.Get(adt.AddrKey(act.addr), &claim)
if err != nil {
return nil, err
}
if found {
mi.qalPower = claim.QualityAdjPower
mi.rawPower = claim.RawBytePower
}
// Get the miner state info
astb, err := p.node.ChainReadObj(ctx, act.act.Head)
if err != nil {
log.Warnw("failed to find miner actor state", "address", act.addr, "error", err)
continue
}
if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
return nil, err
}
out = append(out, mi)
}
}
return out, nil
}
func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Persisted Miners", "duration", time.Since(start).String())
}()
grp, _ := errgroup.WithContext(ctx)
grp.Go(func() error {
if err := p.storeMinersActorState(miners); err != nil {
return err
}
return nil
})
grp.Go(func() error {
if err := p.storeMinersPower(miners); err != nil {
return err
}
return nil
})
grp.Go(func() error {
if err := p.storeMinersSectorState(miners); err != nil {
return err
}
return nil
})
grp.Go(func() error {
if err := p.storeMinersSectorHeads(miners); err != nil {
return err
}
return nil
})
return grp.Wait()
}
func (p *Processor) storeMinersActorState(miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Stored Miners Actor State", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`)
if err != nil {
return err
}
for _, m := range miners {
var pid string
if len(m.state.Info.PeerId) != 0 {
peerid, err := peer.IDFromBytes(m.state.Info.PeerId)
if err != nil {
// this should "never happen", but if it does we should still store info about the miner.
log.Warnw("failed to decode peerID", "peerID (bytes)", m.state.Info.PeerId, "miner", m.common.addr, "tipset", m.common.tsKey.String())
} else {
pid = peerid.String()
}
}
if _, err := stmt.Exec(
m.common.addr.String(),
m.state.Info.Owner.String(),
m.state.Info.Worker.String(),
pid,
m.state.Info.SectorSize.ShortString(),
m.state.PreCommitDeposits.String(),
m.state.LockedFunds.String(),
m.state.NextDeadlineToProcessFaults,
); err != nil {
log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err)
return xerrors.Errorf("failed to store miner state: %w", err)
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Stored Miners Power", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return xerrors.Errorf("begin miner_power tx: %w", err)
}
if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil {
return xerrors.Errorf("prep miner_power temp: %w", err)
}
stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`)
if err != nil {
return xerrors.Errorf("prepare tmp miner_power: %w", err)
}
for _, m := range miners {
if _, err := stmt.Exec(
m.common.addr.String(),
m.common.stateroot.String(),
m.rawPower.String(),
m.qalPower.String(),
); err != nil {
log.Errorw("failed to store miner power", "miner", m.common.addr, "stateroot", m.common.stateroot, "error", err)
}
}
if err := stmt.Close(); err != nil {
return xerrors.Errorf("close prepared miner_power: %w", err)
}
if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil {
return xerrors.Errorf("insert miner_power from tmp: %w", err)
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit miner_power tx: %w", err)
}
return nil
}
func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Stored Miners Sector State", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`)
if err != nil {
return err
}
grp, ctx := errgroup.WithContext(context.TODO())
for _, m := range miners {
m := m
grp.Go(func() error {
sectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, m.common.tsKey)
if err != nil {
log.Debugw("Failed to load sectors", "tipset", m.common.tsKey.String(), "miner", m.common.addr.String(), "error", err)
}
for _, sector := range sectors {
if _, err := stmt.Exec(
m.common.addr.String(),
uint64(sector.ID),
int64(sector.Info.ActivationEpoch),
int64(sector.Info.Info.Expiration),
sector.Info.DealWeight.String(),
sector.Info.VerifiedDealWeight.String(),
sector.Info.Info.SealedCID.String(),
int64(sector.Info.Info.SealRandEpoch),
); err != nil {
return err
}
}
return nil
})
}
if err := grp.Wait(); err != nil {
return err
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Stored Miners Sector Heads", "duration", time.Since(start).String())
}()
tx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`)
if err != nil {
return err
}
for _, m := range miners {
if _, err := stmt.Exec(
m.common.addr.String(),
m.state.Sectors.String(),
m.common.stateroot.String(),
); err != nil {
log.Errorw("failed to store miners sectors head", "state", m.state, "info", m.state.Info, "error", err)
return err
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error {
// TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners
if err := p.updateMinersSectors(ctx, miners); err != nil {
return err
}
return nil
}
func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error {
log.Infow("Updating Miners Sectors", "#miners", len(miners))
start := time.Now()
defer func() {
log.Infow("Updated Miners Sectors", "duration", time.Since(start).String())
}()
pred := state.NewStatePredicates(p.node)
eventTx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `)
if err != nil {
return err
}
var updateWg sync.WaitGroup
updateWg.Add(1)
sectorUpdatesCh := make(chan sectorUpdate)
var sectorUpdates []sectorUpdate
go func() {
for u := range sectorUpdatesCh {
sectorUpdates = append(sectorUpdates, u)
}
updateWg.Done()
}()
minerGrp, ctx := errgroup.WithContext(ctx)
complete := 0
for _, m := range miners {
m := m
minerGrp.Go(func() error {
// special case genesis miners
sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())
changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
if strings.Contains(err.Error(), "address not found") {
return nil
}
log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err)
return err
}
if !changed {
complete++
return nil
}
changes, ok := val.(*state.MinerSectorChanges)
if !ok {
log.Fatalw("Developer Error")
}
log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey)
for _, extended := range changes.Extended {
if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
return err
}
sectorUpdatesCh <- sectorUpdate{
terminationEpoch: 0,
terminated: false,
expirationEpoch: extended.To.Info.Expiration,
sectorID: extended.From.Info.SectorNumber,
minerID: m.common.addr,
}
log.Infow("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration)
}
curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey)
if err != nil {
return err
}
for _, removed := range changes.Removed {
log.Infow("removed", "miner", m.common.addr)
// decide if they were terminated or extended
if removed.Info.Expiration > curTs.Height() {
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
return err
}
log.Infow("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height())
sectorUpdatesCh <- sectorUpdate{
terminationEpoch: curTs.Height(),
terminated: true,
expirationEpoch: removed.Info.Expiration,
sectorID: removed.Info.SectorNumber,
minerID: m.common.addr,
}
}
if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
return err
}
log.Infow("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height())
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
return err
}
}
complete++
log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended))
return nil
})
}
if err := minerGrp.Wait(); err != nil {
return err
}
close(sectorUpdatesCh)
// wait for the update channel to be drained
updateWg.Wait()
if err := eventStmt.Close(); err != nil {
return err
}
if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := eventTx.Commit(); err != nil {
return err
}
updateTx, err := p.db.Begin()
if err != nil {
return err
}
updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`)
if err != nil {
return err
}
for _, update := range sectorUpdates {
if update.terminated {
if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
} else {
if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil {
return err
}
}
}
if err := updateStmt.Close(); err != nil {
return err
}
return updateTx.Commit()
}
// load the power actor state clam as an adt.Map at the tipset `ts`.
func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) {
powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts)
if err != nil {
return nil, err
}
powerRaw, err := api.ChainReadObj(ctx, powerActor.Head)
if err != nil {
return nil, err
}
var powerActorState power.State
if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil {
return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err)
}
s := cw_util.NewAPIIpldStore(ctx, api)
return adt.AsMap(s, powerActorState.Claims)
}