package vm import ( "context" "os" "strconv" "sync" "github.com/ipfs/go-cid" "go.opencensus.io/stats" "go.opencensus.io/tag" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/metrics" ) const ( // DefaultAvailableExecutionLanes is the number of available execution lanes; it is the bound of // concurrent active executions. // This is the default value in filecoin-ffi DefaultAvailableExecutionLanes = 4 // DefaultPriorityExecutionLanes is the number of reserved execution lanes for priority computations. // This is purely userspace, but we believe it is a reasonable default, even with more available // lanes. DefaultPriorityExecutionLanes = 2 ) // the execution environment; see below for definition, methods, and initialization var execution *executionEnv // implementation of vm executor with simple sanity check preventing use after free. type vmExecutor struct { vmi Interface lane ExecutionLane } var _ Interface = (*vmExecutor)(nil) func newVMExecutor(vmi Interface, lane ExecutionLane) Interface { return &vmExecutor{vmi: vmi, lane: lane} } func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { token := execution.getToken(e.lane) defer token.Done() return e.vmi.ApplyMessage(ctx, cmsg) } func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { token := execution.getToken(e.lane) defer token.Done() return e.vmi.ApplyImplicitMessage(ctx, msg) } func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { return e.vmi.Flush(ctx) } type executionToken struct { lane ExecutionLane reserved int } func (token *executionToken) Done() { execution.putToken(token) } type executionEnv struct { mx *sync.Mutex cond *sync.Cond // available executors available int // reserved executors reserved int } func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { metricsUp(metrics.VMExecutionWaiting, lane) defer metricsDown(metrics.VMExecutionWaiting, lane) e.mx.Lock() defer e.mx.Unlock() switch lane { case ExecutionLaneDefault: for e.available <= e.reserved { e.cond.Wait() } e.available-- metricsUp(metrics.VMExecutionRunning, lane) return &executionToken{lane: lane, reserved: 0} case ExecutionLanePriority: for e.available == 0 { e.cond.Wait() } e.available-- reserving := 0 if e.reserved > 0 { e.reserved-- reserving = 1 } metricsUp(metrics.VMExecutionRunning, lane) return &executionToken{lane: lane, reserved: reserving} default: // already checked at interface boundary in NewVM, so this is appropriate panic("bogus execution lane") } } func (e *executionEnv) putToken(token *executionToken) { e.mx.Lock() defer e.mx.Unlock() e.available++ e.reserved += token.reserved // Note: Signal is unsound, because a priority token could wake up a non-priority // goroutnie and lead to deadlock. So Broadcast it must be. e.cond.Broadcast() metricsDown(metrics.VMExecutionRunning, token.lane) } func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { metricsAdjust(metric, lane, 1) } func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { metricsAdjust(metric, lane, -1) } func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { laneName := "default" if lane > ExecutionLaneDefault { laneName = "priority" } ctx, _ := tag.New( context.Background(), tag.Upsert(metrics.ExecutionLane, laneName), ) stats.Record(ctx, metric.M(int64(delta))) } func init() { var err error available := DefaultAvailableExecutionLanes if concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY"); concurrency != "" { available, err = strconv.Atoi(concurrency) if err != nil { panic(err) } } priority := DefaultPriorityExecutionLanes if reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED"); reserved != "" { priority, err = strconv.Atoi(reserved) if err != nil { panic(err) } } // some sanity checks if available < 2 { panic("insufficient execution concurrency") } if available <= priority { panic("insufficient default execution concurrency") } mx := &sync.Mutex{} cond := sync.NewCond(mx) execution = &executionEnv{ mx: mx, cond: cond, available: available, reserved: priority, } }