lotus/chain/vm/execution.go

185 lines
4.2 KiB
Go
Raw Normal View History

2023-03-23 14:53:50 +00:00
package vm
import (
"context"
"os"
"strconv"
"sync"
2023-03-28 13:58:09 +00:00
"github.com/ipfs/go-cid"
2023-03-24 13:16:22 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
2023-03-23 14:53:50 +00:00
"github.com/filecoin-project/lotus/chain/types"
2023-03-24 13:16:22 +00:00
"github.com/filecoin-project/lotus/metrics"
2023-03-23 14:53:50 +00:00
)
const (
	// DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of
	// concurrent active executions.
	// This is the default value in filecoin-ffi. It can be overridden at startup through the
	// LOTUS_FVM_CONCURRENCY environment variable.
	DefaultAvailableExecutionLanes = 4

	// DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations.
	// This is purely userspace, but we believe it is a reasonable default, even with more available
	// lanes. It can be overridden at startup through the LOTUS_FVM_CONCURRENCY_RESERVED environment
	// variable.
	DefaultPriorityExecutionLanes = 2
)
2023-03-30 15:11:44 +00:00
// the execution environment; see below for definition, methods, and initialization
2023-03-23 14:53:50 +00:00
var execution *executionEnv
// implementation of vm executor with simple sanity check preventing use after free.
type vmExecutor struct {
vmi Interface
lane ExecutionLane
2023-03-23 14:53:50 +00:00
}
var _ Interface = (*vmExecutor)(nil)
2023-03-23 14:53:50 +00:00
func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
return &vmExecutor{vmi: vmi, lane: lane}
2023-03-23 14:53:50 +00:00
}
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
token := execution.getToken(ctx, e.lane)
defer token.Done()
2023-03-23 14:53:50 +00:00
return e.vmi.ApplyMessage(ctx, cmsg)
}
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
token := execution.getToken(ctx, e.lane)
defer token.Done()
2023-03-23 14:53:50 +00:00
return e.vmi.ApplyImplicitMessage(ctx, msg)
}
// Flush delegates directly to the wrapped VM. Unlike the message application methods, it does not
// acquire an execution token.
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
	return e.vmi.Flush(ctx)
}
type executionToken struct {
2023-03-24 13:16:22 +00:00
lane ExecutionLane
2023-03-23 14:53:50 +00:00
reserved int
ctx context.Context
2023-03-23 14:53:50 +00:00
}
// Done releases the token, handing its lane back to the shared execution environment.
func (token *executionToken) Done() {
	execution.putToken(token)
}
// executionEnv keeps count of the execution lanes that are currently free. The counters are
// guarded by mx, and cond is used to park and wake goroutines waiting for a lane.
type executionEnv struct {
	mx   *sync.Mutex
	cond *sync.Cond

	// available is the number of lanes that are currently free, reserved ones included.
	available int
	// reserved is the number of reserved slots that have not been taken by a priority execution;
	// default-lane executions only proceed while available exceeds this value.
	reserved int
}
func (e *executionEnv) getToken(ctx context.Context, lane ExecutionLane) *executionToken {
metricsUp(ctx, metrics.VMExecutionWaiting, lane)
defer metricsDown(ctx, metrics.VMExecutionWaiting, lane)
2023-03-24 13:16:22 +00:00
2023-03-23 14:53:50 +00:00
e.mx.Lock()
reserving := 0
if lane == ExecutionLaneDefault {
2023-03-23 14:53:50 +00:00
for e.available <= e.reserved {
e.cond.Wait()
}
} else {
2023-03-23 14:53:50 +00:00
for e.available == 0 {
e.cond.Wait()
}
if e.reserved > 0 {
e.reserved--
reserving = 1
}
}
2023-03-24 13:16:22 +00:00
e.available--
e.mx.Unlock()
2023-03-23 14:53:50 +00:00
metricsUp(ctx, metrics.VMExecutionRunning, lane)
return &executionToken{lane: lane, reserved: reserving, ctx: ctx}
2023-03-23 14:53:50 +00:00
}
func (e *executionEnv) putToken(token *executionToken) {
e.mx.Lock()
e.available++
e.reserved += token.reserved
2023-03-30 15:13:08 +00:00
// Note: Signal is unsound, because a priority token could wake up a non-priority
// goroutine and lead to deadlock. So Broadcast it must be.
2023-03-23 14:53:50 +00:00
e.cond.Broadcast()
e.mx.Unlock()
2023-03-24 13:16:22 +00:00
metricsDown(token.ctx, metrics.VMExecutionRunning, token.lane)
2023-03-24 13:16:22 +00:00
}
func metricsUp(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) {
metricsAdjust(ctx, metric, lane, 1)
2023-03-24 13:16:22 +00:00
}
func metricsDown(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) {
metricsAdjust(ctx, metric, lane, -1)
2023-03-24 13:16:22 +00:00
}
// Tag mutators that label execution metrics with the lane they were recorded for.
var (
	defaultLaneTag  = tag.Upsert(metrics.ExecutionLane, "default")
	priorityLaneTag = tag.Upsert(metrics.ExecutionLane, "priority")
)
func metricsAdjust(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane, delta int) {
laneTag := defaultLaneTag
2023-03-24 13:16:22 +00:00
if lane > ExecutionLaneDefault {
laneTag = priorityLaneTag
2023-03-24 13:16:22 +00:00
}
ctx, _ = tag.New(ctx, laneTag)
2023-03-24 13:16:22 +00:00
stats.Record(ctx, metric.M(int64(delta)))
2023-03-23 14:53:50 +00:00
}
func init() {
var err error
available := DefaultAvailableExecutionLanes
if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" {
2023-03-23 15:28:08 +00:00
available, err = strconv.Atoi(concurrency)
if err != nil {
panic(err)
}
2023-03-23 14:53:50 +00:00
}
priority := DefaultPriorityExecutionLanes
if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" {
2023-03-23 15:28:08 +00:00
priority, err = strconv.Atoi(reserved)
if err != nil {
panic(err)
}
2023-03-23 14:53:50 +00:00
}
// some sanity checks
if available < 2 {
panic("insufficient execution concurrency")
}
if available <= priority {
panic("insufficient default execution concurrency")
}
2023-03-23 14:53:50 +00:00
mx := &sync.Mutex{}
cond := sync.NewCond(mx)
execution = &executionEnv{
mx: mx,
cond: cond,
available: available,
reserved: priority,
}
}