package modules import ( "context" "time" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store/splitstore" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" ) // ChainBitswap uses a blockstore that bypasses all caches. func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap { // prefix protocol for chain bitswap // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} // Write all incoming bitswap blocks into a temporary blockstore for two // block times. If they validate, they'll be persisted later. cache := blockstore.NewTimedCacheBlockstore(2 * time.Duration(build.BlockDelaySecs) * time.Second) lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start}) bitswapBs := blockstore.NewTieredBstore(bs, cache) // Use just exch.Close(), closing the context is not needed exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return exch.Close() }, }) return exch } func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { return blockservice.New(bs, rem) } func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) mp, err := messagepool.New(mpp, ds, nn, j) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { return mp.Close() }, }) return mp, nil } func ChainStore(lc fx.Lifecycle, cbs dtypes.ChainBlockstore, sbs dtypes.StateBlockstore, ds dtypes.MetadataDS, ss dtypes.SplitBlockstore, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { chain := store.NewChainStore(cbs, sbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) } if ssp, ok := ss.(*splitstore.SplitStore); ok { err := ssp.Start(chain) if err != nil { log.Errorf("error starting splitstore: %s", err) } } else { log.Warnf("unexpected splitstore type: %+v", ss) } lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { return chain.Close() }, }) return chain } func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) { if !build.Devnet { return "testnetnet", nil } ctx := helpers.LifecycleCtx(mctx, lc) sm, err := stmgr.NewStateManagerWithUpgradeSchedule(cs, us) if err != nil { return "", err } netName, err := stmgr.GetNetworkName(ctx, sm, cs.GetHeaviestTipSet().ParentState()) return netName, err } type SyncerParams struct { fx.In Lifecycle fx.Lifecycle MetadataDS dtypes.MetadataDS StateManager *stmgr.StateManager ChainXchg exchange.Client SyncMgrCtor chain.SyncManagerCtor Host host.Host Beacon beacon.Schedule Verifier ffiwrapper.Verifier } func NewSyncer(params SyncerParams) (*chain.Syncer, error) { var ( lc = params.Lifecycle ds = params.MetadataDS sm = params.StateManager ex = params.ChainXchg smCtor = params.SyncMgrCtor h = params.Host b = params.Beacon v = params.Verifier ) syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v) if err != nil { return nil, err } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { syncer.Start() return nil }, OnStop: func(_ context.Context) error { syncer.Stop() return nil }, }) return syncer, nil } func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter { return slashfilter.New(ds) }