retry on MDB_READERS_FULL errors

This commit is contained in:
vyzo 2021-02-01 14:27:20 +02:00
parent 2080e467ba
commit c89ab1a990
2 changed files with 82 additions and 47 deletions

View File

@ -0,0 +1,20 @@
package splitstore
import (
"math/rand"
"time"
"github.com/ledgerwatch/lmdb-go/lmdb"
)
func withMaxReadersRetry(f func() error) error {
retry:
err := f()
if lmdb.IsErrno(err, lmdb.ReadersFull) {
dt := time.Microsecond + time.Duration(rand.Intn(int(time.Millisecond)))
time.Sleep(dt)
goto retry
}
return err
}

View File

@ -75,77 +75,92 @@ func NewTrackingStore(path string) (TrackingStore, error) {
func (s *trackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Put(s.db, cid.Hash(), val, 0)
})
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Put(s.db, cid.Hash(), val, 0)
})
})
}
func (s *trackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return s.env.Update(func(txn *lmdb.Txn) error {
for _, cid := range cids {
err := txn.Put(s.db, cid.Hash(), val, 0)
if err != nil {
return err
}
}
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
for _, cid := range cids {
err := txn.Put(s.db, cid.Hash(), val, 0)
if err != nil {
return err
}
}
return nil
})
return nil
})
})
}
func (s *trackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
err = s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
err = withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
val, err := txn.Get(s.db, cid.Hash())
if err != nil {
return err
}
val, err := txn.Get(s.db, cid.Hash())
if err != nil {
return err
}
epoch = bytesToEpoch(val)
return nil
})
epoch = bytesToEpoch(val)
return nil
})
})
return
}
func (s *trackingStore) Delete(cid cid.Cid) error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Del(s.db, cid.Hash(), nil)
})
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Del(s.db, cid.Hash(), nil)
})
})
}
func (s *trackingStore) Keys(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid)
go func() {
err := s.env.View(func(txn *lmdb.Txn) error {
defer close(ch)
err := withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
defer close(ch)
txn.RawRead = true
cur, err := txn.OpenCursor(s.db)
if err != nil {
return err
}
defer cur.Close()
for {
k, _, err := cur.Get(nil, nil, lmdb.Next)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
txn.RawRead = true
cur, err := txn.OpenCursor(s.db)
if err != nil {
return err
}
defer cur.Close()
return err
}
for {
k, _, err := cur.Get(nil, nil, lmdb.Next)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
select {
case ch <- cid.NewCidV1(cid.Raw, k):
case <-ctx.Done():
return nil
}
}
})
return err
}
select {
case ch <- cid.NewCidV1(cid.Raw, k):
case <-ctx.Done():
return nil
}
}
})
})
if err != nil {
log.Errorf("error iterating over tracking store keys: %s", err)