refactor: update cache to the new generic version (#10463)
- Adds type safety. - Reduces allocations. - Fixes the drand cache (was storing by value, but retrieving by pointer)
This commit is contained in:
parent
97a9921cdd
commit
dcb49dc8ee
@ -3,14 +3,14 @@ package chain
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BadBlockCache struct {
|
type BadBlockCache struct {
|
||||||
badBlocks *lru.ARCCache
|
badBlocks *lru.ARCCache[cid.Cid, BadBlockReason]
|
||||||
}
|
}
|
||||||
|
|
||||||
type BadBlockReason struct {
|
type BadBlockReason struct {
|
||||||
@ -43,7 +43,7 @@ func (bbr BadBlockReason) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBadBlockCache() *BadBlockCache {
|
func NewBadBlockCache() *BadBlockCache {
|
||||||
cache, err := lru.NewARC(build.BadBlockCacheSize)
|
cache, err := lru.NewARC[cid.Cid, BadBlockReason](build.BadBlockCacheSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // ok
|
panic(err) // ok
|
||||||
}
|
}
|
||||||
@ -66,10 +66,5 @@ func (bts *BadBlockCache) Purge() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bts *BadBlockCache) Has(c cid.Cid) (BadBlockReason, bool) {
|
func (bts *BadBlockCache) Has(c cid.Cid) (BadBlockReason, bool) {
|
||||||
rval, ok := bts.badBlocks.Get(c)
|
return bts.badBlocks.Get(c)
|
||||||
if !ok {
|
|
||||||
return BadBlockReason{}, false
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval.(BadBlockReason), true
|
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
gclient "github.com/drand/drand/lp2p/client"
|
gclient "github.com/drand/drand/lp2p/client"
|
||||||
"github.com/drand/kyber"
|
"github.com/drand/kyber"
|
||||||
kzap "github.com/go-kit/kit/log/zap"
|
kzap "github.com/go-kit/kit/log/zap"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
@ -61,7 +61,7 @@ type DrandBeacon struct {
|
|||||||
filGenTime uint64
|
filGenTime uint64
|
||||||
filRoundTime uint64
|
filRoundTime uint64
|
||||||
|
|
||||||
localCache *lru.Cache
|
localCache *lru.Cache[uint64, *types.BeaconEntry]
|
||||||
}
|
}
|
||||||
|
|
||||||
// DrandHTTPClient interface overrides the user agent used by drand
|
// DrandHTTPClient interface overrides the user agent used by drand
|
||||||
@ -110,7 +110,7 @@ func NewDrandBeacon(genesisTs, interval uint64, ps *pubsub.PubSub, config dtypes
|
|||||||
return nil, xerrors.Errorf("creating drand client: %w", err)
|
return nil, xerrors.Errorf("creating drand client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
lc, err := lru.New(1024)
|
lc, err := lru.New[uint64, *types.BeaconEntry](1024)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -160,16 +160,12 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
func (db *DrandBeacon) cacheValue(e types.BeaconEntry) {
|
func (db *DrandBeacon) cacheValue(e types.BeaconEntry) {
|
||||||
db.localCache.Add(e.Round, e)
|
db.localCache.Add(e.Round, &e)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DrandBeacon) getCachedValue(round uint64) *types.BeaconEntry {
|
func (db *DrandBeacon) getCachedValue(round uint64) *types.BeaconEntry {
|
||||||
v, ok := db.localCache.Get(round)
|
v, _ := db.localCache.Get(round)
|
||||||
if !ok {
|
return v
|
||||||
return nil
|
|
||||||
}
|
|
||||||
e, _ := v.(types.BeaconEntry)
|
|
||||||
return &e
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DrandBeacon) VerifyEntry(curr types.BeaconEntry, prev types.BeaconEntry) error {
|
func (db *DrandBeacon) VerifyEntry(curr types.BeaconEntry, prev types.BeaconEntry) error {
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -17,7 +17,7 @@ type blockReceiptTracker struct {
|
|||||||
|
|
||||||
// using an LRU cache because i don't want to handle all the edge cases for
|
// using an LRU cache because i don't want to handle all the edge cases for
|
||||||
// manual cleanup and maintenance of a fixed size set
|
// manual cleanup and maintenance of a fixed size set
|
||||||
cache *lru.Cache
|
cache *lru.Cache[types.TipSetKey, *peerSet]
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerSet struct {
|
type peerSet struct {
|
||||||
@ -25,7 +25,7 @@ type peerSet struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func newBlockReceiptTracker() *blockReceiptTracker {
|
func newBlockReceiptTracker() *blockReceiptTracker {
|
||||||
c, _ := lru.New(512)
|
c, _ := lru.New[types.TipSetKey, *peerSet](512)
|
||||||
return &blockReceiptTracker{
|
return &blockReceiptTracker{
|
||||||
cache: c,
|
cache: c,
|
||||||
}
|
}
|
||||||
@ -46,20 +46,18 @@ func (brt *blockReceiptTracker) Add(p peer.ID, ts *types.TipSet) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
val.(*peerSet).peers[p] = build.Clock.Now()
|
val.peers[p] = build.Clock.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {
|
func (brt *blockReceiptTracker) GetPeers(ts *types.TipSet) []peer.ID {
|
||||||
brt.lk.Lock()
|
brt.lk.Lock()
|
||||||
defer brt.lk.Unlock()
|
defer brt.lk.Unlock()
|
||||||
|
|
||||||
val, ok := brt.cache.Get(ts.Key())
|
ps, ok := brt.cache.Get(ts.Key())
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ps := val.(*peerSet)
|
|
||||||
|
|
||||||
out := make([]peer.ID, 0, len(ps.peers))
|
out := make([]peer.ID, 0, len(ps.peers))
|
||||||
for p := range ps.peers {
|
for p := range ps.peers {
|
||||||
out = append(out, p)
|
out = append(out, p)
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
@ -14,14 +14,14 @@ type messageCache struct {
|
|||||||
api EventAPI
|
api EventAPI
|
||||||
|
|
||||||
blockMsgLk sync.Mutex
|
blockMsgLk sync.Mutex
|
||||||
blockMsgCache *lru.ARCCache
|
blockMsgCache *lru.ARCCache[cid.Cid, *api.BlockMessages]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMessageCache(api EventAPI) *messageCache {
|
func newMessageCache(a EventAPI) *messageCache {
|
||||||
blsMsgCache, _ := lru.NewARC(500)
|
blsMsgCache, _ := lru.NewARC[cid.Cid, *api.BlockMessages](500)
|
||||||
|
|
||||||
return &messageCache{
|
return &messageCache{
|
||||||
api: api,
|
api: a,
|
||||||
blockMsgCache: blsMsgCache,
|
blockMsgCache: blsMsgCache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -30,14 +30,14 @@ func (c *messageCache) ChainGetBlockMessages(ctx context.Context, blkCid cid.Cid
|
|||||||
c.blockMsgLk.Lock()
|
c.blockMsgLk.Lock()
|
||||||
defer c.blockMsgLk.Unlock()
|
defer c.blockMsgLk.Unlock()
|
||||||
|
|
||||||
msgsI, ok := c.blockMsgCache.Get(blkCid)
|
msgs, ok := c.blockMsgCache.Get(blkCid)
|
||||||
var err error
|
var err error
|
||||||
if !ok {
|
if !ok {
|
||||||
msgsI, err = c.api.ChainGetBlockMessages(ctx, blkCid)
|
msgs, err = c.api.ChainGetBlockMessages(ctx, blkCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
c.blockMsgCache.Add(blkCid, msgsI)
|
c.blockMsgCache.Add(blkCid, msgs)
|
||||||
}
|
}
|
||||||
return msgsI.(*api.BlockMessages), nil
|
return msgs, nil
|
||||||
}
|
}
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
@ -159,7 +159,7 @@ type MessagePool struct {
|
|||||||
// pruneCooldown is a channel used to allow a cooldown time between prunes
|
// pruneCooldown is a channel used to allow a cooldown time between prunes
|
||||||
pruneCooldown chan struct{}
|
pruneCooldown chan struct{}
|
||||||
|
|
||||||
blsSigCache *lru.TwoQueueCache
|
blsSigCache *lru.TwoQueueCache[cid.Cid, crypto.Signature]
|
||||||
|
|
||||||
changes *lps.PubSub
|
changes *lps.PubSub
|
||||||
|
|
||||||
@ -167,9 +167,9 @@ type MessagePool struct {
|
|||||||
|
|
||||||
netName dtypes.NetworkName
|
netName dtypes.NetworkName
|
||||||
|
|
||||||
sigValCache *lru.TwoQueueCache
|
sigValCache *lru.TwoQueueCache[string, struct{}]
|
||||||
|
|
||||||
nonceCache *lru.Cache
|
nonceCache *lru.Cache[nonceCacheKey, uint64]
|
||||||
|
|
||||||
evtTypes [3]journal.EventType
|
evtTypes [3]journal.EventType
|
||||||
journal journal.Journal
|
journal journal.Journal
|
||||||
@ -369,9 +369,9 @@ func (ms *msgSet) toSlice() []*types.SignedMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.UpgradeSchedule, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
|
func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.UpgradeSchedule, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize)
|
||||||
noncecache, _ := lru.New(256)
|
noncecache, _ := lru.New[nonceCacheKey, uint64](256)
|
||||||
|
|
||||||
cfg, err := loadConfig(ctx, ds)
|
cfg, err := loadConfig(ctx, ds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1053,7 +1053,7 @@ func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address,
|
|||||||
|
|
||||||
n, ok := mp.nonceCache.Get(nk)
|
n, ok := mp.nonceCache.Get(nk)
|
||||||
if ok {
|
if ok {
|
||||||
return n.(uint64), nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
act, err := mp.api.GetActorAfter(addr, ts)
|
act, err := mp.api.GetActorAfter(addr, ts)
|
||||||
@ -1473,15 +1473,10 @@ func (mp *MessagePool) MessagesForBlocks(ctx context.Context, blks []*types.Bloc
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
|
||||||
val, ok := mp.blsSigCache.Get(msg.Cid())
|
sig, ok := mp.blsSigCache.Get(msg.Cid())
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
sig, ok := val.(crypto.Signature)
|
|
||||||
if !ok {
|
|
||||||
log.Errorf("value in signature cache was not a signature (got %T)", val)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return &types.SignedMessage{
|
return &types.SignedMessage{
|
||||||
Message: *msg,
|
Message: *msg,
|
||||||
|
@ -207,9 +207,7 @@ type mmCids struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
func (cs *ChainStore) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) {
|
||||||
o, ok := cs.mmCache.Get(mmc)
|
if mmcids, ok := cs.mmCache.Get(mmc); ok {
|
||||||
if ok {
|
|
||||||
mmcids := o.(*mmCids)
|
|
||||||
return mmcids.bls, mmcids.secpk, nil
|
return mmcids.bls, mmcids.secpk, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +227,7 @@ func (cs *ChainStore) ReadMsgMetaCids(ctx context.Context, mmc cid.Cid) ([]cid.C
|
|||||||
return nil, nil, xerrors.Errorf("loading secpk message cids for block: %w", err)
|
return nil, nil, xerrors.Errorf("loading secpk message cids for block: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs.mmCache.Add(mmc, &mmCids{
|
cs.mmCache.Add(mmc, mmCids{
|
||||||
bls: blscids,
|
bls: blscids,
|
||||||
secpk: secpkcids,
|
secpk: secpkcids,
|
||||||
})
|
})
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
dstore "github.com/ipfs/go-datastore"
|
dstore "github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
@ -120,8 +120,8 @@ type ChainStore struct {
|
|||||||
reorgCh chan<- reorg
|
reorgCh chan<- reorg
|
||||||
reorgNotifeeCh chan ReorgNotifee
|
reorgNotifeeCh chan ReorgNotifee
|
||||||
|
|
||||||
mmCache *lru.ARCCache // msg meta cache (mh.Messages -> secp, bls []cid)
|
mmCache *lru.ARCCache[cid.Cid, mmCids]
|
||||||
tsCache *lru.ARCCache
|
tsCache *lru.ARCCache[types.TipSetKey, *types.TipSet]
|
||||||
|
|
||||||
evtTypes [1]journal.EventType
|
evtTypes [1]journal.EventType
|
||||||
journal journal.Journal
|
journal journal.Journal
|
||||||
@ -133,8 +133,8 @@ type ChainStore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore {
|
func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dstore.Batching, weight WeightFunc, j journal.Journal) *ChainStore {
|
||||||
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
|
c, _ := lru.NewARC[cid.Cid, mmCids](DefaultMsgMetaCacheSize)
|
||||||
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
|
tsc, _ := lru.NewARC[types.TipSetKey, *types.TipSet](DefaultTipSetCacheSize)
|
||||||
if j == nil {
|
if j == nil {
|
||||||
j = journal.NilJournal()
|
j = journal.NilJournal()
|
||||||
}
|
}
|
||||||
@ -818,9 +818,8 @@ func (cs *ChainStore) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHead
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ChainStore) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
func (cs *ChainStore) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
|
||||||
v, ok := cs.tsCache.Get(tsk)
|
if ts, ok := cs.tsCache.Get(tsk); ok {
|
||||||
if ok {
|
return ts, nil
|
||||||
return v.(*types.TipSet), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch tipset block headers from blockstore in parallel
|
// Fetch tipset block headers from blockstore in parallel
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
bserv "github.com/ipfs/go-blockservice"
|
bserv "github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
blocks "github.com/ipfs/go-libipfs/blocks"
|
blocks "github.com/ipfs/go-libipfs/blocks"
|
||||||
@ -217,7 +217,7 @@ func fetchCids(
|
|||||||
type BlockValidator struct {
|
type BlockValidator struct {
|
||||||
self peer.ID
|
self peer.ID
|
||||||
|
|
||||||
peers *lru.TwoQueueCache
|
peers *lru.TwoQueueCache[peer.ID, int]
|
||||||
|
|
||||||
killThresh int
|
killThresh int
|
||||||
|
|
||||||
@ -231,7 +231,7 @@ type BlockValidator struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockValidator(self peer.ID, chain *store.ChainStore, cns consensus.Consensus, blacklist func(peer.ID)) *BlockValidator {
|
func NewBlockValidator(self peer.ID, chain *store.ChainStore, cns consensus.Consensus, blacklist func(peer.ID)) *BlockValidator {
|
||||||
p, _ := lru.New2Q(4096)
|
p, _ := lru.New2Q[peer.ID, int](4096)
|
||||||
return &BlockValidator{
|
return &BlockValidator{
|
||||||
self: self,
|
self: self,
|
||||||
peers: p,
|
peers: p,
|
||||||
@ -244,21 +244,19 @@ func NewBlockValidator(self peer.ID, chain *store.ChainStore, cns consensus.Cons
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bv *BlockValidator) flagPeer(p peer.ID) {
|
func (bv *BlockValidator) flagPeer(p peer.ID) {
|
||||||
v, ok := bv.peers.Get(p)
|
val, ok := bv.peers.Get(p)
|
||||||
if !ok {
|
if !ok {
|
||||||
bv.peers.Add(p, int(1))
|
bv.peers.Add(p, 1)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
val := v.(int)
|
|
||||||
|
|
||||||
if val >= bv.killThresh {
|
if val >= bv.killThresh {
|
||||||
log.Warnf("blacklisting peer %s", p)
|
log.Warnf("blacklisting peer %s", p)
|
||||||
bv.blacklist(p)
|
bv.blacklist(p)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bv.peers.Add(p, v.(int)+1)
|
bv.peers.Add(p, val+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
|
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) (res pubsub.ValidationResult) {
|
||||||
@ -293,11 +291,11 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
|||||||
}
|
}
|
||||||
|
|
||||||
type blockReceiptCache struct {
|
type blockReceiptCache struct {
|
||||||
blocks *lru.TwoQueueCache
|
blocks *lru.TwoQueueCache[cid.Cid, int]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlockReceiptCache() *blockReceiptCache {
|
func newBlockReceiptCache() *blockReceiptCache {
|
||||||
c, _ := lru.New2Q(8192)
|
c, _ := lru.New2Q[cid.Cid, int](8192)
|
||||||
|
|
||||||
return &blockReceiptCache{
|
return &blockReceiptCache{
|
||||||
blocks: c,
|
blocks: c,
|
||||||
@ -307,12 +305,12 @@ func newBlockReceiptCache() *blockReceiptCache {
|
|||||||
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
|
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
|
||||||
val, ok := brc.blocks.Get(bcid)
|
val, ok := brc.blocks.Get(bcid)
|
||||||
if !ok {
|
if !ok {
|
||||||
brc.blocks.Add(bcid, int(1))
|
brc.blocks.Add(bcid, 1)
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
brc.blocks.Add(bcid, val.(int)+1)
|
brc.blocks.Add(bcid, val+1)
|
||||||
return val.(int)
|
return val
|
||||||
}
|
}
|
||||||
|
|
||||||
type MessageValidator struct {
|
type MessageValidator struct {
|
||||||
@ -466,13 +464,13 @@ type peerMsgInfo struct {
|
|||||||
type IndexerMessageValidator struct {
|
type IndexerMessageValidator struct {
|
||||||
self peer.ID
|
self peer.ID
|
||||||
|
|
||||||
peerCache *lru.TwoQueueCache
|
peerCache *lru.TwoQueueCache[address.Address, *peerMsgInfo]
|
||||||
chainApi full.ChainModuleAPI
|
chainApi full.ChainModuleAPI
|
||||||
stateApi full.StateModuleAPI
|
stateApi full.StateModuleAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator {
|
func NewIndexerMessageValidator(self peer.ID, chainApi full.ChainModuleAPI, stateApi full.StateModuleAPI) *IndexerMessageValidator {
|
||||||
peerCache, _ := lru.New2Q(8192)
|
peerCache, _ := lru.New2Q[address.Address, *peerMsgInfo](8192)
|
||||||
|
|
||||||
return &IndexerMessageValidator{
|
return &IndexerMessageValidator{
|
||||||
self: self,
|
self: self,
|
||||||
@ -515,15 +513,12 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
|
|||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
|
||||||
minerID := minerAddr.String()
|
|
||||||
msgCid := idxrMsg.Cid
|
msgCid := idxrMsg.Cid
|
||||||
|
|
||||||
var msgInfo *peerMsgInfo
|
var msgInfo *peerMsgInfo
|
||||||
val, ok := v.peerCache.Get(minerID)
|
msgInfo, ok := v.peerCache.Get(minerAddr)
|
||||||
if !ok {
|
if !ok {
|
||||||
msgInfo = &peerMsgInfo{}
|
msgInfo = &peerMsgInfo{}
|
||||||
} else {
|
|
||||||
msgInfo = val.(*peerMsgInfo)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock this peer's message info.
|
// Lock this peer's message info.
|
||||||
@ -544,7 +539,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
|
|||||||
// Check that the miner ID maps to the peer that sent the message.
|
// Check that the miner ID maps to the peer that sent the message.
|
||||||
err = v.authenticateMessage(ctx, minerAddr, originPeer)
|
err = v.authenticateMessage(ctx, minerAddr, originPeer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerID)
|
log.Warnw("cannot authenticate messsage", "err", err, "peer", originPeer, "minerID", minerAddr)
|
||||||
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
|
stats.Record(ctx, metrics.IndexerMessageValidationFailure.M(1))
|
||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
@ -554,7 +549,7 @@ func (v *IndexerMessageValidator) Validate(ctx context.Context, pid peer.ID, msg
|
|||||||
// messages from the same peer are handled concurrently, there is a
|
// messages from the same peer are handled concurrently, there is a
|
||||||
// small chance that one msgInfo could replace the other here when
|
// small chance that one msgInfo could replace the other here when
|
||||||
// the info is first cached. This is OK, so no need to prevent it.
|
// the info is first cached. This is OK, so no need to prevent it.
|
||||||
v.peerCache.Add(minerID, msgInfo)
|
v.peerCache.Add(minerAddr, msgInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/docker/go-units"
|
"github.com/docker/go-units"
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-blockservice"
|
"github.com/ipfs/go-blockservice"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
||||||
@ -50,13 +50,13 @@ type fieldItem struct {
|
|||||||
|
|
||||||
type cacheNodeGetter struct {
|
type cacheNodeGetter struct {
|
||||||
ds format.NodeGetter
|
ds format.NodeGetter
|
||||||
cache *lru.TwoQueueCache
|
cache *lru.TwoQueueCache[cid.Cid, format.Node]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error) {
|
func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error) {
|
||||||
cng := &cacheNodeGetter{ds: d}
|
cng := &cacheNodeGetter{ds: d}
|
||||||
|
|
||||||
cache, err := lru.New2Q(size)
|
cache, err := lru.New2Q[cid.Cid, format.Node](size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -68,7 +68,7 @@ func newCacheNodeGetter(d format.NodeGetter, size int) (*cacheNodeGetter, error)
|
|||||||
|
|
||||||
func (cng *cacheNodeGetter) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
|
func (cng *cacheNodeGetter) Get(ctx context.Context, c cid.Cid) (format.Node, error) {
|
||||||
if n, ok := cng.cache.Get(c); ok {
|
if n, ok := cng.cache.Get(c); ok {
|
||||||
return n.(format.Node), nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
n, err := cng.ds.Get(ctx, c)
|
n, err := cng.ds.Get(ctx, c)
|
||||||
|
3
go.mod
3
go.mod
@ -70,7 +70,7 @@ require (
|
|||||||
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
|
github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026
|
||||||
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
|
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
|
||||||
github.com/hashicorp/go-multierror v1.1.1
|
github.com/hashicorp/go-multierror v1.1.1
|
||||||
github.com/hashicorp/golang-lru v0.5.4
|
github.com/hashicorp/golang-lru/v2 v2.0.2
|
||||||
github.com/hashicorp/raft v1.1.1
|
github.com/hashicorp/raft v1.1.1
|
||||||
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
|
||||||
github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94
|
github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94
|
||||||
@ -230,6 +230,7 @@ require (
|
|||||||
github.com/hashicorp/go-hclog v0.16.2 // indirect
|
github.com/hashicorp/go-hclog v0.16.2 // indirect
|
||||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
||||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||||
|
github.com/hashicorp/golang-lru v0.6.0 // indirect
|
||||||
github.com/huin/goupnp v1.0.3 // indirect
|
github.com/huin/goupnp v1.0.3 // indirect
|
||||||
github.com/iancoleman/orderedmap v0.1.0 // indirect
|
github.com/iancoleman/orderedmap v0.1.0 // indirect
|
||||||
github.com/ipfs/go-bitfield v1.0.0 // indirect
|
github.com/ipfs/go-bitfield v1.0.0 // indirect
|
||||||
|
5
go.sum
5
go.sum
@ -640,8 +640,11 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
|
|||||||
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
|
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
|
||||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
|
||||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||||
|
github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4=
|
||||||
|
github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU=
|
||||||
|
github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||||
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
|
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
|
||||||
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
|
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"go.opencensus.io/trace"
|
"go.opencensus.io/trace"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -61,7 +61,7 @@ func randTimeOffset(width time.Duration) time.Duration {
|
|||||||
// NewMiner instantiates a miner with a concrete WinningPoStProver and a miner
|
// NewMiner instantiates a miner with a concrete WinningPoStProver and a miner
|
||||||
// address (which can be different from the worker's address).
|
// address (which can be different from the worker's address).
|
||||||
func NewMiner(api v1api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, j journal.Journal) *Miner {
|
func NewMiner(api v1api.FullNode, epp gen.WinningPoStProver, addr address.Address, sf *slashfilter.SlashFilter, j journal.Journal) *Miner {
|
||||||
arc, err := lru.NewARC(10000)
|
arc, err := lru.NewARC[abi.ChainEpoch, bool](10000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ type Miner struct {
|
|||||||
// minedBlockHeights is a safeguard that caches the last heights we mined.
|
// minedBlockHeights is a safeguard that caches the last heights we mined.
|
||||||
// It is consulted before publishing a newly mined block, for a sanity check
|
// It is consulted before publishing a newly mined block, for a sanity check
|
||||||
// intended to avoid slashings in case of a bug.
|
// intended to avoid slashings in case of a bug.
|
||||||
minedBlockHeights *lru.ARCCache
|
minedBlockHeights *lru.ARCCache[abi.ChainEpoch, bool]
|
||||||
|
|
||||||
evtTypes [1]journal.EventType
|
evtTypes [1]journal.EventType
|
||||||
journal journal.Journal
|
journal journal.Journal
|
||||||
@ -331,13 +331,12 @@ minerLoop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blkKey := fmt.Sprintf("%d", b.Header.Height)
|
if _, ok := m.minedBlockHeights.Get(b.Header.Height); ok {
|
||||||
if _, ok := m.minedBlockHeights.Get(blkKey); ok {
|
|
||||||
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
|
log.Warnw("Created a block at the same height as another block we've created", "height", b.Header.Height, "miner", b.Header.Miner, "parents", b.Header.Parents)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m.minedBlockHeights.Add(blkKey, true)
|
m.minedBlockHeights.Add(b.Header.Height, true)
|
||||||
|
|
||||||
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
|
if err := m.api.SyncSubmitBlock(ctx, b); err != nil {
|
||||||
log.Errorf("failed to submit newly mined block: %+v", err)
|
log.Errorf("failed to submit newly mined block: %+v", err)
|
||||||
|
@ -3,7 +3,7 @@ package miner
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
ds "github.com/ipfs/go-datastore"
|
ds "github.com/ipfs/go-datastore"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -22,7 +22,7 @@ type MineReq struct {
|
|||||||
|
|
||||||
func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(v1api.FullNode, gen.WinningPoStProver) *Miner {
|
func NewTestMiner(nextCh <-chan MineReq, addr address.Address) func(v1api.FullNode, gen.WinningPoStProver) *Miner {
|
||||||
return func(api v1api.FullNode, epp gen.WinningPoStProver) *Miner {
|
return func(api v1api.FullNode, epp gen.WinningPoStProver) *Miner {
|
||||||
arc, err := lru.NewARC(10000)
|
arc, err := lru.NewARC[abi.ChainEpoch, bool](10000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -61,7 +61,7 @@ type GasAPI struct {
|
|||||||
|
|
||||||
func NewGasPriceCache() *GasPriceCache {
|
func NewGasPriceCache() *GasPriceCache {
|
||||||
// 50 because we usually won't access more than 40
|
// 50 because we usually won't access more than 40
|
||||||
c, err := lru.New2Q(50)
|
c, err := lru.New2Q[types.TipSetKey, []GasMeta](50)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// err only if parameter is bad
|
// err only if parameter is bad
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -73,7 +73,7 @@ func NewGasPriceCache() *GasPriceCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type GasPriceCache struct {
|
type GasPriceCache struct {
|
||||||
c *lru.TwoQueueCache
|
c *lru.TwoQueueCache[types.TipSetKey, []GasMeta]
|
||||||
}
|
}
|
||||||
|
|
||||||
type GasMeta struct {
|
type GasMeta struct {
|
||||||
@ -84,7 +84,7 @@ type GasMeta struct {
|
|||||||
func (g *GasPriceCache) GetTSGasStats(ctx context.Context, cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
|
func (g *GasPriceCache) GetTSGasStats(ctx context.Context, cstore *store.ChainStore, ts *types.TipSet) ([]GasMeta, error) {
|
||||||
i, has := g.c.Get(ts.Key())
|
i, has := g.c.Get(ts.Key())
|
||||||
if has {
|
if has {
|
||||||
return i.([]GasMeta), nil
|
return i, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var prices []GasMeta
|
var prices []GasMeta
|
||||||
|
@ -4,7 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
@ -17,13 +17,13 @@ type cachedLocalStorage struct {
|
|||||||
base LocalStorage
|
base LocalStorage
|
||||||
|
|
||||||
statLk sync.Mutex
|
statLk sync.Mutex
|
||||||
stats *lru.Cache // path -> statEntry
|
stats *lru.Cache[string, statEntry]
|
||||||
pathDUs *lru.Cache // path -> *diskUsageEntry
|
pathDUs *lru.Cache[string, *diskUsageEntry]
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
|
func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
|
||||||
statCache, _ := lru.New(1024)
|
statCache, _ := lru.New[string, statEntry](1024)
|
||||||
duCache, _ := lru.New(1024)
|
duCache, _ := lru.New[string, *diskUsageEntry](1024)
|
||||||
|
|
||||||
return &cachedLocalStorage{
|
return &cachedLocalStorage{
|
||||||
base: ls,
|
base: ls,
|
||||||
@ -60,8 +60,8 @@ func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) {
|
|||||||
c.statLk.Lock()
|
c.statLk.Lock()
|
||||||
defer c.statLk.Unlock()
|
defer c.statLk.Unlock()
|
||||||
|
|
||||||
if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.(statEntry).time) < StatTimeout {
|
if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.time) < StatTimeout {
|
||||||
return v.(statEntry).stat, nil
|
return v.stat, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// if we don't, get the stat
|
// if we don't, get the stat
|
||||||
@ -83,7 +83,7 @@ func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) {
|
|||||||
var entry *diskUsageEntry
|
var entry *diskUsageEntry
|
||||||
|
|
||||||
if v, ok := c.pathDUs.Get(path); ok {
|
if v, ok := c.pathDUs.Get(path); ok {
|
||||||
entry = v.(*diskUsageEntry)
|
entry = v
|
||||||
|
|
||||||
// if we have recent cached entry, use that
|
// if we have recent cached entry, use that
|
||||||
if time.Now().Sub(entry.last.time) < StatTimeout {
|
if time.Now().Sub(entry.last.time) < StatTimeout {
|
||||||
|
@ -5,7 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
cbg "github.com/whyrusleeping/cbor-gen"
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
@ -16,7 +16,7 @@ import (
|
|||||||
type ApiIpldStore struct {
|
type ApiIpldStore struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
api apiIpldStoreApi
|
api apiIpldStoreApi
|
||||||
cache *lru.TwoQueueCache
|
cache *lru.TwoQueueCache[cid.Cid, []byte]
|
||||||
cacheSize int
|
cacheSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,7 +31,7 @@ func NewApiIpldStore(ctx context.Context, api apiIpldStoreApi, cacheSize int) (*
|
|||||||
cacheSize: cacheSize,
|
cacheSize: cacheSize,
|
||||||
}
|
}
|
||||||
|
|
||||||
cache, err := lru.New2Q(store.cacheSize)
|
cache, err := lru.New2Q[cid.Cid, []byte](store.cacheSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -64,7 +64,7 @@ func (ht *ApiIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) err
|
|||||||
|
|
||||||
if a, ok := ht.cache.Get(c); ok {
|
if a, ok := ht.cache.Get(c); ok {
|
||||||
stats.Record(ctx, metrics.IpldStoreCacheHit.M(1))
|
stats.Record(ctx, metrics.IpldStoreCacheHit.M(1))
|
||||||
raw = a.([]byte)
|
raw = a
|
||||||
} else {
|
} else {
|
||||||
bs, err := ht.read(ctx, c)
|
bs, err := ht.read(ctx, c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -8,7 +8,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru/v2"
|
||||||
client "github.com/influxdata/influxdb1-client/v2"
|
client "github.com/influxdata/influxdb1-client/v2"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
@ -41,11 +41,11 @@ type ChainPointCollector struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
api LotusApi
|
api LotusApi
|
||||||
store adt.Store
|
store adt.Store
|
||||||
actorDigestCache *lru.TwoQueueCache
|
actorDigestCache *lru.TwoQueueCache[address.Address, string]
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewChainPointCollector(ctx context.Context, store adt.Store, api LotusApi) (*ChainPointCollector, error) {
|
func NewChainPointCollector(ctx context.Context, store adt.Store, api LotusApi) (*ChainPointCollector, error) {
|
||||||
actorDigestCache, err := lru.New2Q(2 << 15)
|
actorDigestCache, err := lru.New2Q[address.Address, string](2 << 15)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -62,7 +62,7 @@ func NewChainPointCollector(ctx context.Context, store adt.Store, api LotusApi)
|
|||||||
|
|
||||||
func (c *ChainPointCollector) actorDigest(ctx context.Context, addr address.Address, tipset *types.TipSet) (string, error) {
|
func (c *ChainPointCollector) actorDigest(ctx context.Context, addr address.Address, tipset *types.TipSet) (string, error) {
|
||||||
if code, ok := c.actorDigestCache.Get(addr); ok {
|
if code, ok := c.actorDigestCache.Get(addr); ok {
|
||||||
return code.(string), nil
|
return code, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
actor, err := c.api.StateGetActor(ctx, addr, tipset.Key())
|
actor, err := c.api.StateGetActor(ctx, addr, tipset.Key())
|
||||||
|
Loading…
Reference in New Issue
Block a user