Merge pull request #4051 from filecoin-project/chore/final-dealstat-version
Chore/final dealstat version
This commit is contained in:
commit
91d04ede0f
@ -5,10 +5,10 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"sync"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
lcli "github.com/filecoin-project/lotus/cli"
|
lcli "github.com/filecoin-project/lotus/cli"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -19,58 +19,54 @@ type dealStatsServer struct {
|
|||||||
api api.FullNode
|
api api.FullNode
|
||||||
}
|
}
|
||||||
|
|
||||||
var filteredClients map[address.Address]bool
|
// Requested by @jbenet
|
||||||
|
// How many epochs back to look at for dealstats
|
||||||
|
var epochLookback = abi.ChainEpoch(10)
|
||||||
|
|
||||||
|
// these lists grow continuously with the network
|
||||||
|
// TODO: need to switch this to an LRU of sorts, to ensure refreshes
|
||||||
|
var knownFiltered = new(sync.Map)
|
||||||
|
var resolvedWallets = new(sync.Map)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
fc := []string{"t0112", "t0113", "t0114", "t010089"}
|
for _, a := range []string{
|
||||||
|
"t0100", // client for genesis miner
|
||||||
filtered, set := os.LookupEnv("FILTERED_CLIENTS")
|
"t0101", // client for genesis miner
|
||||||
if set {
|
"t0102", // client for genesis miner
|
||||||
fc = strings.Split(filtered, ":")
|
"t0112", // client for genesis miner
|
||||||
}
|
"t0113", // client for genesis miner
|
||||||
|
"t0114", // client for genesis miner
|
||||||
filteredClients = make(map[address.Address]bool)
|
"t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet
|
||||||
for _, a := range fc {
|
"t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet
|
||||||
addr, err := address.NewFromString(a)
|
"t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet
|
||||||
|
"t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet
|
||||||
|
} {
|
||||||
|
a, err := address.NewFromString(a)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
filteredClients[addr] = true
|
knownFiltered.Store(a, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type dealCountResp struct {
|
type dealCountResp struct {
|
||||||
Total int64 `json:"total"`
|
Epoch int64 `json:"epoch"`
|
||||||
Epoch int64 `json:"epoch"`
|
Endpoint string `json:"endpoint"`
|
||||||
|
Payload int64 `json:"payload"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
|
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
head, err := dss.api.ChainHead(ctx)
|
epoch, deals := dss.filteredDealList()
|
||||||
if err != nil {
|
if epoch == 0 {
|
||||||
log.Warnf("failed to get chain head: %s", err)
|
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get market deals: %s", err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var count int64
|
|
||||||
for _, d := range deals {
|
|
||||||
if !filteredClients[d.Proposal.Client] {
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(&dealCountResp{
|
if err := json.NewEncoder(w).Encode(&dealCountResp{
|
||||||
Total: count,
|
Endpoint: "COUNT_DEALS",
|
||||||
Epoch: int64(head.Height()),
|
Payload: int64(len(deals)),
|
||||||
|
Epoch: epoch,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Warnf("failed to write back deal count response: %s", err)
|
log.Warnf("failed to write back deal count response: %s", err)
|
||||||
return
|
return
|
||||||
@ -78,39 +74,28 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt
|
|||||||
}
|
}
|
||||||
|
|
||||||
type dealAverageResp struct {
|
type dealAverageResp struct {
|
||||||
AverageSize int64 `json:"average_size"`
|
Epoch int64 `json:"epoch"`
|
||||||
Epoch int64 `json:"epoch"`
|
Endpoint string `json:"endpoint"`
|
||||||
|
Payload int64 `json:"payload"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
|
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
head, err := dss.api.ChainHead(ctx)
|
epoch, deals := dss.filteredDealList()
|
||||||
if err != nil {
|
if epoch == 0 {
|
||||||
log.Warnf("failed to get chain head: %s", err)
|
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get market deals: %s", err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var count int64
|
|
||||||
var totalBytes int64
|
var totalBytes int64
|
||||||
for _, d := range deals {
|
for _, d := range deals {
|
||||||
if !filteredClients[d.Proposal.Client] {
|
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||||
count++
|
|
||||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(&dealAverageResp{
|
if err := json.NewEncoder(w).Encode(&dealAverageResp{
|
||||||
AverageSize: totalBytes / count,
|
Endpoint: "AVERAGE_DEAL_SIZE",
|
||||||
Epoch: int64(head.Height()),
|
Payload: totalBytes / int64(len(deals)),
|
||||||
|
Epoch: epoch,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Warnf("failed to write back deal average response: %s", err)
|
log.Warnf("failed to write back deal average response: %s", err)
|
||||||
return
|
return
|
||||||
@ -118,37 +103,27 @@ func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter,
|
|||||||
}
|
}
|
||||||
|
|
||||||
type dealTotalResp struct {
|
type dealTotalResp struct {
|
||||||
TotalBytes int64 `json:"total_size"`
|
Epoch int64 `json:"epoch"`
|
||||||
Epoch int64 `json:"epoch"`
|
Endpoint string `json:"endpoint"`
|
||||||
|
Payload int64 `json:"payload"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
|
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
epoch, deals := dss.filteredDealList()
|
||||||
|
if epoch == 0 {
|
||||||
head, err := dss.api.ChainHead(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get chain head: %s", err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get market deals: %s", err)
|
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var totalBytes int64
|
var totalBytes int64
|
||||||
for _, d := range deals {
|
for _, d := range deals {
|
||||||
if !filteredClients[d.Proposal.Client] {
|
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(&dealTotalResp{
|
if err := json.NewEncoder(w).Encode(&dealTotalResp{
|
||||||
TotalBytes: totalBytes,
|
Endpoint: "DEAL_BYTES",
|
||||||
Epoch: int64(head.Height()),
|
Payload: totalBytes,
|
||||||
|
Epoch: epoch,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Warnf("failed to write back deal average response: %s", err)
|
log.Warnf("failed to write back deal average response: %s", err)
|
||||||
return
|
return
|
||||||
@ -157,6 +132,12 @@ func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
type clientStatsOutput struct {
|
type clientStatsOutput struct {
|
||||||
|
Epoch int64 `json:"epoch"`
|
||||||
|
Endpoint string `json:"endpoint"`
|
||||||
|
Payload []*clientStats `json:"payload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientStats struct {
|
||||||
Client address.Address `json:"client"`
|
Client address.Address `json:"client"`
|
||||||
DataSize int64 `json:"data_size"`
|
DataSize int64 `json:"data_size"`
|
||||||
NumCids int `json:"num_cids"`
|
NumCids int `json:"num_cids"`
|
||||||
@ -168,51 +149,41 @@ type clientStatsOutput struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
|
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := context.Background()
|
epoch, deals := dss.filteredDealList()
|
||||||
|
if epoch == 0 {
|
||||||
head, err := dss.api.ChainHead(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get chain head: %s", err)
|
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
stats := make(map[address.Address]*clientStats)
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get market deals: %s", err)
|
|
||||||
w.WriteHeader(500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
stats := make(map[address.Address]*clientStatsOutput)
|
|
||||||
|
|
||||||
for _, d := range deals {
|
for _, d := range deals {
|
||||||
if filteredClients[d.Proposal.Client] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
st, ok := stats[d.Proposal.Client]
|
st, ok := stats[d.deal.Proposal.Client]
|
||||||
if !ok {
|
if !ok {
|
||||||
st = &clientStatsOutput{
|
st = &clientStats{
|
||||||
Client: d.Proposal.Client,
|
Client: d.resolvedWallet,
|
||||||
cids: make(map[cid.Cid]bool),
|
cids: make(map[cid.Cid]bool),
|
||||||
providers: make(map[address.Address]bool),
|
providers: make(map[address.Address]bool),
|
||||||
}
|
}
|
||||||
stats[d.Proposal.Client] = st
|
stats[d.deal.Proposal.Client] = st
|
||||||
}
|
}
|
||||||
|
|
||||||
st.DataSize += int64(d.Proposal.PieceSize.Unpadded())
|
st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||||
st.cids[d.Proposal.PieceCID] = true
|
st.cids[d.deal.Proposal.PieceCID] = true
|
||||||
st.providers[d.Proposal.Provider] = true
|
st.providers[d.deal.Proposal.Provider] = true
|
||||||
st.NumDeals++
|
st.NumDeals++
|
||||||
}
|
}
|
||||||
|
|
||||||
out := make([]*clientStatsOutput, 0, len(stats))
|
out := clientStatsOutput{
|
||||||
for _, cso := range stats {
|
Epoch: epoch,
|
||||||
cso.NumCids = len(cso.cids)
|
Endpoint: "CLIENT_DEAL_STATS",
|
||||||
cso.NumMiners = len(cso.providers)
|
Payload: make([]*clientStats, 0, len(stats)),
|
||||||
|
}
|
||||||
out = append(out, cso)
|
for _, cs := range stats {
|
||||||
|
cs.NumCids = len(cs.cids)
|
||||||
|
cs.NumMiners = len(cs.providers)
|
||||||
|
out.Payload = append(out.Payload, cs)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(out); err != nil {
|
if err := json.NewEncoder(w).Encode(out); err != nil {
|
||||||
@ -221,6 +192,93 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type dealInfo struct {
|
||||||
|
deal api.MarketDeal
|
||||||
|
resolvedWallet address.Address
|
||||||
|
}
|
||||||
|
|
||||||
|
// filteredDealList returns the current epoch and a list of filtered deals
|
||||||
|
// on error returns an epoch of 0
|
||||||
|
func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
head, err := dss.api.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to get chain head: %s", err)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
head, err = dss.api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to walk back %s epochs: %s", epochLookback, err)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disabled as per @pooja's request
|
||||||
|
//
|
||||||
|
// // Exclude any address associated with a miner
|
||||||
|
// miners, err := dss.api.StateListMiners(ctx, head.Key())
|
||||||
|
// if err != nil {
|
||||||
|
// log.Warnf("failed to get miner list: %s", err)
|
||||||
|
// return 0, nil
|
||||||
|
// }
|
||||||
|
// for _, m := range miners {
|
||||||
|
// info, err := dss.api.StateMinerInfo(ctx, m, head.Key())
|
||||||
|
// if err != nil {
|
||||||
|
// log.Warnf("failed to get info for known miner '%s': %s", m, err)
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
// knownFiltered.Store(info.Owner, true)
|
||||||
|
// knownFiltered.Store(info.Worker, true)
|
||||||
|
// for _, a := range info.ControlAddresses {
|
||||||
|
// knownFiltered.Store(a, true)
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to get market deals: %s", err)
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := make(map[string]dealInfo, len(deals))
|
||||||
|
for dealKey, d := range deals {
|
||||||
|
|
||||||
|
// Counting no-longer-active deals as per Pooja's request
|
||||||
|
// // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
|
||||||
|
// if d.State.SectorStartEpoch < 0 {
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
|
||||||
|
if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen {
|
||||||
|
w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err)
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
resolvedWallets.Store(d.Proposal.Client, w)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
w, _ := resolvedWallets.Load(d.Proposal.Client)
|
||||||
|
if _, isFiltered := knownFiltered.Load(w); isFiltered {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
ret[dealKey] = dealInfo{
|
||||||
|
deal: d,
|
||||||
|
resolvedWallet: w.(address.Address),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return int64(head.Height()), ret
|
||||||
|
}
|
||||||
|
|
||||||
var serveDealStatsCmd = &cli.Command{
|
var serveDealStatsCmd = &cli.Command{
|
||||||
Name: "serve-deal-stats",
|
Name: "serve-deal-stats",
|
||||||
Flags: []cli.Flag{},
|
Flags: []cli.Flag{},
|
||||||
@ -260,6 +318,8 @@ var serveDealStatsCmd = &cli.Command{
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Warnf("deal-stat server listening on %s\n== NOTE: QUERIES ARE EXPENSIVE - YOU MUST FRONT-CACHE THIS SERVICE\n", list.Addr().String())
|
||||||
|
|
||||||
return s.Serve(list)
|
return s.Serve(list)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user