From 0b579c1e0354908fd71417933cdfe496bce56822 Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 3 Mar 2021 13:12:53 -0800 Subject: [PATCH] feat(events): define Observer intreface for events - allows tipset apply and revert to be observed --- chain/events/events.go | 54 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index acb65d2c1..8ad40f95f 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -55,11 +55,11 @@ type Events struct { heightEvents *hcEvents + + observers []TipSetObserver } -func NewEvents(ctx context.Context, api eventAPI) *Events { - gcConfidence := 2 * build.ForkLengthThreshold - +func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events { tsc := newTSCache(gcConfidence, api) e := &Events{ @@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), - ready: make(chan struct{}), + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + ready: make(chan struct{}), + observers: []TipSetObserver{}, } go e.listenHeadChanges(ctx) @@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { return e } +func NewEvents(ctx context.Context, api eventAPI) *Events { + gcConfidence := 2 * build.ForkLengthThreshold + return NewEventsWithConfidence(ctx, api, gcConfidence) +} + func (e *Events) listenHeadChanges(ctx context.Context) { for { if err := e.listenHeadChangesOnce(ctx); err != nil { @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } } - if err := e.headChange(rev, app); err != nil { + if err := e.headChange(ctx, rev, app); err != nil { log.Warnf("headChange failed: %s", err) } @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { return nil } -func (e *Events) headChange(rev, app []*types.TipSet) error { +func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error { if len(app) == 0 { return xerrors.New("events.headChange expected at least one applied tipset") } @@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } + if err := e.observeChanges(ctx, rev, app); err != nil { + return err + } return e.processHeadChangeEvent(rev, app) } + +// A TipSetObserver receives notifications of tipsets +type TipSetObserver interface { + Apply(ctx context.Context, ts *types.TipSet) error + Revert(ctx context.Context, ts *types.TipSet) error +} + +// TODO: add a confidence level so we can have observers with difference levels of confidence +func (e *Events) Observe(obs TipSetObserver) error { + e.lk.Lock() + defer e.lk.Unlock() + e.observers = append(e.observers, obs) + return nil +} + +// observeChanges expects caller to hold e.lk +func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error { + for _, ts := range rev { + for _, o := range e.observers { + _ = o.Revert(ctx, ts) + } + } + + for _, ts := range app { + for _, o := range e.observers { + _ = o.Apply(ctx, ts) + } + } + + return nil +}