Rewrite chain export walker function to actually do the right thing

This commit is contained in:
Jeromy 2020-03-20 17:18:57 -07:00
parent aad449f28a
commit 921f11db34
4 changed files with 82 additions and 31 deletions

View File

@ -30,16 +30,14 @@ import (
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
block "github.com/ipfs/go-block-format" block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
car "github.com/ipfs/go-car" car "github.com/ipfs/go-car"
carutil "github.com/ipfs/go-car/util"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore" dstore "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
dag "github.com/ipfs/go-merkledag"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
pubsub "github.com/whyrusleeping/pubsub" pubsub "github.com/whyrusleeping/pubsub"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -734,7 +732,7 @@ func (cs *ChainStore) readMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
cst := cbor.NewCborStore(cs.bs) cst := cbor.NewCborStore(cs.bs)
var msgmeta types.MsgMeta var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil { if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta: %w", err) return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)
} }
blscids, err := cs.readAMTCids(msgmeta.BlsMessages) blscids, err := cs.readAMTCids(msgmeta.BlsMessages)
@ -972,14 +970,18 @@ func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h abi.ChainEpoch, t
} }
func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) { func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
if root.Prefix().Codec != cid.DagCBOR {
return in, nil
}
data, err := bs.Get(root) data, err := bs.Get(root)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("recurse links get (%s) failed: %w", root, err)
} }
top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData())) top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData()))
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("scanning for links failed: %w", err)
} }
in = append(in, top...) in = append(in, top...)
@ -998,41 +1000,87 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer)
if ts == nil { if ts == nil {
ts = cs.GetHeaviestTipSet() ts = cs.GetHeaviestTipSet()
} }
bsrv := blockservice.New(cs.bs, nil)
dserv := dag.NewDAGService(bsrv) seen := cid.NewSet()
return car.WriteCarWithWalker(ctx, dserv, ts.Cids(), w, func(nd format.Node) ([]*format.Link, error) {
var b types.BlockHeader h := &car.CarHeader{
if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil { Roots: ts.Cids(),
return nil, err Version: 1,
} }
var out []*format.Link if err := car.WriteHeader(h, w); err != nil {
for _, p := range b.Parents { return xerrors.Errorf("failed to write car header: %s", err)
out = append(out, &format.Link{Cid: p})
} }
cids, err := recurseLinks(cs.bs, b.Messages, nil) blocksToWalk := ts.Cids()
walkChain := func(blk cid.Cid) error {
if !seen.Visit(blk) {
return nil
}
data, err := cs.bs.Get(blk)
if err != nil { if err != nil {
return nil, err return xerrors.Errorf("getting block: %w", err)
} }
for _, c := range cids { if err := carutil.LdWrite(w, blk.Bytes(), data.RawData()); err != nil {
out = append(out, &format.Link{Cid: c}) return xerrors.Errorf("failed to write block to car output: %w", err)
} }
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(data.RawData())); err != nil {
return xerrors.Errorf("unmarshaling block header (cid=%s): %w", blk, err)
}
for _, p := range b.Parents {
blocksToWalk = append(blocksToWalk, p)
}
cids, err := recurseLinks(cs.bs, b.Messages, []cid.Cid{b.Messages})
if err != nil {
return xerrors.Errorf("recursing messages failed: %w", err)
}
out := cids
if b.Height == 0 { if b.Height == 0 {
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, nil) cids, err := recurseLinks(cs.bs, b.ParentStateRoot, []cid.Cid{b.ParentStateRoot})
if err != nil { if err != nil {
return nil, err return xerrors.Errorf("recursing genesis state failed: %w", err)
} }
for _, c := range cids { out = append(out, cids...)
out = append(out, &format.Link{Cid: c}) }
for _, c := range out {
if seen.Visit(c) {
if c.Prefix().Codec != cid.DagCBOR {
continue
}
data, err := cs.bs.Get(c)
if err != nil {
return xerrors.Errorf("writing object to car (get %s): %w", c, err)
}
if err := carutil.LdWrite(w, c.Bytes(), data.RawData()); err != nil {
return xerrors.Errorf("failed to write out car object: %w", err)
}
} }
} }
return out, nil return nil
}) }
for len(blocksToWalk) > 0 {
next := blocksToWalk[0]
blocksToWalk = blocksToWalk[1:]
if err := walkChain(next); err != nil {
return xerrors.Errorf("walk chain failed: %w", err)
}
}
return nil
} }
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) { func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {

View File

@ -20,11 +20,11 @@ import (
offline "github.com/ipfs/go-ipfs-exchange-offline" offline "github.com/ipfs/go-ipfs-exchange-offline"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format" ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag" "github.com/ipfs/go-merkledag"
"github.com/ipfs/go-path" "github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver" "github.com/ipfs/go-path/resolver"
mh "github.com/multiformats/go-multihash" mh "github.com/multiformats/go-multihash"
"github.com/prometheus/common/log"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -35,6 +35,8 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
var log = logging.Logger("fullnode")
type ChainAPI struct { type ChainAPI struct {
fx.In fx.In
@ -424,7 +426,7 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan
for { for {
buf := make([]byte, 4096) buf := make([]byte, 4096)
n, err := r.Read(buf) n, err := r.Read(buf)
if err != nil { if err != nil && err != io.EOF {
log.Errorf("chain export pipe read failed: %s", err) log.Errorf("chain export pipe read failed: %s", err)
return return
} }
@ -433,6 +435,9 @@ func (a *ChainAPI) ChainExport(ctx context.Context, tsk types.TipSetKey) (<-chan
case <-ctx.Done(): case <-ctx.Done():
log.Warnf("export writer failed: %s", ctx.Err()) log.Warnf("export writer failed: %s", ctx.Err())
} }
if err == io.EOF {
return
}
} }
}() }()

View File

@ -10,7 +10,6 @@ import (
"github.com/ipfs/go-hamt-ipld" "github.com/ipfs/go-hamt-ipld"
cbor "github.com/ipfs/go-ipld-cbor" cbor "github.com/ipfs/go-ipld-cbor"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/prometheus/common/log"
cbg "github.com/whyrusleeping/cbor-gen" cbg "github.com/whyrusleeping/cbor-gen"
"go.uber.org/fx" "go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"

View File

@ -8,7 +8,6 @@ import (
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
"github.com/prometheus/common/log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx" "go.uber.org/fx"