ddade32bd3
If they end up validating, we'll write them back to the underlying blockstore. Otherwise, there's no reason to keep them.
227 lines
6.8 KiB
Go
227 lines
6.8 KiB
Go
package modules
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-bitswap"
|
|
"github.com/ipfs/go-bitswap/network"
|
|
"github.com/ipfs/go-blockservice"
|
|
"github.com/ipfs/go-datastore"
|
|
"github.com/ipld/go-car"
|
|
"github.com/libp2p/go-libp2p-core/host"
|
|
"github.com/libp2p/go-libp2p-core/routing"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
|
"github.com/filecoin-project/lotus/journal"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain"
|
|
"github.com/filecoin-project/lotus/chain/beacon"
|
|
"github.com/filecoin-project/lotus/chain/exchange"
|
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/vm"
|
|
"github.com/filecoin-project/lotus/lib/blockstore"
|
|
"github.com/filecoin-project/lotus/lib/bufbstore"
|
|
"github.com/filecoin-project/lotus/lib/timedbs"
|
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
|
"github.com/filecoin-project/lotus/node/repo"
|
|
)
|
|
|
|
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainBitswap {
|
|
// prefix protocol for chain bitswap
|
|
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
|
|
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
|
|
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
|
|
|
|
// Write all incoming bitswap blocks into a temporary blockstore for two
|
|
// block times. If they validate, they'll be persisted later.
|
|
cache := timedbs.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second)
|
|
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
|
|
|
|
bitswapBs := bufbstore.NewTieredBstore(bs, cache)
|
|
|
|
// Use just exch.Close(), closing the context is not needed
|
|
exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...)
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
return exch.Close()
|
|
},
|
|
})
|
|
|
|
return exch
|
|
}
|
|
|
|
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
|
|
mpp := messagepool.NewProvider(sm, ps)
|
|
mp, err := messagepool.New(mpp, ds, nn, j)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("constructing mpool: %w", err)
|
|
}
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(_ context.Context) error {
|
|
return mp.Close()
|
|
},
|
|
})
|
|
return mp, nil
|
|
}
|
|
|
|
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
|
|
blocks, err := r.Datastore("/chain")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bs := blockstore.NewBlockstore(blocks)
|
|
cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return cbs, nil
|
|
}
|
|
|
|
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
|
|
return blockstore.NewGCBlockstore(bs, gcl)
|
|
}
|
|
|
|
func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
|
|
return blockservice.New(bs, rem)
|
|
}
|
|
|
|
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
|
|
chain := store.NewChainStore(bs, ds, syscalls, j)
|
|
|
|
if err := chain.Load(); err != nil {
|
|
log.Warnf("loading chain state from disk: %s", err)
|
|
}
|
|
|
|
return chain
|
|
}
|
|
|
|
func ErrorGenesis() Genesis {
|
|
return func() (header *types.BlockHeader, e error) {
|
|
return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
|
|
}
|
|
}
|
|
|
|
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
|
|
return func(bs dtypes.ChainBlockstore) Genesis {
|
|
return func() (header *types.BlockHeader, e error) {
|
|
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
|
|
}
|
|
if len(c.Roots) != 1 {
|
|
return nil, xerrors.New("expected genesis file to have one root")
|
|
}
|
|
root, err := bs.Get(c.Roots[0])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
h, err := types.DecodeBlock(root.RawData())
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("decoding block failed: %w", err)
|
|
}
|
|
return h, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// DoSetGenesis is a no-op that takes a dtypes.AfterGenesisSet argument.
// NOTE(review): presumably this exists so that invoking it through dependency
// injection forces the provider of dtypes.AfterGenesisSet (SetGenesis) to run
// — confirm against the node builder.
func DoSetGenesis(_ dtypes.AfterGenesisSet) {}
|
|
|
|
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
|
|
genFromRepo, err := cs.GetGenesis()
|
|
if err == nil {
|
|
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
|
|
expectedGenesis, err := g()
|
|
if err != nil {
|
|
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
|
|
}
|
|
|
|
if genFromRepo.Cid() != expectedGenesis.Cid() {
|
|
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
|
|
}
|
|
}
|
|
return dtypes.AfterGenesisSet{}, nil // already set, noop
|
|
}
|
|
if err != datastore.ErrNotFound {
|
|
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
|
|
}
|
|
|
|
genesis, err := g()
|
|
if err != nil {
|
|
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
|
|
}
|
|
|
|
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
|
|
}
|
|
|
|
func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
|
|
if !build.Devnet {
|
|
return "testnetnet", nil
|
|
}
|
|
|
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
|
|
|
netName, err := stmgr.GetNetworkName(ctx, stmgr.NewStateManager(cs), cs.GetHeaviestTipSet().ParentState())
|
|
return netName, err
|
|
}
|
|
|
|
type SyncerParams struct {
|
|
fx.In
|
|
|
|
Lifecycle fx.Lifecycle
|
|
MetadataDS dtypes.MetadataDS
|
|
StateManager *stmgr.StateManager
|
|
ChainXchg exchange.Client
|
|
SyncMgrCtor chain.SyncManagerCtor
|
|
Host host.Host
|
|
Beacon beacon.Schedule
|
|
Verifier ffiwrapper.Verifier
|
|
}
|
|
|
|
func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
|
|
var (
|
|
lc = params.Lifecycle
|
|
ds = params.MetadataDS
|
|
sm = params.StateManager
|
|
ex = params.ChainXchg
|
|
smCtor = params.SyncMgrCtor
|
|
h = params.Host
|
|
b = params.Beacon
|
|
v = params.Verifier
|
|
)
|
|
syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
lc.Append(fx.Hook{
|
|
OnStart: func(_ context.Context) error {
|
|
syncer.Start()
|
|
return nil
|
|
},
|
|
OnStop: func(_ context.Context) error {
|
|
syncer.Stop()
|
|
return nil
|
|
},
|
|
})
|
|
return syncer, nil
|
|
}
|
|
|
|
func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter {
|
|
return slashfilter.New(ds)
|
|
}
|