fix events API timeout handling for nil blocks (#7184)
This commit is contained in:
parent
1cf556c3a2
commit
1da59fa2fe
@ -145,7 +145,7 @@ func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) er
|
|||||||
// Apply any queued events and timeouts that were targeted at the
|
// Apply any queued events and timeouts that were targeted at the
|
||||||
// current chain height
|
// current chain height
|
||||||
e.applyWithConfidence(ctx, at)
|
e.applyWithConfidence(ctx, at)
|
||||||
e.applyTimeouts(ctx, to)
|
e.applyTimeouts(ctx, at, to)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -242,8 +242,8 @@ func (e *hcEventsObserver) applyWithConfidence(ctx context.Context, height abi.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Apply any timeouts that expire at this height
|
// Apply any timeouts that expire at this height
|
||||||
func (e *hcEventsObserver) applyTimeouts(ctx context.Context, ts *types.TipSet) {
|
func (e *hcEventsObserver) applyTimeouts(ctx context.Context, at abi.ChainEpoch, ts *types.TipSet) {
|
||||||
triggers, ok := e.timeouts[ts.Height()]
|
triggers, ok := e.timeouts[at]
|
||||||
if !ok {
|
if !ok {
|
||||||
return // nothing to do
|
return // nothing to do
|
||||||
}
|
}
|
||||||
@ -258,14 +258,14 @@ func (e *hcEventsObserver) applyTimeouts(ctx context.Context, ts *types.TipSet)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This should be cached.
|
// This should be cached.
|
||||||
timeoutTs, err := e.cs.ChainGetTipSetAfterHeight(ctx, ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Key())
|
timeoutTs, err := e.cs.ChainGetTipSetAfterHeight(ctx, at-abi.ChainEpoch(trigger.confidence), ts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", at-abi.ChainEpoch(trigger.confidence), at)
|
||||||
}
|
}
|
||||||
|
|
||||||
more, err := trigger.handle(ctx, nil, nil, timeoutTs, ts.Height())
|
more, err := trigger.handle(ctx, nil, nil, timeoutTs, at)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), at, err)
|
||||||
continue // don't revert failed calls
|
continue // don't revert failed calls
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1255,72 +1255,80 @@ func TestStateChangedRevert(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStateChangedTimeout(t *testing.T) {
|
func TestStateChangedTimeout(t *testing.T) {
|
||||||
|
timeoutHeight := abi.ChainEpoch(20)
|
||||||
|
confidence := 3
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
checkFn CheckFunc
|
||||||
|
nilBlocks []int
|
||||||
|
expectTimeout bool
|
||||||
|
}{{
|
||||||
|
// Verify that the state changed timeout is called at the expected height
|
||||||
|
name: "state changed timeout",
|
||||||
|
checkFn: func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
},
|
||||||
|
expectTimeout: true,
|
||||||
|
}, {
|
||||||
|
// Verify that the state changed timeout is called even if the timeout
|
||||||
|
// falls on nil block
|
||||||
|
name: "state changed timeout falls on nil block",
|
||||||
|
checkFn: func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
},
|
||||||
|
nilBlocks: []int{20, 21, 22, 23},
|
||||||
|
expectTimeout: true,
|
||||||
|
}, {
|
||||||
|
// Verify that the state changed timeout is not called if the check
|
||||||
|
// function reports that it's complete
|
||||||
|
name: "no timeout callback if check func reports done",
|
||||||
|
checkFn: func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return true, true, nil
|
||||||
|
},
|
||||||
|
expectTimeout: false,
|
||||||
|
}}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
tc := tc
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
fcs := newFakeCS(t)
|
fcs := newFakeCS(t)
|
||||||
|
|
||||||
events, err := NewEvents(context.Background(), fcs)
|
events, err := NewEvents(context.Background(), fcs)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Track whether the callback was called
|
||||||
called := false
|
called := false
|
||||||
|
|
||||||
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
|
// Set up state change tracking that will timeout at the given height
|
||||||
return false, true, nil
|
err = events.StateChanged(
|
||||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
tc.checkFn,
|
||||||
if data != nil {
|
func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
||||||
require.Equal(t, oldTs.Key(), newTs.Parents())
|
// Expect the callback to be called at the timeout height with nil data
|
||||||
}
|
|
||||||
called = true
|
called = true
|
||||||
require.Nil(t, data)
|
require.Nil(t, data)
|
||||||
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
require.Equal(t, timeoutHeight, newTs.Height())
|
||||||
require.Equal(t, abi.ChainEpoch(23), curH)
|
require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH)
|
||||||
return false, nil
|
return false, nil
|
||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
t.Fatal("revert on timeout")
|
t.Fatal("revert on timeout")
|
||||||
return nil
|
return nil
|
||||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
}, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
||||||
require.Equal(t, oldTs.Key(), newTs.Parents())
|
|
||||||
return false, nil, nil
|
return false, nil, nil
|
||||||
})
|
})
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
fcs.advance(0, 21, 0, nil)
|
// Advance to timeout height
|
||||||
|
fcs.advance(0, int(timeoutHeight)+1, 0, nil)
|
||||||
require.False(t, called)
|
require.False(t, called)
|
||||||
|
|
||||||
fcs.advance(0, 5, 0, nil)
|
// Advance past timeout height
|
||||||
require.True(t, called)
|
fcs.advance(0, 5, 0, nil, tc.nilBlocks...)
|
||||||
|
require.Equal(t, tc.expectTimeout, called)
|
||||||
called = false
|
called = false
|
||||||
|
|
||||||
// with check func reporting done
|
|
||||||
|
|
||||||
fcs = newFakeCS(t)
|
|
||||||
events, err = NewEvents(context.Background(), fcs)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
|
|
||||||
return true, true, nil
|
|
||||||
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {
|
|
||||||
if data != nil {
|
|
||||||
require.Equal(t, oldTs.Key(), newTs.Parents())
|
|
||||||
}
|
|
||||||
called = true
|
|
||||||
require.Nil(t, data)
|
|
||||||
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
|
||||||
require.Equal(t, abi.ChainEpoch(23), curH)
|
|
||||||
return false, nil
|
|
||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
|
||||||
t.Fatal("revert on timeout")
|
|
||||||
return nil
|
|
||||||
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) {
|
|
||||||
require.Equal(t, oldTs.Key(), newTs.Parents())
|
|
||||||
return false, nil, nil
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
}
|
||||||
|
|
||||||
fcs.advance(0, 21, 0, nil)
|
|
||||||
require.False(t, called)
|
|
||||||
|
|
||||||
fcs.advance(0, 5, 0, nil)
|
|
||||||
require.False(t, called)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCalledMultiplePerEpoch(t *testing.T) {
|
func TestCalledMultiplePerEpoch(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user