Forward peers from hello to blocksync
Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
93176c91f4
commit
74e577610a
@ -11,6 +11,7 @@ import (
|
||||
inet "github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"go.opencensus.io/trace"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cborutil "github.com/filecoin-project/go-cbor-util"
|
||||
@ -36,12 +37,13 @@ type BlockSync struct {
|
||||
}
|
||||
|
||||
func NewClient(
|
||||
lc fx.Lifecycle,
|
||||
host host.Host,
|
||||
pmgr peermgr.MaybePeerMgr,
|
||||
) *BlockSync {
|
||||
return &BlockSync{
|
||||
host: host,
|
||||
peerTracker: newPeerTracker(pmgr.Mgr),
|
||||
peerTracker: newPeerTracker(lc, host, pmgr.Mgr),
|
||||
}
|
||||
}
|
||||
|
||||
@ -360,6 +362,7 @@ func (client *BlockSync) sendRequestToPeer(
|
||||
|
||||
supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID)
|
||||
if err != nil {
|
||||
client.RemovePeer(peer)
|
||||
return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
|
||||
}
|
||||
if len(supported) == 0 || supported[0] != BlockSyncProtocolID {
|
||||
|
@ -3,11 +3,14 @@ package blocksync
|
||||
// FIXME: This needs to be reviewed.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
host "github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"go.uber.org/fx"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||
@ -29,11 +32,33 @@ type bsPeerTracker struct {
|
||||
pmgr *peermgr.PeerMgr
|
||||
}
|
||||
|
||||
func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker {
|
||||
return &bsPeerTracker{
|
||||
func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeerTracker {
|
||||
bsPt := &bsPeerTracker{
|
||||
peers: make(map[peer.ID]*peerStats),
|
||||
pmgr: pmgr,
|
||||
}
|
||||
|
||||
sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
go func() {
|
||||
var newPeer interface{}
|
||||
ok := true
|
||||
for ok {
|
||||
newPeer, ok = <-sub.Out()
|
||||
log.Warnf("new peer from hello in tracker: %s", newPeer.(peermgr.NewFilPeer).Id)
|
||||
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
|
||||
}
|
||||
}()
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return sub.Close()
|
||||
},
|
||||
})
|
||||
|
||||
return bsPt
|
||||
}
|
||||
|
||||
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
|
||||
|
@ -10,7 +10,10 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"go.opencensus.io/stats"
|
||||
"go.uber.org/fx"
|
||||
"go.uber.org/multierr"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
host "github.com/libp2p/go-libp2p-core/host"
|
||||
net "github.com/libp2p/go-libp2p-core/network"
|
||||
peer "github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -50,12 +53,17 @@ type PeerMgr struct {
|
||||
h host.Host
|
||||
dht *dht.IpfsDHT
|
||||
|
||||
notifee *net.NotifyBundle
|
||||
notifee *net.NotifyBundle
|
||||
filPeerEmitter event.Emitter
|
||||
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr {
|
||||
type NewFilPeer struct {
|
||||
Id peer.ID
|
||||
}
|
||||
|
||||
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
|
||||
pm := &PeerMgr{
|
||||
h: h,
|
||||
dht: dht,
|
||||
@ -69,10 +77,18 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
|
||||
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
|
||||
}
|
||||
pm.filPeerEmitter = emitter
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(ctx context.Context) error {
|
||||
return pm.Stop(ctx)
|
||||
return multierr.Combine(
|
||||
pm.filPeerEmitter.Close(),
|
||||
pm.Stop(ctx),
|
||||
)
|
||||
},
|
||||
})
|
||||
|
||||
@ -84,10 +100,11 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
|
||||
|
||||
h.Network().Notify(pm.notifee)
|
||||
|
||||
return pm
|
||||
return pm, nil
|
||||
}
|
||||
|
||||
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
|
||||
pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p})
|
||||
pmgr.peersLk.Lock()
|
||||
defer pmgr.peersLk.Unlock()
|
||||
pmgr.peers[p] = time.Duration(0)
|
||||
|
Loading…
Reference in New Issue
Block a user