chain events: Timeouts, cancellation

This commit is contained in:
Łukasz Magiera 2019-09-04 21:41:34 +02:00
parent 56b5574a72
commit e4c73e9fd2
3 changed files with 149 additions and 11 deletions

View File

@ -67,8 +67,9 @@ func NewEvents(cs eventChainStore) *Events {
confQueue: map[uint64]map[uint64][]*queuedEvent{},
revertQueue: map[uint64][]uint64{},
triggers: map[uint64]callHandler{},
triggers: map[uint64]*callHandler{},
callTuples: map[callTuple][]uint64{},
timeouts: map[uint64]map[uint64]int{},
},
}

View File

@ -15,7 +15,7 @@ import (
// `ts` is the tipset, in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) error
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error)
// CheckFunc is used before one-shoot callbacks for atomicity
// guarantees. If the condition the callbacks wait for has already happened in
@ -26,6 +26,8 @@ type callHandler struct {
confidence int
timeout uint64
disabled bool // TODO: GC after gcConfidence reached
handle CalledHandler
revert RevertHandler
}
@ -47,6 +49,9 @@ type calledEvents struct {
ctr uint64
triggers map[uint64]*callHandler
callTuples map[callTuple][]uint64
// maps block heights to events
// [triggerH][msgH][event]
confQueue map[uint64]map[uint64][]*queuedEvent
@ -54,8 +59,8 @@ type calledEvents struct {
// [msgH][triggerH]
revertQueue map[uint64][]uint64
triggers map[uint64]callHandler
callTuples map[callTuple][]uint64
// [timeoutH+confidence][triggerId]{calls}
timeouts map[uint64]map[uint64]int
}
type callTuple struct {
@ -77,6 +82,7 @@ func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
}
e.applyWithConfidence(ts)
e.applyTimeouts(ts)
}
return nil
@ -160,17 +166,58 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet) {
for _, event := range events {
trigger := e.triggers[event.trigger]
if trigger.disabled {
continue
}
if err := trigger.handle(event.msg, triggerTs, ts.Height()); err != nil {
more, err := trigger.handle(event.msg, triggerTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err)
continue // don't revert failed calls
}
event.called = true
touts, ok := e.timeouts[trigger.timeout]
if ok {
touts[event.trigger]++
}
trigger.disabled = !more
}
}
}
func (e *calledEvents) applyTimeouts(ts *types.TipSet) {
triggers, ok := e.timeouts[ts.Height()]
if !ok {
return // nothing to do
}
for triggerId, calls := range triggers {
if calls > 0 {
continue // don't timeout if the method was called
}
trigger := e.triggers[triggerId]
if trigger.disabled {
continue
}
timeoutTs, err := e.tsc.get(ts.Height() - uint64(trigger.confidence))
if err != nil {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-uint64(trigger.confidence), ts.Height())
}
more, err := trigger.handle(nil, timeoutTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
continue // don't revert failed calls
}
trigger.disabled = !more // allows messages after timeout
}
}
func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) error {
seen := map[cid.Cid]struct{}{}
@ -204,7 +251,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa
return nil
}
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, actor address.Address, method uint64) error {
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error {
e.lk.Lock()
defer e.lk.Unlock()
@ -221,9 +268,9 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
id := e.ctr
e.ctr++
e.triggers[id] = callHandler{
e.triggers[id] = &callHandler{
confidence: confidence,
timeout: math.MaxUint64, // TODO
timeout: timeout + uint64(confidence),
handle: hnd,
revert: rev,
@ -235,6 +282,13 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
}
e.callTuples[ct] = append(e.callTuples[ct], id)
if timeout != math.MaxUint64 {
if e.timeouts[timeout+uint64(confidence)] == nil {
e.timeouts[timeout+uint64(confidence)] = map[uint64]int{}
}
e.timeouts[timeout+uint64(confidence)][id] = 0
}
return nil
}

View File

@ -191,6 +191,7 @@ func TestCalled(t *testing.T) {
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
more := true
var applied, reverted bool
var appliedMsg *types.Message
var appliedTs *types.TipSet
@ -198,16 +199,16 @@ func TestCalled(t *testing.T) {
err = events.Called(func(ts *types.TipSet) (b bool, e error) {
return false, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) error {
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
applied = true
appliedMsg = msg
appliedTs = ts
appliedH = curH
return nil
return more, nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, t0123, 5)
}, 3, 20, t0123, 5)
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
@ -326,4 +327,86 @@ func TestCalled(t *testing.T) {
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// test timeout (it's set to 20 in the call to `events.Called` above)
fcs.advance(0, 6, nil) // H=25
require.Equal(t, false, applied) // not calling timeout as we received messages
require.Equal(t, false, reverted)
// test unregistering with more
more = false
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, Method: 5, Nonce: 4}, // this signals we don't want more
},
}),
})
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, Method: 5, Nonce: 5},
},
}),
})
require.Equal(t, false, applied) // should not get any further notifications
require.Equal(t, false, reverted)
// revert after disabled
fcs.advance(5, 1, nil) // try reverting msg sent after disabling
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(5, 1, nil) // try reverting msg sent before disabling
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
}
func TestCalledTimeout(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
called := false
err = events.Called(func(ts *types.TipSet) (b bool, e error) {
return false, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, uint64(20), ts.Height())
require.Equal(t, uint64(23), curH)
return false, nil
}, func(ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, t0123, 5)
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.True(t, called)
}