lotus/cmd/lotus-shed/dealtracker.go
2020-09-27 20:45:45 +02:00

326 lines
8.1 KiB
Go

package main
import (
"context"
"encoding/json"
"net"
"net/http"
"sync"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
)
type dealStatsServer struct {
api api.FullNode
}
// Requested by @jbenet
// How many epochs back to look at for dealstats
var epochLookback = abi.ChainEpoch(10)
// these lists grow continuously with the network
// TODO: need to switch this to an LRU of sorts, to ensure refreshes
var knownFiltered = new(sync.Map)
var resolvedWallets = new(sync.Map)
func init() {
for _, a := range []string{
"t0100", // client for genesis miner
"t0101", // client for genesis miner
"t0102", // client for genesis miner
"t0112", // client for genesis miner
"t0113", // client for genesis miner
"t0114", // client for genesis miner
"t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet
"t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet
"t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet
"t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet
} {
a, err := address.NewFromString(a)
if err != nil {
panic(err)
}
knownFiltered.Store(a, true)
}
}
type dealCountResp struct {
Epoch int64 `json:"epoch"`
Endpoint string `json:"endpoint"`
Payload int64 `json:"payload"`
}
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
epoch, deals := dss.filteredDealList()
if epoch == 0 {
w.WriteHeader(500)
return
}
if err := json.NewEncoder(w).Encode(&dealCountResp{
Endpoint: "COUNT_DEALS",
Payload: int64(len(deals)),
Epoch: epoch,
}); err != nil {
log.Warnf("failed to write back deal count response: %s", err)
return
}
}
type dealAverageResp struct {
Epoch int64 `json:"epoch"`
Endpoint string `json:"endpoint"`
Payload int64 `json:"payload"`
}
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
epoch, deals := dss.filteredDealList()
if epoch == 0 {
w.WriteHeader(500)
return
}
var totalBytes int64
for _, d := range deals {
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
}
if err := json.NewEncoder(w).Encode(&dealAverageResp{
Endpoint: "AVERAGE_DEAL_SIZE",
Payload: totalBytes / int64(len(deals)),
Epoch: epoch,
}); err != nil {
log.Warnf("failed to write back deal average response: %s", err)
return
}
}
type dealTotalResp struct {
Epoch int64 `json:"epoch"`
Endpoint string `json:"endpoint"`
Payload int64 `json:"payload"`
}
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
epoch, deals := dss.filteredDealList()
if epoch == 0 {
w.WriteHeader(500)
return
}
var totalBytes int64
for _, d := range deals {
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
}
if err := json.NewEncoder(w).Encode(&dealTotalResp{
Endpoint: "DEAL_BYTES",
Payload: totalBytes,
Epoch: epoch,
}); err != nil {
log.Warnf("failed to write back deal average response: %s", err)
return
}
}
type clientStatsOutput struct {
Epoch int64 `json:"epoch"`
Endpoint string `json:"endpoint"`
Payload []*clientStats `json:"payload"`
}
type clientStats struct {
Client address.Address `json:"client"`
DataSize int64 `json:"data_size"`
NumCids int `json:"num_cids"`
NumDeals int `json:"num_deals"`
NumMiners int `json:"num_miners"`
cids map[cid.Cid]bool
providers map[address.Address]bool
}
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
epoch, deals := dss.filteredDealList()
if epoch == 0 {
w.WriteHeader(500)
return
}
stats := make(map[address.Address]*clientStats)
for _, d := range deals {
st, ok := stats[d.deal.Proposal.Client]
if !ok {
st = &clientStats{
Client: d.resolvedWallet,
cids: make(map[cid.Cid]bool),
providers: make(map[address.Address]bool),
}
stats[d.deal.Proposal.Client] = st
}
st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded())
st.cids[d.deal.Proposal.PieceCID] = true
st.providers[d.deal.Proposal.Provider] = true
st.NumDeals++
}
out := clientStatsOutput{
Epoch: epoch,
Endpoint: "CLIENT_DEAL_STATS",
Payload: make([]*clientStats, 0, len(stats)),
}
for _, cs := range stats {
cs.NumCids = len(cs.cids)
cs.NumMiners = len(cs.providers)
out.Payload = append(out.Payload, cs)
}
if err := json.NewEncoder(w).Encode(out); err != nil {
log.Warnf("failed to write back client stats response: %s", err)
return
}
}
type dealInfo struct {
deal api.MarketDeal
resolvedWallet address.Address
}
// filteredDealList returns the current epoch and a list of filtered deals
// on error returns an epoch of 0
func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) {
ctx := context.Background()
head, err := dss.api.ChainHead(ctx)
if err != nil {
log.Warnf("failed to get chain head: %s", err)
return 0, nil
}
head, err = dss.api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key())
if err != nil {
log.Warnf("failed to walk back %s epochs: %s", epochLookback, err)
return 0, nil
}
// Disabled as per @pooja's request
//
// // Exclude any address associated with a miner
// miners, err := dss.api.StateListMiners(ctx, head.Key())
// if err != nil {
// log.Warnf("failed to get miner list: %s", err)
// return 0, nil
// }
// for _, m := range miners {
// info, err := dss.api.StateMinerInfo(ctx, m, head.Key())
// if err != nil {
// log.Warnf("failed to get info for known miner '%s': %s", m, err)
// continue
// }
// knownFiltered.Store(info.Owner, true)
// knownFiltered.Store(info.Worker, true)
// for _, a := range info.ControlAddresses {
// knownFiltered.Store(a, true)
// }
// }
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
if err != nil {
log.Warnf("failed to get market deals: %s", err)
return 0, nil
}
ret := make(map[string]dealInfo, len(deals))
for dealKey, d := range deals {
// Counting no-longer-active deals as per Pooja's request
// // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
// if d.State.SectorStartEpoch < 0 {
// continue
// }
if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered {
continue
}
if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen {
w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key())
if err != nil {
log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err)
continue
} else {
resolvedWallets.Store(d.Proposal.Client, w)
}
}
w, _ := resolvedWallets.Load(d.Proposal.Client)
if _, isFiltered := knownFiltered.Load(w); isFiltered {
continue
}
ret[dealKey] = dealInfo{
deal: d,
resolvedWallet: w.(address.Address),
}
}
return int64(head.Height()), ret
}
var serveDealStatsCmd = &cli.Command{
Name: "serve-deal-stats",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
_ = ctx
dss := &dealStatsServer{api}
mux := &http.ServeMux{}
mux.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount)
mux.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize)
mux.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal)
mux.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats)
s := &http.Server{
Addr: ":7272",
Handler: mux,
}
go func() {
<-ctx.Done()
if err := s.Shutdown(context.TODO()); err != nil {
log.Error(err)
}
}()
list, err := net.Listen("tcp", ":7272") // nolint
if err != nil {
panic(err)
}
log.Warnf("deal-stat server listening on %s\n== NOTE: QUERIES ARE EXPENSIVE - YOU MUST FRONT-CACHE THIS SERVICE\n", list.Addr().String())
return s.Serve(list)
},
}