Merge pull request #3648 from filecoin-project/fix/dont-use-latency

Don't use latency as initital estimate for blocksync
This commit is contained in:
Łukasz Magiera 2020-09-08 14:56:23 +02:00 committed by GitHub
commit 867469e9b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 26 deletions

View File

@ -11,6 +11,7 @@ import (
inet "github.com/libp2p/go-libp2p-core/network" inet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.opencensus.io/trace" "go.opencensus.io/trace"
"go.uber.org/fx"
"golang.org/x/xerrors" "golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util" cborutil "github.com/filecoin-project/go-cbor-util"
@ -36,12 +37,13 @@ type BlockSync struct {
} }
func NewClient( func NewClient(
lc fx.Lifecycle,
host host.Host, host host.Host,
pmgr peermgr.MaybePeerMgr, pmgr peermgr.MaybePeerMgr,
) *BlockSync { ) *BlockSync {
return &BlockSync{ return &BlockSync{
host: host, host: host,
peerTracker: newPeerTracker(pmgr.Mgr), peerTracker: newPeerTracker(lc, host, pmgr.Mgr),
} }
} }
@ -360,6 +362,7 @@ func (client *BlockSync) sendRequestToPeer(
supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID) supported, err := client.host.Peerstore().SupportsProtocols(peer, BlockSyncProtocolID)
if err != nil { if err != nil {
client.RemovePeer(peer)
return nil, xerrors.Errorf("failed to get protocols for peer: %w", err) return nil, xerrors.Errorf("failed to get protocols for peer: %w", err)
} }
if len(supported) == 0 || supported[0] != BlockSyncProtocolID { if len(supported) == 0 || supported[0] != BlockSyncProtocolID {
@ -385,7 +388,7 @@ func (client *BlockSync) sendRequestToPeer(
_ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE)) _ = stream.SetWriteDeadline(time.Now().Add(WRITE_REQ_DEADLINE))
if err := cborutil.WriteCborRPC(stream, req); err != nil { if err := cborutil.WriteCborRPC(stream, req); err != nil {
_ = stream.SetWriteDeadline(time.Time{}) _ = stream.SetWriteDeadline(time.Time{})
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
// FIXME: Should we also remove peer here? // FIXME: Should we also remove peer here?
return nil, err return nil, err
} }
@ -398,7 +401,7 @@ func (client *BlockSync) sendRequestToPeer(
bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)), bufio.NewReader(incrt.New(stream, READ_RES_MIN_SPEED, READ_RES_DEADLINE)),
&res) &res)
if err != nil { if err != nil {
client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart)) client.peerTracker.logFailure(peer, build.Clock.Since(connectionStart), req.Length)
return nil, xerrors.Errorf("failed to read blocksync response: %w", err) return nil, xerrors.Errorf("failed to read blocksync response: %w", err)
} }
@ -412,7 +415,7 @@ func (client *BlockSync) sendRequestToPeer(
) )
} }
client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart)) client.peerTracker.logSuccess(peer, build.Clock.Since(connectionStart), uint64(len(res.Chain)))
// FIXME: We should really log a success only after we validate the response. // FIXME: We should really log a success only after we validate the response.
// It might be a bit hard to do. // It might be a bit hard to do.
return &res, nil return &res, nil

View File

@ -3,11 +3,14 @@ package blocksync
// FIXME: This needs to be reviewed. // FIXME: This needs to be reviewed.
import ( import (
"context"
"sort" "sort"
"sync" "sync"
"time" "time"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/peermgr"
@ -29,11 +32,30 @@ type bsPeerTracker struct {
pmgr *peermgr.PeerMgr pmgr *peermgr.PeerMgr
} }
func newPeerTracker(pmgr *peermgr.PeerMgr) *bsPeerTracker { func newPeerTracker(lc fx.Lifecycle, h host.Host, pmgr *peermgr.PeerMgr) *bsPeerTracker {
return &bsPeerTracker{ bsPt := &bsPeerTracker{
peers: make(map[peer.ID]*peerStats), peers: make(map[peer.ID]*peerStats),
pmgr: pmgr, pmgr: pmgr,
} }
sub, err := h.EventBus().Subscribe(new(peermgr.NewFilPeer))
if err != nil {
panic(err)
}
go func() {
for newPeer := range sub.Out() {
bsPt.addPeer(newPeer.(peermgr.NewFilPeer).Id)
}
}()
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return sub.Close()
},
})
return bsPt
} }
func (bpt *bsPeerTracker) addPeer(p peer.ID) { func (bpt *bsPeerTracker) addPeer(p peer.ID) {
@ -72,16 +94,7 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
var costI, costJ float64 var costI, costJ float64
getPeerInitLat := func(p peer.ID) float64 { getPeerInitLat := func(p peer.ID) float64 {
var res float64 return float64(bpt.avgGlobalTime) * newPeerMul
if bpt.pmgr != nil {
if lat, ok := bpt.pmgr.GetPeerLatency(p); ok {
res = float64(lat)
}
}
if res == 0 {
res = float64(bpt.avgGlobalTime)
}
return res * newPeerMul
} }
if pi.successes+pi.failures > 0 { if pi.successes+pi.failures > 0 {
@ -107,8 +120,8 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
const ( const (
// xInvAlpha = (N+1)/2 // xInvAlpha = (N+1)/2
localInvAlpha = 5 // 86% of the value is the last 9 localInvAlpha = 10 // 86% of the value is the last 19
globalInvAlpha = 20 // 86% of the value is the last 39 globalInvAlpha = 25 // 86% of the value is the last 49
) )
func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) { func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) {
@ -133,7 +146,7 @@ func logTime(pi *peerStats, dur time.Duration) {
} }
func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) { func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration, reqSize uint64) {
bpt.lk.Lock() bpt.lk.Lock()
defer bpt.lk.Unlock() defer bpt.lk.Unlock()
@ -145,10 +158,13 @@ func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
} }
pi.successes++ pi.successes++
logTime(pi, dur) if reqSize == 0 {
reqSize = 1
}
logTime(pi, dur/time.Duration(reqSize))
} }
func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) { func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration, reqSize uint64) {
bpt.lk.Lock() bpt.lk.Lock()
defer bpt.lk.Unlock() defer bpt.lk.Unlock()
@ -160,7 +176,10 @@ func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) {
} }
pi.failures++ pi.failures++
logTime(pi, dur) if reqSize == 0 {
reqSize = 1
}
logTime(pi, dur/time.Duration(reqSize))
} }
func (bpt *bsPeerTracker) removePeer(p peer.ID) { func (bpt *bsPeerTracker) removePeer(p peer.ID) {

View File

@ -10,7 +10,10 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.uber.org/fx" "go.uber.org/fx"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/libp2p/go-libp2p-core/event"
host "github.com/libp2p/go-libp2p-core/host" host "github.com/libp2p/go-libp2p-core/host"
net "github.com/libp2p/go-libp2p-core/network" net "github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer" peer "github.com/libp2p/go-libp2p-core/peer"
@ -50,12 +53,17 @@ type PeerMgr struct {
h host.Host h host.Host
dht *dht.IpfsDHT dht *dht.IpfsDHT
notifee *net.NotifyBundle notifee *net.NotifyBundle
filPeerEmitter event.Emitter
done chan struct{} done chan struct{}
} }
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) *PeerMgr { type NewFilPeer struct {
Id peer.ID
}
func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes.BootstrapPeers) (*PeerMgr, error) {
pm := &PeerMgr{ pm := &PeerMgr{
h: h, h: h,
dht: dht, dht: dht,
@ -69,10 +77,18 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
done: make(chan struct{}), done: make(chan struct{}),
} }
emitter, err := h.EventBus().Emitter(new(NewFilPeer))
if err != nil {
return nil, xerrors.Errorf("creating NewFilPeer emitter: %w", err)
}
pm.filPeerEmitter = emitter
lc.Append(fx.Hook{ lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error { OnStop: func(ctx context.Context) error {
return pm.Stop(ctx) return multierr.Combine(
pm.filPeerEmitter.Close(),
pm.Stop(ctx),
)
}, },
}) })
@ -84,10 +100,11 @@ func NewPeerMgr(lc fx.Lifecycle, h host.Host, dht *dht.IpfsDHT, bootstrap dtypes
h.Network().Notify(pm.notifee) h.Network().Notify(pm.notifee)
return pm return pm, nil
} }
func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) { func (pmgr *PeerMgr) AddFilecoinPeer(p peer.ID) {
_ = pmgr.filPeerEmitter.Emit(NewFilPeer{Id: p}) //nolint:errcheck
pmgr.peersLk.Lock() pmgr.peersLk.Lock()
defer pmgr.peersLk.Unlock() defer pmgr.peersLk.Unlock()
pmgr.peers[p] = time.Duration(0) pmgr.peers[p] = time.Duration(0)