diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go
index e627a62d2..3c1bab9ca 100644
--- a/chain/consensus/compute_state.go
+++ b/chain/consensus/compute_state.go
@@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
 	}()

 	ctx = blockstore.WithHotView(ctx)
-	makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
+	makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) {
 		vmopt := &vm.VMOpts{
 			StateBase: base,
 			Epoch:     e,
@@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
 			TipSetGetter:  stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
 			Tracing:       vmTracing,
 			ReturnEvents:  sm.ChainStore().IsStoringEvents(),
+			ExecutionLane: vm.ExecutionLanePriority,
 		}

 		return sm.VMConstructor()(ctx, vmopt)
@@ -116,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,

 	var cronGas int64

-	runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error {
+	runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error {
 		cronMsg := &types.Message{
 			To:   cron.Address,
 			From: builtin.SystemActorAddr,
@@ -169,13 +170,17 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,

 			// run cron for null rounds if any
 			if err = runCron(vmCron, i); err != nil {
+				vmCron.Done()
 				return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err)
 			}

 			pstate, err = vmCron.Flush(ctx)
 			if err != nil {
+				vmCron.Done()
 				return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err)
 			}
+
+			vmCron.Done()
 		}

 		// handle state forks
@@ -191,10 +196,12 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
 	cronGas = 0

 	partDone = metrics.Timer(ctx, metrics.VMApplyMessages)

+	// TODO reorg the code to minimize the execution critical section
 	vmi, err := makeVm(pstate, epoch, ts.MinTimestamp())
 	if err != nil {
 		return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
 	}
+	defer vmi.Done()

 	var (
 		receipts []*types.MessageReceipt
diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go
index 3e8848021..3ef8de968 100644
--- a/chain/gen/genesis/genesis.go
+++ b/chain/gen/genesis/genesis.go
@@ -496,6 +496,7 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca
 	if err != nil {
 		return cid.Undef, xerrors.Errorf("failed to create VM: %w", err)
 	}
+	defer vm.Done()

 	for mi, m := range template.Miners {
 		for si, s := range m.Sectors {
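The compute_state.go hunks above establish the pattern for the rest of the patch: tipset execution requests the priority lane so chain validation cannot be starved by default-lane work, and every VM acquired from the constructor is released exactly once (eagerly on the cron error paths, via defer for the final VM). A condensed sketch of that pattern, under the API introduced by this diff (the helper name is hypothetical, imports shown for completeness):

    import (
        "context"

        "github.com/ipfs/go-cid"

        "github.com/filecoin-project/lotus/chain/types"
        "github.com/filecoin-project/lotus/chain/vm"
    )

    // applyOnPriorityLane is not part of the diff; it condenses the pattern
    // compute_state.go now follows.
    func applyOnPriorityLane(ctx context.Context, opts *vm.VMOpts, msg *types.Message) (cid.Cid, error) {
        opts.ExecutionLane = vm.ExecutionLanePriority
        vmi, err := vm.NewVM(ctx, opts) // may block until a lane is free
        if err != nil {
            return cid.Undef, err
        }
        defer vmi.Done() // returns the lane token to the execution environment

        if _, err := vmi.ApplyImplicitMessage(ctx, msg); err != nil {
            return cid.Undef, err
        }
        return vmi.Flush(ctx)
    }
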
diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go
index 5f741fd7c..09b46a6e7 100644
--- a/chain/gen/genesis/miners.go
+++ b/chain/gen/genesis/miners.go
@@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
 		return big.Zero(), nil
 	}

-	newVM := func(base cid.Cid) (vm.Interface, error) {
+	newVM := func(base cid.Cid) (vm.Executor, error) {
 		vmopt := &vm.VMOpts{
 			StateBase: base,
 			Epoch:     0,
@@ -108,6 +108,8 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
 	if err != nil {
 		return cid.Undef, fmt.Errorf("creating vm: %w", err)
 	}
+	// Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show.
+	defer func() { genesisVm.Done() }()

 	if len(miners) == 0 {
 		return cid.Undef, xerrors.New("no genesis miners")
@@ -338,6 +340,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
 			return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
 		}

+		genesisVm.Done()
 		genesisVm, err = newVM(nh)
 		if err != nil {
 			return cid.Undef, fmt.Errorf("creating new vm: %w", err)
 		}
@@ -410,6 +413,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
 			return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
 		}

+		genesisVm.Done()
 		genesisVm, err = newVM(nh)
 		if err != nil {
 			return cid.Undef, fmt.Errorf("creating new vm: %w", err)
 		}
@@ -517,6 +521,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
 			return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
 		}

+		genesisVm.Done()
 		genesisVm, err = newVM(nh)
 		if err != nil {
 			return cid.Undef, fmt.Errorf("creating new vm: %w", err)
 		}
diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go
index 61056528f..a4d8d4410 100644
--- a/chain/stmgr/call.go
+++ b/chain/stmgr/call.go
@@ -169,6 +169,9 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
 	if err != nil {
 		return nil, xerrors.Errorf("failed to set up vm: %w", err)
 	}
+	// Note: vmi is mutated, so this has to happen in a deferred func; go horror show.
+	defer func() { vmi.Done() }()
+
 	for i, m := range priorMsgs {
 		_, err = vmi.ApplyMessage(ctx, m)
 		if err != nil {
@@ -201,6 +204,7 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
 		vmopt.BaseFee = big.Zero()
 		vmopt.StateBase = stateCid

+		vmi.Done()
 		vmi, err = sm.newVM(ctx, vmopt)
 		if err != nil {
 			return nil, xerrors.Errorf("failed to set up estimation vm: %w", err)
 		}
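The "go horror show" comments above flag a real pitfall: `defer vmi.Done()` binds the receiver at the moment the defer is registered, so after `vmi` is reassigned it would release the wrong executor. Deferring a closure instead re-reads the variable at function exit, while the explicit `Done()` calls release each intermediate executor before it is replaced. A standalone demonstration (not lotus code):

    package main

    import "fmt"

    type executor struct{ id int }

    func (e *executor) Done() { fmt.Println("released executor", e.id) }

    func main() {
        vmi := &executor{id: 1}
        defer vmi.Done()              // receiver captured now: always releases 1
        defer func() { vmi.Done() }() // evaluated at exit: releases whatever vmi holds then

        vmi.Done()             // release the old executor explicitly...
        vmi = &executor{id: 2} // ...before replacing it, as miners.go and call.go do
    }

Running this prints "released executor 1" (the explicit call), then "released executor 2" (the closure, deferred last so it runs first), then "released executor 1" again from the bound defer — exactly the double-release bug the closure form avoids.
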
diff --git a/chain/stmgr/forks_test.go b/chain/stmgr/forks_test.go
index bf8793488..123e9a8ff 100644
--- a/chain/stmgr/forks_test.go
+++ b/chain/stmgr/forks_test.go
@@ -56,6 +56,12 @@ const testForkHeight = 40
 type testActor struct {
 }

+type mockExecutor struct {
+	vm.Interface
+}
+
+func (*mockExecutor) Done() {}
+
 // must use existing actor that an account is allowed to exec.
 func (testActor) Code() cid.Cid  { return builtin0.PaymentChannelActorCodeID }
 func (testActor) State() cbor.Er { return new(testActorState) }
@@ -178,13 +184,13 @@ func TestForkHeightTriggers(t *testing.T) {
 	registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
 	inv.Register(actorstypes.Version0, nil, registry)

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		nvm, err := vm.NewLegacyVM(ctx, vmopt)
 		if err != nil {
 			return nil, err
 		}
 		nvm.SetInvoker(inv)
-		return nvm, nil
+		return &mockExecutor{nvm}, nil
 	})

 	cg.SetStateManager(sm)
@@ -296,13 +302,13 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
 	registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
 	inv.Register(actorstypes.Version0, nil, registry)

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		nvm, err := vm.NewLegacyVM(ctx, vmopt)
 		if err != nil {
 			return nil, err
 		}
 		nvm.SetInvoker(inv)
-		return nvm, nil
+		return &mockExecutor{nvm}, nil
 	})

 	cg.SetStateManager(sm)
@@ -518,13 +524,13 @@ func TestForkPreMigration(t *testing.T) {
 	registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
 	inv.Register(actorstypes.Version0, nil, registry)

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		nvm, err := vm.NewLegacyVM(ctx, vmopt)
 		if err != nil {
 			return nil, err
 		}
 		nvm.SetInvoker(inv)
-		return nvm, nil
+		return &mockExecutor{nvm}, nil
 	})

 	cg.SetStateManager(sm)
@@ -592,11 +598,11 @@ func TestDisablePreMigration(t *testing.T) {
 	registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
 	inv.Register(actorstypes.Version0, nil, registry)

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		nvm, err := vm.NewLegacyVM(ctx, vmopt)
 		require.NoError(t, err)
 		nvm.SetInvoker(inv)
-		return nvm, nil
+		return &mockExecutor{nvm}, nil
 	})

 	cg.SetStateManager(sm)
@@ -647,11 +653,11 @@ func TestMigrtionCache(t *testing.T) {
 	registry := builtin.MakeRegistryLegacy([]rtt.VMActor{testActor{}})
 	inv.Register(actorstypes.Version0, nil, registry)

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		nvm, err := vm.NewLegacyVM(ctx, vmopt)
 		require.NoError(t, err)
 		nvm.SetInvoker(inv)
-		return nvm, nil
+		return &mockExecutor{nvm}, nil
 	})

 	cg.SetStateManager(sm)
@@ -691,11 +697,11 @@ func TestMigrtionCache(t *testing.T) {
 			index.DummyMsgIndex,
 		)
 		require.NoError(t, err)
-		sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+		sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 			nvm, err := vm.NewLegacyVM(ctx, vmopt)
 			require.NoError(t, err)
 			nvm.SetInvoker(inv)
-			return nvm, nil
+			return &mockExecutor{nvm}, nil
 		})

 		ctx := context.Background()
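The test changes above all follow one adaptation: a bare `vm.Interface` is wrapped in `mockExecutor`, whose embedded interface forwards `ApplyMessage`, `ApplyImplicitMessage`, and `Flush`, leaving only a no-op `Done` to write by hand (safe, since the test VM never takes a lane token). A compile-time assertion — an addition for illustration, not part of the diff — makes the contract explicit:

    // verifies at compile time that the test double satisfies the new interface
    var _ vm.Executor = (*mockExecutor)(nil)
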
diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go
index 2d528c91b..f8df1de5f 100644
--- a/chain/stmgr/stmgr.go
+++ b/chain/stmgr/stmgr.go
@@ -125,7 +125,7 @@ type StateManager struct {
 	compWait      map[string]chan struct{}
 	stlk          sync.Mutex
 	genesisMsigLk sync.Mutex
-	newVM         func(context.Context, *vm.VMOpts) (vm.Interface, error)
+	newVM         func(context.Context, *vm.VMOpts) (vm.Executor, error)
 	Syscalls      vm.SyscallBuilder
 	preIgnitionVesting  []msig0.State
 	postIgnitionVesting []msig0.State
@@ -438,12 +438,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err
 	return nil
 }

-func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) {
+func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) {
 	sm.newVM = nvm
 }

-func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) {
-	return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) {
+func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) {
+	return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) {
 		return sm.newVM(ctx, opts)
 	}
 }
diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go
index c93267d50..78129cb16 100644
--- a/chain/stmgr/utils.go
+++ b/chain/stmgr/utils.go
@@ -106,6 +106,7 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
 	if err != nil {
 		return cid.Undef, nil, err
 	}
+	defer vmi.Done()

 	for i, msg := range msgs {
 		// TODO: Use the signed message length for secp messages
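The new chain/vm/execution.go that follows gates every VM construction behind a lane token. The budget is configured by two environment variables: LOTUS_FVM_CONCURRENCY (total lanes, defaulting to 4 to match filecoin-ffi) and LOTUS_FVM_CONCURRENCY_RESERVED (lanes held back for priority work, defaulting to 2). A rough model of the resulting ceilings — my arithmetic, not lotus code:

    package main

    import "fmt"

    func main() {
        available, reserved := 4, 2 // DefaultAvailableExecutionLanes, DefaultPriorityExecutionLanes
        fmt.Println("total concurrent executions:", available)          // both lanes combined
        fmt.Println("default-lane ceiling:       ", available-reserved) // 2: the reserve is off-limits
        fmt.Println("priority-lane ceiling:      ", available)          // 4: priority may use the reserve
    }
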
diff --git a/chain/vm/execution.go b/chain/vm/execution.go
new file mode 100644
index 000000000..0edb13884
--- /dev/null
+++ b/chain/vm/execution.go
@@ -0,0 +1,227 @@
+package vm
+
+import (
+	"context"
+	"errors"
+	"os"
+	"strconv"
+	"sync"
+
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
+
+	"github.com/ipfs/go-cid"
+
+	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/metrics"
+)
+
+const (
+	// DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of
+	// concurrent active executions.
+	// This is the default value in filecoin-ffi
+	DefaultAvailableExecutionLanes = 4
+	// DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations.
+	// This is purely userspace, but we believe it is a reasonable default, even with more available
+	// lanes.
+	DefaultPriorityExecutionLanes = 2
+)
+
+var ErrExecutorDone = errors.New("executor has been released")
+
+// the execution environment; see below for definition, methods, and initialization
+var execution *executionEnv
+
+// implementation of the vm executor, with a simple sanity check preventing use after free.
+type vmExecutor struct {
+	lk    sync.RWMutex
+	vmi   Interface
+	token *executionToken
+	done  bool
+}
+
+var _ Executor = (*vmExecutor)(nil)
+
+func newVMExecutor(vmi Interface, token *executionToken) Executor {
+	return &vmExecutor{vmi: vmi, token: token}
+}
+
+func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
+	e.lk.RLock()
+	defer e.lk.RUnlock()
+
+	if e.done {
+		return nil, ErrExecutorDone
+	}
+
+	return e.vmi.ApplyMessage(ctx, cmsg)
+}
+
+func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
+	e.lk.RLock()
+	defer e.lk.RUnlock()
+
+	if e.done {
+		return nil, ErrExecutorDone
+	}
+
+	return e.vmi.ApplyImplicitMessage(ctx, msg)
+}
+
+func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
+	e.lk.RLock()
+	defer e.lk.RUnlock()
+
+	if e.done {
+		return cid.Undef, ErrExecutorDone
+	}
+
+	return e.vmi.Flush(ctx)
+}
+
+func (e *vmExecutor) Done() {
+	e.lk.Lock()
+	defer e.lk.Unlock()
+
+	if !e.done {
+		e.token.Done()
+		e.token = nil
+		e.done = true
+	}
+}
+
+type executionToken struct {
+	lane     ExecutionLane
+	reserved int
+}
+
+func (token *executionToken) Done() {
+	execution.putToken(token)
+}
+
+type executionEnv struct {
+	mx   *sync.Mutex
+	cond *sync.Cond
+
+	// available executors
+	available int
+	// reserved executors
+	reserved int
+}
+
+func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
+	metricsUp(metrics.VMExecutionWaiting, lane)
+	defer metricsDown(metrics.VMExecutionWaiting, lane)
+
+	e.mx.Lock()
+	defer e.mx.Unlock()
+
+	switch lane {
+	case ExecutionLaneDefault:
+		for e.available <= e.reserved {
+			e.cond.Wait()
+		}
+
+		e.available--
+
+		metricsUp(metrics.VMExecutionRunning, lane)
+		return &executionToken{lane: lane, reserved: 0}
+
+	case ExecutionLanePriority:
+		for e.available == 0 {
+			e.cond.Wait()
+		}
+
+		e.available--
+
+		reserving := 0
+		if e.reserved > 0 {
+			e.reserved--
+			reserving = 1
+		}
+
+		metricsUp(metrics.VMExecutionRunning, lane)
+		return &executionToken{lane: lane, reserved: reserving}
+
+	default:
+		// already checked at the interface boundary in NewVM, so this is appropriate
+		panic("bogus execution lane")
+	}
+}
+
+func (e *executionEnv) putToken(token *executionToken) {
+	e.mx.Lock()
+	defer e.mx.Unlock()
+
+	e.available++
+	e.reserved += token.reserved
+
+	e.cond.Broadcast()
+
+	metricsDown(metrics.VMExecutionRunning, token.lane)
+}
+
+func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
+	metricsAdjust(metric, lane, 1)
+}
+
+func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
+	metricsAdjust(metric, lane, -1)
+}
+
+func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
+	laneName := "default"
+	if lane > ExecutionLaneDefault {
+		laneName = "priority"
+	}
+
+	ctx, _ := tag.New(
+		context.Background(),
+		tag.Upsert(metrics.ExecutionLane, laneName),
+	)
+	stats.Record(ctx, metric.M(int64(delta)))
+}
+
+func init() {
+	var available, priority int
+	var err error
+
+	concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY")
+	if concurrency == "" {
+		available = DefaultAvailableExecutionLanes
+	} else {
+		available, err = strconv.Atoi(concurrency)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED")
+	if reserved == "" {
+		priority = DefaultPriorityExecutionLanes
+	} else {
+		priority, err = strconv.Atoi(reserved)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	// some sanity checks
+	if available < 2 {
+		panic("insufficient execution concurrency")
+	}
+
+	if priority > available-1 {
+		panic("insufficient default execution concurrency")
+	}
+
+	mx := &sync.Mutex{}
+	cond := sync.NewCond(mx)
+
+	execution = &executionEnv{
+		mx:        mx,
+		cond:      cond,
+		available: available,
+		reserved:  priority,
+	}
+}
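At its core, getToken/putToken above implement a counting semaphore with a reservation: a default-lane acquisition must leave the reserved lanes untouched, while a priority acquisition may dip into the reserve (and the token records how much reserve it consumed so putToken can restore it). A standalone model of just that gating logic — simplified, with no tokens or metrics, and not lotus code:

    package main

    import (
        "fmt"
        "sync"
    )

    type env struct {
        mx        sync.Mutex
        cond      *sync.Cond
        available int // free lanes
        reserved  int // lanes held back for priority work
    }

    func (e *env) get(priority bool) {
        e.mx.Lock()
        defer e.mx.Unlock()
        if priority {
            for e.available == 0 {
                e.cond.Wait()
            }
            if e.reserved > 0 {
                e.reserved-- // consume part of the reserve
            }
        } else {
            for e.available <= e.reserved {
                e.cond.Wait() // only the reserve remains; default work must queue
            }
        }
        e.available--
    }

    func main() {
        e := &env{available: 4, reserved: 2}
        e.cond = sync.NewCond(&e.mx)

        e.get(false)
        e.get(false)
        fmt.Println("default lane full: 2 of 4 lanes busy, 2 held in reserve")
        e.get(true)
        e.get(true)
        fmt.Println("priority lane drained the reserve; all 4 lanes are now busy")
    }
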
diff --git a/chain/vm/vm.go b/chain/vm/vm.go
index c8e3f2519..6fbe7933b 100644
--- a/chain/vm/vm.go
+++ b/chain/vm/vm.go
@@ -250,6 +250,8 @@ type VMOpts struct {
 	Tracing bool
 	// ReturnEvents decodes and returns emitted events.
 	ReturnEvents bool
+	// ExecutionLane specifies the execution priority of the created vm
+	ExecutionLane ExecutionLane
 }

 func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go
index 01b32d4ad..7aa52b585 100644
--- a/chain/vm/vmi.go
+++ b/chain/vm/vmi.go
@@ -2,6 +2,7 @@ package vm

 import (
 	"context"
+	"fmt"
 	"os"

 	cid "github.com/ipfs/go-cid"
@@ -17,6 +18,15 @@ var (
 	StatApplied uint64
 )

+type ExecutionLane int
+
+const (
+	// ExecutionLaneDefault signifies a default, non-prioritized execution lane.
+	ExecutionLaneDefault ExecutionLane = iota
+	// ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
+	ExecutionLanePriority
+)
+
 type Interface interface {
 	// Applies the given message onto the VM's current state, returning the result of the execution
 	ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
@@ -27,13 +37,24 @@ type Interface interface {
 	Flush(ctx context.Context) (cid.Cid, error)
 }

+// Executor is the general vm execution interface, prioritized according to execution lanes.
+// Users must call Done when they are finished with an executor to release the resources held by
+// the execution environment.
+type Executor interface {
+	Interface
+
+	// Done must be called when done with the executor to release resource holds.
+	// It is an error to invoke Interface methods after Done has been called.
+	Done()
+}
+
 // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly!
 // An envvar that allows the user to specify debug actors bundles to be used by the FVM
 // alongside regular execution. This is basically only to be used to print out specific logging information.
 // Message failures, unexpected terminations, gas costs, etc. should all be ignored.
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"

-func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
+func newVM(ctx context.Context, opts *VMOpts) (Interface, error) {
 	if opts.NetworkVersion >= network.Version16 {
 		if useFvmDebug {
 			return NewDualExecutionFVM(ctx, opts)
@@ -43,3 +64,21 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {

 	return NewLegacyVM(ctx, opts)
 }
+
+func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) {
+	switch opts.ExecutionLane {
+	case ExecutionLaneDefault, ExecutionLanePriority:
+	default:
+		return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
+	}
+
+	token := execution.getToken(opts.ExecutionLane)
+
+	vmi, err := newVM(ctx, opts)
+	if err != nil {
+		token.Done()
+		return nil, err
+	}
+
+	return newVMExecutor(vmi, token), nil
+}
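Note the ordering in the new NewVM: the lane is validated before a token is taken, the token is acquired before the inner VM is constructed, and a construction failure returns the token internally, so callers never leak a lane on the error path. On the caller side, the vmExecutor guard gives well-defined behavior after release; a behavior sketch (the helper name is hypothetical, assuming only the vm package as defined in this diff):

    import (
        "context"
        "errors"
        "fmt"

        "github.com/filecoin-project/lotus/chain/types"
        "github.com/filecoin-project/lotus/chain/vm"
    )

    // Interface methods fail fast once the executor is released, and Done is idempotent.
    func releaseTwice(ctx context.Context, vmi vm.Executor, msg types.ChainMsg) error {
        vmi.Done() // returns the lane token to the pool
        if _, err := vmi.ApplyMessage(ctx, msg); !errors.Is(err, vm.ErrExecutorDone) {
            return fmt.Errorf("expected ErrExecutorDone, got %v", err)
        }
        vmi.Done() // no-op: the done flag ensures the token is only returned once
        return nil
    }
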
diff --git a/conformance/driver.go b/conformance/driver.go
index e0d56d074..c3041be71 100644
--- a/conformance/driver.go
+++ b/conformance/driver.go
@@ -158,7 +158,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params
 		results: []*vm.ApplyRet{},
 	}

-	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {
+	sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Executor, error) {
 		vmopt.CircSupplyCalc = func(context.Context, abi.ChainEpoch, *state.StateTree) (abi.TokenAmount, error) {
 			return big.Zero(), nil
 		}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index ca638ac27..bd1295d17 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -58,6 +58,9 @@ var (
 	ProtocolID, _ = tag.NewKey("proto")
 	Direction, _  = tag.NewKey("direction")
 	UseFD, _      = tag.NewKey("use_fd")
+
+	// vm execution
+	ExecutionLane, _ = tag.NewKey("lane")
 )

 // Measures
@@ -121,6 +124,8 @@ var (
 	VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
 	VMSends      = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
 	VMApplied    = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
+	VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless)
+	VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless)

 	// miner
 	WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
@@ -363,6 +368,16 @@ var (
 		Measure:     VMApplied,
 		Aggregation: view.LastValue(),
 	}
+	VMExecutionWaitingView = &view.View{
+		Measure:     VMExecutionWaiting,
+		Aggregation: view.LastValue(),
+		TagKeys:     []tag.Key{ExecutionLane},
+	}
+	VMExecutionRunningView = &view.View{
+		Measure:     VMExecutionRunning,
+		Aggregation: view.LastValue(),
+		TagKeys:     []tag.Key{ExecutionLane},
+	}

 	// miner
 	WorkerCallsStartedView = &view.View{
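The two new measures are recorded as +1/-1 deltas from metricsAdjust, tagged with lane="default" or lane="priority", so together they describe how many executions are queued versus running per lane. A sketch of registering the new views with opencensus on the consumer side (an assumption for illustration; lotus registers its views centrally, and error handling is elided):

    import (
        "go.opencensus.io/stats/view"

        "github.com/filecoin-project/lotus/metrics"
    )

    func registerLaneViews() error {
        return view.Register(
            metrics.VMExecutionWaitingView,
            metrics.VMExecutionRunningView,
        )
    }
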