Merge remote-tracking branch 'origin/master' into next

This commit is contained in:
Łukasz Magiera 2020-07-18 00:43:08 +02:00
commit 1a1bd38495
4 changed files with 346 additions and 56 deletions

View File

@ -53,3 +53,54 @@ func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error {
return out.Add(uint64(i), curVal)
})
}
// TODO Performance can be improved by diffing the underlying IPLD graph, e.g. https://github.com/ipfs/go-merkledag/blob/749fd8717d46b4f34c9ce08253070079c89bc56d/dagutils/diff.go#L104
// CBOR Marshaling will likely be the largest performance bottleneck here.
// AdtMapDiff generalizes adt.Map diffing by accepting a Deferred type that can unmarshalled to its corresponding struct
// in an interface implantation.
// AsKey should return the Keyer implementation specific to the map
// Add should be called when a new k,v is added to the map
// Modify should be called when a value is modified in the map
// Remove should be called when a value is removed from the map
type AdtMapDiff interface {
AsKey(key string) (adt.Keyer, error)
Add(key string, val *typegen.Deferred) error
Modify(key string, from, to *typegen.Deferred) error
Remove(key string, val *typegen.Deferred) error
}
func DiffAdtMap(preMap, curMap *adt.Map, out AdtMapDiff) error {
prevVal := new(typegen.Deferred)
if err := preMap.ForEach(prevVal, func(key string) error {
curVal := new(typegen.Deferred)
k, err := out.AsKey(key)
if err != nil {
return err
}
found, err := curMap.Get(k, curVal)
if err != nil {
return err
}
if !found {
if err := out.Remove(key, prevVal); err != nil {
return err
}
return nil
}
if err := out.Modify(key, prevVal, curVal); err != nil {
return err
}
return curMap.Delete(k)
}); err != nil {
return err
}
curVal := new(typegen.Deferred)
return curMap.ForEach(curVal, func(key string) error {
return out.Add(key, curVal)
})
}

View File

@ -3,7 +3,6 @@ package state
import (
"bytes"
"context"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -419,3 +418,78 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc {
return true, sectorChanges, nil
}
}
type MinerPreCommitChanges struct {
Added []miner.SectorPreCommitOnChainInfo
Removed []miner.SectorPreCommitOnChainInfo
}
func (m *MinerPreCommitChanges) AsKey(key string) (adt.Keyer, error) {
sector, err := adt.ParseUIntKey(key)
if err != nil {
return nil, err
}
return miner.SectorKey(abi.SectorNumber(sector)), nil
}
func (m *MinerPreCommitChanges) Add(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Added = append(m.Added, *sp)
return nil
}
func (m *MinerPreCommitChanges) Modify(key string, from, to *typegen.Deferred) error {
return nil
}
func (m *MinerPreCommitChanges) Remove(key string, val *typegen.Deferred) error {
sp := new(miner.SectorPreCommitOnChainInfo)
err := sp.UnmarshalCBOR(bytes.NewReader(val.Raw))
if err != nil {
return err
}
m.Removed = append(m.Removed, *sp)
return nil
}
func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc {
return func(ctx context.Context, oldState, newState *miner.State) (changed bool, user UserData, err error) {
ctxStore := &contextStore{
ctx: ctx,
cst: sp.cst,
}
precommitChanges := &MinerPreCommitChanges{
Added: []miner.SectorPreCommitOnChainInfo{},
Removed: []miner.SectorPreCommitOnChainInfo{},
}
if oldState.PreCommittedSectors.Equals(newState.PreCommittedSectors) {
return false, nil, nil
}
oldPrecommits, err := adt.AsMap(ctxStore, oldState.PreCommittedSectors)
if err != nil {
return false, nil, err
}
newPrecommits, err := adt.AsMap(ctxStore, newState.PreCommittedSectors)
if err != nil {
return false, nil, err
}
if err := DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil {
return false, nil, err
}
if len(precommitChanges.Added)+len(precommitChanges.Removed) == 0 {
return false, nil, nil
}
return true, precommitChanges, nil
}
}

View File

@ -4,12 +4,15 @@ import (
"bytes"
"context"
"fmt"
"strings"
//"strings"
"sync"
"time"
"github.com/filecoin-project/go-address"
//"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/lotus/chain/events/state"
"github.com/filecoin-project/specs-actors/actors/abi"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
@ -34,25 +37,6 @@ func (p *Processor) setupMiners() error {
}
if _, err := tx.Exec(`
create table if not exists miner_sectors
(
miner_id text not null,
sector_id bigint not null,
activation_epoch bigint not null,
expiration_epoch bigint not null,
termination_epoch bigint,
deal_weight text not null,
verified_deal_weight text not null,
seal_cid text not null,
seal_rand_epoch bigint not null,
constraint miner_sectors_pk
primary key (miner_id, sector_id)
);
create index if not exists miner_sectors_miner_sectorid_index
on miner_sectors (miner_id, sector_id);
create table if not exists miner_info
(
@ -82,6 +66,35 @@ create table if not exists miner_power
primary key (miner_id, state_root)
);
create table if not exists miner_precommits
(
miner_id text not null,
sector_id bigint not null,
precommit_deposit text not null,
precommit_epoch text not null,
constraint miner_precommits_pk
primary key (miner_id, sector_id)
);
create table if not exists miner_sectors
(
miner_id text not null,
sector_id bigint not null,
activation_epoch bigint not null,
expiration_epoch bigint not null,
termination_epoch bigint,
deal_weight text not null,
verified_deal_weight text not null,
seal_cid text not null,
seal_rand_epoch bigint not null,
constraint miner_sectors_pk
primary key (miner_id, sector_id)
);
/* used to tell when a miners sectors (proven-not-yet-expired) changed if the miner_sectors_cid's are different a new sector was added or removed (terminated/expired) */
create table if not exists miner_sectors_heads
(
@ -95,12 +108,13 @@ create table if not exists miner_sectors_heads
);
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
CREATE TYPE miner_sector_event_type AS ENUM
(
'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED'
'PRECOMMIT', 'COMMIT', 'EXTENDED', 'EXPIRED', 'TERMINATED'
);
END IF;
END$$;
@ -114,7 +128,13 @@ create table if not exists miner_sector_events
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
);
create materialized view if not exists miner_sectors_view as
select ms.miner_id, ms.sector_id, mp.precommit_epoch, ms.activation_epoch, ms.expiration_epoch, ms.termination_epoch, ms.deal_weight, ms.verified_deal_weight
from miner_sectors ms
left join miner_precommits mp on ms.sector_id = mp.sector_id and ms.miner_id = mp.miner_id
`); err != nil {
return err
}
@ -228,7 +248,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
})
grp.Go(func() error {
if err := p.storeMinersSectorState(miners); err != nil {
if err := p.storeMinersSectorState(ctx, miners); err != nil {
return err
}
return nil
@ -241,6 +261,13 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
return nil
})
grp.Go(func() error {
if err := p.storeMinersPreCommitState(ctx, miners); err != nil {
return err
}
return nil
})
return grp.Wait()
}
@ -348,7 +375,7 @@ func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
}
func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
func (p *Processor) storeMinersSectorState(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Debugw("Stored Miners Sector State", "duration", time.Since(start).String())
@ -368,7 +395,7 @@ func (p *Processor) storeMinersSectorState(miners []minerActorInfo) error {
return err
}
grp, ctx := errgroup.WithContext(context.TODO())
grp, ctx := errgroup.WithContext(ctx)
for _, m := range miners {
m := m
grp.Go(func() error {
@ -457,14 +484,127 @@ func (p *Processor) storeMinersSectorHeads(miners []minerActorInfo) error {
return tx.Commit()
}
func (p *Processor) storeMinersPreCommitState(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Stored Miners Precommit State", "duration", time.Since(start).String())
}()
precommitTx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := precommitTx.Exec(`create temp table mp (like miner_precommits excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
precommitStmt, err := precommitTx.Prepare(`copy mp (miner_id, sector_id, precommit_deposit, precommit_epoch) from STDIN`)
if err != nil {
return err
}
for _, m := range miners {
m := m
pcMap, err := adt.AsMap(cw_util.NewAPIIpldStore(ctx, p.node), m.state.PreCommittedSectors)
if err != nil {
return err
}
precommit := new(miner.SectorPreCommitOnChainInfo)
if err := pcMap.ForEach(precommit, func(key string) error {
if _, err := precommitStmt.Exec(
m.common.addr.String(),
precommit.Info.SectorNumber,
precommit.PreCommitDeposit.String(),
precommit.PreCommitEpoch,
); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
if err := precommitStmt.Close(); err != nil {
return err
}
if _, err := precommitTx.Exec(`insert into miner_precommits select * from mp on conflict do nothing`); err != nil {
return err
}
return precommitTx.Commit()
}
func (p *Processor) updateMiners(ctx context.Context, miners []minerActorInfo) error {
// TODO when/if there is more than one update operation here use an errgroup as is done in persistMiners
if err := p.updateMinersSectors(ctx, miners); err != nil {
return err
}
if err := p.updateMinersPrecommits(ctx, miners); err != nil {
return err
}
return nil
}
func (p *Processor) updateMinersPrecommits(ctx context.Context, miners []minerActorInfo) error {
start := time.Now()
defer func() {
log.Infow("Updated Miner Precommits", "duration", time.Since(start).String())
}()
pred := state.NewStatePredicates(p.node)
eventTx, err := p.db.Begin()
if err != nil {
return err
}
if _, err := eventTx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
eventStmt, err := eventTx.Prepare(`copy mse (sector_id, event, miner_id, state_root) from STDIN `)
if err != nil {
return err
}
for _, m := range miners {
pcDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange())
changed, val, err := pcDiffFn(ctx, m.common.parentTsKey, m.common.tsKey)
if err != nil {
if strings.Contains(err.Error(), "address not found") {
continue
}
log.Errorw("error getting miner precommit diff", "miner", m.common.addr, "error", err)
return err
}
if !changed {
continue
}
changes, ok := val.(*state.MinerPreCommitChanges)
if !ok {
log.Fatal("Developer Error")
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(added.Info.SectorNumber, "PRECOMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil {
return err
}
}
}
if err := eventStmt.Close(); err != nil {
return err
}
if _, err := eventTx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return eventTx.Commit()
}
func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActorInfo) error {
log.Debugw("Updating Miners Sectors", "#miners", len(miners))
start := time.Now()
@ -503,27 +643,40 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor
//minerGrp, ctx := errgroup.WithContext(ctx)
//complete := 0
//for _, m := range miners {
//m := m
//minerGrp.Go(func() error {
//// special case genesis miners
//sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())
//changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey)
//if err != nil {
//if strings.Contains(err.Error(), "address not found") {
//return nil
//}
//log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err)
//return err
//}
//if !changed {
//complete++
//return nil
//}
//changes, ok := val.(*state.MinerSectorChanges)
//if !ok {
//log.Fatalw("Developer Error")
//}
//log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey)
// m := m
// if m.common.tsKey == p.genesisTs.Key() {
// genSectors, err := p.node.StateMinerSectors(ctx, m.common.addr, nil, true, p.genesisTs.Key())
// if err != nil {
// return err
// }
// for _, sector := range genSectors {
// if _, err := eventStmt.Exec(sector.ID, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil {
// return err
// }
// }
// complete++
// continue
// }
// minerGrp.Go(func() error {
// // special case genesis miners
// sectorDiffFn := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())
// changed, val, err := sectorDiffFn(ctx, m.common.parentTsKey, m.common.tsKey)
// if err != nil {
// if strings.Contains(err.Error(), "address not found") {
// return nil
// }
// log.Errorw("error getting miner sector diff", "miner", m.common.addr, "error", err)
// return err
// }
// if !changed {
// complete++
// return nil
// }
// changes, ok := val.(*state.MinerSectorChanges)
// if !ok {
// log.Fatalw("Developer Error")
// }
// log.Debugw("sector changes for miner", "miner", m.common.addr.String(), "Added", len(changes.Added), "Extended", len(changes.Extended), "Removed", len(changes.Removed), "oldState", m.common.parentTsKey, "newState", m.common.tsKey)
//for _, extended := range changes.Extended {
//if _, err := eventStmt.Exec(extended.To.Info.SectorNumber, "EXTENDED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
@ -567,18 +720,18 @@ func (p *Processor) updateMinersSectors(ctx context.Context, miners []minerActor
//log.Debugw("sector removed", "miner", m.common.addr.String(), "sector", removed.Info.SectorNumber, "old", "sectorExpiration", removed.Info.Expiration, "currEpoch", curTs.Height())
//}
//for _, added := range changes.Added {
//if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", m.common.addr.String(), m.common.stateroot.String()); err != nil {
//return err
//}
//}
//complete++
//log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended))
//return nil
//})
// for _, added := range changes.Added {
// if _, err := eventStmt.Exec(added.Info.SectorNumber, "COMMIT", m.common.addr.String(), m.common.stateroot.String()); err != nil {
// return err
// }
// }
// complete++
// log.Debugw("Update Done", "complete", complete, "added", len(changes.Added), "removed", len(changes.Removed), "modified", len(changes.Extended))
// return nil
// })
//}
//if err := minerGrp.Wait(); err != nil {
//return err
// return err
//}
close(sectorUpdatesCh)
// wait for the update channel to be drained

View File

@ -29,6 +29,8 @@ type Processor struct {
node api.FullNode
genesisTs *types.TipSet
// number of blocks processed at a time
batch int
}
@ -87,6 +89,12 @@ func (p *Processor) Start(ctx context.Context) {
log.Fatalw("Failed to setup processor", "error", err)
}
var err error
p.genesisTs, err = p.node.ChainGetGenesis(ctx)
if err != nil {
log.Fatalw("Failed to get genesis state from lotus", "error", err.Error())
}
go p.subMpool(ctx)
// main processor loop
@ -176,6 +184,10 @@ func (p *Processor) refreshViews() error {
return err
}
if _, err := p.db.Exec(`refresh materialized view miner_sectors_view`); err != nil {
return err
}
return nil
}