From 64d150e215256cab04c0eb52819ca2675330b151 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 10 Sep 2020 10:35:06 -0700 Subject: [PATCH 01/25] feat(cli): add chain delete cmd --- api/api_full.go | 3 +++ api/apistruct/struct.go | 5 +++++ cli/chain.go | 27 +++++++++++++++++++++++++++ node/impl/full/chain.go | 4 ++++ 4 files changed, 39 insertions(+) diff --git a/api/api_full.go b/api/api_full.go index 02417bf78..3dde0e517 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -75,6 +75,9 @@ type FullNode interface { // blockstore and returns raw bytes. ChainReadObj(context.Context, cid.Cid) ([]byte, error) + // ChainDeleteObj deletes node referenced by the given CID + ChainDeleteObj(context.Context, cid.Cid) error + // ChainHasObj checks if a given CID exists in the chain blockstore. ChainHasObj(context.Context, cid.Cid) (bool, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index ffb837785..172156c42 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -85,6 +85,7 @@ type FullNodeStruct struct { ChainGetParentMessages func(context.Context, cid.Cid) ([]api.Message, error) `perm:"read"` ChainGetTipSetByHeight func(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) `perm:"read"` ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"` + ChainDeleteObj func(context.Context, cid.Cid) error `perm:"admin"` ChainHasObj func(context.Context, cid.Cid) (bool, error) `perm:"read"` ChainStatObj func(context.Context, cid.Cid, cid.Cid) (api.ObjStat, error) `perm:"read"` ChainSetHead func(context.Context, types.TipSetKey) error `perm:"admin"` @@ -658,6 +659,10 @@ func (c *FullNodeStruct) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, return c.Internal.ChainReadObj(ctx, obj) } +func (c *FullNodeStruct) ChainDeleteObj(ctx context.Context, obj cid.Cid) error { + return c.Internal.ChainDeleteObj(ctx, obj) +} + func (c *FullNodeStruct) ChainHasObj(ctx context.Context, o cid.Cid) (bool, error) { return c.Internal.ChainHasObj(ctx, o) } diff --git a/cli/chain.go b/cli/chain.go index ce1660641..1c63e37a8 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -193,6 +193,33 @@ var chainReadObjCmd = &cli.Command{ }, } +var chainDeleteObjCmd = &cli.Command{ + Name: "delete-obj", + Usage: "Delete an object", + ArgsUsage: "[objectCid]", + Action: func(cctx *cli.Context) error { + api, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + c, err := cid.Decode(cctx.Args().First()) + if err != nil { + return fmt.Errorf("failed to parse cid input: %s", err) + } + + err = api.ChainDeleteObj(ctx, c) + if err != nil { + return err + } + + fmt.Printf("Obj %s deleted\n", c.String()) + return nil + }, +} + var chainStatObjCmd = &cli.Command{ Name: "stat-obj", Usage: "Collect size and ipld link counts for objs", diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index c5dd5c9a9..84335280d 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -197,6 +197,10 @@ func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error return blk.RawData(), nil } +func (a *ChainAPI) ChainDeleteObj(ctx context.Context, obj cid.Cid) error { + return a.Chain.Blockstore().DeleteBlock(obj) +} + func (a *ChainAPI) ChainHasObj(ctx context.Context, obj cid.Cid) (bool, error) { return a.Chain.Blockstore().Has(obj) } From e86a74156e85c520b7ef63d885ee7b3874815340 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 10 Sep 2020 10:39:17 -0700 Subject: [PATCH 02/25] feat(cli): add command to list --- cli/chain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cli/chain.go b/cli/chain.go index 1c63e37a8..0f1e36518 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -40,6 +40,7 @@ var chainCmd = &cli.Command{ chainHeadCmd, chainGetBlock, chainReadObjCmd, + chainDeleteObjCmd, chainStatObjCmd, chainGetMsgCmd, chainSetHeadCmd, From 5e445f5a4803abfbffa977b9f52da5f72dd94bc9 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 10 Sep 2020 10:47:19 -0700 Subject: [PATCH 03/25] docs(apidocs): run docs gen --- documentation/en/api-methods.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 49582d6f8..203a5f141 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -9,6 +9,7 @@ * [Beacon](#Beacon) * [BeaconGetEntry](#BeaconGetEntry) * [Chain](#Chain) + * [ChainDeleteObj](#ChainDeleteObj) * [ChainExport](#ChainExport) * [ChainGetBlock](#ChainGetBlock) * [ChainGetBlockMessages](#ChainGetBlockMessages) @@ -279,6 +280,23 @@ The Chain method group contains methods for interacting with the blockchain, but that do not require any form of state computation. +### ChainDeleteObj +ChainDeleteObj deletes node referenced by the given CID + + +Perms: admin + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + +Response: `{}` + ### ChainExport ChainExport returns a stream of bytes with CAR dump of chain data. The exported chain data includes the header chain from the given tipset From 61c0b8c3db61ccb1d97fe05fd856c74fda0d0451 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 10:14:28 +0300 Subject: [PATCH 04/25] properly close streams in blocksync we were leaking streams right and left... --- chain/exchange/client.go | 7 +++++++ chain/exchange/server.go | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 22f7a9457..57563d5b2 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -7,6 +7,7 @@ import ( "math/rand" "time" + "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } + defer func() { + // Note: this will become just stream.Close once we've completed the go-libp2p migration to + // go-libp2p-core 0.7.0 + go helpers.FullClose(stream) //nolint:errcheck + }() + // Write request. _ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline)) if err := cborutil.WriteCborRPC(stream, req); err != nil { diff --git a/chain/exchange/server.go b/chain/exchange/server.go index 54e169b3f..dcdb5b3a5 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/helpers" inet "github.com/libp2p/go-libp2p-core/network" ) @@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) { ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream") defer span.End() - defer stream.Close() //nolint:errcheck + // Note: this will become just stream.Close once we've completed the go-libp2p migration to + // go-libp2p-core 0.7.0 + defer helpers.FullClose(stream) //nolint:errcheck var req Request if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil { From 35f6e1064620f05f29f8ec775f453b2e56c3e9ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:04:44 +0300 Subject: [PATCH 05/25] parallel chain sync --- chain/sync.go | 114 +++++++++++++++++++++++++++----------------------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index b5716a343..882fb1d95 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "sort" - "strconv" "strings" "sync" "time" @@ -63,20 +62,12 @@ var ( // where the Syncer publishes candidate chain heads to be synced. LocalIncoming = "incoming" - log = logging.Logger("chain") - defaultMessageFetchWindowSize = 200 -) + log = logging.Logger("chain") -func init() { - if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" { - val, err := strconv.Atoi(s) - if err != nil { - log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err) - return - } - defaultMessageFetchWindowSize = val - } -} + concurrentSyncRequests = 16 + syncRequestBatchSize = 4 + syncRequestRetries = 5 +) // Syncer is in charge of running the chain synchronization logic. As such, it // is tasked with these functions, amongst others: @@ -132,8 +123,6 @@ type Syncer struct { verifier ffiwrapper.Verifier - windowSize int - tickerCtxCancel context.CancelFunc checkptLk sync.Mutex @@ -175,7 +164,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C receiptTracker: newBlockReceiptTracker(), connmgr: connmgr, verifier: verifier, - windowSize: defaultMessageFetchWindowSize, incoming: pubsub.New(50), } @@ -1483,8 +1471,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers)))) - windowSize := syncer.windowSize -mainLoop: for i := len(headers) - 1; i >= 0; { fts, err := syncer.store.TryFillTipSet(headers[i]) if err != nil { @@ -1498,35 +1484,73 @@ mainLoop: continue } - batchSize := windowSize + batchSize := concurrentSyncRequests * syncRequestBatchSize if i < batchSize { - batchSize = i + if i == 0 { + batchSize = 1 + } else { + batchSize = i + } } - nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index - ss.SetStage(api.StageFetchingMessages) - var bstout []*exchange.CompactedMessages - for len(bstout) < batchSize { - next := headers[nextI] + bstout := make([]*exchange.CompactedMessages, batchSize) + var wg sync.WaitGroup + var mx sync.Mutex + var batchErr error + for j := 0; j < batchSize; j += syncRequestBatchSize { + wg.Add(1) + go func(j int) { + defer wg.Done() - nreq := batchSize - len(bstout) - bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq)) - if err != nil { - // TODO check errors for temporary nature - if windowSize > 1 { - windowSize /= 2 - log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize) - continue mainLoop + nreq := syncRequestBatchSize + if j*syncRequestBatchSize+nreq > batchSize { + nreq = batchSize - j*syncRequestBatchSize } - return xerrors.Errorf("message processing failed: %w", err) - } - bstout = append(bstout, bstips...) - nextI += len(bstips) + failed := false + for offset := 0; !failed && offset < nreq; { + nextI := (i + 1) - batchSize + j*syncRequestBatchSize + offset + nextHeader := headers[nextI] + + var requestErr error + var requestResult []*exchange.CompactedMessages + for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { + if retry > 0 { + log.Infof("fetching messages at %d (retry %d)", nextI, retry) + } else { + log.Infof("fetching messages at %d", nextI) + } + + result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + if err != nil { + requestErr = multierror.Append(requestErr, err) + } else { + requestResult = result + } + } + + mx.Lock() + if requestResult == nil { + // we failed! + log.Errorf("error fetching messages at %d: %s", nextI, requestErr) + batchErr = multierror.Append(batchErr, requestErr) + failed = true + } else { + copy(bstout[j*syncRequestBatchSize+offset:], requestResult) + offset += len(requestResult) + } + mx.Unlock() + } + }(j) } + wg.Wait() ss.SetStage(api.StageMessages) + if batchErr != nil { + return xerrors.Errorf("failed to fetch messages: %w", err) + } + for bsi := 0; bsi < len(bstout); bsi++ { // temp storage so we don't persist data we dont want to bs := bstore.NewTemporary() @@ -1555,23 +1579,9 @@ mainLoop: } } - if i >= windowSize { - newWindowSize := windowSize + 10 - if newWindowSize > int(exchange.MaxRequestLength) { - newWindowSize = int(exchange.MaxRequestLength) - } - if newWindowSize > windowSize { - windowSize = newWindowSize - log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize) - } - } - i -= batchSize } - // remember our window size - syncer.windowSize = windowSize - return nil } From 05a233f84d6de0916b1135f46eef3a61c7aa02ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:43:35 +0300 Subject: [PATCH 06/25] add some more logging --- chain/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/sync.go b/chain/sync.go index 882fb1d95..87d2cf6f4 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1537,6 +1537,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchErr = multierror.Append(batchErr, requestErr) failed = true } else { + log.Infof("fetched messages for %d tipsets", len(requestResult)) copy(bstout[j*syncRequestBatchSize+offset:], requestResult) offset += len(requestResult) } From b984e94a87237b14ef1eb90df95d5979b4c665bf Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:52:08 +0300 Subject: [PATCH 07/25] fix bug --- chain/sync.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 87d2cf6f4..80a5ad423 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1504,13 +1504,13 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS defer wg.Done() nreq := syncRequestBatchSize - if j*syncRequestBatchSize+nreq > batchSize { - nreq = batchSize - j*syncRequestBatchSize + if j+nreq > batchSize { + nreq = batchSize - j } failed := false for offset := 0; !failed && offset < nreq; { - nextI := (i + 1) - batchSize + j*syncRequestBatchSize + offset + nextI := (i + 1) - batchSize + j + offset nextHeader := headers[nextI] var requestErr error @@ -1537,8 +1537,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchErr = multierror.Append(batchErr, requestErr) failed = true } else { - log.Infof("fetched messages for %d tipsets", len(requestResult)) - copy(bstout[j*syncRequestBatchSize+offset:], requestResult) + copy(bstout[j+offset:], requestResult) offset += len(requestResult) } mx.Unlock() From 8a4b629f407a9b0ff1e39e9eeb5ba7d541c4adfc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:55:51 +0300 Subject: [PATCH 08/25] increase sync request batch size to 8 --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 80a5ad423..554f81ee9 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -65,7 +65,7 @@ var ( log = logging.Logger("chain") concurrentSyncRequests = 16 - syncRequestBatchSize = 4 + syncRequestBatchSize = 8 syncRequestRetries = 5 ) From 2a428f09e67f01bdd927f09e9e40f758b2bd60e1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 22:09:36 +0300 Subject: [PATCH 09/25] increase exchange ShufflePeersPrefix to 16, use that as the value of concurrent sync requests --- chain/exchange/protocol.go | 2 +- chain/sync.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index ca4b61836..ac02cf60f 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -40,7 +40,7 @@ const ( WriteReqDeadline = 5 * time.Second ReadResDeadline = WriteReqDeadline ReadResMinSpeed = 50 << 10 - ShufflePeersPrefix = 5 + ShufflePeersPrefix = 16 WriteResDeadline = 60 * time.Second ) diff --git a/chain/sync.go b/chain/sync.go index 554f81ee9..0ab8ac183 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -64,7 +64,7 @@ var ( log = logging.Logger("chain") - concurrentSyncRequests = 16 + concurrentSyncRequests = exchange.ShufflePeersPrefix syncRequestBatchSize = 8 syncRequestRetries = 5 ) From fb605f6d7fedea4704882a631d3155b0c280c599 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:21:26 +0300 Subject: [PATCH 10/25] refactor parallel fetch logic into a separate function --- chain/sync.go | 118 ++++++++++++++++++++++++++++---------------------- 1 file changed, 67 insertions(+), 51 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 0ab8ac183..95c2e2e84 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1494,57 +1494,8 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS } ss.SetStage(api.StageFetchingMessages) - bstout := make([]*exchange.CompactedMessages, batchSize) - var wg sync.WaitGroup - var mx sync.Mutex - var batchErr error - for j := 0; j < batchSize; j += syncRequestBatchSize { - wg.Add(1) - go func(j int) { - defer wg.Done() - - nreq := syncRequestBatchSize - if j+nreq > batchSize { - nreq = batchSize - j - } - - failed := false - for offset := 0; !failed && offset < nreq; { - nextI := (i + 1) - batchSize + j + offset - nextHeader := headers[nextI] - - var requestErr error - var requestResult []*exchange.CompactedMessages - for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { - if retry > 0 { - log.Infof("fetching messages at %d (retry %d)", nextI, retry) - } else { - log.Infof("fetching messages at %d", nextI) - } - - result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) - if err != nil { - requestErr = multierror.Append(requestErr, err) - } else { - requestResult = result - } - } - - mx.Lock() - if requestResult == nil { - // we failed! - log.Errorf("error fetching messages at %d: %s", nextI, requestErr) - batchErr = multierror.Append(batchErr, requestErr) - failed = true - } else { - copy(bstout[j+offset:], requestResult) - offset += len(requestResult) - } - mx.Unlock() - } - }(j) - } - wg.Wait() + startOffset := i + 1 - batchSize + bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset) ss.SetStage(api.StageMessages) if batchErr != nil { @@ -1585,6 +1536,71 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return nil } +func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) { + batchSize := len(headers) + batch := make([]*exchange.CompactedMessages, batchSize) + + var wg sync.WaitGroup + var mx sync.Mutex + var batchErr error + + start := build.Clock.Now() + + for j := 0; j < batchSize; j += syncRequestBatchSize { + wg.Add(1) + go func(j int) { + defer wg.Done() + + nreq := syncRequestBatchSize + if j+nreq > batchSize { + nreq = batchSize - j + } + + failed := false + for offset := 0; !failed && offset < nreq; { + nextI := j + offset + nextHeader := headers[nextI] + + var requestErr error + var requestResult []*exchange.CompactedMessages + for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { + if retry > 0 { + log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry) + } else { + log.Infof("fetching messages at %d", startOffset+nextI) + } + + result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + if err != nil { + requestErr = multierror.Append(requestErr, err) + } else { + requestResult = result + } + } + + mx.Lock() + if requestResult != nil { + copy(batch[j+offset:], requestResult) + offset += len(requestResult) + } else { + log.Errorf("error fetching messages at %d: %s", nextI, requestErr) + batchErr = multierror.Append(batchErr, requestErr) + failed = true + } + mx.Unlock() + } + }(j) + } + wg.Wait() + + if batchErr != nil { + dt := build.Clock.Since(start) + log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) + } + + return batch, batchErr +} + func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid()) From d7948fcbcd687961f746d88ed55a2afc13024aeb Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:33:52 +0300 Subject: [PATCH 11/25] fix log; we want to log time when we succeed! --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 95c2e2e84..d09ba84de 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1593,7 +1593,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet } wg.Wait() - if batchErr != nil { + if batchErr == nil { dt := build.Clock.Since(start) log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) } From 2946561decdb7bd7b9056cc256c6e32294a13279 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:35:40 +0300 Subject: [PATCH 12/25] clean up return code --- chain/sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index d09ba84de..9c9714447 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1593,11 +1593,12 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet } wg.Wait() - if batchErr == nil { - dt := build.Clock.Since(start) - log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) + if batchErr != nil { + return nil, batchErr } + log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start)) + return batch, batchErr } From 6dfc40abc1c9152d3fb59061f2008b54b0b63872 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 18:23:50 +0300 Subject: [PATCH 13/25] error is nil at end, so return batch, nil --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 9c9714447..03ae1cd4f 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1599,7 +1599,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start)) - return batch, batchErr + return batch, nil } func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { From f135ec84682f072626e90b9b43f65ad535ddaa33 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 21 Sep 2020 09:21:25 +0300 Subject: [PATCH 14/25] fix handling of end of sync --- chain/sync.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 03ae1cd4f..74dc5aa1a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1486,11 +1486,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchSize := concurrentSyncRequests * syncRequestBatchSize if i < batchSize { - if i == 0 { - batchSize = 1 - } else { - batchSize = i - } + batchSize = i + 1 } ss.SetStage(api.StageFetchingMessages) From 9b91628d858a35f49de2f0f9ecc691a1cba3fbaf Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 22 Sep 2020 11:05:12 -0500 Subject: [PATCH 15/25] begin adding simple api server for deal stats --- cmd/lotus-shed/dealtracker.go | 78 +++++++++++++++++++++++++++++++++++ cmd/lotus-shed/main.go | 1 + 2 files changed, 79 insertions(+) create mode 100644 cmd/lotus-shed/dealtracker.go diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go new file mode 100644 index 000000000..9f97337ee --- /dev/null +++ b/cmd/lotus-shed/dealtracker.go @@ -0,0 +1,78 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/filecoin-project/lotus/api" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/urfave/cli/v2" +) + +type dealStatsServer struct { + api api.FullNode +} + +type dealCountResp struct { + Total int64 `json:"total"` + Epoch int64 `json:"epoch"` +} + +func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + w.WriteHeader(500) + return + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + w.WriteHeader(500) + return + } + + if err := json.NewEncoder(w).Encode(&dealCountResp{ + Total: int64(len(deals)), + Epoch: int64(head.Height()), + }); err != nil { + log.Warnf("failed to write back deal count response: %s", err) + return + } +} + +func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { + +} + +func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { + +} + +var serveDealStatsCmd = &cli.Command{ + Name: "serve-deal-stats", + Flags: []cli.Flag{}, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + _ = ctx + + dss := &dealStatsServer{api} + + http.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount) + http.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize) + http.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal) + + panic(http.ListenAndServe(":7272", nil)) + }, +} diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 1a56756d1..118b4ea72 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -36,6 +36,7 @@ func main() { mpoolStatsCmd, exportChainCmd, consensusCmd, + serveDealStatsCmd, } app := &cli.App{ From f96698b54df95fcb44bbcf9cd62e9f710832196f Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 22 Sep 2020 12:54:39 -0500 Subject: [PATCH 16/25] finish up other endpoints --- cmd/lotus-shed/dealtracker.go | 136 +++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 9f97337ee..1d3dad058 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -5,8 +5,10 @@ import ( "encoding/json" "net/http" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" lcli "github.com/filecoin-project/lotus/cli" + "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" ) @@ -14,11 +16,34 @@ type dealStatsServer struct { api api.FullNode } +var filteredClients map[address.Address]bool + +func init() { + filteredClients = make(map[address.Address]bool) + for _, a := range []string{"t0112", "t0113", "t0114", "t010089"} { + addr, err := address.NewFromString(a) + if err != nil { + panic(err) + } + filteredClients[addr] = true + } +} + type dealCountResp struct { Total int64 `json:"total"` Epoch int64 `json:"epoch"` } +func filterDeals(deals map[string]api.MarketDeal) []*api.MarketDeal { + out := make([]*api.MarketDeal, 0, len(deals)) + for _, d := range deals { + if !filteredClients[d.Proposal.Client] { + out = append(out, &d) + } + } + return out +} + func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { ctx := context.Background() @@ -36,8 +61,15 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt return } + var count int64 + for _, d := range deals { + if !filteredClients[d.Proposal.Client] { + count++ + } + } + if err := json.NewEncoder(w).Encode(&dealCountResp{ - Total: int64(len(deals)), + Total: count, Epoch: int64(head.Height()), }); err != nil { log.Warnf("failed to write back deal count response: %s", err) @@ -45,14 +77,113 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt } } -func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { +type dealAverageResp struct { + AverageSize int64 `json:"averageSize"` + Epoch int64 `json:"epoch"` +} +func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + w.WriteHeader(500) + return + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + w.WriteHeader(500) + return + } + + var count int64 + var totalBytes int64 + for _, d := range deals { + if !filteredClients[d.Proposal.Client] { + count++ + totalBytes += int64(d.Proposal.PieceSize.Unpadded()) + } + } + + if err := json.NewEncoder(w).Encode(&dealAverageResp{ + AverageSize: totalBytes / count, + Epoch: int64(head.Height()), + }); err != nil { + log.Warnf("failed to write back deal average response: %s", err) + return + } } func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { } +type clientStatsOutput struct { + Client address.Address `json:"client"` + DataSize int64 `json:"data_size"` + NumCids int `json:"num_cids"` + NumDeals int `json:"num_deals"` + NumMiners int `json:"num_miners"` + + cids map[cid.Cid]bool + providers map[address.Address]bool +} + +func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + w.WriteHeader(500) + return + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + w.WriteHeader(500) + return + } + + stats := make(map[address.Address]*clientStatsOutput) + + for _, d := range deals { + if filteredClients[d.Proposal.Client] { + continue + } + + st, ok := stats[d.Proposal.Client] + if !ok { + st = &clientStatsOutput{ + Client: d.Proposal.Client, + cids: make(map[cid.Cid]bool), + providers: make(map[address.Address]bool), + } + stats[d.Proposal.Client] = st + } + + st.DataSize += int64(d.Proposal.PieceSize.Unpadded()) + st.cids[d.Proposal.PieceCID] = true + st.providers[d.Proposal.Provider] = true + st.NumDeals++ + } + + out := make([]*clientStatsOutput, 0, len(stats)) + for _, cso := range stats { + cso.NumCids = len(cso.cids) + cso.NumMiners = len(cso.providers) + } + + if err := json.NewEncoder(w).Encode(out); err != nil { + log.Warnf("failed to write back client stats response: %s", err) + return + } +} + var serveDealStatsCmd = &cli.Command{ Name: "serve-deal-stats", Flags: []cli.Flag{}, @@ -72,6 +203,7 @@ var serveDealStatsCmd = &cli.Command{ http.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount) http.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize) http.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal) + http.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats) panic(http.ListenAndServe(":7272", nil)) }, From 3153ab9ae3672a5a7b1aec1384364fdc50169e13 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 22 Sep 2020 13:24:49 -0500 Subject: [PATCH 17/25] allow graceful shutdown --- cmd/lotus-shed/dealtracker.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 1d3dad058..cbe375803 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "net" "net/http" "github.com/filecoin-project/go-address" @@ -200,11 +201,28 @@ var serveDealStatsCmd = &cli.Command{ dss := &dealStatsServer{api} - http.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount) - http.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize) - http.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal) - http.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats) + mux := &http.ServeMux{} + mux.HandleFunc("/api/storagedeal/count", dss.handleStorageDealCount) + mux.HandleFunc("/api/storagedeal/averagesize", dss.handleStorageDealAverageSize) + mux.HandleFunc("/api/storagedeal/totalreal", dss.handleStorageDealTotalReal) + mux.HandleFunc("/api/storagedeal/clientstats", dss.handleStorageClientStats) - panic(http.ListenAndServe(":7272", nil)) + s := &http.Server{ + Addr: ":7272", + Handler: mux, + } + + go func() { + <-ctx.Done() + s.Shutdown(context.TODO()) + }() + + list, err := net.Listen("tcp", ":7272") + if err != nil { + panic(err) + } + + s.Serve(list) + return nil }, } From 3cf2fd595f7f74561a5856befa69afe6ee8d570f Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 22 Sep 2020 13:27:58 -0500 Subject: [PATCH 18/25] fix appending to array --- cmd/lotus-shed/dealtracker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index cbe375803..57f42bc83 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -177,6 +177,8 @@ func (dss *dealStatsServer) handleStorageClientStats(w http.ResponseWriter, r *h for _, cso := range stats { cso.NumCids = len(cso.cids) cso.NumMiners = len(cso.providers) + + out = append(out, cso) } if err := json.NewEncoder(w).Encode(out); err != nil { From 88ada66280807b2000f78a5fca5f186bbf233df5 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 22 Sep 2020 13:31:01 -0500 Subject: [PATCH 19/25] finish up the total bytes endpoint --- cmd/lotus-shed/dealtracker.go | 37 ++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 57f42bc83..18dc959f7 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -79,7 +79,7 @@ func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *htt } type dealAverageResp struct { - AverageSize int64 `json:"averageSize"` + AverageSize int64 `json:"average_size"` Epoch int64 `json:"epoch"` } @@ -118,7 +118,42 @@ func (dss *dealStatsServer) handleStorageDealAverageSize(w http.ResponseWriter, } } +type dealTotalResp struct { + TotalBytes int64 `json:"total_size"` + Epoch int64 `json:"epoch"` +} + func (dss *dealStatsServer) handleStorageDealTotalReal(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + + head, err := dss.api.ChainHead(ctx) + if err != nil { + log.Warnf("failed to get chain head: %s", err) + w.WriteHeader(500) + return + } + + deals, err := dss.api.StateMarketDeals(ctx, head.Key()) + if err != nil { + log.Warnf("failed to get market deals: %s", err) + w.WriteHeader(500) + return + } + + var totalBytes int64 + for _, d := range deals { + if !filteredClients[d.Proposal.Client] { + totalBytes += int64(d.Proposal.PieceSize.Unpadded()) + } + } + + if err := json.NewEncoder(w).Encode(&dealTotalResp{ + TotalBytes: totalBytes, + Epoch: int64(head.Height()), + }); err != nil { + log.Warnf("failed to write back deal average response: %s", err) + return + } } From 91a43c477c5ec8c8911ada49cbe5485b49273472 Mon Sep 17 00:00:00 2001 From: jennijuju Date: Mon, 21 Sep 2020 21:41:17 -0400 Subject: [PATCH 20/25] When doing `sectors update-state`, show a list of existing states if user inputs an invalid one. --- cmd/lotus-storage-miner/sectors.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 27a5c31be..370962bdc 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -404,8 +404,9 @@ var sectorsCapacityCollateralCmd = &cli.Command{ } var sectorsUpdateCmd = &cli.Command{ - Name: "update-state", - Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery", + Name: "update-state", + Usage: "ADVANCED: manually update the state of a sector, this may aid in error recovery", + ArgsUsage: " ", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "really-do-it", @@ -431,8 +432,13 @@ var sectorsUpdateCmd = &cli.Command{ return xerrors.Errorf("could not parse sector number: %w", err) } - if _, ok := sealing.ExistSectorStateList[sealing.SectorState(cctx.Args().Get(1))]; !ok { - return xerrors.Errorf("Not existing sector state") + newState := cctx.Args().Get(1) + if _, ok := sealing.ExistSectorStateList[sealing.SectorState(newState)]; !ok { + fmt.Printf(" \"%s\" is not a valid state. Possible states for sectors are: \n", newState) + for state := range sealing.ExistSectorStateList { + fmt.Printf("%s\n", string(state)) + } + return nil } return nodeApi.SectorsUpdate(ctx, abi.SectorNumber(id), api.SectorState(cctx.Args().Get(1))) From f1ab1af6170e822803073f9ae74ad20d00fdda43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 23 Sep 2020 17:42:01 +0100 Subject: [PATCH 21/25] add init.State#Remove() for testing. --- chain/actors/builtin/init/init.go | 5 +++++ chain/actors/builtin/init/v0.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/chain/actors/builtin/init/init.go b/chain/actors/builtin/init/init.go index 1164891f8..f235450c2 100644 --- a/chain/actors/builtin/init/init.go +++ b/chain/actors/builtin/init/init.go @@ -36,4 +36,9 @@ type State interface { NetworkName() (dtypes.NetworkName, error) ForEachActor(func(id abi.ActorID, address address.Address) error) error + + // Remove exists to support tooling that manipulates state for testing. + // It should not be used in production code, as init actor entries are + // immutable. + Remove(addrs ...address.Address) error } diff --git a/chain/actors/builtin/init/v0.go b/chain/actors/builtin/init/v0.go index 717ed9669..425ba654c 100644 --- a/chain/actors/builtin/init/v0.go +++ b/chain/actors/builtin/init/v0.go @@ -4,6 +4,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" cbg "github.com/whyrusleeping/cbor-gen" + "golang.org/x/xerrors" init_ "github.com/filecoin-project/specs-actors/actors/builtin/init" @@ -46,3 +47,21 @@ func (s *state0) ForEachActor(cb func(id abi.ActorID, address address.Address) e func (s *state0) NetworkName() (dtypes.NetworkName, error) { return dtypes.NetworkName(s.State.NetworkName), nil } + +func (s *state0) Remove(addrs ...address.Address) (err error) { + m, err := adt0.AsMap(s.store, s.State.AddressMap) + if err != nil { + return err + } + for _, addr := range addrs { + if err = m.Delete(abi.AddrKey(addr)); err != nil { + return xerrors.Errorf("failed to delete entry for address: %s; err: %w", addr, err) + } + } + amr, err := m.Root() + if err != nil { + return xerrors.Errorf("failed to get address map root: %w", err) + } + s.State.AddressMap = amr + return nil +} From 6c5ed3f07f287e375c171818f001f13e5f0dd7ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Sep 2020 19:31:36 +0200 Subject: [PATCH 22/25] Some safeguards on chain delete-obj --- cli/chain.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cli/chain.go b/cli/chain.go index 0f1e36518..bd72c2030 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -195,9 +195,15 @@ var chainReadObjCmd = &cli.Command{ } var chainDeleteObjCmd = &cli.Command{ - Name: "delete-obj", - Usage: "Delete an object", - ArgsUsage: "[objectCid]", + Name: "delete-obj", + Usage: "Delete an object from the chain blockstore", + Description: "WARNING: Removing wrong objects from the chain blockstore may lead to sync issues", + ArgsUsage: "[objectCid]", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + }, + }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { @@ -211,6 +217,10 @@ var chainDeleteObjCmd = &cli.Command{ return fmt.Errorf("failed to parse cid input: %s", err) } + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("pass the --really-do-it flag to proceed") + } + err = api.ChainDeleteObj(ctx, c) if err != nil { return err From 3fc791b0e8c1c12b2d729123e6cf3bb9386a1eca Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 23 Sep 2020 12:16:26 -0700 Subject: [PATCH 23/25] feat(markets): update markets v0.6.2 --- go.mod | 10 +++++----- go.sum | 31 ++++++++++++++----------------- node/impl/client/client.go | 2 +- 3 files changed, 20 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index 6706a3eda..cfdf4cb1d 100644 --- a/go.mod +++ b/go.mod @@ -25,9 +25,9 @@ require ( github.com/filecoin-project/go-bitfield v0.2.0 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 - github.com/filecoin-project/go-data-transfer v0.6.4 + github.com/filecoin-project/go-data-transfer v0.6.5 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f - github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9 + github.com/filecoin-project/go-fil-markets v0.6.2 github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 @@ -60,7 +60,7 @@ require ( github.com/ipfs/go-ds-measure v0.1.0 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.6 - github.com/ipfs/go-graphsync v0.1.2 + github.com/ipfs/go-graphsync v0.2.0 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-ds-help v1.0.0 @@ -77,8 +77,8 @@ require ( github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-unixfs v0.2.4 github.com/ipfs/interface-go-ipfs-core v0.2.3 - github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae - github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef + github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4 + github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f github.com/kelseyhightower/envconfig v1.4.0 github.com/lib/pq v1.7.0 github.com/libp2p/go-eventbus v0.2.1 diff --git a/go.sum b/go.sum index e62cb92c9..3b31e6c3b 100644 --- a/go.sum +++ b/go.sum @@ -222,13 +222,12 @@ github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:a github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-data-transfer v0.6.3/go.mod h1:PmBKVXkhh67/tnEdJXQwDHl5mT+7Tbcwe1NPninqhnM= -github.com/filecoin-project/go-data-transfer v0.6.4 h1:Q08ABa+cOTOLoAyHeA94fPLcwu53p6eeAaxMxQb0m0A= -github.com/filecoin-project/go-data-transfer v0.6.4/go.mod h1:PmBKVXkhh67/tnEdJXQwDHl5mT+7Tbcwe1NPninqhnM= +github.com/filecoin-project/go-data-transfer v0.6.5 h1:oP20la8Z0CLrw0uqvt6xVgw6rOevZeGJ9GNQeC0OCSU= +github.com/filecoin-project/go-data-transfer v0.6.5/go.mod h1:I9Ylb/UiZyqnI41wUoCXq/le0nDLhlwpFQCtNPxEPOA= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= -github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9 h1:SnCUC9wHDId9TtV8PsQp8q1OOsi+NOLOwitIDnAgUa4= -github.com/filecoin-project/go-fil-markets v0.6.1-0.20200917052354-ee0af754c6e9/go.mod h1:PLr9svZxsnHkae1Ky7+66g7fP9AlneVxIVu+oSMq56A= +github.com/filecoin-project/go-fil-markets v0.6.2 h1:9Z57KeaQSa1liCmT1pH6SIjrn9mGTDFJXmR2WQVuaiY= +github.com/filecoin-project/go-fil-markets v0.6.2/go.mod h1:wtN4Hc/1hoVCpWhSWYxwUxH3PQtjSkWWuC1nQjiIWog= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= @@ -396,9 +395,8 @@ github.com/gxed/go-shellwords v1.0.3/go.mod h1:N7paucT91ByIjmVJHhvoarjoQnmsi3Jd3 github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/gxed/pubsub v0.0.0-20180201040156-26ebdf44f824/go.mod h1:OiEWyHgK+CWrmOlVquHaIK1vhpUJydC9m0Je6mhaiNE= -github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k= -github.com/hannahhoward/cbor-gen-for v0.0.0-20200723175505-5892b522820a h1:wfqh5oiHXvn3Rk54xy8Cwqh+HnYihGnjMNzdNb3/ld0= -github.com/hannahhoward/cbor-gen-for v0.0.0-20200723175505-5892b522820a/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8= +github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 h1:F9k+7wv5OIk1zcq23QpdiL0hfDuXPjuOmMNaC6fgQ0Q= +github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1/go.mod h1:jvfsLIxk0fY/2BKSQ1xf2406AKA5dwMmKKv0ADcOfN8= github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY= github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= @@ -504,8 +502,8 @@ github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPi github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0= github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM= github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= -github.com/ipfs/go-graphsync v0.1.2 h1:25Ll9kIXCE+DY0dicvfS3KMw+U5sd01b/FJbA7KAbhg= -github.com/ipfs/go-graphsync v0.1.2/go.mod h1:sLXVXm1OxtE2XYPw62MuXCdAuNwkAdsbnfrmos5odbA= +github.com/ipfs/go-graphsync v0.2.0 h1:x94MvHLNuRwBlZzVal7tR1RYK7T7H6bqQLPopxDbIF0= +github.com/ipfs/go-graphsync v0.2.0/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= @@ -609,14 +607,14 @@ github.com/ipfs/iptb v1.4.0 h1:YFYTrCkLMRwk/35IMyC6+yjoQSHTEcNcefBStLJzgvo= github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg= github.com/ipfs/iptb-plugins v0.2.1 h1:au4HWn9/pRPbkxA08pDx2oRAs4cnbgQWgV0teYXuuGA= github.com/ipfs/iptb-plugins v0.2.1/go.mod h1:QXMbtIWZ+jRsW8a4h13qAKU7jcM7qaittO8wOsTP0Rs= -github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae h1:OV9dxl8iPMCOD8Vi/hvFwRh3JWPXqmkYSVxWr9JnEzM= -github.com/ipld/go-car v0.1.1-0.20200526133713-1c7508d55aae/go.mod h1:2mvxpu4dKRnuH3mj5u6KW/tmRSCcXvy/KYiJ4nC6h4c= +github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4 h1:6phjU3kXvCEWOZpu+Ob0w6DzgPFZmDLgLPxJhD8RxEY= +github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4/go.mod h1:xrMEcuSq+D1vEwl+YAXsg/JfA98XGpXDwnkIL4Aimqw= github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8= -github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef h1:/yPelt/0CuzZsmRkYzBBnJ499JnAOGaIaAXHujx96ic= -github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8= +github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f h1:XpOuNQ5GbXxUcSukbQcW9jkE7REpaFGJU2/T00fo9kA= +github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b h1:ZtlW6pubN17TDaStlxgrwEXXwwUfJaXu9RobwczXato= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= +github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6 h1:6Mq+tZGSEMEoJJ1NbJRhddeelkXZcU8yfH/ZRYUo/Es= +github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -1362,7 +1360,6 @@ github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMU github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc/go.mod h1:r45hJU7yEoA81k6MWNhpMj/kms0n14dkzkxYHoB96UM= github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba h1:X4n8JG2e2biEZZXdBKt9HX7DN3bYGFUqljqqy0DqgnY= github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba/go.mod h1:CHQnYnQUEPydYCwuy8lmTHfGmdw9TKrhWV0xLx8l0oM= -github.com/whyrusleeping/cbor-gen v0.0.0-20191212224538-d370462a7e8a/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY= github.com/whyrusleeping/cbor-gen v0.0.0-20191216205031-b047b6acb3c0/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY= github.com/whyrusleeping/cbor-gen v0.0.0-20200123233031-1cdf64d27158/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105/go.mod h1:Xj/M2wWU+QdTdRbu/L/1dIZY8/Wb2K9pAhtroQuxJJI= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8b47144af..81978af16 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -711,7 +711,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri // TODO: does that defer mean to remove the whole blockstore? defer bufferedDS.Remove(ctx, c) //nolint:errcheck - ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) // entire DAG selector allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(), From 1c1d23d14273dbdca95b6a03537635799ad33c0e Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 23 Sep 2020 12:36:15 -0700 Subject: [PATCH 24/25] fix out-of-bounds when loading all sector infos fixes #3972 --- chain/actors/builtin/miner/v0.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/actors/builtin/miner/v0.go b/chain/actors/builtin/miner/v0.go index e515b9ed6..9cdfc25bc 100644 --- a/chain/actors/builtin/miner/v0.go +++ b/chain/actors/builtin/miner/v0.go @@ -186,9 +186,9 @@ func (s *state0) LoadSectors(snos *bitfield.BitField) ([]*SectorOnChainInfo, err if snos == nil { infos := make([]*SectorOnChainInfo, 0, sectors.Length()) var info0 miner0.SectorOnChainInfo - if err := sectors.ForEach(&info0, func(i int64) error { + if err := sectors.ForEach(&info0, func(_ int64) error { info := fromV0SectorOnChainInfo(info0) - infos[i] = &info + infos = append(infos, &info) return nil }); err != nil { return nil, err From a1281273bc20b7911ee7052936a8fda8f2eda776 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 23 Sep 2020 22:10:02 +0200 Subject: [PATCH 25/25] shed dealtracker: fix lint, env var filter --- cmd/lotus-shed/dealtracker.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/cmd/lotus-shed/dealtracker.go b/cmd/lotus-shed/dealtracker.go index 18dc959f7..d39f51bd1 100644 --- a/cmd/lotus-shed/dealtracker.go +++ b/cmd/lotus-shed/dealtracker.go @@ -5,6 +5,8 @@ import ( "encoding/json" "net" "net/http" + "os" + "strings" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/api" @@ -20,8 +22,15 @@ type dealStatsServer struct { var filteredClients map[address.Address]bool func init() { + fc := []string{"t0112", "t0113", "t0114", "t010089"} + + filtered, set := os.LookupEnv("FILTERED_CLIENTS") + if set { + fc = strings.Split(filtered, ":") + } + filteredClients = make(map[address.Address]bool) - for _, a := range []string{"t0112", "t0113", "t0114", "t010089"} { + for _, a := range fc { addr, err := address.NewFromString(a) if err != nil { panic(err) @@ -35,16 +44,6 @@ type dealCountResp struct { Epoch int64 `json:"epoch"` } -func filterDeals(deals map[string]api.MarketDeal) []*api.MarketDeal { - out := make([]*api.MarketDeal, 0, len(deals)) - for _, d := range deals { - if !filteredClients[d.Proposal.Client] { - out = append(out, &d) - } - } - return out -} - func (dss *dealStatsServer) handleStorageDealCount(w http.ResponseWriter, r *http.Request) { ctx := context.Background() @@ -251,15 +250,16 @@ var serveDealStatsCmd = &cli.Command{ go func() { <-ctx.Done() - s.Shutdown(context.TODO()) + if err := s.Shutdown(context.TODO()); err != nil { + log.Error(err) + } }() - list, err := net.Listen("tcp", ":7272") + list, err := net.Listen("tcp", ":7272") // nolint if err != nil { panic(err) } - s.Serve(list) - return nil + return s.Serve(list) }, }