feat(lotus-sim): refactor and document

Hopefully, this'll make this code a bit easier to approach.
This commit is contained in:
Steven Allen 2021-06-07 17:45:53 -07:00
parent b7bfc06ebe
commit 2f7d7aed31
13 changed files with 446 additions and 251 deletions

View File

@ -0,0 +1,38 @@
package simulation
import (
"math/rand"
"github.com/filecoin-project/go-address"
)
// actorIter is a simple persistent iterator that loops over a set of actors.
type actorIter struct {
actors []address.Address
offset int
}
// shuffle randomly permutes the set of actors.
func (p *actorIter) shuffle() {
rand.Shuffle(len(p.actors), func(i, j int) {
p.actors[i], p.actors[j] = p.actors[j], p.actors[i]
})
}
// next returns the next actor's address and advances the iterator.
func (p *actorIter) next() address.Address {
next := p.actors[p.offset]
p.offset++
p.offset %= len(p.actors)
return next
}
// add adds a new actor to the iterator.
func (p *actorIter) add(addr address.Address) {
p.actors = append(p.actors, addr)
}
// len returns the number of actors in the iterator.
func (p *actorIter) len() int {
return len(p.actors)
}

View File

@ -0,0 +1,81 @@
package simulation
import (
"context"
"crypto/sha256"
"encoding/binary"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"golang.org/x/xerrors"
)
const beaconPrefix = "mockbeacon:"
func (sim *Simulation) nextBeaconEntries() []types.BeaconEntry {
parentBeacons := sim.head.Blocks()[0].BeaconEntries
lastBeacon := parentBeacons[len(parentBeacons)-1]
beaconRound := lastBeacon.Round + 1
buf := make([]byte, len(beaconPrefix)+8)
copy(buf, beaconPrefix)
binary.BigEndian.PutUint64(buf[len(beaconPrefix):], beaconRound)
beaconRand := sha256.Sum256(buf)
return []types.BeaconEntry{{
Round: beaconRound,
Data: beaconRand[:],
}}
}
func (sim *Simulation) nextTicket() *types.Ticket {
newProof := sha256.Sum256(sim.head.MinTicket().VRFProof)
return &types.Ticket{
VRFProof: newProof[:],
}
}
func (sim *Simulation) makeTipSet(ctx context.Context, messages []*types.Message) (*types.TipSet, error) {
parentTs := sim.head
parentState, parentRec, err := sim.sm.TipSetState(ctx, parentTs)
if err != nil {
return nil, xerrors.Errorf("failed to compute parent tipset: %w", err)
}
msgsCid, err := sim.storeMessages(ctx, messages)
if err != nil {
return nil, xerrors.Errorf("failed to store block messages: %w", err)
}
uts := parentTs.MinTimestamp() + build.BlockDelaySecs
blks := []*types.BlockHeader{{
Miner: parentTs.MinTicketBlock().Miner, // keep reusing the same miner.
Ticket: sim.nextTicket(),
BeaconEntries: sim.nextBeaconEntries(),
Parents: parentTs.Cids(),
Height: parentTs.Height() + 1,
ParentStateRoot: parentState,
ParentMessageReceipts: parentRec,
Messages: msgsCid,
ParentBaseFee: baseFee,
Timestamp: uts,
ElectionProof: &types.ElectionProof{WinCount: 1},
}}
err = sim.Chainstore.PersistBlockHeaders(blks...)
if err != nil {
return nil, xerrors.Errorf("failed to persist block headers: %w", err)
}
newTipSet, err := types.NewTipSet(blks)
if err != nil {
return nil, xerrors.Errorf("failed to create new tipset: %w", err)
}
now := time.Now()
_, _, err = sim.sm.TipSetState(ctx, newTipSet)
if err != nil {
return nil, xerrors.Errorf("failed to compute new tipset: %w", err)
}
duration := time.Since(now)
log.Infow("computed tipset", "duration", duration, "height", newTipSet.Height())
return newTipSet, nil
}

View File

@ -9,9 +9,13 @@ import (
"github.com/filecoin-project/lotus/chain/actors/policy"
)
// pendingCommitTracker tracks pending commits per-miner for a single epohc.
type pendingCommitTracker map[address.Address]minerPendingCommits
// minerPendingCommits tracks a miner's pending commits during a single epoch (grouped by seal proof type).
type minerPendingCommits map[abi.RegisteredSealProof][]abi.SectorNumber
// finish markes count sectors of the given proof type as "prove-committed".
func (m minerPendingCommits) finish(proof abi.RegisteredSealProof, count int) {
snos := m[proof]
if len(snos) < count {
@ -23,10 +27,12 @@ func (m minerPendingCommits) finish(proof abi.RegisteredSealProof, count int) {
}
}
// empty returns true if there are no pending commits.
func (m minerPendingCommits) empty() bool {
return len(m) == 0
}
// count returns the number of pending commits.
func (m minerPendingCommits) count() int {
count := 0
for _, snos := range m {
@ -35,12 +41,17 @@ func (m minerPendingCommits) count() int {
return count
}
// commitQueue is used to track pending prove-commits.
//
// Miners are processed in round-robin where _all_ commits from a given miner are finished before
// moving on to the next. This is designed to maximize batching.
type commitQueue struct {
minerQueue []address.Address
queue []pendingCommitTracker
offset abi.ChainEpoch
}
// ready returns the number of prove-commits ready to be proven at the current epoch. Useful for logging.
func (q *commitQueue) ready() int {
if len(q.queue) == 0 {
return 0
@ -52,6 +63,9 @@ func (q *commitQueue) ready() int {
return count
}
// nextMiner returns the next miner to be proved and the set of pending prove commits for that
// miner. When some number of sectors have successfully been proven, call "finish" so we don't try
// to prove them again.
func (q *commitQueue) nextMiner() (address.Address, minerPendingCommits, bool) {
if len(q.queue) == 0 {
return address.Undef, nil, false
@ -72,6 +86,8 @@ func (q *commitQueue) nextMiner() (address.Address, minerPendingCommits, bool) {
return address.Undef, nil, false
}
// advanceEpoch will advance to the next epoch. If some sectors were left unproven in the current
// epoch, they will be "prepended" into the next epochs sector set.
func (q *commitQueue) advanceEpoch(epoch abi.ChainEpoch) {
if epoch < q.offset {
panic("cannot roll epoch backwards")
@ -146,6 +162,7 @@ func (q *commitQueue) advanceEpoch(epoch abi.ChainEpoch) {
})
}
// enquueProveCommit enqueues prove-commit for the given pre-commit for the given miner.
func (q *commitQueue) enqueueProveCommit(addr address.Address, preCommitEpoch abi.ChainEpoch, info miner.SectorPreCommitInfo) error {
// Compute the epoch at which we can start trying to commit.
preCommitDelay := policy.GetPreCommitChallengeDelay()
@ -178,10 +195,3 @@ func (q *commitQueue) enqueueProveCommit(addr address.Address, preCommitEpoch ab
minerPending[info.SealProof] = append(minerPending[info.SealProof], info.SectorNumber)
return nil
}
func (q *commitQueue) head() pendingCommitTracker {
if len(q.queue) > 0 {
return q.queue[0]
}
return nil
}

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
// toArray converts the given set of CIDs to an AMT. This is usually used to pack messages into blocks.
func toArray(store blockadt.Store, cids []cid.Cid) (cid.Cid, error) {
arr := blockadt.MakeEmptyArray(store)
for i, c := range cids {
@ -26,6 +27,8 @@ func toArray(store blockadt.Store, cids []cid.Cid) (cid.Cid, error) {
return arr.Root()
}
// storeMessages packs a set of messages into a types.MsgMeta and returns the resulting CID. The
// resulting CID is valid for the BlocKHeader's Messages field.
func (nd *Node) storeMessages(ctx context.Context, messages []*types.Message) (cid.Cid, error) {
var blsMessages, sekpMessages []cid.Cid
fakeSig := make([]byte, 32)

View File

@ -8,6 +8,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
)
@ -21,74 +22,11 @@ const (
mockPoStProofPrefix = "valid post proof:"
)
func mockSealProof(proofType abi.RegisteredSealProof, minerAddr address.Address) ([]byte, error) {
plen, err := proofType.ProofSize()
if err != nil {
return nil, err
}
proof := make([]byte, plen)
i := copy(proof, mockSealProofPrefix)
binary.BigEndian.PutUint64(proof[i:], uint64(proofType))
i += 8
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
func mockAggregateSealProof(proofType abi.RegisteredSealProof, minerAddr address.Address, count int) ([]byte, error) {
proof := make([]byte, aggProofLen(count))
i := copy(proof, mockAggregateSealProofPrefix)
binary.BigEndian.PutUint64(proof[i:], uint64(proofType))
i += 8
binary.BigEndian.PutUint64(proof[i:], uint64(count))
i += 8
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
func mockWpostProof(proofType abi.RegisteredPoStProof, minerAddr address.Address) ([]byte, error) {
plen, err := proofType.ProofSize()
if err != nil {
return nil, err
}
proof := make([]byte, plen)
i := copy(proof, mockPoStProofPrefix)
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
// TODO: dedup
func aggProofLen(nproofs int) int {
switch {
case nproofs <= 8:
return 11220
case nproofs <= 16:
return 14196
case nproofs <= 32:
return 17172
case nproofs <= 64:
return 20148
case nproofs <= 128:
return 23124
case nproofs <= 256:
return 26100
case nproofs <= 512:
return 29076
case nproofs <= 1024:
return 32052
case nproofs <= 2048:
return 35028
case nproofs <= 4096:
return 38004
case nproofs <= 8192:
return 40980
default:
panic("too many proofs")
}
}
// mockVerifier is a simple mock for verifying "fake" proofs.
type mockVerifier struct{}
var _ ffiwrapper.Verifier = mockVerifier{}
func (mockVerifier) VerifySeal(proof proof5.SealVerifyInfo) (bool, error) {
addr, err := address.NewIDAddress(uint64(proof.Miner))
if err != nil {
@ -134,3 +72,74 @@ func (mockVerifier) VerifyWindowPoSt(ctx context.Context, info proof5.WindowPoSt
func (mockVerifier) GenerateWinningPoStSectorChallenge(context.Context, abi.RegisteredPoStProof, abi.ActorID, abi.PoStRandomness, uint64) ([]uint64, error) {
panic("should not be called")
}
// mockSealProof generates a mock "seal" proof tied to the specified proof type and the given miner.
func mockSealProof(proofType abi.RegisteredSealProof, minerAddr address.Address) ([]byte, error) {
plen, err := proofType.ProofSize()
if err != nil {
return nil, err
}
proof := make([]byte, plen)
i := copy(proof, mockSealProofPrefix)
binary.BigEndian.PutUint64(proof[i:], uint64(proofType))
i += 8
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
// mockAggregateSealProof generates a mock "seal" aggregate proof tied to the specified proof type,
// the given miner, and the number of proven sectors.
func mockAggregateSealProof(proofType abi.RegisteredSealProof, minerAddr address.Address, count int) ([]byte, error) {
proof := make([]byte, aggProofLen(count))
i := copy(proof, mockAggregateSealProofPrefix)
binary.BigEndian.PutUint64(proof[i:], uint64(proofType))
i += 8
binary.BigEndian.PutUint64(proof[i:], uint64(count))
i += 8
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
// mockWpostProof generates a mock "window post" proof tied to the specified proof type, and the
// given miner.
func mockWpostProof(proofType abi.RegisteredPoStProof, minerAddr address.Address) ([]byte, error) {
plen, err := proofType.ProofSize()
if err != nil {
return nil, err
}
proof := make([]byte, plen)
i := copy(proof, mockPoStProofPrefix)
i += copy(proof[i:], minerAddr.Bytes())
return proof, nil
}
// TODO: dedup
func aggProofLen(nproofs int) int {
switch {
case nproofs <= 8:
return 11220
case nproofs <= 16:
return 14196
case nproofs <= 32:
return 17172
case nproofs <= 64:
return 20148
case nproofs <= 128:
return 23124
case nproofs <= 256:
return 26100
case nproofs <= 512:
return 29076
case nproofs <= 1024:
return 32052
case nproofs <= 2048:
return 35028
case nproofs <= 4096:
return 38004
case nproofs <= 8192:
return 40980
default:
panic("too many proofs")
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)
// Node represents the local lotus node, or at least the part of it we care about.
type Node struct {
Repo repo.LockedRepo
Blockstore blockstore.Blockstore
@ -26,6 +27,7 @@ type Node struct {
Chainstore *store.ChainStore
}
// OpenNode opens the local lotus node for writing. This will fail if the node is online.
func OpenNode(ctx context.Context, path string) (*Node, error) {
var node Node
r, err := repo.NewFS(path)
@ -55,6 +57,7 @@ func OpenNode(ctx context.Context, path string) (*Node, error) {
return &node, nil
}
// Close cleanly close the node. Please call this on shutdown to make sure everything is flushed.
func (nd *Node) Close() error {
var err error
if closer, ok := nd.Blockstore.(io.Closer); ok && closer != nil {
@ -69,6 +72,7 @@ func (nd *Node) Close() error {
return err
}
// LoadSim loads
func (nd *Node) LoadSim(ctx context.Context, name string) (*Simulation, error) {
sim := &Simulation{
Node: nd,
@ -103,6 +107,10 @@ func (nd *Node) LoadSim(ctx context.Context, name string) (*Simulation, error) {
return sim, nil
}
// Create creates a new simulation.
//
// - This will fail if a simulation already exists with the given name.
// - Name must not contain a '/'.
func (nd *Node) CreateSim(ctx context.Context, name string, head *types.TipSet) (*Simulation, error) {
if strings.Contains(name, "/") {
return nil, xerrors.Errorf("simulation name %q cannot contain a '/'", name)
@ -125,6 +133,7 @@ func (nd *Node) CreateSim(ctx context.Context, name string, head *types.TipSet)
return sim, nil
}
// ListSims lists all simulations.
func (nd *Node) ListSims(ctx context.Context) ([]string, error) {
prefix := simulationPrefix.ChildString("head").String()
items, err := nd.MetadataDS.Query(query.Query{
@ -153,6 +162,9 @@ func (nd *Node) ListSims(ctx context.Context) ([]string, error) {
}
}
// DeleteSim deletes a simulation and all related metadata.
//
// NOTE: This function does not delete associated messages, blocks, or chain state.
func (nd *Node) DeleteSim(ctx context.Context, name string) error {
// TODO: make this a bit more generic?
keys := []datastore.Key{

View File

@ -12,10 +12,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
)
type powerInfo struct {
powerLookback, powerNow abi.StoragePower
}
// Load all power claims at the given height.
func (sim *Simulation) loadClaims(ctx context.Context, height abi.ChainEpoch) (map[address.Address]power.Claim, error) {
powerTable := make(map[address.Address]power.Claim)

View File

@ -20,15 +20,17 @@ import (
tutils "github.com/filecoin-project/specs-actors/v5/support/testing"
)
func makeCommR(minerAddr address.Address, sno abi.SectorNumber) cid.Cid {
return tutils.MakeCID(fmt.Sprintf("%s:%d", minerAddr, sno), &miner5.SealedCIDPrefix)
}
var (
targetFunds = abi.TokenAmount(types.MustParseFIL("1000FIL"))
minFunds = abi.TokenAmount(types.MustParseFIL("100FIL"))
)
// makeCommR generates a "fake" but valid CommR for a sector. It is unique for the given sector/miner.
func makeCommR(minerAddr address.Address, sno abi.SectorNumber) cid.Cid {
return tutils.MakeCID(fmt.Sprintf("%s:%d", minerAddr, sno), &miner5.SealedCIDPrefix)
}
// packPreCommits packs pre-commit messages until the block is full.
func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (full bool, _err error) {
var top1Count, top10Count, restCount int
defer func() {
@ -50,6 +52,13 @@ func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (ful
minerAddr address.Address
count *int
)
// We pre-commit for the top 1%, 10%, and the of the network 1/3rd of the time each.
// This won't yeild the most accurate distribution... but it'll give us a good
// enough distribution.
// NOTE: We submit at most _one_ 819 sector batch per-miner per-block. See the
// comment on packPreCommitsMiner for why. We should fix this.
switch {
case (i%3) <= 0 && top1Miners < ss.minerDist.top1.len():
count = &top1Count
@ -67,6 +76,7 @@ func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (ful
// Well, we've run through all miners.
return false, nil
}
added, full, err := ss.packPreCommitsMiner(ctx, cb, minerAddr, maxProveCommitBatchSize)
if err != nil {
return false, xerrors.Errorf("failed to pack precommits for miner %s: %w", minerAddr, err)
@ -78,7 +88,10 @@ func (ss *simulationState) packPreCommits(ctx context.Context, cb packFunc) (ful
}
}
// packPreCommitsMiner packs count pre-commits for the given miner. This should only be called once
// per-miner, per-epoch to avoid packing multiple pre-commits with the same sector numbers.
func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc, minerAddr address.Address, count int) (int, bool, error) {
// Load everything.
epoch := ss.nextEpoch()
nv := ss.sm.GetNtwkVersion(ctx, epoch)
st, err := ss.stateTree(ctx)
@ -120,6 +133,7 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
}
}
// Generate pre-commits.
sealType, err := miner.PreferredSealProofTypeFromWindowPoStType(
nv, minerInfo.WindowPoStProofType,
)
@ -143,6 +157,8 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
Expiration: expiration,
}
}
// Commit the pre-commits.
added := 0
if nv >= network.Version13 {
targetBatchSize := maxPreCommitBatchSize
@ -158,6 +174,8 @@ func (ss *simulationState) packPreCommitsMiner(ctx context.Context, cb packFunc,
if err != nil {
return 0, false, err
}
// NOTE: just in-case, sendAndFund will "fund" and re-try for any message
// that fails due to "insufficient funds".
if full, err := sendAndFund(cb, &types.Message{
To: minerAddr,
From: minerInfo.Worker,

View File

@ -21,7 +21,10 @@ import (
power5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/power"
)
// packProveCOmmits packs all prove-commits for all "ready to be proven" sectors until it fills the
// block or runs out.
func (ss *simulationState) packProveCommits(ctx context.Context, cb packFunc) (_full bool, _err error) {
// Roll the commitQueue forward.
ss.commitQueue.advanceEpoch(ss.nextEpoch())
var failed, done, unbatched, count int
@ -64,6 +67,14 @@ type proveCommitResult struct {
done, failed, unbatched int
}
// sendAndFund "packs" the given message, funding the actor if necessary. It:
//
// 1. Tries to send the given message.
// 2. If that fails, it checks to see if the exit code was ErrInsufficientFunds.
// 3. If so, it sends 1K FIL from the "burnt funds actor" (because we need to send it from
// somewhere) and re-tries the message.0
//
// NOTE: If the message fails a second time, the funds won't be "unsent".
func sendAndFund(send packFunc, msg *types.Message) (bool, error) {
full, err := send(msg)
aerr, ok := err.(aerrors.ActorError)
@ -87,7 +98,10 @@ func sendAndFund(send packFunc, msg *types.Message) (bool, error) {
return send(msg)
}
// Enqueue a single prove commit from the given miner.
// packProveCommitsMiner enqueues a prove commits from the given miner until it runs out of
// available prove-commits, batching as much as possible.
//
// This function will fund as necessary from the "burnt funds actor" (look, it's convenient).
func (ss *simulationState) packProveCommitsMiner(
ctx context.Context, cb packFunc, minerAddr address.Address,
pending minerPendingCommits,
@ -200,7 +214,10 @@ func (ss *simulationState) packProveCommitsMiner(
return res, false, nil
}
// Enqueue all pending prove-commits for the given miner.
// loadProveCommitsMiner enqueue all pending prove-commits for the given miner. This is called on
// load to populate the commitQueue and should not need to be called later.
//
// It will drop any pre-commits that have already expired.
func (ss *simulationState) loadProveCommitsMiner(ctx context.Context, addr address.Address, minerState miner.State) error {
// Find all pending prove commits and group by proof type. Really, there should never
// (except during upgrades be more than one type.

View File

@ -2,10 +2,7 @@ package simulation
import (
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"time"
"golang.org/x/xerrors"
@ -13,9 +10,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
@ -35,16 +30,23 @@ const (
maxProveCommitBatchSize = miner5.MaxAggregatedSectors
)
// config is the simulation's config, persisted to the local metadata store and loaded on start.
//
// See simulationState.loadConfig and simulationState.saveConfig.
type config struct {
Upgrades map[network.Version]abi.ChainEpoch
}
// upgradeSchedule constructs an stmgr.StateManager upgrade schedule, overriding any network upgrade
// epochs as specified in the config.
func (c *config) upgradeSchedule() (stmgr.UpgradeSchedule, error) {
upgradeSchedule := stmgr.DefaultUpgradeSchedule()
expected := make(map[network.Version]struct{}, len(c.Upgrades))
for nv := range c.Upgrades {
expected[nv] = struct{}{}
}
// Update network upgrade epochs.
newUpgradeSchedule := upgradeSchedule[:0]
for _, upgrade := range upgradeSchedule {
if height, ok := c.Upgrades[upgrade.Network]; ok {
@ -56,6 +58,8 @@ func (c *config) upgradeSchedule() (stmgr.UpgradeSchedule, error) {
}
newUpgradeSchedule = append(newUpgradeSchedule, upgrade)
}
// Make sure we didn't try to configure an unknown network version.
if len(expected) > 0 {
missing := make([]network.Version, 0, len(expected))
for nv := range expected {
@ -63,9 +67,16 @@ func (c *config) upgradeSchedule() (stmgr.UpgradeSchedule, error) {
}
return nil, xerrors.Errorf("unknown network versions %v in config", missing)
}
// Finally, validate it. This ensures we don't change the order of the upgrade or anything
// like that.
if err := newUpgradeSchedule.Validate(); err != nil {
return nil, err
}
return newUpgradeSchedule, nil
}
// Simulation specifies a lotus-sim simulation.
type Simulation struct {
*Node
@ -82,6 +93,8 @@ type Simulation struct {
state *simulationState
}
// loadConfig loads a simulation's config from the datastore. This must be called on startup and may
// be called to restore the config from-disk.
func (sim *Simulation) loadConfig() error {
configBytes, err := sim.MetadataDS.Get(sim.key("config"))
if err == nil {
@ -97,6 +110,18 @@ func (sim *Simulation) loadConfig() error {
return nil
}
// saveConfig saves the current config to the datastore. This must be called whenever the config is
// changed.
func (sim *Simulation) saveConfig() error {
buf, err := json.Marshal(sim.config)
if err != nil {
return err
}
return sim.MetadataDS.Put(sim.key("config"), buf)
}
// stateTree returns the current state-tree for the current head, computing the tipset if necessary.
// The state-tree is cached until the head is changed.
func (sim *Simulation) stateTree(ctx context.Context) (*state.StateTree, error) {
if sim.st == nil {
st, _, err := sim.sm.TipSetState(ctx, sim.head)
@ -128,6 +153,8 @@ func (sim *Simulation) simState(ctx context.Context) (*simulationState, error) {
var simulationPrefix = datastore.NewKey("/simulation")
// key returns the the key in the form /simulation/<subkey>/<simulation-name>. For example,
// /simulation/head/default.
func (sim *Simulation) key(subkey string) datastore.Key {
return simulationPrefix.ChildString(subkey).ChildString(sim.name)
}
@ -139,14 +166,18 @@ func (sim *Simulation) Load(ctx context.Context) error {
return err
}
// GetHead returns the current simulation head.
func (sim *Simulation) GetHead() *types.TipSet {
return sim.head
}
// GetNetworkVersion returns the current network version for the simulation.
func (sim *Simulation) GetNetworkVersion() network.Version {
return sim.sm.GetNtwkVersion(context.TODO(), sim.head.Height())
}
// SetHead updates the current head of the simulation and stores it in the metadata store. This is
// called for every Simulation.Step.
func (sim *Simulation) SetHead(head *types.TipSet) error {
if err := sim.MetadataDS.Put(sim.key("head"), head.Key().Bytes()); err != nil {
return xerrors.Errorf("failed to store simulation head: %w", err)
@ -156,85 +187,14 @@ func (sim *Simulation) SetHead(head *types.TipSet) error {
return nil
}
// Name returns the simulation's name.
func (sim *Simulation) Name() string {
return sim.name
}
func (sim *Simulation) postChainCommitInfo(ctx context.Context, epoch abi.ChainEpoch) (abi.Randomness, error) {
commitRand, err := sim.Chainstore.GetChainRandomness(
ctx, sim.head.Cids(), crypto.DomainSeparationTag_PoStChainCommit, epoch, nil, true)
return commitRand, err
}
const beaconPrefix = "mockbeacon:"
func (sim *Simulation) nextBeaconEntries() []types.BeaconEntry {
parentBeacons := sim.head.Blocks()[0].BeaconEntries
lastBeacon := parentBeacons[len(parentBeacons)-1]
beaconRound := lastBeacon.Round + 1
buf := make([]byte, len(beaconPrefix)+8)
copy(buf, beaconPrefix)
binary.BigEndian.PutUint64(buf[len(beaconPrefix):], beaconRound)
beaconRand := sha256.Sum256(buf)
return []types.BeaconEntry{{
Round: beaconRound,
Data: beaconRand[:],
}}
}
func (sim *Simulation) nextTicket() *types.Ticket {
newProof := sha256.Sum256(sim.head.MinTicket().VRFProof)
return &types.Ticket{
VRFProof: newProof[:],
}
}
func (sim *Simulation) makeTipSet(ctx context.Context, messages []*types.Message) (*types.TipSet, error) {
parentTs := sim.head
parentState, parentRec, err := sim.sm.TipSetState(ctx, parentTs)
if err != nil {
return nil, xerrors.Errorf("failed to compute parent tipset: %w", err)
}
msgsCid, err := sim.storeMessages(ctx, messages)
if err != nil {
return nil, xerrors.Errorf("failed to store block messages: %w", err)
}
uts := parentTs.MinTimestamp() + build.BlockDelaySecs
blks := []*types.BlockHeader{{
Miner: parentTs.MinTicketBlock().Miner, // keep reusing the same miner.
Ticket: sim.nextTicket(),
BeaconEntries: sim.nextBeaconEntries(),
Parents: parentTs.Cids(),
Height: parentTs.Height() + 1,
ParentStateRoot: parentState,
ParentMessageReceipts: parentRec,
Messages: msgsCid,
ParentBaseFee: baseFee,
Timestamp: uts,
ElectionProof: &types.ElectionProof{WinCount: 1},
}}
err = sim.Chainstore.PersistBlockHeaders(blks...)
if err != nil {
return nil, xerrors.Errorf("failed to persist block headers: %w", err)
}
newTipSet, err := types.NewTipSet(blks)
if err != nil {
return nil, xerrors.Errorf("failed to create new tipset: %w", err)
}
now := time.Now()
_, _, err = sim.sm.TipSetState(ctx, newTipSet)
if err != nil {
return nil, xerrors.Errorf("failed to compute new tipset: %w", err)
}
duration := time.Since(now)
log.Infow("computed tipset", "duration", duration, "height", newTipSet.Height())
return newTipSet, nil
}
// SetUpgradeHeight sets the height of the given network version change (and saves the config).
//
// This fails if the specified epoch has already passed or the new upgrade schedule is invalid.
func (sim *Simulation) SetUpgradeHeight(nv network.Version, epoch abi.ChainEpoch) (_err error) {
if epoch <= sim.head.Height() {
return xerrors.Errorf("cannot set upgrade height in the past (%d <= %d)", epoch, sim.head.Height())
@ -269,6 +229,7 @@ func (sim *Simulation) SetUpgradeHeight(nv network.Version, epoch abi.ChainEpoch
return nil
}
// ListUpgrades returns any future network upgrades.
func (sim *Simulation) ListUpgrades() (stmgr.UpgradeSchedule, error) {
upgrades, err := sim.config.upgradeSchedule()
if err != nil {
@ -283,11 +244,3 @@ func (sim *Simulation) ListUpgrades() (stmgr.UpgradeSchedule, error) {
}
return pending, nil
}
func (sim *Simulation) saveConfig() error {
buf, err := json.Marshal(sim.config)
if err != nil {
return err
}
return sim.MetadataDS.Put(sim.key("config"), buf)
}

View File

@ -2,7 +2,6 @@ package simulation
import (
"context"
"math/rand"
"sort"
"github.com/filecoin-project/go-address"
@ -11,41 +10,19 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type perm struct {
miners []address.Address
offset int
}
func (p *perm) shuffle() {
rand.Shuffle(len(p.miners), func(i, j int) {
p.miners[i], p.miners[j] = p.miners[j], p.miners[i]
})
}
func (p *perm) next() address.Address {
next := p.miners[p.offset]
p.offset++
p.offset %= len(p.miners)
return next
}
func (p *perm) add(addr address.Address) {
p.miners = append(p.miners, addr)
}
func (p *perm) len() int {
return len(p.miners)
}
// simualtionState holds the "state" of the simulation. This is split from the Simulation type so we
// can load it on-dempand if and when we need to actually _run_ the simualation. Loading the
// simulation state requires walking all active miners.
type simulationState struct {
*Simulation
// The tiers represent the top 1%, top 10%, and everyone else. When sealing sectors, we seal
// a group of sectors for the top 1%, a group (half that size) for the top 10%, and one
// sector for everyone else. We determine these rates by looking at two power tables.
// TODO Ideally we'd "learn" this distribution from the network. But this is good enough for
// now. The tiers represent the top 1%, top 10%, and everyone else. When sealing sectors, we
// seal a group of sectors for the top 1%, a group (half that size) for the top 10%, and one
// sector for everyone else. We really should pick a better algorithm.
// now.
minerDist struct {
top1, top10, rest perm
top1, top10, rest actorIter
}
// We track the window post periods per miner and assume that no new miners are ever added.
@ -58,6 +35,9 @@ type simulationState struct {
pendingWposts []*types.Message
nextWpostEpoch abi.ChainEpoch
// We track the set of pending commits. On simulation load, and when a new pre-commit is
// added to the chain, we put the commit in this queue. advanceEpoch(currentEpoch) should be
// called on this queue at every epoch before using it.
commitQueue commitQueue
}
@ -167,7 +147,7 @@ func loadSimulationState(ctx context.Context, sim *Simulation) (*simulationState
})
for i, oi := range sealList {
var dist *perm
var dist *actorIter
if i < len(sealList)/100 {
dist = &state.minerDist.top1
} else if i < len(sealList)/10 {
@ -185,6 +165,35 @@ func loadSimulationState(ctx context.Context, sim *Simulation) (*simulationState
return state, nil
}
// nextEpoch returns the next epoch (head+1).
func (ss *simulationState) nextEpoch() abi.ChainEpoch {
return ss.GetHead().Height() + 1
}
// getMinerInfo returns the miner's cached info.
//
// NOTE: we assume that miner infos won't change. We'll need to fix this if we start supporting arbitrary message.
func (ss *simulationState) getMinerInfo(ctx context.Context, addr address.Address) (*miner.MinerInfo, error) {
minerInfo, ok := ss.minerInfos[addr]
if !ok {
st, err := ss.stateTree(ctx)
if err != nil {
return nil, err
}
act, err := st.GetActor(addr)
if err != nil {
return nil, err
}
minerState, err := miner.Load(ss.Chainstore.ActorStore(ctx), act)
if err != nil {
return nil, err
}
info, err := minerState.Info()
if err != nil {
return nil, err
}
minerInfo = &info
ss.minerInfos[addr] = minerInfo
}
return minerInfo, nil
}

View File

@ -20,6 +20,8 @@ import (
)
const (
// The number of expected blocks in a tipset. We use this to determine how much gas a tipset
// has.
expectedBlocks = 5
// TODO: This will produce invalid blocks but it will accurately model the amount of gas
// we're willing to use per-tipset.
@ -42,6 +44,7 @@ func (sim *Simulation) Step(ctx context.Context) (*types.TipSet, error) {
return ts, nil
}
// step steps the simulation state forward one step, producing and executing a new tipset.
func (ss *simulationState) step(ctx context.Context) (*types.TipSet, error) {
log.Infow("step", "epoch", ss.head.Height()+1)
messages, err := ss.popNextMessages(ctx)
@ -59,20 +62,25 @@ func (ss *simulationState) step(ctx context.Context) (*types.TipSet, error) {
}
type packFunc func(*types.Message) (full bool, err error)
type messageGenerator func(ctx context.Context, cb packFunc) (full bool, err error)
// popNextMessages generates/picks a set of messages to be included in the next block.
//
// - This function is destructive and should only be called once per epoch.
// - This function does not store anything in the repo.
// - This function handles all gas estimation. The returned messages should all fit in a single
// block.
func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Message, error) {
parentTs := ss.head
parentState, _, err := ss.sm.TipSetState(ctx, parentTs)
if err != nil {
return nil, err
}
// First we make sure we don't have an upgrade at this epoch. If we do, we return no
// messages so we can just create an empty block at that epoch.
//
// This isn't what the network does, but it makes things easier. Otherwise, we'd need to run
// migrations before this epoch and I'd rather not deal with that.
nextHeight := parentTs.Height() + 1
prevVer := ss.sm.GetNtwkVersion(ctx, nextHeight-1)
nextVer := ss.sm.GetNtwkVersion(ctx, nextHeight)
if nextVer != prevVer {
// So... we _could_ actually run the migration, but that's a pain. It's easier to
// just have an empty block then let the state manager run the migration as normal.
log.Warnw("packing no messages for version upgrade block",
"old", prevVer,
"new", nextVer,
@ -81,10 +89,20 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
return nil, nil
}
// Then we need to execute messages till we run out of gas. Those messages will become the
// block's messages.
// Next, we compute the state for the parent tipset. In practice, this will likely be
// cached.
parentState, _, err := ss.sm.TipSetState(ctx, parentTs)
if err != nil {
return nil, err
}
// Then we construct a VM to execute messages for gas estimation.
//
// Most parts of this VM are "real" except:
// 1. We don't charge a fee.
// 2. The runtime has "fake" proof logic.
// 3. We don't actually save any of the results.
r := store.NewChainRand(ss.sm.ChainStore(), parentTs.Cids())
// TODO: Factor this out maybe?
vmopt := &vm.VMOpts{
StateBase: parentState,
Epoch: nextHeight,
@ -100,9 +118,15 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
if err != nil {
return nil, err
}
// TODO: This is the wrong store and may not include important state for what we're doing
// here....
// Maybe we just track nonces separately? Yeah, probably better that way.
// Next we define a helper function for "pushing" messages. This is the function that will
// be passed to the "pack" functions.
//
// It.
//
// 1. Tries to execute the message on-top-of the already pushed message.
// 2. Is careful to revert messages on failure to avoid nasties like nonce-gaps.
// 3. Resolves IDs as necessary, fills in missing parts of the message, etc.
vmStore := vmi.ActorStore(ctx)
var gasTotal int64
var messages []*types.Message
@ -181,16 +205,52 @@ func (ss *simulationState) popNextMessages(ctx context.Context) ([]*types.Messag
messages = append(messages, msg)
return false, nil
}
for _, mgen := range []messageGenerator{ss.packWindowPoSts, ss.packProveCommits, ss.packPreCommits} {
if full, err := mgen(ctx, tryPushMsg); err != nil {
name := runtime.FuncForPC(reflect.ValueOf(mgen).Pointer()).Name()
lastDot := strings.LastIndexByte(name, '.')
fName := name[lastDot+1 : len(name)-3]
return nil, xerrors.Errorf("when packing messages with %s: %w", fName, err)
} else if full {
break
}
// Finally, we generate a set of messages to be included in
if err := ss.packMessages(ctx, tryPushMsg); err != nil {
return nil, err
}
return messages, nil
}
// functionName extracts the name of given function.
func functionName(fn interface{}) string {
name := runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()
lastDot := strings.LastIndexByte(name, '.')
if lastDot >= 0 {
name = name[lastDot+1 : len(name)-3]
}
lastDash := strings.LastIndexByte(name, '-')
if lastDash > 0 {
name = name[:lastDash]
}
return name
}
// packMessages packs messages with the given packFunc until the block is full (packFunc returns
// true).
// TODO: Make this more configurable for other simulations.
func (ss *simulationState) packMessages(ctx context.Context, cb packFunc) error {
type messageGenerator func(ctx context.Context, cb packFunc) (full bool, err error)
// We pack messages in-order:
// 1. Any window posts. We pack window posts as soon as the deadline opens to ensure we only
// miss them if/when we run out of chain bandwidth.
// 2. Prove commits. We do this eagerly to ensure they don't expire.
// 3. Finally, we fill the rest of the space with pre-commits.
messageGenerators := []messageGenerator{
ss.packWindowPoSts,
ss.packProveCommits,
ss.packPreCommits,
}
for _, mgen := range messageGenerators {
if full, err := mgen(ctx, cb); err != nil {
return xerrors.Errorf("when packing messages with %s: %w", functionName(mgen), err)
} else if full {
break
}
}
return nil
}

View File

@ -5,41 +5,29 @@ import (
"math"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
"golang.org/x/xerrors"
)
func (ss *simulationState) getMinerInfo(ctx context.Context, addr address.Address) (*miner.MinerInfo, error) {
minerInfo, ok := ss.minerInfos[addr]
if !ok {
st, err := ss.stateTree(ctx)
if err != nil {
return nil, err
}
act, err := st.GetActor(addr)
if err != nil {
return nil, err
}
minerState, err := miner.Load(ss.Chainstore.ActorStore(ctx), act)
if err != nil {
return nil, err
}
info, err := minerState.Info()
if err != nil {
return nil, err
}
minerInfo = &info
ss.minerInfos[addr] = minerInfo
}
return minerInfo, nil
// postChainCommitInfo returns th
func (sim *Simulation) postChainCommitInfo(ctx context.Context, epoch abi.ChainEpoch) (abi.Randomness, error) {
commitRand, err := sim.Chainstore.GetChainRandomness(
ctx, sim.head.Cids(), crypto.DomainSeparationTag_PoStChainCommit, epoch, nil, true)
return commitRand, err
}
// packWindowPoSts packs window posts until either the block is full or all healty sectors
// have been proven. It does not recover sectors.
func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (full bool, _err error) {
// Push any new window posts into the queue.
if err := ss.queueWindowPoSts(ctx); err != nil {
@ -84,7 +72,7 @@ func (ss *simulationState) packWindowPoSts(ctx context.Context, cb packFunc) (fu
return false, nil
}
// Enqueue all missing window posts for the current epoch for the given miner.
// stepWindowPoStsMiner enqueues all missing window posts for the current epoch for the given miner.
func (ss *simulationState) stepWindowPoStsMiner(
ctx context.Context,
addr address.Address, minerState miner.State,
@ -198,7 +186,8 @@ func (ss *simulationState) stepWindowPoStsMiner(
return nil
}
// Enqueue missing window posts for all miners with deadlines opening at the current epoch.
// queueWindowPoSts enqueues missing window posts for all miners with deadlines opening between the
// last epoch in which this function was called and the current epoch (head+1).
func (ss *simulationState) queueWindowPoSts(ctx context.Context) error {
targetHeight := ss.nextEpoch()