From 20bf46f3098b3c433231480dda98cbf66940a201 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 6 Apr 2022 10:59:58 -0700 Subject: [PATCH] chore: events: implement event observer deregister method --- chain/events/events_test.go | 32 ++++++++++++++++++++++++++++++++ chain/events/observer.go | 24 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 1bc5ce710..d20be550b 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1469,3 +1469,35 @@ func TestReconnect(t *testing.T) { fcs.advance(0, 5, 2, nil, 0, 1, 3) require.True(t, fcs.callNumber["ChainGetPath"] == 4) } + +func TestUnregister(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fcs := newFakeCS(t) + + events, err := NewEvents(ctx, fcs) + require.NoError(t, err) + + tsObs := &testObserver{t: t} + events.Observe(tsObs) + + // observer receives heads as the chain advances + fcs.advance(0, 1, 0, nil) + headBeforeDeregister := events.lastTs + require.Equal(t, tsObs.head, headBeforeDeregister) + + // observer unregistered successfully + found := events.Unregister(tsObs) + require.True(t, found) + + // observer stops receiving heads as the chain advances + fcs.advance(0, 1, 0, nil) + require.Equal(t, tsObs.head, headBeforeDeregister) + require.NotEqual(t, tsObs.head, events.lastTs) + + // unregistering an invalid observer returns false + dneObs := &testObserver{t: t} + found = events.Unregister(dneObs) + require.False(t, found) +} diff --git a/chain/events/observer.go b/chain/events/observer.go index c67d821b5..7c365c7c5 100644 --- a/chain/events/observer.go +++ b/chain/events/observer.go @@ -253,3 +253,27 @@ func (o *observer) Observe(obs TipSetObserver) *types.TipSet { o.observers = append(o.observers, obs) return o.head } + +// Unregister unregisters an observer. Returns true if we successfully removed the observer. +// +// NOTE: The observer _may_ be called after being removed. Observers MUST handle this case +// internally. +func (o *observer) Unregister(obs TipSetObserver) (found bool) { + o.lk.Lock() + defer o.lk.Unlock() + // We _copy_ the observers list because we may be concurrently reading it from a headChange + // handler. + // + // This should happen infrequently, so it's fine if we spend a bit of time here. + newObservers := make([]TipSetObserver, 0, len(o.observers)) + for _, existingObs := range o.observers { + if existingObs == obs { + found = true + continue + } + newObservers = append(newObservers, existingObs) + } + + o.observers = newObservers + return found +}