revert post changes

1. Calling the specific partition/deadline APIs is faster.
2. It's _much_ easier to test this way.
This commit is contained in:
Steven Allen 2020-09-18 15:40:49 -07:00
parent 1bf3b4989d
commit 35bce5a5c6
4 changed files with 22 additions and 87 deletions

View File

@ -58,8 +58,6 @@ type State interface {
DeadlineInfo(epoch abi.ChainEpoch) *dline.Info DeadlineInfo(epoch abi.ChainEpoch) *dline.Info
MaxAddressedSectors() (uint64, error)
// Diff helpers. Used by Diff* functions internally. // Diff helpers. Used by Diff* functions internally.
sectors() (adt.Array, error) sectors() (adt.Array, error)
decodeSectorOnChainInfo(*cbg.Deferred) (SectorOnChainInfo, error) decodeSectorOnChainInfo(*cbg.Deferred) (SectorOnChainInfo, error)

View File

@ -224,11 +224,6 @@ func (s *state0) NumDeadlines() (uint64, error) {
return miner0.WPoStPeriodDeadlines, nil return miner0.WPoStPeriodDeadlines, nil
} }
// Max sectors per PoSt
func (s *state0) MaxAddressedSectors() (uint64, error) {
return miner0.AddressedSectorsMax, nil
}
func (s *state0) DeadlinesChanged(other State) bool { func (s *state0) DeadlinesChanged(other State) bool {
other0, ok := other.(*state0) other0, ok := other.(*state0)
if !ok { if !ok {

View File

@ -77,6 +77,7 @@ type storageMinerApi interface {
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok types.TipSetKey) (*miner.SectorLocation, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error) StateMinerDeadlines(context.Context, address.Address, types.TipSetKey) ([]api.Deadline, error)
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)

View File

@ -5,6 +5,8 @@ import (
"context" "context"
"time" "time"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -18,16 +20,14 @@ import (
"go.opencensus.io/trace" "go.opencensus.io/trace"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apibstore"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
) )
func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) { func (s *WindowPoStScheduler) failPost(err error, deadline *dline.Info) {
@ -154,7 +154,7 @@ func (s *WindowPoStScheduler) checkSectors(ctx context.Context, check bitfield.B
return sbf, nil return sbf, nil
} }
func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []miner.Partition) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) { func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uint64, partitions []api.Partition) ([]miner.RecoveryDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries") ctx, span := trace.StartSpan(ctx, "storage.checkNextRecoveries")
defer span.End() defer span.End()
@ -164,15 +164,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
} }
for partIdx, partition := range partitions { for partIdx, partition := range partitions {
faults, err := partition.FaultySectors() unrecovered, err := bitfield.SubtractBitField(partition.FaultySectors, partition.RecoveringSectors)
if err != nil {
return nil, nil, xerrors.Errorf("getting faults: %w", err)
}
recovering, err := partition.RecoveringSectors()
if err != nil {
return nil, nil, xerrors.Errorf("getting recovering: %w", err)
}
unrecovered, err := bitfield.SubtractBitField(faults, recovering)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err) return nil, nil, xerrors.Errorf("subtracting recovered set from fault set: %w", err)
} }
@ -253,7 +245,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
return recoveries, sm, nil return recoveries, sm, nil
} }
func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []miner.Partition) ([]miner.FaultDeclaration, *types.SignedMessage, error) { func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, partitions []api.Partition) ([]miner.FaultDeclaration, *types.SignedMessage, error) {
ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults") ctx, span := trace.StartSpan(ctx, "storage.checkNextFaults")
defer span.End() defer span.End()
@ -263,17 +255,12 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
} }
for partIdx, partition := range partitions { for partIdx, partition := range partitions {
toCheck, err := partition.ActiveSectors() good, err := s.checkSectors(ctx, partition.ActiveSectors)
if err != nil {
return nil, nil, xerrors.Errorf("getting active sectors: %w", err)
}
good, err := s.checkSectors(ctx, toCheck)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("checking sectors: %w", err) return nil, nil, xerrors.Errorf("checking sectors: %w", err)
} }
faulty, err := bitfield.SubtractBitField(toCheck, good) faulty, err := bitfield.SubtractBitField(partition.ActiveSectors, good)
if err != nil { if err != nil {
return nil, nil, xerrors.Errorf("calculating faulty sector set: %w", err) return nil, nil, xerrors.Errorf("calculating faulty sector set: %w", err)
} }
@ -341,17 +328,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
ctx, span := trace.StartSpan(ctx, "storage.runPost") ctx, span := trace.StartSpan(ctx, "storage.runPost")
defer span.End() defer span.End()
stor := store.ActorStore(ctx, apibstore.NewAPIBlockstore(s.api))
act, err := s.api.StateGetActor(context.TODO(), s.actor, ts.Key())
if err != nil {
return nil, xerrors.Errorf("resolving actor: %w", err)
}
mas, err := miner.Load(stor, act)
if err != nil {
return nil, xerrors.Errorf("getting miner state: %w", err)
}
go func() { go func() {
// TODO: extract from runPost, run on fault cutoff boundaries // TODO: extract from runPost, run on fault cutoff boundaries
@ -359,18 +335,9 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
// late to declare them for this deadline // late to declare them for this deadline
declDeadline := (di.Index + 2) % di.WPoStPeriodDeadlines declDeadline := (di.Index + 2) % di.WPoStPeriodDeadlines
dl, err := mas.LoadDeadline(declDeadline) partitions, err := s.api.StateMinerPartitions(context.TODO(), s.actor, declDeadline, ts.Key())
if err != nil { if err != nil {
log.Errorf("loading deadline: %v", err) log.Errorf("getting partitions: %v", err)
return
}
var partitions []miner.Partition
err = dl.ForEachPartition(func(_ uint64, part miner.Partition) error {
partitions = append(partitions, part)
return nil
})
if err != nil {
log.Errorf("loading partitions: %v", err)
return return
} }
@ -429,24 +396,15 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err) return nil, xerrors.Errorf("failed to get chain randomness for window post (ts=%d; deadline=%d): %w", ts.Height(), di, err)
} }
dl, err := mas.LoadDeadline(di.Index)
if err != nil {
return nil, xerrors.Errorf("loading deadline: %w", err)
}
// Get the partitions for the given deadline // Get the partitions for the given deadline
var partitions []miner.Partition partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key())
err = dl.ForEachPartition(func(_ uint64, part miner.Partition) error {
partitions = append(partitions, part)
return nil
})
if err != nil { if err != nil {
return nil, xerrors.Errorf("loading partitions: %w", err) return nil, xerrors.Errorf("getting partitions: %w", err)
} }
// Split partitions into batches, so as not to exceed the number of sectors // Split partitions into batches, so as not to exceed the number of sectors
// allowed in a single message // allowed in a single message
partitionBatches, err := s.batchPartitions(partitions, mas) partitionBatches, err := s.batchPartitions(partitions)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -475,16 +433,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
var sinfos []proof.SectorInfo var sinfos []proof.SectorInfo
for partIdx, partition := range batch { for partIdx, partition := range batch {
// TODO: Can do this in parallel // TODO: Can do this in parallel
toProve, err := partition.ActiveSectors() toProve, err := bitfield.MergeBitFields(partition.ActiveSectors, partition.RecoveringSectors)
if err != nil {
return nil, xerrors.Errorf("getting active sectors: %w", err)
}
recs, err := partition.RecoveringSectors()
if err != nil {
return nil, xerrors.Errorf("getting recovering sectors: %w", err)
}
toProve, err = bitfield.MergeBitFields(toProve, recs)
if err != nil { if err != nil {
return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err) return nil, xerrors.Errorf("adding recoveries to set of sectors to prove: %w", err)
} }
@ -511,12 +460,7 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
skipCount += sc skipCount += sc
partitionSectors, err := partition.AllSectors() ssi, err := s.sectorsForProof(ctx, good, partition.AllSectors, ts)
if err != nil {
return nil, xerrors.Errorf("getting partition sectors: %w", err)
}
ssi, err := s.sectorsForProof(ctx, good, partitionSectors, ts)
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting sorted sector info: %w", err) return nil, xerrors.Errorf("getting sorted sector info: %w", err)
} }
@ -609,18 +553,13 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di dline.Info, ts *ty
return posts, nil return posts, nil
} }
func (s *WindowPoStScheduler) batchPartitions(partitions []miner.Partition, mas miner.State) ([][]miner.Partition, error) { func (s *WindowPoStScheduler) batchPartitions(partitions []api.Partition) ([][]api.Partition, error) {
// Get the number of sectors allowed in a partition, for this proof size // Get the number of sectors allowed in a partition, for this proof size
sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(s.proofType) sectorsPerPartition, err := builtin.PoStProofWindowPoStPartitionSectors(s.proofType)
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting sectors per partition: %w", err) return nil, xerrors.Errorf("getting sectors per partition: %w", err)
} }
maxSectors, err := mas.MaxAddressedSectors()
if err != nil {
return nil, err
}
// We don't want to exceed the number of sectors allowed in a message. // We don't want to exceed the number of sectors allowed in a message.
// So given the number of sectors in a partition, work out the number of // So given the number of sectors in a partition, work out the number of
// partitions that can be in a message without exceeding sectors per // partitions that can be in a message without exceeding sectors per
@ -631,7 +570,9 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []miner.Partition, mas
// sectors per partition 3: ooo // sectors per partition 3: ooo
// partitions per message 2: oooOOO // partitions per message 2: oooOOO
// <1><2> (3rd doesn't fit) // <1><2> (3rd doesn't fit)
partitionsPerMsg := int(maxSectors / sectorsPerPartition) // TODO(NETUPGRADE): we're going to need some form of policy abstraction
// where we can get policy from the future. Unfortunately, we can't just get this from the state.
partitionsPerMsg := int(miner0.AddressedSectorsMax / sectorsPerPartition)
// The number of messages will be: // The number of messages will be:
// ceiling(number of partitions / partitions per message) // ceiling(number of partitions / partitions per message)
@ -641,7 +582,7 @@ func (s *WindowPoStScheduler) batchPartitions(partitions []miner.Partition, mas
} }
// Split the partitions into batches // Split the partitions into batches
batches := make([][]miner.Partition, 0, batchCount) batches := make([][]api.Partition, 0, batchCount)
for i := 0; i < len(partitions); i += partitionsPerMsg { for i := 0; i < len(partitions); i += partitionsPerMsg {
end := i + partitionsPerMsg end := i + partitionsPerMsg
if end > len(partitions) { if end > len(partitions) {