Centralize filtering, output wallet addresses

This commit is contained in:
Peter Rabbitson 2020-09-26 21:16:28 +02:00
parent 2c1d96bcaa
commit 8955b8d8a7

View File

@ -5,8 +5,7 @@ import (
"encoding/json" "encoding/json"
"net" "net"
"net/http" "net/http"
"os" "sync"
"strings"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
@ -19,23 +18,27 @@ type dealStatsServer struct {
api api.FullNode api api.FullNode
} }
var filteredClients map[address.Address]bool // these lists grow continuously with the network
// TODO: need to switch this to an LRU of sorts, to ensure refreshes
var knownFiltered = new(sync.Map)
var resolvedWallets = new(sync.Map)
func init() { func init() {
fc := []string{"t0112", "t0113", "t0114", "t010089"} for _, a := range []string{
"t0100", // client for genesis miner
filtered, set := os.LookupEnv("FILTERED_CLIENTS") "t0112", // client for genesis miner
if set { "t0113", // client for genesis miner
fc = strings.Split(filtered, ":") "t0114", // client for genesis miner
} "t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet
"t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet
filteredClients = make(map[address.Address]bool) "t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet
for _, a := range fc { "t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet
addr, err := address.NewFromString(a) } {
a, err := address.NewFromString(a)
if err != nil { if err != nil {
panic(err) panic(err)
} }
filteredClients[addr] = true knownFiltered.Store(a, true)
} }
} }
@ -45,32 +48,16 @@ type dealCountResp struct {
} }
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
head, err := dss.api.ChainHead(ctx) epoch, deals := dss.filteredDealList()
if err != nil { if epoch == 0 {
log.Warnf("failed to get chain head: %s", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
w.WriteHeader(500)
return
}
var count int64
for _, d := range deals {
if !filteredClients[d.Proposal.Client] {
count++
}
}
if err := json.NewEncoder(w).Encode(&dealCountResp{ if err := json.NewEncoder(w).Encode(&dealCountResp{
Total: count, Total: int64(len(deals)),
Epoch: int64(head.Height()), Epoch: epoch,
}); err != nil { }); err != nil {
log.Warnf("failed to write back deal count response: %s", err) log.Warnf("failed to write back deal count response: %s", err)
return return
@ -83,34 +70,21 @@ type dealAverageResp struct {
} }
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
head, err := dss.api.ChainHead(ctx) epoch, deals := dss.filteredDealList()
if err != nil { if epoch == 0 {
log.Warnf("failed to get chain head: %s", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
w.WriteHeader(500)
return
}
var count int64
var totalBytes int64 var totalBytes int64
for _, d := range deals { for _, d := range deals {
if !filteredClients[d.Proposal.Client] { totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
count++
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
}
} }
if err := json.NewEncoder(w).Encode(&dealAverageResp{ if err := json.NewEncoder(w).Encode(&dealAverageResp{
AverageSize: totalBytes / count, AverageSize: totalBytes / int64(len(deals)),
Epoch: int64(head.Height()), Epoch: epoch,
}); err != nil { }); err != nil {
log.Warnf("failed to write back deal average response: %s", err) log.Warnf("failed to write back deal average response: %s", err)
return return
@ -123,32 +97,20 @@ type dealTotalResp struct {
} }
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
ctx := context.Background() epoch, deals := dss.filteredDealList()
if epoch == 0 {
head, err := dss.api.ChainHead(ctx)
if err != nil {
log.Warnf("failed to get chain head: %s", err)
w.WriteHeader(500)
return
}
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
var totalBytes int64 var totalBytes int64
for _, d := range deals { for _, d := range deals {
if !filteredClients[d.Proposal.Client] { totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
}
} }
if err := json.NewEncoder(w).Encode(&dealTotalResp{ if err := json.NewEncoder(w).Encode(&dealTotalResp{
TotalBytes: totalBytes, TotalBytes: totalBytes,
Epoch: int64(head.Height()), Epoch: epoch,
}); err != nil { }); err != nil {
log.Warnf("failed to write back deal average response: %s", err) log.Warnf("failed to write back deal average response: %s", err)
return return
@ -168,18 +130,8 @@ type clientStatsOutput struct {
} }
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) { func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
ctx := context.Background() epoch, deals := dss.filteredDealList()
if epoch == 0 {
head, err := dss.api.ChainHead(ctx)
if err != nil {
log.Warnf("failed to get chain head: %s", err)
w.WriteHeader(500)
return
}
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
w.WriteHeader(500) w.WriteHeader(500)
return return
} }
@ -187,23 +139,20 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h
stats := make(map[address.Address]*clientStatsOutput) stats := make(map[address.Address]*clientStatsOutput)
for _, d := range deals { for _, d := range deals {
if filteredClients[d.Proposal.Client] {
continue
}
st, ok := stats[d.Proposal.Client] st, ok := stats[d.deal.Proposal.Client]
if !ok { if !ok {
st = &clientStatsOutput{ st = &clientStatsOutput{
Client: d.Proposal.Client, Client: d.resolvedWallet,
cids: make(map[cid.Cid]bool), cids: make(map[cid.Cid]bool),
providers: make(map[address.Address]bool), providers: make(map[address.Address]bool),
} }
stats[d.Proposal.Client] = st stats[d.deal.Proposal.Client] = st
} }
st.DataSize += int64(d.Proposal.PieceSize.Unpadded()) st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded())
st.cids[d.Proposal.PieceCID] = true st.cids[d.deal.Proposal.PieceCID] = true
st.providers[d.Proposal.Provider] = true st.providers[d.deal.Proposal.Provider] = true
st.NumDeals++ st.NumDeals++
} }
@ -221,6 +170,65 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h
} }
} }
type dealInfo struct {
deal api.MarketDeal
resolvedWallet address.Address
}
// filteredDealList returns the current epoch and a list of filtered deals
// on error returns an epoch of 0
func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) {
ctx := context.Background()
head, err := dss.api.ChainHead(ctx)
if err != nil {
log.Warnf("failed to get chain head: %s", err)
return 0, nil
}
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
return 0, nil
}
ret := make(map[string]dealInfo, len(deals))
for dealKey, d := range deals {
// Counting no-longer-active deals as per Pooja's request
// // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
// if d.State.SectorStartEpoch < 0 {
// continue
// }
if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered {
continue
}
if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen {
w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key())
if err != nil {
log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err)
continue
} else {
resolvedWallets.Store(d.Proposal.Client, w)
}
}
w, _ := resolvedWallets.Load(d.Proposal.Client)
if _, isFiltered := knownFiltered.Load(w); isFiltered {
continue
}
ret[dealKey] = dealInfo{
deal: d,
resolvedWallet: w.(address.Address),
}
}
return int64(head.Height()), ret
}
var serveDealStatsCmd = &cli.Command{ var serveDealStatsCmd = &cli.Command{
Name: "serve-deal-stats", Name: "serve-deal-stats",
Flags: []cli.Flag{}, Flags: []cli.Flag{},