diff --git a/chain/blocksync.go b/chain/blocksync.go index a69caef85..87c4e3ffe 100644 --- a/chain/blocksync.go +++ b/chain/blocksync.go @@ -5,11 +5,12 @@ import ( "context" "fmt" "github.com/filecoin-project/go-lotus/lib/cborrpc" + exchange "github.com/ipfs/go-ipfs-exchange-interface" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/protocol" "math/rand" "sync" - "github.com/ipfs/go-bitswap" "github.com/ipfs/go-cid" cbor "github.com/ipfs/go-ipld-cbor" inet "github.com/libp2p/go-libp2p-core/network" @@ -180,17 +181,17 @@ func (bss *BlockSyncService) gatherMessages(ts *TipSet) ([]*SignedMessage, [][]i } type BlockSync struct { - bswap *bitswap.Bitswap + bswap exchange.Interface newStream NewStreamFunc syncPeersLk sync.Mutex syncPeers map[peer.ID]struct{} } -func NewBlockSyncClient(bswap *bitswap.Bitswap, newStreamF NewStreamFunc) *BlockSync { +func NewBlockSyncClient(bswap exchange.Interface, h host.Host) *BlockSync { return &BlockSync{ bswap: bswap, - newStream: newStreamF, + newStream: h.NewStream, syncPeers: make(map[peer.ID]struct{}), } } diff --git a/chain/chain.go b/chain/chain.go index 223edff8c..86956ddc6 100644 --- a/chain/chain.go +++ b/chain/chain.go @@ -173,7 +173,7 @@ type ChainStore struct { headChange func(rev, app []*TipSet) error } -func NewChainStore(bs bstore.Blockstore, ds datastore.Datastore) *ChainStore { +func NewChainStore(bs bstore.Blockstore, ds datastore.Batching) *ChainStore { return &ChainStore{ bs: bs, ds: ds, diff --git a/go.mod b/go.mod index 3dfacbc0e..713af265d 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/ipfs/go-datastore v0.0.5 github.com/ipfs/go-hamt-ipld v0.0.0-20190613164304-cd074602062f github.com/ipfs/go-ipfs-blockstore v0.0.1 + github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipld-cbor v0.0.2 github.com/ipfs/go-ipld-format v0.0.2 diff --git a/node/builder.go b/node/builder.go index 7a2cc4f8a..347315f64 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,6 +3,9 @@ package node import ( "context" "errors" + "github.com/filecoin-project/go-lotus/node/modules/testing" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" "reflect" "time" @@ -18,7 +21,9 @@ import ( "github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/build" + "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/node/config" + "github.com/filecoin-project/go-lotus/node/hello" "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/lp2p" @@ -44,13 +49,17 @@ var ( type invoke int +//nolint:golint const ( - // PstoreAddSelfKeysKey is a key for Override for PstoreAddSelfKeys - PstoreAddSelfKeysKey = invoke(iota) + // libp2p - // StartListeningKey is a key for Override for StartListening + PstoreAddSelfKeysKey = invoke(iota) StartListeningKey + // filecoin + SetGenisisKey + RunHelloKey + _nInvokes // keep this last ) @@ -97,8 +106,13 @@ var defaults = []Option{ randomIdentity(), - Override(new(datastore.Batching), datastore.NewMapDatastore), + Override(new(datastore.Batching), testing.MapDatastore), + Override(new(blockstore.Blockstore), testing.MapBlockstore), // NOT on top of ds above Override(new(record.Validator), modules.RecordValidator), + + // Filecoin modules + + Override(new(*chain.ChainStore), chain.NewChainStore), } // Online sets up basic libp2p node @@ -134,7 +148,25 @@ func Online() Option { Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)), + + // + + Override(new(blockstore.GCLocker), blockstore.NewGCLocker), + Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore), + Override(new(exchange.Interface), modules.Bitswap), + + // Filecoin services + Override(new(*chain.Syncer), chain.NewSyncer), + Override(new(*chain.BlockSync), chain.NewBlockSyncClient), + Override(new(*chain.Wallet), chain.NewWallet), + + Override(new(modules.Genesis), testing.MakeGenesis), + Override(SetGenisisKey, modules.SetGenesis), + + Override(new(*hello.Service), hello.NewHelloService), + Override(RunHelloKey, hello.Run), ) + } // Config sets up constructors based on the provided config diff --git a/node/hello/hello.go b/node/hello/hello.go index 2600da6ba..65f9a6e8e 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/lib/cborrpc" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "go.uber.org/fx" "github.com/libp2p/go-libp2p-core/host" @@ -36,9 +38,12 @@ type Service struct { syncer *chain.Syncer } -func NewHelloService(h host.Host) *Service { +func NewHelloService(h host.Host, cs *chain.ChainStore, syncer *chain.Syncer) *Service { return &Service{ newStream: h.NewStream, + + cs: cs, + syncer: syncer, } } @@ -97,3 +102,20 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { return nil } + +func Run(mctx helpers.MetricsCtx, lc fx.Lifecycle, hs *Service, h host.Host) { + ctx := helpers.LifecycleCtx(mctx, lc) + h.SetStreamHandler(ProtocolID, hs.HandleStream) + + bundle := inet.NotifyBundle{ + ConnectedF: func(_ inet.Network, c inet.Conn) { + go func() { + if err := hs.SayHello(ctx, c.RemotePeer()); err != nil { + log.Error("failed to say hello: ", err) + return + } + }() + }, + } + h.Network().Notify(&bundle) +} diff --git a/node/modules/core.go b/node/modules/core.go index 070cdef99..7bdb2ab3f 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -1,16 +1,43 @@ package modules import ( + "context" + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/ipfs/go-bitswap" + "github.com/ipfs/go-bitswap/network" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" logging "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/routing" record "github.com/libp2p/go-libp2p-record" + "go.uber.org/fx" ) var log = logging.Logger("modules") +type Genesis *chain.BlockHeader + // RecordValidator provides namesys compatible routing record validator func RecordValidator(ps peerstore.Peerstore) record.Validator { return record.NamespacedValidator{ "pk": record.PublicKeyValidator{}, } } + +func Bitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.Routing, bs blockstore.GCBlockstore) exchange.Interface { + bitswapNetwork := network.NewFromIpfsHost(host, rt) + exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return exch.Close() + }, + }) + return exch +} + +func SetGenesis(cs *chain.ChainStore, g Genesis) error { + return cs.SetGenesis(g) +} diff --git a/node/modules/testing/genesis.go b/node/modules/testing/genesis.go new file mode 100644 index 000000000..dedf36279 --- /dev/null +++ b/node/modules/testing/genesis.go @@ -0,0 +1,15 @@ +package testing + +import ( + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/node/modules" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +func MakeGenesis(bs blockstore.Blockstore, w *chain.Wallet) (modules.Genesis, error) { + genb, err := chain.MakeGenesisBlock(bs, w) + if err != nil { + return nil, err + } + return genb.Genesis, nil +} diff --git a/node/modules/testing/storage.go b/node/modules/testing/storage.go new file mode 100644 index 000000000..6d00bd94b --- /dev/null +++ b/node/modules/testing/storage.go @@ -0,0 +1,18 @@ +package testing + +import ( + "github.com/ipfs/go-datastore" + dsync "github.com/ipfs/go-datastore/sync" + blockstore "github.com/ipfs/go-ipfs-blockstore" +) + +func MapBlockstore() blockstore.Blockstore { + // TODO: proper datastore + bds := dsync.MutexWrap(datastore.NewMapDatastore()) + bs := blockstore.NewBlockstore(bds) + return blockstore.NewIdStore(bs) +} + +func MapDatastore() datastore.Batching { + return dsync.MutexWrap(datastore.NewMapDatastore()) +}