Merge pull request #10551 from filecoin-project/vyzo/feat/exec-lanes
feat: VM Execution Lanes
commit bf666a3f7e
@@ -109,6 +109,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
        TipSetGetter:  stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
        Tracing:       vmTracing,
        ReturnEvents:  sm.ChainStore().IsStoringEvents(),
        ExecutionLane: vm.ExecutionLanePriority,
    }

    return sm.VMConstructor()(ctx, vmopt)
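(The tipset executor opts into the priority lane here, so consensus-critical block computation draws on the reserved execution slots defined in chain/vm/execution.go below.)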
chain/vm/execution.go (new file, 192 lines)

@@ -0,0 +1,192 @@
package vm

import (
    "context"
    "os"
    "strconv"
    "sync"

    "github.com/ipfs/go-cid"
    "go.opencensus.io/stats"
    "go.opencensus.io/tag"

    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/metrics"
)

const (
    // DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound
    // of concurrent active executions.
    // This is the default value in filecoin-ffi.
    DefaultAvailableExecutionLanes = 4
    // DefaultPriorityExecutionLanes is the number of execution lanes reserved for priority
    // computations. This is purely userspace, but we believe it is a reasonable default,
    // even with more available lanes.
    DefaultPriorityExecutionLanes = 2
)
// the execution environment; see below for definition, methods, and initialization
var execution *executionEnv

// implementation of the vm executor, with a simple sanity check preventing use after free
type vmExecutor struct {
    vmi  Interface
    lane ExecutionLane
}

var _ Interface = (*vmExecutor)(nil)

func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
    return &vmExecutor{vmi: vmi, lane: lane}
}

func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
    token := execution.getToken(e.lane)
    defer token.Done()

    return e.vmi.ApplyMessage(ctx, cmsg)
}

func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
    token := execution.getToken(e.lane)
    defer token.Done()

    return e.vmi.ApplyImplicitMessage(ctx, msg)
}

func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
    return e.vmi.Flush(ctx)
}

type executionToken struct {
    lane     ExecutionLane
    reserved int
}

func (token *executionToken) Done() {
    execution.putToken(token)
}

type executionEnv struct {
    mx   *sync.Mutex
    cond *sync.Cond

    // available executors
    available int
    // reserved executors
    reserved int
}
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
    metricsUp(metrics.VMExecutionWaiting, lane)
    defer metricsDown(metrics.VMExecutionWaiting, lane)

    e.mx.Lock()
    defer e.mx.Unlock()

    switch lane {
    case ExecutionLaneDefault:
        for e.available <= e.reserved {
            e.cond.Wait()
        }

        e.available--

        metricsUp(metrics.VMExecutionRunning, lane)
        return &executionToken{lane: lane, reserved: 0}

    case ExecutionLanePriority:
        for e.available == 0 {
            e.cond.Wait()
        }

        e.available--

        reserving := 0
        if e.reserved > 0 {
            e.reserved--
            reserving = 1
        }

        metricsUp(metrics.VMExecutionRunning, lane)
        return &executionToken{lane: lane, reserved: reserving}

    default:
        // the lane was already checked at the interface boundary in NewVM, so panicking is appropriate
        panic("bogus execution lane")
    }
}

func (e *executionEnv) putToken(token *executionToken) {
    e.mx.Lock()
    defer e.mx.Unlock()

    e.available++
    e.reserved += token.reserved

    // Note: Signal is unsound, because a priority token could wake up a non-priority
    // goroutine and lead to deadlock. So Broadcast it must be.
    e.cond.Broadcast()

    metricsDown(metrics.VMExecutionRunning, token.lane)
}
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
    metricsAdjust(metric, lane, 1)
}

func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
    metricsAdjust(metric, lane, -1)
}

func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
    laneName := "default"
    if lane > ExecutionLaneDefault {
        laneName = "priority"
    }

    ctx, _ := tag.New(
        context.Background(),
        tag.Upsert(metrics.ExecutionLane, laneName),
    )
    stats.Record(ctx, metric.M(int64(delta)))
}
func init() {
    var err error

    available := DefaultAvailableExecutionLanes
    if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" {
        available, err = strconv.Atoi(concurrency)
        if err != nil {
            panic(err)
        }
    }

    priority := DefaultPriorityExecutionLanes
    if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" {
        priority, err = strconv.Atoi(reserved)
        if err != nil {
            panic(err)
        }
    }

    // some sanity checks
    if available < 2 {
        panic("insufficient execution concurrency")
    }

    if available <= priority {
        panic("insufficient default execution concurrency")
    }

    mx := &sync.Mutex{}
    cond := sync.NewCond(mx)

    execution = &executionEnv{
        mx:        mx,
        cond:      cond,
        available: available,
        reserved:  priority,
    }
}
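The reservation discipline above is worth spelling out: default-lane callers block while available <= reserved, so with the defaults (available=4, reserved=2, overridable via LOTUS_FVM_CONCURRENCY and LOTUS_FVM_CONCURRENCY_RESERVED) at most two default executions run concurrently, while priority callers may claim any free slot, consuming a reservation when one is held and returning it on release. A minimal, self-contained sketch of the same scheme (illustrative names; not part of this diff):

package main

import (
    "fmt"
    "sync"
    "time"
)

type lane int

const (
    laneDefault lane = iota
    lanePriority
)

type env struct {
    mx        sync.Mutex
    cond      *sync.Cond
    available int // free execution slots
    reserved  int // slots held back for the priority lane
}

func newEnv(available, reserved int) *env {
    e := &env{available: available, reserved: reserved}
    e.cond = sync.NewCond(&e.mx)
    return e
}

// acquire blocks until a slot is free for the given lane; it returns the
// number of reservations consumed, to be handed back on release.
func (e *env) acquire(l lane) int {
    e.mx.Lock()
    defer e.mx.Unlock()
    if l == laneDefault {
        for e.available <= e.reserved { // the default lane may not touch reserved slots
            e.cond.Wait()
        }
        e.available--
        return 0
    }
    for e.available == 0 { // the priority lane may take any slot
        e.cond.Wait()
    }
    e.available--
    if e.reserved > 0 {
        e.reserved--
        return 1
    }
    return 0
}

func (e *env) release(reserved int) {
    e.mx.Lock()
    e.available++
    e.reserved += reserved
    e.mx.Unlock()
    e.cond.Broadcast() // Broadcast, not Signal: see the note in putToken above
}

func main() {
    e := newEnv(4, 2)
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        l := laneDefault
        if i%2 == 0 {
            l = lanePriority
        }
        wg.Add(1)
        go func(id int, l lane) {
            defer wg.Done()
            r := e.acquire(l)
            defer e.release(r)
            fmt.Printf("worker %d running (lane %d)\n", id, l)
            time.Sleep(10 * time.Millisecond) // simulated execution
        }(i, l)
    }
    wg.Wait()
}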
@@ -250,6 +250,8 @@ type VMOpts struct {
    Tracing bool
    // ReturnEvents decodes and returns emitted events.
    ReturnEvents bool
    // ExecutionLane specifies the execution priority of the created vm
    ExecutionLane ExecutionLane
}

func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
@@ -2,6 +2,7 @@ package vm

import (
    "context"
    "fmt"
    "os"

    cid "github.com/ipfs/go-cid"
@@ -17,6 +18,15 @@ var (
    StatApplied uint64
)

type ExecutionLane int

const (
    // ExecutionLaneDefault signifies the default, non-prioritized execution lane.
    ExecutionLaneDefault ExecutionLane = iota
    // ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
    ExecutionLanePriority
)

type Interface interface {
    // Applies the given message onto the VM's current state, returning the result of the execution
    ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
@@ -33,7 +43,7 @@ type Interface interface {
// Message failures, unexpected terminations, gas costs, etc. should all be ignored.
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"

-func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
+func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) {
    if opts.NetworkVersion >= network.Version16 {
        if useFvmDebug {
            return NewDualExecutionFVM(ctx, opts)
@@ -43,3 +53,18 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {

    return NewLegacyVM(ctx, opts)
}

func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
    switch opts.ExecutionLane {
    case ExecutionLaneDefault, ExecutionLanePriority:
    default:
        return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
    }

    vmi, err := makeVM(ctx, opts)
    if err != nil {
        return nil, err
    }

    return newVMExecutor(vmi, opts.ExecutionLane), nil
}
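Because the lane is validated once at this interface boundary and every VM constructed through NewVM is wrapped by the lane-aware executor, callers opt in with a single field. A hedged usage sketch (the helper and package names are hypothetical; the remaining VMOpts fields are assumed to be populated as usual):

package example

import (
    "context"

    "github.com/filecoin-project/lotus/chain/vm"
)

// newPriorityVM is a hypothetical helper: set VMOpts.ExecutionLane before
// calling NewVM. An out-of-range lane value makes NewVM return an error
// ("invalid execution lane: ...") instead of panicking later in getToken.
func newPriorityVM(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) {
    opts.ExecutionLane = vm.ExecutionLanePriority
    return vm.NewVM(ctx, opts)
}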
@@ -58,6 +58,9 @@ var (
    ProtocolID, _ = tag.NewKey("proto")
    Direction, _  = tag.NewKey("direction")
    UseFD, _      = tag.NewKey("use_fd")

    // vm execution
    ExecutionLane, _ = tag.NewKey("lane")
)

// Measures
@@ -121,6 +124,8 @@ var (
    VMApplyFlush = stats.Float64("vm/applyblocks_flush", "Time spent flushing vm state", stats.UnitMilliseconds)
    VMSends      = stats.Int64("vm/sends", "Counter for sends processed by the VM", stats.UnitDimensionless)
    VMApplied    = stats.Int64("vm/applied", "Counter for messages (including internal messages) processed by the VM", stats.UnitDimensionless)
    VMExecutionWaiting = stats.Int64("vm/execution_waiting", "Counter for VM executions waiting to be assigned to a lane", stats.UnitDimensionless)
    VMExecutionRunning = stats.Int64("vm/execution_running", "Counter for running VM executions", stats.UnitDimensionless)

    // miner
    WorkerCallsStarted = stats.Int64("sealing/worker_calls_started", "Counter of started worker tasks", stats.UnitDimensionless)
@@ -363,6 +368,16 @@ var (
        Measure:     VMApplied,
        Aggregation: view.LastValue(),
    }
    VMExecutionWaitingView = &view.View{
        Measure:     VMExecutionWaiting,
        Aggregation: view.Sum(),
        TagKeys:     []tag.Key{ExecutionLane},
    }
    VMExecutionRunningView = &view.View{
        Measure:     VMExecutionRunning,
        Aggregation: view.Sum(),
        TagKeys:     []tag.Key{ExecutionLane},
    }

    // miner
    WorkerCallsStartedView = &view.View{
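Note that metricsUp and metricsDown record +1/-1 deltas, so the Sum aggregation in these two views behaves as a gauge: each view reports the current number of waiting or running executions per lane, not a monotonically increasing counter.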
@@ -727,6 +742,8 @@ var ChainNodeViews = append([]*view.View{
    VMApplyFlushView,
    VMSendsView,
    VMAppliedView,
    VMExecutionWaitingView,
    VMExecutionRunningView,
}, DefaultViews...)

var MinerNodeViews = append([]*view.View{