Merge pull request #4971 from filecoin-project/chore/new_sr2_deal_rollup_cmd
New SR-specific lotus-shed cmd
This commit is contained in:
commit
9cef300535
@ -1,325 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type dealStatsServer struct {
|
||||
api api.FullNode
|
||||
}
|
||||
|
||||
// Requested by @jbenet
|
||||
// How many epochs back to look at for dealstats
|
||||
var epochLookback = abi.ChainEpoch(10)
|
||||
|
||||
// these lists grow continuously with the network
|
||||
// TODO: need to switch this to an LRU of sorts, to ensure refreshes
|
||||
var knownFiltered = new(sync.Map)
|
||||
var resolvedWallets = new(sync.Map)
|
||||
|
||||
func init() {
|
||||
for _, a := range []string{
|
||||
"t0100", // client for genesis miner
|
||||
"t0101", // client for genesis miner
|
||||
"t0102", // client for genesis miner
|
||||
"t0112", // client for genesis miner
|
||||
"t0113", // client for genesis miner
|
||||
"t0114", // client for genesis miner
|
||||
"t1nslxql4pck5pq7hddlzym3orxlx35wkepzjkm3i", // SR1 dealbot wallet
|
||||
"t1stghxhdp2w53dym2nz2jtbpk6ccd4l2lxgmezlq", // SR1 dealbot wallet
|
||||
"t1mcr5xkgv4jdl3rnz77outn6xbmygb55vdejgbfi", // SR1 dealbot wallet
|
||||
"t1qiqdbbmrdalbntnuapriirduvxu5ltsc5mhy7si", // SR1 dealbot wallet
|
||||
} {
|
||||
a, err := address.NewFromString(a)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
knownFiltered.Store(a, true)
|
||||
}
|
||||
}
|
||||
|
||||
type dealCountResp struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload int64 `json:"payload"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
epoch, deals := dss.filteredDealList()
|
||||
if epoch == 0 {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealCountResp{
|
||||
Endpoint: "COUNT_DEALS",
|
||||
Payload: int64(len(deals)),
|
||||
Epoch: epoch,
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal count response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealAverageResp struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload int64 `json:"payload"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
epoch, deals := dss.filteredDealList()
|
||||
if epoch == 0 {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealAverageResp{
|
||||
Endpoint: "AVERAGE_DEAL_SIZE",
|
||||
Payload: totalBytes / int64(len(deals)),
|
||||
Epoch: epoch,
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealTotalResp struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload int64 `json:"payload"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
|
||||
epoch, deals := dss.filteredDealList()
|
||||
if epoch == 0 {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
totalBytes += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealTotalResp{
|
||||
Endpoint: "DEAL_BYTES",
|
||||
Payload: totalBytes,
|
||||
Epoch: epoch,
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type clientStatsOutput struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload []*clientStats `json:"payload"`
|
||||
}
|
||||
|
||||
type clientStats struct {
|
||||
Client address.Address `json:"client"`
|
||||
DataSize int64 `json:"data_size"`
|
||||
NumCids int `json:"num_cids"`
|
||||
NumDeals int `json:"num_deals"`
|
||||
NumMiners int `json:"num_miners"`
|
||||
|
||||
cids map[cid.Cid]bool
|
||||
providers map[address.Address]bool
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
|
||||
epoch, deals := dss.filteredDealList()
|
||||
if epoch == 0 {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
stats := make(map[address.Address]*clientStats)
|
||||
|
||||
for _, d := range deals {
|
||||
|
||||
st, ok := stats[d.deal.Proposal.Client]
|
||||
if !ok {
|
||||
st = &clientStats{
|
||||
Client: d.resolvedWallet,
|
||||
cids: make(map[cid.Cid]bool),
|
||||
providers: make(map[address.Address]bool),
|
||||
}
|
||||
stats[d.deal.Proposal.Client] = st
|
||||
}
|
||||
|
||||
st.DataSize += int64(d.deal.Proposal.PieceSize.Unpadded())
|
||||
st.cids[d.deal.Proposal.PieceCID] = true
|
||||
st.providers[d.deal.Proposal.Provider] = true
|
||||
st.NumDeals++
|
||||
}
|
||||
|
||||
out := clientStatsOutput{
|
||||
Epoch: epoch,
|
||||
Endpoint: "CLIENT_DEAL_STATS",
|
||||
Payload: make([]*clientStats, 0, len(stats)),
|
||||
}
|
||||
for _, cs := range stats {
|
||||
cs.NumCids = len(cs.cids)
|
||||
cs.NumMiners = len(cs.providers)
|
||||
out.Payload = append(out.Payload, cs)
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(out); err != nil {
|
||||
log.Warnf("failed to write back client stats response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealInfo struct {
|
||||
deal api.MarketDeal
|
||||
resolvedWallet address.Address
|
||||
}
|
||||
|
||||
// filteredDealList returns the current epoch and a list of filtered deals
|
||||
// on error returns an epoch of 0
|
||||
func (dss *dealStatsServer) filteredDealList() (int64, map[string]dealInfo) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
head, err = dss.api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to walk back %s epochs: %s", epochLookback, err)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Disabled as per @pooja's request
|
||||
//
|
||||
// // Exclude any address associated with a miner
|
||||
// miners, err := dss.api.StateListMiners(ctx, head.Key())
|
||||
// if err != nil {
|
||||
// log.Warnf("failed to get miner list: %s", err)
|
||||
// return 0, nil
|
||||
// }
|
||||
// for _, m := range miners {
|
||||
// info, err := dss.api.StateMinerInfo(ctx, m, head.Key())
|
||||
// if err != nil {
|
||||
// log.Warnf("failed to get info for known miner '%s': %s", m, err)
|
||||
// continue
|
||||
// }
|
||||
|
||||
// knownFiltered.Store(info.Owner, true)
|
||||
// knownFiltered.Store(info.Worker, true)
|
||||
// for _, a := range info.ControlAddresses {
|
||||
// knownFiltered.Store(a, true)
|
||||
// }
|
||||
// }
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
ret := make(map[string]dealInfo, len(deals))
|
||||
for dealKey, d := range deals {
|
||||
|
||||
// Counting no-longer-active deals as per Pooja's request
|
||||
// // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
|
||||
// if d.State.SectorStartEpoch < 0 {
|
||||
// continue
|
||||
// }
|
||||
|
||||
if _, isFiltered := knownFiltered.Load(d.Proposal.Client); isFiltered {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, wasSeen := resolvedWallets.Load(d.Proposal.Client); !wasSeen {
|
||||
w, err := dss.api.StateAccountKey(ctx, d.Proposal.Client, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to resolve id '%s' to wallet address: %s", d.Proposal.Client, err)
|
||||
continue
|
||||
} else {
|
||||
resolvedWallets.Store(d.Proposal.Client, w)
|
||||
}
|
||||
}
|
||||
|
||||
w, _ := resolvedWallets.Load(d.Proposal.Client)
|
||||
if _, isFiltered := knownFiltered.Load(w); isFiltered {
|
||||
continue
|
||||
}
|
||||
|
||||
ret[dealKey] = dealInfo{
|
||||
deal: d,
|
||||
resolvedWallet: w.(address.Address),
|
||||
}
|
||||
}
|
||||
|
||||
return int64(head.Height()), ret
|
||||
}
|
||||
|
||||
var serveDealStatsCmd = &cli.Command{
|
||||
Name: "serve-deal-stats",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
_ = ctx
|
||||
|
||||
dss := &dealStatsServer{api}
|
||||
|
||||
mux := &http.ServeMux{}
|
||||
mux.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount)
|
||||
mux.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize)
|
||||
mux.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal)
|
||||
mux.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats)
|
||||
|
||||
s := &http.Server{
|
||||
Addr: ":7272",
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
if err := s.Shutdown(context.TODO()); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
list, err := net.Listen("tcp", ":7272") // nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log.Warnf("deal-stat server listening on %s\n== NOTE: QUERIES ARE EXPENSIVE - YOU MUST FRONT-CACHE THIS SERVICE\n", list.Addr().String())
|
||||
|
||||
return s.Serve(list)
|
||||
},
|
||||
}
|
@ -40,7 +40,7 @@ func main() {
|
||||
mpoolStatsCmd,
|
||||
exportChainCmd,
|
||||
consensusCmd,
|
||||
serveDealStatsCmd,
|
||||
rollupDealStatsCmd,
|
||||
syncCmd,
|
||||
stateTreePruneCmd,
|
||||
datastoreCmd,
|
||||
|
432
cmd/lotus-shed/sr2-dealstats-rollup.go
Normal file
432
cmd/lotus-shed/sr2-dealstats-rollup.go
Normal file
@ -0,0 +1,432 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/Jeffail/gabs"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// Requested by @jbenet
|
||||
// How many epochs back to look at for dealstats
|
||||
var epochLookback = abi.ChainEpoch(10)
|
||||
|
||||
var resolvedWallets = map[address.Address]address.Address{}
|
||||
var knownAddrMap = map[address.Address]string{}
|
||||
|
||||
//
|
||||
// contents of basic_stats.json
|
||||
type competitionTotalOutput struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload competitionTotal `json:"payload"`
|
||||
}
|
||||
type competitionTotal struct {
|
||||
UniqueCids int `json:"total_unique_cids"`
|
||||
UniqueProviders int `json:"total_unique_providers"`
|
||||
UniqueProjects int `json:"total_unique_projects"`
|
||||
UniqueClients int `json:"total_unique_clients"`
|
||||
TotalDeals int `json:"total_num_deals"`
|
||||
TotalBytes int64 `json:"total_stored_data_size"`
|
||||
|
||||
seenProject map[string]bool
|
||||
seenClient map[address.Address]bool
|
||||
seenProvider map[address.Address]bool
|
||||
seenPieceCid map[cid.Cid]bool
|
||||
}
|
||||
|
||||
//
|
||||
// contents of client_stats.json
|
||||
type projectAggregateStatsOutput struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload map[string]*projectAggregateStats `json:"payload"`
|
||||
}
|
||||
type projectAggregateStats struct {
|
||||
ProjectID string `json:"project_id"`
|
||||
DataSizeMaxProvider int64 `json:"max_data_size_stored_with_single_provider"`
|
||||
HighestCidDealCount int `json:"max_same_cid_deals"`
|
||||
DataSize int64 `json:"total_data_size"`
|
||||
NumCids int `json:"total_num_cids"`
|
||||
NumDeals int `json:"total_num_deals"`
|
||||
NumProviders int `json:"total_num_providers"`
|
||||
ClientStats map[string]*clientAggregateStats `json:"clients"`
|
||||
|
||||
dataPerProvider map[address.Address]int64
|
||||
cidDeals map[cid.Cid]int
|
||||
}
|
||||
type clientAggregateStats struct {
|
||||
Client string `json:"client"`
|
||||
DataSize int64 `json:"total_data_size"`
|
||||
NumCids int `json:"total_num_cids"`
|
||||
NumDeals int `json:"total_num_deals"`
|
||||
NumProviders int `json:"total_num_providers"`
|
||||
|
||||
providers map[address.Address]bool
|
||||
cids map[cid.Cid]bool
|
||||
}
|
||||
|
||||
//
|
||||
// contents of deals_list_{{projid}}.json
|
||||
type dealListOutput struct {
|
||||
Epoch int64 `json:"epoch"`
|
||||
Endpoint string `json:"endpoint"`
|
||||
Payload []*individualDeal `json:"payload"`
|
||||
}
|
||||
type individualDeal struct {
|
||||
ProjectID string `json:"project_id"`
|
||||
Client string `json:"client"`
|
||||
DealID string `json:"deal_id"`
|
||||
DealStartEpoch int64 `json:"deal_start_epoch"`
|
||||
MinerID string `json:"miner_id"`
|
||||
PayloadCID string `json:"payload_cid"`
|
||||
PaddedSize int64 `json:"data_size"`
|
||||
}
|
||||
|
||||
var rollupDealStatsCmd = &cli.Command{
|
||||
Name: "rollup-deal-stats",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
|
||||
if cctx.Args().Len() != 2 || cctx.Args().Get(0) == "" || cctx.Args().Get(1) == "" {
|
||||
return errors.New("must supply 2 arguments: a nonexistent target directory to write results to and a source of currently active projects")
|
||||
}
|
||||
|
||||
outDirName := cctx.Args().Get(0)
|
||||
if _, err := os.Stat(outDirName); err == nil {
|
||||
return fmt.Errorf("unable to proceed: supplied stat target '%s' already exists", outDirName)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(outDirName, 0755); err != nil {
|
||||
return fmt.Errorf("creation of destination '%s' failed: %s", outDirName, err)
|
||||
}
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
projListName := cctx.Args().Get(1)
|
||||
var projListFh *os.File
|
||||
|
||||
{
|
||||
// Parses JSON input in the form:
|
||||
// {
|
||||
// "payload": [
|
||||
// {
|
||||
// "project": "5fb5f5b3ad3275e236287ce3",
|
||||
// "address": "f3w3r2c6iukyh3u6f6kx62s5g6n2gf54aqp33ukqrqhje2y6xhf7k55przg4xqgahpcdal6laljz6zonma5pka"
|
||||
// },
|
||||
// {
|
||||
// "project": "5fb608c4ad3275e236287ced",
|
||||
// "address": "f3rs2khurnubol6ent27lpggidxxujqo2lg5aap5d5bmtam6yjb5wfla5cxxdgj45tqoaawgpzt5lofc3vpzfq"
|
||||
// },
|
||||
// ...
|
||||
// ]
|
||||
// }
|
||||
if strings.HasPrefix(projListName, "http://") || strings.HasPrefix(projListName, "https://") {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", projListName, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close() //nolint:errcheck
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return xerrors.Errorf("non-200 response: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
projListFh, err = os.Create(outDirName + "/client_list.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(projListFh, resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
return errors.New("file inputs not yet supported")
|
||||
}
|
||||
|
||||
if _, err := projListFh.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
defer projListFh.Close() //nolint:errcheck
|
||||
|
||||
projList, err := gabs.ParseJSONBuffer(projListFh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
proj, err := projList.Search("payload").Children()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, p := range proj {
|
||||
a, err := address.NewFromString(p.S("address").Data().(string))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
knownAddrMap[a] = p.S("project").Data().(string)
|
||||
}
|
||||
|
||||
if len(knownAddrMap) == 0 {
|
||||
return fmt.Errorf("no active projects/clients found in '%s': unable to continue", projListName)
|
||||
}
|
||||
}
|
||||
|
||||
outClientStatsFd, err := os.Create(outDirName + "/client_stats.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outClientStatsFd.Close() //nolint:errcheck
|
||||
|
||||
outBasicStatsFd, err := os.Create(outDirName + "/basic_stats.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outBasicStatsFd.Close() //nolint:errcheck
|
||||
|
||||
outUnfilteredStatsFd, err := os.Create(outDirName + "/unfiltered_basic_stats.json")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer outUnfilteredStatsFd.Close() //nolint:errcheck
|
||||
|
||||
api, apiCloser, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer apiCloser()
|
||||
|
||||
head, err := api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
head, err = api.ChainGetTipSetByHeight(ctx, head.Height()-epochLookback, head.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
grandTotals := competitionTotal{
|
||||
seenProject: make(map[string]bool),
|
||||
seenClient: make(map[address.Address]bool),
|
||||
seenProvider: make(map[address.Address]bool),
|
||||
seenPieceCid: make(map[cid.Cid]bool),
|
||||
}
|
||||
|
||||
unfilteredGrandTotals := competitionTotal{
|
||||
seenClient: make(map[address.Address]bool),
|
||||
seenProvider: make(map[address.Address]bool),
|
||||
seenPieceCid: make(map[cid.Cid]bool),
|
||||
}
|
||||
|
||||
projStats := make(map[string]*projectAggregateStats)
|
||||
projDealLists := make(map[string][]*individualDeal)
|
||||
|
||||
deals, err := api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for dealID, dealInfo := range deals {
|
||||
|
||||
// Counting no-longer-active deals as per Pooja's request
|
||||
// // https://github.com/filecoin-project/specs-actors/blob/v0.9.9/actors/builtin/market/deal.go#L81-L85
|
||||
// if d.State.SectorStartEpoch < 0 {
|
||||
// continue
|
||||
// }
|
||||
|
||||
clientAddr, found := resolvedWallets[dealInfo.Proposal.Client]
|
||||
if !found {
|
||||
var err error
|
||||
clientAddr, err = api.StateAccountKey(ctx, dealInfo.Proposal.Client, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to resolve id '%s' to wallet address: %s", dealInfo.Proposal.Client, err)
|
||||
continue
|
||||
}
|
||||
|
||||
resolvedWallets[dealInfo.Proposal.Client] = clientAddr
|
||||
}
|
||||
|
||||
unfilteredGrandTotals.seenClient[clientAddr] = true
|
||||
unfilteredGrandTotals.TotalBytes += int64(dealInfo.Proposal.PieceSize)
|
||||
unfilteredGrandTotals.seenProvider[dealInfo.Proposal.Provider] = true
|
||||
unfilteredGrandTotals.seenPieceCid[dealInfo.Proposal.PieceCID] = true
|
||||
unfilteredGrandTotals.TotalDeals++
|
||||
|
||||
projID, projKnown := knownAddrMap[clientAddr]
|
||||
if !projKnown {
|
||||
continue
|
||||
}
|
||||
|
||||
grandTotals.seenProject[projID] = true
|
||||
grandTotals.seenClient[clientAddr] = true
|
||||
|
||||
projStatEntry, ok := projStats[projID]
|
||||
if !ok {
|
||||
projStatEntry = &projectAggregateStats{
|
||||
ProjectID: projID,
|
||||
ClientStats: make(map[string]*clientAggregateStats),
|
||||
cidDeals: make(map[cid.Cid]int),
|
||||
dataPerProvider: make(map[address.Address]int64),
|
||||
}
|
||||
projStats[projID] = projStatEntry
|
||||
}
|
||||
|
||||
clientStatEntry, ok := projStatEntry.ClientStats[clientAddr.String()]
|
||||
if !ok {
|
||||
clientStatEntry = &clientAggregateStats{
|
||||
Client: clientAddr.String(),
|
||||
cids: make(map[cid.Cid]bool),
|
||||
providers: make(map[address.Address]bool),
|
||||
}
|
||||
projStatEntry.ClientStats[clientAddr.String()] = clientStatEntry
|
||||
}
|
||||
|
||||
grandTotals.TotalBytes += int64(dealInfo.Proposal.PieceSize)
|
||||
projStatEntry.DataSize += int64(dealInfo.Proposal.PieceSize)
|
||||
clientStatEntry.DataSize += int64(dealInfo.Proposal.PieceSize)
|
||||
|
||||
grandTotals.seenProvider[dealInfo.Proposal.Provider] = true
|
||||
projStatEntry.dataPerProvider[dealInfo.Proposal.Provider] += int64(dealInfo.Proposal.PieceSize)
|
||||
clientStatEntry.providers[dealInfo.Proposal.Provider] = true
|
||||
|
||||
grandTotals.seenPieceCid[dealInfo.Proposal.PieceCID] = true
|
||||
projStatEntry.cidDeals[dealInfo.Proposal.PieceCID]++
|
||||
clientStatEntry.cids[dealInfo.Proposal.PieceCID] = true
|
||||
|
||||
grandTotals.TotalDeals++
|
||||
projStatEntry.NumDeals++
|
||||
clientStatEntry.NumDeals++
|
||||
|
||||
payloadCid := "unknown"
|
||||
if c, err := cid.Parse(dealInfo.Proposal.Label); err == nil {
|
||||
payloadCid = c.String()
|
||||
}
|
||||
|
||||
projDealLists[projID] = append(projDealLists[projID], &individualDeal{
|
||||
DealID: dealID,
|
||||
ProjectID: projID,
|
||||
Client: clientAddr.String(),
|
||||
MinerID: dealInfo.Proposal.Provider.String(),
|
||||
PayloadCID: payloadCid,
|
||||
PaddedSize: int64(dealInfo.Proposal.PieceSize),
|
||||
DealStartEpoch: int64(dealInfo.State.SectorStartEpoch),
|
||||
})
|
||||
}
|
||||
|
||||
//
|
||||
// Write out per-project deal lists
|
||||
for proj, dl := range projDealLists {
|
||||
err := func() error {
|
||||
outListFd, err := os.Create(fmt.Sprintf(outDirName+"/deals_list_%s.json", proj))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer outListFd.Close() //nolint:errcheck
|
||||
|
||||
ridiculousLintMandatedRebind := dl
|
||||
sort.Slice(dl, func(i, j int) bool {
|
||||
return ridiculousLintMandatedRebind[j].PaddedSize < ridiculousLintMandatedRebind[i].PaddedSize
|
||||
})
|
||||
|
||||
if err := json.NewEncoder(outListFd).Encode(
|
||||
dealListOutput{
|
||||
Epoch: int64(head.Height()),
|
||||
Endpoint: "DEAL_LIST",
|
||||
Payload: dl,
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// write out basic_stats.json and unfiltered_basic_stats.json
|
||||
for _, st := range []*competitionTotal{&grandTotals, &unfilteredGrandTotals} {
|
||||
st.UniqueCids = len(st.seenPieceCid)
|
||||
st.UniqueClients = len(st.seenClient)
|
||||
st.UniqueProviders = len(st.seenProvider)
|
||||
if st.seenProject != nil {
|
||||
st.UniqueProjects = len(st.seenProject)
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(outBasicStatsFd).Encode(
|
||||
competitionTotalOutput{
|
||||
Epoch: int64(head.Height()),
|
||||
Endpoint: "COMPETITION_TOTALS",
|
||||
Payload: grandTotals,
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(outUnfilteredStatsFd).Encode(
|
||||
competitionTotalOutput{
|
||||
Epoch: int64(head.Height()),
|
||||
Endpoint: "NETWORK_WIDE_TOTALS",
|
||||
Payload: unfilteredGrandTotals,
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//
|
||||
// write out client_stats.json
|
||||
for _, ps := range projStats {
|
||||
ps.NumCids = len(ps.cidDeals)
|
||||
ps.NumProviders = len(ps.dataPerProvider)
|
||||
for _, dealsForCid := range ps.cidDeals {
|
||||
if ps.HighestCidDealCount < dealsForCid {
|
||||
ps.HighestCidDealCount = dealsForCid
|
||||
}
|
||||
}
|
||||
for _, dataForProvider := range ps.dataPerProvider {
|
||||
if ps.DataSizeMaxProvider < dataForProvider {
|
||||
ps.DataSizeMaxProvider = dataForProvider
|
||||
}
|
||||
}
|
||||
|
||||
for _, cs := range ps.ClientStats {
|
||||
cs.NumCids = len(cs.cids)
|
||||
cs.NumProviders = len(cs.providers)
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(outClientStatsFd).Encode(
|
||||
projectAggregateStatsOutput{
|
||||
Epoch: int64(head.Height()),
|
||||
Endpoint: "PROJECT_DEAL_STATS",
|
||||
Payload: projStats,
|
||||
},
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
1
go.mod
1
go.mod
@ -8,6 +8,7 @@ require (
|
||||
github.com/BurntSushi/toml v0.3.1
|
||||
github.com/GeertJohan/go.rice v1.0.0
|
||||
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee
|
||||
github.com/Jeffail/gabs v1.4.0
|
||||
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
|
||||
github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
|
||||
|
2
go.sum
2
go.sum
@ -40,6 +40,8 @@ github.com/GeertJohan/go.rice v1.0.0 h1:KkI6O9uMaQU3VEKaj01ulavtF7o1fWT7+pk/4voi
|
||||
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
|
||||
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K172oDhSKU0dJ/miJramo9NITOMyZQ=
|
||||
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY=
|
||||
github.com/Jeffail/gabs v1.4.0 h1://5fYRRTq1edjfIrQGvdkcd22pkYUrHZ5YC/H2GJVAo=
|
||||
github.com/Jeffail/gabs v1.4.0/go.mod h1:6xMvQMK4k33lb7GUUpaAPh6nKMmemQeg5d4gn7/bOXc=
|
||||
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
|
||||
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
|
||||
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
|
||||
|
Loading…
Reference in New Issue
Block a user