From 921f11db34e083745ad75c01f40ee83074b094c4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 20 Mar 2020 17:18:57 -0700 Subject: [PATCH 1/2] Rewrite chain export walker function to actually do the right thing --- chain/store/store.go | 102 +++++++++++++++++++++++++++++----------- node/impl/full/chain.go | 9 +++- node/impl/full/state.go | 1 - node/impl/full/sync.go | 1 - 4 files changed, 82 insertions(+), 31 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index f9935a3ec..ae3564901 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -30,16 +30,14 @@ import ( lru "github.com/hashicorp/golang-lru" block "github.com/ipfs/go-block-format" - "github.com/ipfs/go-blockservice" car "github.com/ipfs/go-car" + carutil "github.com/ipfs/go-car/util" "github.com/ipfs/go-cid" dstore "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore" cbor "github.com/ipfs/go-ipld-cbor" - format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" - dag "github.com/ipfs/go-merkledag" cbg "github.com/whyrusleeping/cbor-gen" pubsub "github.com/whyrusleeping/pubsub" "golang.org/x/xerrors" @@ -734,7 +732,7 @@ func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error) cst := cbor.NewCborStore(cs.bs) var msgmeta types.MsgMeta if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil { - return nil, nil, xerrors.Errorf("failed to load msgmeta: %w", err) + return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err) } blscids, err := cs.readAMTCids(msgmeta.BlsMessages) @@ -972,14 +970,18 @@ func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, t } func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) { + if root.Prefix().Codec != cid.DagCBOR { + return in, nil + } + data, err := bs.Get(root) if err != nil { - return nil, err + return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err) } top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData())) if err != nil { - return nil, err + return nil, xerrors.Errorf("scanning for links failed: %w", err) } in = append(in, top...) @@ -998,41 +1000,87 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer) if ts == nil { ts = cs.GetHeaviestTipSet() } - bsrv := blockservice.New(cs.bs, nil) - dserv := dag.NewDAGService(bsrv) - return car.WriteCarWithWalker(ctx, dserv, ts.Cids(), w, func(nd format.Node) ([]*format.Link, error) { - var b types.BlockHeader - if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil { - return nil, err + + seen := cid.NewSet() + + h := &car.CarHeader{ + Roots: ts.Cids(), + Version: 1, + } + + if err := car.WriteHeader(h, w); err != nil { + return xerrors.Errorf("failed to write car header: %s", err) + } + + blocksToWalk := ts.Cids() + + walkChain := func(blk cid.Cid) error { + if !seen.Visit(blk) { + return nil } - var out []*format.Link - for _, p := range b.Parents { - out = append(out, &format.Link{Cid: p}) - } - - cids, err := recurseLinks(cs.bs, b.Messages, nil) + data, err := cs.bs.Get(blk) if err != nil { - return nil, err + return xerrors.Errorf("getting block: %w", err) } - for _, c := range cids { - out = append(out, &format.Link{Cid: c}) + if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil { + return xerrors.Errorf("failed to write block to car output: %w", err) } + var b types.BlockHeader + if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil { + return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err) + } + + for _, p := range b.Parents { + blocksToWalk = append(blocksToWalk, p) + } + + cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages}) + if err != nil { + return xerrors.Errorf("recursing messages failed: %w", err) + } + + out := cids + if b.Height == 0 { - cids, err := recurseLinks(cs.bs, b.ParentStateRoot, nil) + cids, err := recurseLinks(cs.bs, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot}) if err != nil { - return nil, err + return xerrors.Errorf("recursing genesis state failed: %w", err) } - for _, c := range cids { - out = append(out, &format.Link{Cid: c}) + out = append(out, cids...) + } + + for _, c := range out { + if seen.Visit(c) { + if c.Prefix().Codec != cid.DagCBOR { + continue + } + data, err := cs.bs.Get(c) + if err != nil { + return xerrors.Errorf("writing object to car (get %s): %w", c, err) + } + + if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil { + return xerrors.Errorf("failed to write out car object: %w", err) + } } } - return out, nil - }) + return nil + } + + for len(blocksToWalk) > 0 { + next := blocksToWalk[0] + blocksToWalk = blocksToWalk[1:] + if err := walkChain(next); err != nil { + return xerrors.Errorf("walk chain failed: %w", err) + } + } + + return nil } func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) { diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 5d65dd278..c1bcfb0f5 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -20,11 +20,11 @@ import ( offline "github.com/ipfs/go-ipfs-exchange-offline" cbor "github.com/ipfs/go-ipld-cbor" ipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log" "github.com/ipfs/go-merkledag" "github.com/ipfs/go-path" "github.com/ipfs/go-path/resolver" mh "github.com/multiformats/go-multihash" - "github.com/prometheus/common/log" "go.uber.org/fx" "golang.org/x/xerrors" @@ -35,6 +35,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +var log = logging.Logger("fullnode") + type ChainAPI struct { fx.In @@ -424,7 +426,7 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan for { buf := make([]byte, 4096) n, err := r.Read(buf) - if err != nil { + if err != nil && err != io.EOF { log.Errorf("chain export pipe read failed: %s", err) return } @@ -433,6 +435,9 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan case <-ctx.Done(): log.Warnf("export writer failed: %s", ctx.Err()) } + if err == io.EOF { + return + } } }() diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 0e7ab49c0..49131a02c 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/go-hamt-ipld" cbor "github.com/ipfs/go-ipld-cbor" "github.com/libp2p/go-libp2p-core/peer" - "github.com/prometheus/common/log" cbg "github.com/whyrusleeping/cbor-gen" "go.uber.org/fx" "golang.org/x/xerrors" diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index 436e14356..97d23043e 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -8,7 +8,6 @@ import ( "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/types" cid "github.com/ipfs/go-cid" - "github.com/prometheus/common/log" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" From 6fcfd0a4f0993af94b6568ba112ae352b901e0e2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 20 Mar 2020 18:15:31 -0700 Subject: [PATCH 2/2] add a test to roundtrip export/import a chain --- chain/store/store_test.go | 37 +++++++++++++++++++++++++++++++++++++ go.mod | 2 +- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 70513fed9..d182b08f2 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -1,6 +1,7 @@ package store_test import ( + "bytes" "context" "testing" @@ -13,6 +14,8 @@ import ( "github.com/filecoin-project/specs-actors/actors/abi/big" "github.com/filecoin-project/specs-actors/actors/builtin/power" "github.com/filecoin-project/specs-actors/actors/crypto" + + datastore "github.com/ipfs/go-datastore" blockstore "github.com/ipfs/go-ipfs-blockstore" ) @@ -70,3 +73,37 @@ func BenchmarkGetRandomness(b *testing.B) { } } } + +func TestChainExportImport(t *testing.T) { + cg, err := gen.NewGenerator() + if err != nil { + t.Fatal(err) + } + + var last *types.TipSet + for i := 0; i < 100; i++ { + ts, err := cg.NextTipSet() + if err != nil { + t.Fatal(err) + } + + last = ts.TipSet.TipSet() + } + + buf := new(bytes.Buffer) + if err := cg.ChainStore().Export(context.TODO(), last, buf); err != nil { + t.Fatal(err) + } + + nbs := blockstore.NewBlockstore(datastore.NewMapDatastore()) + cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil) + + root, err := cs.Import(buf) + if err != nil { + t.Fatal(err) + } + + if !root.Equals(last) { + t.Fatal("imported chain differed from exported chain") + } +} diff --git a/go.mod b/go.mod index 02a3ebb31..b15a48922 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/multiformats/go-multiaddr-net v0.1.2 github.com/multiformats/go-multihash v0.0.13 github.com/opentracing/opentracing-go v1.1.0 - github.com/prometheus/common v0.4.0 + github.com/prometheus/common v0.4.0 // indirect github.com/stretchr/testify v1.4.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20200222160900-51052a1e8191