Merge pull request #3963 from filecoin-project/feat/dealstats
Add basic deal stats api server for spacerace slingshot
This commit is contained in:
commit
f4f5ef6f83
265
cmd/lotus-shed/dealtracker.go
Normal file
265
cmd/lotus-shed/dealtracker.go
Normal file
@ -0,0 +1,265 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type dealStatsServer struct {
|
||||
api api.FullNode
|
||||
}
|
||||
|
||||
var filteredClients map[address.Address]bool
|
||||
|
||||
func init() {
|
||||
fc := []string{"t0112", "t0113", "t0114", "t010089"}
|
||||
|
||||
filtered, set := os.LookupEnv("FILTERED_CLIENTS")
|
||||
if set {
|
||||
fc = strings.Split(filtered, ":")
|
||||
}
|
||||
|
||||
filteredClients = make(map[address.Address]bool)
|
||||
for _, a := range fc {
|
||||
addr, err := address.NewFromString(a)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
filteredClients[addr] = true
|
||||
}
|
||||
}
|
||||
|
||||
type dealCountResp struct {
|
||||
Total int64 `json:"total"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var count int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealCountResp{
|
||||
Total: count,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal count response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealAverageResp struct {
|
||||
AverageSize int64 `json:"average_size"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var count int64
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
count++
|
||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealAverageResp{
|
||||
AverageSize: totalBytes / count,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealTotalResp struct {
|
||||
TotalBytes int64 `json:"total_size"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealTotalResp{
|
||||
TotalBytes: totalBytes,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type clientStatsOutput struct {
|
||||
Client address.Address `json:"client"`
|
||||
DataSize int64 `json:"data_size"`
|
||||
NumCids int `json:"num_cids"`
|
||||
NumDeals int `json:"num_deals"`
|
||||
NumMiners int `json:"num_miners"`
|
||||
|
||||
cids map[cid.Cid]bool
|
||||
providers map[address.Address]bool
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
stats := make(map[address.Address]*clientStatsOutput)
|
||||
|
||||
for _, d := range deals {
|
||||
if filteredClients[d.Proposal.Client] {
|
||||
continue
|
||||
}
|
||||
|
||||
st, ok := stats[d.Proposal.Client]
|
||||
if !ok {
|
||||
st = &clientStatsOutput{
|
||||
Client: d.Proposal.Client,
|
||||
cids: make(map[cid.Cid]bool),
|
||||
providers: make(map[address.Address]bool),
|
||||
}
|
||||
stats[d.Proposal.Client] = st
|
||||
}
|
||||
|
||||
st.DataSize += int64(d.Proposal.PieceSize.Unpadded())
|
||||
st.cids[d.Proposal.PieceCID] = true
|
||||
st.providers[d.Proposal.Provider] = true
|
||||
st.NumDeals++
|
||||
}
|
||||
|
||||
out := make([]*clientStatsOutput, 0, len(stats))
|
||||
for _, cso := range stats {
|
||||
cso.NumCids = len(cso.cids)
|
||||
cso.NumMiners = len(cso.providers)
|
||||
|
||||
out = append(out, cso)
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(out); err != nil {
|
||||
log.Warnf("failed to write back client stats response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var serveDealStatsCmd = &cli.Command{
|
||||
Name: "serve-deal-stats",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
_ = ctx
|
||||
|
||||
dss := &dealStatsServer{api}
|
||||
|
||||
mux := &http.ServeMux{}
|
||||
mux.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount)
|
||||
mux.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize)
|
||||
mux.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal)
|
||||
mux.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats)
|
||||
|
||||
s := &http.Server{
|
||||
Addr: ":7272",
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
if err := s.Shutdown(context.TODO()); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
list, err := net.Listen("tcp", ":7272") // nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return s.Serve(list)
|
||||
},
|
||||
}
|
@ -36,6 +36,7 @@ func main() {
|
||||
mpoolStatsCmd,
|
||||
exportChainCmd,
|
||||
consensusCmd,
|
||||
serveDealStatsCmd,
|
||||
}
|
||||
|
||||
app := &cli.App{
|
||||
|
Loading…
Reference in New Issue
Block a user