Merge pull request #4406 from filecoin-project/steb/reduce-garbage

reduce garbage in blockstore
Łukasz Magiera 2020-10-15 01:49:01 +02:00 committed by GitHub
commit 7ccb2aabf0
9 changed files with 458 additions and 46 deletions


@@ -37,6 +37,7 @@ import (
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/client"
)
var log = logging.Logger("sub")
@@ -44,6 +45,13 @@ var log = logging.Logger("sub")
var ErrSoftFailure = errors.New("soft validation failure")
var ErrInsufficientPower = errors.New("incoming block's miner does not have minimum power")
var msgCidPrefix = cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: client.DefaultHashFunction,
MhLength: 32,
}
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, bs bserv.BlockService, cmgr connmgr.ConnManager) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
@@ -168,6 +176,9 @@ func fetchCids(
cidIndex := make(map[cid.Cid]int)
for i, c := range cids {
if c.Prefix() != msgCidPrefix {
return fmt.Errorf("invalid msg CID: %s", c)
}
cidIndex[c] = i
}
if len(cids) != len(cidIndex) {
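
For reference, the new guard in isolation. A minimal, self-contained sketch; client.DefaultHashFunction is not shown in this diff, so the multihash constant below is an assumption (blake2b-256, i.e. mh.BLAKE2B_MIN + 31, which matches the 32-byte length above):

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

// Assumed equivalent of client.DefaultHashFunction (blake2b-256).
var msgCidPrefix = cid.Prefix{
	Version:  1,
	Codec:    cid.DagCBOR,
	MhType:   mh.BLAKE2B_MIN + 31,
	MhLength: 32,
}

func checkCids(cids []cid.Cid) error {
	for _, c := range cids {
		// Reject anything that isn't a v1 dag-cbor CID with the expected
		// hash before spending any bandwidth fetching it.
		if c.Prefix() != msgCidPrefix {
			return fmt.Errorf("invalid msg CID: %s", c)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkCids(nil)) // <nil>
}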


@@ -323,25 +323,35 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc)
}
// Collect the CIDs of both types of messages separately: BLS and Secpk.
var bcids, scids []cid.Cid
for _, m := range fblk.BlsMessages {
bcids = append(bcids, m.Cid())
}
for _, m := range fblk.SecpkMessages {
scids = append(scids, m.Cid())
}
// TODO: IMPORTANT(GARBAGE). These message puts and the msgmeta
// computation need to go into the 'temporary' side of the blockstore when
// we implement that
blockstore := syncer.store.Blockstore()
bs := cbor.NewCborStore(blockstore)
// We use a temporary bstore here to avoid writing intermediate pieces
// into the blockstore.
blockstore := bstore.NewTemporary()
cst := cbor.NewCborStore(blockstore)
var bcids, scids []cid.Cid
for _, m := range fblk.BlsMessages {
c, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
bcids = append(bcids, c)
}
for _, m := range fblk.SecpkMessages {
c, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
scids = append(scids, c)
}
// Compute the root CID of the combined message trie.
smroot, err := computeMsgMeta(bs, bcids, scids)
smroot, err := computeMsgMeta(cst, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}
@@ -351,21 +361,8 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}
for _, m := range fblk.BlsMessages {
_, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
}
for _, m := range fblk.SecpkMessages {
_, err := store.PutMessage(blockstore, m)
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
}
return nil
// Finally, flush.
return vm.Copy(context.TODO(), blockstore, syncer.store.Blockstore(), smroot)
}
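
This hunk is the heart of the PR: intermediate blocks go into a throwaway in-memory store, and only the DAG reachable from the verified root is copied into the persistent blockstore. A condensed sketch of the shape of that flow, reusing the identifiers from the diff above (error handling elided):

// Stage everything in a temporary MemStore.
tmp := bstore.NewTemporary()
cst := cbor.NewCborStore(tmp)

// ... PutMessage the BLS and secpk messages against tmp,
// then compute smroot with computeMsgMeta(cst, bcids, scids) ...

// Flush: only blocks reachable from smroot are persisted. Anything else
// written to tmp is garbage that never touches the real blockstore.
return vm.Copy(context.TODO(), tmp, syncer.store.Blockstore(), smroot)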
func (syncer *Syncer) LocalPeer() peer.ID {
@@ -1064,8 +1061,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return err
}
cst := cbor.NewCborStore(syncer.store.Blockstore())
st, err := state.LoadStateTree(cst, stateroot)
st, err := state.LoadStateTree(syncer.store.Store(ctx), stateroot)
if err != nil {
return xerrors.Errorf("failed to load base state tree: %w", err)
}
@@ -1111,21 +1107,28 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return nil
}
store := adt0.WrapStore(ctx, cst)
// Validate message arrays in a temporary blockstore.
tmpbs := bstore.NewTemporary()
tmpstore := adt0.WrapStore(ctx, cbor.NewCborStore(tmpbs))
bmArr := adt0.MakeEmptyArray(store)
bmArr := adt0.MakeEmptyArray(tmpstore)
for i, m := range b.BlsMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
}
c := cbg.CborCid(m.Cid())
if err := bmArr.Set(uint64(i), &c); err != nil {
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
k := cbg.CborCid(c)
if err := bmArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put bls message at index %d: %w", i, err)
}
}
smArr := adt0.MakeEmptyArray(store)
smArr := adt0.MakeEmptyArray(tmpstore)
for i, m := range b.SecpkMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
@@ -1142,8 +1145,12 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
}
c := cbg.CborCid(m.Cid())
if err := smArr.Set(uint64(i), &c); err != nil {
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
k := cbg.CborCid(c)
if err := smArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err)
}
}
@@ -1158,7 +1165,7 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return err
}
mrcid, err := cst.Put(ctx, &types.MsgMeta{
mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
@@ -1170,7 +1177,8 @@ func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock
return fmt.Errorf("messages didnt match message root in header")
}
return nil
// Finally, flush.
return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid)
}
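
checkBlockMessages now follows the same recipe: build both message AMTs in a temporary store, recompute the MsgMeta root, compare it with the header, and only flush on success. A condensed sketch of the tail of that flow (Root() is the specs-actors adt.Array accessor; error handling elided):

bmroot, _ := bmArr.Root() // AMT root of the BLS message CIDs
smroot, _ := smArr.Root() // AMT root of the secpk message CIDs

mrcid, _ := tmpstore.Put(ctx, &types.MsgMeta{
	BlsMessages:   bmroot,
	SecpkMessages: smroot,
})
if b.Header.Messages != mrcid {
	return fmt.Errorf("messages didn't match message root in header")
}
// Persist only the verified DAG; the temporary store is then dropped.
return vm.Copy(ctx, tmpbs, syncer.store.Blockstore(), mrcid)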
func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {


@@ -18,19 +18,18 @@ import (
"context"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
// NewTemporary returns a temporary blockstore.
func NewTemporary() blockstore.Blockstore {
return NewBlockstore(ds.NewMapDatastore())
func NewTemporary() MemStore {
return make(MemStore)
}
// NewTemporarySync returns a thread-safe temporary blockstore.
func NewTemporarySync() blockstore.Blockstore {
return NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
func NewTemporarySync() *SyncStore {
return &SyncStore{bs: make(MemStore)}
}
// WrapIDStore wraps the underlying blockstore in an "identity" blockstore.


@@ -0,0 +1,80 @@
package blockstore
import (
"context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type MemStore map[cid.Cid]blocks.Block
func (m MemStore) DeleteBlock(k cid.Cid) error {
delete(m, k)
return nil
}
func (m MemStore) Has(k cid.Cid) (bool, error) {
_, ok := m[k]
return ok, nil
}
func (m MemStore) Get(k cid.Cid) (blocks.Block, error) {
b, ok := m[k]
if !ok {
return nil, blockstore.ErrNotFound
}
return b, nil
}
// GetSize returns the size, in bytes, of the block mapped to the given CID.
func (m MemStore) GetSize(k cid.Cid) (int, error) {
b, ok := m[k]
if !ok {
return 0, blockstore.ErrNotFound
}
return len(b.RawData()), nil
}
// Put stores the given block in the map.
func (m MemStore) Put(b blocks.Block) error {
// Convert to a basic block for safety, but try to reuse the existing
// block if it's already a basic block.
k := b.Cid()
if _, ok := b.(*blocks.BasicBlock); !ok {
// If we already have the block, abort.
if _, ok := m[k]; ok {
return nil
}
// the error is only for debugging.
b, _ = blocks.NewBlockWithCid(b.RawData(), b.Cid())
}
m[b.Cid()] = b
return nil
}
// PutMany stores a slice of blocks. A map has no real batching to exploit;
// this simply loops over Put.
func (m MemStore) PutMany(bs []blocks.Block) error {
for _, b := range bs {
_ = m.Put(b) // can't fail
}
return nil
}
// AllKeysChan returns a channel from which the CIDs in the blockstore can
// be read. The channel is pre-filled and closed before returning, so the
// context is never actually consulted.
func (m MemStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
ch := make(chan cid.Cid, len(m))
for k := range m {
ch <- k
}
close(ch)
return ch, nil
}
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m MemStore) HashOnRead(enabled bool) {
// no-op
}
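
Since MemStore is a plain map with no I/O underneath, none of its write paths can fail. A small self-contained usage sketch (the main function is hypothetical; the APIs are the ones defined above):

package main

import (
	"fmt"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/lib/blockstore"
)

func main() {
	bs := blockstore.NewTemporary() // a MemStore: map[cid.Cid]blocks.Block

	b := blocks.NewBlock([]byte("hello"))
	_ = bs.Put(b) // cannot fail for a MemStore

	got, err := bs.Get(b.Cid())
	fmt.Println(err, string(got.RawData())) // <nil> hello

	size, _ := bs.GetSize(b.Cid())
	fmt.Println(size) // 5
}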


@@ -0,0 +1,68 @@
package blockstore
import (
"context"
"sync"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
type SyncStore struct {
mu sync.RWMutex
bs MemStore // a concrete MemStore (not the Blockstore interface) to avoid indirection overhead.
}
func (m *SyncStore) DeleteBlock(k cid.Cid) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.DeleteBlock(k)
}
func (m *SyncStore) Has(k cid.Cid) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Has(k)
}
func (m *SyncStore) Get(k cid.Cid) (blocks.Block, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.Get(k)
}
// GetSize returns the size, in bytes, of the block mapped to the given CID.
func (m *SyncStore) GetSize(k cid.Cid) (int, error) {
m.mu.RLock()
defer m.mu.RUnlock()
return m.bs.GetSize(k)
}
// Put stores the given block in the wrapped MemStore.
func (m *SyncStore) Put(b blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.Put(b)
}
// PutMany stores a slice of blocks under a single lock acquisition.
func (m *SyncStore) PutMany(bs []blocks.Block) error {
m.mu.Lock()
defer m.mu.Unlock()
return m.bs.PutMany(bs)
}
// AllKeysChan returns a channel from which
// the CIDs in the Blockstore can be read. It should respect
// the given context, closing the channel if it becomes Done.
func (m *SyncStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
m.mu.RLock()
defer m.mu.RUnlock()
// this blockstore implementation doesn't do any async work.
return m.bs.AllKeysChan(ctx)
}
// HashOnRead specifies if every read block should be
// rehashed to make sure it matches its CID.
func (m *SyncStore) HashOnRead(enabled bool) {
// no-op
}
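
SyncStore is the same map guarded by an RWMutex: writers take the exclusive lock, readers share. A sketch of concurrent use (hypothetical snippet; APIs as above):

package main

import (
	"fmt"
	"sync"

	blocks "github.com/ipfs/go-block-format"

	"github.com/filecoin-project/lotus/lib/blockstore"
)

func main() {
	ss := blockstore.NewTemporarySync()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Put takes the write lock; Has takes the read lock.
			b := blocks.NewBlock([]byte(fmt.Sprintf("block-%d", n)))
			_ = ss.Put(b)
			if ok, _ := ss.Has(b.Cid()); !ok {
				panic("expected block to be present")
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("done")
}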


@@ -19,10 +19,12 @@ type BufferedBS struct {
}
func NewBufferedBstore(base bstore.Blockstore) *BufferedBS {
buf := bstore.NewTemporary()
var buf bstore.Blockstore
if os.Getenv("LOTUS_DISABLE_VM_BUF") == "iknowitsabadidea" {
log.Warn("VM BLOCKSTORE BUFFERING IS DISABLED")
buf = base
} else {
buf = bstore.NewTemporary()
}
return &BufferedBS{

lib/timedbs/timedbs.go (new file, +156)

@@ -0,0 +1,156 @@
package timedbs
import (
"context"
"fmt"
"sync"
"time"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"go.uber.org/multierr"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/blockstore"
)
// TimedCacheBS is a blockstore that keeps blocks for at least the specified
// caching interval before discarding them. Garbage collection must be started
// and stopped by calling Start/Stop.
//
// Under the covers, it's implemented with an active and an inactive blockstore
// that are rotated every cache interval. This means a block is kept for at
// least one interval and at most two before being discarded.
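// For example, with a 30s interval a block written right after a rotation
// stays in the active store for up to 30s, then in the inactive store for
// another 30s; one written just before a rotation survives roughly one
// interval. Eviction therefore happens between 1x and 2x the interval.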
type TimedCacheBS struct {
mu sync.RWMutex
active, inactive blockstore.MemStore
interval time.Duration
closeCh chan struct{}
}
func NewTimedCacheBS(cacheTime time.Duration) *TimedCacheBS {
return &TimedCacheBS{
active: blockstore.NewTemporary(),
inactive: blockstore.NewTemporary(),
interval: cacheTime,
}
}
func (t *TimedCacheBS) Start(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh != nil {
return fmt.Errorf("already started")
}
t.closeCh = make(chan struct{})
go func() {
ticker := build.Clock.Ticker(t.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.rotate()
case <-t.closeCh:
return
}
}
}()
return nil
}
func (t *TimedCacheBS) Stop(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh == nil {
return fmt.Errorf("not started started")
}
select {
case <-t.closeCh:
// already closed
default:
close(t.closeCh)
}
return nil
}
func (t *TimedCacheBS) rotate() {
newBs := blockstore.NewTemporary()
t.mu.Lock()
t.inactive, t.active = t.active, newBs
t.mu.Unlock()
}
func (t *TimedCacheBS) Put(b blocks.Block) error {
// Don't check the inactive set here. We want to keep this block for at
// least one interval.
t.mu.Lock()
defer t.mu.Unlock()
return t.active.Put(b)
}
func (t *TimedCacheBS) PutMany(bs []blocks.Block) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.active.PutMany(bs)
}
func (t *TimedCacheBS) Get(k cid.Cid) (blocks.Block, error) {
t.mu.RLock()
defer t.mu.RUnlock()
b, err := t.active.Get(k)
if err == blockstore.ErrNotFound {
b, err = t.inactive.Get(k)
}
return b, err
}
func (t *TimedCacheBS) GetSize(k cid.Cid) (int, error) {
t.mu.RLock()
defer t.mu.RUnlock()
size, err := t.active.GetSize(k)
if err == blockstore.ErrNotFound {
size, err = t.inactive.GetSize(k)
}
return size, err
}
func (t *TimedCacheBS) Has(k cid.Cid) (bool, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if has, err := t.active.Has(k); err != nil {
return false, err
} else if has {
return true, nil
}
return t.inactive.Has(k)
}
func (t *TimedCacheBS) HashOnRead(_ bool) {
// no-op
}
func (t *TimedCacheBS) DeleteBlock(k cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteBlock(k), t.inactive.DeleteBlock(k))
}
func (t *TimedCacheBS) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
t.mu.RLock()
defer t.mu.RUnlock()
ch := make(chan cid.Cid, len(t.active)+len(t.inactive))
for c := range t.active {
ch <- c
}
for c := range t.inactive {
if _, ok := t.active[c]; ok {
continue
}
ch <- c
}
close(ch)
return ch, nil
}


@@ -0,0 +1,78 @@
package timedbs_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/lib/timedbs"
)
func TestTimedBSSimple(t *testing.T) {
tc := timedbs.NewTimedCacheBS(3 * time.Millisecond)
_ = tc.Start(context.Background())
defer func() {
_ = tc.Stop(context.Background())
}()
b1 := blocks.NewBlock([]byte("foo"))
require.NoError(t, tc.Put(b1))
b2 := blocks.NewBlock([]byte("bar"))
require.NoError(t, tc.Put(b2))
b3 := blocks.NewBlock([]byte("baz"))
b1out, err := tc.Get(b1.Cid())
require.NoError(t, err)
require.Equal(t, b1.RawData(), b1out.RawData())
has, err := tc.Has(b1.Cid())
require.NoError(t, err)
require.True(t, has)
time.Sleep(4 * time.Millisecond)
// We should still have everything.
has, err = tc.Has(b1.Cid())
require.NoError(t, err)
require.True(t, has)
has, err = tc.Has(b2.Cid())
require.NoError(t, err)
require.True(t, has)
// extend b2, add b3.
require.NoError(t, tc.Put(b2))
require.NoError(t, tc.Put(b3))
// all keys once.
allKeys, err := tc.AllKeysChan(context.Background())
var ks []cid.Cid
for k := range allKeys {
ks = append(ks, k)
}
require.NoError(t, err)
require.ElementsMatch(t, ks, []cid.Cid{b1.Cid(), b2.Cid(), b3.Cid()})
time.Sleep(4 * time.Millisecond)
// should still have b2, and b3, but not b1
has, err = tc.Has(b1.Cid())
require.NoError(t, err)
require.False(t, has)
has, err = tc.Has(b2.Cid())
require.NoError(t, err)
require.True(t, has)
has, err = tc.Has(b3.Cid())
require.NoError(t, err)
require.True(t, has)
}


@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"os"
"time"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
@@ -30,6 +31,8 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/timedbs"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
@@ -41,8 +44,15 @@ func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt r
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
// Write all incoming bitswap blocks into a temporary blockstore for two
// block times. If they validate, they'll be persisted later.
cache := timedbs.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second)
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
bitswapBs := bufbstore.NewTieredBstore(bs, cache)
// Use just exch.Close(); closing the context is not needed
exch := bitswap.New(mctx, bitswapNetwork, bs, bitswapOptions...)
exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()