From d1eb9073d15c1e87d40a1ba3d1a3ffb3737d1e30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 8 Jul 2019 16:07:09 +0200 Subject: [PATCH] Pubsub for blocks/messages --- chain/sub/incoming.go | 62 ++++++++++++++++++++++++++++++++++++++++ chain/sync.go | 12 ++++---- node/builder.go | 22 ++++++++++---- node/hello/hello.go | 20 ------------- node/modules/hello.go | 25 ---------------- node/modules/services.go | 54 ++++++++++++++++++++++++++++++++++ 6 files changed, 139 insertions(+), 56 deletions(-) create mode 100644 chain/sub/incoming.go delete mode 100644 node/modules/hello.go create mode 100644 node/modules/services.go diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go new file mode 100644 index 000000000..a8e63bc3c --- /dev/null +++ b/chain/sub/incoming.go @@ -0,0 +1,62 @@ +package sub + +import ( + "context" + "fmt" + + "github.com/filecoin-project/go-lotus/chain" + logging "github.com/ipfs/go-log" + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +var log = logging.Logger("sub") + +func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer) { + for { + msg, err := bsub.Next(ctx) + if err != nil { + fmt.Println("error from block subscription: ", err) + continue + } + + blk, err := chain.DecodeBlockMsg(msg.GetData()) + if err != nil { + log.Error("got invalid block over pubsub: ", err) + continue + } + + go func() { + msgs, err := s.Bsync.FetchMessagesByCids(blk.Messages) + if err != nil { + log.Errorf("failed to fetch all messages for block received over pubusb: %s", err) + return + } + fmt.Println("inform new block over pubsub") + s.InformNewBlock(msg.GetFrom(), &chain.FullBlock{ + Header: blk.Header, + Messages: msgs, + }) + }() + } +} + +func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) { + for { + msg, err := msub.Next(ctx) + if err != nil { + fmt.Println("error from message subscription: ", err) + continue + } + + m, err := chain.DecodeSignedMessage(msg.GetData()) + if err != nil { + log.Errorf("got incorrectly formatted Message: %s", err) + continue + } + + if err := mpool.Add(m); err != nil { + log.Errorf("failed to add message from network to message pool: %s", err) + continue + } + } +} diff --git a/chain/sync.go b/chain/sync.go index 67321aabb..ca0aafa60 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -35,7 +35,7 @@ type Syncer struct { bad BadTipSetCache // handle to the block sync service - bsync *BlockSync + Bsync *BlockSync // peer heads // Note: clear cache on disconnects @@ -57,7 +57,7 @@ func NewSyncer(cs *ChainStore, bsync *BlockSync) (*Syncer, error) { return &Syncer{ syncMode: Bootstrap, Genesis: gent, - bsync: bsync, + Bsync: bsync, peerHeads: make(map[peer.ID]*TipSet), head: cs.GetHeaviestTipSet(), store: cs, @@ -123,7 +123,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *FullTipSet) { syncer.peerHeadsLk.Lock() syncer.peerHeads[from] = fts.TipSet() syncer.peerHeadsLk.Unlock() - syncer.bsync.AddPeer(from) + syncer.Bsync.AddPeer(from) go func() { syncer.syncLock.Lock() @@ -185,7 +185,7 @@ func (syncer *Syncer) SyncBootstrap() { // requested, and that they are correctly linked to eachother. It does // not validate any state transitions fmt.Println("Get blocks: ", cur) - blks, err := syncer.bsync.GetBlocks(context.TODO(), cur, 10) + blks, err := syncer.Bsync.GetBlocks(context.TODO(), cur, 10) if err != nil { log.Error("failed to get blocks: ", err) return @@ -238,7 +238,7 @@ func (syncer *Syncer) SyncBootstrap() { } next := blockSet[nextHeight] - bstips, err := syncer.bsync.GetChainMessages(ctx, next, (nextHeight+1)-i) + bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i) if err != nil { log.Errorf("failed to fetch messages: %s", err) return @@ -400,7 +400,7 @@ func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, cids []cid.Cid return fts, nil } - return syncer.bsync.GetFullTipSet(ctx, p, cids) + return syncer.Bsync.GetFullTipSet(ctx, p, cids) } func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*FullTipSet, error) { diff --git a/node/builder.go b/node/builder.go index 347315f64..e4b2b7f1f 100644 --- a/node/builder.go +++ b/node/builder.go @@ -3,19 +3,19 @@ package node import ( "context" "errors" - "github.com/filecoin-project/go-lotus/node/modules/testing" - blockstore "github.com/ipfs/go-ipfs-blockstore" - exchange "github.com/ipfs/go-ipfs-exchange-interface" "reflect" "time" "github.com/ipfs/go-datastore" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-peerstore/pstoremem" + pubsub "github.com/libp2p/go-libp2p-pubsub" record "github.com/libp2p/go-libp2p-record" "go.uber.org/fx" @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/lp2p" + "github.com/filecoin-project/go-lotus/node/modules/testing" ) // special is a type used to give keys to modules which @@ -58,7 +59,12 @@ const ( // filecoin SetGenisisKey + RunHelloKey + RunBlockSyncKey + + HandleIncomingBlocksKey + HandleIncomingMessagesKey _nInvokes // keep this last ) @@ -146,6 +152,8 @@ func Online() Option { Override(NatPortMapKey, lp2p.NatPortMap), Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second)), + Override(new(*pubsub.PubSub), lp2p.GossipSub()), + Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)), @@ -159,14 +167,18 @@ func Online() Option { Override(new(*chain.Syncer), chain.NewSyncer), Override(new(*chain.BlockSync), chain.NewBlockSyncClient), Override(new(*chain.Wallet), chain.NewWallet), + Override(new(*chain.MessagePool), chain.NewMessagePool), Override(new(modules.Genesis), testing.MakeGenesis), Override(SetGenisisKey, modules.SetGenesis), Override(new(*hello.Service), hello.NewHelloService), - Override(RunHelloKey, hello.Run), + Override(new(*chain.BlockSyncService), chain.NewBlockSyncService), + Override(RunHelloKey, modules.RunHello), + Override(RunBlockSyncKey, modules.RunBlockSync), + Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks), + Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages), ) - } // Config sets up constructors based on the provided config diff --git a/node/hello/hello.go b/node/hello/hello.go index 65f9a6e8e..d45aca7c7 100644 --- a/node/hello/hello.go +++ b/node/hello/hello.go @@ -5,9 +5,6 @@ import ( "fmt" "github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/lib/cborrpc" - "github.com/filecoin-project/go-lotus/node/modules/helpers" - "go.uber.org/fx" - "github.com/libp2p/go-libp2p-core/host" "github.com/ipfs/go-cid" @@ -102,20 +99,3 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error { return nil } - -func Run(mctx helpers.MetricsCtx, lc fx.Lifecycle, hs *Service, h host.Host) { - ctx := helpers.LifecycleCtx(mctx, lc) - h.SetStreamHandler(ProtocolID, hs.HandleStream) - - bundle := inet.NotifyBundle{ - ConnectedF: func(_ inet.Network, c inet.Conn) { - go func() { - if err := hs.SayHello(ctx, c.RemotePeer()); err != nil { - log.Error("failed to say hello: ", err) - return - } - }() - }, - } - h.Network().Notify(&bundle) -} diff --git a/node/modules/hello.go b/node/modules/hello.go deleted file mode 100644 index aa1cdfeae..000000000 --- a/node/modules/hello.go +++ /dev/null @@ -1,25 +0,0 @@ -package modules - -import ( - "github.com/filecoin-project/go-lotus/node/hello" - "github.com/filecoin-project/go-lotus/node/modules/helpers" - "github.com/libp2p/go-libp2p-core/host" - inet "github.com/libp2p/go-libp2p-core/network" - "go.uber.org/fx" -) - -func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) { - h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) - - bundle := inet.NotifyBundle{ - ConnectedF: func(_ inet.Network, c inet.Conn) { - go func() { - if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil { - log.Warnw("failed to say hello", "error", err) - return - } - }() - }, - } - h.Network().Notify(&bundle) -} diff --git a/node/modules/services.go b/node/modules/services.go new file mode 100644 index 000000000..f1af01436 --- /dev/null +++ b/node/modules/services.go @@ -0,0 +1,54 @@ +package modules + +import ( + "github.com/filecoin-project/go-lotus/chain" + "github.com/filecoin-project/go-lotus/chain/sub" + "github.com/filecoin-project/go-lotus/node/hello" + "github.com/filecoin-project/go-lotus/node/modules/helpers" + "github.com/libp2p/go-libp2p-core/host" + inet "github.com/libp2p/go-libp2p-core/network" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.uber.org/fx" +) + +func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) { + h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) + + bundle := inet.NotifyBundle{ + ConnectedF: func(_ inet.Network, c inet.Conn) { + go func() { + if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil { + log.Warnw("failed to say hello", "error", err) + return + } + }() + }, + } + h.Network().Notify(&bundle) +} + +func RunBlockSync(h host.Host, svc *chain.BlockSyncService) { + h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream) +} + +func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) { + ctx := helpers.LifecycleCtx(mctx, lc) + + blocksub, err := pubsub.Subscribe("/fil/blocks") + if err != nil { + panic(err) + } + + go sub.HandleIncomingBlocks(ctx, blocksub, s) +} + +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) { + ctx := helpers.LifecycleCtx(mctx, lc) + + msgsub, err := pubsub.Subscribe("/fil/messages") + if err != nil { + panic(err) + } + + go sub.HandleIncomingMessages(ctx, mpool, msgsub) +}