From 8955b8d8a7ae3110acd82f072e9d739be7f73ac1 Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Sat, 26 Sep 2020 21:16:28 +0200 Subject: [PATCH] Centralize filtering, output wallet addresses --- cmd/lotus-shed/dealtracker.go | 186 ++++++++++++++++++---------------- 1 file changed, 97 insertions(+), 89 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index d39f51bd1..a21923009 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -5,8 +5,7 @@ import ( "encoding/json" "net" "net/http" - "os" - "strings" + "sync" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" @@ -19,23 +18,27 @@ type dealStatsServer struct { api api.FullNode } -var filteredClients map[address.Address]bool +// these lists grow continuously with the network +// TODO: need to switch this to an LRU of sorts, to ensure refreshes +var knownFiltered = new(sync.Map) +var resolvedWallets = new(sync.Map) func init() { - fc := []string{"t0112", "t0113", "t0114", "t010089"} - - filtered, set := os.LookupEnv("FILTERED_CLIENTS") - if set { - fc = strings.Split(filtered, ":") - } - - filteredClients = make(map[address.Address]bool) - for _, a := range fc { - addr, err := address.NewFromString(a) + for _, a := range []string{ + "t0100", // client for genesis miner + "t0112", // client for genesis miner + "t0113", // client for genesis miner + "t0114", // client for genesis miner + "t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet + "t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet + "t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet + "t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet + } { + a, err := address.NewFromString(a) if err != nil { panic(err) } - filteredClients[addr] = true + knownFiltered.Store(a, true) } } @@ -45,32 +48,16 @@ type dealCountResp struct { } func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) - w.WriteHeader(500) - return - } - - var count int64 - for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - } - } - if err := json.NewEncoder(w).Encode(&dealCountResp{ - Total: count, - Epoch: int64(head.Height()), + Total: int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal count response: %s", err) return @@ -83,34 +70,21 @@ type dealAverageResp struct { } func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) - w.WriteHeader(500) - return - } - - var count int64 var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - count++ - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealAverageResp{ - AverageSize: totalBytes / count, - Epoch: int64(head.Height()), + AverageSize: totalBytes / int64(len(deals)), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -123,32 +97,20 @@ type dealTotalResp struct { } func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } var totalBytes int64 for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - totalBytes += int64(d.Proposal.PieceSize.Unpadded()) - } + totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded()) } if err := json.NewEncoder(w).Encode(&dealTotalResp{ TotalBytes: totalBytes, - Epoch: int64(head.Height()), + Epoch: epoch, }); err != nil { log.Warnf("failed to write back deal average response: %s", err) return @@ -168,18 +130,8 @@ type clientStatsOutput struct { } func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - head, err := dss.api.ChainHead(ctx) - if err != nil { - log.Warnf("failed to get chain head: %s", err) - w.WriteHeader(500) - return - } - - deals, err := dss.api.StateMarketDeals(ctx, head.Key()) - if err != nil { - log.Warnf("failed to get market deals: %s", err) + epoch, deals := dss.filteredDealList() + if epoch == 0 { w.WriteHeader(500) return } @@ -187,23 +139,20 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h stats := make(map[address.Address]*clientStatsOutput) for _, d := range deals { - if filteredClients[d.Proposal.Client] { - continue - } - st, ok := stats[d.Proposal.Client] + st, ok := stats[d.deal.Proposal.Client] if !ok { st = &clientStatsOutput{ - Client: d.Proposal.Client, + Client: d.resolvedWallet, cids: make(map[cid.Cid]bool), providers: make(map[address.Address]bool), } - stats[d.Proposal.Client] = st + stats[d.deal.Proposal.Client] = st } - st.DataSize += int64(d.Proposal.PieceSize.Unpadded()) - st.cids[d.Proposal.PieceCID] = true - st.providers[d.Proposal.Provider] = true + st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded()) + st.cids[d.deal.Proposal.PieceCID] = true + st.providers[d.deal.Proposal.Provider] = true st.NumDeals++ } @@ -221,6 +170,65 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h } } +type dealInfo struct { + deal api.MarketDeal + resolvedWallet address.Address +} + +// filteredDealList returns the current epoch and a list of filtered deals +// on error returns an epoch of 0 +func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + return 0, nil + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + return 0, nil + } + + ret := make(map[string]dealInfo, len(deals)) + for dealKey, d := range deals { + + // Counting no-longer-active deals as per Pooja's request + // // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85 + // if d.State.SectorStartEpoch < 0 { + // continue + // } + + if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered { + continue + } + + if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen { + w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key()) + if err != nil { + log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err) + continue + } else { + resolvedWallets.Store(d.Proposal.Client, w) + } + } + + w, _ := resolvedWallets.Load(d.Proposal.Client) + if _, isFiltered := knownFiltered.Load(w); isFiltered { + continue + } + + ret[dealKey] = dealInfo{ + deal: d, + resolvedWallet: w.(address.Address), + } + } + + return int64(head.Height()), ret +} + var serveDealStatsCmd = &cli.Command{ Name: "serve-deal-stats", Flags: []cli.Flag{},