Co-authored-by: yihuang <huang@crypto.com> Co-authored-by: mmsqe <mavis@crypto.com> Co-authored-by: mmsqe <tqd0800210105@gmail.com> Co-authored-by: Tyler <48813565+technicallyty@users.noreply.github.com>
90 lines
2.2 KiB
Go
90 lines
2.2 KiB
Go
package blockstm
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
)
|
|
|
|
// Executor fields are not mutated during execution.
|
|
type Executor struct {
|
|
ctx context.Context // context for cancellation
|
|
scheduler *Scheduler // scheduler for task management
|
|
txExecutor TxExecutor // callback to actually execute a transaction
|
|
mvMemory *MVMemory // multi-version memory for the executor
|
|
|
|
// index of the executor, used for debugging output
|
|
i int
|
|
}
|
|
|
|
func NewExecutor(
|
|
ctx context.Context,
|
|
scheduler *Scheduler,
|
|
txExecutor TxExecutor,
|
|
mvMemory *MVMemory,
|
|
i int,
|
|
) *Executor {
|
|
return &Executor{
|
|
ctx: ctx,
|
|
scheduler: scheduler,
|
|
txExecutor: txExecutor,
|
|
mvMemory: mvMemory,
|
|
i: i,
|
|
}
|
|
}
|
|
|
|
// Run executes all tasks until completion
|
|
// Invariant `num_active_tasks`:
|
|
// - `NextTask` increases it if returns a valid task.
|
|
// - `TryExecute` and `NeedsReexecution` don't change it if it returns a new valid task to run,
|
|
// otherwise it decreases it.
|
|
func (e *Executor) Run() error {
|
|
var kind TaskKind
|
|
version := InvalidTxnVersion
|
|
for !e.scheduler.Done() {
|
|
if !version.Valid() {
|
|
// check for cancellation
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
version, kind = e.scheduler.NextTask()
|
|
continue
|
|
}
|
|
|
|
switch kind {
|
|
case TaskKindExecution:
|
|
version, kind = e.TryExecute(version)
|
|
case TaskKindValidation:
|
|
version, kind = e.NeedsReexecution(version)
|
|
default:
|
|
return fmt.Errorf("unknown task kind %v", kind)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *Executor) TryExecute(version TxnVersion) (TxnVersion, TaskKind) {
|
|
e.scheduler.executedTxns.Add(1)
|
|
view := e.execute(version.Index)
|
|
wroteNewLocation := e.mvMemory.Record(version, view)
|
|
return e.scheduler.FinishExecution(version, wroteNewLocation)
|
|
}
|
|
|
|
func (e *Executor) NeedsReexecution(version TxnVersion) (TxnVersion, TaskKind) {
|
|
e.scheduler.validatedTxns.Add(1)
|
|
valid := e.mvMemory.ValidateReadSet(version.Index)
|
|
aborted := !valid && e.scheduler.TryValidationAbort(version)
|
|
if aborted {
|
|
e.mvMemory.ConvertWritesToEstimates(version.Index)
|
|
}
|
|
return e.scheduler.FinishValidation(version.Index, aborted)
|
|
}
|
|
|
|
func (e *Executor) execute(txn TxnIndex) *MultiMVMemoryView {
|
|
view := e.mvMemory.View(txn)
|
|
e.txExecutor(txn, view)
|
|
return view
|
|
}
|