From 86e459f58502833ffb8d49082026123ec291110c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 9 Jun 2021 13:47:38 -0700 Subject: [PATCH] feat(lotus-sim): return receipt Instead of returning a "full" boolean and an error, return a receipt and an error. We now use the error to indicate if the block is "full". --- cmd/lotus-sim/simulation/precommit.go | 45 +++--- cmd/lotus-sim/simulation/provecommit.go | 173 +++++++++++++----------- cmd/lotus-sim/simulation/step.go | 34 +++-- cmd/lotus-sim/simulation/wdpost.go | 12 +- 4 files changed, 148 insertions(+), 116 deletions(-) diff --git a/cmd/lotus-sim/simulation/precommit.go b/cmd/lotus-sim/simulation/precommit.go index 38b745a52..b048aa66c 100644 --- a/cmd/lotus-sim/simulation/precommit.go +++ b/cmd/lotus-sim/simulation/precommit.go @@ -31,8 +31,11 @@ func makeCommR(minerAddr address.Address, sno abi.SectorNumber) cid.Cid { } // packPreCommits packs pre-commit messages until the block is full. -func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (full bool, _err error) { - var top1Count, top10Count, restCount int +func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (_err error) { + var ( + full bool + top1Count, top10Count, restCount int + ) defer func() { if _err != nil { return @@ -74,16 +77,20 @@ func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (ful restMiners++ default: // Well, we've run through all miners. - return false, nil + return nil } - added, full, err := ss.packPreCommitsMiner(ctx, cb, minerAddr, maxProveCommitBatchSize) + var ( + added int + err error + ) + added, full, err = ss.packPreCommitsMiner(ctx, cb, minerAddr, maxProveCommitBatchSize) if err != nil { - return false, xerrors.Errorf("failed to pack precommits for miner %s: %w", minerAddr, err) + return xerrors.Errorf("failed to pack precommits for miner %s: %w", minerAddr, err) } *count += added if full { - return true, nil + return nil } } } @@ -111,17 +118,15 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc, } if big.Cmp(minerBalance, minFunds) < 0 { - full, err := cb(&types.Message{ + if _, err := cb(&types.Message{ From: builtin.BurntFundsActorAddr, To: minerAddr, Value: targetFunds, Method: builtin.MethodSend, - }) - if err != nil { - return 0, false, xerrors.Errorf("failed to fund miner %s: %w", minerAddr, err) - } - if full { + }); err == ErrOutOfGas { return 0, true, nil + } else if err != nil { + return 0, false, xerrors.Errorf("failed to fund miner %s: %w", minerAddr, err) } } @@ -168,18 +173,18 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc, } // NOTE: just in-case, sendAndFund will "fund" and re-try for any message // that fails due to "insufficient funds". - if full, err := sendAndFund(cb, &types.Message{ + if _, err := sendAndFund(cb, &types.Message{ To: minerAddr, From: minerInfo.Worker, Value: abi.NewTokenAmount(0), Method: miner.Methods.PreCommitSectorBatch, Params: enc, - }); err != nil { - return 0, false, err - } else if full { + }); err == ErrOutOfGas { // try again with a smaller batch. targetBatchSize /= 2 continue + } else if err != nil { + return 0, false, err } for _, info := range batch { @@ -196,14 +201,16 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc, if err != nil { return 0, false, err } - if full, err := sendAndFund(cb, &types.Message{ + if _, err := sendAndFund(cb, &types.Message{ To: minerAddr, From: minerInfo.Worker, Value: abi.NewTokenAmount(0), Method: miner.Methods.PreCommitSector, Params: enc, - }); full || err != nil { - return added, full, err + }); err == ErrOutOfGas { + return added, true, nil + } else if err != nil { + return added, false, err } if err := ss.commitQueue.enqueueProveCommit(minerAddr, epoch, info); err != nil { diff --git a/cmd/lotus-sim/simulation/provecommit.go b/cmd/lotus-sim/simulation/provecommit.go index 208af38a7..1a615d830 100644 --- a/cmd/lotus-sim/simulation/provecommit.go +++ b/cmd/lotus-sim/simulation/provecommit.go @@ -23,11 +23,11 @@ import ( // packProveCOmmits packs all prove-commits for all "ready to be proven" sectors until it fills the // block or runs out. -func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_full bool, _err error) { +func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_err error) { // Roll the commitQueue forward. ss.commitQueue.advanceEpoch(ss.nextEpoch()) - var failed, done, unbatched, count int + var full, failed, done, unbatched, count int defer func() { if _err != nil { return @@ -39,32 +39,33 @@ func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_ "failed", failed, "unbatched", unbatched, "miners-processed", count, - "filled-block", _full, + "filled-block", full, ) }() for { addr, pending, ok := ss.commitQueue.nextMiner() if !ok { - return false, nil + return nil } - res, full, err := ss.packProveCommitsMiner(ctx, cb, addr, pending) + res, err := ss.packProveCommitsMiner(ctx, cb, addr, pending) if err != nil { - return false, err + return err } failed += res.failed done += res.done unbatched += res.unbatched count++ - if full { - return true, nil + if res.full { + return nil } } } type proveCommitResult struct { done, failed, unbatched int + full bool } // sendAndFund "packs" the given message, funding the actor if necessary. It: @@ -75,25 +76,24 @@ type proveCommitResult struct { // somewhere) and re-tries the message.0 // // NOTE: If the message fails a second time, the funds won't be "unsent". -func sendAndFund(send packFunc, msg *types.Message) (bool, error) { - full, err := send(msg) +func sendAndFund(send packFunc, msg *types.Message) (*types.MessageReceipt, error) { + res, err := send(msg) aerr, ok := err.(aerrors.ActorError) if !ok || aerr.RetCode() != exitcode.ErrInsufficientFunds { - return full, err + return res, err } // Ok, insufficient funds. Let's fund this miner and try again. - full, err = send(&types.Message{ + _, err = send(&types.Message{ From: builtin.BurntFundsActorAddr, To: msg.To, Value: targetFunds, Method: builtin.MethodSend, }) if err != nil { - return false, xerrors.Errorf("failed to fund %s: %w", msg.To, err) - } - // ok, nothing's going to work. - if full { - return true, nil + if err != ErrOutOfGas { + err = xerrors.Errorf("failed to fund %s: %w", msg.To, err) + } + return nil, err } return send(msg) } @@ -105,10 +105,10 @@ func sendAndFund(send packFunc, msg *types.Message) (bool, error) { func (ss *simulationState) packProveCommitsMiner( ctx context.Context, cb packFunc, minerAddr address.Address, pending minerPendingCommits, -) (res proveCommitResult, full bool, _err error) { +) (res proveCommitResult, _err error) { info, err := ss.getMinerInfo(ctx, minerAddr) if err != nil { - return res, false, err + return res, err } nv := ss.StateManager.GetNtwkVersion(ctx, ss.nextEpoch()) @@ -123,7 +123,7 @@ func (ss *simulationState) packProveCommitsMiner( proof, err := mockAggregateSealProof(sealType, minerAddr, batchSize) if err != nil { - return res, false, err + return res, err } params := miner5.ProveCommitAggregateParams{ @@ -136,24 +136,27 @@ func (ss *simulationState) packProveCommitsMiner( enc, err := actors.SerializeParams(¶ms) if err != nil { - return res, false, err + return res, err } - if full, err := sendAndFund(cb, &types.Message{ + if _, err := sendAndFund(cb, &types.Message{ From: info.Worker, To: minerAddr, Value: abi.NewTokenAmount(0), Method: miner.Methods.ProveCommitAggregate, Params: enc, - }); err != nil { + }); err == nil { + res.done += len(batch) + } else if err == ErrOutOfGas { + res.full = true + return res, nil + } else if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { // If we get a random error, or a fatal actor error, bail. - aerr, ok := err.(aerrors.ActorError) - if !ok || aerr.IsFatal() { - return res, false, err - } - // If we get a "not-found" error, try to remove any missing - // prove-commits and continue. This can happen either - // because: + return res, err + } else if aerr.RetCode() == exitcode.ErrNotFound || aerr.RetCode() == exitcode.ErrIllegalArgument { + // If we get a "not-found" or illegal argument error, try to + // remove any missing prove-commits and continue. This can + // happen either because: // // 1. The pre-commit failed on execution (but not when // packing). This shouldn't happen, but we might as well @@ -161,34 +164,56 @@ func (ss *simulationState) packProveCommitsMiner( // 2. The pre-commit has expired. We'd have to be really // backloged to hit this case, but we might as well handle // it. - if aerr.RetCode() == exitcode.ErrNotFound { - // First, split into "good" and "missing" - good, err := ss.filterProveCommits(ctx, minerAddr, batch) - if err != nil { - log.Errorw("failed to filter prove commits", "miner", minerAddr, "error", err) - // fail with the original error. - return res, false, aerr - } - removed := len(batch) - len(good) - // If they're all missing, skip. If they're all good, skip too (and log). - if len(good) > 0 && removed > 0 { - res.failed += removed - - // update the pending sector numbers in-place to remove the expired ones. - snos = snos[removed:] - copy(snos, good) - pending.finish(sealType, removed) - - log.Errorw("failed to prove commit expired/missing pre-commits", - "error", aerr, - "miner", minerAddr, - "discarded", removed, - "kept", len(good), - "epoch", ss.nextEpoch(), - ) - continue - } + // First, split into "good" and "missing" + good, err := ss.filterProveCommits(ctx, minerAddr, batch) + if err != nil { + log.Errorw("failed to filter prove commits", "miner", minerAddr, "error", err) + // fail with the original error. + return res, aerr } + removed := len(batch) - len(good) + if removed == 0 { + log.Errorw("failed to prove-commit for unknown reasons", + "error", aerr, + "miner", minerAddr, + "sectors", batch, + "epoch", ss.nextEpoch(), + ) + res.failed += len(batch) + } else if len(good) == 0 { + log.Errorw("failed to prove commit missing pre-commits", + "error", aerr, + "miner", minerAddr, + "discarded", removed, + "epoch", ss.nextEpoch(), + ) + res.failed += len(batch) + } else { + // update the pending sector numbers in-place to remove the expired ones. + snos = snos[removed:] + copy(snos, good) + pending.finish(sealType, removed) + + log.Errorw("failed to prove commit expired/missing pre-commits", + "error", aerr, + "miner", minerAddr, + "discarded", removed, + "kept", len(good), + "epoch", ss.nextEpoch(), + ) + res.failed += removed + + // Then try again. + continue + } + log.Errorw("failed to prove commit missing sector(s)", + "error", err, + "miner", minerAddr, + "sectors", batch, + "epoch", ss.nextEpoch(), + ) + res.failed += len(batch) + } else { log.Errorw("failed to prove commit sector(s)", "error", err, "miner", minerAddr, @@ -196,13 +221,9 @@ func (ss *simulationState) packProveCommitsMiner( "epoch", ss.nextEpoch(), ) res.failed += len(batch) - } else if full { - return res, true, nil - } else { - res.done += len(batch) } - pending.finish(sealType, batchSize) - snos = snos[batchSize:] + pending.finish(sealType, len(batch)) + snos = snos[len(batch):] } } for len(snos) > 0 && res.unbatched < power5.MaxMinerProveCommitsPerEpoch { @@ -211,7 +232,7 @@ func (ss *simulationState) packProveCommitsMiner( proof, err := mockSealProof(sealType, minerAddr) if err != nil { - return res, false, err + return res, err } params := miner.ProveCommitSectorParams{ SectorNumber: sno, @@ -219,18 +240,23 @@ func (ss *simulationState) packProveCommitsMiner( } enc, err := actors.SerializeParams(¶ms) if err != nil { - return res, false, err + return res, err } - if full, err := sendAndFund(cb, &types.Message{ + if _, err := sendAndFund(cb, &types.Message{ From: info.Worker, To: minerAddr, Value: abi.NewTokenAmount(0), Method: miner.Methods.ProveCommitSector, Params: enc, - }); err != nil { - if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { - return res, false, err - } + }); err == nil { + res.unbatched++ + res.done++ + } else if err == ErrOutOfGas { + res.full = true + return res, nil + } else if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { + return res, err + } else { log.Errorw("failed to prove commit sector(s)", "error", err, "miner", minerAddr, @@ -238,18 +264,13 @@ func (ss *simulationState) packProveCommitsMiner( "epoch", ss.nextEpoch(), ) res.failed++ - } else if full { - return res, true, nil - } else { - res.unbatched++ - res.done++ } // mark it as "finished" regardless so we skip it. pending.finish(sealType, 1) } // if we get here, we can't pre-commit anything more. } - return res, false, nil + return res, nil } // loadProveCommitsMiner enqueue all pending prove-commits for the given miner. This is called on diff --git a/cmd/lotus-sim/simulation/step.go b/cmd/lotus-sim/simulation/step.go index b99f318d6..e076b7b86 100644 --- a/cmd/lotus-sim/simulation/step.go +++ b/cmd/lotus-sim/simulation/step.go @@ -2,6 +2,7 @@ package simulation import ( "context" + "errors" "reflect" "runtime" "strings" @@ -61,7 +62,13 @@ func (ss *simulationState) step(ctx context.Context) (*types.TipSet, error) { return head, nil } -type packFunc func(*types.Message) (full bool, err error) +var ErrOutOfGas = errors.New("out of gas") + +// packFunc takes a message and attempts to pack it into a block. +// +// - If the block is full, returns the error ErrOutOfGas. +// - If message execution fails, check if error is an ActorError to get the return code. +type packFunc func(*types.Message) (*types.MessageReceipt, error) // popNextMessages generates/picks a set of messages to be included in the next block. // @@ -130,9 +137,9 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag vmStore := vmi.ActorStore(ctx) var gasTotal int64 var messages []*types.Message - tryPushMsg := func(msg *types.Message) (bool, error) { + tryPushMsg := func(msg *types.Message) (*types.MessageReceipt, error) { if gasTotal >= targetGas { - return true, nil + return nil, ErrOutOfGas } // Copy the message before we start mutating it. @@ -142,17 +149,17 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag actor, err := st.GetActor(msg.From) if err != nil { - return false, err + return nil, err } msg.Nonce = actor.Nonce if msg.From.Protocol() == address.ID { state, err := account.Load(vmStore, actor) if err != nil { - return false, err + return nil, err } msg.From, err = state.PubkeyAddress() if err != nil { - return false, err + return nil, err } } @@ -172,17 +179,17 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag ret, err := vmi.ApplyMessage(ctx, msg) if err != nil { _ = st.Revert() - return false, err + return nil, err } if ret.ActorErr != nil { _ = st.Revert() - return false, ret.ActorErr + return nil, ret.ActorErr } // Sometimes there are bugs. Let's catch them. if ret.GasUsed == 0 { _ = st.Revert() - return false, xerrors.Errorf("used no gas", + return nil, xerrors.Errorf("used no gas", "msg", msg, "ret", ret, ) @@ -195,7 +202,7 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag newTotal := gasTotal + ret.GasUsed if newTotal > targetGas { _ = st.Revert() - return true, nil + return nil, ErrOutOfGas } gasTotal = newTotal @@ -203,7 +210,7 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag msg.GasLimit = ret.GasUsed messages = append(messages, msg) - return false, nil + return &ret.MessageReceipt, nil } // Finally, we generate a set of messages to be included in @@ -232,7 +239,7 @@ func functionName(fn interface{}) string { // true). // TODO: Make this more configurable for other simulations. func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error { - type messageGenerator func(ctx context.Context, cb packFunc) (full bool, err error) + type messageGenerator func(ctx context.Context, cb packFunc) error // We pack messages in-order: // 1. Any window posts. We pack window posts as soon as the deadline opens to ensure we only @@ -248,8 +255,7 @@ func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error for _, mgen := range messageGenerators { // We're intentionally ignoring the "full" signal so we can try to pack a few more // messages. - _, err := mgen(ctx, cb) - if err != nil { + if err := mgen(ctx, cb); err != nil && !xerrors.Is(err, ErrOutOfGas) { return xerrors.Errorf("when packing messages with %s: %w", functionName(mgen), err) } } diff --git a/cmd/lotus-sim/simulation/wdpost.go b/cmd/lotus-sim/simulation/wdpost.go index c940c8d51..3e8d482ff 100644 --- a/cmd/lotus-sim/simulation/wdpost.go +++ b/cmd/lotus-sim/simulation/wdpost.go @@ -28,10 +28,10 @@ func (sim *Simulation) postChainCommitInfo(ctx context.Context, epoch abi.ChainE // packWindowPoSts packs window posts until either the block is full or all healty sectors // have been proven. It does not recover sectors. -func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (full bool, _err error) { +func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (_err error) { // Push any new window posts into the queue. if err := ss.queueWindowPoSts(ctx); err != nil { - return false, err + return err } done := 0 failed := 0 @@ -50,9 +50,9 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu // Then pack as many as we can. for len(ss.pendingWposts) > 0 { next := ss.pendingWposts[0] - if full, err := cb(next); err != nil { + if _, err := cb(next); err != nil { if aerr, ok := err.(aerrors.ActorError); !ok || aerr.IsFatal() { - return false, err + return err } log.Errorw("failed to submit windowed post", "error", err, @@ -60,8 +60,6 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu "epoch", ss.nextEpoch(), ) failed++ - } else if full { - return true, nil } else { done++ } @@ -69,7 +67,7 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu ss.pendingWposts = ss.pendingWposts[1:] } ss.pendingWposts = nil - return false, nil + return nil } // stepWindowPoStsMiner enqueues all missing window posts for the current epoch for the given miner.