diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index 55c41c4fc..97af7df10 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -5,20 +5,15 @@ import ( "context" "fmt" "strings" - - //"strings" - "sync" "time" "github.com/filecoin-project/go-address" - //"github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/go-bitfield" + "github.com/ipfs/go-cid" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" - //"github.com/libp2p/go-libp2p-core/peer" - + "github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin" "github.com/filecoin-project/specs-actors/actors/builtin/miner" @@ -26,6 +21,7 @@ import ( "github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" ) @@ -46,13 +42,55 @@ create table if not exists miner_info peer_id text, sector_size text not null, - precommit_deposits text not null, - locked_funds text not null, - next_deadline_process_faults bigint not null, constraint miner_info_pk primary key (miner_id) ); +create table if not exists sector_precommit_info +( + miner_id text not null, + sector_id bigint not null, + sealed_cid text not null, + state_root text not null, + + seal_rand_epoch bigint not null, + expiration_epoch bigint not null, + + precommit_deposit text not null, + precommit_epoch bigint not null, + deal_weight text not null, + verified_deal_weight text not null, + + + is_replace_capacity bool not null, + replace_sector_deadline bigint, + replace_sector_partition bigint, + replace_sector_number bigint, + + constraint sector_precommit_info_pk + primary key (miner_id, sector_id, sealed_cid) + +); + +create table if not exists sector_info +( + miner_id text not null, + sector_id bigint not null, + sealed_cid text not null, + state_root text not null, + + activation_epoch bigint not null, + expiration_epoch bigint not null, + + deal_weight text not null, + verified_deal_weight text not null, + + initial_pledge text not null, + + constraint sector_info_pk + primary key (miner_id, sector_id, sealed_cid) +); + /* * captures miner-specific power state for any given stateroot */ @@ -66,55 +104,13 @@ create table if not exists miner_power primary key (miner_id, state_root) ); -create table if not exists miner_precommits -( - miner_id text not null, - sector_id bigint not null, - - precommit_deposit text not null, - precommit_epoch text not null, - constraint miner_precommits_pk - primary key (miner_id, sector_id) - -); - -create table if not exists miner_sectors -( - miner_id text not null, - sector_id bigint not null, - - activation_epoch bigint not null, - expiration_epoch bigint not null, - termination_epoch bigint, - - deal_weight text not null, - verified_deal_weight text not null, - seal_cid text not null, - seal_rand_epoch bigint not null, - constraint miner_sectors_pk - primary key (miner_id, sector_id) -); - -/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ -create table if not exists miner_sectors_heads -( - miner_id text not null, - miner_sectors_cid text not null, - - state_root text not null, - - constraint miner_sectors_heads_pk - primary key (miner_id,miner_sectors_cid) - -); - - DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN CREATE TYPE miner_sector_event_type AS ENUM ( - 'PRECOMMIT', 'COMMIT', 'EXTENDED', 'EXPIRED', 'TERMINATED' + 'PRECOMMIT_ADDED', 'PRECOMMIT_EXPIRED', 'COMMIT_CAPACITY_ADDED', 'SECTOR_ADDED', + 'SECTOR_EXTENDED', 'SECTOR_EXPIRED', 'SECTOR_FAULTED', 'SECTOR_RECOVERING', 'SECTOR_RECOVERED', 'SECTOR_TERMINATED' ); END IF; END$$; @@ -130,11 +126,6 @@ create table if not exists miner_sector_events primary key (sector_id, event, miner_id, state_root) ); -create materialized view if not exists miner_sectors_view as -select ms.miner_id, ms.sector_id, mp.precommit_epoch, ms.activation_epoch, ms.expiration_epoch, ms.termination_epoch, ms.deal_weight, ms.verified_deal_weight -from miner_sectors ms -left join miner_precommits mp on ms.sector_id = mp.sector_id and ms.miner_id = mp.miner_id - `); err != nil { return err } @@ -142,6 +133,38 @@ left join miner_precommits mp on ms.sector_id = mp.sector_id and ms.miner_id = m return tx.Commit() } +type SectorLifecycleEvent string + +const ( + PreCommitAdded = "PRECOMMIT_ADDED" + PreCommitExpired = "PRECOMMIT_EXPIRED" + + CommitCapacityAdded = "COMMIT_CAPACITY_ADDED" + + SectorAdded = "SECTOR_ADDED" + SectorExpired = "SECTOR_EXPIRED" + SectorExtended = "SECTOR_EXTENDED" + SectorFaulted = "SECTOR_FAULTED" + SectorRecovering = "SECTOR_RECOVERING" + SectorRecovered = "SECTOR_RECOVERED" + SectorTerminated = "SECTOR_TERMINATED" +) + +type MinerSectorsEvent struct { + MinerID address.Address + SectorIDs []uint64 + StateRoot cid.Cid + Event SectorLifecycleEvent +} + +type PartitionStatus struct { + Terminated *abi.BitField + Expired *abi.BitField + Faulted *abi.BitField + InRecovery *abi.BitField + Recovered *abi.BitField +} + type minerActorInfo struct { common actorInfo @@ -152,16 +175,6 @@ type minerActorInfo struct { qalPower big.Int } -type sectorUpdate struct { - terminationEpoch abi.ChainEpoch - terminated bool - - expirationEpoch abi.ChainEpoch - - sectorID abi.SectorNumber - minerID address.Address -} - func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error { minerChanges, err := p.processMiners(ctx, minerTips) if err != nil { @@ -172,9 +185,6 @@ func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) log.Fatalw("Failed to persist miner actors", "error", err) } - if err := p.updateMiners(ctx, minerChanges); err != nil { - log.Fatalw("Failed to update miner actors", "error", err) - } return nil } @@ -233,13 +243,6 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) grp, _ := errgroup.WithContext(ctx) - grp.Go(func() error { - if err := p.storeMinersActorState(miners); err != nil { - return err - } - return nil - }) - grp.Go(func() error { if err := p.storeMinersPower(miners); err != nil { return err @@ -248,30 +251,588 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) }) grp.Go(func() error { - if err := p.storeMinersSectorState(ctx, miners); err != nil { + if err := p.storeMinersActorInfoState(ctx, miners); err != nil { return err } return nil }) + // 8 is arbitrary, idk what a good value here is. + preCommitEvents := make(chan *MinerSectorsEvent, 8) + sectorEvents := make(chan *MinerSectorsEvent, 8) + partitionEvents := make(chan *MinerSectorsEvent, 8) + grp.Go(func() error { - if err := p.storeMinersSectorHeads(miners); err != nil { - return err - } - return nil + return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) }) grp.Go(func() error { - if err := p.storeMinersPreCommitState(ctx, miners); err != nil { - return err - } - return nil + defer close(preCommitEvents) + return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents) + }) + + grp.Go(func() error { + defer close(sectorEvents) + return p.storeMinerSectorInfo(ctx, miners, sectorEvents) + }) + + grp.Go(func() error { + defer close(partitionEvents) + return p.getMinerPartitionsDifferences(ctx, miners, partitionEvents) }) return grp.Wait() } -func (p *Processor) storeMinersActorState(miners []minerActorInfo) error { +func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table spi (like sector_precommit_info excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for sector_precommit_info: %w", err) + } + + stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err) + } + + for _, m := range miners { + minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors) + if err != nil { + return err + } + + changes, err := p.getMinerPreCommitChanges(ctx, m) + if err != nil { + if strings.Contains(err.Error(), "address not found") { + continue + } else { + return err + } + } + if changes == nil { + continue + } + + preCommitAdded := make([]uint64, len(changes.Added)) + for i, added := range changes.Added { + if added.Info.ReplaceCapacity { + if _, err := stmt.Exec( + m.common.addr.String(), + added.Info.SectorNumber, + added.Info.SealedCID.String(), + m.common.stateroot.String(), + added.Info.SealRandEpoch, + added.Info.Expiration, + added.PreCommitDeposit.String(), + added.PreCommitEpoch, + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.Info.ReplaceCapacity, + added.Info.ReplaceSectorDeadline, + added.Info.ReplaceSectorPartition, + added.Info.ReplaceSectorNumber, + ); err != nil { + return err + } + } else { + if _, err := stmt.Exec( + m.common.addr.String(), + added.Info.SectorNumber, + added.Info.SealedCID.String(), + m.common.stateroot.String(), + added.Info.SealRandEpoch, + added.Info.Expiration, + added.PreCommitDeposit.String(), + added.PreCommitEpoch, + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.Info.ReplaceCapacity, + nil, // replace deadline + nil, // replace partition + nil, // replace sector + ); err != nil { + return err + } + + } + preCommitAdded[i] = uint64(added.Info.SectorNumber) + } + if len(preCommitAdded) > 0 { + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: preCommitAdded, + Event: PreCommitAdded, + } + } + var preCommitExpired []uint64 + for _, removed := range changes.Removed { + var sector miner.SectorOnChainInfo + if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), §or); err != nil { + return err + } else if !found { + preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber)) + } + } + if len(preCommitExpired) > 0 { + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: preCommitExpired, + Event: PreCommitExpired, + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close sector precommit info statement: %w", err) + } + + if _, err := tx.Exec(`insert into sector_precommit_info select * from spi on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into sector precommit info table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit sector precommit info: %w", err) + } + return nil +} + +func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table si (like sector_info excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for sector_: %w", err) + } + + stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) + } + + for _, m := range miners { + changes, err := p.getMinerSectorChanges(ctx, m) + if err != nil { + if strings.Contains(err.Error(), "address not found") { + continue + } else { + return err + } + } + if changes == nil { + continue + } + var sectorsAdded []uint64 + var ccAdded []uint64 + var extended []uint64 + for _, added := range changes.Added { + // add the sector to the table + if _, err := stmt.Exec( + m.common.addr.String(), + added.SectorNumber, + added.SealedCID.String(), + m.common.stateroot.String(), + added.Activation.String(), + added.Expiration.String(), + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.InitialPledge.String(), + ); err != nil { + return err + } + if len(added.DealIDs) == 0 { + ccAdded = append(ccAdded, uint64(added.SectorNumber)) + } else { + sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber)) + } + } + + for _, mod := range changes.Extended { + extended = append(extended, uint64(mod.To.SectorNumber)) + } + + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: ccAdded, + Event: CommitCapacityAdded, + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: sectorsAdded, + Event: SectorAdded, + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: extended, + Event: SectorExtended, + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close sector info statement: %w", err) + } + + if _, err := tx.Exec(`insert into sector_info select * from si on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into sector info table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit sector info: %w", err) + } + return nil + +} + +func (p *Processor) getMinerPartitionsDifferences(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { + grp, ctx := errgroup.WithContext(ctx) + for _, m := range miners { + m := m + grp.Go(func() error { + if err := p.diffMinerPartitions(ctx, m, events); err != nil { + if strings.Contains(err.Error(), "address not found") { + return nil + } + return err + } + return nil + }) + } + return grp.Wait() +} + +func (p *Processor) storeMinerSectorEvents(ctx context.Context, sectorEvents, preCommitEvents, partitionEvents <-chan *MinerSectorsEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for sector_: %w", err) + } + + stmt, err := tx.Prepare(`copy mse (miner_id, sector_id, event, state_root) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) + } + + grp, ctx := errgroup.WithContext(ctx) + grp.Go(func() error { + innerGrp, _ := errgroup.WithContext(ctx) + for mse := range sectorEvents { + mse := mse + innerGrp.Go(func() error { + for _, sid := range mse.SectorIDs { + if _, err := stmt.Exec( + mse.MinerID.String(), + sid, + mse.Event, + mse.StateRoot.String(), + ); err != nil { + return err + } + } + return nil + }) + } + return innerGrp.Wait() + }) + + grp.Go(func() error { + innerGrp, _ := errgroup.WithContext(ctx) + for mse := range preCommitEvents { + mse := mse + innerGrp.Go(func() error { + for _, sid := range mse.SectorIDs { + if _, err := stmt.Exec( + mse.MinerID.String(), + sid, + mse.Event, + mse.StateRoot.String(), + ); err != nil { + return err + } + } + return nil + }) + } + return innerGrp.Wait() + }) + + grp.Go(func() error { + innerGrp, _ := errgroup.WithContext(ctx) + for mse := range partitionEvents { + mse := mse + grp.Go(func() error { + for _, sid := range mse.SectorIDs { + if _, err := stmt.Exec( + mse.MinerID.String(), + sid, + mse.Event, + mse.StateRoot.String(), + ); err != nil { + return err + } + } + return nil + }) + } + return innerGrp.Wait() + }) + + if err := grp.Wait(); err != nil { + return err + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close sector event statement: %w", err) + } + + if _, err := tx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into sector event table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit sector events: %w", err) + } + return nil +} + +func (p *Processor) getMinerStateAt(ctx context.Context, maddr address.Address, tskey types.TipSetKey) (miner.State, error) { + prevActor, err := p.node.StateGetActor(ctx, maddr, tskey) + if err != nil { + return miner.State{}, err + } + var out miner.State + // Get the miner state info + astb, err := p.node.ChainReadObj(ctx, prevActor.Head) + if err != nil { + return miner.State{}, err + } + if err := out.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + return miner.State{}, err + } + return out, nil +} + +func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorInfo) (*state.MinerPreCommitChanges, error) { + pred := state.NewStatePredicates(p.node) + changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange())(ctx, m.common.parentTsKey, m.common.tsKey) + if err != nil { + return nil, xerrors.Errorf("Failed to diff miner precommit amt: %w", err) + } + if !changed { + return nil, nil + } + out := val.(*state.MinerPreCommitChanges) + return out, nil +} + +func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo) (*state.MinerSectorChanges, error) { + pred := state.NewStatePredicates(p.node) + changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())(ctx, m.common.parentTsKey, m.common.tsKey) + if err != nil { + return nil, xerrors.Errorf("Failed to diff miner sectors amt: %w", err) + } + if !changed { + return nil, nil + } + out := val.(*state.MinerSectorChanges) + return out, nil +} + +func (p *Processor) diffMinerPartitions(ctx context.Context, m minerActorInfo, events chan<- *MinerSectorsEvent) error { + prevMiner, err := p.getMinerStateAt(ctx, m.common.addr, m.common.tsKey) + if err != nil { + return err + } + dlIdx := prevMiner.CurrentDeadline + curMiner := m.state + + // load the old deadline + prevDls, err := prevMiner.LoadDeadlines(p.ctxStore) + if err != nil { + return err + } + var prevDl miner.Deadline + if err := p.ctxStore.Get(ctx, prevDls.Due[dlIdx], &prevDl); err != nil { + return err + } + + prevPartitions, err := prevDl.PartitionsArray(p.ctxStore) + if err != nil { + return err + } + + // load the new deadline + curDls, err := curMiner.LoadDeadlines(p.ctxStore) + if err != nil { + return err + } + + var curDl miner.Deadline + if err := p.ctxStore.Get(ctx, curDls.Due[dlIdx], &curDl); err != nil { + return err + } + + curPartitions, err := curDl.PartitionsArray(p.ctxStore) + if err != nil { + return err + } + + // TODO this can be optimized by inspecting the miner state for partitions that have changed and only inspecting those. + var prevPart miner.Partition + if err := prevPartitions.ForEach(&prevPart, func(i int64) error { + var curPart miner.Partition + if found, err := curPartitions.Get(uint64(i), &curPart); err != nil { + return err + } else if !found { + log.Fatal("I don't know what this means, are partitions ever removed?") + } + partitionDiff, err := p.diffPartition(prevPart, curPart) + if err != nil { + return err + } + + recovered, err := partitionDiff.Recovered.All(miner.SectorsMax) + if err != nil { + return err + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: recovered, + Event: SectorRecovered, + } + inRecovery, err := partitionDiff.InRecovery.All(miner.SectorsMax) + if err != nil { + return err + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: inRecovery, + Event: SectorRecovering, + } + faulted, err := partitionDiff.Faulted.All(miner.SectorsMax) + if err != nil { + return err + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: faulted, + Event: SectorFaulted, + } + terminated, err := partitionDiff.Terminated.All(miner.SectorsMax) + if err != nil { + return err + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: terminated, + Event: SectorTerminated, + } + expired, err := partitionDiff.Expired.All(miner.SectorsMax) + if err != nil { + return err + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: expired, + Event: SectorExpired, + } + + return nil + }); err != nil { + return err + } + + return nil +} + +func (p *Processor) diffPartition(prevPart, curPart miner.Partition) (*PartitionStatus, error) { + // all the sectors that were in previous but not in current + allRemovedSectors, err := bitfield.SubtractBitField(prevPart.Sectors, curPart.Sectors) + if err != nil { + return nil, err + } + + // list of sectors that were terminated before their expiration. + terminatedEarlyArr, err := adt.AsArray(p.ctxStore, curPart.EarlyTerminated) + if err != nil { + return nil, err + } + + expired := abi.NewBitField() + var bf *abi.BitField + if err := terminatedEarlyArr.ForEach(bf, func(i int64) error { + // expired = all removals - termination + expirations, err := bitfield.SubtractBitField(allRemovedSectors, bf) + if err != nil { + return err + } + // merge with expired sectors from other epochs + expired, err = bitfield.MergeBitFields(expirations, expired) + if err != nil { + return nil + } + return nil + }); err != nil { + return nil, err + } + + // terminated = all removals - expired + terminated, err := bitfield.SubtractBitField(allRemovedSectors, expired) + if err != nil { + return nil, err + } + + // faults in current but not previous + faults, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries) + if err != nil { + return nil, err + } + + // recoveries in current but not previous + inRecovery, err := bitfield.SubtractBitField(curPart.Recoveries, prevPart.Recoveries) + if err != nil { + return nil, err + } + + // all current good sectors + newActiveSectors, err := curPart.ActiveSectors() + if err != nil { + return nil, err + } + + // sectors that were previously fault and are now currently active are considered recovered. + recovered, err := bitfield.IntersectBitField(prevPart.Faults, newActiveSectors) + if err != nil { + return nil, err + } + + return &PartitionStatus{ + Terminated: terminated, + Expired: expired, + Faulted: faults, + InRecovery: inRecovery, + Recovered: recovered, + }, nil +} + +func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []minerActorInfo) error { start := time.Now() defer func() { log.Debugw("Stored Miners Actor State", "duration", time.Since(start).String()) @@ -286,37 +847,31 @@ func (p *Processor) storeMinersActorState(miners []minerActorInfo) error { return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size, precommit_deposits, locked_funds, next_deadline_process_faults) from STDIN`) + stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size) from STDIN`) if err != nil { return err } - // TODO: Consume new Actor API - //for _, m := range miners { - //var pid string - //if len(m.state.Info.PeerId) != 0 { - //peerid, err := peer.IDFromBytes(m.state.Info.PeerId) - //if err != nil { - //// this should "never happen", but if it does we should still store info about the miner. - //log.Warnw("failed to decode peerID", "peerID (bytes)", m.state.Info.PeerId, "miner", m.common.addr, "tipset", m.common.tsKey.String()) - //} else { - //pid = peerid.String() - //} - //} - //if _, err := stmt.Exec( - //m.common.addr.String(), - //m.state.Info.Owner.String(), - //m.state.Info.Worker.String(), - //pid, - //m.state.Info.SectorSize.ShortString(), - //m.state.PreCommitDeposits.String(), - //m.state.LockedFunds.String(), - //m.state.NextDeadlineToProcessFaults, - //); err != nil { - //log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) - //return xerrors.Errorf("failed to store miner state: %w", err) - //} + for _, m := range miners { + mi, err := p.node.StateMinerInfo(ctx, m.common.addr, m.common.tsKey) + if err != nil { + if strings.Contains(err.Error(), "address not found") { + continue + } else { + return err + } + } + if _, err := stmt.Exec( + m.common.addr.String(), + mi.Owner.String(), + mi.Worker.String(), + mi.PeerId.String(), + mi.SectorSize.ShortString(), + ); err != nil { + log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) + return xerrors.Errorf("failed to store miner state: %w", err) + } - //} + } if err := stmt.Close(); err != nil { return err } @@ -375,409 +930,6 @@ func (p *Processor) storeMinersPower(miners []minerActorInfo) error { } -func (p *Processor) storeMinersSectorState(ctx context.Context, miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Miners Sector State", "duration", time.Since(start).String()) - }() - - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table ms (like miner_sectors excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy ms (miner_id, sector_id, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, seal_cid, seal_rand_epoch) from STDIN`) - if err != nil { - return err - } - - grp, ctx := errgroup.WithContext(ctx) - for _, m := range miners { - m := m - grp.Go(func() error { - sectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, m.common.tsKey) - if err != nil { - log.Debugw("Failed to load sectors", "tipset", m.common.tsKey.String(), "miner", m.common.addr.String(), "error", err) - } - - for _, sector := range sectors { - if _, err := stmt.Exec( - m.common.addr.String(), - uint64(sector.ID), - // TODO: Consume new Actor API - //int64(sector.Info.ActivationEpoch), - 0, - //int64(sector.Info.Info.Expiration), - 0, - sector.Info.DealWeight.String(), - sector.Info.VerifiedDealWeight.String(), - //sector.Info.Info.SealedCID.String(), - "", - //int64(sector.Info.Info.SealRandEpoch), - 0, - ); err != nil { - return err - } - } - return nil - }) - } - - if err := grp.Wait(); err != nil { - return err - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into miner_sectors select * from ms on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Miners Sector Heads", "duration", time.Since(start).String()) - }() - - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table msh (like miner_sectors_heads excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy msh (miner_id, miner_sectors_cid, state_root) from STDIN`) - if err != nil { - return err - } - - for _, m := range miners { - if _, err := stmt.Exec( - m.common.addr.String(), - m.state.Sectors.String(), - m.common.stateroot.String(), - ); err != nil { - log.Errorw("failed to store miners sectors head", "state", m.state, "info", m.state.Info, "error", err) - return err - } - - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into miner_sectors_heads select * from msh on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storeMinersPreCommitState(ctx context.Context, miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Infow("Stored Miners Precommit State", "duration", time.Since(start).String()) - }() - - precommitTx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := precommitTx.Exec(`create temp table mp (like miner_precommits excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - precommitStmt, err := precommitTx.Prepare(`copy mp (miner_id, sector_id, precommit_deposit, precommit_epoch) from STDIN`) - if err != nil { - return err - } - - for _, m := range miners { - m := m - pcMap, err := adt.AsMap(cw_util.NewAPIIpldStore(ctx, p.node), m.state.PreCommittedSectors) - if err != nil { - return err - } - precommit := new(miner.SectorPreCommitOnChainInfo) - if err := pcMap.ForEach(precommit, func(key string) error { - if _, err := precommitStmt.Exec( - m.common.addr.String(), - precommit.Info.SectorNumber, - precommit.PreCommitDeposit.String(), - precommit.PreCommitEpoch, - ); err != nil { - return err - } - - return nil - }); err != nil { - return err - } - } - if err := precommitStmt.Close(); err != nil { - return err - } - if _, err := precommitTx.Exec(`insert into miner_precommits select * from mp on conflict do nothing`); err != nil { - return err - } - - return precommitTx.Commit() -} - -func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error { - // TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners - if err := p.updateMinersSectors(ctx, miners); err != nil { - return err - } - - if err := p.updateMinersPrecommits(ctx, miners); err != nil { - return err - } - return nil -} - -func (p *Processor) updateMinersPrecommits(ctx context.Context, miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Infow("Updated Miner Precommits", "duration", time.Since(start).String()) - }() - - pred := state.NewStatePredicates(p.node) - - eventTx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) - if err != nil { - return err - } - - for _, m := range miners { - pcDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange()) - changed, val, err := pcDiffFn(ctx, m.common.parentTsKey, m.common.tsKey) - if err != nil { - if strings.Contains(err.Error(), "address not found") { - continue - } - log.Errorw("error getting miner precommit diff", "miner", m.common.addr, "error", err) - return err - } - if !changed { - continue - } - changes, ok := val.(*state.MinerPreCommitChanges) - if !ok { - log.Fatal("Developer Error") - } - for _, added := range changes.Added { - if _, err := eventStmt.Exec(added.Info.SectorNumber, "PRECOMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { - return err - } - } - } - - if err := eventStmt.Close(); err != nil { - return err - } - - if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return eventTx.Commit() -} - -func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error { - log.Debugw("Updating Miners Sectors", "#miners", len(miners)) - start := time.Now() - defer func() { - log.Debugw("Updated Miners Sectors", "duration", time.Since(start).String()) - }() - - //pred := state.NewStatePredicates(p.node) - - eventTx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) - if err != nil { - return err - } - - var updateWg sync.WaitGroup - updateWg.Add(1) - sectorUpdatesCh := make(chan sectorUpdate) - var sectorUpdates []sectorUpdate - go func() { - for u := range sectorUpdatesCh { - sectorUpdates = append(sectorUpdates, u) - } - updateWg.Done() - }() - - // TODO: Resolve Actor interface shift - //minerGrp, ctx := errgroup.WithContext(ctx) - //complete := 0 - //for _, m := range miners { - // m := m - // if m.common.tsKey == p.genesisTs.Key() { - // genSectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, p.genesisTs.Key()) - // if err != nil { - // return err - // } - // for _, sector := range genSectors { - // if _, err := eventStmt.Exec(sector.ID, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { - // return err - // } - // } - // complete++ - // continue - // } - // minerGrp.Go(func() error { - // // special case genesis miners - // sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange()) - // changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey) - // if err != nil { - // if strings.Contains(err.Error(), "address not found") { - // return nil - // } - // log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err) - // return err - // } - // if !changed { - // complete++ - // return nil - // } - // changes, ok := val.(*state.MinerSectorChanges) - // if !ok { - // log.Fatalw("Developer Error") - // } - // log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey) - - //for _, extended := range changes.Extended { - //if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { - //return err - //} - //sectorUpdatesCh <- sectorUpdate{ - //terminationEpoch: 0, - //terminated: false, - //expirationEpoch: extended.To.Info.Expiration, - //sectorID: extended.From.Info.SectorNumber, - //minerID: m.common.addr, - //} - - //log.Debugw("sector extended", "miner", m.common.addr.String(), "sector", extended.To.Info.SectorNumber, "old", extended.To.Info.Expiration, "new", extended.From.Info.Expiration) - //} - //curTs, err := p.node.ChainGetTipSet(ctx, m.common.tsKey) - //if err != nil { - //return err - //} - - //for _, removed := range changes.Removed { - //log.Debugw("removed", "miner", m.common.addr) - //// decide if they were terminated or extended - //if removed.Info.Expiration > curTs.Height() { - //if _, err := eventStmt.Exec(removed.Info.SectorNumber, "TERMINATED", m.common.addr.String(), m.common.stateroot.String()); err != nil { - //return err - //} - //log.Debugw("sector terminated", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "terminationEpoch", curTs.Height()) - //sectorUpdatesCh <- sectorUpdate{ - //terminationEpoch: curTs.Height(), - //terminated: true, - //expirationEpoch: removed.Info.Expiration, - //sectorID: removed.Info.SectorNumber, - //minerID: m.common.addr, - //} - - //} - //if _, err := eventStmt.Exec(removed.Info.SectorNumber, "EXPIRED", m.common.addr.String(), m.common.stateroot.String()); err != nil { - //return err - //} - //log.Debugw("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height()) - //} - - // for _, added := range changes.Added { - // if _, err := eventStmt.Exec(added.Info.SectorNumber, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { - // return err - // } - // } - // complete++ - // log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended)) - // return nil - // }) - //} - //if err := minerGrp.Wait(); err != nil { - // return err - //} - close(sectorUpdatesCh) - // wait for the update channel to be drained - updateWg.Wait() - - if err := eventStmt.Close(); err != nil { - return err - } - - if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - if err := eventTx.Commit(); err != nil { - return err - } - - updateTx, err := p.db.Begin() - if err != nil { - return err - } - - updateStmt, err := updateTx.Prepare(`UPDATE miner_sectors SET termination_epoch=$1, expiration_epoch=$2 WHERE miner_id=$3 AND sector_id=$4`) - if err != nil { - return err - } - - for _, update := range sectorUpdates { - if update.terminated { - if _, err := updateStmt.Exec(update.terminationEpoch, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { - return err - } - } else { - if _, err := updateStmt.Exec(nil, update.expirationEpoch, update.minerID.String(), update.sectorID); err != nil { - return err - } - } - } - - if err := updateStmt.Close(); err != nil { - return err - } - - return updateTx.Commit() -} - // load the power actor state clam as an adt.Map at the tipset `ts`. func getPowerActorClaimsMap(ctx context.Context, api api.FullNode, ts types.TipSetKey) (*adt.Map, error) { powerActor, err := api.StateGetActor(ctx, builtin.StoragePowerActorAddr, ts) diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index ef43601ec..8c578db37 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "math" "sync" "time" @@ -19,6 +20,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" + cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" "github.com/filecoin-project/lotus/lib/parmap" ) @@ -27,7 +29,8 @@ var log = logging.Logger("processor") type Processor struct { db *sql.DB - node api.FullNode + node api.FullNode + ctxStore *cw_util.APIIpldStore genesisTs *types.TipSet @@ -50,11 +53,13 @@ type actorInfo struct { state string } -func NewProcessor(db *sql.DB, node api.FullNode, batch int) *Processor { +func NewProcessor(ctx context.Context, db *sql.DB, node api.FullNode, batch int) *Processor { + ctxStore := cw_util.NewAPIIpldStore(ctx, node) return &Processor{ - db: db, - node: node, - batch: batch, + db: db, + ctxStore: ctxStore, + node: node, + batch: batch, } } @@ -102,17 +107,18 @@ func (p *Processor) Start(ctx context.Context) { for { select { case <-ctx.Done(): - log.Debugw("Stopping Processor...") + log.Info("Stopping Processor...") return default: + loopStart := time.Now() toProcess, err := p.unprocessedBlocks(ctx, p.batch) if err != nil { log.Fatalw("Failed to get unprocessed blocks", "error", err) } if len(toProcess) == 0 { - log.Debugw("No unprocessed blocks. Wait then try again...") - time.Sleep(time.Second * 10) + log.Info("No unprocessed blocks. Wait then try again...") + time.Sleep(time.Second * 30) continue } @@ -123,6 +129,12 @@ func (p *Processor) Start(ctx context.Context) { if err != nil { log.Fatalw("Failed to collect actor changes", "error", err) } + log.Infow("Collected Actor Changes", + "MarketChanges", len(actorChanges[builtin.StorageMarketActorCodeID]), + "MinerChanges", len(actorChanges[builtin.StorageMinerActorCodeID]), + "RewardChanges", len(actorChanges[builtin.RewardActorCodeID]), + "AccountChanges", len(actorChanges[builtin.AccountActorCodeID]), + "nullRounds", len(nullRounds)) grp, ctx := errgroup.WithContext(ctx) @@ -130,6 +142,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.HandleMarketChanges(ctx, actorChanges[builtin.StorageMarketActorCodeID]); err != nil { return xerrors.Errorf("Failed to handle market changes: %w", err) } + log.Info("Processed Market Changes") return nil }) @@ -137,6 +150,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.HandleMinerChanges(ctx, actorChanges[builtin.StorageMinerActorCodeID]); err != nil { return xerrors.Errorf("Failed to handle miner changes: %w", err) } + log.Info("Processed Miner Changes") return nil }) @@ -144,6 +158,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.HandleRewardChanges(ctx, actorChanges[builtin.RewardActorCodeID], nullRounds); err != nil { return xerrors.Errorf("Failed to handle reward changes: %w", err) } + log.Info("Processed Reward Changes") return nil }) @@ -151,6 +166,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.HandleMessageChanges(ctx, toProcess); err != nil { return xerrors.Errorf("Failed to handle message changes: %w", err) } + log.Info("Processed Message Changes") return nil }) @@ -158,6 +174,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil { return xerrors.Errorf("Failed to handle common actor changes: %w", err) } + log.Info("Processed CommonActor Changes") return nil }) @@ -173,6 +190,7 @@ func (p *Processor) Start(ctx context.Context) { if err := p.refreshViews(); err != nil { log.Errorw("Failed to refresh views", "error", err) } + log.Infow("Processed Batch", "duration", time.Since(loopStart).String()) } } }() @@ -184,10 +202,6 @@ func (p *Processor) refreshViews() error { return err } - if _, err := p.db.Exec(`refresh materialized view miner_sectors_view`); err != nil { - return err - } - return nil } @@ -239,6 +253,15 @@ func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.C act := act a := a + // ignore actors that were deleted. + has, err := p.node.ChainHasObj(ctx, act.Head) + if err != nil { + log.Fatal(err) + } + if !has { + continue + } + addr, err := address.NewFromString(a) if err != nil { log.Fatal(err.Error()) @@ -300,6 +323,8 @@ where rnk <= $1 } out := map[cid.Cid]*types.BlockHeader{} + minBlock := abi.ChainEpoch(math.MaxInt64) + maxBlock := abi.ChainEpoch(0) // TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages() for rows.Next() { if rows.Err() != nil { @@ -319,14 +344,23 @@ where rnk <= $1 return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err) } out[ci] = bh + if bh.Height < minBlock { + minBlock = bh.Height + } + if bh.Height > maxBlock { + maxBlock = bh.Height + } } + log.Infow("Gathered Blocks to Process", "start", minBlock, "end", maxBlock) return out, rows.Close() } func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error { start := time.Now() + processedHeight := abi.ChainEpoch(0) defer func() { log.Debugw("Marked blocks as Processed", "duration", time.Since(start).String()) + log.Infow("Processed Blocks", "height", processedHeight) }() tx, err := p.db.Begin() if err != nil { @@ -339,7 +373,10 @@ func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.C return err } - for c := range processed { + for c, bh := range processed { + if bh.Height > processedHeight { + processedHeight = bh.Height + } if _, err := stmt.Exec(processedAt, c.String()); err != nil { return err } diff --git a/cmd/lotus-chainwatch/run.go b/cmd/lotus-chainwatch/run.go index 0ed08cf8d..f5f1afa53 100644 --- a/cmd/lotus-chainwatch/run.go +++ b/cmd/lotus-chainwatch/run.go @@ -73,7 +73,7 @@ var runCmd = &cli.Command{ sync := syncer.NewSyncer(db, api) sync.Start(ctx) - proc := processor.NewProcessor(db, api, maxBatch) + proc := processor.NewProcessor(ctx, db, api, maxBatch) proc.Start(ctx) sched := scheduler.PrepareScheduler(db)