introduce execution lanes

This commit is contained in:
vyzo 2023-03-23 16:53:50 +02:00
parent 1022ac5e2b
commit 6550abdfcc
3 changed files with 215 additions and 1 deletions

173
chain/vm/execution.go Normal file
View File

@ -0,0 +1,173 @@
package vm
import (
"context"
"errors"
"os"
"strconv"
"sync"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/types"
)
const (
DefaultAvailableExecutionLanes = 4
DefaultPriorityExecutionLanes = 2
)
var ErrExecutorDone = errors.New("executor has been released")
// the execution environment; see below for definition, methods, and initilization
var execution *executionEnv
// implementation of vm executor with simple sanity check preventing use after free.
type vmExecutor struct {
lk sync.RWMutex
vmi Interface
token *executionToken
done bool
}
var _ Executor = (*vmExecutor)(nil)
func newVMExecutor(vmi Interface, token *executionToken) Executor {
return &vmExecutor{vmi: vmi, token: token}
}
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
e.lk.RLock()
defer e.lk.RUnlock()
if e.done {
return nil, ErrExecutorDone
}
return e.vmi.ApplyMessage(ctx, cmsg)
}
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
e.lk.RLock()
defer e.lk.RUnlock()
if e.done {
return nil, ErrExecutorDone
}
return e.vmi.ApplyImplicitMessage(ctx, msg)
}
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
e.lk.RLock()
defer e.lk.RUnlock()
if e.done {
return cid.Undef, ErrExecutorDone
}
return e.vmi.Flush(ctx)
}
func (e *vmExecutor) Done() {
e.lk.Lock()
defer e.lk.Unlock()
e.token.Done()
e.token = nil
e.done = true
}
type executionToken struct {
reserved int
}
func (token *executionToken) Done() {
execution.putToken(token)
}
type executionEnv struct {
mx *sync.Mutex
cond *sync.Cond
// available executors
available int
// reserved executors
reserved int
}
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken {
e.mx.Lock()
defer e.mx.Unlock()
switch lane {
case ExecutionLaneDefault:
for e.available <= e.reserved {
e.cond.Wait()
}
e.available--
return &executionToken{reserved: 0}
case ExecutionLanePriority:
for e.available == 0 {
e.cond.Wait()
}
e.available--
reserving := 0
if e.reserved > 0 {
e.reserved--
reserving = 1
}
return &executionToken{reserved: reserving}
default:
// already checked at interface boundary in NewVM, so this is appropriate
panic("bogus execution lane")
}
}
func (e *executionEnv) putToken(token *executionToken) {
e.mx.Lock()
defer e.mx.Unlock()
e.available++
e.reserved += token.reserved
e.cond.Broadcast()
}
func init() {
var available, priority int
var err error
concurrency := os.Getenv("LOTUS_FVM_CONCURRENCY")
if concurrency == "" {
available = DefaultAvailableExecutionLanes
}
available, err = strconv.Atoi(concurrency)
if err != nil {
panic(err)
}
reserved := os.Getenv("LOTUS_FVM_CONCURRENCY_RESERVED")
if reserved == "" {
priority = DefaultPriorityExecutionLanes
}
priority, err = strconv.Atoi(reserved)
if err != nil {
panic(err)
}
mx := &sync.Mutex{}
cond := sync.NewCond(mx)
execution = &executionEnv{
mx: mx,
cond: cond,
available: available,
reserved: priority,
}
}

View File

@ -250,6 +250,8 @@ type VMOpts struct {
Tracing bool Tracing bool
// ReturnEvents decodes and returns emitted events. // ReturnEvents decodes and returns emitted events.
ReturnEvents bool ReturnEvents bool
// ExecutionLane specifies the execution priority of the created vm
ExecutionLane ExecutionLane
} }
func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) { func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {

View File

@ -2,6 +2,7 @@ package vm
import ( import (
"context" "context"
"fmt"
"os" "os"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
@ -17,6 +18,15 @@ var (
StatApplied uint64 StatApplied uint64
) )
type ExecutionLane int
const (
// ExecutionLaneDefault signifies a default, non prioritized execution lane.
ExecutionLaneDefault ExecutionLane = iota
// ExecutionLanePriority signifies a prioritized execution lane with reserved resources.
ExecutionLanePriority
)
type Interface interface { type Interface interface {
// Applies the given message onto the VM's current state, returning the result of the execution // Applies the given message onto the VM's current state, returning the result of the execution
ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error)
@ -27,13 +37,24 @@ type Interface interface {
Flush(ctx context.Context) (cid.Cid, error) Flush(ctx context.Context) (cid.Cid, error)
} }
// Executor is the general vm execution interface, which is prioritized according to execution langes.
// User must call Done when it is done with this executor to release resource holds by the execution
// environment
type Executor interface {
Interface
// Done must be called when done with the executor to release resource holds.
// It is an error to invoke Interface methods after Done has been called.
Done()
}
// WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly!
// An envvar that allows the user to specify debug actors bundles to be used by the FVM // An envvar that allows the user to specify debug actors bundles to be used by the FVM
// alongside regular execution. This is basically only to be used to print out specific logging information. // alongside regular execution. This is basically only to be used to print out specific logging information.
// Message failures, unexpected terminations,gas costs, etc. should all be ignored. // Message failures, unexpected terminations,gas costs, etc. should all be ignored.
var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1" var useFvmDebug = os.Getenv("LOTUS_FVM_DEVELOPER_DEBUG") == "1"
func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { func newVM(ctx context.Context, opts *VMOpts) (Interface, error) {
if opts.NetworkVersion >= network.Version16 { if opts.NetworkVersion >= network.Version16 {
if useFvmDebug { if useFvmDebug {
return NewDualExecutionFVM(ctx, opts) return NewDualExecutionFVM(ctx, opts)
@ -43,3 +64,21 @@ func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
return NewLegacyVM(ctx, opts) return NewLegacyVM(ctx, opts)
} }
func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) {
switch opts.ExecutionLane {
case ExecutionLaneDefault, ExecutionLanePriority:
default:
return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
}
token := execution.getToken(opts.ExecutionLane)
vmi, err := newVM(ctx, opts)
if err != nil {
token.Done()
return nil, err
}
return newVMExecutor(vmi, token), nil
}