Merge branch 'vyzo/feat/exec-lanes' into feat/rpcenhancement-mirror
This commit is contained in:
commit
b284beaaef
@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
ctx = blockstore.WithHotView(ctx)
|
ctx = blockstore.WithHotView(ctx)
|
||||||
makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
|
makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) {
|
||||||
vmopt := &vm.VMOpts{
|
vmopt := &vm.VMOpts{
|
||||||
StateBase: base,
|
StateBase: base,
|
||||||
Epoch: e,
|
Epoch: e,
|
||||||
@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
|
|||||||
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
|
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
|
||||||
Tracing: vmTracing,
|
Tracing: vmTracing,
|
||||||
ReturnEvents: sm.ChainStore().IsStoringEvents(),
|
ReturnEvents: sm.ChainStore().IsStoringEvents(),
|
||||||
|
ExecutionLane: vm.ExecutionLanePriority,
|
||||||
}
|
}
|
||||||
|
|
||||||
return sm.VMConstructor()(ctx, vmopt)
|
return sm.VMConstructor()(ctx, vmopt)
|
||||||
@ -116,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
|
|||||||
|
|
||||||
var cronGas int64
|
var cronGas int64
|
||||||
|
|
||||||
runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error {
|
runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error {
|
||||||
cronMsg := &types.Message{
|
cronMsg := &types.Message{
|
||||||
To: cron.Address,
|
To: cron.Address,
|
||||||
From: builtin.SystemActorAddr,
|
From: builtin.SystemActorAddr,
|
||||||
@ -169,13 +170,17 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
|
|||||||
|
|
||||||
// run cron for null rounds if any
|
// run cron for null rounds if any
|
||||||
if err = runCron(vmCron, i); err != nil {
|
if err = runCron(vmCron, i); err != nil {
|
||||||
|
vmCron.Done()
|
||||||
return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err)
|
return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pstate, err = vmCron.Flush(ctx)
|
pstate, err = vmCron.Flush(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
vmCron.Done()
|
||||||
return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err)
|
return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vmCron.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle state forks
|
// handle state forks
|
||||||
@ -191,10 +196,12 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
|
|||||||
cronGas = 0
|
cronGas = 0
|
||||||
partDone = metrics.Timer(ctx, metrics.VMApplyMessages)
|
partDone = metrics.Timer(ctx, metrics.VMApplyMessages)
|
||||||
|
|
||||||
|
// TODO reorg the code to minimize the execution critical section
|
||||||
vmi, err := makeVm(pstate, epoch, ts.MinTimestamp())
|
vmi, err := makeVm(pstate, epoch, ts.MinTimestamp())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
|
return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
|
||||||
}
|
}
|
||||||
|
defer vmi.Done()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
receipts []*types.MessageReceipt
|
receipts []*types.MessageReceipt
|
||||||
|
@ -496,6 +496,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("failed to create VM: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create VM: %w", err)
|
||||||
}
|
}
|
||||||
|
defer vm.Done()
|
||||||
|
|
||||||
for mi, m := range template.Miners {
|
for mi, m := range template.Miners {
|
||||||
for si, s := range m.Sectors {
|
for si, s := range m.Sectors {
|
||||||
|
@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
|
|||||||
return big.Zero(), nil
|
return big.Zero(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
newVM := func(base cid.Cid) (vm.Interface, error) {
|
newVM := func(base cid.Cid) (vm.Executor, error) {
|
||||||
vmopt := &vm.VMOpts{
|
vmopt := &vm.VMOpts{
|
||||||
StateBase: base,
|
StateBase: base,
|
||||||
Epoch: 0,
|
Epoch: 0,
|
||||||
@ -108,6 +108,8 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, fmt.Errorf("creating vm: %w", err)
|
return cid.Undef, fmt.Errorf("creating vm: %w", err)
|
||||||
}
|
}
|
||||||
|
// Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show.
|
||||||
|
defer func() { genesisVm.Done() }()
|
||||||
|
|
||||||
if len(miners) == 0 {
|
if len(miners) == 0 {
|
||||||
return cid.Undef, xerrors.New("no genesis miners")
|
return cid.Undef, xerrors.New("no genesis miners")
|
||||||
@ -338,6 +340,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
|
|||||||
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
genesisVm.Done()
|
||||||
genesisVm, err = newVM(nh)
|
genesisVm, err = newVM(nh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
||||||
@ -410,6 +413,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
|
|||||||
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
genesisVm.Done()
|
||||||
genesisVm, err = newVM(nh)
|
genesisVm, err = newVM(nh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
||||||
@ -517,6 +521,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
|
|||||||
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
genesisVm.Done()
|
||||||
genesisVm, err = newVM(nh)
|
genesisVm, err = newVM(nh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
return cid.Undef, fmt.Errorf("creating new vm: %w", err)
|
||||||
|
@ -169,6 +169,9 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to set up vm: %w", err)
|
return nil, xerrors.Errorf("failed to set up vm: %w", err)
|
||||||
}
|
}
|
||||||
|
// Note: vmi is mutated, so this has to happen in a deferred func; go horror show.
|
||||||
|
defer func() { vmi.Done() }()
|
||||||
|
|
||||||
for i, m := range priorMsgs {
|
for i, m := range priorMsgs {
|
||||||
_, err = vmi.ApplyMessage(ctx, m)
|
_, err = vmi.ApplyMessage(ctx, m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -201,6 +204,7 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
|
|||||||
vmopt.BaseFee = big.Zero()
|
vmopt.BaseFee = big.Zero()
|
||||||
vmopt.StateBase = stateCid
|
vmopt.StateBase = stateCid
|
||||||
|
|
||||||
|
vmi.Done()
|
||||||
vmi, err = sm.newVM(ctx, vmopt)
|
vmi, err = sm.newVM(ctx, vmopt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to set up estimation vm: %w", err)
|
return nil, xerrors.Errorf("failed to set up estimation vm: %w", err)
|
||||||
|
@ -56,6 +56,12 @@ const testForkHeight = 40
|
|||||||
type testActor struct {
|
type testActor struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mockExecutor struct {
|
||||||
|
vm.Interface
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*mockExecutor) Done() {}
|
||||||
|
|
||||||
// must use existing actor that an account is allowed to exec.
|
// must use existing actor that an account is allowed to exec.
|
||||||
func (testActor) Code() cid.Cid { return builtin0.PaymentChannelActorCodeID }
|
func (testActor) Code() cid.Cid { return builtin0.PaymentChannelActorCodeID }
|
||||||
func (testActor) State() cbor.Er { return new(testActorState) }
|
func (testActor) State() cbor.Er { return new(testActorState) }
|
||||||
@ -178,13 +184,13 @@ func TestForkHeightTriggers(t *testing.T) {
|
|||||||
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
||||||
inv.Register(actorstypes.Version0, nil, registry)
|
inv.Register(actorstypes.Version0, nil, registry)
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
cg.SetStateManager(sm)
|
cg.SetStateManager(sm)
|
||||||
@ -296,13 +302,13 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
|
|||||||
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
||||||
inv.Register(actorstypes.Version0, nil, registry)
|
inv.Register(actorstypes.Version0, nil, registry)
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
cg.SetStateManager(sm)
|
cg.SetStateManager(sm)
|
||||||
@ -518,13 +524,13 @@ func TestForkPreMigration(t *testing.T) {
|
|||||||
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
||||||
inv.Register(actorstypes.Version0, nil, registry)
|
inv.Register(actorstypes.Version0, nil, registry)
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
cg.SetStateManager(sm)
|
cg.SetStateManager(sm)
|
||||||
@ -592,11 +598,11 @@ func TestDisablePreMigration(t *testing.T) {
|
|||||||
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
||||||
inv.Register(actorstypes.Version0, nil, registry)
|
inv.Register(actorstypes.Version0, nil, registry)
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
cg.SetStateManager(sm)
|
cg.SetStateManager(sm)
|
||||||
@ -647,11 +653,11 @@ func TestMigrtionCache(t *testing.T) {
|
|||||||
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
|
||||||
inv.Register(actorstypes.Version0, nil, registry)
|
inv.Register(actorstypes.Version0, nil, registry)
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
cg.SetStateManager(sm)
|
cg.SetStateManager(sm)
|
||||||
@ -691,11 +697,11 @@ func TestMigrtionCache(t *testing.T) {
|
|||||||
index.DummyMsgIndex,
|
index.DummyMsgIndex,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
nvm, err := vm.NewLegacyVM(ctx, vmopt)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
nvm.SetInvoker(inv)
|
nvm.SetInvoker(inv)
|
||||||
return nvm, nil
|
return &mockExecutor{nvm}, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
@ -125,7 +125,7 @@ type StateManager struct {
|
|||||||
compWait map[string]chan struct{}
|
compWait map[string]chan struct{}
|
||||||
stlk sync.Mutex
|
stlk sync.Mutex
|
||||||
genesisMsigLk sync.Mutex
|
genesisMsigLk sync.Mutex
|
||||||
newVM func(context.Context, *vm.VMOpts) (vm.Interface, error)
|
newVM func(context.Context, *vm.VMOpts) (vm.Executor, error)
|
||||||
Syscalls vm.SyscallBuilder
|
Syscalls vm.SyscallBuilder
|
||||||
preIgnitionVesting []msig0.State
|
preIgnitionVesting []msig0.State
|
||||||
postIgnitionVesting []msig0.State
|
postIgnitionVesting []msig0.State
|
||||||
@ -438,12 +438,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) {
|
func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) {
|
||||||
sm.newVM = nvm
|
sm.newVM = nvm
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) {
|
func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) {
|
||||||
return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) {
|
return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) {
|
||||||
return sm.newVM(ctx, opts)
|
return sm.newVM(ctx, opts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,6 +106,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, nil, err
|
return cid.Undef, nil, err
|
||||||
}
|
}
|
||||||
|
defer vmi.Done()
|
||||||
|
|
||||||
for i, msg := range msgs {
|
for i, msg := range msgs {
|
||||||
// TODO: Use the signed message length for secp messages
|
// TODO: Use the signed message length for secp messages
|
||||||
|
227
chain/vm/execution.go
Normal file
227
chain/vm/execution.go
Normal file
@ -0,0 +1,227 @@
|
|||||||
|
package vm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"go.opencensus.io/stats"
|
||||||
|
"go.opencensus.io/tag"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of
|
||||||
|
// concurrent active executions.
|
||||||
|
// This is the default value in filecoin-ffi
|
||||||
|
DefaultAvailableExecutionLanes = 4
|
||||||
|
// DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations.
|
||||||
|
// This is purely userspace, but we believe it is a reasonable default, even with more available
|
||||||
|
// lanes.
|
||||||
|
DefaultPriorityExecutionLanes = 2
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrExecutorDone = errors.New("executor has been released")
|
||||||
|
|
||||||
|
// the execution environment; see below for definition, methods, and initilization
|
||||||
|
var execution *executionEnv
|
||||||
|
|
||||||
|
// implementation of vm executor with simple sanity check preventing use after free.
|
||||||
|
type vmExecutor struct {
|
||||||
|
lk sync.RWMutex
|
||||||
|
vmi Interface
|
||||||
|
token *executionToken
|
||||||
|
done bool
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Executor = (*vmExecutor)(nil)
|
||||||
|
|
||||||
|
func newVMExecutor(vmi Interface, token *executionToken) Executor {
|
||||||
|
return &vmExecutor{vmi: vmi, token: token}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
|
||||||
|
e.lk.RLock()
|
||||||
|
defer e.lk.RUnlock()
|
||||||
|
|
||||||
|
if e.done {
|
||||||
|
return nil, ErrExecutorDone
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.vmi.ApplyMessage(ctx, cmsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
|
||||||
|
e.lk.RLock()
|
||||||
|
defer e.lk.RUnlock()
|
||||||
|
|
||||||
|
if e.done {
|
||||||
|
return nil, ErrExecutorDone
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.vmi.ApplyImplicitMessage(ctx, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
|
||||||
|
e.lk.RLock()
|
||||||
|
defer e.lk.RUnlock()
|
||||||
|
|
||||||
|
if e.done {
|
||||||
|
return cid.Undef, ErrExecutorDone
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.vmi.Flush(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *vmExecutor) Done() {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
if !e.done {
|
||||||
|
e.token.Done()
|
||||||
|
e.token = nil
|
||||||
|
e.done = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type executionToken struct {
|
||||||
|
lane ExecutionLane
|
||||||
|
reserved int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (token *executionToken) Done() {
|
||||||
|
execution.putToken(token)
|
||||||
|
}
|
||||||
|
|
||||||
|
type executionEnv struct {
|
||||||
|
mx *sync.Mutex
|
||||||
|
cond *sync.Cond
|
||||||
|
|
||||||
|
// available executors
|
||||||
|
available int
|
||||||
|
// reserved executors
|
||||||
|
reserved int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
|
||||||
|
metricsUp(metrics.VMExecutionWaiting, lane)
|
||||||
|
defer metricsDown(metrics.VMExecutionWaiting, lane)
|
||||||
|
|
||||||
|
e.mx.Lock()
|
||||||
|
defer e.mx.Unlock()
|
||||||
|
|
||||||
|
switch lane {
|
||||||
|
case ExecutionLaneDefault:
|
||||||
|
for e.available <= e.reserved {
|
||||||
|
e.cond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
e.available--
|
||||||
|
|
||||||
|
metricsUp(metrics.VMExecutionRunning, lane)
|
||||||
|
return &executionToken{lane: lane, reserved: 0}
|
||||||
|
|
||||||
|
case ExecutionLanePriority:
|
||||||
|
for e.available == 0 {
|
||||||
|
e.cond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
e.available--
|
||||||
|
|
||||||
|
reserving := 0
|
||||||
|
if e.reserved > 0 {
|
||||||
|
e.reserved--
|
||||||
|
reserving = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
metricsUp(metrics.VMExecutionRunning, lane)
|
||||||
|
return &executionToken{lane: lane, reserved: reserving}
|
||||||
|
|
||||||
|
default:
|
||||||
|
// already checked at interface boundary in NewVM, so this is appropriate
|
||||||
|
panic("bogus execution lane")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executionEnv) putToken(token *executionToken) {
|
||||||
|
e.mx.Lock()
|
||||||
|
defer e.mx.Unlock()
|
||||||
|
|
||||||
|
e.available++
|
||||||
|
e.reserved += token.reserved
|
||||||
|
|
||||||
|
e.cond.Broadcast()
|
||||||
|
|
||||||
|
metricsDown(metrics.VMExecutionRunning, token.lane)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
|
||||||
|
metricsAdjust(metric, lane, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
|
||||||
|
metricsAdjust(metric, lane, -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
|
||||||
|
laneName := "default"
|
||||||
|
if lane > ExecutionLaneDefault {
|
||||||
|
laneName = "priority"
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, _ := tag.New(
|
||||||
|
context.Background(),
|
||||||
|
tag.Upsert(metrics.ExecutionLane, laneName),
|
||||||
|
)
|
||||||
|
stats.Record(ctx, metric.M(int64(delta)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
var available, priority int
|
||||||
|
var err error
|
||||||
|
|
||||||
|
concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY")
|
||||||
|
if concurrency == "" {
|
||||||
|
available = DefaultAvailableExecutionLanes
|
||||||
|
} else {
|
||||||
|
available, err = strconv.Atoi(concurrency)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED")
|
||||||
|
if reserved == "" {
|
||||||
|
priority = DefaultPriorityExecutionLanes
|
||||||
|
} else {
|
||||||
|
priority, err = strconv.Atoi(reserved)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// some sanity checks
|
||||||
|
if available < 2 {
|
||||||
|
panic("insufficient execution concurrency")
|
||||||
|
}
|
||||||
|
|
||||||
|
if priority > available-1 {
|
||||||
|
panic("insufficient default execution concurrency")
|
||||||
|
}
|
||||||
|
|
||||||
|
mx := &sync.Mutex{}
|
||||||
|
cond := sync.NewCond(mx)
|
||||||
|
|
||||||
|
execution = &executionEnv{
|
||||||
|
mx: mx,
|
||||||
|
cond: cond,
|
||||||
|
available: available,
|
||||||
|
reserved: priority,
|
||||||
|
}
|
||||||
|
}
|
@ -250,6 +250,8 @@ type VMOpts struct {
|
|||||||
Tracing bool
|
Tracing bool
|
||||||
// ReturnEvents decodes and returns emitted events.
|
// ReturnEvents decodes and returns emitted events.
|
||||||
ReturnEvents bool
|
ReturnEvents bool
|
||||||
|
// ExecutionLane specifies the execution priority of the created vm
|
||||||
|
ExecutionLane ExecutionLane
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
|
func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
|
||||||
|
@ -2,6 +2,7 @@ package vm
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
@ -17,6 +18,15 @@ var (
|
|||||||
StatApplied uint64
|
StatApplied uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ExecutionLane int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ExecutionLaneDefault signifies a default, non prioritized execution lane.
|
||||||
|
ExecutionLaneDefault ExecutionLane = iota
|
||||||
|
// ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
|
||||||
|
ExecutionLanePriority
|
||||||
|
)
|
||||||
|
|
||||||
type Interface interface {
|
type Interface interface {
|
||||||
// Applies the given message onto the VM's current state, returning the result of the execution
|
// Applies the given message onto the VM's current state, returning the result of the execution
|
||||||
ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
|
ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
|
||||||
@ -27,13 +37,24 @@ type Interface interface {
|
|||||||
Flush(ctx context.Context) (cid.Cid, error)
|
Flush(ctx context.Context) (cid.Cid, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Executor is the general vm execution interface, which is prioritized according to execution lanes.
|
||||||
|
// User must call Done when it is done with this executor to release resource holds by the execution
|
||||||
|
// environment
|
||||||
|
type Executor interface {
|
||||||
|
Interface
|
||||||
|
|
||||||
|
// Done must be called when done with the executor to release resource holds.
|
||||||
|
// It is an error to invoke Interface methods after Done has been called.
|
||||||
|
Done()
|
||||||
|
}
|
||||||
|
|
||||||
// WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly!
|
// WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly!
|
||||||
// An envvar that allows the user to specify debug actors bundles to be used by the FVM
|
// An envvar that allows the user to specify debug actors bundles to be used by the FVM
|
||||||
// alongside regular execution. This is basically only to be used to print out specific logging information.
|
// alongside regular execution. This is basically only to be used to print out specific logging information.
|
||||||
// Message failures, unexpected terminations,gas costs, etc. should all be ignored.
|
// Message failures, unexpected terminations,gas costs, etc. should all be ignored.
|
||||||
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"
|
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"
|
||||||
|
|
||||||
func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
|
func newVM(ctx context.Context, opts *VMOpts) (Interface, error) {
|
||||||
if opts.NetworkVersion >= network.Version16 {
|
if opts.NetworkVersion >= network.Version16 {
|
||||||
if useFvmDebug {
|
if useFvmDebug {
|
||||||
return NewDualExecutionFVM(ctx, opts)
|
return NewDualExecutionFVM(ctx, opts)
|
||||||
@ -43,3 +64,21 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
|
|||||||
|
|
||||||
return NewLegacyVM(ctx, opts)
|
return NewLegacyVM(ctx, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) {
|
||||||
|
switch opts.ExecutionLane {
|
||||||
|
case ExecutionLaneDefault, ExecutionLanePriority:
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
|
||||||
|
}
|
||||||
|
|
||||||
|
token := execution.getToken(opts.ExecutionLane)
|
||||||
|
|
||||||
|
vmi, err := newVM(ctx, opts)
|
||||||
|
if err != nil {
|
||||||
|
token.Done()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return newVMExecutor(vmi, token), nil
|
||||||
|
}
|
||||||
|
@ -158,7 +158,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params
|
|||||||
results: []*vm.ApplyRet{},
|
results: []*vm.ApplyRet{},
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
|
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
|
||||||
vmopt.CircSupplyCalc = func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) {
|
vmopt.CircSupplyCalc = func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) {
|
||||||
return big.Zero(), nil
|
return big.Zero(), nil
|
||||||
}
|
}
|
||||||
|
@ -58,6 +58,9 @@ var (
|
|||||||
ProtocolID, _ = tag.NewKey("proto")
|
ProtocolID, _ = tag.NewKey("proto")
|
||||||
Direction, _ = tag.NewKey("direction")
|
Direction, _ = tag.NewKey("direction")
|
||||||
UseFD, _ = tag.NewKey("use_fd")
|
UseFD, _ = tag.NewKey("use_fd")
|
||||||
|
|
||||||
|
// vm execution
|
||||||
|
ExecutionLane, _ = tag.NewKey("lane")
|
||||||
)
|
)
|
||||||
|
|
||||||
// Measures
|
// Measures
|
||||||
@ -121,6 +124,8 @@ var (
|
|||||||
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
|
VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
|
||||||
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
|
VMSends = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
|
||||||
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
|
VMApplied = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
|
||||||
|
VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless)
|
||||||
|
VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless)
|
||||||
|
|
||||||
// miner
|
// miner
|
||||||
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
|
WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
|
||||||
@ -363,6 +368,16 @@ var (
|
|||||||
Measure: VMApplied,
|
Measure: VMApplied,
|
||||||
Aggregation: view.LastValue(),
|
Aggregation: view.LastValue(),
|
||||||
}
|
}
|
||||||
|
VMExecutionWaitingView = &view.View{
|
||||||
|
Measure: VMExecutionWaiting,
|
||||||
|
Aggregation: view.LastValue(),
|
||||||
|
TagKeys: []tag.Key{ExecutionLane},
|
||||||
|
}
|
||||||
|
VMExecutionRunningView = &view.View{
|
||||||
|
Measure: VMExecutionRunning,
|
||||||
|
Aggregation: view.LastValue(),
|
||||||
|
TagKeys: []tag.Key{ExecutionLane},
|
||||||
|
}
|
||||||
|
|
||||||
// miner
|
// miner
|
||||||
WorkerCallsStartedView = &view.View{
|
WorkerCallsStartedView = &view.View{
|
||||||
|
Loading…
Reference in New Issue
Block a user