Fold together call functions

This commit is contained in:
Geoff Stuart 2022-11-16 18:19:59 -05:00
parent 21afb3f5ea
commit 96035005b8
6 changed files with 109 additions and 251 deletions

View File

@ -30,74 +30,7 @@ var ErrExpensiveFork = errors.New("refusing explicit call due to state fork at e
// Call applies the given message to the given tipset's parent state, at the epoch following the
// tipset's parent. In the presence of null blocks, the height at which the message is invoked may
// be less than the specified tipset.
//
// - If no tipset is specified, the first tipset without an expensive migration is used.
// - If executing a message at a given tipset would trigger an expensive migration, the call will
// fail with ErrExpensiveFork.
func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.InvocResult, error) {
ctx, span := trace.StartSpan(ctx, "statemanager.Call")
defer span.End()
var pheight abi.ChainEpoch = -1
// If no tipset is provided, try to find one without a fork.
if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
// Search back till we find a height with no fork, or we reach the beginning.
for ts.Height() > 0 {
pts, err := sm.cs.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to find a non-forking epoch: %w", err)
}
if !sm.hasExpensiveFork(pts.Height()) {
pheight = pts.Height()
break
}
ts = pts
}
} else if ts.Height() > 0 {
pts, err := sm.cs.LoadTipSet(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load parent tipset: %w", err)
}
pheight = pts.Height()
if sm.hasExpensiveFork(pheight) {
return nil, ErrExpensiveFork
}
} else {
// We can't get the parent tipset in this case.
pheight = ts.Height() - 1
}
// Since we're simulating a future message, pretend we're applying it in the "next" tipset
vmHeight := pheight + 1
bstate := ts.ParentState()
// Run the (not expensive) migration.
bstate, err := sm.HandleStateForks(ctx, bstate, pheight, nil, ts)
if err != nil {
return nil, fmt.Errorf("failed to handle fork: %w", err)
}
vmopt := &vm.VMOpts{
StateBase: bstate,
Epoch: vmHeight,
Rand: rand.NewStateRand(sm.cs, ts.Cids(), sm.beacon, sm.GetNetworkVersion),
Bstore: sm.cs.StateBlockstore(),
Actors: sm.tsExec.NewActorRegistry(),
Syscalls: sm.Syscalls,
CircSupplyCalc: sm.GetVMCirculatingSupply,
NetworkVersion: sm.GetNetworkVersion(ctx, pheight+1),
BaseFee: types.NewInt(0),
LookbackState: LookbackStateGetterForTipset(sm, ts),
Tracing: true,
}
vmi, err := sm.newVM(ctx, vmopt)
if err != nil {
return nil, xerrors.Errorf("failed to set up vm: %w", err)
}
if msg.GasLimit == 0 {
msg.GasLimit = build.BlockGasLimit
}
@ -107,138 +40,43 @@ func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.
if msg.GasPremium == types.EmptyInt {
msg.GasPremium = types.NewInt(0)
}
if msg.Value == types.EmptyInt {
msg.Value = types.NewInt(0)
}
if span.IsRecordingEvents() {
span.AddAttributes(
trace.Int64Attribute("gas_limit", msg.GasLimit),
trace.StringAttribute("gas_feecap", msg.GasFeeCap.String()),
trace.StringAttribute("value", msg.Value.String()),
)
}
stTree, err := sm.StateTree(bstate)
if err != nil {
return nil, xerrors.Errorf("failed to load state tree: %w", err)
}
fromActor, err := stTree.GetActor(msg.From)
if err != nil {
return nil, xerrors.Errorf("call raw get actor: %s", err)
}
msg.Nonce = fromActor.Nonce
ret, err := vmi.ApplyMessage(ctx, msg)
if err != nil && ret == nil {
return nil, xerrors.Errorf("apply message failed: %w", err)
}
var errs string
if ret.ActorErr != nil {
errs = ret.ActorErr.Error()
log.Warnf("chain call failed: %s", ret.ActorErr)
}
return &api.InvocResult{
MsgCid: msg.Cid(),
Msg: msg,
MsgRct: &ret.MessageReceipt,
ExecutionTrace: ret.ExecutionTrace,
Error: errs,
Duration: ret.Duration,
}, err
return sm.callInternal(ctx, msg, nil, ts, cid.Undef, sm.GetNetworkVersion, false)
}
func (sm *StateManager) CallAtStateAndVersion(ctx context.Context, msg *types.Message, ts *types.TipSet, stateCid cid.Cid, v network.Version) (*api.InvocResult, error) {
// CallWithGas calculates the state for a given tipset, and then applies the given message on top of that state.
func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, priorMsgs []types.ChainMsg, ts *types.TipSet) (*api.InvocResult, error) {
return sm.callInternal(ctx, msg, priorMsgs, ts, cid.Undef, sm.GetNetworkVersion, true)
}
// CallAtStateAndVersion allows you to specify a message to execute on the given stateCid and network version.
// This should mostly be used for gas modelling on a migrated state.
// Tipset here is not needed because stateCid and network version fully describe execution we want. The internal function
// will get the heaviest tipset for use for things like basefee, which we don't really care about here.
func (sm *StateManager) CallAtStateAndVersion(ctx context.Context, msg *types.Message, stateCid cid.Cid, v network.Version) (*api.InvocResult, error) {
nvGetter := func(context.Context, abi.ChainEpoch) network.Version {
return v
}
buffStore := blockstore.NewTieredBstore(sm.cs.StateBlockstore(), blockstore.NewMemorySync())
vmopt := &vm.VMOpts{
StateBase: stateCid,
Epoch: ts.Height() + 1,
Rand: rand.NewStateRand(sm.cs, ts.Cids(), sm.beacon, nvGetter),
Bstore: buffStore,
Actors: sm.tsExec.NewActorRegistry(),
Syscalls: sm.Syscalls,
CircSupplyCalc: sm.GetVMCirculatingSupply,
NetworkVersion: v,
BaseFee: types.NewInt(0),
LookbackState: LookbackStateGetterForTipset(sm, ts),
Tracing: true,
}
vmi, err := sm.newVM(ctx, vmopt)
if err != nil {
return nil, xerrors.Errorf("failed to set up vm: %w", err)
}
stTree, err := state.LoadStateTree(cbor.NewCborStore(buffStore), stateCid)
if err != nil {
return nil, xerrors.Errorf("loading state tree: %w", err)
}
fromActor, err := stTree.GetActor(msg.From)
if err != nil {
return nil, xerrors.Errorf("call raw get actor: %s", err)
}
msg.Nonce = fromActor.Nonce
fromKey, err := sm.ResolveToKeyAddress(ctx, msg.From, ts)
if err != nil {
return nil, xerrors.Errorf("could not resolve key: %w", err)
}
var msgApply types.ChainMsg
switch fromKey.Protocol() {
case address.BLS:
msgApply = msg
case address.SECP256K1:
msgApply = &types.SignedMessage{
Message: *msg,
Signature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: make([]byte, 65),
},
}
}
ret, err := vmi.ApplyMessage(ctx, msgApply)
if err != nil {
return nil, xerrors.Errorf("gas estimation failed: %w", err)
}
var errs string
if ret.ActorErr != nil {
errs = ret.ActorErr.Error()
}
return &api.InvocResult{
MsgCid: msg.Cid(),
Msg: msg,
MsgRct: &ret.MessageReceipt,
GasCost: MakeMsgGasCost(msg, ret),
ExecutionTrace: ret.ExecutionTrace,
Error: errs,
Duration: ret.Duration,
}, nil
return sm.callInternal(ctx, msg, nil, nil, stateCid, nvGetter, true)
}
func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, priorMsgs []types.ChainMsg, ts *types.TipSet) (*api.InvocResult, error) {
ctx, span := trace.StartSpan(ctx, "statemanager.CallWithGas")
// - If no tipset is specified, the first tipset without an expensive migration or one in its parent is used.
// - If executing a message at a given tipset or its parent would trigger an expensive migration, the call will
// fail with ErrExpensiveFork.
func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, priorMsgs []types.ChainMsg, ts *types.TipSet, stateCid cid.Cid, nvGetter rand.NetworkVersionGetter, checkGas bool) (*api.InvocResult, error) {
ctx, span := trace.StartSpan(ctx, "statemanager.callInternal")
defer span.End()
// Copy the message as we'll be modifying the nonce.
msgCopy := *msg
msg = &msgCopy
var err error
var pts *types.TipSet
if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
@ -248,10 +86,11 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
// height to have no fork, because we'll run it inside this
// function before executing the given message.
for ts.Height() > 0 {
pts, err := sm.cs.GetTipSetFromKey(ctx, ts.Parents())
pts, err = sm.cs.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to find a non-forking epoch: %w", err)
}
// Checks for expensive forks from the parents to the tipset, including nil tipsets
if !sm.hasExpensiveForkBetween(pts.Height(), ts.Height()+1) {
break
}
@ -259,7 +98,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
ts = pts
}
} else if ts.Height() > 0 {
pts, err := sm.cs.GetTipSetFromKey(ctx, ts.Parents())
pts, err = sm.cs.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to find a non-forking epoch: %w", err)
}
@ -268,12 +107,22 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
}
}
// Since we're simulating a future message, pretend we're applying it in the "next" tipset
vmHeight := ts.Height() + 1
stateCid, _, err := sm.TipSetState(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("computing tipset state: %w", err)
var vmHeight abi.ChainEpoch
if checkGas {
// Since we're simulating a future message, pretend we're applying it in the "next" tipset
vmHeight = ts.Height() + 1
if stateCid == cid.Undef {
stateCid, _, err = sm.TipSetState(ctx, ts)
if err != nil {
return nil, xerrors.Errorf("computing tipset state: %w", err)
}
}
} else {
// If we're not checking gas, we don't want to have to execute the tipset like above. This saves a lot of computation time
vmHeight = pts.Height() + 1
if stateCid == cid.Undef {
stateCid = ts.ParentState()
}
}
// Technically, the tipset we're passing in here should be ts+1, but that may not exist.
@ -294,12 +143,12 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
vmopt := &vm.VMOpts{
StateBase: stateCid,
Epoch: vmHeight,
Rand: rand.NewStateRand(sm.cs, ts.Cids(), sm.beacon, sm.GetNetworkVersion),
Rand: rand.NewStateRand(sm.cs, ts.Cids(), sm.beacon, nvGetter),
Bstore: buffStore,
Actors: sm.tsExec.NewActorRegistry(),
Syscalls: sm.Syscalls,
CircSupplyCalc: sm.GetVMCirculatingSupply,
NetworkVersion: sm.GetNetworkVersion(ctx, ts.Height()+1),
NetworkVersion: nvGetter(ctx, vmHeight),
BaseFee: ts.Blocks()[0].ParentBaseFee,
LookbackState: LookbackStateGetterForTipset(sm, ts),
Tracing: true,
@ -309,7 +158,7 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
return nil, xerrors.Errorf("failed to set up vm: %w", err)
}
for i, m := range priorMsgs {
_, err := vmi.ApplyMessage(ctx, m)
_, err = vmi.ApplyMessage(ctx, m)
if err != nil {
return nil, xerrors.Errorf("applying prior message (%d, %s): %w", i, m.Cid(), err)
}
@ -334,27 +183,6 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
msg.Nonce = fromActor.Nonce
fromKey, err := sm.ResolveToKeyAddress(ctx, msg.From, ts)
if err != nil {
return nil, xerrors.Errorf("could not resolve key: %w", err)
}
var msgApply types.ChainMsg
switch fromKey.Protocol() {
case address.BLS:
msgApply = msg
case address.SECP256K1:
msgApply = &types.SignedMessage{
Message: *msg,
Signature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: make([]byte, 65),
},
}
}
// If the fee cap is set to zero, make gas free.
if msg.GasFeeCap.NilOrZero() {
// Now estimate with a new VM with no base fee.
@ -367,9 +195,39 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
}
}
ret, err := vmi.ApplyMessage(ctx, msgApply)
if err != nil {
return nil, xerrors.Errorf("gas estimation failed: %w", err)
var ret *vm.ApplyRet
var gasInfo api.MsgGasCost
if checkGas {
fromKey, err := sm.ResolveToKeyAddress(ctx, msg.From, ts)
if err != nil {
return nil, xerrors.Errorf("could not resolve key: %w", err)
}
var msgApply types.ChainMsg
switch fromKey.Protocol() {
case address.BLS:
msgApply = msg
case address.SECP256K1:
msgApply = &types.SignedMessage{
Message: *msg,
Signature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: make([]byte, 65),
},
}
}
ret, err = vmi.ApplyMessage(ctx, msgApply)
if err != nil {
return nil, xerrors.Errorf("gas estimation failed: %w", err)
}
gasInfo = MakeMsgGasCost(msg, ret)
} else {
ret, err = vmi.ApplyImplicitMessage(ctx, msg)
if err != nil && ret == nil {
return nil, xerrors.Errorf("apply message failed: %w", err)
}
}
var errs string
@ -381,11 +239,11 @@ func (sm *StateManager) CallWithGas(ctx context.Context, msg *types.Message, pri
MsgCid: msg.Cid(),
Msg: msg,
MsgRct: &ret.MessageReceipt,
GasCost: MakeMsgGasCost(msg, ret),
GasCost: gasInfo,
ExecutionTrace: ret.ExecutionTrace,
Error: errs,
Duration: ret.Duration,
}, nil
}, err
}
var errHaltExecution = fmt.Errorf("halt")

View File

@ -335,7 +335,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
parentHeight := pts.Height()
currentHeight := ts.TipSet.TipSet().Height()
// CallWithGas calls _at_ the current tipset.
// CallWithGas calls on top of the given tipset.
ret, err := sm.CallWithGas(ctx, m, nil, ts.TipSet.TipSet())
if parentHeight <= testForkHeight && currentHeight >= testForkHeight {
// If I had a fork, or I _will_ have a fork, it should fail.
@ -347,7 +347,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
// Call always applies the message to the "next block" after the tipset's parent state.
ret, err = sm.Call(ctx, m, ts.TipSet.TipSet())
if parentHeight == testForkHeight {
if parentHeight <= testForkHeight && currentHeight >= testForkHeight {
require.Equal(t, ErrExpensiveFork, err)
} else {
require.NoError(t, err)

View File

@ -9,7 +9,6 @@ import (
"text/tabwriter"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -29,6 +28,8 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
)
const MAINNET_GENESIS_TIME = 1598306400
// USAGE: Sync a node, then call migrate-nv17 on some old state. Pass in the cid of the migrated state root,
// the epoch you migrated at, the network version you migrated to, and a message CID. You will be able to replay any
// message from between the migration epoch, and where your node originally synced to. Note: You may run into issues
@ -38,7 +39,7 @@ import (
var gasTraceCmd = &cli.Command{
Name: "trace-gas",
Description: "replay a message on the specified stateRoot and network version to get an execution trace",
ArgsUsage: "[migratedStateRootCid migrationEpoch networkVersion messageCid]",
ArgsUsage: "[migratedStateRootCid networkVersion messageCid]",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
@ -48,7 +49,7 @@ var gasTraceCmd = &cli.Command{
Action: func(cctx *cli.Context) error {
ctx := context.TODO()
if cctx.NArg() != 4 {
if cctx.NArg() != 3 {
return lcli.IncorrectNumArgs(cctx)
}
@ -57,17 +58,12 @@ var gasTraceCmd = &cli.Command{
return fmt.Errorf("failed to parse input: %w", err)
}
epoch, err := strconv.ParseInt(cctx.Args().Get(1), 10, 64)
nv, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32)
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}
nv, err := strconv.ParseInt(cctx.Args().Get(2), 10, 32)
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}
messageCid, err := cid.Decode(cctx.Args().Get(3))
messageCid, err := cid.Decode(cctx.Args().Get(2))
if err != nil {
return fmt.Errorf("failed to parse input: %w", err)
}
@ -105,7 +101,7 @@ var gasTraceCmd = &cli.Command{
dcs := build.DrandConfigSchedule()
shd := beacon.Schedule{}
for _, dc := range dcs {
bc, err := drand.NewDrandBeacon(1598306400, build.BlockDelaySecs, nil, dc.Config)
bc, err := drand.NewDrandBeacon(MAINNET_GENESIS_TIME, build.BlockDelaySecs, nil, dc.Config)
if err != nil {
return xerrors.Errorf("creating drand beacon: %w", err)
}
@ -132,17 +128,12 @@ var gasTraceCmd = &cli.Command{
return err
}
executionTs, err := cs.GetTipsetByHeight(ctx, abi.ChainEpoch(epoch), nil, false)
if err != nil {
return err
}
tw := tabwriter.NewWriter(os.Stdout, 8, 2, 2, ' ', tabwriter.AlignRight)
res, err := sm.CallAtStateAndVersion(ctx, msg, executionTs, stateRootCid, network.Version(nv))
res, err := sm.CallAtStateAndVersion(ctx, msg, stateRootCid, network.Version(nv))
if err != nil {
return err
}
fmt.Println("Total gas used ", res.MsgRct.GasUsed)
fmt.Println("Total gas used: ", res.MsgRct.GasUsed)
printInternalExecutions(0, []types.ExecutionTrace{res.ExecutionTrace}, tw)
return tw.Flush()
@ -151,7 +142,7 @@ var gasTraceCmd = &cli.Command{
var replayOfflineCmd = &cli.Command{
Name: "replay-offline",
Description: "replay a message on the specified stateRoot and network version to get an execution trace",
Description: "replay a message to get a gas trace",
ArgsUsage: "[messageCid]",
Flags: []cli.Flag{
&cli.StringFlag{
@ -160,17 +151,12 @@ var replayOfflineCmd = &cli.Command{
},
&cli.Int64Flag{
Name: "lookback-limit",
Value: 1000,
Value: 10000,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.TODO()
err := logging.SetLogLevel("*", "FATAL")
if err != nil {
return err
}
if cctx.NArg() != 1 {
return lcli.IncorrectNumArgs(cctx)
}
@ -215,12 +201,13 @@ var replayOfflineCmd = &cli.Command{
dcs := build.DrandConfigSchedule()
shd := beacon.Schedule{}
for _, dc := range dcs {
bc, err := drand.NewDrandBeacon(1598306400, build.BlockDelaySecs, nil, dc.Config) // 1598306400 is mainnet genesis time
bc, err := drand.NewDrandBeacon(MAINNET_GENESIS_TIME, build.BlockDelaySecs, nil, dc.Config)
if err != nil {
return xerrors.Errorf("creating drand beacon: %w", err)
}
shd = append(shd, beacon.BeaconPoint{Start: dc.Start, Beacon: bc})
}
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
@ -243,9 +230,13 @@ var replayOfflineCmd = &cli.Command{
if err != nil {
return err
}
if ts == nil {
return xerrors.Errorf("could not find message within the last %d epochs", lookbackLimit)
}
executionTs, err := cs.GetTipsetByHeight(ctx, ts.Height()-2, ts, true)
tw := tabwriter.NewWriter(os.Stdout, 8, 2, 2, ' ', tabwriter.AlignRight)
res, err := sm.Call(ctx, msg, ts)
res, err := sm.CallWithGas(ctx, msg, []types.ChainMsg{}, executionTs)
if err != nil {
return err
}

View File

@ -13,7 +13,10 @@ import (
var log = logging.Logger("lotus-shed")
func main() {
logging.SetLogLevel("*", "INFO")
_ = logging.SetLogLevel("*", "INFO")
_ = logging.SetLogLevelRegex("badger*", "ERROR")
_ = logging.SetLogLevel("drand", "ERROR")
_ = logging.SetLogLevel("chainstore", "ERROR")
local := []*cli.Command{
addressCmd,
@ -110,7 +113,7 @@ func main() {
}
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
log.Errorf("%+v", err)
os.Exit(1)
return
}

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
@ -63,6 +64,11 @@ var migrationsCmd = &cli.Command{
Action: func(cctx *cli.Context) error {
ctx := context.TODO()
err := logging.SetLogLevelRegex("badger*", "ERROR")
if err != nil {
return err
}
if cctx.NArg() != 1 {
return lcli.IncorrectNumArgs(cctx)
}

View File

@ -97,8 +97,8 @@ func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftCon
}
//// Validate checks that this configuration has working values,
//// at least in appearance.
// // Validate checks that this configuration has working values,
// // at least in appearance.
func ValidateConfig(cfg *ClusterRaftConfig) error {
if cfg.RaftConfig == nil {
return xerrors.Errorf("no hashicorp/raft.Config")