Merge pull request #5693 from filecoin-project/sentinel/binary

Implement Event observer and Settings for 3rd party dep injection
This commit is contained in:
Łukasz Magiera 2021-03-23 11:15:11 +01:00 committed by GitHub
commit f145d4f46b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 7 deletions

View File

@ -55,11 +55,11 @@ type Events struct {
heightEvents heightEvents
*hcEvents *hcEvents
observers []TipSetObserver
} }
func NewEvents(ctx context.Context, api eventAPI) *Events { func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
tsc := newTSCache(gcConfidence, api) tsc := newTSCache(gcConfidence, api)
e := &Events{ e := &Events{
@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
htHeights: map[abi.ChainEpoch][]uint64{}, htHeights: map[abi.ChainEpoch][]uint64{},
}, },
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}), ready: make(chan struct{}),
observers: []TipSetObserver{},
} }
go e.listenHeadChanges(ctx) go e.listenHeadChanges(ctx)
@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
return e return e
} }
func NewEvents(ctx context.Context, api eventAPI) *Events {
gcConfidence := 2 * build.ForkLengthThreshold
return NewEventsWithConfidence(ctx, api, gcConfidence)
}
func (e *Events) listenHeadChanges(ctx context.Context) { func (e *Events) listenHeadChanges(ctx context.Context) {
for { for {
if err := e.listenHeadChangesOnce(ctx); err != nil { if err := e.listenHeadChangesOnce(ctx); err != nil {
@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
} }
} }
if err := e.headChange(rev, app); err != nil { if err := e.headChange(ctx, rev, app); err != nil {
log.Warnf("headChange failed: %s", err) log.Warnf("headChange failed: %s", err)
} }
@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
return nil return nil
} }
func (e *Events) headChange(rev, app []*types.TipSet) error { func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
if len(app) == 0 { if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset") return xerrors.New("events.headChange expected at least one applied tipset")
} }
@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
return err return err
} }
if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app) return e.processHeadChangeEvent(rev, app)
} }
// A TipSetObserver receives notifications of tipsets
type TipSetObserver interface {
Apply(ctx context.Context, ts *types.TipSet) error
Revert(ctx context.Context, ts *types.TipSet) error
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
e.lk.Lock()
defer e.lk.Unlock()
e.observers = append(e.observers, obs)
return nil
}
// observeChanges expects caller to hold e.lk
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
for _, ts := range rev {
for _, o := range e.observers {
_ = o.Revert(ctx, ts)
}
}
for _, ts := range app {
for _, o := range e.observers {
_ = o.Apply(ctx, ts)
}
}
return nil
}

View File

@ -729,3 +729,19 @@ func Test() Option {
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
) )
} }
// For 3rd party dep injection.
func WithRepoType(repoType repo.RepoType) func(s *Settings) error {
return func(s *Settings) error {
s.nodeType = repoType
return nil
}
}
func WithInvokesKey(i invoke, resApi interface{}) func(s *Settings) error {
return func(s *Settings) error {
s.invokes[i] = fx.Populate(resApi)
return nil
}
}