lotus/node/modules/chain.go

227 lines
6.8 KiB
Go
Raw Normal View History

2019-08-01 14:19:53 +00:00
package modules
import (
"bytes"
"context"
"os"
"time"
2020-02-27 21:45:31 +00:00
2019-08-01 14:19:53 +00:00
"github.com/ipfs/go-bitswap"
2019-08-08 17:16:41 +00:00
"github.com/ipfs/go-bitswap/network"
2019-08-01 14:19:53 +00:00
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car"
2019-08-01 14:19:53 +00:00
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-08-01 14:19:53 +00:00
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/journal"
2020-04-17 14:47:19 +00:00
2020-09-24 21:30:11 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/exchange"
2020-08-06 01:16:54 +00:00
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/timedbs"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
2019-08-01 14:19:53 +00:00
)
func ChainBitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs dtypes.ChainGCBlockstore) dtypes.ChainBitswap {
2019-08-06 22:04:21 +00:00
// prefix protocol for chain bitswap
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
2019-08-08 17:16:41 +00:00
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
// Write all incoming bitswap blocks into a temporary blockstore for two
// block times. If they validate, they'll be persisted later.
cache := timedbs.NewTimedCacheBS(2 * time.Duration(build.BlockDelaySecs) * time.Second)
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
bitswapBs := bufbstore.NewTieredBstore(bs, cache)
// Use just exch.Close(), closing the context is not needed
exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...)
2019-08-01 14:19:53 +00:00
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()
},
})
return exch
}
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
2019-12-02 19:39:50 +00:00
mpp := messagepool.NewProvider(sm, ps)
mp, err := messagepool.New(mpp, ds, nn, j)
2019-11-23 19:01:56 +00:00
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mp.Close()
},
})
2019-11-23 19:01:56 +00:00
return mp, nil
}
2020-05-16 18:31:14 +00:00
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
blocks, err := r.Datastore("/chain")
2019-08-01 14:19:53 +00:00
if err != nil {
return nil, err
}
bs := blockstore.NewBlockstore(blocks)
2020-05-16 18:31:14 +00:00
cbs, err := blockstore.CachedBlockstore(helpers.LifecycleCtx(mctx, lc), bs, blockstore.DefaultCacheOpts())
if err != nil {
return nil, err
}
return cbs, nil
2019-08-01 14:19:53 +00:00
}
// ChainGCBlockstore wraps the chain blockstore together with the given GC
// locker via blockstore.NewGCBlockstore.
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
	gcbs := blockstore.NewGCBlockstore(bs, gcl)
	return gcbs
}
func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
2019-08-01 14:19:53 +00:00
return blockservice.New(bs, rem)
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls, j)
2019-08-01 14:19:53 +00:00
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}
2019-08-01 14:19:53 +00:00
return chain
}
// ErrorGenesis returns a Genesis provider that never yields a block header:
// every call fails with an error telling the user to pass a genesis file.
func ErrorGenesis() Genesis {
	missing := func() (*types.BlockHeader, error) {
		return nil, xerrors.New("No genesis block provided, provide the file with 'lotus daemon --genesis=[genesis file]'")
	}
	return missing
}
func LoadGenesis(genBytes []byte) func(dtypes.ChainBlockstore) Genesis {
return func(bs dtypes.ChainBlockstore) Genesis {
return func() (header *types.BlockHeader, e error) {
c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
if err != nil {
2020-02-27 23:14:15 +00:00
return nil, xerrors.Errorf("loading genesis car file failed: %w", err)
2019-08-01 14:19:53 +00:00
}
if len(c.Roots) != 1 {
return nil, xerrors.New("expected genesis file to have one root")
}
root, err := bs.Get(c.Roots[0])
if err != nil {
2020-02-27 23:14:15 +00:00
return nil, err
2019-08-01 14:19:53 +00:00
}
2020-02-27 23:14:15 +00:00
h, err := types.DecodeBlock(root.RawData())
if err != nil {
return nil, xerrors.Errorf("decoding block failed: %w", err)
}
return h, nil
2019-08-01 14:19:53 +00:00
}
}
}
2020-03-31 23:13:37 +00:00
// DoSetGenesis accepts an AfterGenesisSet value and does nothing with it.
//
// NOTE(review): presumably this exists so the dependency container has to
// produce an AfterGenesisSet (and therefore run SetGenesis) — confirm against
// the node builder.
func DoSetGenesis(_ dtypes.AfterGenesisSet) {
	// Intentionally empty.
}
func SetGenesis(cs *store.ChainStore, g Genesis) (dtypes.AfterGenesisSet, error) {
genFromRepo, err := cs.GetGenesis()
2019-08-01 14:19:53 +00:00
if err == nil {
if os.Getenv("LOTUS_SKIP_GENESIS_CHECK") != "_yes_" {
expectedGenesis, err := g()
if err != nil {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting expected genesis failed: %w", err)
}
if genFromRepo.Cid() != expectedGenesis.Cid() {
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis in the repo is not the one expected by this version of Lotus!")
}
}
2020-03-31 23:13:37 +00:00
return dtypes.AfterGenesisSet{}, nil // already set, noop
2019-08-01 14:19:53 +00:00
}
if err != datastore.ErrNotFound {
2020-03-31 23:13:37 +00:00
return dtypes.AfterGenesisSet{}, xerrors.Errorf("getting genesis block failed: %w", err)
2019-08-01 14:19:53 +00:00
}
genesis, err := g()
if err != nil {
2020-03-31 23:13:37 +00:00
return dtypes.AfterGenesisSet{}, xerrors.Errorf("genesis func failed: %w", err)
2019-08-01 14:19:53 +00:00
}
2020-03-31 23:13:37 +00:00
return dtypes.AfterGenesisSet{}, cs.SetGenesis(genesis)
}
func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, _ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
2020-09-24 21:30:11 +00:00
if !build.Devnet {
return "testnetnet", nil
}
2020-03-31 23:13:37 +00:00
ctx := helpers.LifecycleCtx(mctx, lc)
netName, err := stmgr.GetNetworkName(ctx, stmgr.NewStateManager(cs), cs.GetHeaviestTipSet().ParentState())
return netName, err
2019-08-01 14:19:53 +00:00
}
2019-11-15 21:35:29 +00:00
2020-09-14 20:58:59 +00:00
// SyncerParams bundles the dependencies consumed by NewSyncer. It embeds
// fx.In so the fields are filled in by the fx container.
type SyncerParams struct {
	fx.In

	// Lifecycle receives the syncer's start/stop hooks.
	Lifecycle fx.Lifecycle

	// The remaining fields are handed to chain.NewSyncer.
	MetadataDS   dtypes.MetadataDS
	StateManager *stmgr.StateManager
	ChainXchg    exchange.Client
	SyncMgrCtor  chain.SyncManagerCtor
	// Host supplies the connection manager and the local peer ID.
	Host     host.Host
	Beacon   beacon.Schedule
	Verifier ffiwrapper.Verifier
}
func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
var (
lc = params.Lifecycle
ds = params.MetadataDS
sm = params.StateManager
ex = params.ChainXchg
smCtor = params.SyncMgrCtor
h = params.Host
b = params.Beacon
v = params.Verifier
)
syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, v)
2019-11-15 21:35:29 +00:00
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
syncer.Start()
return nil
},
OnStop: func(_ context.Context) error {
syncer.Stop()
return nil
},
})
return syncer, nil
}
2020-08-06 01:14:13 +00:00
// NewSlashFilter creates a slash filter backed by the metadata datastore.
func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter {
	filter := slashfilter.New(ds)
	return filter
}