feat(lotus-sim): return receipt

Instead of returning a "full" boolean and an error, return a receipt and
an error. We now use the error to indicate if the block is "full".
This commit is contained in:
Steven Allen 2021-06-09 13:47:38 -07:00
parent 0075abea5e
commit 86e459f585
4 changed files with 148 additions and 116 deletions

View File

@ -31,8 +31,11 @@ func makeCommR(minerAddr address.Address, sno abi.SectorNumber) cid.Cid {
} }
// packPreCommits packs pre-commit messages until the block is full. // packPreCommits packs pre-commit messages until the block is full.
func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (full bool, _err error) { func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (_err error) {
var top1Count, top10Count, restCount int var (
full bool
top1Count, top10Count, restCount int
)
defer func() { defer func() {
if _err != nil { if _err != nil {
return return
@ -74,16 +77,20 @@ func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (ful
restMiners++ restMiners++
default: default:
// Well, we've run through all miners. // Well, we've run through all miners.
return false, nil return nil
} }
added, full, err := ss.packPreCommitsMiner(ctx, cb, minerAddr, maxProveCommitBatchSize) var (
added int
err error
)
added, full, err = ss.packPreCommitsMiner(ctx, cb, minerAddr, maxProveCommitBatchSize)
if err != nil { if err != nil {
return false, xerrors.Errorf("failed to pack precommits for miner %s: %w", minerAddr, err) return xerrors.Errorf("failed to pack precommits for miner %s: %w", minerAddr, err)
} }
*count += added *count += added
if full { if full {
return true, nil return nil
} }
} }
} }
@ -111,17 +118,15 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
} }
if big.Cmp(minerBalance, minFunds) < 0 { if big.Cmp(minerBalance, minFunds) < 0 {
full, err := cb(&types.Message{ if _, err := cb(&types.Message{
From: builtin.BurntFundsActorAddr, From: builtin.BurntFundsActorAddr,
To: minerAddr, To: minerAddr,
Value: targetFunds, Value: targetFunds,
Method: builtin.MethodSend, Method: builtin.MethodSend,
}) }); err == ErrOutOfGas {
if err != nil {
return 0, false, xerrors.Errorf("failed to fund miner %s: %w", minerAddr, err)
}
if full {
return 0, true, nil return 0, true, nil
} else if err != nil {
return 0, false, xerrors.Errorf("failed to fund miner %s: %w", minerAddr, err)
} }
} }
@ -168,18 +173,18 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
} }
// NOTE: just in-case, sendAndFund will "fund" and re-try for any message // NOTE: just in-case, sendAndFund will "fund" and re-try for any message
// that fails due to "insufficient funds". // that fails due to "insufficient funds".
if full, err := sendAndFund(cb, &types.Message{ if _, err := sendAndFund(cb, &types.Message{
To: minerAddr, To: minerAddr,
From: minerInfo.Worker, From: minerInfo.Worker,
Value: abi.NewTokenAmount(0), Value: abi.NewTokenAmount(0),
Method: miner.Methods.PreCommitSectorBatch, Method: miner.Methods.PreCommitSectorBatch,
Params: enc, Params: enc,
}); err != nil { }); err == ErrOutOfGas {
return 0, false, err
} else if full {
// try again with a smaller batch. // try again with a smaller batch.
targetBatchSize /= 2 targetBatchSize /= 2
continue continue
} else if err != nil {
return 0, false, err
} }
for _, info := range batch { for _, info := range batch {
@ -196,14 +201,16 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
if err != nil { if err != nil {
return 0, false, err return 0, false, err
} }
if full, err := sendAndFund(cb, &types.Message{ if _, err := sendAndFund(cb, &types.Message{
To: minerAddr, To: minerAddr,
From: minerInfo.Worker, From: minerInfo.Worker,
Value: abi.NewTokenAmount(0), Value: abi.NewTokenAmount(0),
Method: miner.Methods.PreCommitSector, Method: miner.Methods.PreCommitSector,
Params: enc, Params: enc,
}); full || err != nil { }); err == ErrOutOfGas {
return added, full, err return added, true, nil
} else if err != nil {
return added, false, err
} }
if err := ss.commitQueue.enqueueProveCommit(minerAddr, epoch, info); err != nil { if err := ss.commitQueue.enqueueProveCommit(minerAddr, epoch, info); err != nil {

View File

@ -23,11 +23,11 @@ import (
// packProveCOmmits packs all prove-commits for all "ready to be proven" sectors until it fills the // packProveCOmmits packs all prove-commits for all "ready to be proven" sectors until it fills the
// block or runs out. // block or runs out.
func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_full bool, _err error) { func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_err error) {
// Roll the commitQueue forward. // Roll the commitQueue forward.
ss.commitQueue.advanceEpoch(ss.nextEpoch()) ss.commitQueue.advanceEpoch(ss.nextEpoch())
var failed, done, unbatched, count int var full, failed, done, unbatched, count int
defer func() { defer func() {
if _err != nil { if _err != nil {
return return
@ -39,32 +39,33 @@ func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_
"failed", failed, "failed", failed,
"unbatched", unbatched, "unbatched", unbatched,
"miners-processed", count, "miners-processed", count,
"filled-block", _full, "filled-block", full,
) )
}() }()
for { for {
addr, pending, ok := ss.commitQueue.nextMiner() addr, pending, ok := ss.commitQueue.nextMiner()
if !ok { if !ok {
return false, nil return nil
} }
res, full, err := ss.packProveCommitsMiner(ctx, cb, addr, pending) res, err := ss.packProveCommitsMiner(ctx, cb, addr, pending)
if err != nil { if err != nil {
return false, err return err
} }
failed += res.failed failed += res.failed
done += res.done done += res.done
unbatched += res.unbatched unbatched += res.unbatched
count++ count++
if full { if res.full {
return true, nil return nil
} }
} }
} }
type proveCommitResult struct { type proveCommitResult struct {
done, failed, unbatched int done, failed, unbatched int
full bool
} }
// sendAndFund "packs" the given message, funding the actor if necessary. It: // sendAndFund "packs" the given message, funding the actor if necessary. It:
@ -75,25 +76,24 @@ type proveCommitResult struct {
// somewhere) and re-tries the message.0 // somewhere) and re-tries the message.0
// //
// NOTE: If the message fails a second time, the funds won't be "unsent". // NOTE: If the message fails a second time, the funds won't be "unsent".
func sendAndFund(send packFunc, msg *types.Message) (bool, error) { func sendAndFund(send packFunc, msg *types.Message) (*types.MessageReceipt, error) {
full, err := send(msg) res, err := send(msg)
aerr, ok := err.(aerrors.ActorError) aerr, ok := err.(aerrors.ActorError)
if !ok || aerr.RetCode() != exitcode.ErrInsufficientFunds { if !ok || aerr.RetCode() != exitcode.ErrInsufficientFunds {
return full, err return res, err
} }
// Ok, insufficient funds. Let's fund this miner and try again. // Ok, insufficient funds. Let's fund this miner and try again.
full, err = send(&types.Message{ _, err = send(&types.Message{
From: builtin.BurntFundsActorAddr, From: builtin.BurntFundsActorAddr,
To: msg.To, To: msg.To,
Value: targetFunds, Value: targetFunds,
Method: builtin.MethodSend, Method: builtin.MethodSend,
}) })
if err != nil { if err != nil {
return false, xerrors.Errorf("failed to fund %s: %w", msg.To, err) if err != ErrOutOfGas {
err = xerrors.Errorf("failed to fund %s: %w", msg.To, err)
} }
// ok, nothing's going to work. return nil, err
if full {
return true, nil
} }
return send(msg) return send(msg)
} }
@ -105,10 +105,10 @@ func sendAndFund(send packFunc, msg *types.Message) (bool, error) {
func (ss *simulationState) packProveCommitsMiner( func (ss *simulationState) packProveCommitsMiner(
ctx context.Context, cb packFunc, minerAddr address.Address, ctx context.Context, cb packFunc, minerAddr address.Address,
pending minerPendingCommits, pending minerPendingCommits,
) (res proveCommitResult, full bool, _err error) { ) (res proveCommitResult, _err error) {
info, err := ss.getMinerInfo(ctx, minerAddr) info, err := ss.getMinerInfo(ctx, minerAddr)
if err != nil { if err != nil {
return res, false, err return res, err
} }
nv := ss.StateManager.GetNtwkVersion(ctx, ss.nextEpoch()) nv := ss.StateManager.GetNtwkVersion(ctx, ss.nextEpoch())
@ -123,7 +123,7 @@ func (ss *simulationState) packProveCommitsMiner(
proof, err := mockAggregateSealProof(sealType, minerAddr, batchSize) proof, err := mockAggregateSealProof(sealType, minerAddr, batchSize)
if err != nil { if err != nil {
return res, false, err return res, err
} }
params := miner5.ProveCommitAggregateParams{ params := miner5.ProveCommitAggregateParams{
@ -136,24 +136,27 @@ func (ss *simulationState) packProveCommitsMiner(
enc, err := actors.SerializeParams(&params) enc, err := actors.SerializeParams(&params)
if err != nil { if err != nil {
return res, false, err return res, err
} }
if full, err := sendAndFund(cb, &types.Message{ if _, err := sendAndFund(cb, &types.Message{
From: info.Worker, From: info.Worker,
To: minerAddr, To: minerAddr,
Value: abi.NewTokenAmount(0), Value: abi.NewTokenAmount(0),
Method: miner.Methods.ProveCommitAggregate, Method: miner.Methods.ProveCommitAggregate,
Params: enc, Params: enc,
}); err != nil { }); err == nil {
res.done += len(batch)
} else if err == ErrOutOfGas {
res.full = true
return res, nil
} else if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() {
// If we get a random error, or a fatal actor error, bail. // If we get a random error, or a fatal actor error, bail.
aerr, ok := err.(aerrors.ActorError) return res, err
if !ok || aerr.IsFatal() { } else if aerr.RetCode() == exitcode.ErrNotFound || aerr.RetCode() == exitcode.ErrIllegalArgument {
return res, false, err // If we get a "not-found" or illegal argument error, try to
} // remove any missing prove-commits and continue. This can
// If we get a "not-found" error, try to remove any missing // happen either because:
// prove-commits and continue. This can happen either
// because:
// //
// 1. The pre-commit failed on execution (but not when // 1. The pre-commit failed on execution (but not when
// packing). This shouldn't happen, but we might as well // packing). This shouldn't happen, but we might as well
@ -161,19 +164,31 @@ func (ss *simulationState) packProveCommitsMiner(
// 2. The pre-commit has expired. We'd have to be really // 2. The pre-commit has expired. We'd have to be really
// backloged to hit this case, but we might as well handle // backloged to hit this case, but we might as well handle
// it. // it.
if aerr.RetCode() == exitcode.ErrNotFound {
// First, split into "good" and "missing" // First, split into "good" and "missing"
good, err := ss.filterProveCommits(ctx, minerAddr, batch) good, err := ss.filterProveCommits(ctx, minerAddr, batch)
if err != nil { if err != nil {
log.Errorw("failed to filter prove commits", "miner", minerAddr, "error", err) log.Errorw("failed to filter prove commits", "miner", minerAddr, "error", err)
// fail with the original error. // fail with the original error.
return res, false, aerr return res, aerr
} }
removed := len(batch) - len(good) removed := len(batch) - len(good)
// If they're all missing, skip. If they're all good, skip too (and log). if removed == 0 {
if len(good) > 0 && removed > 0 { log.Errorw("failed to prove-commit for unknown reasons",
res.failed += removed "error", aerr,
"miner", minerAddr,
"sectors", batch,
"epoch", ss.nextEpoch(),
)
res.failed += len(batch)
} else if len(good) == 0 {
log.Errorw("failed to prove commit missing pre-commits",
"error", aerr,
"miner", minerAddr,
"discarded", removed,
"epoch", ss.nextEpoch(),
)
res.failed += len(batch)
} else {
// update the pending sector numbers in-place to remove the expired ones. // update the pending sector numbers in-place to remove the expired ones.
snos = snos[removed:] snos = snos[removed:]
copy(snos, good) copy(snos, good)
@ -186,9 +201,19 @@ func (ss *simulationState) packProveCommitsMiner(
"kept", len(good), "kept", len(good),
"epoch", ss.nextEpoch(), "epoch", ss.nextEpoch(),
) )
res.failed += removed
// Then try again.
continue continue
} }
} log.Errorw("failed to prove commit missing sector(s)",
"error", err,
"miner", minerAddr,
"sectors", batch,
"epoch", ss.nextEpoch(),
)
res.failed += len(batch)
} else {
log.Errorw("failed to prove commit sector(s)", log.Errorw("failed to prove commit sector(s)",
"error", err, "error", err,
"miner", minerAddr, "miner", minerAddr,
@ -196,13 +221,9 @@ func (ss *simulationState) packProveCommitsMiner(
"epoch", ss.nextEpoch(), "epoch", ss.nextEpoch(),
) )
res.failed += len(batch) res.failed += len(batch)
} else if full {
return res, true, nil
} else {
res.done += len(batch)
} }
pending.finish(sealType, batchSize) pending.finish(sealType, len(batch))
snos = snos[batchSize:] snos = snos[len(batch):]
} }
} }
for len(snos) > 0 && res.unbatched < power5.MaxMinerProveCommitsPerEpoch { for len(snos) > 0 && res.unbatched < power5.MaxMinerProveCommitsPerEpoch {
@ -211,7 +232,7 @@ func (ss *simulationState) packProveCommitsMiner(
proof, err := mockSealProof(sealType, minerAddr) proof, err := mockSealProof(sealType, minerAddr)
if err != nil { if err != nil {
return res, false, err return res, err
} }
params := miner.ProveCommitSectorParams{ params := miner.ProveCommitSectorParams{
SectorNumber: sno, SectorNumber: sno,
@ -219,18 +240,23 @@ func (ss *simulationState) packProveCommitsMiner(
} }
enc, err := actors.SerializeParams(&params) enc, err := actors.SerializeParams(&params)
if err != nil { if err != nil {
return res, false, err return res, err
} }
if full, err := sendAndFund(cb, &types.Message{ if _, err := sendAndFund(cb, &types.Message{
From: info.Worker, From: info.Worker,
To: minerAddr, To: minerAddr,
Value: abi.NewTokenAmount(0), Value: abi.NewTokenAmount(0),
Method: miner.Methods.ProveCommitSector, Method: miner.Methods.ProveCommitSector,
Params: enc, Params: enc,
}); err != nil { }); err == nil {
if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { res.unbatched++
return res, false, err res.done++
} } else if err == ErrOutOfGas {
res.full = true
return res, nil
} else if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() {
return res, err
} else {
log.Errorw("failed to prove commit sector(s)", log.Errorw("failed to prove commit sector(s)",
"error", err, "error", err,
"miner", minerAddr, "miner", minerAddr,
@ -238,18 +264,13 @@ func (ss *simulationState) packProveCommitsMiner(
"epoch", ss.nextEpoch(), "epoch", ss.nextEpoch(),
) )
res.failed++ res.failed++
} else if full {
return res, true, nil
} else {
res.unbatched++
res.done++
} }
// mark it as "finished" regardless so we skip it. // mark it as "finished" regardless so we skip it.
pending.finish(sealType, 1) pending.finish(sealType, 1)
} }
// if we get here, we can't pre-commit anything more. // if we get here, we can't pre-commit anything more.
} }
return res, false, nil return res, nil
} }
// loadProveCommitsMiner enqueue all pending prove-commits for the given miner. This is called on // loadProveCommitsMiner enqueue all pending prove-commits for the given miner. This is called on

View File

@ -2,6 +2,7 @@ package simulation
import ( import (
"context" "context"
"errors"
"reflect" "reflect"
"runtime" "runtime"
"strings" "strings"
@ -61,7 +62,13 @@ func (ss *simulationState) step(ctx context.Context) (*types.TipSet, error) {
return head, nil return head, nil
} }
type packFunc func(*types.Message) (full bool, err error) var ErrOutOfGas = errors.New("out of gas")
// packFunc takes a message and attempts to pack it into a block.
//
// - If the block is full, returns the error ErrOutOfGas.
// - If message execution fails, check if error is an ActorError to get the return code.
type packFunc func(*types.Message) (*types.MessageReceipt, error)
// popNextMessages generates/picks a set of messages to be included in the next block. // popNextMessages generates/picks a set of messages to be included in the next block.
// //
@ -130,9 +137,9 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
vmStore := vmi.ActorStore(ctx) vmStore := vmi.ActorStore(ctx)
var gasTotal int64 var gasTotal int64
var messages []*types.Message var messages []*types.Message
tryPushMsg := func(msg *types.Message) (bool, error) { tryPushMsg := func(msg *types.Message) (*types.MessageReceipt, error) {
if gasTotal >= targetGas { if gasTotal >= targetGas {
return true, nil return nil, ErrOutOfGas
} }
// Copy the message before we start mutating it. // Copy the message before we start mutating it.
@ -142,17 +149,17 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
actor, err := st.GetActor(msg.From) actor, err := st.GetActor(msg.From)
if err != nil { if err != nil {
return false, err return nil, err
} }
msg.Nonce = actor.Nonce msg.Nonce = actor.Nonce
if msg.From.Protocol() == address.ID { if msg.From.Protocol() == address.ID {
state, err := account.Load(vmStore, actor) state, err := account.Load(vmStore, actor)
if err != nil { if err != nil {
return false, err return nil, err
} }
msg.From, err = state.PubkeyAddress() msg.From, err = state.PubkeyAddress()
if err != nil { if err != nil {
return false, err return nil, err
} }
} }
@ -172,17 +179,17 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
ret, err := vmi.ApplyMessage(ctx, msg) ret, err := vmi.ApplyMessage(ctx, msg)
if err != nil { if err != nil {
_ = st.Revert() _ = st.Revert()
return false, err return nil, err
} }
if ret.ActorErr != nil { if ret.ActorErr != nil {
_ = st.Revert() _ = st.Revert()
return false, ret.ActorErr return nil, ret.ActorErr
} }
// Sometimes there are bugs. Let's catch them. // Sometimes there are bugs. Let's catch them.
if ret.GasUsed == 0 { if ret.GasUsed == 0 {
_ = st.Revert() _ = st.Revert()
return false, xerrors.Errorf("used no gas", return nil, xerrors.Errorf("used no gas",
"msg", msg, "msg", msg,
"ret", ret, "ret", ret,
) )
@ -195,7 +202,7 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
newTotal := gasTotal + ret.GasUsed newTotal := gasTotal + ret.GasUsed
if newTotal > targetGas { if newTotal > targetGas {
_ = st.Revert() _ = st.Revert()
return true, nil return nil, ErrOutOfGas
} }
gasTotal = newTotal gasTotal = newTotal
@ -203,7 +210,7 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
msg.GasLimit = ret.GasUsed msg.GasLimit = ret.GasUsed
messages = append(messages, msg) messages = append(messages, msg)
return false, nil return &ret.MessageReceipt, nil
} }
// Finally, we generate a set of messages to be included in // Finally, we generate a set of messages to be included in
@ -232,7 +239,7 @@ func functionName(fn interface{}) string {
// true). // true).
// TODO: Make this more configurable for other simulations. // TODO: Make this more configurable for other simulations.
func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error { func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error {
type messageGenerator func(ctx context.Context, cb packFunc) (full bool, err error) type messageGenerator func(ctx context.Context, cb packFunc) error
// We pack messages in-order: // We pack messages in-order:
// 1. Any window posts. We pack window posts as soon as the deadline opens to ensure we only // 1. Any window posts. We pack window posts as soon as the deadline opens to ensure we only
@ -248,8 +255,7 @@ func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error
for _, mgen := range messageGenerators { for _, mgen := range messageGenerators {
// We're intentionally ignoring the "full" signal so we can try to pack a few more // We're intentionally ignoring the "full" signal so we can try to pack a few more
// messages. // messages.
_, err := mgen(ctx, cb) if err := mgen(ctx, cb); err != nil && !xerrors.Is(err, ErrOutOfGas) {
if err != nil {
return xerrors.Errorf("when packing messages with %s: %w", functionName(mgen), err) return xerrors.Errorf("when packing messages with %s: %w", functionName(mgen), err)
} }
} }

View File

@ -28,10 +28,10 @@ func (sim *Simulation) postChainCommitInfo(ctx context.Context, epoch abi.ChainE
// packWindowPoSts packs window posts until either the block is full or all healty sectors // packWindowPoSts packs window posts until either the block is full or all healty sectors
// have been proven. It does not recover sectors. // have been proven. It does not recover sectors.
func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (full bool, _err error) { func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (_err error) {
// Push any new window posts into the queue. // Push any new window posts into the queue.
if err := ss.queueWindowPoSts(ctx); err != nil { if err := ss.queueWindowPoSts(ctx); err != nil {
return false, err return err
} }
done := 0 done := 0
failed := 0 failed := 0
@ -50,9 +50,9 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu
// Then pack as many as we can. // Then pack as many as we can.
for len(ss.pendingWposts) > 0 { for len(ss.pendingWposts) > 0 {
next := ss.pendingWposts[0] next := ss.pendingWposts[0]
if full, err := cb(next); err != nil { if _, err := cb(next); err != nil {
if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() {
return false, err return err
} }
log.Errorw("failed to submit windowed post", log.Errorw("failed to submit windowed post",
"error", err, "error", err,
@ -60,8 +60,6 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu
"epoch", ss.nextEpoch(), "epoch", ss.nextEpoch(),
) )
failed++ failed++
} else if full {
return true, nil
} else { } else {
done++ done++
} }
@ -69,7 +67,7 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu
ss.pendingWposts = ss.pendingWposts[1:] ss.pendingWposts = ss.pendingWposts[1:]
} }
ss.pendingWposts = nil ss.pendingWposts = nil
return false, nil return nil
} }
// stepWindowPoStsMiner enqueues all missing window posts for the current epoch for the given miner. // stepWindowPoStsMiner enqueues all missing window posts for the current epoch for the given miner.