diff --git a/chain/events/events.go b/chain/events/events.go index 4550fc98a..ba5899270 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -35,6 +35,7 @@ type eventAPI interface { ChainNotify(context.Context) (<-chan []*api.HeadChange, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) + ChainHead(context.Context) (*types.TipSet, error) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) @@ -57,7 +58,7 @@ type Events struct { func NewEvents(ctx context.Context, api eventAPI) *Events { gcConfidence := 2 * build.ForkLengthThreshold - tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight) + tsc := newTSCache(gcConfidence, api) e := &Events{ api: api, diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 196034a9a..2f813a1d4 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -307,7 +307,10 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa defer e.lk.Unlock() // Check if the event has already occurred - ts := e.tsc.best() + ts, err := e.tsc.best() + if err != nil { + return 0, xerrors.Errorf("error getting best tipset: %w", err) + } done, more, err := check(ts) if err != nil { return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 24d758a31..8317c4da4 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "golang.org/x/xerrors" + "github.com/filecoin-project/specs-actors/actors/abi" "go.opencensus.io/trace" @@ -152,8 +154,12 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence e.lk.Lock() // Tricky locking, check your locks if you modify this function! - bestH := e.tsc.best().Height() + best, err := e.tsc.best() + if err != nil { + return xerrors.Errorf("error getting best tipset: %w", err) + } + bestH := best.Height() if bestH >= h+abi.ChainEpoch(confidence) { ts, err := e.tsc.getNonNull(h) if err != nil { @@ -172,7 +178,11 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence } e.lk.Lock() - bestH = e.tsc.best().Height() + best, err = e.tsc.best() + if err != nil { + return xerrors.Errorf("error getting best tipset: %w", err) + } + bestH = best.Height() } defer e.lk.Unlock() diff --git a/chain/events/events_test.go b/chain/events/events_test.go index 1204e3938..58cb855e2 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -46,6 +46,10 @@ type fakeCS struct { sub func(rev, app []*types.TipSet) } +func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) { + panic("implement me") +} + func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { return fcs.tipsets[key], nil } @@ -110,7 +114,11 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) { out := make(chan []*api.HeadChange, 1) - out <- []*api.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}} + best, err := fcs.tsc.best() + if err != nil { + return nil, err + } + out <- []*api.HeadChange{{Type: store.HCCurrent, Val: best}} fcs.sub = func(rev, app []*types.TipSet) { notif := make([]*api.HeadChange, len(rev)+len(app)) @@ -174,7 +182,8 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { / var revs []*types.TipSet for i := 0; i < rev; i++ { - ts := fcs.tsc.best() + ts, err := fcs.tsc.best() + require.NoError(fcs.t, err) if _, ok := nullm[int(ts.Height())]; !ok { revs = append(revs, ts) @@ -196,7 +205,9 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { / continue } - ts := fcs.makeTs(fcs.t, fcs.tsc.best().Key().Cids(), fcs.h, mc) + best, err := fcs.tsc.best() + require.NoError(fcs.t, err) + ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc) require.NoError(fcs.t, fcs.tsc.add(ts)) if hasMsgs { diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 3852c9930..20935976c 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -9,7 +9,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -type tsByHFunc func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) +type tsCacheAPI interface { + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) + ChainHead(context.Context) (*types.TipSet, error) +} // tipSetCache implements a simple ring-buffer cache to keep track of recent // tipsets @@ -18,10 +21,10 @@ type tipSetCache struct { start int len int - storage tsByHFunc + storage tsCacheAPI } -func newTSCache(cap abi.ChainEpoch, storage tsByHFunc) *tipSetCache { +func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache { return &tipSetCache{ cache: make([]*types.TipSet, cap), start: 0, @@ -94,7 +97,7 @@ func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error) func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) { if tsc.len == 0 { log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height) - return tsc.storage(context.TODO(), height, types.EmptyTSK) + return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK) } headH := tsc.cache[tsc.start].Height() @@ -114,14 +117,18 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) { if height < tail.Height() { log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height()) - return tsc.storage(context.TODO(), height, tail.Key()) + return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key()) } return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil } -func (tsc *tipSetCache) best() *types.TipSet { - return tsc.cache[tsc.start] +func (tsc *tipSetCache) best() (*types.TipSet, error) { + best := tsc.cache[tsc.start] + if best == nil { + return tsc.storage.ChainHead(context.TODO()) + } + return best, nil } func normalModulo(n, m int) int { diff --git a/chain/events/tscache_test.go b/chain/events/tscache_test.go index 1278e58e9..201221e9f 100644 --- a/chain/events/tscache_test.go +++ b/chain/events/tscache_test.go @@ -13,10 +13,7 @@ import ( ) func TestTsCache(t *testing.T) { - tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) { - t.Fatal("storage call") - return &types.TipSet{}, nil - }) + tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t}) h := abi.ChainEpoch(75) @@ -43,7 +40,12 @@ func TestTsCache(t *testing.T) { for i := 0; i < 9000; i++ { if i%90 > 60 { - if err := tsc.revert(tsc.best()); err != nil { + best, err := tsc.best() + if err != nil { + t.Fatal(err, "; i:", i) + return + } + if err := tsc.revert(best); err != nil { t.Fatal(err, "; i:", i) return } @@ -55,11 +57,21 @@ func TestTsCache(t *testing.T) { } +type tsCacheAPIFailOnStorageCall struct { + t *testing.T +} + +func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) { + tc.t.Fatal("storage call") + return &types.TipSet{}, nil +} +func (tc *tsCacheAPIFailOnStorageCall) ChainHead(ctx context.Context) (*types.TipSet, error) { + tc.t.Fatal("storage call") + return &types.TipSet{}, nil +} + func TestTsCacheNulls(t *testing.T) { - tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) { - t.Fatal("storage call") - return &types.TipSet{}, nil - }) + tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t}) h := abi.ChainEpoch(75) @@ -91,7 +103,9 @@ func TestTsCacheNulls(t *testing.T) { add() add() - require.Equal(t, h-1, tsc.best().Height()) + best, err := tsc.best() + require.NoError(t, err) + require.Equal(t, h-1, best.Height()) ts, err := tsc.get(h - 1) require.NoError(t, err) @@ -109,9 +123,17 @@ func TestTsCacheNulls(t *testing.T) { require.NoError(t, err) require.Equal(t, h-8, ts.Height()) - require.NoError(t, tsc.revert(tsc.best())) - require.NoError(t, tsc.revert(tsc.best())) - require.Equal(t, h-8, tsc.best().Height()) + best, err = tsc.best() + require.NoError(t, err) + require.NoError(t, tsc.revert(best)) + + best, err = tsc.best() + require.NoError(t, err) + require.NoError(t, tsc.revert(best)) + + best, err = tsc.best() + require.NoError(t, err) + require.Equal(t, h-8, best.Height()) h += 50 add() @@ -120,3 +142,27 @@ func TestTsCacheNulls(t *testing.T) { require.NoError(t, err) require.Equal(t, h-1, ts.Height()) } + +type tsCacheAPIStorageCallCounter struct { + t *testing.T + chainGetTipSetByHeight int + chainHead int +} + +func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) { + tc.chainGetTipSetByHeight++ + return &types.TipSet{}, nil +} +func (tc *tsCacheAPIStorageCallCounter) ChainHead(ctx context.Context) (*types.TipSet, error) { + tc.chainHead++ + return &types.TipSet{}, nil +} + +func TestTsCacheEmpty(t *testing.T) { + // Calling best on an empty cache should just call out to the chain API + callCounter := &tsCacheAPIStorageCallCounter{t: t} + tsc := newTSCache(50, callCounter) + _, err := tsc.best() + require.NoError(t, err) + require.Equal(t, 1, callCounter.chainHead) +}