2019-10-17 08:57:56 +00:00
package peermgr
import (
"context"
"sync"
"time"
2022-06-14 15:00:51 +00:00
logging "github.com/ipfs/go-log/v2"
2019-10-17 08:57:56 +00:00
dht "github.com/libp2p/go-libp2p-kad-dht"
2022-08-25 18:20:41 +00:00
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
net "github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
2022-06-14 15:00:51 +00:00
"go.opencensus.io/stats"
"go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
2019-10-17 08:57:56 +00:00
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-10-17 08:57:56 +00:00
)
// log is the package-level logger, created under the "peermgr" name.
var log = logging.Logger("peermgr")
// Default bounds on the number of tracked Filecoin peers. NewPeerMgr copies
// them into the manager's maxFilPeers and minFilPeers fields.
const (
	// MaxFilPeers is the peer count above which Run logs a debug message.
	MaxFilPeers = 32

	// MinFilPeers is the peer count below which Run asks for more peers.
	MinFilPeers = 12
)
2019-12-10 15:02:01 +00:00
// MaybePeerMgr is an fx parameter struct (it embeds fx.In) for consumers that
// can work with or without a PeerMgr.
type MaybePeerMgr struct {
fx . In
// Mgr is tagged optional for fx; presumably it is left nil when no PeerMgr
// has been provided to the container, so callers should nil-check it.
Mgr * PeerMgr ` optional:"true" `
}
2019-10-17 08:57:56 +00:00
type PeerMgr struct {
bootstrappers [ ] peer . AddrInfo
// peerLeads is a set of peers we hear about through the network
// and who may be good peers to connect to for expanding our peer set
2019-12-05 02:04:09 +00:00
//peerLeads map[peer.ID]time.Time // TODO: unused
2019-10-17 08:57:56 +00:00
peersLk sync . Mutex
2019-12-10 15:02:01 +00:00
peers map [ peer . ID ] time . Duration
2019-10-17 08:57:56 +00:00
maxFilPeers int
minFilPeers int
2020-01-23 23:40:17 +00:00
expanding chan struct { }
2019-10-17 08:57:56 +00:00
h host . Host
dht * dht . IpfsDHT
2020-11-20 14:15:44 +00:00
notifee * net . NotifyBundle
emitter event . Emitter
2020-07-20 08:49:13 +00:00
done chan struct { }
2019-10-17 08:57:56 +00:00
}
2020-11-20 14:15:44 +00:00
type FilPeerEvt struct {
Type FilPeerEvtType
ID peer . ID
2020-09-08 08:18:51 +00:00
}
2020-11-20 14:15:44 +00:00
// FilPeerEvtType discriminates the kinds of FilPeerEvt.
type FilPeerEvtType int

const (
	// AddFilPeerEvt is the type of the event emitted by AddFilecoinPeer.
	AddFilPeerEvt FilPeerEvtType = iota
	// RemoveFilPeerEvt is the type of the event emitted by Disconnect when a
	// tracked peer is dropped.
	RemoveFilPeerEvt
)
2020-11-06 16:26:14 +00:00
2020-09-08 08:18:51 +00:00
func NewPeerMgr ( lc fx . Lifecycle , h host . Host , dht * dht . IpfsDHT , bootstrap dtypes . BootstrapPeers ) ( * PeerMgr , error ) {
2019-10-17 08:57:56 +00:00
pm := & PeerMgr {
2019-10-23 11:11:18 +00:00
h : h ,
dht : dht ,
bootstrappers : bootstrap ,
2020-01-23 23:40:17 +00:00
peers : make ( map [ peer . ID ] time . Duration ) ,
expanding : make ( chan struct { } , 1 ) ,
2019-10-23 11:11:18 +00:00
2019-10-17 08:57:56 +00:00
maxFilPeers : MaxFilPeers ,
minFilPeers : MinFilPeers ,
2020-07-20 08:49:13 +00:00
done : make ( chan struct { } ) ,
2019-10-17 08:57:56 +00:00
}
2020-11-20 14:15:44 +00:00
emitter , err := h . EventBus ( ) . Emitter ( new ( FilPeerEvt ) )
2020-11-06 16:26:14 +00:00
if err != nil {
2020-11-20 14:15:44 +00:00
return nil , xerrors . Errorf ( "creating FilPeerEvt emitter: %w" , err )
2020-11-06 16:26:14 +00:00
}
2020-11-20 14:15:44 +00:00
pm . emitter = emitter
2019-10-17 08:57:56 +00:00
2020-07-20 08:49:13 +00:00
lc . Append ( fx . Hook {
OnStop : func ( ctx context . Context ) error {
2020-09-08 08:18:51 +00:00
return multierr . Combine (
2020-11-20 14:15:44 +00:00
pm . emitter . Close ( ) ,
2020-09-08 08:18:51 +00:00
pm . Stop ( ctx ) ,
)
2020-07-20 08:49:13 +00:00
} ,
} )
2019-10-17 08:57:56 +00:00
pm . notifee = & net . NotifyBundle {
DisconnectedF : func ( _ net . Network , c net . Conn ) {
pm . Disconnect ( c . RemotePeer ( ) )
} ,
}
h . Network ( ) . Notify ( pm . notifee )
2020-09-08 08:18:51 +00:00
return pm , nil
2019-10-17 08:57:56 +00:00
}
func ( pmgr * PeerMgr ) AddFilecoinPeer ( p peer . ID ) {
2020-11-20 14:15:44 +00:00
_ = pmgr . emitter . Emit ( FilPeerEvt { Type : AddFilPeerEvt , ID : p } ) //nolint:errcheck
2019-10-17 08:57:56 +00:00
pmgr . peersLk . Lock ( )
defer pmgr . peersLk . Unlock ( )
2019-12-10 15:02:01 +00:00
pmgr . peers [ p ] = time . Duration ( 0 )
}
// GetPeerLatency returns the latency stored for p. The boolean is false, and
// the duration zero, when p is not a tracked peer.
func (pmgr *PeerMgr) GetPeerLatency(p peer.ID) (time.Duration, bool) {
	pmgr.peersLk.Lock()
	latency, tracked := pmgr.peers[p]
	pmgr.peersLk.Unlock()

	return latency, tracked
}
func ( pmgr * PeerMgr ) SetPeerLatency ( p peer . ID , latency time . Duration ) {
pmgr . peersLk . Lock ( )
defer pmgr . peersLk . Unlock ( )
if _ , ok := pmgr . peers [ p ] ; ok {
pmgr . peers [ p ] = latency
}
2019-10-17 08:57:56 +00:00
}
func ( pmgr * PeerMgr ) Disconnect ( p peer . ID ) {
2020-11-06 16:26:14 +00:00
disconnected := false
2019-10-17 08:57:56 +00:00
if pmgr . h . Network ( ) . Connectedness ( p ) == net . NotConnected {
pmgr . peersLk . Lock ( )
2020-11-06 16:26:14 +00:00
_ , disconnected = pmgr . peers [ p ]
if disconnected {
delete ( pmgr . peers , p )
}
pmgr . peersLk . Unlock ( )
}
if disconnected {
2020-11-20 14:15:44 +00:00
_ = pmgr . emitter . Emit ( FilPeerEvt { Type : RemoveFilPeerEvt , ID : p } ) //nolint:errcheck
2019-10-17 08:57:56 +00:00
}
}
2020-07-20 08:49:13 +00:00
// Stop signals Run to return by closing the done channel. It always returns
// nil; ctx is not used.
//
// Calling Stop again after it has returned is a no-op instead of a panic on a
// double close. Stop is exported and is also invoked by the lifecycle hook
// installed in NewPeerMgr, so repeated calls can happen. The check is not
// synchronised: calls racing with each other are still the caller's problem.
func (pmgr *PeerMgr) Stop(ctx context.Context) error {
	select {
	case <-pmgr.done:
		// Already closed by an earlier call.
		return nil
	default:
	}

	log.Warn("closing peermgr done")
	close(pmgr.done)
	return nil
}
2019-10-17 08:57:56 +00:00
func ( pmgr * PeerMgr ) Run ( ctx context . Context ) {
2020-07-10 14:43:14 +00:00
tick := build . Clock . Ticker ( time . Second * 5 )
2019-10-17 08:57:56 +00:00
for {
select {
case <- tick . C :
pcount := pmgr . getPeerCount ( )
if pcount < pmgr . minFilPeers {
pmgr . expandPeers ( )
} else if pcount > pmgr . maxFilPeers {
2020-07-22 00:40:41 +00:00
log . Debugf ( "peer count about threshold: %d > %d" , pcount , pmgr . maxFilPeers )
2019-10-17 08:57:56 +00:00
}
2020-02-26 02:42:34 +00:00
stats . Record ( ctx , metrics . PeerCount . M ( int64 ( pmgr . getPeerCount ( ) ) ) )
2020-07-20 08:49:13 +00:00
case <- pmgr . done :
log . Warn ( "exiting peermgr run" )
return
2019-10-17 08:57:56 +00:00
}
}
}
// getPeerCount reports how many peers are currently tracked, reading the set
// under peersLk.
func (pmgr *PeerMgr) getPeerCount() int {
	pmgr.peersLk.Lock()
	n := len(pmgr.peers)
	pmgr.peersLk.Unlock()

	return n
}
func ( pmgr * PeerMgr ) expandPeers ( ) {
2020-01-23 23:40:17 +00:00
select {
case pmgr . expanding <- struct { } { } :
default :
2019-10-17 08:57:56 +00:00
return
}
2020-01-23 23:40:17 +00:00
2019-10-17 08:57:56 +00:00
go func ( ) {
ctx , cancel := context . WithTimeout ( context . TODO ( ) , time . Second * 30 )
defer cancel ( )
pmgr . doExpand ( ctx )
2020-01-23 23:40:17 +00:00
<- pmgr . expanding
2019-10-17 08:57:56 +00:00
} ( )
}
func ( pmgr * PeerMgr ) doExpand ( ctx context . Context ) {
pcount := pmgr . getPeerCount ( )
if pcount == 0 {
if len ( pmgr . bootstrappers ) == 0 {
log . Warn ( "no peers connected, and no bootstrappers configured" )
return
}
log . Info ( "connecting to bootstrap peers" )
2020-11-10 09:23:46 +00:00
wg := sync . WaitGroup { }
2019-10-17 08:57:56 +00:00
for _ , bsp := range pmgr . bootstrappers {
2020-11-10 09:23:46 +00:00
wg . Add ( 1 )
go func ( bsp peer . AddrInfo ) {
defer wg . Done ( )
if err := pmgr . h . Connect ( ctx , bsp ) ; err != nil {
log . Warnf ( "failed to connect to bootstrap peer: %s" , err )
}
} ( bsp )
2019-10-17 08:57:56 +00:00
}
2020-11-10 09:23:46 +00:00
wg . Wait ( )
2019-10-17 08:57:56 +00:00
return
}
// if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along.
if err := pmgr . dht . Bootstrap ( ctx ) ; err != nil {
log . Warnf ( "dht bootstrapping failed: %s" , err )
}
}