package vm

import (
	"context"
	"errors"
	"os"
	"strconv"
	"sync"

	"github.com/ipfs/go-cid"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"

	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/metrics"
)

const (
	// DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of
	// concurrent active executions.
	// This is the default value in filecoin-ffi.
	DefaultAvailableExecutionLanes = 4
	// DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations.
	// This is purely userspace, but we believe it is a reasonable default, even with more available
	// lanes.
	DefaultPriorityExecutionLanes = 2
)

var ErrExecutorDone = errors.New("executor has been released")

// the execution environment; see below for definition, methods, and initialization
var execution *executionEnv

// implementation of vm executor with simple sanity check preventing use after free.
type vmExecutor struct {
	vmi  Interface
	lane ExecutionLane
}

var _ Interface = (*vmExecutor)(nil)
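
// newVMExecutor wraps a VM implementation in a vmExecutor bound to the given
// execution lane.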
func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
	return &vmExecutor{vmi: vmi, lane: lane}
}
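
// ApplyMessage acquires an execution token for the executor's lane, blocking
// until one is available, then delegates to the wrapped VM.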
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
	token := execution.getToken(e.lane)
	defer token.Done()

	return e.vmi.ApplyMessage(ctx, cmsg)
}
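
// ApplyImplicitMessage likewise acquires an execution token for the executor's
// lane before delegating to the wrapped VM.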
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
	token := execution.getToken(e.lane)
	defer token.Done()

	return e.vmi.ApplyImplicitMessage(ctx, msg)
}
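
// Flush delegates directly to the wrapped VM; it does not consume an
// execution token.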
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
	return e.vmi.Flush(ctx)
}
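
// executionToken is a permit to execute on a lane; reserved records whether a
// reserved (priority) slot was consumed, so it can be returned on Done.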
type executionToken struct {
	lane     ExecutionLane
	reserved int
}
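
// Done returns the token to the execution environment, waking any waiters.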
func (token *executionToken) Done() {
	execution.putToken(token)
}
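
// executionEnv tracks free execution lanes; reserved counts the slots kept
// free for priority computations, which the default lane may not consume.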
type executionEnv struct {
	mx   *sync.Mutex
	cond *sync.Cond

	// available executors
	available int
	// reserved executors
	reserved int
}
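
// getToken blocks until a lane is free: the default lane only proceeds when
// more than the reserved count is available, while the priority lane may also
// consume a reserved slot.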
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
	metricsUp(metrics.VMExecutionWaiting, lane)
	defer metricsDown(metrics.VMExecutionWaiting, lane)

	e.mx.Lock()
	defer e.mx.Unlock()

	switch lane {
	case ExecutionLaneDefault:
		for e.available <= e.reserved {
			e.cond.Wait()
		}

		e.available--

		metricsUp(metrics.VMExecutionRunning, lane)
		return &executionToken{lane: lane, reserved: 0}

	case ExecutionLanePriority:
		for e.available == 0 {
			e.cond.Wait()
		}

		e.available--

		reserving := 0
		if e.reserved > 0 {
			e.reserved--
			reserving = 1
		}

		metricsUp(metrics.VMExecutionRunning, lane)
		return &executionToken{lane: lane, reserved: reserving}

	default:
		// already checked at interface boundary in NewVM, so this is appropriate
		panic("bogus execution lane")
	}
}
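
// putToken returns a lane to the pool, restores any reserved slot the token
// consumed, and wakes all waiters.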
func (e *executionEnv) putToken(token *executionToken) {
	e.mx.Lock()
	defer e.mx.Unlock()

	e.available++
	e.reserved += token.reserved

	e.cond.Broadcast()

	metricsDown(metrics.VMExecutionRunning, token.lane)
}
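
// metricsUp records a +1 delta for the measure, tagged with the lane.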
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) {
	metricsAdjust(metric, lane, 1)
}
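
// metricsDown records a -1 delta for the measure, tagged with the lane.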
func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) {
	metricsAdjust(metric, lane, -1)
}
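
// metricsAdjust records delta against the measure, tagging it with the lane
// name ("default" or "priority").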
func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) {
	laneName := "default"
	if lane > ExecutionLaneDefault {
		laneName = "priority"
	}

	ctx, _ := tag.New(
		context.Background(),
		tag.Upsert(metrics.ExecutionLane, laneName),
	)
	stats.Record(ctx, metric.M(int64(delta)))
}
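
// init sizes the execution environment from the LOTUS_FVM_CONCURRENCY and
// LOTUS_FVM_CONCURRENCY_RESERVED environment variables, falling back to the
// defaults above, and panics on nonsensical configurations.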
func init() {
	var available, priority int
	var err error

	concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY")
	if concurrency == "" {
		available = DefaultAvailableExecutionLanes
	} else {
		available, err = strconv.Atoi(concurrency)
		if err != nil {
			panic(err)
		}
	}

	reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED")
	if reserved == "" {
		priority = DefaultPriorityExecutionLanes
	} else {
		priority, err = strconv.Atoi(reserved)
		if err != nil {
			panic(err)
		}
	}

	// some sanity checks
	if available < 2 {
		panic("insufficient execution concurrency")
	}

	if available <= priority {
		panic("insufficient default execution concurrency")
	}

	mx := &sync.Mutex{}
	cond := sync.NewCond(mx)

	execution = &executionEnv{
		mx:        mx,
		cond:      cond,
		available: available,
		reserved:  priority,
	}
}