diff --git a/chain/store/index.go b/chain/store/index.go
index 620cb2dee..5807a2705 100644
--- a/chain/store/index.go
+++ b/chain/store/index.go
@@ -2,18 +2,21 @@ package store
 
 import (
 	"context"
+	"hash/maphash"
 	"os"
 	"strconv"
-	"sync"
+	"github.com/puzpuzpuz/xsync/v2"
 
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
 
 	"github.com/filecoin-project/lotus/chain/types"
+	"github.com/filecoin-project/lotus/lib/shardedmutex"
 )
 
-var DefaultChainIndexCacheSize = 32 << 15
+// DefaultChainIndexCacheSize no longer sets the maximum size of the cache, just the initial size of the backing map.
+var DefaultChainIndexCacheSize = 1 << 15
 
 func init() {
 	if s := os.Getenv("LOTUS_CHAIN_INDEX_CACHE"); s != "" {
@@ -27,8 +30,9 @@ func init() {
 }
 
 type ChainIndex struct {
-	indexCacheLk sync.Mutex
-	indexCache   map[types.TipSetKey]*lbEntry
+	indexCache *xsync.MapOf[types.TipSetKey, *lbEntry]
+
+	fillCacheLock shardedmutex.ShardedMutexFor[types.TipSetKey]
 
 	loadTipSet loadTipSetFunc
 
@@ -36,11 +40,16 @@
 }
 
 type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)
 
+func maphashTSK(s maphash.Seed, tsk types.TipSetKey) uint64 {
+	return maphash.Bytes(s, tsk.Bytes())
+}
+
 func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
 	return &ChainIndex{
-		indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
-		loadTipSet: lts,
-		skipLength: 20,
+		indexCache:    xsync.NewTypedMapOfPresized[types.TipSetKey, *lbEntry](maphashTSK, DefaultChainIndexCacheSize),
+		fillCacheLock: shardedmutex.NewFor(maphashTSK, 32),
+		loadTipSet:    lts,
+		skipLength:    20,
 	}
 }
@@ -59,17 +68,23 @@ func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet,
 		return nil, xerrors.Errorf("failed to round down: %w", err)
 	}
 
-	ci.indexCacheLk.Lock()
-	defer ci.indexCacheLk.Unlock()
 	cur := rounded.Key()
 	for {
-		lbe, ok := ci.indexCache[cur]
+		lbe, ok := ci.indexCache.Load(cur) // check the cache
 		if !ok {
-			fc, err := ci.fillCache(ctx, cur)
-			if err != nil {
-				return nil, xerrors.Errorf("failed to fill cache: %w", err)
+			lk := ci.fillCacheLock.GetLock(cur)
+			lk.Lock() // if the entry is missing, take the lock for its shard
+			lbe, ok = ci.indexCache.Load(cur) // check if someone else added it while we waited for the lock
+			if !ok {
+				fc, err := ci.fillCache(ctx, cur)
+				if err != nil {
+					lk.Unlock()
+					return nil, xerrors.Errorf("failed to fill cache: %w", err)
+				}
+				lbe = fc
+				ci.indexCache.Store(cur, lbe)
 			}
-			lbe = fc
+			lk.Unlock()
 		}
 
 		if to == lbe.targetHeight {
@@ -137,7 +152,6 @@ func (ci *ChainIndex) fillCache(ctx context.Context, tsk types.TipSetKey) (*lbEn
 		targetHeight: skipTarget.Height(),
 		target:       skipTarget.Key(),
 	}
-	ci.indexCache[tsk] = lbe
 
 	return lbe, nil
 }
diff --git a/go.mod b/go.mod
index d190323f2..f19c6d3ca 100644
--- a/go.mod
+++ b/go.mod
@@ -139,6 +139,7 @@ require (
 	github.com/open-rpc/meta-schema v0.0.0-20201029221707-1b72ef2ea333
 	github.com/polydawn/refmt v0.89.0
 	github.com/prometheus/client_golang v1.14.0
+	github.com/puzpuzpuz/xsync/v2 v2.4.0
 	github.com/raulk/clock v1.1.0
 	github.com/raulk/go-watchdog v1.3.0
 	github.com/stretchr/testify v1.8.2
diff --git a/go.sum b/go.sum
index 80dcf1433..0a423f8a9 100644
--- a/go.sum
+++ b/go.sum
@@ -1484,6 +1484,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
 github.com/prometheus/statsd_exporter v0.21.0 h1:hA05Q5RFeIjgwKIYEdFd59xu5Wwaznf33yKI+pyX6T8=
 github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ=
+github.com/puzpuzpuz/xsync/v2 v2.4.0 h1:5sXAMHrtx1bg9nbRZTOn8T4MkWe5V+o8yKRH02Eznag=
+github.com/puzpuzpuz/xsync/v2 v2.4.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
 github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
 github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
 github.com/quic-go/qtls-go1-19 v0.2.1 h1:aJcKNMkH5ASEJB9FXNeZCyTEIHU1J7MmHyz1Q1TSG1A=
diff --git a/lib/shardedmutex/shardedmutex.go b/lib/shardedmutex/shardedmutex.go
new file mode 100644
index 000000000..47b677f51
--- /dev/null
+++ b/lib/shardedmutex/shardedmutex.go
@@ -0,0 +1,75 @@
+package shardedmutex
+
+import (
+	"hash/maphash"
+	"sync"
+)
+
+const cacheline = 64
+
+// Padding a mutex to a full cache line improves performance, as the cache lines are not contested between adjacent locks:
+// name     old time/op  new time/op  delta
+// Locks-8  74.6ns ± 7%  12.3ns ± 2%  -83.54%  (p=0.000 n=20+18)
+type paddedMutex struct {
+	mt  sync.Mutex
+	pad [cacheline - 8]uint8
+}
+
+type ShardedMutex struct {
+	shards []paddedMutex
+}
+
+// New creates a new ShardedMutex with nShards shards.
+func New(nShards int) ShardedMutex {
+	if nShards < 1 {
+		panic("nShards cannot be less than 1")
+	}
+	return ShardedMutex{
+		shards: make([]paddedMutex, nShards),
+	}
+}
+
+func (sm ShardedMutex) Shards() int {
+	return len(sm.shards)
+}
+
+func (sm ShardedMutex) Lock(shard int) {
+	sm.shards[shard].mt.Lock()
+}
+
+func (sm ShardedMutex) Unlock(shard int) {
+	sm.shards[shard].mt.Unlock()
+}
+
+func (sm ShardedMutex) GetLock(shard int) sync.Locker {
+	return &sm.shards[shard].mt
+}
+
+type ShardedMutexFor[K any] struct {
+	inner ShardedMutex
+
+	hasher func(maphash.Seed, K) uint64
+	seed   maphash.Seed
+}
+
+func NewFor[K any](hasher func(maphash.Seed, K) uint64, nShards int) ShardedMutexFor[K] {
+	return ShardedMutexFor[K]{
+		inner:  New(nShards),
+		hasher: hasher,
+		seed:   maphash.MakeSeed(),
+	}
+}
+
+func (sm ShardedMutexFor[K]) shardFor(key K) int {
+	return int(sm.hasher(sm.seed, key) % uint64(len(sm.inner.shards)))
+}
+
+func (sm ShardedMutexFor[K]) Lock(key K) {
+	sm.inner.Lock(sm.shardFor(key))
+}
+func (sm ShardedMutexFor[K]) Unlock(key K) {
+	sm.inner.Unlock(sm.shardFor(key))
+}
+func (sm ShardedMutexFor[K]) GetLock(key K) sync.Locker {
+	return sm.inner.GetLock(sm.shardFor(key))
+}
diff --git a/lib/shardedmutex/shardedmutex_test.go b/lib/shardedmutex/shardedmutex_test.go
new file mode 100644
index 000000000..a7d5f7d1d
--- /dev/null
+++ b/lib/shardedmutex/shardedmutex_test.go
@@ -0,0 +1,159 @@
+package shardedmutex
+
+import (
+	"fmt"
+	"hash/maphash"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestLockingDifferentShardsDoesNotBlock(t *testing.T) {
+	shards := 16
+	sm := New(shards)
+	done := make(chan struct{})
+	go func() {
+		select {
+		case <-done:
+			return
+		case <-time.After(5 * time.Second):
+			panic("test locked up")
+		}
+	}()
+	for i := 0; i < shards; i++ {
+		sm.Lock(i)
+	}
+
+	close(done)
+}
+
+func TestLockingSameShardsBlocks(t *testing.T) {
+	shards := 16
+	sm := New(shards)
+	wg := sync.WaitGroup{}
+	wg.Add(shards)
+	ch := make(chan int, shards)
+
+	for i := 0; i < shards; i++ {
+		go func(i int) {
+			if i != 15 {
+				sm.Lock(i)
+			}
+			wg.Done()
+			wg.Wait()
+			sm.Lock((15 + i) % shards)
+			ch <- i
+			sm.Unlock(i)
+		}(i)
+	}
+
+	wg.Wait()
+	for i := 0; i < 2*shards; i++ {
+		runtime.Gosched()
+	}
+	for i := 0; i < shards; i++ {
+		if a := <-ch; a != i {
+			t.Errorf("got %d instead of %d", a, i)
+		}
+	}
+}
+
+func TestShardedByString(t *testing.T) {
+	shards := 16
+	sm := NewFor(maphash.String, shards)
+
+	wg1 := sync.WaitGroup{}
+	wg1.Add(shards * 20)
+	wg2 := sync.WaitGroup{}
+	wg2.Add(shards * 20)
+
+	active := atomic.Int32{}
+	max := atomic.Int32{}
+
+	for i := 0; i < shards*20; i++ {
+		go func(i int) {
+			wg1.Done()
+			wg1.Wait()
+			sm.Lock(fmt.Sprintf("goroutine %d", i))
+			activeNew := active.Add(1)
+			for {
+				curMax := max.Load()
+				if curMax >= activeNew {
+					break
+				}
+				if max.CompareAndSwap(curMax, activeNew) {
+					break
+				}
+			}
+			for j := 0; j < 100; j++ {
+				runtime.Gosched()
+			}
+			active.Add(-1)
+			sm.Unlock(fmt.Sprintf("goroutine %d", i))
+			wg2.Done()
+		}(i)
+	}
+
+	wg2.Wait()
+
+	if max.Load() != 16 {
+		t.Fatal("max load not achieved", max.Load())
+	}
+}
+
+func BenchmarkShardedMutex(b *testing.B) {
+	shards := 16
+	sm := New(shards)
+
+	done := atomic.Int32{}
+	go func() {
+		for {
+			sm.Lock(0)
+			sm.Unlock(0)
+			if done.Load() != 0 {
+				return
+			}
+		}
+	}()
+	for i := 0; i < 100; i++ {
+		runtime.Gosched()
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		sm.Lock(1)
+		sm.Unlock(1)
+	}
+	done.Add(1)
+}
+
+func BenchmarkShardedMutexOf(b *testing.B) {
+	shards := 16
+	sm := NewFor(maphash.String, shards)
+
+	str1 := "string1"
+	str2 := "string2"
+
+	done := atomic.Int32{}
+	go func() {
+		for {
+			sm.Lock(str1)
+			sm.Unlock(str1)
+			if done.Load() != 0 {
+				return
+			}
+		}
+	}()
+	for i := 0; i < 100; i++ {
+		runtime.Gosched()
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		sm.Lock(str2)
+		sm.Unlock(str2)
+	}
+	done.Add(1)
+}
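
A note on the locking scheme in the chain/store/index.go hunk above: GetTipsetByHeight now uses per-key double-checked locking. Readers hit the lock-free xsync map first; only on a miss is the shard mutex for that key taken, and the map is checked again under the lock so concurrent callers fill each entry at most once. Below is a minimal, self-contained sketch of the same pattern. The cache type, the string/int key and value types, and the fill callback are hypothetical, invented for illustration; xsync.NewTypedMapOfPresized, MapOf.Load/Store, and shardedmutex.NewFor/GetLock are the same APIs the diff uses.

package example

import (
	"hash/maphash"

	"github.com/puzpuzpuz/xsync/v2"

	"github.com/filecoin-project/lotus/lib/shardedmutex"
)

// cache fills each key at most once, even under concurrent readers.
type cache struct {
	entries *xsync.MapOf[string, int]
	fillLk  shardedmutex.ShardedMutexFor[string]
}

func newCache() *cache {
	return &cache{
		// the map and the mutex shards share the same maphash-style hasher
		entries: xsync.NewTypedMapOfPresized[string, int](maphash.String, 1024),
		fillLk:  shardedmutex.NewFor(maphash.String, 32),
	}
}

func (c *cache) get(key string, fill func(string) (int, error)) (int, error) {
	if v, ok := c.entries.Load(key); ok { // fast path: lock-free read
		return v, nil
	}
	lk := c.fillLk.GetLock(key) // slow path: serialize fills per shard
	lk.Lock()
	defer lk.Unlock()
	if v, ok := c.entries.Load(key); ok { // re-check: another goroutine may have filled it
		return v, nil
	}
	v, err := fill(key)
	if err != nil {
		return 0, err
	}
	c.entries.Store(key, v)
	return v, nil
}

One deliberate difference: the sketch can defer the unlock because it returns right after a fill, whereas GetTipsetByHeight unlocks manually so the shard lock is released before the loop walks on to the next tipset key.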
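
On the paddedMutex layout: the pad arithmetic assumes sync.Mutex occupies 8 bytes, so each shard fills exactly one 64-byte cache line and adjacent shards never share one. A small test along these lines (hypothetical, not part of the diff) would pin that assumption down if sync.Mutex ever changes size:

package shardedmutex

import (
	"testing"
	"unsafe"
)

// TestPaddedMutexFillsCacheline asserts that one shard occupies exactly one
// 64-byte cache line (8 bytes of sync.Mutex plus 56 bytes of padding).
func TestPaddedMutexFillsCacheline(t *testing.T) {
	if s := unsafe.Sizeof(paddedMutex{}); s != cacheline {
		t.Fatalf("paddedMutex is %d bytes, want %d", s, cacheline)
	}
}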