3795cc2bd2
This paves the way for better object lifetime management. Concretely, it makes it possible to: - have different stores backing chain and state data. - use the same datastore library for both, but with different parameters. - attach different caching layers/policies to each class of data, e.g. sizing caches differently. - specify different retention policies for chain and state data. This separation is important because: - the access patterns and access frequency of chain and state data are different. - state is derivable from chain, so one could choose to never expunge the chain store, and to retain in the state store only those state objects reachable from the last finality.
152 lines
4.5 KiB
Go
152 lines
4.5 KiB
Go
package modules
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-bitswap"
|
|
"github.com/ipfs/go-bitswap/network"
|
|
"github.com/ipfs/go-blockservice"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/routing"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain"
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
)
|
|
|
|
// ChainBitswap uses a blockstore that bypasses all caches.
|
|
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
|
|
// prefix protocol for chain bitswap
|
|
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
|
|
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
|
|
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
|
|
|
|
// Write all incoming bitswap blocks into a temporary blockstore for two
|
|
// block times. If they validate, they'll be persisted later.
|
|
cache := blockstore.NewTimedCacheBlockstore(2 * time.Duration(build.BlockDelaySecs) * time.Second)
|
|
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
|
|
|
|
bitswapBs := blockstore.NewTieredBstore(bs, cache)
|
|
|
|
// Use just exch.Close(), closing the context is not needed
|
|
exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...)
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return exch.Close()
|
|
},
|
|
})
|
|
|
|
return exch
|
|
}
|
|
|
|
func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
|
return blockservice.New(bs, rem)
|
|
}
|
|
|
|
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
|
|
mpp := messagepool.NewProvider(sm, ps)
|
|
mp, err := messagepool.New(mpp, ds, nn, j)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
|
}
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error {
|
|
return mp.Close()
|
|
},
|
|
})
|
|
return mp, nil
|
|
}
|
|
|
|
func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
|
chain := store.NewChainStore(cbs, sbs, ds, syscalls, j)
|
|
|
|
if err := chain.Load(); err != nil {
|
|
log.Warnf("loading chain state from disk: %s", err)
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error {
|
|
return chain.Close()
|
|
},
|
|
})
|
|
|
|
return chain
|
|
}
|
|
|
|
func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
|
|
if !build.Devnet {
|
|
return "testnetnet", nil
|
|
}
|
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
sm, err := stmgr.NewStateManagerWithUpgradeSchedule(cs, us)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
netName, err := stmgr.GetNetworkName(ctx, sm, cs.GetHeaviestTipSet().ParentState())
|
|
return netName, err
|
|
}
|
|
|
|
type SyncerParams struct {
|
|
fx.In
|
|
|
|
Lifecycle fx.Lifecycle
|
|
MetadataDS dtypes.MetadataDS
|
|
StateManager *stmgr.StateManager
|
|
ChainXchg exchange.Client
|
|
SyncMgrCtor chain.SyncManagerCtor
|
|
Host host.Host
|
|
Beacon beacon.Schedule
|
|
Verifier ffiwrapper.Verifier
|
|
}
|
|
|
|
func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
|
|
var (
|
|
lc = params.Lifecycle
|
|
ds = params.MetadataDS
|
|
sm = params.StateManager
|
|
ex = params.ChainXchg
|
|
smCtor = params.SyncMgrCtor
|
|
h = params.Host
|
|
b = params.Beacon
|
|
v = params.Verifier
|
|
)
|
|
syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
syncer.Start()
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
syncer.Stop()
|
|
return nil
|
|
},
|
|
})
|
|
return syncer, nil
|
|
}
|
|
|
|
func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter {
|
|
return slashfilter.New(ds)
|
|
}
|