package processor import ( "bytes" "context" "fmt" //"strings" "sync" "time" "github.com/filecoin-project/go-address" //"github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/specs-actors/actors/abi" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" //"github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" ) func (p *Processor) setupMiners() error { tx, err := p.db.Begin() if err != nil { return err } if _, err := tx.Exec(` create table if not exists miner_sectors ( miner_id text not null, sector_id bigint not null, activation_epoch bigint not null, expiration_epoch bigint not null, termination_epoch bigint, deal_weight text not null, verified_deal_weight text not null, seal_cid text not null, seal_rand_epoch bigint not null, constraint miner_sectors_pk primary key (miner_id, sector_id) ); create index if not exists miner_sectors_miner_sectorid_index on miner_sectors (miner_id, sector_id); create table if not exists miner_info ( miner_id text not null, owner_addr text not null, worker_addr text not null, peer_id text, sector_size text not null, precommit_deposits text not null, locked_funds text not null, next_deadline_process_faults bigint not null, constraint miner_info_pk primary key (miner_id) ); /* * captures miner-specific power state for any given stateroot */ create table if not exists miner_power ( miner_id text not null, state_root text not null, raw_bytes_power text not null, quality_adjusted_power text not null, constraint miner_power_pk primary key (miner_id, state_root) ); /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( miner_id text not null, miner_sectors_cid text not null, state_root text not null, constraint miner_sectors_heads_pk primary key (miner_id,miner_sectors_cid) ); DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN CREATE TYPE miner_sector_event_type AS ENUM ( 'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED' ); END IF; END$$; create table if not exists miner_sector_events ( miner_id text not null, sector_id bigint not null, state_root text not null, event miner_sector_event_type not null, constraint miner_sector_events_pk primary key (sector_id, event, miner_id, state_root) ) `); err != nil { return err } return tx.Commit() } type minerActorInfo struct { common actorInfo state miner.State // tracked by power actor rawPower big.Int qalPower big.Int } type sectorUpdate struct { terminationEpoch abi.ChainEpoch terminated bool expirationEpoch abi.ChainEpoch sectorID abi.SectorNumber minerID address.Address } func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error { minerChanges, err := p.processMiners(ctx, minerTips) if err != nil { log.Fatalw("Failed to process miner actors", "error", err) } if err := p.persistMiners(ctx, minerChanges); err != nil { log.Fatalw("Failed to persist miner actors", "error", err) } if err := p.updateMiners(ctx, minerChanges); err != nil { log.Fatalw("Failed to update miner actors", "error", err) } return nil } func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) { start := time.Now() defer func() { log.Debugw("Processed Miners", "duration", time.Since(start).String()) }() var out []minerActorInfo // TODO add parallel calls if this becomes slow for tipset, miners := range minerTips { // get the power actors claims map minersClaims, err := getPowerActorClaimsMap(ctx, p.node, tipset) if err != nil { return nil, err } // Get miner raw and quality power for _, act := range miners { var mi minerActorInfo mi.common = act var claim power.Claim // get miner claim from power actors claim map and store if found, else the miner had no claim at // this tipset found, err := minersClaims.Get(adt.AddrKey(act.addr), &claim) if err != nil { return nil, err } if found { mi.qalPower = claim.QualityAdjPower mi.rawPower = claim.RawBytePower } // Get the miner state info astb, err := p.node.ChainReadObj(ctx, act.act.Head) if err != nil { log.Warnw("failed to find miner actor state", "address", act.addr, "error", err) continue } if err := mi.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { return nil, err } out = append(out, mi) } } return out, nil } func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Persisted Miners", "duration", time.Since(start).String()) }() grp, _ := errgroup.WithContext(ctx) grp.Go(func() error { if err := p.storeMinersActorState(miners); err != nil { return err } return nil }) grp.Go(func() error { if err := p.storeMinersPower(miners); err != nil { return err } return nil }) grp.Go(func() error { if err := p.storeMinersSectorState(miners); err != nil { return err } return nil }) grp.Go(func() error { if err := p.storeMinersSectorHeads(miners); err != nil { return err } return nil }) return grp.Wait() } func (p *Processor) storeMinersActorState(miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Stored Miners Actor State", "duration", time.Since(start).String()) }() tx, err := p.db.Begin() if err != nil { return err } if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil { return xerrors.Errorf("prep temp: %w", err) } stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`) if err != nil { return err } // TODO: Consume new Actor API //for _, m := range miners { //var pid string //if len(m.state.Info.PeerId) != 0 { //peerid, err := peer.IDFromBytes(m.state.Info.PeerId) //if err != nil { //// this should "never happen", but if it does we should still store info about the miner. //log.Warnw("failed to decode peerID", "peerID (bytes)", m.state.Info.PeerId, "miner", m.common.addr, "tipset", m.common.tsKey.String()) //} else { //pid = peerid.String() //} //} //if _, err := stmt.Exec( //m.common.addr.String(), //m.state.Info.Owner.String(), //m.state.Info.Worker.String(), //pid, //m.state.Info.SectorSize.ShortString(), //m.state.PreCommitDeposits.String(), //m.state.LockedFunds.String(), //m.state.NextDeadlineToProcessFaults, //); err != nil { //log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) //return xerrors.Errorf("failed to store miner state: %w", err) //} //} if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil { return xerrors.Errorf("actor put: %w", err) } return tx.Commit() } func (p *Processor) storeMinersPower(miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Stored Miners Power", "duration", time.Since(start).String()) }() tx, err := p.db.Begin() if err != nil { return xerrors.Errorf("begin miner_power tx: %w", err) } if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { return xerrors.Errorf("prep miner_power temp: %w", err) } stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) if err != nil { return xerrors.Errorf("prepare tmp miner_power: %w", err) } for _, m := range miners { if _, err := stmt.Exec( m.common.addr.String(), m.common.stateroot.String(), m.rawPower.String(), m.qalPower.String(), ); err != nil { log.Errorw("failed to store miner power", "miner", m.common.addr, "stateroot", m.common.stateroot, "error", err) } } if err := stmt.Close(); err != nil { return xerrors.Errorf("close prepared miner_power: %w", err) } if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { return xerrors.Errorf("insert miner_power from tmp: %w", err) } if err := tx.Commit(); err != nil { return xerrors.Errorf("commit miner_power tx: %w", err) } return nil } func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Stored Miners Sector State", "duration", time.Since(start).String()) }() tx, err := p.db.Begin() if err != nil { return err } if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil { return xerrors.Errorf("prep temp: %w", err) } stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`) if err != nil { return err } grp, ctx := errgroup.WithContext(context.TODO()) for _, m := range miners { m := m grp.Go(func() error { sectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, m.common.tsKey) if err != nil { log.Debugw("Failed to load sectors", "tipset", m.common.tsKey.String(), "miner", m.common.addr.String(), "error", err) } for _, sector := range sectors { if _, err := stmt.Exec( m.common.addr.String(), uint64(sector.ID), // TODO: Consume new Actor API //int64(sector.Info.ActivationEpoch), 0, //int64(sector.Info.Info.Expiration), 0, sector.Info.DealWeight.String(), sector.Info.VerifiedDealWeight.String(), //sector.Info.Info.SealedCID.String(), "", //int64(sector.Info.Info.SealRandEpoch), 0, ); err != nil { return err } } return nil }) } if err := grp.Wait(); err != nil { return err } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil { return xerrors.Errorf("actor put: %w", err) } return tx.Commit() } func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Stored Miners Sector Heads", "duration", time.Since(start).String()) }() tx, err := p.db.Begin() if err != nil { return err } if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil { return xerrors.Errorf("prep temp: %w", err) } stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`) if err != nil { return err } for _, m := range miners { if _, err := stmt.Exec( m.common.addr.String(), m.state.Sectors.String(), m.common.stateroot.String(), ); err != nil { log.Errorw("failed to store miners sectors head", "state", m.state, "info", m.state.Info, "error", err) return err } } if err := stmt.Close(); err != nil { return err } if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil { return xerrors.Errorf("actor put: %w", err) } return tx.Commit() } func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error { // TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners if err := p.updateMinersSectors(ctx, miners); err != nil { return err } return nil } func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error { log.Debugw("Updating Miners Sectors", "#miners", len(miners)) start := time.Now() defer func() { log.Debugw("Updated Miners Sectors", "duration", time.Since(start).String()) }() //pred := state.NewStatePredicates(p.node) eventTx, err := p.db.Begin() if err != nil { return err } if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { return xerrors.Errorf("prep temp: %w", err) } eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) if err != nil { return err } var updateWg sync.WaitGroup updateWg.Add(1) sectorUpdatesCh := make(chan sectorUpdate) var sectorUpdates []sectorUpdate go func() { for u := range sectorUpdatesCh { sectorUpdates = append(sectorUpdates, u) } updateWg.Done() }() // TODO: Resolve Actor interface shift //minerGrp, ctx := errgroup.WithContext(ctx) //complete := 0 //for _, m := range miners { //m := m //minerGrp.Go(func() error { //// special case genesis miners //sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange()) //changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey) //if err != nil { //if strings.Contains(err.Error(), "address not found") { //return nil //} //log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err) //return err //} //if !changed { //complete++ //return nil //} //changes, ok := val.(*state.MinerSectorChanges) //if !ok { //log.Fatalw("Developer Error") //} //log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey) //for _, extended := range changes.Extended { //if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { //return err //} //sectorUpdatesCh <- sectorUpdate{ //terminationEpoch: 0, //terminated: false, //expirationEpoch: extended.To.Info.Expiration, //sectorID: extended.From.Info.SectorNumber, //minerID: m.common.addr, //} //log.Debugw("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration) //} //curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey) //if err != nil { //return err //} //for _, removed := range changes.Removed { //log.Debugw("removed", "miner", m.common.addr) //// decide if they were terminated or extended //if removed.Info.Expiration > curTs.Height() { //if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil { //return err //} //log.Debugw("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height()) //sectorUpdatesCh <- sectorUpdate{ //terminationEpoch: curTs.Height(), //terminated: true, //expirationEpoch: removed.Info.Expiration, //sectorID: removed.Info.SectorNumber, //minerID: m.common.addr, //} //} //if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil { //return err //} //log.Debugw("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height()) //} //for _, added := range changes.Added { //if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { //return err //} //} //complete++ //log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended)) //return nil //}) //} //if err := minerGrp.Wait(); err != nil { //return err //} close(sectorUpdatesCh) // wait for the update channel to be drained updateWg.Wait() if err := eventStmt.Close(); err != nil { return err } if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { return xerrors.Errorf("actor put: %w", err) } if err := eventTx.Commit(); err != nil { return err } updateTx, err := p.db.Begin() if err != nil { return err } updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) if err != nil { return err } for _, update := range sectorUpdates { if update.terminated { if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { return err } } else { if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { return err } } } if err := updateStmt.Close(); err != nil { return err } return updateTx.Commit() } // load the power actor state clam as an adt.Map at the tipset `ts`. func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) { powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts) if err != nil { return nil, err } powerRaw, err := api.ChainReadObj(ctx, powerActor.Head) if err != nil { return nil, err } var powerActorState power.State if err := powerActorState.UnmarshalCBOR(bytes.NewReader(powerRaw)); err != nil { return nil, fmt.Errorf("failed to unmarshal power actor state: %w", err) } s := cw_util.NewAPIIpldStore(ctx, api) return adt.AsMap(s, powerActorState.Claims) }