Initial pass at a peer manager

This commit is contained in:
whyrusleeping 2019-10-17 17:57:56 +09:00 committed by Łukasz Magiera
parent ec5e075fa8
commit fc7c7ddd97
5 changed files with 162 additions and 13 deletions

View File

@ -5,17 +5,6 @@ import (
"fmt"
"math/big"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/bufbstore"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/trace"
block "github.com/ipfs/go-block-format"
bserv "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
@ -24,7 +13,17 @@ import (
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
dag "github.com/ipfs/go-merkledag"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/aerrors"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/bufbstore"
)
var log = logging.Logger("vm")

View File

@ -37,6 +37,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/paych"
"github.com/filecoin-project/lotus/peermgr"
"github.com/filecoin-project/lotus/retrieval"
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storage"
@ -78,6 +79,7 @@ const (
RunHelloKey
RunBlockSyncKey
RunPeerMgrKey
HandleIncomingBlocksKey
HandleIncomingMessagesKey
@ -227,8 +229,11 @@ func Online() Option {
Override(new(*hello.Service), hello.NewHelloService),
Override(new(*chain.BlockSyncService), chain.NewBlockSyncService),
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
Override(RunHelloKey, modules.RunHello),
Override(RunBlockSyncKey, modules.RunBlockSync),
Override(RunPeerMgrKey, modules.RunPeerMgr),
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
Override(HeadMetricsKey, metrics.SendHeadNotifs("")),

View File

@ -3,7 +3,6 @@ package hello
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
@ -14,7 +13,9 @@ import (
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/cborrpc"
"github.com/filecoin-project/lotus/peermgr"
)
const ProtocolID = "/fil/hello/1.0.0"
@ -36,14 +37,16 @@ type Service struct {
cs *store.ChainStore
syncer *chain.Syncer
pmgr *peermgr.PeerMgr
}
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer) *Service {
func NewHelloService(h host.Host, cs *store.ChainStore, syncer *chain.Syncer, pmgr *peermgr.PeerMgr) *Service {
return &Service{
newStream: h.NewStream,
cs: cs,
syncer: syncer,
pmgr: pmgr,
}
}
@ -74,6 +77,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
hs.pmgr.AddFilecoinPeer(s.Conn().RemotePeer())
}
func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
@ -88,6 +92,7 @@ func (hs *Service) SayHello(ctx context.Context, pid peer.ID) error {
if err != nil {
return err
}
gen, err := hs.cs.GetGenesis()
if err != nil {
return err

View File

@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/peermgr"
"github.com/filecoin-project/lotus/retrieval/discovery"
"github.com/filecoin-project/lotus/storage/sector"
)
@ -33,6 +34,10 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.
h.Network().Notify(&bundle)
}
func RunPeerMgr(mctx helpers.MetricsCtx, lc fx.Lifecycle, pmgr *peermgr.PeerMgr) {
go pmgr.Run(helpers.LifecycleCtx(mctx, lc))
}
func RunBlockSync(h host.Host, svc *chain.BlockSyncService) {
h.SetStreamHandler(chain.BlockSyncProtocolID, svc.HandleStream)
}

135
peermgr/peermgr.go Normal file
View File

@ -0,0 +1,135 @@
package peermgr
import (
"context"
"sync"
"time"
host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("peermgr")
const (
MaxFilPeers = 32
MinFilPeers = 8
)
type PeerMgr struct {
bootstrappers []peer.AddrInfo
// peerLeads is a set of peers we hear about through the network
// and who may be good peers to connect to for expanding our peer set
peerLeads map[peer.ID]time.Time
peersLk sync.Mutex
peers map[peer.ID]struct{}
maxFilPeers int
minFilPeers int
expanding bool
h host.Host
dht *dht.IpfsDHT
notifee *net.NotifyBundle
}
func NewPeerMgr(h host.Host, dht *dht.IpfsDHT) *PeerMgr {
pm := &PeerMgr{
peers: make(map[peer.ID]struct{}),
maxFilPeers: MaxFilPeers,
minFilPeers: MinFilPeers,
h: h,
}
pm.notifee = &net.NotifyBundle{
DisconnectedF: func(_ net.Network, c net.Conn) {
pm.Disconnect(c.RemotePeer())
},
}
h.Network().Notify(pm.notifee)
return pm
}
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
pmgr.peers[p] = struct{}{}
}
func (pmgr *PeerMgr) Disconnect(p peer.ID) {
if pmgr.h.Network().Connectedness(p) == net.NotConnected {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
delete(pmgr.peers, p)
}
}
func (pmgr *PeerMgr) Run(ctx context.Context) {
tick := time.NewTicker(time.Second * 5)
for {
select {
case <-tick.C:
pcount := pmgr.getPeerCount()
if pcount < pmgr.minFilPeers {
pmgr.expandPeers()
} else if pcount > pmgr.maxFilPeers {
log.Infof("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
}
}
}
}
func (pmgr *PeerMgr) getPeerCount() int {
pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock()
return len(pmgr.peers)
}
func (pmgr *PeerMgr) expandPeers() {
if pmgr.expanding {
return
}
pmgr.expanding = true
go func() {
defer func() {
pmgr.expanding = false
}()
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
pmgr.doExpand(ctx)
}()
}
func (pmgr *PeerMgr) doExpand(ctx context.Context) {
pcount := pmgr.getPeerCount()
if pcount == 0 {
if len(pmgr.bootstrappers) == 0 {
log.Warn("no peers connected, and no bootstrappers configured")
return
}
log.Info("connecting to bootstrap peers")
for _, bsp := range pmgr.bootstrappers {
if err := pmgr.h.Connect(ctx, bsp); err != nil {
log.Warnf("failed to connect to bootstrap peer: %s", err)
}
}
return
}
// if we already have some peers and need more, the dht is really good at connecting to most peers. Use that for now until something better comes along.
if err := pmgr.dht.Bootstrap(ctx); err != nil {
log.Warnf("dht bootstrapping failed: %s", err)
}
}