feat(events): define Observer intreface for events

- allows tipset apply and revert to be observed
This commit is contained in:
frrist 2021-03-03 13:12:53 -08:00
parent 4c7df0ac38
commit 0b579c1e03

View File

@ -55,11 +55,11 @@ type Events struct {
heightEvents
*hcEvents
observers []TipSetObserver
}
func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
e := &Events{
@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{},
},
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
}
go e.listenHeadChanges(ctx)
@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
return e
}
func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
}
}
if err := e.headChange(rev, app); err != nil {
if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err)
}
@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil
}
func (e *Events) headChange(rev, app []*types.TipSet) error {
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err
}
if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
}
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}
// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}
for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}
return nil
}