lotus/node/impl/full/chain.go

751 lines
20 KiB
Go
Raw Normal View History

2019-08-20 16:48:33 +00:00
package full
import (
2020-09-03 19:21:10 +00:00
"bufio"
"bytes"
2019-08-20 16:48:33 +00:00
"context"
"encoding/json"
"fmt"
2020-01-20 23:51:02 +00:00
"io"
"math"
"os"
Chain ranged export: rework and address current shortcomings This commit moderately refactors the ranged export code. It addresses several problems: * Code does not finish cleanly and things hang on ctrl-c * Same block is read multiple times in a row (artificially increasing cached blockstore metrics to 50%) * It is unclear whether there are additional races (a single worker quits when reaching height 0) * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or so). Some blocks appear up to 5 times. * Using pointers for tasks where it is not necessary. The changes: * Use a FIFO instead of stack: simpler implementation as its own type. This has not proven to be much more memory-friendly, but it has not made things worse either. * We avoid a probably not small amount of allocations by not using unnecessary pointers. * Fix duplicated blocks by atomically checking+adding to CID set. * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels are closed, avoiding any memory leaks and deadlocks. * We ensure all work is finished before finishing, something that might have been broken in some edge cases previously. In practice, we would not have seen this except perhaps in very early snapshots close to genesis. Initial testing shows the code is currently about 5% faster. Resulting snapshots do not have duplicates so they are a bit smaller. We have manually verified that no CID is lost versus previous results, with both old and recent snapshots.
2023-02-02 16:51:52 +00:00
"path/filepath"
"strconv"
"strings"
2020-03-04 23:52:28 +00:00
"sync"
"time"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
blocks "github.com/ipfs/go-libipfs/blocks"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-merkledag"
mh "github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
2022-06-14 15:00:51 +00:00
"go.uber.org/fx"
"golang.org/x/xerrors"
2019-08-20 16:48:33 +00:00
"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/util/adt"
2019-12-18 15:37:47 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
2022-03-03 14:14:12 +00:00
"github.com/filecoin-project/lotus/lib/oldpath"
"github.com/filecoin-project/lotus/lib/oldpath/oldresolver"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
2019-08-20 16:48:33 +00:00
)
// log is the package-level logger, registered under the "fullnode" name.
var log = logging.Logger("fullnode")
type ChainModuleAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error)
ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
2020-10-02 14:14:30 +00:00
ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error)
}
2021-04-03 11:20:50 +00:00
// Compile-time check that the api.FullNode interface satisfies ChainModuleAPI.
var _ ChainModuleAPI = *new(api.FullNode)
// ChainModule provides a default implementation of ChainModuleAPI.
// It can be swapped out with another implementation through Dependency
// Injection (for example with a thin RPC client).
type ChainModule struct {
	fx.In

	// Chain is the chain store that the ChainModule methods delegate to for
	// tipset, block and message lookups.
	Chain *store.ChainStore

	// ExposedBlockstore is the global monolith blockstore that is safe to
	// expose externally. In the future, this will be segregated into two
	// blockstores.
	ExposedBlockstore dtypes.ExposedBlockstore
}
// Compile-time check that *ChainModule implements ChainModuleAPI.
var _ ChainModuleAPI = (*ChainModule)(nil)
2019-08-20 16:48:33 +00:00
type ChainAPI struct {
fx.In
2019-08-27 00:46:39 +00:00
WalletAPI
ChainModuleAPI
2019-08-27 00:46:39 +00:00
2021-09-02 16:07:23 +00:00
Chain *store.ChainStore
TsExec stmgr.Executor
// ExposedBlockstore is the global monolith blockstore that is safe to
// expose externally. In the future, this will be segregated into two
// blockstores.
ExposedBlockstore dtypes.ExposedBlockstore
2021-07-25 08:14:48 +00:00
// BaseBlockstore is the underlying blockstore
2021-07-25 08:14:48 +00:00
BaseBlockstore dtypes.BaseBlockstore
Repo repo.LockedRepo
2019-08-20 16:48:33 +00:00
}
func (m *ChainModule) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
return m.Chain.SubHeadChanges(ctx), nil
2019-08-20 16:48:33 +00:00
}
func (m *ChainModule) ChainHead(context.Context) (*types.TipSet, error) {
return m.Chain.GetHeaviestTipSet(), nil
2019-08-20 16:48:33 +00:00
}
func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
2021-12-11 21:03:00 +00:00
return a.Chain.GetBlock(ctx, msg)
2019-08-20 16:48:33 +00:00
}
func (m *ChainModule) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
2021-12-11 21:03:00 +00:00
return m.Chain.LoadTipSet(ctx, key)
2019-10-11 06:25:25 +00:00
}
// ChainGetPath returns the head changes reported by the chain store's GetPath
// for the tipsets identified by from and to.
func (m *ChainModule) ChainGetPath(ctx context.Context, from, to types.TipSetKey) ([]*api.HeadChange, error) {
	path, err := m.Chain.GetPath(ctx, from, to)
	return path, err
}
func (m *ChainModule) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) {
2021-12-11 21:03:00 +00:00
b, err := m.Chain.GetBlock(ctx, msg)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, err
}
2021-12-17 09:42:09 +00:00
bmsgs, smsgs, err := m.Chain.MessagesForBlock(ctx, b)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, err
}
2019-09-23 11:15:16 +00:00
cids := make([]cid.Cid, len(bmsgs)+len(smsgs))
for i, m := range bmsgs {
cids[i] = m.Cid()
}
for i, m := range smsgs {
cids[i+len(bmsgs)] = m.Cid()
}
2019-08-20 16:48:33 +00:00
return &api.BlockMessages{
BlsMessages: bmsgs,
SecpkMessages: smsgs,
2019-09-23 11:15:16 +00:00
Cids: cids,
2019-08-20 16:48:33 +00:00
}, nil
}
// ChainGetPath returns the head changes reported by the chain store's GetPath
// for the tipsets identified by from and to.
func (a *ChainAPI) ChainGetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*api.HeadChange, error) {
	path, err := a.Chain.GetPath(ctx, from, to)
	return path, err
}
func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]api.Message, error) {
2021-12-11 21:03:00 +00:00
b, err := a.Chain.GetBlock(ctx, bcid)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, err
}
// genesis block has no parent messages...
if b.Height == 0 {
return nil, nil
}
2019-08-20 16:48:33 +00:00
// TODO: need to get the number of messages better than this
2021-12-11 21:03:00 +00:00
pts, err := a.Chain.LoadTipSet(ctx, types.NewTipSetKey(b.Parents...))
if err != nil {
return nil, err
}
2021-12-17 09:42:09 +00:00
cm, err := a.Chain.MessagesForTipset(ctx, pts)
if err != nil {
return nil, err
}
var out []api.Message
for _, m := range cm {
out = append(out, api.Message{
Cid: m.Cid(),
Message: m.VMMessage(),
})
}
return out, nil
}
func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) {
2021-12-11 21:03:00 +00:00
b, err := a.Chain.GetBlock(ctx, bcid)
if err != nil {
return nil, err
}
if b.Height == 0 {
return nil, nil
}
// TODO: need to get the number of messages better than this
2021-12-11 21:03:00 +00:00
pts, err := a.Chain.LoadTipSet(ctx, types.NewTipSetKey(b.Parents...))
if err != nil {
return nil, err
}
2021-12-17 09:42:09 +00:00
cm, err := a.Chain.MessagesForTipset(ctx, pts)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, err
}
var out []*types.MessageReceipt
for i := 0; i < len(cm); i++ {
2021-12-17 09:42:09 +00:00
r, err := a.Chain.GetParentReceipt(ctx, b, i)
2019-08-20 16:48:33 +00:00
if err != nil {
return nil, err
}
out = append(out, r)
}
return out, nil
}
2021-07-01 03:20:22 +00:00
func (a *ChainAPI) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
2021-12-11 21:03:00 +00:00
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
2021-07-01 03:20:22 +00:00
if err != nil {
return nil, err
}
// genesis block has no parent messages...
if ts.Height() == 0 {
return nil, nil
}
2021-12-17 09:42:09 +00:00
cm, err := a.Chain.MessagesForTipset(ctx, ts)
2021-07-01 03:20:22 +00:00
if err != nil {
return nil, err
}
var out []api.Message
for _, m := range cm {
out = append(out, api.Message{
Cid: m.Cid(),
Message: m.VMMessage(),
})
}
return out, nil
}
2020-10-02 14:14:30 +00:00
func (m *ChainModule) ChainGetTipSetByHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
2021-12-11 21:03:00 +00:00
ts, err := m.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
2020-10-02 14:14:30 +00:00
return m.Chain.GetTipsetByHeight(ctx, h, ts, true)
}
func (m *ChainModule) ChainGetTipSetAfterHeight(ctx context.Context, h abi.ChainEpoch, tsk types.TipSetKey) (*types.TipSet, error) {
2021-12-11 21:03:00 +00:00
ts, err := m.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
return m.Chain.GetTipsetByHeight(ctx, h, ts, false)
}
func (m *ChainModule) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
2021-12-11 21:03:00 +00:00
blk, err := m.ExposedBlockstore.Get(ctx, obj)
if err != nil {
return nil, xerrors.Errorf("blockstore get: %w", err)
}
return blk.RawData(), nil
}
2019-10-10 03:50:50 +00:00
// ChainPutObj stores obj in the exposed blockstore.
func (a *ChainAPI) ChainPutObj(ctx context.Context, obj blocks.Block) error {
	err := a.ExposedBlockstore.Put(ctx, obj)
	return err
}
2020-09-10 17:35:06 +00:00
func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
2021-12-11 21:03:00 +00:00
return a.ExposedBlockstore.DeleteBlock(ctx, obj)
2020-09-10 17:35:06 +00:00
}
func (m *ChainModule) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) {
2021-12-11 21:03:00 +00:00
return m.ExposedBlockstore.Has(ctx, obj)
2020-02-04 02:45:20 +00:00
}
2020-03-04 23:52:28 +00:00
func (a *ChainAPI) ChainStatObj(ctx context.Context, obj cid.Cid, base cid.Cid) (api.ObjStat, error) {
bs := a.ExposedBlockstore
2020-03-04 23:52:28 +00:00
bsvc := blockservice.New(bs, offline.Exchange(bs))
dag := merkledag.NewDAGService(bsvc)
seen := cid.NewSet()
var statslk sync.Mutex
var stats api.ObjStat
var collect = true
walker := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) {
if c.Prefix().Codec == cid.FilCommitmentSealed || c.Prefix().Codec == cid.FilCommitmentUnsealed {
2020-03-04 23:52:28 +00:00
return []*ipld.Link{}, nil
}
nd, err := dag.Get(ctx, c)
if err != nil {
return nil, err
}
if collect {
s := uint64(len(nd.RawData()))
statslk.Lock()
stats.Size = stats.Size + s
stats.Links = stats.Links + 1
statslk.Unlock()
}
return nd.Links(), nil
}
if base != cid.Undef {
collect = false
if err := merkledag.Walk(ctx, walker, base, seen.Visit, merkledag.Concurrent()); err != nil {
return api.ObjStat{}, err
}
collect = true
}
if err := merkledag.Walk(ctx, walker, obj, seen.Visit, merkledag.Concurrent()); err != nil {
return api.ObjStat{}, err
}
return stats, nil
}
func (a *ChainAPI) ChainSetHead(ctx context.Context, tsk types.TipSetKey) error {
2021-12-11 21:03:00 +00:00
newHeadTs, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
2020-10-08 01:45:41 +00:00
currentTs, err := a.ChainHead(ctx)
if err != nil {
return xerrors.Errorf("getting head: %w", err)
}
for currentTs.Height() >= newHeadTs.Height() {
for _, blk := range currentTs.Key().Cids() {
err = a.Chain.UnmarkBlockAsValidated(ctx, blk)
if err != nil {
return xerrors.Errorf("unmarking block as validated %s: %w", blk, err)
}
}
currentTs, err = a.ChainGetTipSet(ctx, currentTs.Parents())
if err != nil {
return xerrors.Errorf("loading tipset: %w", err)
}
}
2021-12-11 21:03:00 +00:00
return a.Chain.SetHead(ctx, newHeadTs)
2019-10-10 03:50:50 +00:00
}
2019-10-11 02:14:22 +00:00
func (a *ChainAPI) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) {
2021-12-11 21:03:00 +00:00
genb, err := a.Chain.GetGenesis(ctx)
2019-10-11 02:14:22 +00:00
if err != nil {
return nil, err
}
return types.NewTipSet([]*types.BlockHeader{genb})
}
2019-10-15 05:00:30 +00:00
func (a *ChainAPI) ChainTipSetWeight(ctx context.Context, tsk types.TipSetKey) (types.BigInt, error) {
2021-12-11 21:03:00 +00:00
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return types.EmptyInt, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
2019-10-15 05:00:30 +00:00
return a.Chain.Weight(ctx, ts)
}
// stringKey adapts a plain string so it can be used as a lookup key in the
// actor's adt.Map type.
type stringKey string

// Key returns the underlying string unchanged.
func (s stringKey) Key() string {
	return string(s)
}
2020-09-28 21:25:58 +00:00
// TODO: ActorUpgrade: this entire function is a problem (in theory) as we don't know the HAMT version.
// In practice, hamt v0 should work "just fine" for reading.
2021-09-02 16:07:23 +00:00
func resolveOnce(bs blockstore.Blockstore, tse stmgr.Executor) func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
return func(ctx context.Context, ds ipld.NodeGetter, nd ipld.Node, names []string) (*ipld.Link, []string, error) {
store := adt.WrapStore(ctx, cbor.NewCborStore(bs))
if strings.HasPrefix(names[0], "@Ha:") {
addr, err := address.NewFromString(names[0][4:])
if err != nil {
return nil, nil, xerrors.Errorf("parsing addr: %w", err)
}
names[0] = "@H:" + string(addr.Bytes())
}
2020-03-09 03:09:45 +00:00
if strings.HasPrefix(names[0], "@Hi:") {
i, err := strconv.ParseInt(names[0][4:], 10, 64)
if err != nil {
return nil, nil, xerrors.Errorf("parsing int64: %w", err)
}
ik := abi.IntKey(i)
2020-03-09 03:09:45 +00:00
names[0] = "@H:" + ik.Key()
}
2020-04-04 01:49:42 +00:00
if strings.HasPrefix(names[0], "@Hu:") {
i, err := strconv.ParseUint(names[0][4:], 10, 64)
if err != nil {
return nil, nil, xerrors.Errorf("parsing uint64: %w", err)
2020-04-04 01:49:42 +00:00
}
ik := abi.UIntKey(i)
2020-04-04 01:49:42 +00:00
names[0] = "@H:" + ik.Key()
}
if strings.HasPrefix(names[0], "@H:") {
h, err := adt.AsMap(store, nd.Cid())
if err != nil {
return nil, nil, xerrors.Errorf("resolving hamt link: %w", err)
}
var deferred cbg.Deferred
if found, err := h.Get(stringKey(names[0][3:]), &deferred); err != nil {
return nil, nil, xerrors.Errorf("resolve hamt: %w", err)
} else if !found {
return nil, nil, xerrors.Errorf("resolve hamt: not found")
}
var m interface{}
if err := cbor.DecodeInto(deferred.Raw, &m); err != nil {
return nil, nil, xerrors.Errorf("failed to decode cbor object: %w", err)
}
if c, ok := m.(cid.Cid); ok {
return &ipld.Link{
Name: names[0][3:],
Cid: c,
}, names[1:], nil
}
n, err := cbor.WrapObject(m, mh.SHA2_256, 32)
if err != nil {
return nil, nil, err
}
2021-12-11 21:03:00 +00:00
if err := bs.Put(ctx, n); err != nil {
return nil, nil, xerrors.Errorf("put hamt val: %w", err)
}
if len(names) == 1 {
return &ipld.Link{
Name: names[0][3:],
Cid: n.Cid(),
}, nil, nil
}
2021-09-02 16:07:23 +00:00
return resolveOnce(bs, tse)(ctx, ds, n, names[1:])
}
if strings.HasPrefix(names[0], "@A:") {
a, err := adt.AsArray(store, nd.Cid())
if err != nil {
return nil, nil, xerrors.Errorf("load amt: %w", err)
}
idx, err := strconv.ParseUint(names[0][3:], 10, 64)
if err != nil {
return nil, nil, xerrors.Errorf("parsing amt index: %w", err)
}
var deferred cbg.Deferred
if found, err := a.Get(idx, &deferred); err != nil {
return nil, nil, xerrors.Errorf("resolve amt: %w", err)
} else if !found {
return nil, nil, xerrors.Errorf("resolve amt: not found")
}
var m interface{}
if err := cbor.DecodeInto(deferred.Raw, &m); err != nil {
return nil, nil, xerrors.Errorf("failed to decode cbor object: %w", err)
}
if c, ok := m.(cid.Cid); ok {
return &ipld.Link{
Name: names[0][3:],
Cid: c,
}, names[1:], nil
}
n, err := cbor.WrapObject(m, mh.SHA2_256, 32)
if err != nil {
return nil, nil, err
}
2021-12-11 21:03:00 +00:00
if err := bs.Put(ctx, n); err != nil {
return nil, nil, xerrors.Errorf("put amt val: %w", err)
}
if len(names) == 1 {
return &ipld.Link{
Name: names[0][3:],
Size: 0,
Cid: n.Cid(),
}, nil, nil
}
2021-09-02 16:07:23 +00:00
return resolveOnce(bs, tse)(ctx, ds, n, names[1:])
}
if names[0] == "@state" {
var act types.Actor
if err := act.UnmarshalCBOR(bytes.NewReader(nd.RawData())); err != nil {
return nil, nil, xerrors.Errorf("unmarshaling actor struct for @state: %w", err)
}
head, err := ds.Get(ctx, act.Head)
if err != nil {
return nil, nil, xerrors.Errorf("getting actor head for @state: %w", err)
}
2021-09-02 16:07:23 +00:00
m, err := vm.DumpActorState(tse.NewActorRegistry(), &act, head.RawData())
if err != nil {
return nil, nil, err
}
// a hack to workaround struct aliasing in refmt
ms := map[string]interface{}{}
{
mstr, err := json.Marshal(m)
if err != nil {
return nil, nil, err
}
if err := json.Unmarshal(mstr, &ms); err != nil {
return nil, nil, err
}
}
n, err := cbor.WrapObject(ms, mh.SHA2_256, 32)
if err != nil {
return nil, nil, err
}
2021-12-11 21:03:00 +00:00
if err := bs.Put(ctx, n); err != nil {
return nil, nil, xerrors.Errorf("put amt val: %w", err)
}
if len(names) == 1 {
return &ipld.Link{
Name: "state",
Size: 0,
Cid: n.Cid(),
}, nil, nil
}
2021-09-02 16:07:23 +00:00
return resolveOnce(bs, tse)(ctx, ds, n, names[1:])
}
return nd.ResolveLink(names)
}
}
2020-03-07 02:47:19 +00:00
func (a *ChainAPI) ChainGetNode(ctx context.Context, p string) (*api.IpldObject, error) {
2022-03-03 14:14:12 +00:00
ip, err := oldpath.ParsePath(p)
if err != nil {
return nil, xerrors.Errorf("parsing path: %w", err)
}
bs := a.ExposedBlockstore
bsvc := blockservice.New(bs, offline.Exchange(bs))
2022-03-03 14:14:12 +00:00
dag := merkledag.NewDAGService(bsvc)
r := &oldresolver.Resolver{
DAG: dag,
ResolveOnce: resolveOnce(bs, a.TsExec),
}
2022-03-03 14:14:12 +00:00
node, err := r.ResolvePath(ctx, ip)
if err != nil {
return nil, err
}
2020-03-07 02:47:19 +00:00
return &api.IpldObject{
2022-03-03 14:14:12 +00:00
Cid: node.Cid(),
2020-03-07 02:47:19 +00:00
Obj: node,
}, nil
}
2020-01-07 19:03:11 +00:00
func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) {
2021-12-17 09:42:09 +00:00
cm, err := m.Chain.GetCMessage(ctx, mc)
2020-01-07 19:03:11 +00:00
if err != nil {
return nil, err
}
return cm.VMMessage(), nil
}
2020-01-20 23:51:02 +00:00
Chain ranged export: rework and address current shortcomings This commit moderately refactors the ranged export code. It addresses several problems: * Code does not finish cleanly and things hang on ctrl-c * Same block is read multiple times in a row (artificially increasing cached blockstore metrics to 50%) * It is unclear whether there are additional races (a single worker quits when reaching height 0) * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or so). Some blocks appear up to 5 times. * Using pointers for tasks where it is not necessary. The changes: * Use a FIFO instead of stack: simpler implementation as its own type. This has not proven to be much more memory-friendly, but it has not made things worse either. * We avoid a probably not small amount of allocations by not using unnecessary pointers. * Fix duplicated blocks by atomically checking+adding to CID set. * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels are closed, avoiding any memory leaks and deadlocks. * We ensure all work is finished before finishing, something that might have been broken in some edge cases previously. In practice, we would not have seen this except perhaps in very early snapshots close to genesis. Initial testing shows the code is currently about 5% faster. Resulting snapshots do not have duplicates so they are a bit smaller. We have manually verified that no CID is lost versus previous results, with both old and recent snapshots.
2023-02-02 16:51:52 +00:00
func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error {
headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", head, err)
}
tailTs, err := a.Chain.GetTipSetFromKey(ctx, tail)
if err != nil {
return xerrors.Errorf("loading tipset %s: %w", tail, err)
}
if headTs.Height() < tailTs.Height() {
return xerrors.Errorf("Height of head-tipset (%d) must be greater or equal to the height of the tail-tipset (%d)", headTs.Height(), tailTs.Height())
}
fileName := filepath.Join(a.Repo.Path(), fmt.Sprintf("snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix()))
if err != nil {
return err
}
Chain ranged export: rework and address current shortcomings This commit moderately refactors the ranged export code. It addresses several problems: * Code does not finish cleanly and things hang on ctrl-c * Same block is read multiple times in a row (artificially increasing cached blockstore metrics to 50%) * It is unclear whether there are additional races (a single worker quits when reaching height 0) * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or so). Some blocks appear up to 5 times. * Using pointers for tasks where it is not necessary. The changes: * Use a FIFO instead of stack: simpler implementation as its own type. This has not proven to be much more memory-friendly, but it has not made things worse either. * We avoid a probably not small amount of allocations by not using unnecessary pointers. * Fix duplicated blocks by atomically checking+adding to CID set. * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels are closed, avoiding any memory leaks and deadlocks. * We ensure all work is finished before finishing, something that might have been broken in some edge cases previously. In practice, we would not have seen this except perhaps in very early snapshots close to genesis. Initial testing shows the code is currently about 5% faster. Resulting snapshots do not have duplicates so they are a bit smaller. We have manually verified that no CID is lost versus previous results, with both old and recent snapshots.
2023-02-02 16:51:52 +00:00
f, err := os.Create(fileName)
if err != nil {
return err
}
log.Infow("Exporting chain range", "path", fileName)
// buffer writes to the chain export file.
bw := bufio.NewWriterSize(f, cfg.WriteBufferSize)
defer func() {
if err := bw.Flush(); err != nil {
log.Errorw("failed to flush buffer", "error", err)
}
if err := f.Close(); err != nil {
log.Errorw("failed to close file", "error", err)
}
}()
Chain ranged export: rework and address current shortcomings This commit moderately refactors the ranged export code. It addresses several problems: * Code does not finish cleanly and things hang on ctrl-c * Same block is read multiple times in a row (artificially increasing cached blockstore metrics to 50%) * It is unclear whether there are additional races (a single worker quits when reaching height 0) * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or so). Some blocks appear up to 5 times. * Using pointers for tasks where it is not necessary. The changes: * Use a FIFO instead of stack: simpler implementation as its own type. This has not proven to be much more memory-friendly, but it has not made things worse either. * We avoid a probably not small amount of allocations by not using unnecessary pointers. * Fix duplicated blocks by atomically checking+adding to CID set. * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels are closed, avoiding any memory leaks and deadlocks. * We ensure all work is finished before finishing, something that might have been broken in some edge cases previously. In practice, we would not have seen this except perhaps in very early snapshots close to genesis. Initial testing shows the code is currently about 5% faster. Resulting snapshots do not have duplicates so they are a bit smaller. We have manually verified that no CID is lost versus previous results, with both old and recent snapshots.
2023-02-02 16:51:52 +00:00
if err := a.Chain.ExportRange(ctx,
bw,
headTs, tailTs,
cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
cfg.NumWorkers,
Chain ranged export: rework and address current shortcomings This commit moderately refactors the ranged export code. It addresses several problems: * Code does not finish cleanly and things hang on ctrl-c * Same block is read multiple times in a row (artificially increasing cached blockstore metrics to 50%) * It is unclear whether there are additional races (a single worker quits when reaching height 0) * CARs produced have duplicated blocks (~400k for an 80M-blocks CAR or so). Some blocks appear up to 5 times. * Using pointers for tasks where it is not necessary. The changes: * Use a FIFO instead of stack: simpler implementation as its own type. This has not proven to be much more memory-friendly, but it has not made things worse either. * We avoid a probably not small amount of allocations by not using unnecessary pointers. * Fix duplicated blocks by atomically checking+adding to CID set. * Context-termination now works correctly. Worker lifetime is correctly tracked and all channels are closed, avoiding any memory leaks and deadlocks. * We ensure all work is finished before finishing, something that might have been broken in some edge cases previously. In practice, we would not have seen this except perhaps in very early snapshots close to genesis. Initial testing shows the code is currently about 5% faster. Resulting snapshots do not have duplicates so they are a bit smaller. We have manually verified that no CID is lost versus previous results, with both old and recent snapshots.
2023-02-02 16:51:52 +00:00
); err != nil {
return fmt.Errorf("exporting chain range: %w", err)
}
return nil
}
func (a *ChainAPI) ChainExport(ctx context.Context, nroots abi.ChainEpoch, skipoldmsgs bool, tsk types.TipSetKey) (<-chan []byte, error) {
2021-12-11 21:03:00 +00:00
ts, err := a.Chain.GetTipSetFromKey(ctx, tsk)
if err != nil {
return nil, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
2020-01-20 23:51:02 +00:00
r, w := io.Pipe()
out := make(chan []byte)
go func() {
2020-09-03 19:21:10 +00:00
bw := bufio.NewWriterSize(w, 1<<20)
err := a.Chain.Export(ctx, ts, nroots, skipoldmsgs, bw)
bw.Flush() //nolint:errcheck // it is a write to a pipe
w.CloseWithError(err) //nolint:errcheck // it is a pipe
2020-01-20 23:51:02 +00:00
}()
go func() {
2020-01-21 01:53:55 +00:00
defer close(out)
2020-01-20 23:51:02 +00:00
for {
2020-09-03 19:21:10 +00:00
buf := make([]byte, 1<<20)
2020-01-20 23:51:02 +00:00
n, err := r.Read(buf)
if err != nil && err != io.EOF {
2020-01-20 23:51:02 +00:00
log.Errorf("chain export pipe read failed: %s", err)
return
}
if n > 0 {
select {
case out <- buf[:n]:
case <-ctx.Done():
log.Warnf("export writer failed: %s", ctx.Err())
return
}
2020-01-20 23:51:02 +00:00
}
if err == io.EOF {
// send empty slice to indicate correct eof
select {
case out <- []byte{}:
case <-ctx.Done():
log.Warnf("export writer failed: %s", ctx.Err())
return
}
return
}
2020-01-20 23:51:02 +00:00
}
}()
return out, nil
}
2021-07-25 08:14:48 +00:00
// ChainCheckBlockstore runs the base blockstore's Check method. It returns an
// error if the base blockstore does not provide a Check() error method.
func (a *ChainAPI) ChainCheckBlockstore(ctx context.Context) error {
	type healthChecker interface {
		Check() error
	}

	if hc, ok := a.BaseBlockstore.(healthChecker); ok {
		return hc.Check()
	}
	return xerrors.Errorf("underlying blockstore does not support health checks")
}
2021-07-26 05:30:07 +00:00
// ChainBlockstoreInfo returns the map produced by the base blockstore's Info
// method. It returns an error if the base blockstore does not provide an
// Info() map[string]interface{} method.
func (a *ChainAPI) ChainBlockstoreInfo(ctx context.Context) (map[string]interface{}, error) {
	type infoProvider interface {
		Info() map[string]interface{}
	}

	if ip, ok := a.BaseBlockstore.(infoProvider); ok {
		return ip.Info(), nil
	}
	return nil, xerrors.Errorf("underlying blockstore does not provide info")
}
// ChainGetEvents returns the events under an event AMT root CID.
//
// The AMT is loaded from the exposed blockstore with the event AMT bit width
// and every entry is decoded as a types.Event, in iteration order. If
// iteration fails part-way, the events decoded so far are returned together
// with the error.
//
// TODO (raulk) make copies of this logic elsewhere use this (e.g. itests, CLI, events filter).
func (a *ChainAPI) ChainGetEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) {
	cst := cbor.NewCborStore(a.ExposedBlockstore)

	events, err := amt4.LoadAMT(ctx, cst, root, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
	if err != nil {
		return nil, xerrors.Errorf("load events amt: %w", err)
	}

	var (
		scratch types.Event // decode target, copied into out per entry
		out     = make([]types.Event, 0, events.Len())
	)

	decodeEntry := func(idx uint64, raw *cbg.Deferred) error {
		// Reject indices that cannot be represented as an int.
		if idx > math.MaxInt {
			return xerrors.Errorf("too many events")
		}
		if err := scratch.UnmarshalCBOR(bytes.NewReader(raw.Raw)); err != nil {
			return err
		}
		out = append(out, scratch)
		return nil
	}

	err = events.ForEach(ctx, decodeEntry)
	return out, err
}
// ChainPrune forwards opts to the base blockstore's PruneChain method. It
// returns an error naming the blockstore's concrete type if that blockstore
// does not provide a PruneChain(api.PruneOpts) error method.
func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error {
	type chainPruner interface {
		PruneChain(opts api.PruneOpts) error
	}

	if p, ok := a.BaseBlockstore.(chainPruner); ok {
		return p.PruneChain(opts)
	}
	return xerrors.Errorf("base blockstore does not support pruning (%T)", a.BaseBlockstore)
}