lotus/node/modules/chain.go
Jorropo 6c01310728
chore: migrate to boxo
This migrates everything except the `go-car` librairy: https://github.com/ipfs/boxo/issues/218#issuecomment-1529922103

I didn't migrated everything in the previous release because all the boxo code wasn't compatible with the go-ipld-prime one due to a an in flight (/ aftermath) revert of github.com/ipfs/go-block-format. go-block-format has been unmigrated since slight bellow absolutely everything depends on it that would have required everything to be moved on boxo or everything to optin into using boxo which were all deal breakers for different groups.

This worked fine because lotus's codebase could live hapely on the first multirepo setup however boost is now trying to use boxo's code with lotus's (still on multirepo) setup: https://filecoinproject.slack.com/archives/C03AQ3QAUG1/p1685022344779649

The alternative would be for boost to write shim types which just forward calls and return with the different interface definitions.

Btw why is that an issue in the first place is because unlike what go's duck typing model suggest interfaces are not transparent https://github.com/golang/go/issues/58112, interfaces are strongly typed but they have implicit narrowing. The issue is if you return an interface from an interface Go does not have a function definition to insert the implicit conversion thus instead the type checker complains you are not returning the right type.

Stubbing types were reverted https://github.com/ipfs/boxo/issues/218#issuecomment-1478650351

Last time I only migrated `go-bitswap` to `boxo/bitswap` because of the security issues and because we never had the interface return an interface problem (we had concrete wrappers where the implicit conversion took place).
2023-06-19 14:45:05 -07:00

189 lines
5.4 KiB
Go

package modules
import (
"context"
"time"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/blockservice"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/blockstore/splitstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
// ChainBitswap uses a blockstore that bypasses all caches.
func ChainBitswap(lc fx.Lifecycle, mctx helpers.MetricsCtx, host host.Host, rt routing.Routing, bs dtypes.ExposedBlockstore) dtypes.ChainBitswap {
// prefix protocol for chain bitswap
// (so bitswap uses /chain/ipfs/bitswap/1.0.0 internally for chain sync stuff)
bitswapNetwork := network.NewFromIpfsHost(host, rt, network.Prefix("/chain"))
bitswapOptions := []bitswap.Option{bitswap.ProvideEnabled(false)}
// Write all incoming bitswap blocks into a temporary blockstore for two
// block times. If they validate, they'll be persisted later.
cache := blockstore.NewTimedCacheBlockstore(2 * time.Duration(build.BlockDelaySecs) * time.Second)
lc.Append(fx.Hook{OnStop: cache.Stop, OnStart: cache.Start})
bitswapBs := blockstore.NewTieredBstore(bs, cache)
// Use just exch.Close(), closing the context is not needed
exch := bitswap.New(mctx, bitswapNetwork, bitswapBs, bitswapOptions...)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()
},
})
return exch
}
func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}
func MessagePool(lc fx.Lifecycle, mctx helpers.MetricsCtx, us stmgr.UpgradeSchedule, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(helpers.LifecycleCtx(mctx, lc), mpp, ds, us, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return mp.Close()
},
})
protector.AddProtector(mp.ForEachPendingMessage)
return mp, nil
}
func ChainStore(lc fx.Lifecycle,
mctx helpers.MetricsCtx,
cbs dtypes.ChainBlockstore,
sbs dtypes.StateBlockstore,
ds dtypes.MetadataDS,
basebs dtypes.BaseBlockstore,
weight store.WeightFunc,
us stmgr.UpgradeSchedule,
j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(cbs, sbs, ds, weight, j)
if err := chain.Load(helpers.LifecycleCtx(mctx, lc)); err != nil {
log.Warnf("loading chain state from disk: %s", err)
}
var startHook func(context.Context) error
if ss, ok := basebs.(*splitstore.SplitStore); ok {
startHook = func(_ context.Context) error {
err := ss.Start(chain, us)
if err != nil {
err = xerrors.Errorf("error starting splitstore: %w", err)
}
return err
}
}
lc.Append(fx.Hook{
OnStart: startHook,
OnStop: func(_ context.Context) error {
return chain.Close()
},
})
return chain
}
func NetworkName(mctx helpers.MetricsCtx,
lc fx.Lifecycle,
cs *store.ChainStore,
tsexec stmgr.Executor,
syscalls vm.SyscallBuilder,
us stmgr.UpgradeSchedule,
_ dtypes.AfterGenesisSet) (dtypes.NetworkName, error) {
if !build.Devnet {
return "testnetnet", nil
}
ctx := helpers.LifecycleCtx(mctx, lc)
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil, index.DummyMsgIndex)
if err != nil {
return "", err
}
netName, err := stmgr.GetNetworkName(ctx, sm, cs.GetHeaviestTipSet().ParentState())
return netName, err
}
type SyncerParams struct {
fx.In
Lifecycle fx.Lifecycle
MetadataDS dtypes.MetadataDS
StateManager *stmgr.StateManager
ChainXchg exchange.Client
SyncMgrCtor chain.SyncManagerCtor
Host host.Host
Beacon beacon.Schedule
Gent chain.Genesis
Consensus consensus.Consensus
}
func NewSyncer(params SyncerParams) (*chain.Syncer, error) {
var (
lc = params.Lifecycle
ds = params.MetadataDS
sm = params.StateManager
ex = params.ChainXchg
smCtor = params.SyncMgrCtor
h = params.Host
b = params.Beacon
)
syncer, err := chain.NewSyncer(ds, sm, ex, smCtor, h.ConnManager(), h.ID(), b, params.Gent, params.Consensus)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
syncer.Start()
return nil
},
OnStop: func(_ context.Context) error {
syncer.Stop()
return nil
},
})
return syncer, nil
}
func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter {
return slashfilter.New(ds)
}
func UpgradeSchedule() stmgr.UpgradeSchedule {
return filcns.DefaultUpgradeSchedule()
}
func EnableStoringEvents(cs *store.ChainStore) {
cs.StoreEvents(true)
}