From 7ff468ce85b2691ec0b58c2ca1b6e6f11087bf23 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 15 Jul 2020 17:22:48 -0700 Subject: [PATCH] feat: track miner precommit - add materalized view showing all miner sector info --- chain/events/state/diff_adt.go | 51 +++++ chain/events/state/predicates.go | 76 +++++++- cmd/lotus-chainwatch/processor/miner.go | 200 +++++++++++++++++--- cmd/lotus-chainwatch/processor/processor.go | 12 ++ 4 files changed, 313 insertions(+), 26 deletions(-) diff --git a/chain/events/state/diff_adt.go b/chain/events/state/diff_adt.go index a3603be42..9624aaad1 100644 --- a/chain/events/state/diff_adt.go +++ b/chain/events/state/diff_adt.go @@ -53,3 +53,54 @@ func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error { return out.Add(uint64(i), curVal) }) } + +// TODO Performance can be improved by diffing the underlying IPLD graph, e.g. https://github.com/ipfs/go-merkledag/blob/749fd8717d46b4f34c9ce08253070079c89bc56d/dagutils/diff.go#L104 +// CBOR Marshaling will likely be the largest performance bottleneck here. + +// AdtMapDiff generalizes adt.Map diffing by accepting a Deferred type that can unmarshalled to its corresponding struct +// in an interface implantation. +// AsKey should return the Keyer implementation specific to the map +// Add should be called when a new k,v is added to the map +// Modify should be called when a value is modified in the map +// Remove should be called when a value is removed from the map +type AdtMapDiff interface { + AsKey(key string) (adt.Keyer, error) + Add(key string, val *typegen.Deferred) error + Modify(key string, from, to *typegen.Deferred) error + Remove(key string, val *typegen.Deferred) error +} + +func DiffAdtMap(preMap, curMap *adt.Map, out AdtMapDiff) error { + prevVal := new(typegen.Deferred) + if err := preMap.ForEach(prevVal, func(key string) error { + curVal := new(typegen.Deferred) + k, err := out.AsKey(key) + if err != nil { + return err + } + + found, err := curMap.Get(k, curVal) + if err != nil { + return err + } + if !found { + if err := out.Remove(key, prevVal); err != nil { + return err + } + return nil + } + + if err := out.Modify(key, prevVal, curVal); err != nil { + return err + } + + return curMap.Delete(k) + }); err != nil { + return err + } + + curVal := new(typegen.Deferred) + return curMap.ForEach(curVal, func(key string) error { + return out.Add(key, curVal) + }) +} diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index 16abc4928..df0210f44 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -3,7 +3,6 @@ package state import ( "bytes" "context" - "github.com/filecoin-project/go-address" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" @@ -414,3 +413,78 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { return true, sectorChanges, nil } } + +type MinerPreCommitChanges struct { + Added []miner.SectorPreCommitOnChainInfo + Removed []miner.SectorPreCommitOnChainInfo +} + +func (m *MinerPreCommitChanges) AsKey(key string) (adt.Keyer, error) { + sector, err := adt.ParseUIntKey(key) + if err != nil { + return nil, err + } + return miner.SectorKey(abi.SectorNumber(sector)), nil +} + +func (m *MinerPreCommitChanges) Add(key string, val *typegen.Deferred) error { + sp := new(miner.SectorPreCommitOnChainInfo) + err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Added = append(m.Added, *sp) + return nil +} + +func (m *MinerPreCommitChanges) Modify(key string, from, to *typegen.Deferred) error { + return nil +} + +func (m *MinerPreCommitChanges) Remove(key string, val *typegen.Deferred) error { + sp := new(miner.SectorPreCommitOnChainInfo) + err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + m.Removed = append(m.Removed, *sp) + return nil +} + +func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc { + return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) { + ctxStore := &contextStore{ + ctx: ctx, + cst: sp.cst, + } + + precommitChanges := &MinerPreCommitChanges{ + Added: []miner.SectorPreCommitOnChainInfo{}, + Removed: []miner.SectorPreCommitOnChainInfo{}, + } + + if oldState.PreCommittedSectors.Equals(newState.PreCommittedSectors) { + return false, nil, nil + } + + oldPrecommits, err := adt.AsMap(ctxStore, oldState.PreCommittedSectors) + if err != nil { + return false, nil, err + } + + newPrecommits, err := adt.AsMap(ctxStore, newState.PreCommittedSectors) + if err != nil { + return false, nil, err + } + + if err := DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil { + return false, nil, err + } + + if len(precommitChanges.Added)+len(precommitChanges.Removed) == 0 { + return false, nil, nil + } + + return true, precommitChanges, nil + } +} diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index a76f88dfc..7b0801dfd 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -34,25 +34,6 @@ func (p *Processor) setupMiners() error { } if _, err := tx.Exec(` -create table if not exists miner_sectors -( - miner_id text not null, - sector_id bigint not null, - - activation_epoch bigint not null, - expiration_epoch bigint not null, - termination_epoch bigint, - - deal_weight text not null, - verified_deal_weight text not null, - seal_cid text not null, - seal_rand_epoch bigint not null, - constraint miner_sectors_pk - primary key (miner_id, sector_id) -); - -create index if not exists miner_sectors_miner_sectorid_index - on miner_sectors (miner_id, sector_id); create table if not exists miner_info ( @@ -82,6 +63,35 @@ create table if not exists miner_power primary key (miner_id, state_root) ); +create table if not exists miner_precommits +( + miner_id text not null, + sector_id bigint not null, + + precommit_deposit text not null, + precommit_epoch text not null, + constraint miner_precommits_pk + primary key (miner_id, sector_id) + +); + +create table if not exists miner_sectors +( + miner_id text not null, + sector_id bigint not null, + + activation_epoch bigint not null, + expiration_epoch bigint not null, + termination_epoch bigint, + + deal_weight text not null, + verified_deal_weight text not null, + seal_cid text not null, + seal_rand_epoch bigint not null, + constraint miner_sectors_pk + primary key (miner_id, sector_id) +); + /* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */ create table if not exists miner_sectors_heads ( @@ -95,12 +105,13 @@ create table if not exists miner_sectors_heads ); + DO $$ BEGIN IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN CREATE TYPE miner_sector_event_type AS ENUM ( - 'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED' + 'PRECOMMIT', 'COMMIT', 'EXTENDED', 'EXPIRED', 'TERMINATED' ); END IF; END$$; @@ -114,7 +125,13 @@ create table if not exists miner_sector_events constraint miner_sector_events_pk primary key (sector_id, event, miner_id, state_root) -) +); + +create materialized view if not exists miner_sectors_view as +select ms.miner_id, ms.sector_id, mp.precommit_epoch, ms.activation_epoch, ms.expiration_epoch, ms.termination_epoch, ms.deal_weight, ms.verified_deal_weight +from miner_sectors ms +left join miner_precommits mp on ms.sector_id = mp.sector_id and ms.miner_id = mp.miner_id + `); err != nil { return err } @@ -228,7 +245,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) }) grp.Go(func() error { - if err := p.storeMinersSectorState(miners); err != nil { + if err := p.storeMinersSectorState(ctx, miners); err != nil { return err } return nil @@ -241,6 +258,13 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) return nil }) + grp.Go(func() error { + if err := p.storeMinersPreCommitState(ctx, miners); err != nil { + return err + } + return nil + }) + return grp.Wait() } @@ -347,7 +371,7 @@ func (p *Processor) storeMinersPower(miners []minerActorInfo) error { } -func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error { +func (p *Processor) storeMinersSectorState(ctx context.Context, miners []minerActorInfo) error { start := time.Now() defer func() { log.Infow("Stored Miners Sector State", "duration", time.Since(start).String()) @@ -367,7 +391,7 @@ func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error { return err } - grp, ctx := errgroup.WithContext(context.TODO()) + grp, ctx := errgroup.WithContext(ctx) for _, m := range miners { m := m grp.Go(func() error { @@ -451,14 +475,127 @@ func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error { return tx.Commit() } +func (p *Processor) storeMinersPreCommitState(ctx context.Context, miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Stored Miners Precommit State", "duration", time.Since(start).String()) + }() + + precommitTx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := precommitTx.Exec(`create temp table mp (like miner_precommits excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + precommitStmt, err := precommitTx.Prepare(`copy mp (miner_id, sector_id, precommit_deposit, precommit_epoch) from STDIN`) + if err != nil { + return err + } + + for _, m := range miners { + m := m + pcMap, err := adt.AsMap(cw_util.NewAPIIpldStore(ctx, p.node), m.state.PreCommittedSectors) + if err != nil { + return err + } + precommit := new(miner.SectorPreCommitOnChainInfo) + if err := pcMap.ForEach(precommit, func(key string) error { + if _, err := precommitStmt.Exec( + m.common.addr.String(), + precommit.Info.SectorNumber, + precommit.PreCommitDeposit.String(), + precommit.PreCommitEpoch, + ); err != nil { + return err + } + + return nil + }); err != nil { + return err + } + } + if err := precommitStmt.Close(); err != nil { + return err + } + if _, err := precommitTx.Exec(`insert into miner_precommits select * from mp on conflict do nothing`); err != nil { + return err + } + + return precommitTx.Commit() +} + func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error { // TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners if err := p.updateMinersSectors(ctx, miners); err != nil { return err } + + if err := p.updateMinersPrecommits(ctx, miners); err != nil { + return err + } return nil } +func (p *Processor) updateMinersPrecommits(ctx context.Context, miners []minerActorInfo) error { + start := time.Now() + defer func() { + log.Infow("Updated Miner Precommits", "duration", time.Since(start).String()) + }() + + pred := state.NewStatePredicates(p.node) + + eventTx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `) + if err != nil { + return err + } + + for _, m := range miners { + pcDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange()) + changed, val, err := pcDiffFn(ctx, m.common.parentTsKey, m.common.tsKey) + if err != nil { + if strings.Contains(err.Error(), "address not found") { + continue + } + log.Errorw("error getting miner precommit diff", "miner", m.common.addr, "error", err) + return err + } + if !changed { + continue + } + changes, ok := val.(*state.MinerPreCommitChanges) + if !ok { + log.Fatal("Developer Error") + } + for _, added := range changes.Added { + if _, err := eventStmt.Exec(added.Info.SectorNumber, "PRECOMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + } + } + + if err := eventStmt.Close(); err != nil { + return err + } + + if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return eventTx.Commit() +} + func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error { log.Infow("Updating Miners Sectors", "#miners", len(miners)) start := time.Now() @@ -497,6 +634,19 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor complete := 0 for _, m := range miners { m := m + if m.common.tsKey == p.genesisTs.Key() { + genSectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, p.genesisTs.Key()) + if err != nil { + return err + } + for _, sector := range genSectors { + if _, err := eventStmt.Exec(sector.ID, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { + return err + } + } + complete++ + continue + } minerGrp.Go(func() error { // special case genesis miners sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange()) @@ -561,7 +711,7 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor } for _, added := range changes.Added { - if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", m.common.addr.String(), m.common.stateroot.String()); err != nil { + if _, err := eventStmt.Exec(added.Info.SectorNumber, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil { return err } } diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index e44172822..732d30b2a 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -29,6 +29,8 @@ type Processor struct { node api.FullNode + genesisTs *types.TipSet + // number of blocks processed at a time batch int } @@ -87,6 +89,12 @@ func (p *Processor) Start(ctx context.Context) { log.Fatalw("Failed to setup processor", "error", err) } + var err error + p.genesisTs, err = p.node.ChainGetGenesis(ctx) + if err != nil { + log.Fatalw("Failed to get genesis state from lotus", "error", err.Error()) + } + go p.subMpool(ctx) // main processor loop @@ -170,6 +178,10 @@ func (p *Processor) refreshViews() error { return err } + if _, err := p.db.Exec(`refresh materialized view miner_sectors_view`); err != nil { + return err + } + return nil }