fix: if cache best is nil, return chain head
This commit is contained in:
parent
0ad0d4ea11
commit
76a1b3286b
@ -35,6 +35,7 @@ type eventAPI interface {
|
|||||||
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
||||||
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
||||||
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
||||||
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
|
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
|
||||||
|
|
||||||
@ -57,7 +58,7 @@ type Events struct {
|
|||||||
func NewEvents(ctx context.Context, api eventAPI) *Events {
|
func NewEvents(ctx context.Context, api eventAPI) *Events {
|
||||||
gcConfidence := 2 * build.ForkLengthThreshold
|
gcConfidence := 2 * build.ForkLengthThreshold
|
||||||
|
|
||||||
tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
|
tsc := newTSCache(gcConfidence, api)
|
||||||
|
|
||||||
e := &Events{
|
e := &Events{
|
||||||
api: api,
|
api: api,
|
||||||
|
@ -307,7 +307,10 @@ func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHa
|
|||||||
defer e.lk.Unlock()
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
// Check if the event has already occurred
|
// Check if the event has already occurred
|
||||||
ts := e.tsc.best()
|
ts, err := e.tsc.best()
|
||||||
|
if err != nil {
|
||||||
|
return 0, xerrors.Errorf("error getting best tipset: %w", err)
|
||||||
|
}
|
||||||
done, more, err := check(ts)
|
done, more, err := check(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
|
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
|
|
||||||
@ -152,8 +154,12 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence
|
|||||||
|
|
||||||
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
|
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
|
||||||
|
|
||||||
bestH := e.tsc.best().Height()
|
best, err := e.tsc.best()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error getting best tipset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bestH := best.Height()
|
||||||
if bestH >= h+abi.ChainEpoch(confidence) {
|
if bestH >= h+abi.ChainEpoch(confidence) {
|
||||||
ts, err := e.tsc.getNonNull(h)
|
ts, err := e.tsc.getNonNull(h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -172,7 +178,11 @@ func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence
|
|||||||
}
|
}
|
||||||
|
|
||||||
e.lk.Lock()
|
e.lk.Lock()
|
||||||
bestH = e.tsc.best().Height()
|
best, err = e.tsc.best()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error getting best tipset: %w", err)
|
||||||
|
}
|
||||||
|
bestH = best.Height()
|
||||||
}
|
}
|
||||||
|
|
||||||
defer e.lk.Unlock()
|
defer e.lk.Unlock()
|
||||||
|
@ -46,6 +46,10 @@ type fakeCS struct {
|
|||||||
sub func(rev, app []*types.TipSet)
|
sub func(rev, app []*types.TipSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
|
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
|
||||||
return fcs.tipsets[key], nil
|
return fcs.tipsets[key], nil
|
||||||
}
|
}
|
||||||
@ -110,7 +114,11 @@ func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msg
|
|||||||
|
|
||||||
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
|
func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) {
|
||||||
out := make(chan []*api.HeadChange, 1)
|
out := make(chan []*api.HeadChange, 1)
|
||||||
out <- []*api.HeadChange{{Type: store.HCCurrent, Val: fcs.tsc.best()}}
|
best, err := fcs.tsc.best()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out <- []*api.HeadChange{{Type: store.HCCurrent, Val: best}}
|
||||||
|
|
||||||
fcs.sub = func(rev, app []*types.TipSet) {
|
fcs.sub = func(rev, app []*types.TipSet) {
|
||||||
notif := make([]*api.HeadChange, len(rev)+len(app))
|
notif := make([]*api.HeadChange, len(rev)+len(app))
|
||||||
@ -174,7 +182,8 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
|
|||||||
|
|
||||||
var revs []*types.TipSet
|
var revs []*types.TipSet
|
||||||
for i := 0; i < rev; i++ {
|
for i := 0; i < rev; i++ {
|
||||||
ts := fcs.tsc.best()
|
ts, err := fcs.tsc.best()
|
||||||
|
require.NoError(fcs.t, err)
|
||||||
|
|
||||||
if _, ok := nullm[int(ts.Height())]; !ok {
|
if _, ok := nullm[int(ts.Height())]; !ok {
|
||||||
revs = append(revs, ts)
|
revs = append(revs, ts)
|
||||||
@ -196,7 +205,9 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
ts := fcs.makeTs(fcs.t, fcs.tsc.best().Key().Cids(), fcs.h, mc)
|
best, err := fcs.tsc.best()
|
||||||
|
require.NoError(fcs.t, err)
|
||||||
|
ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc)
|
||||||
require.NoError(fcs.t, fcs.tsc.add(ts))
|
require.NoError(fcs.t, fcs.tsc.add(ts))
|
||||||
|
|
||||||
if hasMsgs {
|
if hasMsgs {
|
||||||
|
@ -9,7 +9,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tsByHFunc func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
type tsCacheAPI interface {
|
||||||
|
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
|
}
|
||||||
|
|
||||||
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
||||||
// tipsets
|
// tipsets
|
||||||
@ -18,10 +21,10 @@ type tipSetCache struct {
|
|||||||
start int
|
start int
|
||||||
len int
|
len int
|
||||||
|
|
||||||
storage tsByHFunc
|
storage tsCacheAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTSCache(cap abi.ChainEpoch, storage tsByHFunc) *tipSetCache {
|
func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
|
||||||
return &tipSetCache{
|
return &tipSetCache{
|
||||||
cache: make([]*types.TipSet, cap),
|
cache: make([]*types.TipSet, cap),
|
||||||
start: 0,
|
start: 0,
|
||||||
@ -94,7 +97,7 @@ func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error)
|
|||||||
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
||||||
if tsc.len == 0 {
|
if tsc.len == 0 {
|
||||||
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
|
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
|
||||||
return tsc.storage(context.TODO(), height, types.EmptyTSK)
|
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
|
||||||
}
|
}
|
||||||
|
|
||||||
headH := tsc.cache[tsc.start].Height()
|
headH := tsc.cache[tsc.start].Height()
|
||||||
@ -114,14 +117,18 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
|||||||
|
|
||||||
if height < tail.Height() {
|
if height < tail.Height() {
|
||||||
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
||||||
return tsc.storage(context.TODO(), height, tail.Key())
|
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
|
return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tsc *tipSetCache) best() *types.TipSet {
|
func (tsc *tipSetCache) best() (*types.TipSet, error) {
|
||||||
return tsc.cache[tsc.start]
|
best := tsc.cache[tsc.start]
|
||||||
|
if best == nil {
|
||||||
|
return tsc.storage.ChainHead(context.TODO())
|
||||||
|
}
|
||||||
|
return best, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func normalModulo(n, m int) int {
|
func normalModulo(n, m int) int {
|
||||||
|
@ -13,10 +13,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestTsCache(t *testing.T) {
|
func TestTsCache(t *testing.T) {
|
||||||
tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
|
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})
|
||||||
t.Fatal("storage call")
|
|
||||||
return &types.TipSet{}, nil
|
|
||||||
})
|
|
||||||
|
|
||||||
h := abi.ChainEpoch(75)
|
h := abi.ChainEpoch(75)
|
||||||
|
|
||||||
@ -43,7 +40,12 @@ func TestTsCache(t *testing.T) {
|
|||||||
|
|
||||||
for i := 0; i < 9000; i++ {
|
for i := 0; i < 9000; i++ {
|
||||||
if i%90 > 60 {
|
if i%90 > 60 {
|
||||||
if err := tsc.revert(tsc.best()); err != nil {
|
best, err := tsc.best()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err, "; i:", i)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := tsc.revert(best); err != nil {
|
||||||
t.Fatal(err, "; i:", i)
|
t.Fatal(err, "; i:", i)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -55,11 +57,21 @@ func TestTsCache(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTsCacheNulls(t *testing.T) {
|
type tsCacheAPIFailOnStorageCall struct {
|
||||||
tsc := newTSCache(50, func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) {
|
t *testing.T
|
||||||
t.Fatal("storage call")
|
}
|
||||||
|
|
||||||
|
func (tc *tsCacheAPIFailOnStorageCall) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
tc.t.Fatal("storage call")
|
||||||
return &types.TipSet{}, nil
|
return &types.TipSet{}, nil
|
||||||
})
|
}
|
||||||
|
func (tc *tsCacheAPIFailOnStorageCall) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||||
|
tc.t.Fatal("storage call")
|
||||||
|
return &types.TipSet{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTsCacheNulls(t *testing.T) {
|
||||||
|
tsc := newTSCache(50, &tsCacheAPIFailOnStorageCall{t: t})
|
||||||
|
|
||||||
h := abi.ChainEpoch(75)
|
h := abi.ChainEpoch(75)
|
||||||
|
|
||||||
@ -91,7 +103,9 @@ func TestTsCacheNulls(t *testing.T) {
|
|||||||
add()
|
add()
|
||||||
add()
|
add()
|
||||||
|
|
||||||
require.Equal(t, h-1, tsc.best().Height())
|
best, err := tsc.best()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, h-1, best.Height())
|
||||||
|
|
||||||
ts, err := tsc.get(h - 1)
|
ts, err := tsc.get(h - 1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -109,9 +123,17 @@ func TestTsCacheNulls(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, h-8, ts.Height())
|
require.Equal(t, h-8, ts.Height())
|
||||||
|
|
||||||
require.NoError(t, tsc.revert(tsc.best()))
|
best, err = tsc.best()
|
||||||
require.NoError(t, tsc.revert(tsc.best()))
|
require.NoError(t, err)
|
||||||
require.Equal(t, h-8, tsc.best().Height())
|
require.NoError(t, tsc.revert(best))
|
||||||
|
|
||||||
|
best, err = tsc.best()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, tsc.revert(best))
|
||||||
|
|
||||||
|
best, err = tsc.best()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, h-8, best.Height())
|
||||||
|
|
||||||
h += 50
|
h += 50
|
||||||
add()
|
add()
|
||||||
@ -120,3 +142,27 @@ func TestTsCacheNulls(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, h-1, ts.Height())
|
require.Equal(t, h-1, ts.Height())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type tsCacheAPIStorageCallCounter struct {
|
||||||
|
t *testing.T
|
||||||
|
chainGetTipSetByHeight int
|
||||||
|
chainHead int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *tsCacheAPIStorageCallCounter) ChainGetTipSetByHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
tc.chainGetTipSetByHeight++
|
||||||
|
return &types.TipSet{}, nil
|
||||||
|
}
|
||||||
|
func (tc *tsCacheAPIStorageCallCounter) ChainHead(ctx context.Context) (*types.TipSet, error) {
|
||||||
|
tc.chainHead++
|
||||||
|
return &types.TipSet{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTsCacheEmpty(t *testing.T) {
|
||||||
|
// Calling best on an empty cache should just call out to the chain API
|
||||||
|
callCounter := &tsCacheAPIStorageCallCounter{t: t}
|
||||||
|
tsc := newTSCache(50, callCounter)
|
||||||
|
_, err := tsc.best()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 1, callCounter.chainHead)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user