diff --git a/chain/events/events.go b/chain/events/events.go index acb65d2c1..8ad40f95f 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -55,11 +55,11 @@ type Events struct { heightEvents *hcEvents + + observers []TipSetObserver } -func NewEvents(ctx context.Context, api eventAPI) *Events { - gcConfidence := 2 * build.ForkLengthThreshold - +func NewEventsWithConfidence(ctx context.Context, api eventAPI, gcConfidence abi.ChainEpoch) *Events { tsc := newTSCache(gcConfidence, api) e := &Events{ @@ -77,8 +77,9 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), - ready: make(chan struct{}), + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + ready: make(chan struct{}), + observers: []TipSetObserver{}, } go e.listenHeadChanges(ctx) @@ -92,6 +93,11 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { return e } +func NewEvents(ctx context.Context, api eventAPI) *Events { + gcConfidence := 2 * build.ForkLengthThreshold + return NewEventsWithConfidence(ctx, api, gcConfidence) +} + func (e *Events) listenHeadChanges(ctx context.Context) { for { if err := e.listenHeadChangesOnce(ctx); err != nil { @@ -164,7 +170,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } } - if err := e.headChange(rev, app); err != nil { + if err := e.headChange(ctx, rev, app); err != nil { log.Warnf("headChange failed: %s", err) } @@ -177,7 +183,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { return nil } -func (e *Events) headChange(rev, app []*types.TipSet) error { +func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error { if len(app) == 0 { return xerrors.New("events.headChange expected at least one applied tipset") } @@ -189,5 +195,39 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } + if err := e.observeChanges(ctx, rev, app); err != nil { + return err + } return e.processHeadChangeEvent(rev, app) } + +// A TipSetObserver receives notifications of tipsets +type TipSetObserver interface { + Apply(ctx context.Context, ts *types.TipSet) error + Revert(ctx context.Context, ts *types.TipSet) error +} + +// TODO: add a confidence level so we can have observers with difference levels of confidence +func (e *Events) Observe(obs TipSetObserver) error { + e.lk.Lock() + defer e.lk.Unlock() + e.observers = append(e.observers, obs) + return nil +} + +// observeChanges expects caller to hold e.lk +func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error { + for _, ts := range rev { + for _, o := range e.observers { + _ = o.Revert(ctx, ts) + } + } + + for _, ts := range app { + for _, o := range e.observers { + _ = o.Apply(ctx, ts) + } + } + + return nil +} diff --git a/node/builder.go b/node/builder.go index 33a947826..ece74896c 100644 --- a/node/builder.go +++ b/node/builder.go @@ -729,3 +729,19 @@ func Test() Option { Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), ) } + +// For 3rd party dep injection. + +func WithRepoType(repoType repo.RepoType) func(s *Settings) error { + return func(s *Settings) error { + s.nodeType = repoType + return nil + } +} + +func WithInvokesKey(i invoke, resApi interface{}) func(s *Settings) error { + return func(s *Settings) error { + s.invokes[i] = fx.Populate(resApi) + return nil + } +}