Merge remote-tracking branch 'origin/master' into feat/async-restartable-workers
This commit is contained in:
commit
f57652524c
@ -76,6 +76,9 @@ type FullNode interface {
|
||||
// blockstore and returns raw bytes.
|
||||
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
|
||||
|
||||
// ChainDeleteObj deletes node referenced by the given CID
|
||||
ChainDeleteObj(context.Context, cid.Cid) error
|
||||
|
||||
// ChainHasObj checks if a given CID exists in the chain blockstore.
|
||||
ChainHasObj(context.Context, cid.Cid) (bool, error)
|
||||
|
||||
|
@ -87,6 +87,7 @@ type FullNodeStruct struct {
|
||||
ChainGetParentMessages func(context.Context, cid.Cid) ([]api.Message, error) `perm:"read"`
|
||||
ChainGetTipSetByHeight func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) `perm:"read"`
|
||||
ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"`
|
||||
ChainDeleteObj func(context.Context, cid.Cid) error `perm:"admin"`
|
||||
ChainHasObj func(context.Context, cid.Cid) (bool, error) `perm:"read"`
|
||||
ChainStatObj func(context.Context, cid.Cid, cid.Cid) (api.ObjStat, error) `perm:"read"`
|
||||
ChainSetHead func(context.Context, types.TipSetKey) error `perm:"admin"`
|
||||
@ -673,6 +674,10 @@ func (c *FullNodeStruct) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte,
|
||||
return c.Internal.ChainReadObj(ctx, obj)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
|
||||
return c.Internal.ChainDeleteObj(ctx, obj)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) ChainHasObj(ctx context.Context, o cid.Cid) (bool, error) {
|
||||
return c.Internal.ChainHasObj(ctx, o)
|
||||
}
|
||||
|
@ -36,4 +36,9 @@ type State interface {
|
||||
NetworkName() (dtypes.NetworkName, error)
|
||||
|
||||
ForEachActor(func(id abi.ActorID, address address.Address) error) error
|
||||
|
||||
// Remove exists to support tooling that manipulates state for testing.
|
||||
// It should not be used in production code, as init actor entries are
|
||||
// immutable.
|
||||
Remove(addrs ...address.Address) error
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
init_ "github.com/filecoin-project/specs-actors/actors/builtin/init"
|
||||
|
||||
@ -46,3 +47,21 @@ func (s *state0) ForEachActor(cb func(id abi.ActorID, address address.Address) e
|
||||
func (s *state0) NetworkName() (dtypes.NetworkName, error) {
|
||||
return dtypes.NetworkName(s.State.NetworkName), nil
|
||||
}
|
||||
|
||||
func (s *state0) Remove(addrs ...address.Address) (err error) {
|
||||
m, err := adt0.AsMap(s.store, s.State.AddressMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
if err = m.Delete(abi.AddrKey(addr)); err != nil {
|
||||
return xerrors.Errorf("failed to delete entry for address: %s; err: %w", addr, err)
|
||||
}
|
||||
}
|
||||
amr, err := m.Root()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to get address map root: %w", err)
|
||||
}
|
||||
s.State.AddressMap = amr
|
||||
return nil
|
||||
}
|
||||
|
@ -186,9 +186,9 @@ func (s *state0) LoadSectors(snos *bitfield.BitField) ([]*SectorOnChainInfo, err
|
||||
if snos == nil {
|
||||
infos := make([]*SectorOnChainInfo, 0, sectors.Length())
|
||||
var info0 miner0.SectorOnChainInfo
|
||||
if err := sectors.ForEach(&info0, func(i int64) error {
|
||||
if err := sectors.ForEach(&info0, func(_ int64) error {
|
||||
info := fromV0SectorOnChainInfo(info0)
|
||||
infos[i] = &info
|
||||
infos = append(infos, &info)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/helpers"
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
|
||||
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
|
||||
// go-libp2p-core 0.7.0
|
||||
go helpers.FullClose(stream) //nolint:errcheck
|
||||
}()
|
||||
|
||||
// Write request.
|
||||
_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
|
||||
if err := cborutil.WriteCborRPC(stream, req); err != nil {
|
||||
|
@ -40,7 +40,7 @@ const (
|
||||
WriteReqDeadline = 5 * time.Second
|
||||
ReadResDeadline = WriteReqDeadline
|
||||
ReadResMinSpeed = 50 << 10
|
||||
ShufflePeersPrefix = 5
|
||||
ShufflePeersPrefix = 16
|
||||
WriteResDeadline = 60 * time.Second
|
||||
)
|
||||
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-libp2p-core/helpers"
|
||||
inet "github.com/libp2p/go-libp2p-core/network"
|
||||
)
|
||||
|
||||
@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) {
|
||||
ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
|
||||
defer span.End()
|
||||
|
||||
defer stream.Close() //nolint:errcheck
|
||||
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
|
||||
// go-libp2p-core 0.7.0
|
||||
defer helpers.FullClose(stream) //nolint:errcheck
|
||||
|
||||
var req Request
|
||||
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
|
||||
|
135
chain/sync.go
135
chain/sync.go
@ -7,7 +7,6 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -62,20 +61,12 @@ var (
|
||||
// where the Syncer publishes candidate chain heads to be synced.
|
||||
LocalIncoming = "incoming"
|
||||
|
||||
log = logging.Logger("chain")
|
||||
defaultMessageFetchWindowSize = 200
|
||||
)
|
||||
log = logging.Logger("chain")
|
||||
|
||||
func init() {
|
||||
if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
|
||||
val, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
|
||||
return
|
||||
}
|
||||
defaultMessageFetchWindowSize = val
|
||||
}
|
||||
}
|
||||
concurrentSyncRequests = exchange.ShufflePeersPrefix
|
||||
syncRequestBatchSize = 8
|
||||
syncRequestRetries = 5
|
||||
)
|
||||
|
||||
// Syncer is in charge of running the chain synchronization logic. As such, it
|
||||
// is tasked with these functions, amongst others:
|
||||
@ -131,8 +122,6 @@ type Syncer struct {
|
||||
|
||||
verifier ffiwrapper.Verifier
|
||||
|
||||
windowSize int
|
||||
|
||||
tickerCtxCancel context.CancelFunc
|
||||
|
||||
checkptLk sync.Mutex
|
||||
@ -174,7 +163,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
|
||||
receiptTracker: newBlockReceiptTracker(),
|
||||
connmgr: connmgr,
|
||||
verifier: verifier,
|
||||
windowSize: defaultMessageFetchWindowSize,
|
||||
|
||||
incoming: pubsub.New(50),
|
||||
}
|
||||
@ -1481,8 +1469,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
|
||||
|
||||
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
|
||||
|
||||
windowSize := syncer.windowSize
|
||||
mainLoop:
|
||||
for i := len(headers) - 1; i >= 0; {
|
||||
fts, err := syncer.store.TryFillTipSet(headers[i])
|
||||
if err != nil {
|
||||
@ -1496,35 +1482,20 @@ mainLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
batchSize := windowSize
|
||||
batchSize := concurrentSyncRequests * syncRequestBatchSize
|
||||
if i < batchSize {
|
||||
batchSize = i
|
||||
batchSize = i + 1
|
||||
}
|
||||
|
||||
nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index
|
||||
|
||||
ss.SetStage(api.StageFetchingMessages)
|
||||
var bstout []*exchange.CompactedMessages
|
||||
for len(bstout) < batchSize {
|
||||
next := headers[nextI]
|
||||
|
||||
nreq := batchSize - len(bstout)
|
||||
bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
|
||||
if err != nil {
|
||||
// TODO check errors for temporary nature
|
||||
if windowSize > 1 {
|
||||
windowSize /= 2
|
||||
log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
|
||||
continue mainLoop
|
||||
}
|
||||
return xerrors.Errorf("message processing failed: %w", err)
|
||||
}
|
||||
|
||||
bstout = append(bstout, bstips...)
|
||||
nextI += len(bstips)
|
||||
}
|
||||
startOffset := i + 1 - batchSize
|
||||
bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
|
||||
ss.SetStage(api.StageMessages)
|
||||
|
||||
if batchErr != nil {
|
||||
return xerrors.Errorf("failed to fetch messages: %w", err)
|
||||
}
|
||||
|
||||
for bsi := 0; bsi < len(bstout); bsi++ {
|
||||
// temp storage so we don't persist data we dont want to
|
||||
bs := bstore.NewTemporary()
|
||||
@ -1553,26 +1524,78 @@ mainLoop:
|
||||
}
|
||||
}
|
||||
|
||||
if i >= windowSize {
|
||||
newWindowSize := windowSize + 10
|
||||
if newWindowSize > int(exchange.MaxRequestLength) {
|
||||
newWindowSize = int(exchange.MaxRequestLength)
|
||||
}
|
||||
if newWindowSize > windowSize {
|
||||
windowSize = newWindowSize
|
||||
log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)
|
||||
}
|
||||
}
|
||||
|
||||
i -= batchSize
|
||||
}
|
||||
|
||||
// remember our window size
|
||||
syncer.windowSize = windowSize
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
|
||||
batchSize := len(headers)
|
||||
batch := make([]*exchange.CompactedMessages, batchSize)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var mx sync.Mutex
|
||||
var batchErr error
|
||||
|
||||
start := build.Clock.Now()
|
||||
|
||||
for j := 0; j < batchSize; j += syncRequestBatchSize {
|
||||
wg.Add(1)
|
||||
go func(j int) {
|
||||
defer wg.Done()
|
||||
|
||||
nreq := syncRequestBatchSize
|
||||
if j+nreq > batchSize {
|
||||
nreq = batchSize - j
|
||||
}
|
||||
|
||||
failed := false
|
||||
for offset := 0; !failed && offset < nreq; {
|
||||
nextI := j + offset
|
||||
nextHeader := headers[nextI]
|
||||
|
||||
var requestErr error
|
||||
var requestResult []*exchange.CompactedMessages
|
||||
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
|
||||
if retry > 0 {
|
||||
log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
|
||||
} else {
|
||||
log.Infof("fetching messages at %d", startOffset+nextI)
|
||||
}
|
||||
|
||||
result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
|
||||
if err != nil {
|
||||
requestErr = multierror.Append(requestErr, err)
|
||||
} else {
|
||||
requestResult = result
|
||||
}
|
||||
}
|
||||
|
||||
mx.Lock()
|
||||
if requestResult != nil {
|
||||
copy(batch[j+offset:], requestResult)
|
||||
offset += len(requestResult)
|
||||
} else {
|
||||
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
|
||||
batchErr = multierror.Append(batchErr, requestErr)
|
||||
failed = true
|
||||
}
|
||||
mx.Unlock()
|
||||
}
|
||||
}(j)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if batchErr != nil {
|
||||
return nil, batchErr
|
||||
}
|
||||
|
||||
log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))
|
||||
|
||||
return batch, nil
|
||||
}
|
||||
|
||||
func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
|
||||
for _, m := range bst.Bls {
|
||||
//log.Infof("putting BLS message: %s", m.Cid())
|
||||
|
38
cli/chain.go
38
cli/chain.go
@ -40,6 +40,7 @@ var chainCmd = &cli.Command{
|
||||
chainHeadCmd,
|
||||
chainGetBlock,
|
||||
chainReadObjCmd,
|
||||
chainDeleteObjCmd,
|
||||
chainStatObjCmd,
|
||||
chainGetMsgCmd,
|
||||
chainSetHeadCmd,
|
||||
@ -193,6 +194,43 @@ var chainReadObjCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
var chainDeleteObjCmd = &cli.Command{
|
||||
Name: "delete-obj",
|
||||
Usage: "Delete an object from the chain blockstore",
|
||||
Description: "WARNING: Removing wrong objects from the chain blockstore may lead to sync issues",
|
||||
ArgsUsage: "[objectCid]",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "really-do-it",
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := ReqContext(cctx)
|
||||
|
||||
c, err := cid.Decode(cctx.Args().First())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse cid input: %s", err)
|
||||
}
|
||||
|
||||
if !cctx.Bool("really-do-it") {
|
||||
return xerrors.Errorf("pass the --really-do-it flag to proceed")
|
||||
}
|
||||
|
||||
err = api.ChainDeleteObj(ctx, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("Obj %s deleted\n", c.String())
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var chainStatObjCmd = &cli.Command{
|
||||
Name: "stat-obj",
|
||||
Usage: "Collect size and ipld link counts for objs",
|
||||
|
265
cmd/lotus-shed/dealtracker.go
Normal file
265
cmd/lotus-shed/dealtracker.go
Normal file
@ -0,0 +1,265 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type dealStatsServer struct {
|
||||
api api.FullNode
|
||||
}
|
||||
|
||||
var filteredClients map[address.Address]bool
|
||||
|
||||
func init() {
|
||||
fc := []string{"t0112", "t0113", "t0114", "t010089"}
|
||||
|
||||
filtered, set := os.LookupEnv("FILTERED_CLIENTS")
|
||||
if set {
|
||||
fc = strings.Split(filtered, ":")
|
||||
}
|
||||
|
||||
filteredClients = make(map[address.Address]bool)
|
||||
for _, a := range fc {
|
||||
addr, err := address.NewFromString(a)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
filteredClients[addr] = true
|
||||
}
|
||||
}
|
||||
|
||||
type dealCountResp struct {
|
||||
Total int64 `json:"total"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var count int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealCountResp{
|
||||
Total: count,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal count response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealAverageResp struct {
|
||||
AverageSize int64 `json:"average_size"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var count int64
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
count++
|
||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealAverageResp{
|
||||
AverageSize: totalBytes / count,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type dealTotalResp struct {
|
||||
TotalBytes int64 `json:"total_size"`
|
||||
Epoch int64 `json:"epoch"`
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
var totalBytes int64
|
||||
for _, d := range deals {
|
||||
if !filteredClients[d.Proposal.Client] {
|
||||
totalBytes += int64(d.Proposal.PieceSize.Unpadded())
|
||||
}
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(&dealTotalResp{
|
||||
TotalBytes: totalBytes,
|
||||
Epoch: int64(head.Height()),
|
||||
}); err != nil {
|
||||
log.Warnf("failed to write back deal average response: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type clientStatsOutput struct {
|
||||
Client address.Address `json:"client"`
|
||||
DataSize int64 `json:"data_size"`
|
||||
NumCids int `json:"num_cids"`
|
||||
NumDeals int `json:"num_deals"`
|
||||
NumMiners int `json:"num_miners"`
|
||||
|
||||
cids map[cid.Cid]bool
|
||||
providers map[address.Address]bool
|
||||
}
|
||||
|
||||
func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := context.Background()
|
||||
|
||||
head, err := dss.api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
log.Warnf("failed to get chain head: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
deals, err := dss.api.StateMarketDeals(ctx, head.Key())
|
||||
if err != nil {
|
||||
log.Warnf("failed to get market deals: %s", err)
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
stats := make(map[address.Address]*clientStatsOutput)
|
||||
|
||||
for _, d := range deals {
|
||||
if filteredClients[d.Proposal.Client] {
|
||||
continue
|
||||
}
|
||||
|
||||
st, ok := stats[d.Proposal.Client]
|
||||
if !ok {
|
||||
st = &clientStatsOutput{
|
||||
Client: d.Proposal.Client,
|
||||
cids: make(map[cid.Cid]bool),
|
||||
providers: make(map[address.Address]bool),
|
||||
}
|
||||
stats[d.Proposal.Client] = st
|
||||
}
|
||||
|
||||
st.DataSize += int64(d.Proposal.PieceSize.Unpadded())
|
||||
st.cids[d.Proposal.PieceCID] = true
|
||||
st.providers[d.Proposal.Provider] = true
|
||||
st.NumDeals++
|
||||
}
|
||||
|
||||
out := make([]*clientStatsOutput, 0, len(stats))
|
||||
for _, cso := range stats {
|
||||
cso.NumCids = len(cso.cids)
|
||||
cso.NumMiners = len(cso.providers)
|
||||
|
||||
out = append(out, cso)
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(w).Encode(out); err != nil {
|
||||
log.Warnf("failed to write back client stats response: %s", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
var serveDealStatsCmd = &cli.Command{
|
||||
Name: "serve-deal-stats",
|
||||
Flags: []cli.Flag{},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
_ = ctx
|
||||
|
||||
dss := &dealStatsServer{api}
|
||||
|
||||
mux := &http.ServeMux{}
|
||||
mux.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount)
|
||||
mux.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize)
|
||||
mux.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal)
|
||||
mux.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats)
|
||||
|
||||
s := &http.Server{
|
||||
Addr: ":7272",
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
if err := s.Shutdown(context.TODO()); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
list, err := net.Listen("tcp", ":7272") // nolint
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return s.Serve(list)
|
||||
},
|
||||
}
|
@ -36,6 +36,7 @@ func main() {
|
||||
mpoolStatsCmd,
|
||||
exportChainCmd,
|
||||
consensusCmd,
|
||||
serveDealStatsCmd,
|
||||
}
|
||||
|
||||
app := &cli.App{
|
||||
|
@ -404,8 +404,9 @@ var sectorsCapacityCollateralCmd = &cli.Command{
|
||||
}
|
||||
|
||||
var sectorsUpdateCmd = &cli.Command{
|
||||
Name: "update-state",
|
||||
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",
|
||||
Name: "update-state",
|
||||
Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery",
|
||||
ArgsUsage: "<sectorNum> <newState>",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "really-do-it",
|
||||
@ -431,8 +432,13 @@ var sectorsUpdateCmd = &cli.Command{
|
||||
return xerrors.Errorf("could not parse sector number: %w", err)
|
||||
}
|
||||
|
||||
if _, ok := sealing.ExistSectorStateList[sealing.SectorState(cctx.Args().Get(1))]; !ok {
|
||||
return xerrors.Errorf("Not existing sector state")
|
||||
newState := cctx.Args().Get(1)
|
||||
if _, ok := sealing.ExistSectorStateList[sealing.SectorState(newState)]; !ok {
|
||||
fmt.Printf(" \"%s\" is not a valid state. Possible states for sectors are: \n", newState)
|
||||
for state := range sealing.ExistSectorStateList {
|
||||
fmt.Printf("%s\n", string(state))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return nodeApi.SectorsUpdate(ctx, abi.SectorNumber(id), api.SectorState(cctx.Args().Get(1)))
|
||||
|
@ -9,6 +9,7 @@
|
||||
* [Beacon](#Beacon)
|
||||
* [BeaconGetEntry](#BeaconGetEntry)
|
||||
* [Chain](#Chain)
|
||||
* [ChainDeleteObj](#ChainDeleteObj)
|
||||
* [ChainExport](#ChainExport)
|
||||
* [ChainGetBlock](#ChainGetBlock)
|
||||
* [ChainGetBlockMessages](#ChainGetBlockMessages)
|
||||
@ -281,6 +282,23 @@ The Chain method group contains methods for interacting with the
|
||||
blockchain, but that do not require any form of state computation.
|
||||
|
||||
|
||||
### ChainDeleteObj
|
||||
ChainDeleteObj deletes node referenced by the given CID
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ChainExport
|
||||
ChainExport returns a stream of bytes with CAR dump of chain data.
|
||||
The exported chain data includes the header chain from the given tipset
|
||||
|
10
go.mod
10
go.mod
@ -25,9 +25,9 @@ require (
|
||||
github.com/filecoin-project/go-bitfield v0.2.0
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-data-transfer v0.6.4
|
||||
github.com/filecoin-project/go-data-transfer v0.6.5
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
|
||||
github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9
|
||||
github.com/filecoin-project/go-fil-markets v0.6.2
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52
|
||||
github.com/filecoin-project/go-multistore v0.0.3
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
|
||||
@ -60,7 +60,7 @@ require (
|
||||
github.com/ipfs/go-ds-measure v0.1.0
|
||||
github.com/ipfs/go-filestore v1.0.0
|
||||
github.com/ipfs/go-fs-lock v0.0.6
|
||||
github.com/ipfs/go-graphsync v0.1.2
|
||||
github.com/ipfs/go-graphsync v0.2.0
|
||||
github.com/ipfs/go-ipfs-blockstore v1.0.1
|
||||
github.com/ipfs/go-ipfs-chunker v0.0.5
|
||||
github.com/ipfs/go-ipfs-ds-help v1.0.0
|
||||
@ -77,8 +77,8 @@ require (
|
||||
github.com/ipfs/go-path v0.0.7
|
||||
github.com/ipfs/go-unixfs v0.2.4
|
||||
github.com/ipfs/interface-go-ipfs-core v0.2.3
|
||||
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae
|
||||
github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef
|
||||
github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/lib/pq v1.7.0
|
||||
github.com/libp2p/go-eventbus v0.2.1
|
||||
|
31
go.sum
31
go.sum
@ -222,13 +222,12 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
|
||||
github.com/filecoin-project/go-data-transfer v0.6.3/go.mod h1:PmBKVXkhh67/tnEdJXQwDHl5mT+7Tbcwe1NPninqhnM=
|
||||
github.com/filecoin-project/go-data-transfer v0.6.4 h1:Q08ABa+cOTOLoAyHeA94fPLcwu53p6eeAaxMxQb0m0A=
|
||||
github.com/filecoin-project/go-data-transfer v0.6.4/go.mod h1:PmBKVXkhh67/tnEdJXQwDHl5mT+7Tbcwe1NPninqhnM=
|
||||
github.com/filecoin-project/go-data-transfer v0.6.5 h1:oP20la8Z0CLrw0uqvt6xVgw6rOevZeGJ9GNQeC0OCSU=
|
||||
github.com/filecoin-project/go-data-transfer v0.6.5/go.mod h1:I9Ylb/UiZyqnI41wUoCXq/le0nDLhlwpFQCtNPxEPOA=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||
github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9 h1:SnCUC9wHDId9TtV8PsQp8q1OOsi+NOLOwitIDnAgUa4=
|
||||
github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9/go.mod h1:PLr9svZxsnHkae1Ky7+66g7fP9AlneVxIVu+oSMq56A=
|
||||
github.com/filecoin-project/go-fil-markets v0.6.2 h1:9Z57KeaQSa1liCmT1pH6SIjrn9mGTDFJXmR2WQVuaiY=
|
||||
github.com/filecoin-project/go-fil-markets v0.6.2/go.mod h1:wtN4Hc/1hoVCpWhSWYxwUxH3PQtjSkWWuC1nQjiIWog=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||
@ -396,9 +395,8 @@ github.com/gxed/go-shellwords v1.0.3/go.mod h1:N7paucT91ByIjmVJHhvoarjoQnmsi3Jd3
|
||||
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
|
||||
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
|
||||
github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20200723175505-5892b522820a h1:wfqh5oiHXvn3Rk54xy8Cwqh+HnYihGnjMNzdNb3/ld0=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20200723175505-5892b522820a/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 h1:F9k+7wv5OIk1zcq23QpdiL0hfDuXPjuOmMNaC6fgQ0Q=
|
||||
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8=
|
||||
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
|
||||
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
|
||||
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
|
||||
@ -504,8 +502,8 @@ github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPi
|
||||
github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0=
|
||||
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
|
||||
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
|
||||
github.com/ipfs/go-graphsync v0.1.2 h1:25Ll9kIXCE+DY0dicvfS3KMw+U5sd01b/FJbA7KAbhg=
|
||||
github.com/ipfs/go-graphsync v0.1.2/go.mod h1:sLXVXm1OxtE2XYPw62MuXCdAuNwkAdsbnfrmos5odbA=
|
||||
github.com/ipfs/go-graphsync v0.2.0 h1:x94MvHLNuRwBlZzVal7tR1RYK7T7H6bqQLPopxDbIF0=
|
||||
github.com/ipfs/go-graphsync v0.2.0/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
|
||||
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
|
||||
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
|
||||
@ -609,14 +607,14 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo=
|
||||
github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg=
|
||||
github.com/ipfs/iptb-plugins v0.2.1 h1:au4HWn9/pRPbkxA08pDx2oRAs4cnbgQWgV0teYXuuGA=
|
||||
github.com/ipfs/iptb-plugins v0.2.1/go.mod h1:QXMbtIWZ+jRsW8a4h13qAKU7jcM7qaittO8wOsTP0Rs=
|
||||
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae h1:OV9dxl8iPMCOD8Vi/hvFwRh3JWPXqmkYSVxWr9JnEzM=
|
||||
github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae/go.mod h1:2mvxpu4dKRnuH3mj5u6KW/tmRSCcXvy/KYiJ4nC6h4c=
|
||||
github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4 h1:6phjU3kXvCEWOZpu+Ob0w6DzgPFZmDLgLPxJhD8RxEY=
|
||||
github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4/go.mod h1:xrMEcuSq+D1vEwl+YAXsg/JfA98XGpXDwnkIL4Aimqw=
|
||||
github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8=
|
||||
github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef h1:/yPelt/0CuzZsmRkYzBBnJ499JnAOGaIaAXHujx96ic=
|
||||
github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8=
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f h1:XpOuNQ5GbXxUcSukbQcW9jkE7REpaFGJU2/T00fo9kA=
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b h1:ZtlW6pubN17TDaStlxgrwEXXwwUfJaXu9RobwczXato=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6 h1:6Mq+tZGSEMEoJJ1NbJRhddeelkXZcU8yfH/ZRYUo/Es=
|
||||
github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
|
||||
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
|
||||
github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
|
||||
@ -1362,7 +1360,6 @@ github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMU
|
||||
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc/go.mod h1:r45hJU7yEoA81k6MWNhpMj/kms0n14dkzkxYHoB96UM=
|
||||
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba h1:X4n8JG2e2biEZZXdBKt9HX7DN3bYGFUqljqqy0DqgnY=
|
||||
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba/go.mod h1:CHQnYnQUEPydYCwuy8lmTHfGmdw9TKrhWV0xLx8l0oM=
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20191212224538-d370462a7e8a/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20191216205031-b047b6acb3c0/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
|
||||
github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI=
|
||||
|
@ -711,7 +711,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
|
||||
|
||||
// TODO: does that defer mean to remove the whole blockstore?
|
||||
defer bufferedDS.Remove(ctx, c) //nolint:errcheck
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)
|
||||
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
|
||||
|
||||
// entire DAG selector
|
||||
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
|
||||
|
@ -197,6 +197,10 @@ func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error
|
||||
return blk.RawData(), nil
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error {
|
||||
return a.Chain.Blockstore().DeleteBlock(obj)
|
||||
}
|
||||
|
||||
func (a *ChainAPI) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) {
|
||||
return a.Chain.Blockstore().Has(obj)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user