chore: events: implement event observer deregister method
This commit is contained in:
parent
0fa74a7712
commit
20bf46f309
@ -1469,3 +1469,35 @@ func TestReconnect(t *testing.T) {
|
|||||||
fcs.advance(0, 5, 2, nil, 0, 1, 3)
|
fcs.advance(0, 5, 2, nil, 0, 1, 3)
|
||||||
require.True(t, fcs.callNumber["ChainGetPath"] == 4)
|
require.True(t, fcs.callNumber["ChainGetPath"] == 4)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUnregister(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
fcs := newFakeCS(t)
|
||||||
|
|
||||||
|
events, err := NewEvents(ctx, fcs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tsObs := &testObserver{t: t}
|
||||||
|
events.Observe(tsObs)
|
||||||
|
|
||||||
|
// observer receives heads as the chain advances
|
||||||
|
fcs.advance(0, 1, 0, nil)
|
||||||
|
headBeforeDeregister := events.lastTs
|
||||||
|
require.Equal(t, tsObs.head, headBeforeDeregister)
|
||||||
|
|
||||||
|
// observer unregistered successfully
|
||||||
|
found := events.Unregister(tsObs)
|
||||||
|
require.True(t, found)
|
||||||
|
|
||||||
|
// observer stops receiving heads as the chain advances
|
||||||
|
fcs.advance(0, 1, 0, nil)
|
||||||
|
require.Equal(t, tsObs.head, headBeforeDeregister)
|
||||||
|
require.NotEqual(t, tsObs.head, events.lastTs)
|
||||||
|
|
||||||
|
// unregistering an invalid observer returns false
|
||||||
|
dneObs := &testObserver{t: t}
|
||||||
|
found = events.Unregister(dneObs)
|
||||||
|
require.False(t, found)
|
||||||
|
}
|
||||||
|
@ -253,3 +253,27 @@ func (o *observer) Observe(obs TipSetObserver) *types.TipSet {
|
|||||||
o.observers = append(o.observers, obs)
|
o.observers = append(o.observers, obs)
|
||||||
return o.head
|
return o.head
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unregister unregisters an observer. Returns true if we successfully removed the observer.
|
||||||
|
//
|
||||||
|
// NOTE: The observer _may_ be called after being removed. Observers MUST handle this case
|
||||||
|
// internally.
|
||||||
|
func (o *observer) Unregister(obs TipSetObserver) (found bool) {
|
||||||
|
o.lk.Lock()
|
||||||
|
defer o.lk.Unlock()
|
||||||
|
// We _copy_ the observers list because we may be concurrently reading it from a headChange
|
||||||
|
// handler.
|
||||||
|
//
|
||||||
|
// This should happen infrequently, so it's fine if we spend a bit of time here.
|
||||||
|
newObservers := make([]TipSetObserver, 0, len(o.observers))
|
||||||
|
for _, existingObs := range o.observers {
|
||||||
|
if existingObs == obs {
|
||||||
|
found = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
newObservers = append(newObservers, existingObs)
|
||||||
|
}
|
||||||
|
|
||||||
|
o.observers = newObservers
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user