317 lines
9.2 KiB
Go
317 lines
9.2 KiB
Go
package simulation
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/filecoin-project/go-bitfield"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/go-state-types/exitcode"
|
|
"github.com/filecoin-project/go-state-types/network"
|
|
"github.com/filecoin-project/lotus/chain/actors"
|
|
"github.com/filecoin-project/lotus/chain/actors/aerrors"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
|
|
miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner"
|
|
power5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/power"
|
|
)
|
|
|
|
// packProveCOmmits packs all prove-commits for all "ready to be proven" sectors until it fills the
|
|
// block or runs out.
|
|
func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_full bool, _err error) {
|
|
// Roll the commitQueue forward.
|
|
ss.commitQueue.advanceEpoch(ss.nextEpoch())
|
|
|
|
var failed, done, unbatched, count int
|
|
defer func() {
|
|
if _err != nil {
|
|
return
|
|
}
|
|
remaining := ss.commitQueue.ready()
|
|
log.Debugw("packed prove commits",
|
|
"remaining", remaining,
|
|
"done", done,
|
|
"failed", failed,
|
|
"unbatched", unbatched,
|
|
"miners-processed", count,
|
|
"filled-block", _full,
|
|
)
|
|
}()
|
|
|
|
for {
|
|
addr, pending, ok := ss.commitQueue.nextMiner()
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
|
|
res, full, err := ss.packProveCommitsMiner(ctx, cb, addr, pending)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
failed += res.failed
|
|
done += res.done
|
|
unbatched += res.unbatched
|
|
count++
|
|
if full {
|
|
return true, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
type proveCommitResult struct {
|
|
done, failed, unbatched int
|
|
}
|
|
|
|
// sendAndFund "packs" the given message, funding the actor if necessary. It:
|
|
//
|
|
// 1. Tries to send the given message.
|
|
// 2. If that fails, it checks to see if the exit code was ErrInsufficientFunds.
|
|
// 3. If so, it sends 1K FIL from the "burnt funds actor" (because we need to send it from
|
|
// somewhere) and re-tries the message.0
|
|
//
|
|
// NOTE: If the message fails a second time, the funds won't be "unsent".
|
|
func sendAndFund(send packFunc, msg *types.Message) (bool, error) {
|
|
full, err := send(msg)
|
|
aerr, ok := err.(aerrors.ActorError)
|
|
if !ok || aerr.RetCode() != exitcode.ErrInsufficientFunds {
|
|
return full, err
|
|
}
|
|
// Ok, insufficient funds. Let's fund this miner and try again.
|
|
full, err = send(&types.Message{
|
|
From: builtin.BurntFundsActorAddr,
|
|
To: msg.To,
|
|
Value: targetFunds,
|
|
Method: builtin.MethodSend,
|
|
})
|
|
if err != nil {
|
|
return false, xerrors.Errorf("failed to fund %s: %w", msg.To, err)
|
|
}
|
|
// ok, nothing's going to work.
|
|
if full {
|
|
return true, nil
|
|
}
|
|
return send(msg)
|
|
}
|
|
|
|
// packProveCommitsMiner enqueues a prove commits from the given miner until it runs out of
|
|
// available prove-commits, batching as much as possible.
|
|
//
|
|
// This function will fund as necessary from the "burnt funds actor" (look, it's convenient).
|
|
func (ss *simulationState) packProveCommitsMiner(
|
|
ctx context.Context, cb packFunc, minerAddr address.Address,
|
|
pending minerPendingCommits,
|
|
) (res proveCommitResult, full bool, _err error) {
|
|
info, err := ss.getMinerInfo(ctx, minerAddr)
|
|
if err != nil {
|
|
return res, false, err
|
|
}
|
|
|
|
nv := ss.StateManager.GetNtwkVersion(ctx, ss.nextEpoch())
|
|
for sealType, snos := range pending {
|
|
if nv >= network.Version13 {
|
|
for len(snos) > minProveCommitBatchSize {
|
|
batchSize := maxProveCommitBatchSize
|
|
if len(snos) < batchSize {
|
|
batchSize = len(snos)
|
|
}
|
|
batch := snos[:batchSize]
|
|
|
|
proof, err := mockAggregateSealProof(sealType, minerAddr, batchSize)
|
|
if err != nil {
|
|
return res, false, err
|
|
}
|
|
|
|
params := miner5.ProveCommitAggregateParams{
|
|
SectorNumbers: bitfield.New(),
|
|
AggregateProof: proof,
|
|
}
|
|
for _, sno := range batch {
|
|
params.SectorNumbers.Set(uint64(sno))
|
|
}
|
|
|
|
enc, err := actors.SerializeParams(¶ms)
|
|
if err != nil {
|
|
return res, false, err
|
|
}
|
|
|
|
if full, err := sendAndFund(cb, &types.Message{
|
|
From: info.Worker,
|
|
To: minerAddr,
|
|
Value: abi.NewTokenAmount(0),
|
|
Method: miner.Methods.ProveCommitAggregate,
|
|
Params: enc,
|
|
}); err != nil {
|
|
// If we get a random error, or a fatal actor error, bail.
|
|
aerr, ok := err.(aerrors.ActorError)
|
|
if !ok || aerr.IsFatal() {
|
|
return res, false, err
|
|
}
|
|
// If we get a "not-found" error, try to remove any missing
|
|
// prove-commits and continue. This can happen either
|
|
// because:
|
|
//
|
|
// 1. The pre-commit failed on execution (but not when
|
|
// packing). This shouldn't happen, but we might as well
|
|
// gracefully handle it.
|
|
// 2. The pre-commit has expired. We'd have to be really
|
|
// backloged to hit this case, but we might as well handle
|
|
// it.
|
|
if aerr.RetCode() == exitcode.ErrNotFound {
|
|
// First, split into "good" and "missing"
|
|
good, err := ss.filterProveCommits(ctx, minerAddr, batch)
|
|
if err != nil {
|
|
log.Errorw("failed to filter prove commits", "miner", minerAddr, "error", err)
|
|
// fail with the original error.
|
|
return res, false, aerr
|
|
}
|
|
removed := len(batch) - len(good)
|
|
// If they're all missing, skip. If they're all good, skip too (and log).
|
|
if len(good) > 0 && removed > 0 {
|
|
res.failed += removed
|
|
|
|
// update the pending sector numbers in-place to remove the expired ones.
|
|
snos = snos[removed:]
|
|
copy(snos, good)
|
|
pending.finish(sealType, removed)
|
|
|
|
log.Errorw("failed to prove commit expired/missing pre-commits",
|
|
"error", aerr,
|
|
"miner", minerAddr,
|
|
"discarded", removed,
|
|
"kept", len(good),
|
|
"epoch", ss.nextEpoch(),
|
|
)
|
|
continue
|
|
}
|
|
}
|
|
log.Errorw("failed to prove commit sector(s)",
|
|
"error", err,
|
|
"miner", minerAddr,
|
|
"sectors", batch,
|
|
"epoch", ss.nextEpoch(),
|
|
)
|
|
res.failed += len(batch)
|
|
} else if full {
|
|
return res, true, nil
|
|
} else {
|
|
res.done += len(batch)
|
|
}
|
|
pending.finish(sealType, batchSize)
|
|
snos = snos[batchSize:]
|
|
}
|
|
}
|
|
for len(snos) > 0 && res.unbatched < power5.MaxMinerProveCommitsPerEpoch {
|
|
sno := snos[0]
|
|
snos = snos[1:]
|
|
|
|
proof, err := mockSealProof(sealType, minerAddr)
|
|
if err != nil {
|
|
return res, false, err
|
|
}
|
|
params := miner.ProveCommitSectorParams{
|
|
SectorNumber: sno,
|
|
Proof: proof,
|
|
}
|
|
enc, err := actors.SerializeParams(¶ms)
|
|
if err != nil {
|
|
return res, false, err
|
|
}
|
|
if full, err := sendAndFund(cb, &types.Message{
|
|
From: info.Worker,
|
|
To: minerAddr,
|
|
Value: abi.NewTokenAmount(0),
|
|
Method: miner.Methods.ProveCommitSector,
|
|
Params: enc,
|
|
}); err != nil {
|
|
if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() {
|
|
return res, false, err
|
|
}
|
|
log.Errorw("failed to prove commit sector(s)",
|
|
"error", err,
|
|
"miner", minerAddr,
|
|
"sectors", []abi.SectorNumber{sno},
|
|
"epoch", ss.nextEpoch(),
|
|
)
|
|
res.failed++
|
|
} else if full {
|
|
return res, true, nil
|
|
} else {
|
|
res.unbatched++
|
|
res.done++
|
|
}
|
|
// mark it as "finished" regardless so we skip it.
|
|
pending.finish(sealType, 1)
|
|
}
|
|
// if we get here, we can't pre-commit anything more.
|
|
}
|
|
return res, false, nil
|
|
}
|
|
|
|
// loadProveCommitsMiner enqueue all pending prove-commits for the given miner. This is called on
|
|
// load to populate the commitQueue and should not need to be called later.
|
|
//
|
|
// It will drop any pre-commits that have already expired.
|
|
func (ss *simulationState) loadProveCommitsMiner(ctx context.Context, addr address.Address, minerState miner.State) error {
|
|
// Find all pending prove commits and group by proof type. Really, there should never
|
|
// (except during upgrades be more than one type.
|
|
nextEpoch := ss.nextEpoch()
|
|
nv := ss.StateManager.GetNtwkVersion(ctx, nextEpoch)
|
|
av := actors.VersionForNetwork(nv)
|
|
|
|
var total, dropped int
|
|
err := minerState.ForEachPrecommittedSector(func(info miner.SectorPreCommitOnChainInfo) error {
|
|
total++
|
|
msd := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
|
|
if nextEpoch > info.PreCommitEpoch+msd {
|
|
dropped++
|
|
return nil
|
|
}
|
|
return ss.commitQueue.enqueueProveCommit(addr, info.PreCommitEpoch, info.Info)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if dropped > 0 {
|
|
log.Warnw("dropped expired pre-commits on load",
|
|
"miner", addr,
|
|
"total", total,
|
|
"expired", dropped,
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// filterProveCommits filters out expired and/or missing pre-commits.
|
|
func (ss *simulationState) filterProveCommits(ctx context.Context, minerAddr address.Address, snos []abi.SectorNumber) ([]abi.SectorNumber, error) {
|
|
_, minerState, err := ss.getMinerState(ctx, minerAddr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nextEpoch := ss.nextEpoch()
|
|
nv := ss.StateManager.GetNtwkVersion(ctx, nextEpoch)
|
|
av := actors.VersionForNetwork(nv)
|
|
|
|
good := make([]abi.SectorNumber, 0, len(snos))
|
|
for _, sno := range snos {
|
|
info, err := minerState.GetPrecommittedSector(sno)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if info == nil {
|
|
continue
|
|
}
|
|
msd := policy.GetMaxProveCommitDuration(av, info.Info.SealProof)
|
|
if nextEpoch > info.PreCommitEpoch+msd {
|
|
continue
|
|
}
|
|
good = append(good, sno)
|
|
}
|
|
return good, nil
|
|
}
|