Add better hueristic to blocksync client, and start feeding it data

This commit is contained in:
whyrusleeping 2019-12-08 16:02:38 +01:00
parent 8bb6ab02e3
commit 6cf55b0b1e

View File

@ -73,6 +73,8 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
}
peers := bs.getPeers()
// randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers)
var oerr error
for _, p := range peers {
@ -139,13 +141,30 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
}
}
func shufflePrefix(peers []peer.ID) {
pref := 5
if len(peers) < pref {
pref = len(peers)
}
buf := make([]peer.ID, pref)
perm := rand.Perm(pref)
for i, v := range perm {
buf[i] = peers[v]
}
copy(peers, buf)
}
func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
ctx, span := trace.StartSpan(ctx, "GetChainMessages")
defer span.End()
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error
fmt.Println("BEST PEER: ", bs.syncPeers.peers[peers[0]])
// randomize the first few peers so we don't always pick the same peer
shufflePrefix(peers)
req := &BlockSyncRequest{
Start: h.Cids(),
@ -154,11 +173,11 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
}
var err error
for _, p := range perm {
res, rerr := bs.sendRequestToPeer(ctx, peers[p], req)
for _, p := range peers {
res, rerr := bs.sendRequestToPeer(ctx, p, req)
if rerr != nil {
err = rerr
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err)
continue
}
@ -168,7 +187,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
err = bs.processStatus(req, res)
if err != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
}
}
@ -183,6 +202,7 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, coun
func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
defer span.End()
start := time.Now()
if span.IsRecordingEvents() {
span.AddAttributes(
@ -202,6 +222,7 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
var res BlockSyncResponse
if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
bs.syncPeers.logFailure(p)
return nil, err
}
@ -212,6 +233,9 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *Bloc
trace.Int64Attribute("chain_len", int64(len(res.Chain))),
)
}
bs.syncPeers.logSuccess(p, time.Since(start))
return &res, nil
}
@ -338,9 +362,10 @@ func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int,
}
type peerStats struct {
successes int
failures int
firstSeen time.Time
successes int
failures int
firstSeen time.Time
averageTime time.Duration
}
type bsPeerTracker struct {
@ -374,13 +399,26 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
out = append(out, p)
}
// sort by 'expected cost' of requesting data from that peer
// additionally handle edge cases where not enough data is available
sort.Slice(out, func(i, j int) bool {
pi := bpt.peers[out[i]]
pj := bpt.peers[out[j]]
if pj.successes > 0 && pi.successes > 0 {
failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
costI := float64(pi.averageTime) * failRateI
failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
costJ := float64(pj.averageTime) * failRateJ
return costI < costJ
}
if pi.successes > pj.successes {
return true
}
if pi.failures < pj.successes {
if pi.failures < pj.failures {
return true
}
return pi.firstSeen.Before(pj.firstSeen)
@ -389,7 +427,7 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
return out
}
func (bpt *bsPeerTracker) logSuccess(p peer.ID) {
func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
bpt.lk.Lock()
defer bpt.lk.Unlock()
if pi, ok := bpt.peers[p]; !ok {
@ -397,6 +435,9 @@ func (bpt *bsPeerTracker) logSuccess(p peer.ID) {
return
} else {
pi.successes++
delta := (dur - pi.averageTime) / 20
pi.averageTime += delta
}
}