package modules import ( "bytes" "context" "os" "time" "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/ipld/go-car" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/routing" pubsub "github.com/libp2p/go-libp2p-pubsub" "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/beacon" "github.com/filecoin-project/lotus/chain/exchange" "github.com/filecoin-project/lotus/chain/gen/slashfilter" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/bufbstore" "github.com/filecoin-project/lotus/lib/timedbs" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" ) func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainBlockstore) dtypes.ChainBitswap { // prefix protocol for chain bitswap // (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff) bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain")) bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)} // Write all incoming bitswap blocks into a temporary blockstore for two // block times. If they validate, they'll be persisted later. cache := timedbs.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second) lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start}) bitswapBs := bufbstore.NewTieredBstore(bs, cache) // Use just exch.Close(), closing the context is not needed exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...) lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { return exch.Close() }, }) return exch } func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) { mpp := messagepool.NewProvider(sm, ps) mp, err := messagepool.New(mpp, ds, nn, j) if err != nil { return nil, xerrors.Errorf("constructing mpool: %w", err) } lc.Append(fx.Hook{ OnStop: func(_ context.Context) error { return mp.Close() }, }) return mp, nil } func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) { bs, err := r.Blockstore(repo.BlockstoreChain) if err != nil { return nil, err } // TODO potentially replace this cached blockstore by a CBOR cache. cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts()) if err != nil { return nil, err } return cbs, nil } func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService { return blockservice.New(bs, rem) } func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore { return &blockstore.FallbackStore{ Blockstore: rbs, } } func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error { fbs, ok := cbs.(*blockstore.FallbackStore) if !ok { return xerrors.Errorf("expected a FallbackStore") } fbs.SetFallback(rem.GetBlock) return nil } func ChainStore(lc fx.Lifecycle, mctx helpers.MetricsCtx, bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore { ctx := helpers.LifecycleCtx(mctx, lc) chain := store.NewChainStore(ctx, bs, lbs, ds, syscalls, j) if err := chain.Load(); err != nil { log.Warnf("loading chain state from disk: %s", err) } return chain } func ErrorGenesis() Genesis { return func() (header *types.BlockHeader, e error) { return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'") } } func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis { return func(bs dtypes.ChainBlockstore) Genesis { return func() (header *types.BlockHeader, e error) { c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) if err != nil { return nil, xerrors.Errorf("loading genesis car file failed: %w", err) } if len(c.Roots) != 1 { return nil, xerrors.New("expected genesis file to have one root") } root, err := bs.Get(c.Roots[0]) if err != nil { return nil, err } h, err := types.DecodeBlock(root.RawData()) if err != nil { return nil, xerrors.Errorf("decoding block failed: %w", err) } return h, nil } } } func DoSetGenesis(_ dtypes.AfterGenesisSet) {} func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) { genFromRepo, err := cs.GetGenesis() if err == nil { if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" { expectedGenesis, err := g() if err != nil { return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err) } if genFromRepo.Cid() != expectedGenesis.Cid() { return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!") } } return dtypes.AfterGenesisSet{}, nil // already set, noop } if err != datastore.ErrNotFound { return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err) } genesis, err := g() if err != nil { return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err) } return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis) } func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, us stmgr.UpgradeSchedule, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) { if !build.Devnet { return "testnetnet", nil } ctx := helpers.LifecycleCtx(mctx, lc) sm, err := stmgr.NewStateManagerWithUpgradeSchedule(cs, us) if err != nil { return "", err } netName, err := stmgr.GetNetworkName(ctx, sm, cs.GetHeaviestTipSet().ParentState()) return netName, err } type SyncerParams struct { fx.In Lifecycle fx.Lifecycle MetadataDS dtypes.MetadataDS StateManager *stmgr.StateManager ChainXchg exchange.Client SyncMgrCtor chain.SyncManagerCtor Host host.Host Beacon beacon.Schedule Verifier ffiwrapper.Verifier } func NewSyncer(params SyncerParams) (*chain.Syncer, error) { var ( lc = params.Lifecycle ds = params.MetadataDS sm = params.StateManager ex = params.ChainXchg smCtor = params.SyncMgrCtor h = params.Host b = params.Beacon v = params.Verifier ) syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v) if err != nil { return nil, err } lc.Append(fx.Hook{ OnStart: func(_ context.Context) error { syncer.Start() return nil }, OnStop: func(_ context.Context) error { syncer.Stop() return nil }, }) return syncer, nil } func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter { return slashfilter.New(ds) }