Merge pull request #4282 from filecoin-project/steb/fix-tipset-cache-race
fix a race in tipset cache usage
This commit is contained in:
commit
9a26896c69
@ -2,6 +2,7 @@ package events
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -17,6 +18,8 @@ type tsCacheAPI interface {
|
|||||||
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
||||||
// tipsets
|
// tipsets
|
||||||
type tipSetCache struct {
|
type tipSetCache struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
|
||||||
cache []*types.TipSet
|
cache []*types.TipSet
|
||||||
start int
|
start int
|
||||||
len int
|
len int
|
||||||
@ -35,6 +38,9 @@ func newTSCache(cap abi.ChainEpoch, storage tsCacheAPI) *tipSetCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||||
|
tsc.mu.Lock()
|
||||||
|
defer tsc.mu.Unlock()
|
||||||
|
|
||||||
if tsc.len > 0 {
|
if tsc.len > 0 {
|
||||||
if tsc.cache[tsc.start].Height() >= ts.Height() {
|
if tsc.cache[tsc.start].Height() >= ts.Height() {
|
||||||
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||||
@ -65,6 +71,13 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||||
|
tsc.mu.Lock()
|
||||||
|
defer tsc.mu.Unlock()
|
||||||
|
|
||||||
|
return tsc.revertUnlocked(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsc *tipSetCache) revertUnlocked(ts *types.TipSet) error {
|
||||||
if tsc.len == 0 {
|
if tsc.len == 0 {
|
||||||
return nil // this can happen, and it's fine
|
return nil // this can happen, and it's fine
|
||||||
}
|
}
|
||||||
@ -77,7 +90,7 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
|||||||
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
|
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
|
||||||
tsc.len--
|
tsc.len--
|
||||||
|
|
||||||
_ = tsc.revert(nil) // revert null block gap
|
_ = tsc.revertUnlocked(nil) // revert null block gap
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,7 +108,10 @@ func (tsc *tipSetCache) getNonNull(height abi.ChainEpoch) (*types.TipSet, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
||||||
|
tsc.mu.RLock()
|
||||||
|
|
||||||
if tsc.len == 0 {
|
if tsc.len == 0 {
|
||||||
|
tsc.mu.RUnlock()
|
||||||
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
|
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
|
||||||
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
|
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, types.EmptyTSK)
|
||||||
}
|
}
|
||||||
@ -103,6 +119,7 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
|||||||
headH := tsc.cache[tsc.start].Height()
|
headH := tsc.cache[tsc.start].Height()
|
||||||
|
|
||||||
if height > headH {
|
if height > headH {
|
||||||
|
tsc.mu.RUnlock()
|
||||||
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
|
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,15 +133,20 @@ func (tsc *tipSetCache) get(height abi.ChainEpoch) (*types.TipSet, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if height < tail.Height() {
|
if height < tail.Height() {
|
||||||
|
tsc.mu.RUnlock()
|
||||||
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
||||||
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
|
return tsc.storage.ChainGetTipSetByHeight(context.TODO(), height, tail.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
return tsc.cache[normalModulo(tsc.start-int(headH-height), clen)], nil
|
ts := tsc.cache[normalModulo(tsc.start-int(headH-height), clen)]
|
||||||
|
tsc.mu.RUnlock()
|
||||||
|
return ts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tsc *tipSetCache) best() (*types.TipSet, error) {
|
func (tsc *tipSetCache) best() (*types.TipSet, error) {
|
||||||
|
tsc.mu.RLock()
|
||||||
best := tsc.cache[tsc.start]
|
best := tsc.cache[tsc.start]
|
||||||
|
tsc.mu.RUnlock()
|
||||||
if best == nil {
|
if best == nil {
|
||||||
return tsc.storage.ChainHead(context.TODO())
|
return tsc.storage.ChainHead(context.TODO())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user