lotus/lib/peermgr/peermgr.go

213 lines
4.6 KiB
Go
Raw Normal View History

2019-10-17 08:57:56 +00:00
// Package peermgr tracks the set of Filecoin peers known to this node and,
// when that set falls below a configured minimum, tries to grow it by
// connecting to bootstrap peers or bootstrapping the DHT.
package peermgr
import (
"context"
"sync"
"time"
2020-07-10 14:43:14 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"go.opencensus.io/stats"
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/libp2p/go-libp2p-core/event"
2019-10-17 08:57:56 +00:00
host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
logging "github.com/ipfs/go-log/v2"
2019-10-17 08:57:56 +00:00
)
// log is the package-level logger, created under the name "peermgr".
var log = logging.Logger("peermgr")
// Default peer-count thresholds copied into every PeerMgr by NewPeerMgr.
const (
	// MaxFilPeers is the peer count above which Run logs a debug message.
	MaxFilPeers = 32
	// MinFilPeers is the peer count below which Run triggers peer expansion.
	MinFilPeers = 12
)
// MaybePeerMgr is an fx parameter struct (it embeds fx.In) that lets a
// constructor depend on a *PeerMgr without requiring one.
type MaybePeerMgr struct {
fx.In
// Mgr is tagged optional; presumably it is nil when no *PeerMgr has been
// provided to the fx graph — callers should confirm and nil-check.
Mgr *PeerMgr `optional:"true"`
}
2019-10-17 08:57:56 +00:00
type PeerMgr struct {
bootstrappers []peer.AddrInfo
// peerLeads is a set of peers we hear about through the network
// and who may be good peers to connect to for expanding our peer set
//peerLeads map[peer.ID]time.Time // TODO: unused
2019-10-17 08:57:56 +00:00
peersLk sync.Mutex
peers map[peer.ID]time.Duration
2019-10-17 08:57:56 +00:00
maxFilPeers int
minFilPeers int
expanding chan struct{}
2019-10-17 08:57:56 +00:00
h host.Host
dht *dht.IpfsDHT
notifee *net.NotifyBundle
filPeerEmitter event.Emitter
2020-07-20 08:49:13 +00:00
done chan struct{}
2019-10-17 08:57:56 +00:00
}
// NewFilPeer is the event emitted on the host's event bus by
// AddFilecoinPeer each time a peer is added.
type NewFilPeer struct {
// Id is the peer that was passed to AddFilecoinPeer.
Id peer.ID
}
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
2019-10-17 08:57:56 +00:00
pm := &PeerMgr{
h: h,
dht: dht,
bootstrappers: bootstrap,
peers: make(map[peer.ID]time.Duration),
expanding: make(chan struct{}, 1),
2019-10-17 08:57:56 +00:00
maxFilPeers: MaxFilPeers,
minFilPeers: MinFilPeers,
2020-07-20 08:49:13 +00:00
done: make(chan struct{}),
2019-10-17 08:57:56 +00:00
}
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
}
pm.filPeerEmitter = emitter
2019-10-17 08:57:56 +00:00
2020-07-20 08:49:13 +00:00
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return multierr.Combine(
pm.filPeerEmitter.Close(),
pm.Stop(ctx),
)
2020-07-20 08:49:13 +00:00
},
})
2019-10-17 08:57:56 +00:00
pm.notifee = &net.NotifyBundle{
DisconnectedF: func(_ net.Network, c net.Conn) {
pm.Disconnect(c.RemotePeer())
},
}
h.Network().Notify(pm.notifee)
return pm, nil
2019-10-17 08:57:56 +00:00
}
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
2019-10-17 08:57:56 +00:00
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0)
}
// GetPeerLatency looks up the latency stored for p. The boolean result
// reports whether p is in the peer set; when it is false the duration is zero.
func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) {
	pmgr.peersLk.Lock()
	latency, known := pmgr.peers[p]
	pmgr.peersLk.Unlock()

	return latency, known
}
func (pmgr *PeerMgr) SetPeerLatency(p peer.ID, latency time.Duration) {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
if _, ok := pmgr.peers[p]; ok {
pmgr.peers[p] = latency
}
2019-10-17 08:57:56 +00:00
}
// Disconnect drops p from the peer set, provided the host's network reports
// no remaining connection to p. While p is still connected this is a no-op.
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
	if pmgr.h.Network().Connectedness(p) != net.NotConnected {
		return
	}

	pmgr.peersLk.Lock()
	delete(pmgr.peers, p)
	pmgr.peersLk.Unlock()
}
2020-07-20 08:49:13 +00:00
// Stop signals Run to exit by closing the done channel. It always returns
// nil; ctx is currently unused.
//
// Calling Stop again after it has returned is a no-op. Concurrent calls are
// not synchronised here and must still be serialised by the caller.
func (pmgr *PeerMgr) Stop(ctx context.Context) error {
	select {
	case <-pmgr.done:
		// Already stopped: closing a closed channel would panic.
		return nil
	default:
	}

	log.Warn("closing peermgr done")
	close(pmgr.done)
	return nil
}
2019-10-17 08:57:56 +00:00
func (pmgr *PeerMgr) Run(ctx context.Context) {
2020-07-10 14:43:14 +00:00
tick := build.Clock.Ticker(time.Second * 5)
2019-10-17 08:57:56 +00:00
for {
select {
case <-tick.C:
pcount := pmgr.getPeerCount()
if pcount < pmgr.minFilPeers {
pmgr.expandPeers()
} else if pcount > pmgr.maxFilPeers {
2020-07-22 00:40:41 +00:00
log.Debugf("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
2019-10-17 08:57:56 +00:00
}
stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount())))
2020-07-20 08:49:13 +00:00
case <-pmgr.done:
log.Warn("exiting peermgr run")
return
2019-10-17 08:57:56 +00:00
}
}
}
// getPeerCount reports how many peers are currently in the peer set,
// taking peersLk for the duration of the read.
func (pmgr *PeerMgr) getPeerCount() int {
	pmgr.peersLk.Lock()
	count := len(pmgr.peers)
	pmgr.peersLk.Unlock()

	return count
}
func (pmgr *PeerMgr) expandPeers() {
select {
case pmgr.expanding <- struct{}{}:
default:
2019-10-17 08:57:56 +00:00
return
}
2019-10-17 08:57:56 +00:00
go func() {
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
pmgr.doExpand(ctx)
<-pmgr.expanding
2019-10-17 08:57:56 +00:00
}()
}
func (pmgr *PeerMgr) doExpand(ctx context.Context) {
pcount := pmgr.getPeerCount()
if pcount == 0 {
if len(pmgr.bootstrappers) == 0 {
log.Warn("no peers connected, and no bootstrappers configured")
return
}
log.Info("connecting to bootstrap peers")
2020-11-10 09:23:46 +00:00
wg := sync.WaitGroup{}
2019-10-17 08:57:56 +00:00
for _, bsp := range pmgr.bootstrappers {
2020-11-10 09:23:46 +00:00
wg.Add(1)
go func(bsp peer.AddrInfo) {
defer wg.Done()
if err := pmgr.h.Connect(ctx, bsp); err != nil {
log.Warnf("failed to connect to bootstrap peer: %s", err)
}
}(bsp)
2019-10-17 08:57:56 +00:00
}
2020-11-10 09:23:46 +00:00
wg.Wait()
2019-10-17 08:57:56 +00:00
return
}
// if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along.
if err := pmgr.dht.Bootstrap(ctx); err != nil {
log.Warnf("dht bootstrapping failed: %s", err)
}
}