fix a race in tipset cache usage
This tipset cache is shared between multiple services and is called from multiple places.
This commit is contained in:
parent
114a1aaffd
commit
f32b3bf99b
@ -2,6 +2,7 @@ package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"golang.org/x/xerrors"
|
||||
@ -17,6 +18,8 @@ type tsCacheAPI interface {
|
||||
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
||||
// tipsets
|
||||
type tipSetCache struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
cache []*types.TipSet
|
||||
start int
|
||||
len int
|
||||
@ -35,6 +38,9 @@ func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||
tsc.mu.Lock()
|
||||
defer tsc.mu.Unlock()
|
||||
|
||||
if tsc.len > 0 {
|
||||
if tsc.cache[tsc.start].Height() >= ts.Height() {
|
||||
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||
@ -65,6 +71,13 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||
tsc.mu.Lock()
|
||||
defer tsc.mu.Unlock()
|
||||
|
||||
return tsc.revertUnlocked(ts)
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
|
||||
if tsc.len == 0 {
|
||||
return nil // this can happen, and it's fine
|
||||
}
|
||||
@ -77,7 +90,7 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
|
||||
tsc.len--
|
||||
|
||||
_ = tsc.revert(nil) // revert null block gap
|
||||
_ = tsc.revertUnlocked(nil) // revert null block gap
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -95,7 +108,10 @@ func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error)
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
||||
tsc.mu.RLock()
|
||||
|
||||
if tsc.len == 0 {
|
||||
tsc.mu.RUnlock()
|
||||
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
|
||||
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
|
||||
}
|
||||
@ -103,6 +119,7 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
||||
headH := tsc.cache[tsc.start].Height()
|
||||
|
||||
if height > headH {
|
||||
tsc.mu.RUnlock()
|
||||
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
|
||||
}
|
||||
|
||||
@ -116,15 +133,20 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
||||
}
|
||||
|
||||
if height < tail.Height() {
|
||||
tsc.mu.RUnlock()
|
||||
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
||||
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
|
||||
}
|
||||
|
||||
return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
|
||||
ts := tsc.cache[normalModulo(tsc.start-int(headH-height), clen)]
|
||||
tsc.mu.RUnlock()
|
||||
return ts, nil
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) best() (*types.TipSet, error) {
|
||||
tsc.mu.RLock()
|
||||
best := tsc.cache[tsc.start]
|
||||
tsc.mu.RUnlock()
|
||||
if best == nil {
|
||||
return tsc.storage.ChainHead(context.TODO())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user