Merge pull request #791 from filecoin-project/feat/blocksync-hueristic

Add better heuristic to blocksync client, and start feeding it data

commit 635398d57d
@@ -73,8 +73,12 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
 	}
 
 	peers := bs.getPeers()
+	// randomize the first few peers so we don't always pick the same peer
+	shufflePrefix(peers)
+
+	start := time.Now()
 	var oerr error
 
 	for _, p := range peers {
		// TODO: doing this synchronously isnt great, but fetching in parallel
		// may not be a good idea either. think about this more
@@ -92,6 +96,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
 		}
 
 		if res.Status == 0 {
+			bs.syncPeers.logGlobalSuccess(time.Since(start))
 			return bs.processBlocksResponse(req, res)
 		}
 		oerr = bs.processStatus(req, res)
@@ -139,13 +144,28 @@ func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid)
 	}
 }
 
+func shufflePrefix(peers []peer.ID) {
+	pref := 5
+	if len(peers) < pref {
+		pref = len(peers)
+	}
+
+	buf := make([]peer.ID, pref)
+	perm := rand.Perm(pref)
+	for i, v := range perm {
+		buf[i] = peers[v]
+	}
+
+	copy(peers, buf)
+}
+
 func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64) ([]*BSTipSet, error) {
 	ctx, span := trace.StartSpan(ctx, "GetChainMessages")
 	defer span.End()
 
 	peers := bs.getPeers()
-	perm := rand.Perm(len(peers))
 	// TODO: round robin through these peers on error
+	// randomize the first few peers so we don't always pick the same peer
+	shufflePrefix(peers)
 
 	req := &BlockSyncRequest{
 		Start: h.Cids(),
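Note: the new shufflePrefix permutes only the first five entries in place, leaving the lower-ranked tail of the list in order, so the best few peers share the load without discarding the ranking. A minimal standalone sketch of the same logic, with peer IDs faked as strings for illustration:

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    // shufflePrefix permutes only the first five entries in place;
    // everything after the prefix keeps its original order.
    func shufflePrefix(peers []string) {
    	pref := 5
    	if len(peers) < pref {
    		pref = len(peers)
    	}
    	buf := make([]string, pref)
    	for i, v := range rand.Perm(pref) {
    		buf[i] = peers[v]
    	}
    	copy(peers, buf)
    }

    func main() {
    	peers := []string{"p1", "p2", "p3", "p4", "p5", "p6", "p7"}
    	shufflePrefix(peers)
    	fmt.Println(peers) // e.g. [p3 p1 p5 p4 p2 p6 p7]; p6 and p7 never move
    }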
@@ -154,21 +174,24 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64)
 	}
 
 	var err error
-	for _, p := range perm {
-		res, rerr := bs.sendRequestToPeer(ctx, peers[p], req)
+	start := time.Now()
+
+	for _, p := range peers {
+		res, rerr := bs.sendRequestToPeer(ctx, p, req)
 		if rerr != nil {
 			err = rerr
-			log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
+			log.Warnf("BlockSync request failed for peer %s: %s", p.String(), err)
 			continue
 		}
 
 		if res.Status == 0 {
+			bs.syncPeers.logGlobalSuccess(time.Since(start))
 			return res.Chain, nil
 		}
 
 		err = bs.processStatus(req, res)
 		if err != nil {
-			log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
+			log.Warnf("BlockSync peer %s response was an error: %s", p.String(), err)
 		}
 	}
@@ -180,10 +203,23 @@ func (bs *BlockSync) GetChainMessages(ctx context.Context, h *types.TipSet, count uint64)
 	return nil, xerrors.Errorf("GetChainMessages failed with all peers(%d): %w", len(peers), err)
 }
 
-func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (*BlockSyncResponse, error) {
+func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest) (_ *BlockSyncResponse, err error) {
 	ctx, span := trace.StartSpan(ctx, "sendRequestToPeer")
 	defer span.End()
 
+	defer func() {
+		if err != nil {
+			if span.IsRecordingEvents() {
+				span.SetStatus(trace.Status{
+					Code:    5,
+					Message: err.Error(),
+				})
+			}
+		}
+	}()
+
+	start := time.Now()
+
 	if span.IsRecordingEvents() {
 		span.AddAttributes(
 			trace.StringAttribute("peer", p.Pretty()),
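Note: the signature change to a named error return is what lets the deferred closure observe whatever error the function ultimately returns and record it on the opencensus span. A self-contained sketch of the same pattern, where doWork is a hypothetical stand-in for the stream I/O:

    package main

    import (
    	"context"
    	"errors"

    	"go.opencensus.io/trace"
    )

    // doWork is a hypothetical stand-in for the real request logic.
    func doWork(ctx context.Context) error {
    	return errors.New("request failed")
    }

    func instrumented(ctx context.Context) (err error) {
    	ctx, span := trace.StartSpan(ctx, "instrumented")
    	defer span.End()

    	// err is a named return, so this deferred closure sees the final
    	// error value; deferred calls run last-in-first-out, so the status
    	// is set before span.End() runs.
    	defer func() {
    		if err != nil {
    			span.SetStatus(trace.Status{Code: 5, Message: err.Error()})
    		}
    	}()

    	return doWork(ctx)
    }

    func main() {
    	_ = instrumented(context.Background())
    }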
@@ -195,13 +231,17 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest)
 		bs.RemovePeer(p)
 		return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
 	}
+	s.SetDeadline(time.Now().Add(10 * time.Second))
+	defer s.SetDeadline(time.Time{})
 
 	if err := cborutil.WriteCborRPC(s, req); err != nil {
+		bs.syncPeers.logFailure(p, time.Since(start))
 		return nil, err
 	}
 
 	var res BlockSyncResponse
 	if err := cborutil.ReadCborRPC(bufio.NewReader(s), &res); err != nil {
+		bs.syncPeers.logFailure(p, time.Since(start))
 		return nil, err
 	}
@@ -212,6 +252,9 @@ func (bs *BlockSync) sendRequestToPeer(ctx context.Context, p peer.ID, req *BlockSyncRequest)
 			trace.Int64Attribute("chain_len", int64(len(res.Chain))),
 		)
 	}
 
+	bs.syncPeers.logSuccess(p, time.Since(start))
+
 	return &res, nil
 }
@@ -338,14 +381,17 @@ func (bs *BlockSync) fetchCids(ctx context.Context, cids []cid.Cid, cb func(int,
 }
 
 type peerStats struct {
-	successes int
-	failures  int
-	firstSeen time.Time
+	successes   int
+	failures    int
+	firstSeen   time.Time
+	averageTime time.Duration
 }
 
 type bsPeerTracker struct {
-	peers map[peer.ID]*peerStats
-	lk    sync.Mutex
+	lk sync.Mutex
+
+	peers         map[peer.ID]*peerStats
+	avgGlobalTime time.Duration
 }
 
 func newPeerTracker() *bsPeerTracker {
@@ -353,6 +399,7 @@ func newPeerTracker() *bsPeerTracker {
 		peers: make(map[peer.ID]*peerStats),
 	}
 }
 
 func (bpt *bsPeerTracker) addPeer(p peer.ID) {
 	bpt.lk.Lock()
 	defer bpt.lk.Unlock()
@@ -365,6 +412,12 @@ func (bpt *bsPeerTracker) addPeer(p peer.ID) {
 
 }
 
+const (
+	// newPeerMul is how much better than average a new peer is assumed to be;
+	// less than one to encourage trying new peers
+	newPeerMul = 0.9
+)
+
 func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
 	// TODO: this could probably be cached, but as long as its not too many peers, fine for now
 	bpt.lk.Lock()
@@ -374,40 +427,88 @@ func (bpt *bsPeerTracker) prefSortedPeers() []peer.ID {
 		out = append(out, p)
 	}
 
+	// sort by 'expected cost' of requesting data from that peer
+	// additionally handle edge cases where not enough data is available
 	sort.Slice(out, func(i, j int) bool {
 		pi := bpt.peers[out[i]]
 		pj := bpt.peers[out[j]]
-		if pi.successes > pj.successes {
-			return true
-		}
+
+		var costI, costJ float64
+
+		if pi.successes+pi.failures > 0 {
+			failRateI := float64(pi.failures) / float64(pi.failures+pi.successes)
+			costI = float64(pi.averageTime) + failRateI*float64(bpt.avgGlobalTime)
+		} else {
+			// we know nothing about this peer,
+			// make them a bit better than average
+			costI = newPeerMul * float64(bpt.avgGlobalTime)
+		}
-		if pi.failures < pj.successes {
-			return true
-		}
+
+		if pj.successes+pj.failures > 0 {
+			failRateJ := float64(pj.failures) / float64(pj.failures+pj.successes)
+			costJ = float64(pj.averageTime) + failRateJ*float64(bpt.avgGlobalTime)
+		} else {
+			costJ = newPeerMul * float64(bpt.avgGlobalTime)
+		}
-		return pi.firstSeen.Before(pj.firstSeen)
+
+		return costI < costJ
 	})
 
 	return out
 }
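Note: in effect each peer is scored as averageTime + failureRate × avgGlobalTime, and a peer with no history is charged newPeerMul (0.9) of the global average so new peers still get tried. A worked sketch of the comparison with invented numbers:

    package main

    import (
    	"fmt"
    	"time"
    )

    const newPeerMul = 0.9

    // cost mirrors the sort above: observed average latency plus the
    // failure rate weighted by the global average response time.
    func cost(successes, failures int, avg, global time.Duration) float64 {
    	if successes+failures == 0 {
    		// no history: assume slightly better than the global average
    		return newPeerMul * float64(global)
    	}
    	failRate := float64(failures) / float64(failures+successes)
    	return float64(avg) + failRate*float64(global)
    }

    func main() {
    	global := 2 * time.Second
    	flaky := cost(5, 5, 500*time.Millisecond, global)    // 0.5s + 0.5*2s = 1.5s
    	steady := cost(10, 0, 1200*time.Millisecond, global) // 1.2s + 0*2s  = 1.2s
    	fresh := cost(0, 0, 0, global)                       // 0.9*2s       = 1.8s
    	fmt.Println(steady < flaky, flaky < fresh)           // true true
    }

So a slower-but-reliable peer can outrank a fast-but-flaky one, and an unknown peer slots in just ahead of the average.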
-func (bpt *bsPeerTracker) logSuccess(p peer.ID) {
+const (
+	// xInvAlpha = (N+1)/2
+
+	localInvAlpha  = 5  // 86% of the value is the last 9
+	globalInvAlpha = 20 // 86% of the value is the last 39
+)
+
+func (bpt *bsPeerTracker) logGlobalSuccess(dur time.Duration) {
 	bpt.lk.Lock()
 	defer bpt.lk.Unlock()
 
+	if bpt.avgGlobalTime == 0 {
+		bpt.avgGlobalTime = dur
+		return
+	}
+	delta := (dur - bpt.avgGlobalTime) / globalInvAlpha
+	bpt.avgGlobalTime += delta
+}
+
+func logTime(pi *peerStats, dur time.Duration) {
+	if pi.averageTime == 0 {
+		pi.averageTime = dur
+		return
+	}
+	delta := (dur - pi.averageTime) / localInvAlpha
+	pi.averageTime += delta
+}
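Note: both averages are exponentially weighted moving averages in integer arithmetic: each sample moves the average 1/invAlpha of the way toward itself, so a single outlier only nudges the value. A small sketch of the update rule with made-up latencies:

    package main

    import (
    	"fmt"
    	"time"
    )

    // ewma applies the same update as logTime/logGlobalSuccess:
    // avg += (sample - avg) / invAlpha, seeded by the first sample.
    func ewma(avg, sample, invAlpha time.Duration) time.Duration {
    	if avg == 0 {
    		return sample
    	}
    	return avg + (sample-avg)/invAlpha
    }

    func main() {
    	var avg time.Duration
    	samples := []time.Duration{
    		100 * time.Millisecond,
    		100 * time.Millisecond,
    		600 * time.Millisecond, // one slow outlier
    		100 * time.Millisecond,
    	}
    	for _, s := range samples {
    		avg = ewma(avg, s, 5) // localInvAlpha = 5
    		fmt.Println(avg)      // 100ms, 100ms, 200ms, 180ms
    	}
    }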
+func (bpt *bsPeerTracker) logSuccess(p peer.ID, dur time.Duration) {
 	bpt.lk.Lock()
 	defer bpt.lk.Unlock()
 
 	if pi, ok := bpt.peers[p]; !ok {
-		log.Warn("log success called on peer not in tracker")
+		log.Warnw("log success called on peer not in tracker", "peerid", p.String())
 		return
 	} else {
 		pi.successes++
+		logTime(pi, dur)
 	}
 }
 
-func (bpt *bsPeerTracker) logFailure(p peer.ID) {
+func (bpt *bsPeerTracker) logFailure(p peer.ID, dur time.Duration) {
 	bpt.lk.Lock()
 	defer bpt.lk.Unlock()
 	if pi, ok := bpt.peers[p]; !ok {
-		log.Warn("log failure called on peer not in tracker")
+		log.Warnw("log failure called on peer not in tracker", "peerid", p.String())
 		return
 	} else {
 		pi.failures++
+		logTime(pi, dur)
 	}
 }
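Note: putting the pieces together, a hypothetical in-package usage of the tracker as extended by this patch. The peer IDs and durations are invented (peer.ID is a string type, so the literals are illustrative, not valid multihashes), and the snippet assumes this file's package and imports:

    func exampleTracker() {
    	tracker := newPeerTracker()
    	pidA, pidB := peer.ID("peerA"), peer.ID("peerB")
    	tracker.addPeer(pidA)
    	tracker.addPeer(pidB)

    	// Feed the heuristic: pidA is fast and reliable, pidB slow and failing.
    	tracker.logGlobalSuccess(900 * time.Millisecond)
    	tracker.logSuccess(pidA, 800*time.Millisecond)
    	tracker.logFailure(pidB, 2*time.Second)

    	// pidA sorts first: cost 0.8s vs pidB's 2s + 1.0*0.9s = 2.9s.
    	fmt.Println(tracker.prefSortedPeers())
    }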