fix: atomically get head when registering an observer

This lets us always call check (accurately).
This commit is contained in:
Steven Allen 2021-08-10 23:53:57 -07:00
parent 82ac0a24a0
commit f518e34131
5 changed files with 61 additions and 62 deletions

View File

@ -50,17 +50,13 @@ func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi
cache := newCache(api, gcConfidence)
ob := newObserver(cache, gcConfidence)
he := newHeightEvents(cache, gcConfidence)
headChange := newHCEvents(cache)
// Cache first. Observers are ordered and we always want to fill the cache first.
ob.Observe(cache.observer())
ob.Observe(he.observer())
ob.Observe(headChange.observer())
if err := ob.start(ctx); err != nil {
return nil, err
}
he := newHeightEvents(cache, ob, gcConfidence)
headChange := newHCEvents(cache, ob)
return &Events{ob, he, headChange}, nil
}

View File

@ -69,10 +69,9 @@ type queuedEvent struct {
type hcEvents struct {
cs EventAPI
lk sync.Mutex
lastTs *types.TipSet
lk sync.Mutex
ctr triggerID
// TODO: get rid of trigger IDs and just use pointers as keys.
@ -93,7 +92,7 @@ type hcEvents struct {
watcherEvents
}
func newHCEvents(api EventAPI) *hcEvents {
func newHCEvents(api EventAPI, obs *observer) *hcEvents {
e := &hcEvents{
cs: api,
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
@ -105,15 +104,16 @@ func newHCEvents(api EventAPI) *hcEvents {
e.messageEvents = newMessageEvents(e, api)
e.watcherEvents = newWatcherEvents(e, api)
// We need to take the lock as the observer could immediately try calling us.
e.lk.Lock()
e.lastTs = obs.Observe((*hcEventsObserver)(e))
e.lk.Unlock()
return e
}
type hcEventsObserver hcEvents
func (e *hcEvents) observer() TipSetObserver {
return (*hcEventsObserver)(e)
}
func (e *hcEventsObserver) Apply(ctx context.Context, from, to *types.TipSet) error {
e.lk.Lock()
defer e.lk.Unlock()
@ -284,14 +284,9 @@ func (e *hcEvents) onHeadChanged(ctx context.Context, check CheckFunc, hnd Event
defer e.lk.Unlock()
// Check if the event has already occurred
more := true
done := false
if e.lastTs != nil {
var err error
done, more, err = check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
done, more, err := check(ctx, e.lastTs)
if err != nil {
return 0, xerrors.Errorf("called check error (h: %d): %w", e.lastTs.Height(), err)
}
if done {
timeout = NoTimeout

View File

@ -30,13 +30,17 @@ type heightEvents struct {
lastGc abi.ChainEpoch //nolint:structcheck
}
func newHeightEvents(api EventAPI, gcConfidence abi.ChainEpoch) *heightEvents {
return &heightEvents{
func newHeightEvents(api EventAPI, obs *observer, gcConfidence abi.ChainEpoch) *heightEvents {
he := &heightEvents{
api: api,
gcConfidence: gcConfidence,
tsHeights: map[abi.ChainEpoch][]*heightHandler{},
triggerHeights: map[abi.ChainEpoch][]*heightHandler{},
}
he.lk.Lock()
he.head = obs.Observe((*heightEventsObserver)(he))
he.lk.Unlock()
return he
}
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
@ -69,15 +73,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever
e.lk.Lock()
for {
head := e.head
// If we haven't initialized yet, store the trigger and move on.
if head == nil {
e.triggerHeights[triggerAt] = append(e.triggerHeights[triggerAt], handler)
e.tsHeights[h] = append(e.tsHeights[h], handler)
e.lk.Unlock()
return nil
}
if head.Height() >= h {
// Head is past the handler height. We at least need to stash the tipset to
// avoid doing this from the main event loop.
@ -152,10 +147,6 @@ func (e *heightEvents) ChainAt(ctx context.Context, hnd HeightHandler, rev Rever
}
}
func (e *heightEvents) observer() TipSetObserver {
return (*heightEventsObserver)(e)
}
// Updates the head and garbage collects if we're 2x over our garbage collection confidence period.
func (e *heightEventsObserver) updateHead(h *types.TipSet) {
e.lk.Lock()

View File

@ -875,8 +875,6 @@ func TestCalledTimeout(t *testing.T) {
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 1, 0, nil)
err = events.Called(context.Background(), func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
@ -1298,8 +1296,6 @@ func TestStateChangedTimeout(t *testing.T) {
events, err = NewEvents(context.Background(), fcs)
require.NoError(t, err)
fcs.advance(0, 1, 0, nil)
err = events.StateChanged(func(ctx context.Context, ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) {

View File

@ -18,25 +18,26 @@ import (
type observer struct {
api EventAPI
lk sync.Mutex
gcConfidence abi.ChainEpoch
ready chan struct{}
lk sync.Mutex
head *types.TipSet
maxHeight abi.ChainEpoch
observers []TipSetObserver
}
func newObserver(api EventAPI, gcConfidence abi.ChainEpoch) *observer {
return &observer{
func newObserver(api *cache, gcConfidence abi.ChainEpoch) *observer {
obs := &observer{
api: api,
gcConfidence: gcConfidence,
ready: make(chan struct{}),
observers: []TipSetObserver{},
}
obs.Observe(api.observer())
return obs
}
func (o *observer) start(ctx context.Context) error {
@ -100,12 +101,18 @@ func (o *observer) listenHeadChangesOnce(ctx context.Context) error {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
}
head := cur[0].Val
curHead := cur[0].Val
o.lk.Lock()
if o.head == nil {
o.head = head
o.head = curHead
close(o.ready)
} else if !o.head.Equals(head) {
changes, err := o.api.ChainGetPath(ctx, o.head.Key(), head.Key())
}
startHead := o.head
o.lk.Unlock()
if !startHead.Equals(curHead) {
changes, err := o.api.ChainGetPath(ctx, startHead.Key(), curHead.Key())
if err != nil {
return xerrors.Errorf("failed to get path from last applied tipset to head: %w", err)
}
@ -152,18 +159,23 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
ctx, span := trace.StartSpan(ctx, "events.HeadChange")
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
o.lk.Lock()
head := o.head
o.lk.Unlock()
defer func() {
span.AddAttributes(trace.Int64Attribute("endHeight", int64(o.head.Height())))
span.AddAttributes(trace.Int64Attribute("endHeight", int64(head.Height())))
span.End()
}()
// NOTE: bailing out here if the head isn't what we expected is fine. We'll re-start the
// entire process and handle any strange reorgs.
for i, from := range rev {
if !from.Equals(o.head) {
if !from.Equals(head) {
return xerrors.Errorf(
"expected to revert %s (%d), reverting %s (%d)",
o.head.Key(), o.head.Height(), from.Key(), from.Height(),
head.Key(), head.Height(), from.Key(), from.Height(),
)
}
var to *types.TipSet
@ -171,7 +183,7 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
// If we have more reverts, the next revert is the next head.
to = rev[i+1]
} else {
// At the end of the revert sequenece, we need to looup the joint tipset
// At the end of the revert sequenece, we need to lookup the joint tipset
// between the revert sequence and the apply sequence.
var err error
to, err = o.api.ChainGetTipSet(ctx, from.Parents())
@ -181,9 +193,14 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
}
}
// Get the observers late in case an observer registers/unregisters itself.
// Get the current observers and atomically set the head.
//
// 1. We need to get the observers every time in case some registered/deregistered.
// 2. We need to atomically set the head so new observers don't see events twice or
// skip them.
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()
for _, obs := range observers {
@ -196,39 +213,43 @@ func (o *observer) headChange(ctx context.Context, rev, app []*types.TipSet) err
log.Errorf("reverted past finality, from %d to %d", o.maxHeight, to.Height())
}
o.head = to
head = to
}
for _, to := range app {
if to.Parents() != o.head.Key() {
if to.Parents() != head.Key() {
return xerrors.Errorf(
"cannot apply %s (%d) with parents %s on top of %s (%d)",
to.Key(), to.Height(), to.Parents(), o.head.Key(), o.head.Height(),
to.Key(), to.Height(), to.Parents(), head.Key(), head.Height(),
)
}
// Get the observers late in case an observer registers/unregisters itself.
o.lk.Lock()
observers := o.observers
o.head = to
o.lk.Unlock()
for _, obs := range observers {
if err := obs.Apply(ctx, o.head, to); err != nil {
if err := obs.Apply(ctx, head, to); err != nil {
log.Errorf("observer %T failed to revert tipset %s (%d) with: %s", obs, to.Key(), to.Height(), err)
}
}
o.head = to
if to.Height() > o.maxHeight {
o.maxHeight = to.Height()
}
head = to
}
return nil
}
// TODO: add a confidence level so we can have observers with difference levels of confidence
func (o *observer) Observe(obs TipSetObserver) {
// Observe registers the observer, and returns the current tipset. The observer is guaranteed to
// observe events starting at this tipset.
//
// Returns nil if the observer hasn't started yet (but still registers).
func (o *observer) Observe(obs TipSetObserver) *types.TipSet {
o.lk.Lock()
defer o.lk.Unlock()
o.observers = append(o.observers, obs)
return o.head
}