v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
Showing only changes of commit 20de759aee - Show all commits

View File

@ -41,14 +41,14 @@ func newVMExecutor(vmi Interface, lane ExecutionLane) Interface {
} }
func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) { func (e *vmExecutor) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
token := execution.getToken(e.lane) token := execution.getToken(ctx, e.lane)
defer token.Done() defer token.Done()
return e.vmi.ApplyMessage(ctx, cmsg) return e.vmi.ApplyMessage(ctx, cmsg)
} }
func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) { func (e *vmExecutor) ApplyImplicitMessage(ctx context.Context, msg *types.Message) (*ApplyRet, error) {
token := execution.getToken(e.lane) token := execution.getToken(ctx, e.lane)
defer token.Done() defer token.Done()
return e.vmi.ApplyImplicitMessage(ctx, msg) return e.vmi.ApplyImplicitMessage(ctx, msg)
@ -61,6 +61,7 @@ func (e *vmExecutor) Flush(ctx context.Context) (cid.Cid, error) {
type executionToken struct { type executionToken struct {
lane ExecutionLane lane ExecutionLane
reserved int reserved int
ctx context.Context
} }
func (token *executionToken) Done() { func (token *executionToken) Done() {
@ -77,78 +78,69 @@ type executionEnv struct {
reserved int reserved int
} }
func (e *executionEnv) getToken(lane ExecutionLane) *executionToken { func (e *executionEnv) getToken(ctx context.Context, lane ExecutionLane) *executionToken {
metricsUp(metrics.VMExecutionWaiting, lane) metricsUp(ctx, metrics.VMExecutionWaiting, lane)
defer metricsDown(metrics.VMExecutionWaiting, lane) defer metricsDown(ctx, metrics.VMExecutionWaiting, lane)
e.mx.Lock() e.mx.Lock()
defer e.mx.Unlock()
switch lane { reserving := 0
case ExecutionLaneDefault: if lane == ExecutionLaneDefault {
for e.available <= e.reserved { for e.available <= e.reserved {
e.cond.Wait() e.cond.Wait()
} }
e.available-- } else {
metricsUp(metrics.VMExecutionRunning, lane)
return &executionToken{lane: lane, reserved: 0}
case ExecutionLanePriority:
for e.available == 0 { for e.available == 0 {
e.cond.Wait() e.cond.Wait()
} }
e.available--
reserving := 0
if e.reserved > 0 { if e.reserved > 0 {
e.reserved-- e.reserved--
reserving = 1 reserving = 1
} }
metricsUp(metrics.VMExecutionRunning, lane)
return &executionToken{lane: lane, reserved: reserving}
default:
// already checked at interface boundary in NewVM, so this is appropriate
panic("bogus execution lane")
} }
e.available--
e.mx.Unlock()
metricsUp(ctx, metrics.VMExecutionRunning, lane)
return &executionToken{lane: lane, reserved: reserving, ctx: ctx}
} }
func (e *executionEnv) putToken(token *executionToken) { func (e *executionEnv) putToken(token *executionToken) {
e.mx.Lock() e.mx.Lock()
defer e.mx.Unlock()
e.available++ e.available++
e.reserved += token.reserved e.reserved += token.reserved
// Note: Signal is unsound, because a priority token could wake up a non-priority // Note: Signal is unsound, because a priority token could wake up a non-priority
// goroutnie and lead to deadlock. So Broadcast it must be. // goroutine and lead to deadlock. So Broadcast it must be.
e.cond.Broadcast() e.cond.Broadcast()
e.mx.Unlock()
metricsDown(metrics.VMExecutionRunning, token.lane) metricsDown(token.ctx, metrics.VMExecutionRunning, token.lane)
} }
func metricsUp(metric *stats.Int64Measure, lane ExecutionLane) { func metricsUp(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) {
metricsAdjust(metric, lane, 1) metricsAdjust(ctx, metric, lane, 1)
} }
func metricsDown(metric *stats.Int64Measure, lane ExecutionLane) { func metricsDown(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane) {
metricsAdjust(metric, lane, -1) metricsAdjust(ctx, metric, lane, -1)
} }
func metricsAdjust(metric *stats.Int64Measure, lane ExecutionLane, delta int) { var (
laneName := "default" defaultLaneTag = tag.Upsert(metrics.ExecutionLane, "default")
priorityLaneTag = tag.Upsert(metrics.ExecutionLane, "priority")
)
func metricsAdjust(ctx context.Context, metric *stats.Int64Measure, lane ExecutionLane, delta int) {
laneTag := defaultLaneTag
if lane > ExecutionLaneDefault { if lane > ExecutionLaneDefault {
laneName = "priority" laneTag = priorityLaneTag
} }
ctx, _ := tag.New( ctx, _ = tag.New(ctx, laneTag)
context.Background(),
tag.Upsert(metrics.ExecutionLane, laneName),
)
stats.Record(ctx, metric.M(int64(delta))) stats.Record(ctx, metric.M(int64(delta)))
} }