refactor execution lanes: hide the lock

This commit is contained in:
vyzo 2023-03-29 16:45:45 +03:00
parent 71650cd8a4
commit 4184ce9c75
8 changed files with 18 additions and 75 deletions

View File

@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
}() }()
ctx = blockstore.WithHotView(ctx) ctx = blockstore.WithHotView(ctx)
makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) { makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
vmopt := &vm.VMOpts{ vmopt := &vm.VMOpts{
StateBase: base, StateBase: base,
Epoch: e, Epoch: e,
@ -117,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
var cronGas int64 var cronGas int64
runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error { runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error {
cronMsg := &types.Message{ cronMsg := &types.Message{
To: cron.Address, To: cron.Address,
From: builtin.SystemActorAddr, From: builtin.SystemActorAddr,
@ -170,17 +170,13 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
// run cron for null rounds if any // run cron for null rounds if any
if err = runCron(vmCron, i); err != nil { if err = runCron(vmCron, i); err != nil {
vmCron.Done()
return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err)
} }
pstate, err = vmCron.Flush(ctx) pstate, err = vmCron.Flush(ctx)
if err != nil { if err != nil {
vmCron.Done()
return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err)
} }
vmCron.Done()
} }
// handle state forks // handle state forks
@ -201,7 +197,6 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
if err != nil { if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
} }
defer vmi.Done()
var ( var (
receipts []*types.MessageReceipt receipts []*types.MessageReceipt

View File

@ -496,7 +496,6 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("failed to create VM: %w", err) return cid.Undef, xerrors.Errorf("failed to create VM: %w", err)
} }
defer vm.Done()
for mi, m := range template.Miners { for mi, m := range template.Miners {
for si, s := range m.Sectors { for si, s := range m.Sectors {

View File

@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
return big.Zero(), nil return big.Zero(), nil
} }
newVM := func(base cid.Cid) (vm.Executor, error) { newVM := func(base cid.Cid) (vm.Interface, error) {
vmopt := &vm.VMOpts{ vmopt := &vm.VMOpts{
StateBase: base, StateBase: base,
Epoch: 0, Epoch: 0,
@ -108,8 +108,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
if err != nil { if err != nil {
return cid.Undef, fmt.Errorf("creating vm: %w", err) return cid.Undef, fmt.Errorf("creating vm: %w", err)
} }
// Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show.
defer func() { genesisVm.Done() }()
if len(miners) == 0 { if len(miners) == 0 {
return cid.Undef, xerrors.New("no genesis miners") return cid.Undef, xerrors.New("no genesis miners")
@ -340,7 +338,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
} }
genesisVm.Done()
genesisVm, err = newVM(nh) genesisVm, err = newVM(nh)
if err != nil { if err != nil {
return cid.Undef, fmt.Errorf("creating new vm: %w", err) return cid.Undef, fmt.Errorf("creating new vm: %w", err)
@ -413,7 +410,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
} }
genesisVm.Done()
genesisVm, err = newVM(nh) genesisVm, err = newVM(nh)
if err != nil { if err != nil {
return cid.Undef, fmt.Errorf("creating new vm: %w", err) return cid.Undef, fmt.Errorf("creating new vm: %w", err)
@ -521,7 +517,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal
return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) return cid.Undef, xerrors.Errorf("flushing state tree: %w", err)
} }
genesisVm.Done()
genesisVm, err = newVM(nh) genesisVm, err = newVM(nh)
if err != nil { if err != nil {
return cid.Undef, fmt.Errorf("creating new vm: %w", err) return cid.Undef, fmt.Errorf("creating new vm: %w", err)

View File

@ -159,8 +159,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to set up vm: %w", err) return nil, xerrors.Errorf("failed to set up vm: %w", err)
} }
// Note: vmi is mutated, so this has to happen in a deferred func; go horror show.
defer func() { vmi.Done() }()
for i, m := range priorMsgs { for i, m := range priorMsgs {
_, err = vmi.ApplyMessage(ctx, m) _, err = vmi.ApplyMessage(ctx, m)
@ -194,7 +192,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr
vmopt.BaseFee = big.Zero() vmopt.BaseFee = big.Zero()
vmopt.StateBase = stateCid vmopt.StateBase = stateCid
vmi.Done()
vmi, err = sm.newVM(ctx, vmopt) vmi, err = sm.newVM(ctx, vmopt)
if err != nil { if err != nil {
return nil, xerrors.Errorf("failed to set up estimation vm: %w", err) return nil, xerrors.Errorf("failed to set up estimation vm: %w", err)

View File

@ -125,7 +125,7 @@ type StateManager struct {
compWait map[string]chan struct{} compWait map[string]chan struct{}
stlk sync.Mutex stlk sync.Mutex
genesisMsigLk sync.Mutex genesisMsigLk sync.Mutex
newVM func(context.Context, *vm.VMOpts) (vm.Executor, error) newVM func(context.Context, *vm.VMOpts) (vm.Interface, error)
Syscalls vm.SyscallBuilder Syscalls vm.SyscallBuilder
preIgnitionVesting []msig0.State preIgnitionVesting []msig0.State
postIgnitionVesting []msig0.State postIgnitionVesting []msig0.State
@ -439,12 +439,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err
return nil return nil
} }
func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) { func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) {
sm.newVM = nvm sm.newVM = nvm
} }
func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) { func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) {
return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) { return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) {
return sm.newVM(ctx, opts) return sm.newVM(ctx, opts)
} }
} }

View File

@ -106,7 +106,6 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch,
if err != nil { if err != nil {
return cid.Undef, nil, err return cid.Undef, nil, err
} }
defer vmi.Done()
for i, msg := range msgs { for i, msg := range msgs {
// TODO: Use the signed message length for secp messages // TODO: Use the signed message length for secp messages

View File

@ -33,62 +33,34 @@ var execution *executionEnv
// implementation of vm executor with simple sanity check preventing use after free. // implementation of vm executor with simple sanity check preventing use after free.
type vmExecutor struct { type vmExecutor struct {
lk sync.RWMutex vmi Interface
vmi Interface lane ExecutionLane
token *executionToken
done bool
} }
var _ Executor = (*vmExecutor)(nil) var _ Interface = (*vmExecutor)(nil)
func newVMExecutor(vmi Interface, token *executionToken) Executor { func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
return &vmExecutor{vmi: vmi, token: token} return &vmExecutor{vmi: vmi, lane: lane}
} }
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
e.lk.RLock() token := execution.getToken(e.lane)
defer e.lk.RUnlock() defer token.Done()
if e.done {
return nil, ErrExecutorDone
}
return e.vmi.ApplyMessage(ctx, cmsg) return e.vmi.ApplyMessage(ctx, cmsg)
} }
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
e.lk.RLock() token := execution.getToken(e.lane)
defer e.lk.RUnlock() defer token.Done()
if e.done {
return nil, ErrExecutorDone
}
return e.vmi.ApplyImplicitMessage(ctx, msg) return e.vmi.ApplyImplicitMessage(ctx, msg)
} }
func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
e.lk.RLock()
defer e.lk.RUnlock()
if e.done {
return cid.Undef, ErrExecutorDone
}
return e.vmi.Flush(ctx) return e.vmi.Flush(ctx)
} }
func (e *vmExecutor) Done() {
e.lk.Lock()
defer e.lk.Unlock()
if !e.done {
e.token.Done()
e.token = nil
e.done = true
}
}
type executionToken struct { type executionToken struct {
lane ExecutionLane lane ExecutionLane
reserved int reserved int

View File

@ -37,17 +37,6 @@ type Interface interface {
Flush(ctx context.Context) (cid.Cid, error) Flush(ctx context.Context) (cid.Cid, error)
} }
// Executor is the general vm execution interface, which is prioritized according to execution lanes.
// User must call Done when it is done with this executor to release resource holds by the execution
// environment
type Executor interface {
Interface
// Done must be called when done with the executor to release resource holds.
// It is an error to invoke Interface methods after Done has been called.
Done()
}
// WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly!
// An envvar that allows the user to specify debug actors bundles to be used by the FVM // An envvar that allows the user to specify debug actors bundles to be used by the FVM
// alongside regular execution. This is basically only to be used to print out specific logging information. // alongside regular execution. This is basically only to be used to print out specific logging information.
@ -65,20 +54,17 @@ func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) {
return NewLegacyVM(ctx, opts) return NewLegacyVM(ctx, opts)
} }
func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) {
switch opts.ExecutionLane { switch opts.ExecutionLane {
case ExecutionLaneDefault, ExecutionLanePriority: case ExecutionLaneDefault, ExecutionLanePriority:
default: default:
return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane)
} }
token := execution.getToken(opts.ExecutionLane)
vmi, err := makeVM(ctx, opts) vmi, err := makeVM(ctx, opts)
if err != nil { if err != nil {
token.Done()
return nil, err return nil, err
} }
return newVMExecutor(vmi, token), nil return newVMExecutor(vmi, opts.ExecutionLane), nil
} }