diff --git a/chain/vm/execution.go b/chain/vm/execution.go new file mode 100644 index 000000000..e55883dae --- /dev/null +++ b/chain/vm/execution.go @@ -0,0 +1,173 @@ +package vm + +import ( + "context" + "errors" + "os" + "strconv" + "sync" + + "github.com/ipfs/go-cid" + + "github.com/filecoin-project/lotus/chain/types" +) + +const ( + DefaultAvailableExecutionLanes = 4 + DefaultPriorityExecutionLanes = 2 +) + +var ErrExecutorDone = errors.New("executor has been released") + +// the execution environment; see below for definition, methods, and initilization +var execution *executionEnv + +// implementation of vm executor with simple sanity check preventing use after free. +type vmExecutor struct { + lk sync.RWMutex + vmi Interface + token *executionToken + done bool +} + +var _ Executor = (*vmExecutor)(nil) + +func newVMExecutor(vmi Interface, token *executionToken) Executor { + return &vmExecutor{vmi: vmi, token: token} +} + +func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return nil, ErrExecutorDone + } + + return e.vmi.ApplyMessage(ctx, cmsg) +} + +func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return nil, ErrExecutorDone + } + + return e.vmi.ApplyImplicitMessage(ctx, msg) +} + +func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { + e.lk.RLock() + defer e.lk.RUnlock() + + if e.done { + return cid.Undef, ErrExecutorDone + } + + return e.vmi.Flush(ctx) +} + +func (e *vmExecutor) Done() { + e.lk.Lock() + defer e.lk.Unlock() + + e.token.Done() + e.token = nil + e.done = true +} + +type executionToken struct { + reserved int +} + +func (token *executionToken) Done() { + execution.putToken(token) +} + +type executionEnv struct { + mx *sync.Mutex + cond *sync.Cond + + // available executors + available int + // reserved executors + reserved int +} + +func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { + e.mx.Lock() + defer e.mx.Unlock() + + switch lane { + case ExecutionLaneDefault: + for e.available <= e.reserved { + e.cond.Wait() + } + + e.available-- + return &executionToken{reserved: 0} + + case ExecutionLanePriority: + for e.available == 0 { + e.cond.Wait() + } + + e.available-- + + reserving := 0 + if e.reserved > 0 { + e.reserved-- + reserving = 1 + } + return &executionToken{reserved: reserving} + + default: + // already checked at interface boundary in NewVM, so this is appropriate + panic("bogus execution lane") + } +} + +func (e *executionEnv) putToken(token *executionToken) { + e.mx.Lock() + defer e.mx.Unlock() + + e.available++ + e.reserved += token.reserved + + e.cond.Broadcast() +} + +func init() { + var available, priority int + var err error + + concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY") + if concurrency == "" { + available = DefaultAvailableExecutionLanes + } + available, err = strconv.Atoi(concurrency) + if err != nil { + panic(err) + } + + reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED") + if reserved == "" { + priority = DefaultPriorityExecutionLanes + } + priority, err = strconv.Atoi(reserved) + if err != nil { + panic(err) + } + + mx := &sync.Mutex{} + cond := sync.NewCond(mx) + + execution = &executionEnv{ + mx: mx, + cond: cond, + available: available, + reserved: priority, + } +} diff --git a/chain/vm/vm.go b/chain/vm/vm.go index c8e3f2519..6fbe7933b 100644 --- a/chain/vm/vm.go +++ b/chain/vm/vm.go @@ -250,6 +250,8 @@ type VMOpts struct { Tracing bool // ReturnEvents decodes and returns emitted events. ReturnEvents bool + // ExecutionLane specifies the execution priority of the created vm + ExecutionLane ExecutionLane } func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) { diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index 01b32d4ad..e5d5daff8 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -2,6 +2,7 @@ package vm import ( "context" + "fmt" "os" cid "github.com/ipfs/go-cid" @@ -17,6 +18,15 @@ var ( StatApplied uint64 ) +type ExecutionLane int + +const ( + // ExecutionLaneDefault signifies a default, non prioritized execution lane. + ExecutionLaneDefault ExecutionLane = iota + // ExecutionLanePriority signifies a prioritized execution lane with reserved resources. + ExecutionLanePriority +) + type Interface interface { // Applies the given message onto the VM's current state, returning the result of the execution ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) @@ -27,13 +37,24 @@ type Interface interface { Flush(ctx context.Context) (cid.Cid, error) } +// Executor is the general vm execution interface, which is prioritized according to execution langes. +// User must call Done when it is done with this executor to release resource holds by the execution +// environment +type Executor interface { + Interface + + // Done must be called when done with the executor to release resource holds. + // It is an error to invoke Interface methods after Done has been called. + Done() +} + // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // An envvar that allows the user to specify debug actors bundles to be used by the FVM // alongside regular execution. This is basically only to be used to print out specific logging information. // Message failures, unexpected terminations,gas costs, etc. should all be ignored. var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1" -func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { +func newVM(ctx context.Context, opts *VMOpts) (Interface, error) { if opts.NetworkVersion >= network.Version16 { if useFvmDebug { return NewDualExecutionFVM(ctx, opts) @@ -43,3 +64,21 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { return NewLegacyVM(ctx, opts) } + +func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { + switch opts.ExecutionLane { + case ExecutionLaneDefault, ExecutionLanePriority: + default: + return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) + } + + token := execution.getToken(opts.ExecutionLane) + + vmi, err := newVM(ctx, opts) + if err != nil { + token.Done() + return nil, err + } + + return newVMExecutor(vmi, token), nil +}