v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
Showing only changes of commit 223e08bb84 - Show all commits

View File

@ -92,11 +92,13 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(rpcInfos)) wg.Add(len(rpcInfos))
for addr, token := range apiInfos { for addr, token := range apiInfos {
addr := addr
ai := cliutil.APIInfo{ ai := cliutil.APIInfo{
Addr: addr, Addr: addr,
Token: token, Token: token,
} }
go func(info string) {
defer wg.Done()
var clayers []string var clayers []string
for layer, a := range confNameToAddr { for layer, a := range confNameToAddr {
if a == addr { if a == addr {
@ -104,20 +106,20 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
} }
} }
da, err := ai.DialArgs("v1") myinfo := rpcInfo{
if err != nil {
log.Warnw("DialArgs", "error", err)
infosLk.Lock()
infos[addr] = rpcInfo{
Address: ai.Addr, Address: ai.Addr,
Reachable: false, Reachable: false,
CLayers: clayers, CLayers: clayers,
} }
defer func() {
infosLk.Lock()
infos[ai.Addr] = myinfo
infosLk.Unlock() infosLk.Unlock()
}()
wg.Done() da, err := ai.DialArgs("v1")
continue if err != nil {
log.Warnw("DialArgs", "error", err)
return
} }
ah := ai.AuthHeader() ah := ai.AuthHeader()
@ -125,21 +127,8 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah) v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah)
if err != nil { if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s", addr) log.Warnf("Not able to establish connection to node with addr: %s", addr)
return
infosLk.Lock()
infos[addr] = rpcInfo{
Address: ai.Addr,
Reachable: false,
CLayers: clayers,
} }
infosLk.Unlock()
wg.Done()
continue
}
go func(info string) {
defer wg.Done()
defer closer() defer closer()
ver, err := v1api.Version(ctx) ver, err := v1api.Version(ctx)
@ -164,26 +153,21 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
} }
var out rpcInfo myinfo = rpcInfo{
out.Address = ai.Addr Address: ai.Addr,
out.CLayers = clayers CLayers: clayers,
out.Reachable = true Reachable: true,
out.Version = ver.Version Version: ver.Version,
out.SyncState = syncState SyncState: syncState,
}
infosLk.Lock()
infos[info] = out
infosLk.Unlock()
}(addr) }(addr)
} }
wg.Wait() wg.Wait()
infoList := make([]rpcInfo, 0, len(infos)) var infoList []rpcInfo
for _, info := range infos { for _, i := range infos {
infoList = append(infoList, info) infoList = append(infoList, i)
} }
sort.Slice(infoList, func(i, j int) bool { sort.Slice(infoList, func(i, j int) bool {
return infoList[i].Address < infoList[j].Address return infoList[i].Address < infoList[j].Address
}) })
@ -200,6 +184,12 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
} }
time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second) time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second)
select { // stop running if there is reader.
case <-ctx.Done():
return
default:
}
} }
} }