optimize sector loading

And avoid exposing "arrays" via the miner abstraction. We may change these
structures later.
This commit is contained in:
Steven Allen 2020-09-21 12:05:01 -07:00
parent 025663118f
commit 4cf0c105eb
13 changed files with 81 additions and 159 deletions

View File

@ -313,11 +313,9 @@ type FullNode interface {
// StateNetworkName returns the name of the network the node is synced to
StateNetworkName(context.Context) (dtypes.NetworkName, error)
// StateMinerSectors returns info about the given miner's sectors. If the filter bitfield is nil, all sectors are included.
// If the filterOut boolean is set to true, any sectors in the filter are excluded.
// If false, only those sectors in the filter are included.
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, bool, types.TipSetKey) ([]*miner.ChainSectorInfo, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
// StateMinerActiveSectors returns info about sectors that a given miner is actively proving.
StateMinerActiveSectors(context.Context, address.Address, types.TipSetKey) ([]*miner.ChainSectorInfo, error)
StateMinerActiveSectors(context.Context, address.Address, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
// StateMinerProvingDeadline calculates the deadline at some epoch for a proving period
// and returns the deadline-related calculations.
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)

View File

@ -162,8 +162,8 @@ type FullNodeStruct struct {
ClientRetrieveTryRestartInsufficientFunds func(ctx context.Context, paymentChannel address.Address) error `perm:"write"`
StateNetworkName func(context.Context) (dtypes.NetworkName, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *bitfield.BitField, bool, types.TipSetKey) ([]*miner.ChainSectorInfo, error) `perm:"read"`
StateMinerActiveSectors func(context.Context, address.Address, types.TipSetKey) ([]*miner.ChainSectorInfo, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"`
StateMinerActiveSectors func(context.Context, address.Address, types.TipSetKey) ([]*miner.SectorOnChainInfo, error) `perm:"read"`
StateMinerProvingDeadline func(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) `perm:"read"`
StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error) `perm:"read"`
StateMinerInfo func(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) `perm:"read"`
@ -734,11 +734,11 @@ func (c *FullNodeStruct) StateNetworkName(ctx context.Context) (dtypes.NetworkNa
return c.Internal.StateNetworkName(ctx)
}
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, filterOut bool, tsk types.TipSetKey) ([]*miner.ChainSectorInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr, filter, filterOut, tsk)
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, sectorNos *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr, sectorNos, tsk)
}
func (c *FullNodeStruct) StateMinerActiveSectors(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*miner.ChainSectorInfo, error) {
func (c *FullNodeStruct) StateMinerActiveSectors(ctx context.Context, addr address.Address, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) {
return c.Internal.StateMinerActiveSectors(ctx, addr, tsk)
}

View File

@ -55,20 +55,3 @@ func AsArray(store Store, root cid.Cid, version network.Version) (Array, error)
}
return nil, xerrors.Errorf("unknown network version: %d", version)
}
type ROnlyArray interface {
Get(idx uint64, v cbor.Unmarshaler) (bool, error)
ForEach(v cbor.Unmarshaler, fn func(idx int64) error) error
}
type ProxyArray struct {
GetFunc func(idx uint64, v cbor.Unmarshaler) (bool, error)
ForEachFunc func(v cbor.Unmarshaler, fn func(idx int64) error) error
}
func (a *ProxyArray) Get(idx uint64, v cbor.Unmarshaler) (bool, error) {
return a.GetFunc(idx, v)
}
func (a *ProxyArray) ForEach(v cbor.Unmarshaler, fn func(idx int64) error) error {
return a.ForEachFunc(v, fn)
}

View File

@ -47,7 +47,7 @@ type State interface {
FindSector(abi.SectorNumber) (*SectorLocation, error)
GetSectorExpiration(abi.SectorNumber) (*SectorExpiration, error)
GetPrecommittedSector(abi.SectorNumber) (*SectorPreCommitOnChainInfo, error)
LoadSectorsFromSet(filter *bitfield.BitField, filterOut bool) (adt.ROnlyArray, error)
LoadSectors(sectorNos *bitfield.BitField) ([]*SectorOnChainInfo, error)
IsAllocated(abi.SectorNumber) (bool, error)
LoadDeadline(idx uint64) (Deadline, error)
@ -129,11 +129,6 @@ type MinerInfo struct {
WindowPoStPartitionSectors uint64
}
type ChainSectorInfo struct {
Info SectorOnChainInfo
ID abi.SectorNumber
}
type SectorExpiration struct {
OnTime abi.ChainEpoch

View File

@ -6,12 +6,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/cbor"
"github.com/filecoin-project/go-state-types/dline"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
adt0 "github.com/filecoin-project/specs-actors/actors/util/adt"
@ -163,59 +161,37 @@ func (s *state0) GetPrecommittedSector(num abi.SectorNumber) (*SectorPreCommitOn
return &ret, nil
}
func (s *state0) LoadSectorsFromSet(filter *bitfield.BitField, filterOut bool) (adt.ROnlyArray, error) {
a, err := adt0.AsArray(s.store, s.State.Sectors)
func (s *state0) LoadSectors(snos *bitfield.BitField) ([]*SectorOnChainInfo, error) {
sectors, err := miner0.LoadSectors(s.store, s.State.Sectors)
if err != nil {
return nil, err
}
incl := func(i uint64) (bool, error) {
include := true
if filter != nil {
set, err := filter.IsSet(i)
if err != nil {
return false, xerrors.Errorf("filter check error: %w", err)
}
if set == filterOut {
include = false
}
}
return include, nil
}
return &adt.ProxyArray{
GetFunc: func(idx uint64, v cbor.Unmarshaler) (bool, error) {
i, err := incl(idx)
if err != nil {
return false, err
}
if !i {
return false, nil
}
// TODO: ActorUpgrade potentially convert
return a.Get(idx, v)
},
ForEachFunc: func(v cbor.Unmarshaler, fn func(int64) error) error {
// TODO: ActorUpgrade potentially convert the output
return a.ForEach(v, func(i int64) error {
include, err := incl(uint64(i))
if err != nil {
return err
}
if !include {
// If no sector numbers are specified, load all.
if snos == nil {
infos := make([]*SectorOnChainInfo, 0, sectors.Length())
var info0 miner0.SectorOnChainInfo
if err := sectors.ForEach(&info0, func(i int64) error {
info := fromV0SectorOnChainInfo(info0)
infos[i] = &info
return nil
}); err != nil {
return nil, err
}
return infos, nil
}
return fn(i)
})
},
}, nil
}
func (s *state0) LoadPreCommittedSectors() (adt.Map, error) {
return adt0.AsMap(s.store, s.State.PreCommittedSectors)
// Otherwise, load selected.
infos0, err := sectors.Load(*snos)
if err != nil {
return nil, err
}
infos := make([]*SectorOnChainInfo, len(infos0))
for i, info0 := range infos0 {
info := fromV0SectorOnChainInfo(*info0)
infos[i] = &info
}
return infos, nil
}
func (s *state0) IsAllocated(num abi.SectorNumber) (bool, error) {

View File

@ -150,7 +150,7 @@ func MinerSectorInfo(ctx context.Context, sm *StateManager, maddr address.Addres
return mas.GetSector(sid)
}
func GetMinerSectorSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address, filter *bitfield.BitField, filterOut bool) ([]*miner.ChainSectorInfo, error) {
func GetMinerSectorSet(ctx context.Context, sm *StateManager, ts *types.TipSet, maddr address.Address, snos *bitfield.BitField) ([]*miner.SectorOnChainInfo, error) {
act, err := sm.LoadActor(ctx, maddr, ts)
if err != nil {
return nil, xerrors.Errorf("(get sset) failed to load miner actor: %w", err)
@ -161,29 +161,7 @@ func GetMinerSectorSet(ctx context.Context, sm *StateManager, ts *types.TipSet,
return nil, xerrors.Errorf("(get sset) failed to load miner actor state: %w", err)
}
sectors, err := mas.LoadSectorsFromSet(filter, filterOut)
if err != nil {
return nil, xerrors.Errorf("(get sset) failed to load sectors: %w", err)
}
var sset []*miner.ChainSectorInfo
var v cbg.Deferred
if err := sectors.ForEach(&v, func(i int64) error {
var oci miner.SectorOnChainInfo
if err := oci.UnmarshalCBOR(bytes.NewReader(v.Raw)); err != nil {
return err
}
sset = append(sset, &miner.ChainSectorInfo{
Info: oci,
ID: abi.SectorNumber(i),
})
return nil
}); err != nil {
return nil, err
}
return sset, nil
return mas.LoadSectors(snos)
}
func GetSectorsForWinningPoSt(ctx context.Context, pv ffiwrapper.Verifier, sm *StateManager, st cid.Cid, maddr address.Address, rand abi.PoStRandomness) ([]proof.SectorInfo, error) {
@ -249,35 +227,30 @@ func GetSectorsForWinningPoSt(ctx context.Context, pv ffiwrapper.Verifier, sm *S
return nil, xerrors.Errorf("generating winning post challenges: %w", err)
}
// we don't need to filter here (and it's **very** slow)
sectors, err := mas.LoadSectorsFromSet(nil, false)
iter, err := provingSectors.BitIterator()
if err != nil {
return nil, xerrors.Errorf("iterating over proving sectors: %w", err)
}
// Select winning sectors by _index_ in the all-sectors bitfield.
selectedSectors := bitfield.New()
prev := uint64(0)
for _, n := range ids {
sno, err := iter.Nth(n - prev)
if err != nil {
return nil, xerrors.Errorf("iterating over proving sectors: %w", err)
}
selectedSectors.Set(sno)
prev = n
}
sectors, err := mas.LoadSectors(&selectedSectors)
if err != nil {
return nil, xerrors.Errorf("loading proving sectors: %w", err)
}
out := make([]proof.SectorInfo, len(ids))
for i, n := range ids {
sb, err := provingSectors.Slice(n, 1)
if err != nil {
return nil, err
}
sid, err := sb.First()
if err != nil {
return nil, err
}
var sinfo miner.SectorOnChainInfo
found, err := sectors.Get(sid, &sinfo)
if err != nil {
return nil, xerrors.Errorf("loading sector info: %w", err)
}
if !found {
return nil, xerrors.Errorf("didn't find sector info for sector %d", n)
}
out := make([]proof.SectorInfo, len(sectors))
for i, sinfo := range sectors {
out[i] = proof.SectorInfo{
SealProof: spt,
SectorNumber: sinfo.SectorNumber,

View File

@ -259,13 +259,13 @@ var stateSectorsCmd = &cli.Command{
return err
}
sectors, err := api.StateMinerSectors(ctx, maddr, nil, true, ts.Key())
sectors, err := api.StateMinerSectors(ctx, maddr, nil, ts.Key())
if err != nil {
return err
}
for _, s := range sectors {
fmt.Printf("%d: %x\n", s.Info.SectorNumber, s.Info.SealedCID)
fmt.Printf("%d: %x\n", s.SectorNumber, s.SealedCID)
}
return nil
@ -305,7 +305,7 @@ var stateActiveSectorsCmd = &cli.Command{
}
for _, s := range sectors {
fmt.Printf("%d: %x\n", s.Info.SectorNumber, s.Info.SealedCID)
fmt.Printf("%d: %x\n", s.SectorNumber, s.SealedCID)
}
return nil

View File

@ -176,16 +176,16 @@ var sectorsListCmd = &cli.Command{
}
activeIDs := make(map[abi.SectorNumber]struct{}, len(activeSet))
for _, info := range activeSet {
activeIDs[info.ID] = struct{}{}
activeIDs[info.SectorNumber] = struct{}{}
}
sset, err := fullApi.StateMinerSectors(ctx, maddr, nil, true, types.EmptyTSK)
sset, err := fullApi.StateMinerSectors(ctx, maddr, nil, types.EmptyTSK)
if err != nil {
return err
}
commitedIDs := make(map[abi.SectorNumber]struct{}, len(activeSet))
for _, info := range sset {
commitedIDs[info.ID] = struct{}{}
commitedIDs[info.SectorNumber] = struct{}{}
}
sort.Slice(list, func(i, j int) bool {

View File

@ -3714,8 +3714,6 @@ Response:
### StateMinerSectors
StateMinerSectors returns info about the given miner's sectors. If the filter bitfield is nil, all sectors are included.
If the filterOut boolean is set to true, any sectors in the filter are excluded.
If false, only those sectors in the filter are included.
Perms: read
@ -3727,7 +3725,6 @@ Inputs:
[
0
],
true,
[
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"

View File

@ -64,15 +64,15 @@ func (a *StateAPI) StateNetworkName(ctx context.Context) (dtypes.NetworkName, er
return stmgr.GetNetworkName(ctx, a.StateManager, a.Chain.GetHeaviestTipSet().ParentState())
}
func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, filterOut bool, tsk types.TipSetKey) ([]*miner.ChainSectorInfo, error) {
func (a *StateAPI) StateMinerSectors(ctx context.Context, addr address.Address, sectorNos *bitfield.BitField, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return stmgr.GetMinerSectorSet(ctx, a.StateManager, ts, addr, filter, filterOut)
return stmgr.GetMinerSectorSet(ctx, a.StateManager, ts, addr, sectorNos)
}
func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tsk types.TipSetKey) ([]*miner.ChainSectorInfo, error) { // TODO: only used in cli
func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tsk types.TipSetKey) ([]*miner.SectorOnChainInfo, error) { // TODO: only used in cli
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
@ -93,7 +93,7 @@ func (a *StateAPI) StateMinerActiveSectors(ctx context.Context, maddr address.Ad
return nil, xerrors.Errorf("merge partition active sets: %w", err)
}
return stmgr.GetMinerSectorSet(ctx, a.StateManager, ts, maddr, &activeSectors, false)
return stmgr.GetMinerSectorSet(ctx, a.StateManager, ts, maddr, &activeSectors)
}
func (a *StateAPI) StateMinerInfo(ctx context.Context, actor address.Address, tsk types.TipSetKey) (miner.MinerInfo, error) {

View File

@ -71,7 +71,7 @@ type SealingStateEvt struct {
type storageMinerApi interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, bool, types.TipSetKey) ([]*miner.ChainSectorInfo, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (miner.SectorPreCommitOnChainInfo, error)
StateSectorGetInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorOnChainInfo, error)
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error)

View File

@ -594,7 +594,7 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]a
}
func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors, allSectors bitfield.BitField, ts *types.TipSet) ([]proof.SectorInfo, error) {
sset, err := s.api.StateMinerSectors(ctx, s.actor, &goodSectors, false, ts.Key())
sset, err := s.api.StateMinerSectors(ctx, s.actor, &goodSectors, ts.Key())
if err != nil {
return nil, err
}
@ -604,17 +604,17 @@ func (s *WindowPoStScheduler) sectorsForProof(ctx context.Context, goodSectors,
}
substitute := proof.SectorInfo{
SectorNumber: sset[0].ID,
SealedCID: sset[0].Info.SealedCID,
SealProof: sset[0].Info.SealProof,
SectorNumber: sset[0].SectorNumber,
SealedCID: sset[0].SealedCID,
SealProof: sset[0].SealProof,
}
sectorByID := make(map[uint64]proof.SectorInfo, len(sset))
for _, sector := range sset {
sectorByID[uint64(sector.ID)] = proof.SectorInfo{
SectorNumber: sector.ID,
SealedCID: sector.Info.SealedCID,
SealProof: sector.Info.SealProof,
sectorByID[uint64(sector.SectorNumber)] = proof.SectorInfo{
SectorNumber: sector.SectorNumber,
SealedCID: sector.SealedCID,
SealProof: sector.SealProof,
}
}

View File

@ -65,14 +65,14 @@ func (m *mockStorageMinerAPI) StateMinerPartitions(ctx context.Context, a addres
return m.partitions, nil
}
func (m *mockStorageMinerAPI) StateMinerSectors(ctx context.Context, address address.Address, field *bitfield.BitField, b bool, key types.TipSetKey) ([]*miner.ChainSectorInfo, error) {
var sis []*miner.ChainSectorInfo
_ = field.ForEach(func(i uint64) error {
sis = append(sis, &miner.ChainSectorInfo{
Info: miner.SectorOnChainInfo{
func (m *mockStorageMinerAPI) StateMinerSectors(ctx context.Context, address address.Address, snos *bitfield.BitField, key types.TipSetKey) ([]*miner.SectorOnChainInfo, error) {
var sis []*miner.SectorOnChainInfo
if snos == nil {
panic("unsupported")
}
_ = snos.ForEach(func(i uint64) error {
sis = append(sis, &miner.SectorOnChainInfo{
SectorNumber: abi.SectorNumber(i),
},
ID: abi.SectorNumber(i),
})
return nil
})