edb2b9584b
We did this to avoid using cached migrations in the borked watermelon upgrade on calibrationnet, but we don't need to keep this in the code (even for users syncing from scratch).
509 lines
16 KiB
Go
509 lines
16 KiB
Go
package stmgr
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"github.com/ipfs/go-datastore"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/go-state-types/big"
|
|
"github.com/filecoin-project/go-state-types/network"
|
|
"github.com/filecoin-project/specs-actors/v8/actors/migration/nv16"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors/adt"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
|
init_ "github.com/filecoin-project/lotus/chain/actors/builtin/init"
|
|
"github.com/filecoin-project/lotus/chain/state"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
|
)
|
|
|
|
// EnvDisablePreMigrations when set to '1' stops pre-migrations from running
|
|
const EnvDisablePreMigrations = "LOTUS_DISABLE_PRE_MIGRATIONS"
|
|
|
|
// MigrationCache can be used to cache information used by a migration. This is primarily useful to
|
|
// "pre-compute" some migration state ahead of time, and make it accessible in the migration itself.
|
|
type MigrationCache interface {
|
|
Write(key string, value cid.Cid) error
|
|
Read(key string) (bool, cid.Cid, error)
|
|
Load(key string, loadFunc func() (cid.Cid, error)) (cid.Cid, error)
|
|
}
|
|
|
|
// MigrationFunc is a migration function run at every upgrade.
|
|
//
|
|
// - The cache is a per-upgrade cache, pre-populated by pre-migrations.
|
|
// - The oldState is the state produced by the upgrade epoch.
|
|
// - The returned newState is the new state that will be used by the next epoch.
|
|
// - The height is the upgrade epoch height (already executed).
|
|
// - The tipset is the first non-null tipset after the upgrade height (the tipset in
|
|
// which the upgrade is executed). Do not assume that ts.Height() is the upgrade height.
|
|
//
|
|
// NOTE: In StateCompute and CallWithGas, the passed tipset is actually the tipset _before_ the
|
|
// upgrade. The tipset should really only be used for referencing the "current chain".
|
|
type MigrationFunc func(
|
|
ctx context.Context,
|
|
sm *StateManager, cache MigrationCache,
|
|
cb ExecMonitor,
|
|
oldState cid.Cid,
|
|
height abi.ChainEpoch, ts *types.TipSet,
|
|
) (newState cid.Cid, err error)
|
|
|
|
// PreMigrationFunc is a function run _before_ a network upgrade to pre-compute part of the network
|
|
// upgrade and speed it up.
|
|
type PreMigrationFunc func(
|
|
ctx context.Context,
|
|
sm *StateManager, cache MigrationCache,
|
|
oldState cid.Cid,
|
|
height abi.ChainEpoch, ts *types.TipSet,
|
|
) error
|
|
|
|
// PreMigration describes a pre-migration step to prepare for a network state upgrade. Pre-migrations
|
|
// are optimizations, are not guaranteed to run, and may be canceled and/or run multiple times.
|
|
type PreMigration struct {
|
|
// PreMigration is the pre-migration function to run at the specified time. This function is
|
|
// run asynchronously and must abort promptly when canceled.
|
|
PreMigration PreMigrationFunc
|
|
|
|
// StartWithin specifies that this pre-migration should be started at most StartWithin
|
|
// epochs before the upgrade.
|
|
StartWithin abi.ChainEpoch
|
|
|
|
// DontStartWithin specifies that this pre-migration should not be started DontStartWithin
|
|
// epochs before the final upgrade epoch.
|
|
//
|
|
// This should be set such that the pre-migration is likely to complete before StopWithin.
|
|
DontStartWithin abi.ChainEpoch
|
|
|
|
// StopWithin specifies that this pre-migration should be stopped StopWithin epochs of the
|
|
// final upgrade epoch.
|
|
StopWithin abi.ChainEpoch
|
|
}
|
|
|
|
type Upgrade struct {
|
|
Height abi.ChainEpoch
|
|
Network network.Version
|
|
Expensive bool
|
|
Migration MigrationFunc
|
|
|
|
// PreMigrations specifies a set of pre-migration functions to run at the indicated epochs.
|
|
// These functions should fill the given cache with information that can speed up the
|
|
// eventual full migration at the upgrade epoch.
|
|
PreMigrations []PreMigration
|
|
}
|
|
|
|
type UpgradeSchedule []Upgrade
|
|
|
|
func (us UpgradeSchedule) Validate() error {
|
|
// Make sure each upgrade is valid.
|
|
for _, u := range us {
|
|
if u.Network <= 0 {
|
|
return xerrors.Errorf("cannot upgrade to version <= 0: %d", u.Network)
|
|
}
|
|
|
|
for _, m := range u.PreMigrations {
|
|
if m.StartWithin <= 0 {
|
|
return xerrors.Errorf("pre-migration must specify a positive start-within epoch")
|
|
}
|
|
|
|
if m.DontStartWithin < 0 || m.StopWithin < 0 {
|
|
return xerrors.Errorf("pre-migration must specify non-negative epochs")
|
|
}
|
|
|
|
if m.StartWithin <= m.StopWithin {
|
|
return xerrors.Errorf("pre-migration start-within must come before stop-within")
|
|
}
|
|
|
|
// If we have a dont-start-within.
|
|
if m.DontStartWithin != 0 {
|
|
if m.DontStartWithin < m.StopWithin {
|
|
return xerrors.Errorf("pre-migration dont-start-within must come before stop-within")
|
|
}
|
|
if m.StartWithin <= m.DontStartWithin {
|
|
return xerrors.Errorf("pre-migration start-within must come after dont-start-within")
|
|
}
|
|
}
|
|
}
|
|
if !sort.SliceIsSorted(u.PreMigrations, func(i, j int) bool {
|
|
return u.PreMigrations[i].StartWithin > u.PreMigrations[j].StartWithin //nolint:scopelint,gosec
|
|
}) {
|
|
return xerrors.Errorf("pre-migrations must be sorted by start epoch")
|
|
}
|
|
}
|
|
|
|
// Make sure the upgrade order makes sense.
|
|
for i := 1; i < len(us); i++ {
|
|
prev := &us[i-1]
|
|
curr := &us[i]
|
|
if !(prev.Network <= curr.Network) {
|
|
return xerrors.Errorf("cannot downgrade from version %d to version %d", prev.Network, curr.Network)
|
|
}
|
|
// Make sure the heights make sense.
|
|
if prev.Height < 0 {
|
|
// Previous upgrade was disabled.
|
|
continue
|
|
}
|
|
if !(prev.Height < curr.Height) {
|
|
return xerrors.Errorf("upgrade heights must be strictly increasing: upgrade %d was at height %d, followed by upgrade %d at height %d", i-1, prev.Height, i, curr.Height)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (us UpgradeSchedule) GetNtwkVersion(e abi.ChainEpoch) (network.Version, error) {
|
|
// Traverse from newest to oldest returning upgrade active during epoch e
|
|
for i := len(us) - 1; i >= 0; i-- {
|
|
u := us[i]
|
|
// u.Height is the last epoch before u.Network becomes the active version
|
|
if u.Height < e {
|
|
return u.Network, nil
|
|
}
|
|
}
|
|
|
|
return build.GenesisNetworkVersion, nil
|
|
}
|
|
|
|
func (sm *StateManager) HandleStateForks(ctx context.Context, root cid.Cid, height abi.ChainEpoch, cb ExecMonitor, ts *types.TipSet) (cid.Cid, error) {
|
|
retCid := root
|
|
u := sm.stateMigrations[height]
|
|
if u != nil && u.upgrade != nil {
|
|
migCid, ok, err := u.migrationResultCache.Get(ctx, root)
|
|
if err == nil {
|
|
if ok {
|
|
log.Infow("CACHED migration", "height", height, "from", root, "to", migCid)
|
|
return migCid, nil
|
|
}
|
|
} else if !errors.Is(err, datastore.ErrNotFound) {
|
|
log.Errorw("failed to lookup previous migration result", "err", err)
|
|
} else {
|
|
log.Debug("no cached migration found, migrating from scratch")
|
|
}
|
|
|
|
startTime := time.Now()
|
|
log.Warnw("STARTING migration", "height", height, "from", root)
|
|
// Yes, we clone the cache, even for the final upgrade epoch. Why? Reverts. We may
|
|
// have to migrate multiple times.
|
|
tmpCache := u.cache.Clone()
|
|
retCid, err = u.upgrade(ctx, sm, tmpCache, cb, root, height, ts)
|
|
if err != nil {
|
|
log.Errorw("FAILED migration", "height", height, "from", root, "error", err)
|
|
return cid.Undef, err
|
|
}
|
|
// Yes, we update the cache, even for the final upgrade epoch. Why? Reverts. This
|
|
// can save us a _lot_ of time because very few actors will have changed if we
|
|
// do a small revert then need to re-run the migration.
|
|
u.cache.Update(tmpCache)
|
|
log.Warnw("COMPLETED migration",
|
|
"height", height,
|
|
"from", root,
|
|
"to", retCid,
|
|
"duration", time.Since(startTime),
|
|
)
|
|
|
|
// Only set if migration ran, we do not want a root => root mapping
|
|
if err := u.migrationResultCache.Store(ctx, root, retCid); err != nil {
|
|
log.Errorw("failed to store migration result", "err", err)
|
|
}
|
|
}
|
|
|
|
return retCid, nil
|
|
}
|
|
|
|
// Returns true executing tipsets between the specified heights would trigger an expensive
|
|
// migration. NOTE: migrations occurring _at_ the target height are not included, as they're
|
|
// executed _after_ the target height.
|
|
func (sm *StateManager) hasExpensiveForkBetween(parent, height abi.ChainEpoch) bool {
|
|
for h := parent; h < height; h++ {
|
|
if _, ok := sm.expensiveUpgrades[h]; ok {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func runPreMigration(ctx context.Context, sm *StateManager, fn PreMigrationFunc, cache *nv16.MemMigrationCache, ts *types.TipSet) {
|
|
height := ts.Height()
|
|
parent := ts.ParentState()
|
|
|
|
if disabled := os.Getenv(EnvDisablePreMigrations); strings.TrimSpace(disabled) == "1" {
|
|
log.Warnw("SKIPPING pre-migration", "height", height)
|
|
return
|
|
}
|
|
|
|
startTime := time.Now()
|
|
|
|
log.Warn("STARTING pre-migration")
|
|
// Clone the cache so we don't actually _update_ it
|
|
// till we're done. Otherwise, if we fail, the next
|
|
// migration to use the cache may assume that
|
|
// certain blocks exist, even if they don't.
|
|
tmpCache := cache.Clone()
|
|
err := fn(ctx, sm, tmpCache, parent, height, ts)
|
|
if err != nil {
|
|
log.Errorw("FAILED pre-migration", "error", err)
|
|
return
|
|
}
|
|
// Finally, if everything worked, update the cache.
|
|
cache.Update(tmpCache)
|
|
log.Warnw("COMPLETED pre-migration", "duration", time.Since(startTime))
|
|
}
|
|
|
|
func (sm *StateManager) preMigrationWorker(ctx context.Context) {
|
|
defer close(sm.shutdown)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
type op struct {
|
|
after abi.ChainEpoch
|
|
notAfter abi.ChainEpoch
|
|
run func(ts *types.TipSet)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
defer wg.Wait()
|
|
|
|
// Turn each pre-migration into an operation in a schedule.
|
|
var schedule []op
|
|
for upgradeEpoch, migration := range sm.stateMigrations {
|
|
cache := migration.cache
|
|
for _, prem := range migration.preMigrations {
|
|
preCtx, preCancel := context.WithCancel(ctx)
|
|
migrationFunc := prem.PreMigration
|
|
|
|
afterEpoch := upgradeEpoch - prem.StartWithin
|
|
notAfterEpoch := upgradeEpoch - prem.DontStartWithin
|
|
stopEpoch := upgradeEpoch - prem.StopWithin
|
|
// We can't start after we stop.
|
|
if notAfterEpoch > stopEpoch {
|
|
notAfterEpoch = stopEpoch - 1
|
|
}
|
|
|
|
// Add an op to start a pre-migration.
|
|
schedule = append(schedule, op{
|
|
after: afterEpoch,
|
|
notAfter: notAfterEpoch,
|
|
|
|
// TODO: are these values correct?
|
|
run: func(ts *types.TipSet) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
runPreMigration(preCtx, sm, migrationFunc, cache, ts)
|
|
}()
|
|
},
|
|
})
|
|
|
|
// Add an op to cancel the pre-migration if it's still running.
|
|
schedule = append(schedule, op{
|
|
after: stopEpoch,
|
|
notAfter: -1,
|
|
run: func(ts *types.TipSet) { preCancel() },
|
|
})
|
|
}
|
|
}
|
|
|
|
// Then sort by epoch.
|
|
sort.Slice(schedule, func(i, j int) bool {
|
|
return schedule[i].after < schedule[j].after
|
|
})
|
|
|
|
// Finally, when the head changes, see if there's anything we need to do.
|
|
//
|
|
// We're intentionally ignoring reorgs as they don't matter for our purposes.
|
|
for change := range sm.cs.SubHeadChanges(ctx) {
|
|
for _, head := range change {
|
|
for len(schedule) > 0 {
|
|
op := &schedule[0]
|
|
if head.Val.Height() < op.after {
|
|
break
|
|
}
|
|
|
|
// If we haven't passed the pre-migration height...
|
|
if op.notAfter < 0 || head.Val.Height() < op.notAfter {
|
|
op.run(head.Val)
|
|
}
|
|
schedule = schedule[1:]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func DoTransfer(tree types.StateTree, from, to address.Address, amt abi.TokenAmount, cb func(trace types.ExecutionTrace)) error {
|
|
fromAct, err := tree.GetActor(from)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to get 'from' actor for transfer: %w", err)
|
|
}
|
|
|
|
fromAct.Balance = types.BigSub(fromAct.Balance, amt)
|
|
if fromAct.Balance.Sign() < 0 {
|
|
return xerrors.Errorf("(sanity) deducted more funds from target account than it had (%s, %s)", from, types.FIL(amt))
|
|
}
|
|
|
|
if err := tree.SetActor(from, fromAct); err != nil {
|
|
return xerrors.Errorf("failed to persist from actor: %w", err)
|
|
}
|
|
|
|
toAct, err := tree.GetActor(to)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to get 'to' actor for transfer: %w", err)
|
|
}
|
|
|
|
toAct.Balance = types.BigAdd(toAct.Balance, amt)
|
|
|
|
if err := tree.SetActor(to, toAct); err != nil {
|
|
return xerrors.Errorf("failed to persist to actor: %w", err)
|
|
}
|
|
|
|
if cb != nil {
|
|
// record the transfer in execution traces
|
|
|
|
cb(types.ExecutionTrace{
|
|
Msg: types.MessageTrace{
|
|
From: from,
|
|
To: to,
|
|
Value: amt,
|
|
},
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func TerminateActor(ctx context.Context, tree *state.StateTree, addr address.Address, em ExecMonitor, epoch abi.ChainEpoch, ts *types.TipSet) error {
|
|
a, err := tree.GetActor(addr)
|
|
if xerrors.Is(err, types.ErrActorNotFound) {
|
|
return types.ErrActorNotFound
|
|
} else if err != nil {
|
|
return xerrors.Errorf("failed to get actor to delete: %w", err)
|
|
}
|
|
|
|
var trace types.ExecutionTrace
|
|
if err := DoTransfer(tree, addr, builtin.BurntFundsActorAddr, a.Balance, func(t types.ExecutionTrace) {
|
|
trace = t
|
|
}); err != nil {
|
|
return xerrors.Errorf("transferring terminated actor's balance: %w", err)
|
|
}
|
|
|
|
if em != nil {
|
|
// record the transfer in execution traces
|
|
|
|
fakeMsg := MakeFakeMsg(builtin.SystemActorAddr, addr, big.Zero(), uint64(epoch))
|
|
|
|
if err := em.MessageApplied(ctx, ts, fakeMsg.Cid(), fakeMsg, &vm.ApplyRet{
|
|
MessageReceipt: *MakeFakeRct(),
|
|
ActorErr: nil,
|
|
ExecutionTrace: trace,
|
|
Duration: 0,
|
|
GasCosts: nil,
|
|
}, false); err != nil {
|
|
return xerrors.Errorf("recording transfers: %w", err)
|
|
}
|
|
}
|
|
|
|
err = tree.DeleteActor(addr)
|
|
if err != nil {
|
|
return xerrors.Errorf("deleting actor from tree: %w", err)
|
|
}
|
|
|
|
ia, err := tree.GetActor(init_.Address)
|
|
if err != nil {
|
|
return xerrors.Errorf("loading init actor: %w", err)
|
|
}
|
|
|
|
ias, err := init_.Load(&state.AdtStore{IpldStore: tree.Store}, ia)
|
|
if err != nil {
|
|
return xerrors.Errorf("loading init actor state: %w", err)
|
|
}
|
|
|
|
if err := ias.Remove(addr); err != nil {
|
|
return xerrors.Errorf("deleting entry from address map: %w", err)
|
|
}
|
|
|
|
nih, err := tree.Store.Put(ctx, ias)
|
|
if err != nil {
|
|
return xerrors.Errorf("writing new init actor state: %w", err)
|
|
}
|
|
|
|
ia.Head = nih
|
|
|
|
return tree.SetActor(init_.Address, ia)
|
|
}
|
|
|
|
func SetNetworkName(ctx context.Context, store adt.Store, tree *state.StateTree, name string) error {
|
|
ia, err := tree.GetActor(init_.Address)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting init actor: %w", err)
|
|
}
|
|
|
|
initState, err := init_.Load(store, ia)
|
|
if err != nil {
|
|
return xerrors.Errorf("reading init state: %w", err)
|
|
}
|
|
|
|
if err := initState.SetNetworkName(name); err != nil {
|
|
return xerrors.Errorf("setting network name: %w", err)
|
|
}
|
|
|
|
ia.Head, err = store.Put(ctx, initState)
|
|
if err != nil {
|
|
return xerrors.Errorf("writing new init state: %w", err)
|
|
}
|
|
|
|
if err := tree.SetActor(init_.Address, ia); err != nil {
|
|
return xerrors.Errorf("setting init actor: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func MakeKeyAddr(splitAddr address.Address, count uint64) (address.Address, error) {
|
|
var b bytes.Buffer
|
|
if err := splitAddr.MarshalCBOR(&b); err != nil {
|
|
return address.Undef, xerrors.Errorf("marshalling split address: %w", err)
|
|
}
|
|
|
|
if err := binary.Write(&b, binary.BigEndian, count); err != nil {
|
|
return address.Undef, xerrors.Errorf("writing count into a buffer: %w", err)
|
|
}
|
|
|
|
if err := binary.Write(&b, binary.BigEndian, []byte("Ignition upgrade")); err != nil {
|
|
return address.Undef, xerrors.Errorf("writing fork name into a buffer: %w", err)
|
|
}
|
|
|
|
addr, err := address.NewActorAddress(b.Bytes())
|
|
if err != nil {
|
|
return address.Undef, xerrors.Errorf("create actor address: %w", err)
|
|
}
|
|
|
|
return addr, nil
|
|
}
|
|
|
|
func MakeFakeMsg(from address.Address, to address.Address, amt abi.TokenAmount, nonce uint64) *types.Message {
|
|
return &types.Message{
|
|
From: from,
|
|
To: to,
|
|
Value: amt,
|
|
Nonce: nonce,
|
|
}
|
|
}
|
|
|
|
func MakeFakeRct() *types.MessageReceipt {
|
|
return &types.MessageReceipt{
|
|
ExitCode: 0,
|
|
Return: nil,
|
|
GasUsed: 0,
|
|
}
|
|
}
|