Pubsub for blocks/messages

This commit is contained in:
Łukasz Magiera 2019-07-08 16:07:09 +02:00
parent e162b5395c
commit d1eb9073d1
6 changed files with 139 additions and 56 deletions

62
chain/sub/incoming.go Normal file
View File

@ -0,0 +1,62 @@
package sub
import (
"context"
"fmt"
"github.com/filecoin-project/go-lotus/chain"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
var log = logging.Logger("sub")
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer) {
for {
msg, err := bsub.Next(ctx)
if err != nil {
fmt.Println("error from block subscription: ", err)
continue
}
blk, err := chain.DecodeBlockMsg(msg.GetData())
if err != nil {
log.Error("got invalid block over pubsub: ", err)
continue
}
go func() {
msgs, err := s.Bsync.FetchMessagesByCids(blk.Messages)
if err != nil {
log.Errorf("failed to fetch all messages for block received over pubusb: %s", err)
return
}
fmt.Println("inform new block over pubsub")
s.InformNewBlock(msg.GetFrom(), &chain.FullBlock{
Header: blk.Header,
Messages: msgs,
})
}()
}
}
func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) {
for {
msg, err := msub.Next(ctx)
if err != nil {
fmt.Println("error from message subscription: ", err)
continue
}
m, err := chain.DecodeSignedMessage(msg.GetData())
if err != nil {
log.Errorf("got incorrectly formatted Message: %s", err)
continue
}
if err := mpool.Add(m); err != nil {
log.Errorf("failed to add message from network to message pool: %s", err)
continue
}
}
}

View File

@ -35,7 +35,7 @@ type Syncer struct {
bad BadTipSetCache bad BadTipSetCache
// handle to the block sync service // handle to the block sync service
bsync *BlockSync Bsync *BlockSync
// peer heads // peer heads
// Note: clear cache on disconnects // Note: clear cache on disconnects
@ -57,7 +57,7 @@ func NewSyncer(cs *ChainStore, bsync *BlockSync) (*Syncer, error) {
return &Syncer{ return &Syncer{
syncMode: Bootstrap, syncMode: Bootstrap,
Genesis: gent, Genesis: gent,
bsync: bsync, Bsync: bsync,
peerHeads: make(map[peer.ID]*TipSet), peerHeads: make(map[peer.ID]*TipSet),
head: cs.GetHeaviestTipSet(), head: cs.GetHeaviestTipSet(),
store: cs, store: cs,
@ -123,7 +123,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *FullTipSet) {
syncer.peerHeadsLk.Lock() syncer.peerHeadsLk.Lock()
syncer.peerHeads[from] = fts.TipSet() syncer.peerHeads[from] = fts.TipSet()
syncer.peerHeadsLk.Unlock() syncer.peerHeadsLk.Unlock()
syncer.bsync.AddPeer(from) syncer.Bsync.AddPeer(from)
go func() { go func() {
syncer.syncLock.Lock() syncer.syncLock.Lock()
@ -185,7 +185,7 @@ func (syncer *Syncer) SyncBootstrap() {
// requested, and that they are correctly linked to eachother. It does // requested, and that they are correctly linked to eachother. It does
// not validate any state transitions // not validate any state transitions
fmt.Println("Get blocks: ", cur) fmt.Println("Get blocks: ", cur)
blks, err := syncer.bsync.GetBlocks(context.TODO(), cur, 10) blks, err := syncer.Bsync.GetBlocks(context.TODO(), cur, 10)
if err != nil { if err != nil {
log.Error("failed to get blocks: ", err) log.Error("failed to get blocks: ", err)
return return
@ -238,7 +238,7 @@ func (syncer *Syncer) SyncBootstrap() {
} }
next := blockSet[nextHeight] next := blockSet[nextHeight]
bstips, err := syncer.bsync.GetChainMessages(ctx, next, (nextHeight+1)-i) bstips, err := syncer.Bsync.GetChainMessages(ctx, next, (nextHeight+1)-i)
if err != nil { if err != nil {
log.Errorf("failed to fetch messages: %s", err) log.Errorf("failed to fetch messages: %s", err)
return return
@ -400,7 +400,7 @@ func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, cids []cid.Cid
return fts, nil return fts, nil
} }
return syncer.bsync.GetFullTipSet(ctx, p, cids) return syncer.Bsync.GetFullTipSet(ctx, p, cids)
} }
func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*FullTipSet, error) { func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*FullTipSet, error) {

View File

@ -3,19 +3,19 @@ package node
import ( import (
"context" "context"
"errors" "errors"
"github.com/filecoin-project/go-lotus/node/modules/testing"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
"reflect" "reflect"
"time" "time"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
ci "github.com/libp2p/go-libp2p-core/crypto" ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-peerstore/pstoremem" "github.com/libp2p/go-libp2p-peerstore/pstoremem"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record" record "github.com/libp2p/go-libp2p-record"
"go.uber.org/fx" "go.uber.org/fx"
@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/modules/helpers" "github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/filecoin-project/go-lotus/node/modules/lp2p" "github.com/filecoin-project/go-lotus/node/modules/lp2p"
"github.com/filecoin-project/go-lotus/node/modules/testing"
) )
// special is a type used to give keys to modules which // special is a type used to give keys to modules which
@ -58,7 +59,12 @@ const (
// filecoin // filecoin
SetGenisisKey SetGenisisKey
RunHelloKey RunHelloKey
RunBlockSyncKey
HandleIncomingBlocksKey
HandleIncomingMessagesKey
_nInvokes // keep this last _nInvokes // keep this last
) )
@ -146,6 +152,8 @@ func Online() Option {
Override(NatPortMapKey, lp2p.NatPortMap), Override(NatPortMapKey, lp2p.NatPortMap),
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second)), Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second)),
Override(new(*pubsub.PubSub), lp2p.GossipSub()),
Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys), Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys),
Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)), Override(StartListeningKey, lp2p.StartListening(defConf.Libp2p.ListenAddresses)),
@ -159,14 +167,18 @@ func Online() Option {
Override(new(*chain.Syncer), chain.NewSyncer), Override(new(*chain.Syncer), chain.NewSyncer),
Override(new(*chain.BlockSync), chain.NewBlockSyncClient), Override(new(*chain.BlockSync), chain.NewBlockSyncClient),
Override(new(*chain.Wallet), chain.NewWallet), Override(new(*chain.Wallet), chain.NewWallet),
Override(new(*chain.MessagePool), chain.NewMessagePool),
Override(new(modules.Genesis), testing.MakeGenesis), Override(new(modules.Genesis), testing.MakeGenesis),
Override(SetGenisisKey, modules.SetGenesis), Override(SetGenisisKey, modules.SetGenesis),
Override(new(*hello.Service), hello.NewHelloService), Override(new(*hello.Service), hello.NewHelloService),
Override(RunHelloKey, hello.Run), Override(new(*chain.BlockSyncService), chain.NewBlockSyncService),
Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
) )
} }
// Config sets up constructors based on the provided config // Config sets up constructors based on the provided config

View File

@ -5,9 +5,6 @@ import (
"fmt" "fmt"
"github.com/filecoin-project/go-lotus/chain" "github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/lib/cborrpc" "github.com/filecoin-project/go-lotus/lib/cborrpc"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"go.uber.org/fx"
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -102,20 +99,3 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
return nil return nil
} }
func Run(mctx helpers.MetricsCtx, lc fx.Lifecycle, hs *Service, h host.Host) {
ctx := helpers.LifecycleCtx(mctx, lc)
h.SetStreamHandler(ProtocolID, hs.HandleStream)
bundle := inet.NotifyBundle{
ConnectedF: func(_ inet.Network, c inet.Conn) {
go func() {
if err := hs.SayHello(ctx, c.RemotePeer()); err != nil {
log.Error("failed to say hello: ", err)
return
}
}()
},
}
h.Network().Notify(&bundle)
}

View File

@ -1,25 +0,0 @@
package modules
import (
"github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
"go.uber.org/fx"
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) {
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
bundle := inet.NotifyBundle{
ConnectedF: func(_ inet.Network, c inet.Conn) {
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil {
log.Warnw("failed to say hello", "error", err)
return
}
}()
},
}
h.Network().Notify(&bundle)
}

54
node/modules/services.go Normal file
View File

@ -0,0 +1,54 @@
package modules
import (
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/sub"
"github.com/filecoin-project/go-lotus/node/hello"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
"github.com/libp2p/go-libp2p-core/host"
inet "github.com/libp2p/go-libp2p-core/network"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
)
func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) {
h.SetStreamHandler(hello.ProtocolID, svc.HandleStream)
bundle := inet.NotifyBundle{
ConnectedF: func(_ inet.Network, c inet.Conn) {
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), c.RemotePeer()); err != nil {
log.Warnw("failed to say hello", "error", err)
return
}
}()
},
}
h.Network().Notify(&bundle)
}
func RunBlockSync(h host.Host, svc *chain.BlockSyncService) {
h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream)
}
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, s *chain.Syncer) {
ctx := helpers.LifecycleCtx(mctx, lc)
blocksub, err := pubsub.Subscribe("/fil/blocks")
if err != nil {
panic(err)
}
go sub.HandleIncomingBlocks(ctx, blocksub, s)
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) {
ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := pubsub.Subscribe("/fil/messages")
if err != nil {
panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}