implement chain walking
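
Stop depending on ChainAccessor.WalkSnapshot: the splitstore now walks the chain itself. walk visits block headers breadth-first from a tipset down to a boundary epoch, and walkLinks follows message and state-root links, visiting each object once. Warmup now starts from the tipset at the base epoch, counts cold and missing objects, and returns errors to the caller. The full-compaction test variants are dropped accordingly.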

vyzo 2021-03-13 12:00:28 +02:00
parent 04f2e102a1
commit e3cbeec6ee
2 changed files with 158 additions and 103 deletions

View File

@@ -1,6 +1,7 @@
package splitstore
import (
+"bytes"
"context"
"encoding/binary"
"errors"
@@ -15,6 +16,7 @@ import (
cid "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
+cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/go-state-types/abi"
@@ -48,7 +50,7 @@ var (
CompactionCold = build.Finality
// CompactionBoundary is the number of epochs from the current epoch at which
-// we will walk the chain for live objects
+// we will walk the chain for live objects.
CompactionBoundary = 2 * build.Finality
)
@@ -73,7 +75,6 @@ const (
batchSize = 16384
defaultColdPurgeSize = 7_000_000
-defaultDeadPurgeSize = 1_000_000
)
type Config struct {
@@ -94,7 +95,6 @@ type ChainAccessor interface {
GetTipsetByHeight(context.Context, abi.ChainEpoch, *types.TipSet, bool) (*types.TipSet, error)
GetHeaviestTipSet() *types.TipSet
SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
-WalkSnapshot(context.Context, *types.TipSet, abi.ChainEpoch, bool, bool, func(cid.Cid) error) error
}
type SplitStore struct {
@@ -104,6 +104,7 @@ type SplitStore struct {
baseEpoch abi.ChainEpoch
warmupEpoch abi.ChainEpoch
+warm bool
coldPurgeSize int
@@ -340,6 +341,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
switch err {
case nil:
s.warmupEpoch = bytesToEpoch(bs)
+s.warm = true
case dstore.ErrNotFound:
default:
@@ -396,7 +398,7 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil
}
-if s.warmupEpoch == 0 {
+if !s.warm {
// splitstore needs to warm up
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
@@ -404,7 +406,17 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
log.Info("warming up hotstore")
start := time.Now()
-s.warmup(curTs)
+baseTs, err := s.chain.GetTipsetByHeight(context.Background(), s.baseEpoch, curTs, true)
+if err != nil {
+log.Errorf("error warming up hotstore: error getting tipset at base epoch: %s", err)
+return
+}
+err = s.warmup(baseTs)
+if err != nil {
+log.Errorf("error warming up hotstore: %s", err)
+return
+}
log.Infow("warm up done", "took", time.Since(start))
}()
@@ -432,14 +444,16 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
return nil
}
-func (s *SplitStore) warmup(curTs *types.TipSet) {
+func (s *SplitStore) warmup(curTs *types.TipSet) error {
epoch := curTs.Height()
batchHot := make([]blocks.Block, 0, batchSize)
batchSnoop := make([]cid.Cid, 0, batchSize)
count := int64(0)
-err := s.chain.WalkSnapshot(context.Background(), curTs, 1, true, true,
+xcount := int64(0)
+missing := int64(0)
+err := s.walk(curTs, epoch,
func(cid cid.Cid) error {
count++
@@ -454,9 +468,15 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
blk, err := s.cold.Get(cid)
if err != nil {
+if err == bstore.ErrNotFound {
+missing++
+return nil
+}
return err
}
+xcount++
batchHot = append(batchHot, blk)
batchSnoop = append(batchSnoop, cid)
@@ -478,39 +498,41 @@ func (s *SplitStore) warmup(curTs *types.TipSet) {
})
if err != nil {
log.Errorf("error warming up splitstore: %s", err)
-return
+return err
}
if len(batchHot) > 0 {
err = s.tracker.PutBatch(batchSnoop, epoch)
if err != nil {
log.Errorf("error warming up splitstore: %s", err)
-return
+return err
}
err = s.hot.PutMany(batchHot)
if err != nil {
log.Errorf("error warming up splitstore: %s", err)
-return
+return err
}
}
log.Infow("warmup stats", "visited", count, "cold", xcount, "missing", missing)
if count > s.markSetSize {
s.markSetSize = count + count>>2 // overestimate a bit
}
// save the warmup epoch
+s.warm = true
s.warmupEpoch = epoch
err = s.ds.Put(warmupEpochKey, epochToBytes(epoch))
if err != nil {
log.Errorf("error saving warmup epoch: %s", err)
log.Warnf("error saving warmup epoch: %s", err)
}
err = s.ds.Put(markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Errorf("error saving mark set size: %s", err)
log.Warnf("error saving mark set size: %s", err)
}
+return nil
}
// Compaction/GC Algorithm
@@ -540,8 +562,10 @@ func (s *SplitStore) compact(curTs *types.TipSet) {
}
func (s *SplitStore) estimateMarkSetSize(curTs *types.TipSet) error {
+epoch := curTs.Height()
var count int64
-err := s.chain.WalkSnapshot(context.Background(), curTs, 1, true, true,
+err := s.walk(curTs, epoch,
func(cid cid.Cid) error {
count++
return nil
@@ -578,7 +602,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
var count int64
-err = s.chain.WalkSnapshot(context.Background(), boundaryTs, 1, true, true,
+err = s.walk(boundaryTs, boundaryEpoch,
func(cid cid.Cid) error {
count++
return coldSet.Mark(cid)
@@ -592,7 +616,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
s.markSetSize = count + count>>2 // overestimate a bit
}
-log.Infow("marking done", "took", time.Since(startMark))
+log.Infow("marking done", "took", time.Since(startMark), "marked", count)
// 2. move cold unreachable objects to the coldstore
log.Info("collecting cold objects")
@@ -700,6 +724,93 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
}
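+// walk traverses the chain from ts toward the boundary epoch in breadth-first
+// order, calling f on each block header at or above the boundary and on the
+// objects reachable from its messages and parent state root.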
+func (s *SplitStore) walk(ts *types.TipSet, boundary abi.ChainEpoch, f func(cid.Cid) error) error {
+walked := cid.NewSet()
+toWalk := ts.Cids()
+walkBlock := func(c cid.Cid) error {
+if !walked.Visit(c) {
+return nil
+}
+blk, err := s.Get(c)
+if err != nil {
+return xerrors.Errorf("error retrieving block (cid: %s): %w", c, err)
+}
+var hdr types.BlockHeader
+if err := hdr.UnmarshalCBOR(bytes.NewBuffer(blk.RawData())); err != nil {
+return xerrors.Errorf("error unmarshaling block header (cid: %s): %w", c, err)
+}
+// don't walk under the boundary
+if hdr.Height < boundary {
+return nil
+}
+if err := f(c); err != nil {
+return err
+}
+if err := s.walkLinks(hdr.Messages, walked, f); err != nil {
+return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
+}
+if err := s.walkLinks(hdr.ParentStateRoot, walked, f); err != nil {
+return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
+}
+toWalk = append(toWalk, hdr.Parents...)
+return nil
+}
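+// process the queue in rounds: walkBlock visits each block and queues its
+// parents in toWalk, so every round steps the walk back toward the boundary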
+for len(toWalk) > 0 {
+walking := toWalk
+toWalk = nil
+for _, c := range walking {
+if err := walkBlock(c); err != nil {
+return xerrors.Errorf("error walking block (cid: %s): %w", c, err)
+}
+}
+}
+return nil
+}
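+// walkLinks applies f to every DagCBOR object reachable from c, using the
+// walked set to visit each object at most once.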
+func (s *SplitStore) walkLinks(c cid.Cid, walked *cid.Set, f func(cid.Cid) error) error {
+if !walked.Visit(c) {
+return nil
+}
+if c.Prefix().Codec != cid.DagCBOR {
+return nil
+}
+if err := f(c); err != nil {
+return err
+}
+blk, err := s.Get(c)
+if err != nil {
+return xerrors.Errorf("error retrieving linked block (cid: %s): %w", c, err)
+}
+var rerr error
+err = cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
+if rerr != nil {
+return
+}
+rerr = s.walkLinks(c, walked, f)
+})
+if err != nil {
+return xerrors.Errorf("error scanning links (cid: %s): %w", c, err)
+}
+return rerr
+}
func (s *SplitStore) moveColdBlocks(cold []cid.Cid) error {
batch := make([]blocks.Block, 0, batchSize)
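
For orientation, here is a minimal, self-contained sketch of the visit-once link-scanning pattern that the new walkLinks implements. The package name, the getter interface, and the scanDAG name are illustrative assumptions for the sketch, not lotus code; only the cid.Set plus cbg.ScanForLinks pattern comes from the diff above.

package sketch

import (
	"bytes"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	cbg "github.com/whyrusleeping/cbor-gen"
)

// getter is a minimal blockstore-like interface assumed for this sketch.
type getter interface {
	Get(cid.Cid) (blocks.Block, error)
}

// scanDAG applies f to every DagCBOR object reachable from root, visiting
// each cid at most once; it mirrors the walkLinks pattern in the diff.
func scanDAG(bs getter, root cid.Cid, visited *cid.Set, f func(cid.Cid) error) error {
	if !visited.Visit(root) {
		return nil // already seen on another path
	}
	if root.Prefix().Codec != cid.DagCBOR {
		return nil // raw and other codecs carry no links we can scan
	}
	if err := f(root); err != nil {
		return err
	}
	blk, err := bs.Get(root)
	if err != nil {
		return err
	}
	// ScanForLinks walks the raw CBOR and invokes the callback for every
	// CID tag it encounters, without fully decoding the object.
	var rerr error
	err = cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
		if rerr == nil {
			rerr = scanDAG(bs, c, visited, f)
		}
	})
	if err != nil {
		return err
	}
	return rerr
}

Checking the visited set before any decoding or I/O keeps the traversal linear in the number of unique reachable objects, even when many block headers share the same state subtrees.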

View File

@@ -13,7 +13,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
cid "github.com/ipfs/go-cid"
+blocks "github.com/ipfs/go-block-format"
datastore "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2"
@@ -28,16 +28,28 @@ func init() {
func testSplitStore(t *testing.T, cfg *Config) {
chain := &mockChain{t: t}
-// genesis
-genBlock := mock.MkBlock(nil, 0, 0)
-genTs := mock.TipSet(genBlock)
-chain.push(genTs)
// the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := blockstore.NewMemorySync()
cold := blockstore.NewMemorySync()
+// this is necessary to avoid the garbage the mock puts in the blocks
+garbage := blocks.NewBlock([]byte{1, 2, 3})
+err := cold.Put(garbage)
+if err != nil {
+t.Fatal(err)
+}
+// genesis
+genBlock := mock.MkBlock(nil, 0, 0)
+genBlock.Messages = garbage.Cid()
+genBlock.ParentMessageReceipts = garbage.Cid()
+genBlock.ParentStateRoot = garbage.Cid()
+genTs := mock.TipSet(genBlock)
+chain.push(genTs)
// put the genesis block to cold store
blk, err := genBlock.ToStorageBlock()
if err != nil {
@@ -64,6 +76,11 @@ func testSplitStore(t *testing.T, cfg *Config) {
// make some tipsets, but not enough to cause compaction
mkBlock := func(curTs *types.TipSet, i int) *types.TipSet {
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
+blk.Messages = garbage.Cid()
+blk.ParentMessageReceipts = garbage.Cid()
+blk.ParentStateRoot = garbage.Cid()
sblk, err := blk.ToStorageBlock()
if err != nil {
t.Fatal(err)
@@ -78,18 +95,6 @@ func testSplitStore(t *testing.T, cfg *Config) {
return ts
}
-mkGarbageBlock := func(curTs *types.TipSet, i int) {
-blk := mock.MkBlock(curTs, uint64(i), uint64(i))
-sblk, err := blk.ToStorageBlock()
-if err != nil {
-t.Fatal(err)
-}
-err = ss.Put(sblk)
-if err != nil {
-t.Fatal(err)
-}
-}
waitForCompaction := func() {
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
@@ -102,8 +107,6 @@ func testSplitStore(t *testing.T, cfg *Config) {
waitForCompaction()
}
-mkGarbageBlock(genTs, 1)
// count objects in the cold and hot stores
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -123,8 +126,8 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt := countBlocks(cold)
hotCnt := countBlocks(hot)
-if coldCnt != 1 {
-t.Errorf("expected %d blocks, but got %d", 1, coldCnt)
+if coldCnt != 2 {
+t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}
if hotCnt != 5 {
@@ -140,34 +143,12 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot)
-if !cfg.EnableFullCompaction {
-if coldCnt != 5 {
-t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
-}
-if hotCnt != 5 {
-t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt)
-}
-}
+if coldCnt != 7 {
+t.Errorf("expected %d cold blocks, but got %d", 7, coldCnt)
+}
-if cfg.EnableFullCompaction && !cfg.EnableGC {
-if coldCnt != 3 {
-t.Errorf("expected %d cold blocks, but got %d", 3, coldCnt)
-}
-if hotCnt != 7 {
-t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
-}
-}
-if cfg.EnableFullCompaction && cfg.EnableGC {
-if coldCnt != 2 {
-t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt)
-}
-if hotCnt != 7 {
-t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
-}
-}
+if hotCnt != 4 {
+t.Errorf("expected %d hot blocks, but got %d", 4, hotCnt)
+}
// Make sure we can revert without panicking.
@@ -178,21 +159,6 @@ func TestSplitStoreSimpleCompaction(t *testing.T) {
testSplitStore(t, &Config{TrackingStoreType: "mem"})
}
-func TestSplitStoreFullCompactionWithoutGC(t *testing.T) {
-testSplitStore(t, &Config{
-TrackingStoreType: "mem",
-EnableFullCompaction: true,
-})
-}
-func TestSplitStoreFullCompactionWithGC(t *testing.T) {
-testSplitStore(t, &Config{
-TrackingStoreType: "mem",
-EnableFullCompaction: true,
-EnableGC: true,
-})
-}
type mockChain struct {
t testing.TB
@@ -242,7 +208,7 @@ func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _
return nil, fmt.Errorf("bad epoch %d", epoch)
}
-return c.tipsets[iEpoch-1], nil
+return c.tipsets[iEpoch], nil
}
func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
@@ -255,25 +221,3 @@ func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
c.listener = change
}
-func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error {
-c.Lock()
-defer c.Unlock()
-start := int(ts.Height()) - 1
-end := start - int(epochs)
-if end < 0 {
-end = -1
-}
-for i := start; i > end; i-- {
-ts := c.tipsets[i]
-for _, cid := range ts.Cids() {
-err := f(cid)
-if err != nil {
-return err
-}
-}
-}
-return nil
-}