Merge pull request #1439 from filecoin-project/fix/chain-export

Rewrite chain export walker function to actually do the right thing
This commit is contained in:
Whyrusleeping 2020-03-20 19:55:11 -07:00 committed by GitHub
commit 21255eddee
6 changed files with 120 additions and 32 deletions

View File

@ -30,16 +30,14 @@ import (
lru "github.com/hashicorp/golang-lru"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
car "github.com/ipfs/go-car"
carutil "github.com/ipfs/go-car/util"
"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
bstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
dag "github.com/ipfs/go-merkledag"
cbg "github.com/whyrusleeping/cbor-gen"
pubsub "github.com/whyrusleeping/pubsub"
"golang.org/x/xerrors"
@ -734,7 +732,7 @@ func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
cst := cbor.NewCborStore(cs.bs)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta: %w", err)
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
}
blscids, err := cs.readAMTCids(msgmeta.BlsMessages)
@ -972,14 +970,18 @@ func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, t
}
func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
if root.Prefix().Codec != cid.DagCBOR {
return in, nil
}
data, err := bs.Get(root)
if err != nil {
return nil, err
return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err)
}
top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData()))
if err != nil {
return nil, err
return nil, xerrors.Errorf("scanning for links failed: %w", err)
}
in = append(in, top...)
@ -998,41 +1000,87 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer)
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
bsrv := blockservice.New(cs.bs, nil)
dserv := dag.NewDAGService(bsrv)
return car.WriteCarWithWalker(ctx, dserv, ts.Cids(), w, func(nd format.Node) ([]*format.Link, error) {
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
return nil, err
seen := cid.NewSet()
h := &car.CarHeader{
Roots: ts.Cids(),
Version: 1,
}
if err := car.WriteHeader(h, w); err != nil {
return xerrors.Errorf("failed to write car header: %s", err)
}
blocksToWalk := ts.Cids()
walkChain := func(blk cid.Cid) error {
if !seen.Visit(blk) {
return nil
}
var out []*format.Link
for _, p := range b.Parents {
out = append(out, &format.Link{Cid: p})
}
cids, err := recurseLinks(cs.bs, b.Messages, nil)
data, err := cs.bs.Get(blk)
if err != nil {
return nil, err
return xerrors.Errorf("getting block: %w", err)
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write block to car output: %w", err)
}
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
}
for _, p := range b.Parents {
blocksToWalk = append(blocksToWalk, p)
}
cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
}
out := cids
if b.Height == 0 {
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, nil)
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil {
return nil, err
return xerrors.Errorf("recursing genesis state failed: %w", err)
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
out = append(out, cids...)
}
for _, c := range out {
if seen.Visit(c) {
if c.Prefix().Codec != cid.DagCBOR {
continue
}
data, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
}
if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write out car object: %w", err)
}
}
}
return out, nil
})
return nil
}
for len(blocksToWalk) > 0 {
next := blocksToWalk[0]
blocksToWalk = blocksToWalk[1:]
if err := walkChain(next); err != nil {
return xerrors.Errorf("walk chain failed: %w", err)
}
}
return nil
}
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {

View File

@ -1,6 +1,7 @@
package store_test
import (
"bytes"
"context"
"testing"
@ -13,6 +14,8 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/crypto"
datastore "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
@ -70,3 +73,37 @@ func BenchmarkGetRandomness(b *testing.B) {
}
}
}
func TestChainExportImport(t *testing.T) {
cg, err := gen.NewGenerator()
if err != nil {
t.Fatal(err)
}
var last *types.TipSet
for i := 0; i < 100; i++ {
ts, err := cg.NextTipSet()
if err != nil {
t.Fatal(err)
}
last = ts.TipSet.TipSet()
}
buf := new(bytes.Buffer)
if err := cg.ChainStore().Export(context.TODO(), last, buf); err != nil {
t.Fatal(err)
}
nbs := blockstore.NewBlockstore(datastore.NewMapDatastore())
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil)
root, err := cs.Import(buf)
if err != nil {
t.Fatal(err)
}
if !root.Equals(last) {
t.Fatal("imported chain differed from exported chain")
}
}

2
go.mod
View File

@ -89,7 +89,7 @@ require (
github.com/multiformats/go-multiaddr-net v0.1.2
github.com/multiformats/go-multihash v0.0.13
github.com/opentracing/opentracing-go v1.1.0
github.com/prometheus/common v0.4.0
github.com/prometheus/common v0.4.0 // indirect
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.0.0-20200222160900-51052a1e8191

View File

@ -20,11 +20,11 @@ import (
offline "github.com/ipfs/go-ipfs-exchange-offline"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver"
mh "github.com/multiformats/go-multihash"
"github.com/prometheus/common/log"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -35,6 +35,8 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("fullnode")
type ChainAPI struct {
fx.In
@ -424,7 +426,7 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan
for {
buf := make([]byte, 4096)
n, err := r.Read(buf)
if err != nil {
if err != nil && err != io.EOF {
log.Errorf("chain export pipe read failed: %s", err)
return
}
@ -433,6 +435,9 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan
case <-ctx.Done():
log.Warnf("export writer failed: %s", ctx.Err())
}
if err == io.EOF {
return
}
}
}()

View File

@ -10,7 +10,6 @@ import (
"github.com/ipfs/go-hamt-ipld"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prometheus/common/log"
cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/fx"
"golang.org/x/xerrors"

View File

@ -8,7 +8,6 @@ import (
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid"
"github.com/prometheus/common/log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"