chain events: Make CalledHandler revert aware

This commit is contained in:
Łukasz Magiera 2019-09-04 22:22:25 +02:00
parent f02119168e
commit afcb35e969
2 changed files with 49 additions and 12 deletions

View File

@ -17,10 +17,13 @@ import (
// `curH`-`ts.Height` = `confidence` // `curH`-`ts.Height` = `confidence`
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error)
// CheckFunc is used before one-shoot callbacks for atomicity // CheckFunc is used for atomicity guarantees. If the condition the callbacks
// guarantees. If the condition the callbacks wait for has already happened in // wait for has already happened in tipset `ts`
// tipset `ts`, this function MUST return true //
type CheckFunc func(ts *types.TipSet) (bool, error) // If `done` is true, timeout won't be triggered
// If `more` is false, no messages will be sent to CalledHandler (RevertHandler
// may still be called)
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
type callHandler struct { type callHandler struct {
confidence int confidence int
@ -251,14 +254,12 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
e.lk.Lock() e.lk.Lock()
defer e.lk.Unlock() defer e.lk.Unlock()
// TODO: this should use older tipset, and take reverts into account done, more, err := check(e.tsc.best())
done, err := check(e.tsc.best())
if err != nil { if err != nil {
return err return err
} }
if done { if done {
// Already happened, don't bother registering callback timeout = math.MaxUint64
return nil
} }
id := e.ctr id := e.ctr
@ -268,6 +269,8 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
confidence: confidence, confidence: confidence,
timeout: timeout + uint64(confidence), timeout: timeout + uint64(confidence),
disabled: !more,
handle: hnd, handle: hnd,
revert: rev, revert: rev,
} }

View File

@ -197,8 +197,8 @@ func TestCalled(t *testing.T) {
var appliedTs *types.TipSet var appliedTs *types.TipSet
var appliedH uint64 var appliedH uint64
err = events.Called(func(ts *types.TipSet) (b bool, e error) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, nil return false, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
applied = true applied = true
appliedMsg = msg appliedMsg = msg
@ -390,8 +390,8 @@ func TestCalledTimeout(t *testing.T) {
called := false called := false
err = events.Called(func(ts *types.TipSet) (b bool, e error) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, nil return false, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
called = true called = true
require.Nil(t, msg) require.Nil(t, msg)
@ -409,4 +409,38 @@ func TestCalledTimeout(t *testing.T) {
fcs.advance(0, 5, nil) fcs.advance(0, 5, nil)
require.True(t, called) require.True(t, called)
called = false
// with check func reporting done
fcs = &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
tsc: newTSCache(2 * build.ForkLengthThreshold),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events = NewEvents(fcs)
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, uint64(20), ts.Height())
require.Equal(t, uint64(23), curH)
return false, nil
}, func(ts *types.TipSet) error {
t.Fatal("revert on timeout")
return nil
}, 3, 20, t0123, 5)
require.NoError(t, err)
fcs.advance(0, 21, nil)
require.False(t, called)
fcs.advance(0, 5, nil)
require.False(t, called)
} }