diff --git a/chain/store/events.go b/chain/store/events.go index f1adc1bd7..4b85aaa98 100644 --- a/chain/store/events.go +++ b/chain/store/events.go @@ -67,8 +67,9 @@ func NewEvents(cs eventChainStore) *Events { confQueue: map[uint64]map[uint64][]*queuedEvent{}, revertQueue: map[uint64][]uint64{}, - triggers: map[uint64]callHandler{}, + triggers: map[uint64]*callHandler{}, callTuples: map[callTuple][]uint64{}, + timeouts: map[uint64]map[uint64]int{}, }, } diff --git a/chain/store/events_called.go b/chain/store/events_called.go index ccb0b0f35..4c73124c8 100644 --- a/chain/store/events_called.go +++ b/chain/store/events_called.go @@ -15,7 +15,7 @@ import ( // `ts` is the tipset, in which the `msg` is included. // `curH`-`ts.Height` = `confidence` -type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) error +type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) // CheckFunc is used before one-shoot callbacks for atomicity // guarantees. If the condition the callbacks wait for has already happened in @@ -26,6 +26,8 @@ type callHandler struct { confidence int timeout uint64 + disabled bool // TODO: GC after gcConfidence reached + handle CalledHandler revert RevertHandler } @@ -47,6 +49,9 @@ type calledEvents struct { ctr uint64 + triggers map[uint64]*callHandler + callTuples map[callTuple][]uint64 + // maps block heights to events // [triggerH][msgH][event] confQueue map[uint64]map[uint64][]*queuedEvent @@ -54,8 +59,8 @@ type calledEvents struct { // [msgH][triggerH] revertQueue map[uint64][]uint64 - triggers map[uint64]callHandler - callTuples map[callTuple][]uint64 + // [timeoutH+confidence][triggerId]{calls} + timeouts map[uint64]map[uint64]int } type callTuple struct { @@ -77,6 +82,7 @@ func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { } e.applyWithConfidence(ts) + e.applyTimeouts(ts) } return nil @@ -160,17 +166,58 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet) { for _, event := range events { trigger := e.triggers[event.trigger] + if trigger.disabled { + continue + } - if err := trigger.handle(event.msg, triggerTs, ts.Height()); err != nil { + more, err := trigger.handle(event.msg, triggerTs, ts.Height()) + if err != nil { log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err) continue // don't revert failed calls } event.called = true + + touts, ok := e.timeouts[trigger.timeout] + if ok { + touts[event.trigger]++ + } + + trigger.disabled = !more } } } +func (e *calledEvents) applyTimeouts(ts *types.TipSet) { + triggers, ok := e.timeouts[ts.Height()] + if !ok { + return // nothing to do + } + + for triggerId, calls := range triggers { + if calls > 0 { + continue // don't timeout if the method was called + } + trigger := e.triggers[triggerId] + if trigger.disabled { + continue + } + + timeoutTs, err := e.tsc.get(ts.Height() - uint64(trigger.confidence)) + if err != nil { + log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-uint64(trigger.confidence), ts.Height()) + } + + more, err := trigger.handle(nil, timeoutTs, ts.Height()) + if err != nil { + log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) + continue // don't revert failed calls + } + + trigger.disabled = !more // allows messages after timeout + } +} + func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) error { seen := map[cid.Cid]struct{}{} @@ -204,7 +251,7 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa return nil } -func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, actor address.Address, method uint64) error { +func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error { e.lk.Lock() defer e.lk.Unlock() @@ -221,9 +268,9 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand id := e.ctr e.ctr++ - e.triggers[id] = callHandler{ + e.triggers[id] = &callHandler{ confidence: confidence, - timeout: math.MaxUint64, // TODO + timeout: timeout + uint64(confidence), handle: hnd, revert: rev, @@ -235,6 +282,13 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand } e.callTuples[ct] = append(e.callTuples[ct], id) + if timeout != math.MaxUint64 { + if e.timeouts[timeout+uint64(confidence)] == nil { + e.timeouts[timeout+uint64(confidence)] = map[uint64]int{} + } + e.timeouts[timeout+uint64(confidence)][id] = 0 + } + return nil } diff --git a/chain/store/events_test.go b/chain/store/events_test.go index 1516d9ca2..536f588ce 100644 --- a/chain/store/events_test.go +++ b/chain/store/events_test.go @@ -191,6 +191,7 @@ func TestCalled(t *testing.T) { t0123, err := address.NewFromString("t0123") require.NoError(t, err) + more := true var applied, reverted bool var appliedMsg *types.Message var appliedTs *types.TipSet @@ -198,16 +199,16 @@ func TestCalled(t *testing.T) { err = events.Called(func(ts *types.TipSet) (b bool, e error) { return false, nil - }, func(msg *types.Message, ts *types.TipSet, curH uint64) error { + }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { applied = true appliedMsg = msg appliedTs = ts appliedH = curH - return nil + return more, nil }, func(ts *types.TipSet) error { reverted = true return nil - }, 3, t0123, 5) + }, 3, 20, t0123, 5) require.NoError(t, err) // create few blocks to make sure nothing get's randomly called @@ -326,4 +327,86 @@ func TestCalled(t *testing.T) { require.Equal(t, false, applied) require.Equal(t, false, reverted) + // test timeout (it's set to 20 in the call to `events.Called` above) + + fcs.advance(0, 6, nil) // H=25 + + require.Equal(t, false, applied) // not calling timeout as we received messages + require.Equal(t, false, reverted) + + // test unregistering with more + + more = false + fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29 + 0: fcs.fakeMsgs(fakeMsg{ + bmsgs: []*types.Message{ + {To: t0123, Method: 5, Nonce: 4}, // this signals we don't want more + }, + }), + }) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29 + 0: fcs.fakeMsgs(fakeMsg{ + bmsgs: []*types.Message{ + {To: t0123, Method: 5, Nonce: 5}, + }, + }), + }) + + require.Equal(t, false, applied) // should not get any further notifications + require.Equal(t, false, reverted) + + // revert after disabled + + fcs.advance(5, 1, nil) // try reverting msg sent after disabling + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + fcs.advance(5, 1, nil) // try reverting msg sent before disabling + + require.Equal(t, false, applied) + require.Equal(t, true, reverted) +} + +func TestCalledTimeout(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + tsc: newTSCache(2 * build.ForkLengthThreshold), + } + require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) + + events := NewEvents(fcs) + + t0123, err := address.NewFromString("t0123") + require.NoError(t, err) + + called := false + + err = events.Called(func(ts *types.TipSet) (b bool, e error) { + return false, nil + }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { + called = true + require.Nil(t, msg) + require.Equal(t, uint64(20), ts.Height()) + require.Equal(t, uint64(23), curH) + return false, nil + }, func(ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, t0123, 5) + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.True(t, called) }