feat: include previous TS in StateChangedHandler
This commit is contained in:
parent
dd490220d7
commit
b62fef7541
@ -13,6 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const NoTimeout = math.MaxInt64
|
const NoTimeout = math.MaxInt64
|
||||||
|
const NoHeight = abi.ChainEpoch(-1)
|
||||||
|
|
||||||
type triggerID = uint64
|
type triggerID = uint64
|
||||||
|
|
||||||
@ -26,9 +27,10 @@ type triggerH = abi.ChainEpoch
|
|||||||
type eventData interface{}
|
type eventData interface{}
|
||||||
|
|
||||||
// EventHandler arguments:
|
// EventHandler arguments:
|
||||||
// `ts` is the tipset, in which the `msg` is included.
|
// `prevTs` is the previous tipset, eg the "from" tipset for a state change.
|
||||||
|
// `ts` is the event tipset, eg the tipset in which the `msg` is included.
|
||||||
// `curH`-`ts.Height` = `confidence`
|
// `curH`-`ts.Height` = `confidence`
|
||||||
type EventHandler func(data eventData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
||||||
|
|
||||||
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
||||||
// wait for has already happened in tipset `ts`
|
// wait for has already happened in tipset `ts`
|
||||||
@ -54,8 +56,9 @@ type handlerInfo struct {
|
|||||||
type queuedEvent struct {
|
type queuedEvent struct {
|
||||||
trigger triggerID
|
trigger triggerID
|
||||||
|
|
||||||
h abi.ChainEpoch
|
prevH abi.ChainEpoch
|
||||||
data eventData
|
h abi.ChainEpoch
|
||||||
|
data eventData
|
||||||
|
|
||||||
called bool
|
called bool
|
||||||
}
|
}
|
||||||
@ -112,6 +115,9 @@ func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidenc
|
|||||||
// Called when there is a change to the head with tipsets to be
|
// Called when there is a change to the head with tipsets to be
|
||||||
// reverted / applied
|
// reverted / applied
|
||||||
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
for _, ts := range rev {
|
for _, ts := range rev {
|
||||||
e.handleReverts(ts)
|
e.handleReverts(ts)
|
||||||
e.lastTs = ts
|
e.lastTs = ts
|
||||||
@ -125,7 +131,7 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
|||||||
// Queue up calls until there have been enough blocks to reach
|
// Queue up calls until there have been enough blocks to reach
|
||||||
// confidence on the state changes
|
// confidence on the state changes
|
||||||
for tid, data := range stateChanges {
|
for tid, data := range stateChanges {
|
||||||
e.queueForConfidence(tid, data, ts)
|
e.queueForConfidence(tid, data, e.lastTs, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the head change included any new message calls
|
// Check if the head change included any new message calls
|
||||||
@ -137,11 +143,11 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
|||||||
// Queue up calls until there have been enough blocks to reach
|
// Queue up calls until there have been enough blocks to reach
|
||||||
// confidence on the message calls
|
// confidence on the message calls
|
||||||
for tid, data := range newCalls {
|
for tid, data := range newCalls {
|
||||||
e.queueForConfidence(tid, data, ts)
|
e.queueForConfidence(tid, data, nil, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
|
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
|
||||||
// Apply any events and timeouts that were queued up until the
|
// Apply any queued events and timeouts that were targeted at the
|
||||||
// current chain height
|
// current chain height
|
||||||
e.applyWithConfidence(ts, at)
|
e.applyWithConfidence(ts, at)
|
||||||
e.applyTimeouts(ts)
|
e.applyTimeouts(ts)
|
||||||
@ -170,8 +176,7 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) {
|
|||||||
trigger := e.triggers[event.trigger]
|
trigger := e.triggers[event.trigger]
|
||||||
|
|
||||||
if err := trigger.revert(e.ctx, ts); err != nil {
|
if err := trigger.revert(e.ctx, ts); err != nil {
|
||||||
log.Errorf("reverting chain trigger failed: %s", err)
|
log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err)
|
||||||
// log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(e.confQueue[triggerH], ts.Height())
|
delete(e.confQueue[triggerH], ts.Height())
|
||||||
@ -181,9 +186,13 @@ func (e *hcEvents) handleReverts(ts *types.TipSet) {
|
|||||||
|
|
||||||
// Queue up events until the chain has reached a height that reflects the
|
// Queue up events until the chain has reached a height that reflects the
|
||||||
// desired confidence
|
// desired confidence
|
||||||
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.TipSet) {
|
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) {
|
||||||
trigger := e.triggers[trigID]
|
trigger := e.triggers[trigID]
|
||||||
|
|
||||||
|
prevH := NoHeight
|
||||||
|
if prevTs != nil {
|
||||||
|
prevH = prevTs.Height()
|
||||||
|
}
|
||||||
appliedH := ts.Height()
|
appliedH := ts.Height()
|
||||||
|
|
||||||
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
|
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
|
||||||
@ -196,6 +205,7 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.T
|
|||||||
|
|
||||||
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
|
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
|
||||||
trigger: trigID,
|
trigger: trigID,
|
||||||
|
prevH: prevH,
|
||||||
h: appliedH,
|
h: appliedH,
|
||||||
data: data,
|
data: data,
|
||||||
})
|
})
|
||||||
@ -207,7 +217,7 @@ func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.T
|
|||||||
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
|
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
|
||||||
byOrigH, ok := e.confQueue[height]
|
byOrigH, ok := e.confQueue[height]
|
||||||
if !ok {
|
if !ok {
|
||||||
return // no triggers at thin height
|
return // no triggers at this height
|
||||||
}
|
}
|
||||||
|
|
||||||
for origH, events := range byOrigH {
|
for origH, events := range byOrigH {
|
||||||
@ -226,10 +236,20 @@ func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch)
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
more, err := trigger.handle(event.data, triggerTs, height)
|
// Previous tipset - this is relevant for example in a state change
|
||||||
|
// from one tipset to another
|
||||||
|
var prevTs *types.TipSet
|
||||||
|
if event.prevH != NoHeight {
|
||||||
|
prevTs, err = e.tsc.get(event.prevH)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
more, err := trigger.handle(event.data, prevTs, triggerTs, height)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", origH, height, err)
|
log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err)
|
||||||
// log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err)
|
|
||||||
continue // don't revert failed calls
|
continue // don't revert failed calls
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -266,8 +286,7 @@ func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
|
|||||||
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
||||||
}
|
}
|
||||||
|
|
||||||
// more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
|
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
|
||||||
more, err := trigger.handle(nil, timeoutTs, ts.Height())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
||||||
continue // don't revert failed calls
|
continue // don't revert failed calls
|
||||||
@ -355,7 +374,7 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map
|
|||||||
|
|
||||||
res := make(map[triggerID]eventData)
|
res := make(map[triggerID]eventData)
|
||||||
for tid, matchFn := range we.matchers {
|
for tid, matchFn := range we.matchers {
|
||||||
ok, data, err := matchFn(we.ctx, oldState, newState)
|
ok, data, err := matchFn(oldState, newState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("event diff fn failed: %s", err)
|
log.Errorf("event diff fn failed: %s", err)
|
||||||
continue
|
continue
|
||||||
@ -369,14 +388,19 @@ func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Used to store the state change
|
// Used to store the state change
|
||||||
type stateData interface{}
|
type stateChange interface{}
|
||||||
|
|
||||||
|
// Will be checked to ensure it has length 2
|
||||||
|
type stateData []stateChange
|
||||||
|
|
||||||
// StateChangeHandler arguments:
|
// StateChangeHandler arguments:
|
||||||
// `ts` is the tipset, in which the change occured
|
// `oldTs` is the state "from" tipset
|
||||||
|
// `newTs` is the state "to" tipset
|
||||||
|
// `data` is the old / new state
|
||||||
// `curH`-`ts.Height` = `confidence`
|
// `curH`-`ts.Height` = `confidence`
|
||||||
type StateChangeHandler func(ctx context.Context, oldState, newState stateData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
type StateChangeHandler func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (more bool, err error)
|
||||||
|
|
||||||
type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) (bool, eventData, error)
|
type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, stateData, error)
|
||||||
|
|
||||||
// StateChanged registers a callback which is triggered when a specified state
|
// StateChanged registers a callback which is triggered when a specified state
|
||||||
// change occurs or a timeout is reached.
|
// change occurs or a timeout is reached.
|
||||||
@ -400,20 +424,20 @@ type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet)
|
|||||||
//
|
//
|
||||||
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
||||||
// containing the message. The tipset passed as the argument is the tipset
|
// containing the message. The tipset passed as the argument is the tipset
|
||||||
// that is being dropped. Note that the message dropped may be re-applied
|
// that is being dropped. Note that the event dropped may be re-applied
|
||||||
// in a different tipset in small amount of time.
|
// in a different tipset in small amount of time.
|
||||||
//
|
//
|
||||||
// * `StateMatchFunc` is called against each tipset state. If there is a match,
|
// * `StateMatchFunc` is called against each tipset state. If there is a match,
|
||||||
// the state change is queued up until the confidence interval has elapsed (and
|
// the state change is queued up until the confidence interval has elapsed (and
|
||||||
// `StateChangeHandler` is called)
|
// `StateChangeHandler` is called)
|
||||||
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
||||||
hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
||||||
states, ok := data.([]interface{})
|
states, ok := data.(stateData)
|
||||||
if !ok || len(states) != 2 {
|
if data != nil && (!ok || len(states) != 2) {
|
||||||
panic("expected 2 element array")
|
panic("StateChangeHandler: stateData passed to watcher must be a 2 element array: [old state, new state]")
|
||||||
}
|
}
|
||||||
|
|
||||||
return scHnd(we.ctx, states[0], states[1], ts, height)
|
return scHnd(prevTs, ts, states, height)
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
|
id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
|
||||||
@ -557,7 +581,7 @@ type MsgMatchFunc func(msg *types.Message) (bool, error)
|
|||||||
// message is queued up until the confidence interval has elapsed (and
|
// message is queued up until the confidence interval has elapsed (and
|
||||||
// `MsgHandler` is called)
|
// `MsgHandler` is called)
|
||||||
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
|
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
|
||||||
hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
||||||
msg, ok := data.(*types.Message)
|
msg, ok := data.(*types.Message)
|
||||||
if data != nil && !ok {
|
if data != nil && !ok {
|
||||||
panic("expected msg")
|
panic("expected msg")
|
||||||
|
@ -1004,8 +1004,6 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
|
|||||||
return false, true, nil
|
return false, true, nil
|
||||||
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
|
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
|
||||||
require.Equal(t, false, applied)
|
require.Equal(t, false, applied)
|
||||||
fmt.Println(msg == nil)
|
|
||||||
fmt.Println(curH)
|
|
||||||
applied = true
|
applied = true
|
||||||
return more, nil
|
return more, nil
|
||||||
}, func(_ context.Context, ts *types.TipSet) error {
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
@ -1067,3 +1065,244 @@ func TestRemoveTriggersOnMessage(t *testing.T) {
|
|||||||
require.Equal(t, true, applied)
|
require.Equal(t, true, applied)
|
||||||
require.Equal(t, false, reverted)
|
require.Equal(t, false, reverted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateChanged(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
more := true
|
||||||
|
var applied, reverted bool
|
||||||
|
var appliedData stateData
|
||||||
|
var appliedOldTs *types.TipSet
|
||||||
|
var appliedNewTs *types.TipSet
|
||||||
|
var appliedH abi.ChainEpoch
|
||||||
|
var matchData stateData
|
||||||
|
|
||||||
|
confidence := 3
|
||||||
|
timeout := abi.ChainEpoch(20)
|
||||||
|
|
||||||
|
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
applied = true
|
||||||
|
appliedData = data
|
||||||
|
appliedOldTs = oldTs
|
||||||
|
appliedNewTs = newTs
|
||||||
|
appliedH = curH
|
||||||
|
return more, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
reverted = true
|
||||||
|
return nil
|
||||||
|
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) {
|
||||||
|
if matchData == nil {
|
||||||
|
return false, matchData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
d := matchData
|
||||||
|
matchData = nil
|
||||||
|
return true, d, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// create few blocks to make sure nothing get's randomly called
|
||||||
|
|
||||||
|
fcs.advance(0, 4, nil) // H=5
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// create state change (but below confidence threshold)
|
||||||
|
matchData = []stateChange{"a", "b"}
|
||||||
|
fcs.advance(0, 3, nil)
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// create additional block so we are above confidence threshold
|
||||||
|
|
||||||
|
fcs.advance(0, 2, nil) // H=10 (confidence=3, apply)
|
||||||
|
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
// dip below confidence (should not apply again)
|
||||||
|
fcs.advance(2, 2, nil) // H=10 (confidence=3, apply)
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// Change happens from 5 -> 6
|
||||||
|
require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height())
|
||||||
|
require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height())
|
||||||
|
|
||||||
|
// Actually applied (with confidence) at 9
|
||||||
|
require.Equal(t, abi.ChainEpoch(9), appliedH)
|
||||||
|
|
||||||
|
// Make sure the state change was correctly passed through
|
||||||
|
require.Equal(t, "a", appliedData[0])
|
||||||
|
require.Equal(t, "b", appliedData[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateChangedRevert(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
more := true
|
||||||
|
var applied, reverted bool
|
||||||
|
var matchData stateData
|
||||||
|
|
||||||
|
confidence := 1
|
||||||
|
timeout := abi.ChainEpoch(20)
|
||||||
|
|
||||||
|
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
applied = true
|
||||||
|
return more, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
reverted = true
|
||||||
|
return nil
|
||||||
|
}, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, stateData, error) {
|
||||||
|
if matchData == nil {
|
||||||
|
return false, matchData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
d := matchData
|
||||||
|
matchData = nil
|
||||||
|
return true, d, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 2, nil) // H=3
|
||||||
|
|
||||||
|
// Make a state change from TS at height 3 to TS at height 4
|
||||||
|
matchData = []stateChange{"a", "b"}
|
||||||
|
fcs.advance(0, 1, nil) // H=4
|
||||||
|
|
||||||
|
// Haven't yet reached confidence
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// Advance to reach confidence level
|
||||||
|
fcs.advance(0, 1, nil) // H=5
|
||||||
|
|
||||||
|
// Should now have called the handler
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
// Advance 3 more TS
|
||||||
|
fcs.advance(0, 3, nil) // H=8
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// Regress but not so far as to cause a revert
|
||||||
|
fcs.advance(3, 1, nil) // H=6
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// Regress back to state where change happened
|
||||||
|
fcs.advance(3, 1, nil) // H=4
|
||||||
|
|
||||||
|
// Expect revert to have happened
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, true, reverted)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateChangedTimeout(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
called := false
|
||||||
|
|
||||||
|
err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
called = true
|
||||||
|
require.Nil(t, data)
|
||||||
|
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||||
|
require.Equal(t, abi.ChainEpoch(23), curH)
|
||||||
|
return false, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
t.Fatal("revert on timeout")
|
||||||
|
return nil
|
||||||
|
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) {
|
||||||
|
return false, stateData{}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 21, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
|
||||||
|
fcs.advance(0, 5, nil)
|
||||||
|
require.True(t, called)
|
||||||
|
called = false
|
||||||
|
|
||||||
|
// with check func reporting done
|
||||||
|
|
||||||
|
fcs = &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
blkMsgs: map[cid.Cid]cid.Cid{},
|
||||||
|
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
|
||||||
|
|
||||||
|
events = NewEvents(context.Background(), fcs)
|
||||||
|
|
||||||
|
err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return true, true, nil
|
||||||
|
}, func(oldTs, newTs *types.TipSet, data stateData, curH abi.ChainEpoch) (bool, error) {
|
||||||
|
called = true
|
||||||
|
require.Nil(t, data)
|
||||||
|
require.Equal(t, abi.ChainEpoch(20), newTs.Height())
|
||||||
|
require.Equal(t, abi.ChainEpoch(23), curH)
|
||||||
|
return false, nil
|
||||||
|
}, func(_ context.Context, ts *types.TipSet) error {
|
||||||
|
t.Fatal("revert on timeout")
|
||||||
|
return nil
|
||||||
|
}, 3, 20, func(oldTs, newTs *types.TipSet) (bool, stateData, error) {
|
||||||
|
return false, stateData{}, nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 21, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
|
||||||
|
fcs.advance(0, 5, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user