events: Plumb context to callbacks

This commit is contained in:
Łukasz Magiera 2019-11-05 15:03:59 +01:00
parent 82be4cd77e
commit eb28c45c00
8 changed files with 51 additions and 46 deletions

View File

@ -18,8 +18,8 @@ import (
var log = logging.Logger("events") var log = logging.Logger("events")
// `curH`-`ts.Height` = `confidence` // `curH`-`ts.Height` = `confidence`
type HeightHandler func(ts *types.TipSet, curH uint64) error type HeightHandler func(ctx context.Context, ts *types.TipSet, curH uint64) error
type RevertHandler func(ts *types.TipSet) error type RevertHandler func(ctx context.Context, ts *types.TipSet) error
type heightHandler struct { type heightHandler struct {
confidence int confidence int
@ -59,6 +59,7 @@ func NewEvents(ctx context.Context, api eventApi) *Events {
heightEvents: heightEvents{ heightEvents: heightEvents{
tsc: tsc, tsc: tsc,
ctx: ctx,
gcConfidence: uint64(gcConfidence), gcConfidence: uint64(gcConfidence),
heightTriggers: map[uint64]*heightHandler{}, heightTriggers: map[uint64]*heightHandler{},
@ -69,6 +70,7 @@ func NewEvents(ctx context.Context, api eventApi) *Events {
calledEvents: calledEvents{ calledEvents: calledEvents{
cs: api, cs: api,
tsc: tsc, tsc: tsc,
ctx: ctx,
gcConfidence: uint64(gcConfidence), gcConfidence: uint64(gcConfidence),
confQueue: map[triggerH]map[msgH][]*queuedEvent{}, confQueue: map[triggerH]map[msgH][]*queuedEvent{},

View File

@ -56,6 +56,7 @@ type queuedEvent struct {
type calledEvents struct { type calledEvents struct {
cs eventApi cs eventApi
tsc *tipSetCache tsc *tipSetCache
ctx context.Context
gcConfidence uint64 gcConfidence uint64
lk sync.Mutex lk sync.Mutex
@ -114,7 +115,7 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
trigger := e.triggers[event.trigger] trigger := e.triggers[event.trigger]
if err := trigger.revert(ts); err != nil { if err := trigger.revert(e.ctx, ts); err != nil {
log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
} }
} }

View File

@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"sync" "sync"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -17,6 +18,8 @@ type heightEvents struct {
htTriggerHeights map[triggerH][]triggerId htTriggerHeights map[triggerH][]triggerId
htHeights map[msgH][]triggerId htHeights map[msgH][]triggerId
ctx context.Context
} }
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
@ -26,7 +29,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
revert := func(h uint64, ts *types.TipSet) { revert := func(h uint64, ts *types.TipSet) {
for _, tid := range e.htHeights[h] { for _, tid := range e.htHeights[h] {
err := e.heightTriggers[tid].revert(ts) err := e.heightTriggers[tid].revert(e.ctx, ts)
if err != nil { if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err) log.Errorf("reverting chain trigger (@H %d): %s", h, err)
} }
@ -74,7 +77,7 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
return err return err
} }
if err := hnd.handle(incTs, h); err != nil { if err := hnd.handle(e.ctx, incTs, h); err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err) log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
} }
} }
@ -125,7 +128,7 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence
} }
e.lk.Unlock() e.lk.Unlock()
if err := hnd(ts, bestH); err != nil { if err := hnd(e.ctx, ts, bestH); err != nil {
return err return err
} }
e.lk.Lock() e.lk.Lock()

View File

@ -183,12 +183,12 @@ func TestAt(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height())) require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH)) require.Equal(t, 8, int(curH))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -248,12 +248,12 @@ func TestAtNullTrigger(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, uint64(6), ts.Height()) require.Equal(t, uint64(6), ts.Height())
require.Equal(t, 8, int(curH)) require.Equal(t, 8, int(curH))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -282,12 +282,12 @@ func TestAtNullConf(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height())) require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH)) require.Equal(t, 8, int(curH))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -323,12 +323,12 @@ func TestAtStart(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height())) require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH)) require.Equal(t, 8, int(curH))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -357,12 +357,12 @@ func TestAtStartConfidence(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height())) require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 11, int(curH)) require.Equal(t, 11, int(curH))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -385,16 +385,16 @@ func TestAtChained(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
return events.ChainAt(func(ts *types.TipSet, curH uint64) error { return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 10, int(ts.Height())) require.Equal(t, 10, int(ts.Height()))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 10) }, 3, 10)
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -421,16 +421,16 @@ func TestAtChainedConfidence(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
return events.ChainAt(func(ts *types.TipSet, curH uint64) error { return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
require.Equal(t, 10, int(ts.Height())) require.Equal(t, 10, int(ts.Height()))
applied = true applied = true
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 10) }, 3, 10)
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -455,11 +455,11 @@ func TestAtChainedConfidenceNull(t *testing.T) {
var applied bool var applied bool
var reverted bool var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error { err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH uint64) error {
applied = true applied = true
require.Equal(t, 6, int(ts.Height())) require.Equal(t, 6, int(ts.Height()))
return nil return nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 5) }, 3, 5)
@ -499,7 +499,7 @@ func TestCalled(t *testing.T) {
appliedTs = ts appliedTs = ts
appliedH = curH appliedH = curH
return more, nil return more, nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
reverted = true reverted = true
return nil return nil
}, 3, 20, t0123, 5) }, 3, 20, t0123, 5)
@ -693,7 +693,7 @@ func TestCalledTimeout(t *testing.T) {
require.Equal(t, uint64(20), ts.Height()) require.Equal(t, uint64(20), ts.Height())
require.Equal(t, uint64(23), curH) require.Equal(t, uint64(23), curH)
return false, nil return false, nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout") t.Fatal("revert on timeout")
return nil return nil
}, 3, 20, t0123, 5) }, 3, 20, t0123, 5)
@ -728,7 +728,7 @@ func TestCalledTimeout(t *testing.T) {
require.Equal(t, uint64(20), ts.Height()) require.Equal(t, uint64(20), ts.Height())
require.Equal(t, uint64(23), curH) require.Equal(t, uint64(23), curH)
return false, nil return false, nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
t.Fatal("revert on timeout") t.Fatal("revert on timeout")
return nil return nil
}, 3, 20, t0123, 5) }, 3, 20, t0123, 5)
@ -774,7 +774,7 @@ func TestCalledOrder(t *testing.T) {
} }
at++ at++
return true, nil return true, nil
}, func(ts *types.TipSet) error { }, func(_ context.Context, ts *types.TipSet) error {
switch at { switch at {
case 2: case 2:
require.Equal(t, uint64(4), ts.Height()) require.Equal(t, uint64(4), ts.Height())

View File

@ -58,7 +58,7 @@ type Config struct {
func New(cfg *Config) (*SectorBuilder, error) { func New(cfg *Config) (*SectorBuilder, error) {
if cfg.WorkerThreads <= PoStReservedWorkers { if cfg.WorkerThreads <= PoStReservedWorkers {
return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers + 1, cfg.WorkerThreads) return nil, xerrors.Errorf("minimum worker threads is %d, specified %d", PoStReservedWorkers+1, cfg.WorkerThreads)
} }
proverId := addressToProverID(cfg.Miner) proverId := addressToProverID(cfg.Miner)
@ -69,8 +69,8 @@ func New(cfg *Config) (*SectorBuilder, error) {
} }
return &SectorBuilder{ return &SectorBuilder{
handle: sbp, handle: sbp,
rateLimit: make(chan struct{}, cfg.WorkerThreads - PoStReservedWorkers), rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
}, nil }, nil
} }

View File

@ -34,8 +34,9 @@ export default {
[code.miner]: [ [code.miner]: [
"Send", "Send",
"Constructor", "Constructor",
"CommitSector", "PreCommitSector",
"SubmitPost", "ProveCommitSector",
"SubmitPoSt",
"SlashStorageFault", "SlashStorageFault",
"GetCurrentProvingSet", "GetCurrentProvingSet",
"ArbitrateDeal", "ArbitrateDeal",
@ -49,8 +50,8 @@ export default {
"ChangeWorker", "ChangeWorker",
"IsSlashed", "IsSlashed",
"IsLate", "IsLate",
"PaymentVerifyInclusion", "DeclareFaults",
"PaymentVerifySector", "SlashConsensusFault",
], ],
[code.multisig]: [ [code.multisig]: [
"Send", "Send",

View File

@ -45,7 +45,7 @@ func (m *Miner) beginPosting(ctx context.Context) {
m.postLk.Unlock() m.postLk.Unlock()
log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime) log.Infof("Scheduling post at height %d", ppe-build.PoStChallangeTime)
err = m.events.ChainAt(m.computePost(m.schedPost), func(ts *types.TipSet) error { // Revert err = m.events.ChainAt(m.computePost(m.schedPost), func(ctx context.Context, ts *types.TipSet) error { // Revert
// TODO: Cancel post // TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run") log.Errorf("TODO: Cancel PoSt, re-run")
return nil return nil
@ -84,7 +84,7 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime, log.Infow("scheduling PoSt", "post-height", ppe-build.PoStChallangeTime,
"height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod) "height", ts.Height(), "ppe", ppe, "proving-period", provingPeriod)
err = m.events.ChainAt(m.computePost(ppe), func(ts *types.TipSet) error { // Revert err = m.events.ChainAt(m.computePost(ppe), func(ctx context.Context, ts *types.TipSet) error { // Revert
// TODO: Cancel post // TODO: Cancel post
log.Errorf("TODO: Cancel PoSt, re-run") log.Errorf("TODO: Cancel PoSt, re-run")
return nil return nil
@ -96,9 +96,9 @@ func (m *Miner) scheduleNextPost(ppe uint64) {
} }
} }
func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) error { func (m *Miner) computePost(ppe uint64) func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called := 0 called := 0
return func(ts *types.TipSet, curH uint64) error { return func(ctx context.Context, ts *types.TipSet, curH uint64) error {
called++ called++
if called > 1 { if called > 1 {
log.Errorw("BUG: computePost callback called again", "ppe", ppe, log.Errorw("BUG: computePost callback called again", "ppe", ppe,
@ -106,8 +106,6 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro
return nil return nil
} }
ctx := context.TODO()
sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts) sset, err := m.api.StateMinerProvingSet(ctx, m.maddr, ts)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get proving set for miner: %w", err) return xerrors.Errorf("failed to get proving set for miner: %w", err)

View File

@ -98,9 +98,9 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
} }
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
log.Infof("precommit for sector %d made it on chain, will start post computation at height %d", sector.SectorID, randHeight) log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
err = m.events.ChainAt(func(ts *types.TipSet, curH uint64) error { err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
m.sectorUpdated <- sectorUpdate{ m.sectorUpdated <- sectorUpdate{
newState: api.Committing, newState: api.Committing,
id: sector.SectorID, id: sector.SectorID,
@ -111,7 +111,7 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
} }
return nil return nil
}, func(ts *types.TipSet) error { }, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("revert in interactive commit sector step") log.Warn("revert in interactive commit sector step")
return nil return nil
}, 3, mw.TipSet.Height()+build.InteractivePoRepDelay) }, 3, mw.TipSet.Height()+build.InteractivePoRepDelay)