fix race condition, 'thundering herd'-like issue, and add logging
This commit is contained in:
parent
6d1914ce43
commit
6caf725e14
@ -119,16 +119,26 @@ func NewDrandBeacon(genesisTs, interval uint64) (*DrandBeacon, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (db *DrandBeacon) rotatePeersIndex() {
|
func (db *DrandBeacon) rotatePeersIndex() {
|
||||||
|
db.peersIndexMtx.Lock()
|
||||||
|
nval := rand.Intn(len(db.peers))
|
||||||
|
db.peersIndex = nval
|
||||||
|
db.peersIndexMtx.Unlock()
|
||||||
|
|
||||||
|
log.Warnf("rotated to drand peer %d, %q", nval, db.peers[nval].Address())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db *DrandBeacon) getPeerIndex() int {
|
||||||
db.peersIndexMtx.Lock()
|
db.peersIndexMtx.Lock()
|
||||||
defer db.peersIndexMtx.Unlock()
|
defer db.peersIndexMtx.Unlock()
|
||||||
db.peersIndex = (db.peersIndex + 1) % len(db.peers)
|
return db.peersIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DrandBeacon) handleStreamingUpdates() {
|
func (db *DrandBeacon) handleStreamingUpdates() {
|
||||||
for {
|
for {
|
||||||
ch, err := db.client.PublicRandStream(context.Background(), db.peers[db.peersIndex], &dproto.PublicRandRequest{})
|
p := db.peers[db.getPeerIndex()]
|
||||||
|
ch, err := db.client.PublicRandStream(context.Background(), p, &dproto.PublicRandRequest{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("failed to get public rand stream: %s", err)
|
log.Warnf("failed to get public rand stream to peer %q: %s", p.Address(), err)
|
||||||
log.Warnf("trying again in 10 seconds")
|
log.Warnf("trying again in 10 seconds")
|
||||||
db.rotatePeersIndex()
|
db.rotatePeersIndex()
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
@ -142,7 +152,7 @@ func (db *DrandBeacon) handleStreamingUpdates() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warn("drand beacon stream broke, reconnecting in 10 seconds")
|
log.Warnf("drand beacon stream to peer %q broke, reconnecting in 10 seconds", p.Address())
|
||||||
db.rotatePeersIndex()
|
db.rotatePeersIndex()
|
||||||
time.Sleep(time.Second * 10)
|
time.Sleep(time.Second * 10)
|
||||||
}
|
}
|
||||||
@ -161,12 +171,13 @@ func (db *DrandBeacon) Entry(ctx context.Context, round uint64) <-chan beacon.Re
|
|||||||
out := make(chan beacon.Response, 1)
|
out := make(chan beacon.Response, 1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
resp, err := db.client.PublicRand(ctx, db.peers[db.peersIndex], &dproto.PublicRandRequest{Round: round})
|
p := db.peers[db.getPeerIndex()]
|
||||||
|
resp, err := db.client.PublicRand(ctx, p, &dproto.PublicRandRequest{Round: round})
|
||||||
|
|
||||||
var br beacon.Response
|
var br beacon.Response
|
||||||
if err != nil {
|
if err != nil {
|
||||||
db.rotatePeersIndex()
|
db.rotatePeersIndex()
|
||||||
br.Err = err
|
br.Err = xerrors.Errorf("drand peer %q failed publicRand request: %w", p.Address(), err)
|
||||||
} else {
|
} else {
|
||||||
br.Entry.Round = resp.GetRound()
|
br.Entry.Round = resp.GetRound()
|
||||||
br.Entry.Data = resp.GetSignature()
|
br.Entry.Data = resp.GetSignature()
|
||||||
|
Loading…
Reference in New Issue
Block a user