From 9175a1be557e48e76924d8f2be8907f2a2dd2229 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 29 Mar 2023 16:45:45 +0300 Subject: [PATCH] refactor execution lanes: hide the lock --- chain/consensus/compute_state.go | 9 ++----- chain/gen/genesis/genesis.go | 1 - chain/gen/genesis/miners.go | 7 +---- chain/stmgr/call.go | 3 --- chain/stmgr/stmgr.go | 8 +++--- chain/stmgr/utils.go | 1 - chain/vm/execution.go | 46 +++++++------------------------- chain/vm/vmi.go | 18 ++----------- 8 files changed, 18 insertions(+), 75 deletions(-) diff --git a/chain/consensus/compute_state.go b/chain/consensus/compute_state.go index 449a0ed20..8442b55e9 100644 --- a/chain/consensus/compute_state.go +++ b/chain/consensus/compute_state.go @@ -93,7 +93,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, }() ctx = blockstore.WithHotView(ctx) - makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Executor, error) { + makeVm := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: e, @@ -117,7 +117,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, var cronGas int64 - runCron := func(vmCron vm.Executor, epoch abi.ChainEpoch) error { + runCron := func(vmCron vm.Interface, epoch abi.ChainEpoch) error { cronMsg := &types.Message{ To: cron.Address, From: builtin.SystemActorAddr, @@ -170,17 +170,13 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, // run cron for null rounds if any if err = runCron(vmCron, i); err != nil { - vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("running cron: %w", err) } pstate, err = vmCron.Flush(ctx) if err != nil { - vmCron.Done() return cid.Undef, cid.Undef, xerrors.Errorf("flushing cron vm: %w", err) } - - vmCron.Done() } // handle state forks @@ -201,7 +197,6 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context, if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err) } - defer vmi.Done() var ( receipts []*types.MessageReceipt diff --git a/chain/gen/genesis/genesis.go b/chain/gen/genesis/genesis.go index 3ef8de968..3e8848021 100644 --- a/chain/gen/genesis/genesis.go +++ b/chain/gen/genesis/genesis.go @@ -496,7 +496,6 @@ func VerifyPreSealedData(ctx context.Context, cs *store.ChainStore, sys vm.Sysca if err != nil { return cid.Undef, xerrors.Errorf("failed to create VM: %w", err) } - defer vm.Done() for mi, m := range template.Miners { for si, s := range m.Sectors { diff --git a/chain/gen/genesis/miners.go b/chain/gen/genesis/miners.go index 09b46a6e7..5f741fd7c 100644 --- a/chain/gen/genesis/miners.go +++ b/chain/gen/genesis/miners.go @@ -88,7 +88,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return big.Zero(), nil } - newVM := func(base cid.Cid) (vm.Executor, error) { + newVM := func(base cid.Cid) (vm.Interface, error) { vmopt := &vm.VMOpts{ StateBase: base, Epoch: 0, @@ -108,8 +108,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal if err != nil { return cid.Undef, fmt.Errorf("creating vm: %w", err) } - // Note: genesisVm is mutated, so this has to happen in a deferred func; go horror show. - defer func() { genesisVm.Done() }() if len(miners) == 0 { return cid.Undef, xerrors.New("no genesis miners") @@ -340,7 +338,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -413,7 +410,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) @@ -521,7 +517,6 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sys vm.Syscal return cid.Undef, xerrors.Errorf("flushing state tree: %w", err) } - genesisVm.Done() genesisVm, err = newVM(nh) if err != nil { return cid.Undef, fmt.Errorf("creating new vm: %w", err) diff --git a/chain/stmgr/call.go b/chain/stmgr/call.go index a4d8d4410..cba1b95ab 100644 --- a/chain/stmgr/call.go +++ b/chain/stmgr/call.go @@ -169,8 +169,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr if err != nil { return nil, xerrors.Errorf("failed to set up vm: %w", err) } - // Note: vmi is mutated, so this has to happen in a deferred func; go horror show. - defer func() { vmi.Done() }() for i, m := range priorMsgs { _, err = vmi.ApplyMessage(ctx, m) @@ -204,7 +202,6 @@ func (sm *StateManager) callInternal(ctx context.Context, msg *types.Message, pr vmopt.BaseFee = big.Zero() vmopt.StateBase = stateCid - vmi.Done() vmi, err = sm.newVM(ctx, vmopt) if err != nil { return nil, xerrors.Errorf("failed to set up estimation vm: %w", err) diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 5c6f3bef6..bf10665e7 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -128,7 +128,7 @@ type StateManager struct { compWait map[string]chan struct{} stlk sync.Mutex genesisMsigLk sync.Mutex - newVM func(context.Context, *vm.VMOpts) (vm.Executor, error) + newVM func(context.Context, *vm.VMOpts) (vm.Interface, error) Syscalls vm.SyscallBuilder preIgnitionVesting []msig0.State postIgnitionVesting []msig0.State @@ -459,12 +459,12 @@ func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) err return nil } -func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Executor, error)) { +func (sm *StateManager) SetVMConstructor(nvm func(context.Context, *vm.VMOpts) (vm.Interface, error)) { sm.newVM = nvm } -func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Executor, error) { - return func(ctx context.Context, opts *vm.VMOpts) (vm.Executor, error) { +func (sm *StateManager) VMConstructor() func(context.Context, *vm.VMOpts) (vm.Interface, error) { + return func(ctx context.Context, opts *vm.VMOpts) (vm.Interface, error) { return sm.newVM(ctx, opts) } } diff --git a/chain/stmgr/utils.go b/chain/stmgr/utils.go index 78129cb16..c93267d50 100644 --- a/chain/stmgr/utils.go +++ b/chain/stmgr/utils.go @@ -106,7 +106,6 @@ func ComputeState(ctx context.Context, sm *StateManager, height abi.ChainEpoch, if err != nil { return cid.Undef, nil, err } - defer vmi.Done() for i, msg := range msgs { // TODO: Use the signed message length for secp messages diff --git a/chain/vm/execution.go b/chain/vm/execution.go index 58f61ca6d..fb86a3a7d 100644 --- a/chain/vm/execution.go +++ b/chain/vm/execution.go @@ -33,62 +33,34 @@ var execution *executionEnv // implementation of vm executor with simple sanity check preventing use after free. type vmExecutor struct { - lk sync.RWMutex - vmi Interface - token *executionToken - done bool + vmi Interface + lane ExecutionLane } -var _ Executor = (*vmExecutor)(nil) +var _ Interface = (*vmExecutor)(nil) -func newVMExecutor(vmi Interface, token *executionToken) Executor { - return &vmExecutor{vmi: vmi, token: token} +func newVMExecutor(vmi Interface, lane ExecutionLane) Interface { + return &vmExecutor{vmi: vmi, lane: lane} } func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return nil, ErrExecutorDone - } + token := execution.getToken(e.lane) + defer token.Done() return e.vmi.ApplyMessage(ctx, cmsg) } func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return nil, ErrExecutorDone - } + token := execution.getToken(e.lane) + defer token.Done() return e.vmi.ApplyImplicitMessage(ctx, msg) } func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) { - e.lk.RLock() - defer e.lk.RUnlock() - - if e.done { - return cid.Undef, ErrExecutorDone - } - return e.vmi.Flush(ctx) } -func (e *vmExecutor) Done() { - e.lk.Lock() - defer e.lk.Unlock() - - if !e.done { - e.token.Done() - e.token = nil - e.done = true - } -} - type executionToken struct { lane ExecutionLane reserved int diff --git a/chain/vm/vmi.go b/chain/vm/vmi.go index a19c38fce..042621ca2 100644 --- a/chain/vm/vmi.go +++ b/chain/vm/vmi.go @@ -37,17 +37,6 @@ type Interface interface { Flush(ctx context.Context) (cid.Cid, error) } -// Executor is the general vm execution interface, which is prioritized according to execution lanes. -// User must call Done when it is done with this executor to release resource holds by the execution -// environment -type Executor interface { - Interface - - // Done must be called when done with the executor to release resource holds. - // It is an error to invoke Interface methods after Done has been called. - Done() -} - // WARNING: You will not affect your node's execution by misusing this feature, but you will confuse yourself thoroughly! // An envvar that allows the user to specify debug actors bundles to be used by the FVM // alongside regular execution. This is basically only to be used to print out specific logging information. @@ -65,20 +54,17 @@ func makeVM(ctx context.Context, opts *VMOpts) (Interface, error) { return NewLegacyVM(ctx, opts) } -func NewVM(ctx context.Context, opts *VMOpts) (Executor, error) { +func NewVM(ctx context.Context, opts *VMOpts) (Interface, error) { switch opts.ExecutionLane { case ExecutionLaneDefault, ExecutionLanePriority: default: return nil, fmt.Errorf("invalid execution lane: %d", opts.ExecutionLane) } - token := execution.getToken(opts.ExecutionLane) - vmi, err := makeVM(ctx, opts) if err != nil { - token.Done() return nil, err } - return newVMExecutor(vmi, token), nil + return newVMExecutor(vmi, opts.ExecutionLane), nil }