Improve the hueristic a bit
License: MIT Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai>
This commit is contained in:
parent
fd6f63a239
commit
9814d53460
@ -76,7 +76,9 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
|
|||||||
// randomize the first few peers so we don't always pick the same peer
|
// randomize the first few peers so we don't always pick the same peer
|
||||||
shufflePrefix(peers)
|
shufflePrefix(peers)
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
var oerr error
|
var oerr error
|
||||||
|
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
// TODO: doing this synchronously isnt great, but fetching in parallel
|
// TODO: doing this synchronously isnt great, but fetching in parallel
|
||||||
// may not be a good idea either. think about this more
|
// may not be a good idea either. think about this more
|
||||||
@ -94,6 +96,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if res.Status == 0 {
|
if res.Status == 0 {
|
||||||
|
bs.syncPeers.logGlobalSuccess(time.Since(start))
|
||||||
return bs.processBlocksResponse(req, res)
|
return bs.processBlocksResponse(req, res)
|
||||||
}
|
}
|
||||||
oerr = bs.processStatus(req, res)
|
oerr = bs.processStatus(req, res)
|
||||||
@ -173,6 +176,8 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
res, rerr := bs.sendRequestToPeer(ctx, p, req)
|
res, rerr := bs.sendRequestToPeer(ctx, p, req)
|
||||||
if rerr != nil {
|
if rerr != nil {
|
||||||
@ -182,11 +187,12 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
|
|||||||
}
|
}
|
||||||
|
|
||||||
if res.Status == 0 {
|
if res.Status == 0 {
|
||||||
|
bs.syncPeers.logGlobalSuccess(time.Since(start))
|
||||||
return res.Chain, nil
|
return res.Chain, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = bs.processStatus(req, res)
|
err = bs.processStatus(req, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
||||||
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
|
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -199,9 +205,21 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
|
|||||||
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
|
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (_ *BlockSyncResponse, err error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
|
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
if span.IsRecordingEvents() {
|
||||||
|
span.SetStatus(trace.Status{
|
||||||
|
Code: 5,
|
||||||
|
Message: err.Error(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
if span.IsRecordingEvents() {
|
if span.IsRecordingEvents() {
|
||||||
@ -215,14 +233,17 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
|
|||||||
bs.RemovePeer(p)
|
bs.RemovePeer(p)
|
||||||
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
||||||
}
|
}
|
||||||
|
s.SetDeadline(time.Now().Add(10 * time.Second))
|
||||||
|
defer s.SetDeadline(time.Time{})
|
||||||
|
|
||||||
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
if err := cborutil.WriteCborRPC(s, req); err != nil {
|
||||||
|
bs.syncPeers.logFailure(p, time.Since(start))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var res BlockSyncResponse
|
var res BlockSyncResponse
|
||||||
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
|
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
|
||||||
bs.syncPeers.logFailure(p)
|
bs.syncPeers.logFailure(p, time.Since(start))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -368,9 +389,13 @@ type peerStats struct {
|
|||||||
averageTime time.Duration
|
averageTime time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const alpha = 20
|
||||||
|
|
||||||
type bsPeerTracker struct {
|
type bsPeerTracker struct {
|
||||||
peers map[peer.ID]*peerStats
|
lk sync.Mutex
|
||||||
lk sync.Mutex
|
|
||||||
|
peers map[peer.ID]*peerStats
|
||||||
|
avgGlobalTime time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPeerTracker() *bsPeerTracker {
|
func newPeerTracker() *bsPeerTracker {
|
||||||
@ -378,6 +403,7 @@ func newPeerTracker() *bsPeerTracker {
|
|||||||
peers: make(map[peer.ID]*peerStats),
|
peers: make(map[peer.ID]*peerStats),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
|
func (bpt *bsPeerTracker) addPeer(p peer.ID) {
|
||||||
bpt.lk.Lock()
|
bpt.lk.Lock()
|
||||||
defer bpt.lk.Unlock()
|
defer bpt.lk.Unlock()
|
||||||
@ -405,43 +431,54 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
|
|||||||
pi := bpt.peers[out[i]]
|
pi := bpt.peers[out[i]]
|
||||||
pj := bpt.peers[out[j]]
|
pj := bpt.peers[out[j]]
|
||||||
|
|
||||||
if pj.successes > 0 && pi.successes > 0 {
|
var costI, costJ float64
|
||||||
|
|
||||||
|
if pi.successes+pi.failures > 0 {
|
||||||
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
|
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
|
||||||
costI := float64(pi.averageTime) * (failRateI + 1)
|
costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
|
||||||
|
} else {
|
||||||
|
// we know nothing about this peer
|
||||||
|
// make them bit better than average
|
||||||
|
costI = float64(bpt.avgGlobalTime) * 0.8
|
||||||
|
}
|
||||||
|
|
||||||
|
if pj.successes+pj.failures > 0 {
|
||||||
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
|
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
|
||||||
costJ := float64(pj.averageTime) * (failRateJ + 1)
|
costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
|
||||||
|
} else {
|
||||||
return costI < costJ
|
costJ = float64(bpt.avgGlobalTime) * 0.8
|
||||||
}
|
}
|
||||||
|
|
||||||
if pi.successes > pj.successes {
|
return costI < costJ
|
||||||
return true
|
|
||||||
}
|
|
||||||
if pi.failures < pj.failures {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return pi.firstSeen.Before(pj.firstSeen)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) {
|
||||||
|
bpt.lk.Lock()
|
||||||
|
defer bpt.lk.Unlock()
|
||||||
|
|
||||||
|
delta := (dur - bpt.avgGlobalTime) / alpha
|
||||||
|
bpt.avgGlobalTime += delta
|
||||||
|
}
|
||||||
|
|
||||||
func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
|
func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
|
||||||
bpt.lk.Lock()
|
bpt.lk.Lock()
|
||||||
defer bpt.lk.Unlock()
|
defer bpt.lk.Unlock()
|
||||||
|
|
||||||
if pi, ok := bpt.peers[p]; !ok {
|
if pi, ok := bpt.peers[p]; !ok {
|
||||||
log.Warn("log success called on peer not in tracker")
|
log.Warn("log success called on peer not in tracker")
|
||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
pi.successes++
|
pi.successes++
|
||||||
|
|
||||||
delta := (dur - pi.averageTime) / 20
|
delta := (dur - pi.averageTime) / alpha
|
||||||
pi.averageTime += delta
|
pi.averageTime += delta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bpt *bsPeerTracker) logFailure(p peer.ID) {
|
func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) {
|
||||||
bpt.lk.Lock()
|
bpt.lk.Lock()
|
||||||
defer bpt.lk.Unlock()
|
defer bpt.lk.Unlock()
|
||||||
if pi, ok := bpt.peers[p]; !ok {
|
if pi, ok := bpt.peers[p]; !ok {
|
||||||
@ -449,6 +486,9 @@ func (bpt *bsPeerTracker) logFailure(p peer.ID) {
|
|||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
pi.failures++
|
pi.failures++
|
||||||
|
|
||||||
|
delta := (dur - pi.averageTime) / alpha
|
||||||
|
pi.averageTime += delta
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user