52261fb814
While the previous version "worked", this version nicely separates out the state for the separate stages. Hopefully, we'll be able to use this to build different pipelines with different configs.
408 lines
11 KiB
Go
408 lines
11 KiB
Go
package simulation
|
|
|
|
import (
	"context"
	"encoding/json"
	"runtime"
	"sort"

	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/network"
	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log/v2"

	blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

	"github.com/filecoin-project/lotus/chain/actors/builtin"
	"github.com/filecoin-project/lotus/chain/state"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/cmd/lotus-sim/simulation/stages"
)
|
|
|
|
var log = logging.Logger("simulation")
|
|
|
|
const onboardingProjectionLookback = 2 * 7 * builtin.EpochsInDay // lookback two weeks
|
|
|
|
// config is the simulation's config, persisted to the local metadata store and loaded on start.
|
|
//
|
|
// See Simulation.loadConfig and Simulation.saveConfig.
|
|
type config struct {
|
|
Upgrades map[network.Version]abi.ChainEpoch
|
|
}
|
|
|
|
// upgradeSchedule constructs an stmgr.StateManager upgrade schedule, overriding any network upgrade
|
|
// epochs as specified in the config.
|
|
func (c *config) upgradeSchedule() (stmgr.UpgradeSchedule, error) {
|
|
upgradeSchedule := stmgr.DefaultUpgradeSchedule()
|
|
expected := make(map[network.Version]struct{}, len(c.Upgrades))
|
|
for nv := range c.Upgrades {
|
|
expected[nv] = struct{}{}
|
|
}
|
|
|
|
// Update network upgrade epochs.
|
|
newUpgradeSchedule := upgradeSchedule[:0]
|
|
for _, upgrade := range upgradeSchedule {
|
|
if height, ok := c.Upgrades[upgrade.Network]; ok {
|
|
delete(expected, upgrade.Network)
|
|
if height < 0 {
|
|
continue
|
|
}
|
|
upgrade.Height = height
|
|
}
|
|
newUpgradeSchedule = append(newUpgradeSchedule, upgrade)
|
|
}
|
|
|
|
// Make sure we didn't try to configure an unknown network version.
|
|
if len(expected) > 0 {
|
|
missing := make([]network.Version, 0, len(expected))
|
|
for nv := range expected {
|
|
missing = append(missing, nv)
|
|
}
|
|
return nil, xerrors.Errorf("unknown network versions %v in config", missing)
|
|
}
|
|
|
|
// Finally, validate it. This ensures we don't change the order of the upgrade or anything
|
|
// like that.
|
|
if err := newUpgradeSchedule.Validate(); err != nil {
|
|
return nil, err
|
|
}
|
|
return newUpgradeSchedule, nil
|
|
}
|
|
|
|
// Simulation specifies a lotus-sim simulation.
|
|
type Simulation struct {
|
|
*Node
|
|
StateManager *stmgr.StateManager
|
|
|
|
name string
|
|
config config
|
|
start *types.TipSet
|
|
|
|
// head
|
|
st *state.StateTree
|
|
head *types.TipSet
|
|
|
|
stages []stages.Stage
|
|
}
|
|
|
|
// loadConfig loads a simulation's config from the datastore. This must be called on startup and may
|
|
// be called to restore the config from-disk.
|
|
func (sim *Simulation) loadConfig() error {
|
|
configBytes, err := sim.MetadataDS.Get(sim.key("config"))
|
|
if err == nil {
|
|
err = json.Unmarshal(configBytes, &sim.config)
|
|
}
|
|
switch err {
|
|
case nil:
|
|
case datastore.ErrNotFound:
|
|
sim.config = config{}
|
|
default:
|
|
return xerrors.Errorf("failed to load config: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// saveConfig saves the current config to the datastore. This must be called whenever the config is
|
|
// changed.
|
|
func (sim *Simulation) saveConfig() error {
|
|
buf, err := json.Marshal(sim.config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return sim.MetadataDS.Put(sim.key("config"), buf)
|
|
}
|
|
|
|
// stateTree returns the current state-tree for the current head, computing the tipset if necessary.
|
|
// The state-tree is cached until the head is changed.
|
|
func (sim *Simulation) stateTree(ctx context.Context) (*state.StateTree, error) {
|
|
if sim.st == nil {
|
|
st, _, err := sim.StateManager.TipSetState(ctx, sim.head)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sim.st, err = sim.StateManager.StateTree(st)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return sim.st, nil
|
|
}
|
|
|
|
var simulationPrefix = datastore.NewKey("/simulation")
|
|
|
|
// key returns the the key in the form /simulation/<subkey>/<simulation-name>. For example,
|
|
// /simulation/head/default.
|
|
func (sim *Simulation) key(subkey string) datastore.Key {
|
|
return simulationPrefix.ChildString(subkey).ChildString(sim.name)
|
|
}
|
|
|
|
// loadNamedTipSet the tipset with the given name (for this simulation)
|
|
func (sim *Simulation) loadNamedTipSet(name string) (*types.TipSet, error) {
|
|
tskBytes, err := sim.MetadataDS.Get(sim.key(name))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to load tipset %s/%s: %w", sim.name, name, err)
|
|
}
|
|
tsk, err := types.TipSetKeyFromBytes(tskBytes)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to parse tipste %v (%s/%s): %w", tskBytes, sim.name, name, err)
|
|
}
|
|
ts, err := sim.Chainstore.LoadTipSet(tsk)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("failed to load tipset %s (%s/%s): %w", tsk, sim.name, name, err)
|
|
}
|
|
return ts, nil
|
|
}
|
|
|
|
// storeNamedTipSet stores the tipset at name (relative to the simulation).
|
|
func (sim *Simulation) storeNamedTipSet(name string, ts *types.TipSet) error {
|
|
if err := sim.MetadataDS.Put(sim.key(name), ts.Key().Bytes()); err != nil {
|
|
return xerrors.Errorf("failed to store tipset (%s/%s): %w", sim.name, name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetHead returns the current simulation head.
|
|
func (sim *Simulation) GetHead() *types.TipSet {
|
|
return sim.head
|
|
}
|
|
|
|
// GetStart returns simulation's parent tipset.
|
|
func (sim *Simulation) GetStart() *types.TipSet {
|
|
return sim.start
|
|
}
|
|
|
|
// GetNetworkVersion returns the current network version for the simulation.
|
|
func (sim *Simulation) GetNetworkVersion() network.Version {
|
|
return sim.StateManager.GetNtwkVersion(context.TODO(), sim.head.Height())
|
|
}
|
|
|
|
// SetHead updates the current head of the simulation and stores it in the metadata store. This is
|
|
// called for every Simulation.Step.
|
|
func (sim *Simulation) SetHead(head *types.TipSet) error {
|
|
if err := sim.storeNamedTipSet("head", head); err != nil {
|
|
return err
|
|
}
|
|
sim.st = nil // we'll compute this on-demand.
|
|
sim.head = head
|
|
return nil
|
|
}
|
|
|
|
// Name returns the simulation's name.
|
|
func (sim *Simulation) Name() string {
|
|
return sim.name
|
|
}
|
|
|
|
// SetUpgradeHeight sets the height of the given network version change (and saves the config).
|
|
//
|
|
// This fails if the specified epoch has already passed or the new upgrade schedule is invalid.
|
|
func (sim *Simulation) SetUpgradeHeight(nv network.Version, epoch abi.ChainEpoch) (_err error) {
|
|
if epoch <= sim.head.Height() {
|
|
return xerrors.Errorf("cannot set upgrade height in the past (%d <= %d)", epoch, sim.head.Height())
|
|
}
|
|
|
|
if sim.config.Upgrades == nil {
|
|
sim.config.Upgrades = make(map[network.Version]abi.ChainEpoch, 1)
|
|
}
|
|
|
|
sim.config.Upgrades[nv] = epoch
|
|
defer func() {
|
|
if _err != nil {
|
|
// try to restore the old config on error.
|
|
_ = sim.loadConfig()
|
|
}
|
|
}()
|
|
|
|
newUpgradeSchedule, err := sim.config.upgradeSchedule()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sm, err := stmgr.NewStateManagerWithUpgradeSchedule(sim.Chainstore, newUpgradeSchedule)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = sim.saveConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sim.StateManager = sm
|
|
return nil
|
|
}
|
|
|
|
// ListUpgrades returns any future network upgrades.
|
|
func (sim *Simulation) ListUpgrades() (stmgr.UpgradeSchedule, error) {
|
|
upgrades, err := sim.config.upgradeSchedule()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var pending stmgr.UpgradeSchedule
|
|
for _, upgrade := range upgrades {
|
|
if upgrade.Height < sim.head.Height() {
|
|
continue
|
|
}
|
|
pending = append(pending, upgrade)
|
|
}
|
|
return pending, nil
|
|
}
|
|
|
|
type AppliedMessage struct {
|
|
types.Message
|
|
types.MessageReceipt
|
|
}
|
|
|
|
// Walk walks the simulation's chain from the current head back to the first tipset.
|
|
func (sim *Simulation) Walk(
|
|
ctx context.Context,
|
|
maxLookback int64,
|
|
cb func(sm *stmgr.StateManager,
|
|
ts *types.TipSet,
|
|
stCid cid.Cid,
|
|
messages []*AppliedMessage) error,
|
|
) error {
|
|
store := sim.Chainstore.ActorStore(ctx)
|
|
minEpoch := abi.ChainEpoch(0)
|
|
if maxLookback != 0 {
|
|
minEpoch = sim.head.Height() - abi.ChainEpoch(maxLookback)
|
|
}
|
|
|
|
// Given tha loading messages and receipts can be a little bit slow, we do this in parallel.
|
|
//
|
|
// 1. We spin up some number of workers.
|
|
// 2. We hand tipsets to workers in round-robin order.
|
|
// 3. We pull "resolved" tipsets in the same round-robin order.
|
|
// 4. We serially call the callback in reverse-chain order.
|
|
//
|
|
// We have a buffer of size 1 for both resolved tipsets and unresolved tipsets. This should
|
|
// ensure that we never block unecessarily.
|
|
|
|
type work struct {
|
|
ts *types.TipSet
|
|
stCid cid.Cid
|
|
recCid cid.Cid
|
|
}
|
|
type result struct {
|
|
ts *types.TipSet
|
|
stCid cid.Cid
|
|
messages []*AppliedMessage
|
|
}
|
|
|
|
// This is more disk bound than CPU bound, but eh...
|
|
workerCount := runtime.NumCPU() * 2
|
|
|
|
workQs := make([]chan *work, workerCount)
|
|
resultQs := make([]chan *result, workerCount)
|
|
|
|
for i := range workQs {
|
|
workQs[i] = make(chan *work, 1)
|
|
}
|
|
|
|
for i := range resultQs {
|
|
resultQs[i] = make(chan *result, 1)
|
|
}
|
|
|
|
grp, ctx := errgroup.WithContext(ctx)
|
|
|
|
// Walk the chain and fire off work items.
|
|
grp.Go(func() error {
|
|
ts := sim.head
|
|
stCid, recCid, err := sim.StateManager.TipSetState(ctx, ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i := 0
|
|
for !ts.Equals(sim.start) && ctx.Err() == nil && ts.Height() > minEpoch {
|
|
select {
|
|
case workQs[i] <- &work{ts, stCid, recCid}:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
|
|
stCid = ts.MinTicketBlock().ParentStateRoot
|
|
recCid = ts.MinTicketBlock().ParentMessageReceipts
|
|
ts, err = sim.Chainstore.LoadTipSet(ts.Parents())
|
|
if err != nil {
|
|
return xerrors.Errorf("loading parent: %w", err)
|
|
}
|
|
i = (i + 1) % workerCount
|
|
}
|
|
for _, q := range workQs {
|
|
close(q)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Spin up one worker per queue pair.
|
|
for i := 0; i < workerCount; i++ {
|
|
workQ := workQs[i]
|
|
resultQ := resultQs[i]
|
|
grp.Go(func() error {
|
|
for job := range workQ {
|
|
msgs, err := sim.Chainstore.MessagesForTipset(job.ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
recs, err := blockadt.AsArray(store, job.recCid)
|
|
if err != nil {
|
|
return xerrors.Errorf("amt load: %w", err)
|
|
}
|
|
applied := make([]*AppliedMessage, len(msgs))
|
|
var rec types.MessageReceipt
|
|
err = recs.ForEach(&rec, func(i int64) error {
|
|
applied[i] = &AppliedMessage{
|
|
Message: *msgs[i].VMMessage(),
|
|
MessageReceipt: rec,
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
select {
|
|
case resultQ <- &result{
|
|
ts: job.ts,
|
|
stCid: job.stCid,
|
|
messages: applied,
|
|
}:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
close(resultQ)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Process results in the same order we enqueued them.
|
|
grp.Go(func() error {
|
|
qs := resultQs
|
|
for len(qs) > 0 {
|
|
newQs := qs[:0]
|
|
for _, q := range qs {
|
|
select {
|
|
case r, ok := <-q:
|
|
if !ok {
|
|
continue
|
|
}
|
|
err := cb(sim.StateManager, r.ts, r.stCid, r.messages)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
newQs = append(newQs, q)
|
|
}
|
|
qs = newQs
|
|
}
|
|
return nil
|
|
})
|
|
|
|
// Wait for everything to finish.
|
|
return grp.Wait()
|
|
}
|