From dad91901428f1bec885fbd56147ab1c6255986cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Dec 2021 17:07:14 +0100 Subject: [PATCH 1/4] Add metrics to dagstore piecereader --- markets/dagstore/piecereader.go | 14 +++++++++ metrics/metrics.go | 50 +++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 6ef69dfbe..f9ba881f5 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -2,6 +2,8 @@ package dagstore import ( "context" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/stats" "io" "github.com/ipfs/go-cid" @@ -29,6 +31,8 @@ type pieceReader struct { } func (p *pieceReader) init() (_ *pieceReader, err error) { + stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1)) + p.rAt = 0 p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) if err != nil { @@ -95,6 +99,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { return 0, err } + stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b)))) + // 1. Get the backing reader into the correct position // if the backing reader is ahead of the offset we want, or more than @@ -109,6 +115,12 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt) + if off > p.rAt { + stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) + } else { + stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1)) + } + p.rAt = off p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) if err != nil { @@ -118,6 +130,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { // 2. Check if we need to burn some bytes if off > p.rAt { + stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) + n, err := io.CopyN(io.Discard, p.r, off-p.rAt) p.rAt += n if err != nil { diff --git a/metrics/metrics.go b/metrics/metrics.go index ddd149d8d..b4032bb1d 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -128,6 +128,15 @@ var ( StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes) StorageLimitMaxBytes = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes) + DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless) + DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes) + DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes) + DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless) + DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless) + DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless) + DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes) + DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes) + // splitstore SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless) SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds) @@ -383,6 +392,39 @@ var ( TagKeys: []tag.Key{StorageID}, } + DagStorePRInitCountView = &view.View{ + Measure: DagStorePRInitCount, + Aggregation: view.Count(), + } + DagStorePRBytesRequestedView = &view.View{ + Measure: DagStorePRBytesRequested, + Aggregation: view.Sum(), + } + DagStorePRBytesDiscardedView = &view.View{ + Measure: DagStorePRBytesDiscarded, + Aggregation: view.Sum(), + } + DagStorePRDiscardCountView = &view.View{ + Measure: DagStorePRDiscardCount, + Aggregation: view.Count(), + } + DagStorePRSeekBackCountView = &view.View{ + Measure: DagStorePRSeekBackCount, + Aggregation: view.Count(), + } + DagStorePRSeekForwardCountView = &view.View{ + Measure: DagStorePRSeekForwardCount, + Aggregation: view.Count(), + } + DagStorePRSeekBackBytesView = &view.View{ + Measure: DagStorePRSeekBackBytes, + Aggregation: view.Sum(), + } + DagStorePRSeekForwardBytesView = &view.View{ + Measure: DagStorePRSeekForwardBytes, + Aggregation: view.Sum(), + } + // splitstore SplitstoreMissView = &view.View{ Measure: SplitstoreMiss, @@ -539,6 +581,14 @@ var MinerNodeViews = append([]*view.View{ StorageReservedBytesView, StorageLimitUsedBytesView, StorageLimitMaxBytesView, + DagStorePRInitCountView, + DagStorePRBytesRequestedView, + DagStorePRBytesDiscardedView, + DagStorePRDiscardCountView, + DagStorePRSeekBackCountView, + DagStorePRSeekForwardCountView, + DagStorePRSeekBackBytesView, + DagStorePRSeekForwardBytesView, }, DefaultViews...) // SinceInMilliseconds returns the duration of time since the provide time as a float64. From 3969d6b767f0282f71ec69bd2ae40a60b8edc106 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Dec 2021 20:11:52 +0100 Subject: [PATCH 2/4] dagstore pieceReader: Always read full in ReadAt --- markets/dagstore/piecereader.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index f9ba881f5..14a027bd1 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -1,21 +1,23 @@ package dagstore import ( + "bufio" "context" - "github.com/filecoin-project/lotus/metrics" - "go.opencensus.io/stats" "io" "github.com/ipfs/go-cid" + "go.opencensus.io/stats" "golang.org/x/xerrors" "github.com/filecoin-project/dagstore/mount" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/metrics" ) // For small read skips, it's faster to "burn" some bytes than to setup new sector reader. // Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB. var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M +var ReadBuf = 128 * (127 * 8) // unpadded(128k) type pieceReader struct { ctx context.Context @@ -27,6 +29,7 @@ type pieceReader struct { seqAt int64 // next byte to be read by io.Reader r io.ReadCloser + br *bufio.Reader rAt int64 } @@ -38,6 +41,7 @@ func (p *pieceReader) init() (_ *pieceReader, err error) { if err != nil { return nil, err } + p.br = bufio.NewReaderSize(p.r, ReadBuf) return p, nil } @@ -111,9 +115,10 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { return 0, xerrors.Errorf("closing backing reader: %w", err) } p.r = nil + p.br = nil } - log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt) + log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b)) if off > p.rAt { stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1)) @@ -123,6 +128,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { p.rAt = off p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt)) + p.br = bufio.NewReaderSize(p.r, ReadBuf) if err != nil { return 0, xerrors.Errorf("getting backing reader: %w", err) } @@ -132,7 +138,7 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { if off > p.rAt { stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1)) - n, err := io.CopyN(io.Discard, p.r, off-p.rAt) + n, err := io.CopyN(io.Discard, p.br, off-p.rAt) p.rAt += n if err != nil { return 0, xerrors.Errorf("discarding read gap: %w", err) @@ -145,7 +151,11 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { } // 4. Read! - n, err = p.r.Read(b) + n, err = io.ReadFull(p.br, b) + if n < len(b) { + log.Debugw("pieceReader short read", "piece", p.pieceCid, "at", p.rAt, "toEnd", int64(p.len)-p.rAt, "n", len(b), "read", n, "err", err) + } + p.rAt += int64(n) return n, err } From 9f6265e0b43b95d891b6cee283ebf9cfeb7d429c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Dec 2021 23:36:36 +0100 Subject: [PATCH 3/4] dagstore pieceReader: Fix wrong ErrUnexpectedEOF return in ReadAt --- markets/dagstore/mount_test.go | 1 - markets/dagstore/piecereader.go | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/markets/dagstore/mount_test.go b/markets/dagstore/mount_test.go index ec5e8a086..820058786 100644 --- a/markets/dagstore/mount_test.go +++ b/markets/dagstore/mount_test.go @@ -30,7 +30,6 @@ func TestLotusMount(t *testing.T) { mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl) mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1) - mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1) mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1) diff --git a/markets/dagstore/piecereader.go b/markets/dagstore/piecereader.go index 14a027bd1..67cd10c27 100644 --- a/markets/dagstore/piecereader.go +++ b/markets/dagstore/piecereader.go @@ -155,6 +155,9 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) { if n < len(b) { log.Debugw("pieceReader short read", "piece", p.pieceCid, "at", p.rAt, "toEnd", int64(p.len)-p.rAt, "n", len(b), "read", n, "err", err) } + if err == io.ErrUnexpectedEOF { + err = io.EOF + } p.rAt += int64(n) return n, err From 84c48de5d9b0087fedcd6e8d6f15b324711d49c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 6 Dec 2021 15:38:45 +0100 Subject: [PATCH 4/4] CARv2 v2.1.0 --- go.mod | 4 ++-- go.sum | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index cf234250a..a9b0bee9d 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/filecoin-project/go-data-transfer v1.11.4 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 - github.com/filecoin-project/go-fil-markets v1.13.3 + github.com/filecoin-project/go-fil-markets v1.13.4 github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.2 @@ -98,7 +98,7 @@ require ( github.com/ipfs/go-unixfs v0.2.6 github.com/ipfs/interface-go-ipfs-core v0.4.0 github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 - github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 + github.com/ipld/go-car/v2 v2.1.0 github.com/ipld/go-codec-dagpb v1.3.0 github.com/ipld/go-ipld-prime v0.12.3 github.com/ipld/go-ipld-selector-text-lite v0.0.1 diff --git a/go.sum b/go.sum index 905a758d8..241b98887 100644 --- a/go.sum +++ b/go.sum @@ -338,8 +338,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.13.3 h1:iMCpG7I4fb+YLcgDnMaqZiZiyFZWNvrwHqiFPHB0/tQ= -github.com/filecoin-project/go-fil-markets v1.13.3/go.mod h1:38zuj8AgDvOfdakFLpC/syYIYgXTzkq7xqBJ6T1AuG4= +github.com/filecoin-project/go-fil-markets v1.13.4 h1:NAu+ACelR2mYsj+yJ4iLu8FGqWK50OnU5VF8axkLsSc= +github.com/filecoin-project/go-fil-markets v1.13.4/go.mod h1:aANjXD2XMHWnT2zWpyGWLsWLC24C4mHm0gRm85OpPWE= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= @@ -865,8 +865,8 @@ github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 h1:8JMSJ0k71fU9lIUrp github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823/go.mod h1:jSlTph+i/q1jLFoiKKeN69KGG0fXpwrcD0izu5C1Tpo= github.com/ipld/go-car/v2 v2.0.0-beta1.0.20210721090610-5a9d1b217d25/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU= github.com/ipld/go-car/v2 v2.0.2/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU= -github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 h1:6Z0beJSZNsRY+7udoqUl4gQ/tqtrPuRvDySrlsvbqZA= -github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU= +github.com/ipld/go-car/v2 v2.1.0 h1:t8R/WXUSkfu1K1gpPk76mytCxsEdMjGcMIgpOq3/Cnw= +github.com/ipld/go-car/v2 v2.1.0/go.mod h1:Xr6GwkDhv8dtOtgHzOynAkIOg0t0YiPc5DxBPppWqZA= github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s= github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8= github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA= @@ -1482,8 +1482,9 @@ github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPw github.com/multiformats/go-multicodec v0.2.0/go.mod h1:/y4YVwkfMyry5kFbMTbLJKErhycTIftytRV+llXdyS4= github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ= github.com/multiformats/go-multicodec v0.2.1-0.20210714093213-b2b5bd6fe68b/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ= -github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A= github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ= +github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61 h1:ZrUuMKNgJ52qHPoQ+bx0h0uBfcWmN7Px+4uKSZeesiI= +github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=