Merge pull request #7737 from filecoin-project/feat/opt-ds-pr

dagstore pieceReader: Always read full in ReadAt
Łukasz Magiera 2021-12-06 23:14:21 +01:00 committed by GitHub
commit 9ccd4ee240
5 changed files with 88 additions and 11 deletions
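The core change: pieceReader.ReadAt used to issue a single Read against the backing unsealed-piece stream, which may legally return fewer bytes than requested, while the io.ReaderAt contract expects the destination buffer to be filled or an error explaining why not. The patch wraps the stream in a bufio.Reader and uses io.ReadFull. Below is a minimal, self-contained Go sketch of that difference — illustrative only, not the Lotus code; the one-byte "dribbling" reader just stands in for a slow, network-backed piece stream.

package main

import (
	"bytes"
	"fmt"
	"io"
	"testing/iotest"
)

func main() {
	payload := bytes.Repeat([]byte{0xAA}, 1024)

	// A stream that yields one byte per Read call, standing in for a slow
	// unsealed-piece stream.
	dribble := func() io.Reader { return iotest.OneByteReader(bytes.NewReader(payload)) }

	buf := make([]byte, 64)

	// A single Read is allowed to return fewer bytes than requested.
	n, _ := dribble().Read(buf)
	fmt.Println("single Read:", n) // single Read: 1

	// io.ReadFull keeps reading until the buffer is full (or the stream
	// ends), which is what ReadAt callers expect.
	n, _ = io.ReadFull(dribble(), buf)
	fmt.Println("io.ReadFull:", n) // io.ReadFull: 64
}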

go.mod (4 changes)

@@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.11.4
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.13.3
github.com/filecoin-project/go-fil-markets v1.13.4
github.com/filecoin-project/go-jsonrpc v0.1.5
github.com/filecoin-project/go-padreader v0.0.1
github.com/filecoin-project/go-paramfetch v0.0.2
@@ -98,7 +98,7 @@ require (
github.com/ipfs/go-unixfs v0.2.6
github.com/ipfs/interface-go-ipfs-core v0.4.0
github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7
github.com/ipld/go-car/v2 v2.1.0
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.3
github.com/ipld/go-ipld-selector-text-lite v0.0.1

go.sum (11 changes)

@@ -338,8 +338,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.13.3 h1:iMCpG7I4fb+YLcgDnMaqZiZiyFZWNvrwHqiFPHB0/tQ=
github.com/filecoin-project/go-fil-markets v1.13.3/go.mod h1:38zuj8AgDvOfdakFLpC/syYIYgXTzkq7xqBJ6T1AuG4=
github.com/filecoin-project/go-fil-markets v1.13.4 h1:NAu+ACelR2mYsj+yJ4iLu8FGqWK50OnU5VF8axkLsSc=
github.com/filecoin-project/go-fil-markets v1.13.4/go.mod h1:aANjXD2XMHWnT2zWpyGWLsWLC24C4mHm0gRm85OpPWE=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
@@ -865,8 +865,8 @@ github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 h1:8JMSJ0k71fU9lIUrp
github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823/go.mod h1:jSlTph+i/q1jLFoiKKeN69KGG0fXpwrcD0izu5C1Tpo=
github.com/ipld/go-car/v2 v2.0.0-beta1.0.20210721090610-5a9d1b217d25/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU=
github.com/ipld/go-car/v2 v2.0.2/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU=
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 h1:6Z0beJSZNsRY+7udoqUl4gQ/tqtrPuRvDySrlsvbqZA=
github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7/go.mod h1:I2ACeeg6XNBe5pdh5TaR7Ambhfa7If9KXxmXgZsYENU=
github.com/ipld/go-car/v2 v2.1.0 h1:t8R/WXUSkfu1K1gpPk76mytCxsEdMjGcMIgpOq3/Cnw=
github.com/ipld/go-car/v2 v2.1.0/go.mod h1:Xr6GwkDhv8dtOtgHzOynAkIOg0t0YiPc5DxBPppWqZA=
github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s=
github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
@@ -1482,8 +1482,9 @@ github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPw
github.com/multiformats/go-multicodec v0.2.0/go.mod h1:/y4YVwkfMyry5kFbMTbLJKErhycTIftytRV+llXdyS4=
github.com/multiformats/go-multicodec v0.2.1-0.20210713081508-b421db6850ae/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.2.1-0.20210714093213-b2b5bd6fe68b/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61 h1:ZrUuMKNgJ52qHPoQ+bx0h0uBfcWmN7Px+4uKSZeesiI=
github.com/multiformats/go-multicodec v0.3.1-0.20210902112759-1539a079fd61/go.mod h1:1Hj/eHRaVWSXiSNNfcEPcwZleTmdNP81xlxDLnWU9GQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=


@@ -30,7 +30,6 @@ func TestLotusMount(t *testing.T) {
	mockLotusMountAPI := mock_dagstore.NewMockMinerAPI(mockCtrl)
	mockLotusMountAPI.EXPECT().IsUnsealed(gomock.Any(), cid).Return(true, nil).Times(1)
	mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
	mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid, uint64(0)).Return(ioutil.NopCloser(strings.NewReader("testing")), abi.UnpaddedPieceSize(7), nil).Times(1)
	mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(ctx, cid).Return(uint64(100), nil).Times(1)


@@ -1,19 +1,23 @@
package dagstore
import (
	"bufio"
	"context"
	"io"
	"github.com/ipfs/go-cid"
	"go.opencensus.io/stats"
	"golang.org/x/xerrors"
	"github.com/filecoin-project/dagstore/mount"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/metrics"
)
// For small read skips, it's faster to "burn" some bytes than to set up a new sector reader.
// Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M
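// ReadBuf below is 128 * 127 * 8 = 130048 bytes, i.e. the unpadded (127-in-128) equivalent of a 128 KiB padded chunk.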
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
type pieceReader struct {
	ctx context.Context
@@ -25,15 +29,19 @@ type pieceReader struct {
	seqAt int64 // next byte to be read by io.Reader
	r io.ReadCloser
	br *bufio.Reader
	rAt int64
}
func (p *pieceReader) init() (_ *pieceReader, err error) {
	stats.Record(p.ctx, metrics.DagStorePRInitCount.M(1))
	p.rAt = 0
	p.r, p.len, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
	if err != nil {
		return nil, err
	}
	p.br = bufio.NewReaderSize(p.r, ReadBuf)
	return p, nil
}
@@ -95,6 +103,8 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
		return 0, err
	}
	stats.Record(p.ctx, metrics.DagStorePRBytesRequested.M(int64(len(b))))
	// 1. Get the backing reader into the correct position
	// if the backing reader is ahead of the offset we want, or more than
@@ -105,12 +115,20 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
				return 0, xerrors.Errorf("closing backing reader: %w", err)
			}
			p.r = nil
			p.br = nil
		}
		log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt)
		log.Debugw("pieceReader new stream", "piece", p.pieceCid, "at", p.rAt, "off", off-p.rAt, "n", len(b))
		if off > p.rAt {
			stats.Record(p.ctx, metrics.DagStorePRSeekForwardBytes.M(off-p.rAt), metrics.DagStorePRSeekForwardCount.M(1))
		} else {
			stats.Record(p.ctx, metrics.DagStorePRSeekBackBytes.M(p.rAt-off), metrics.DagStorePRSeekBackCount.M(1))
		}
		p.rAt = off
		p.r, _, err = p.api.FetchUnsealedPiece(p.ctx, p.pieceCid, uint64(p.rAt))
		p.br = bufio.NewReaderSize(p.r, ReadBuf)
		if err != nil {
			return 0, xerrors.Errorf("getting backing reader: %w", err)
		}
@@ -118,7 +136,9 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
	// 2. Check if we need to burn some bytes
	if off > p.rAt {
		n, err := io.CopyN(io.Discard, p.r, off-p.rAt)
		stats.Record(p.ctx, metrics.DagStorePRBytesDiscarded.M(off-p.rAt), metrics.DagStorePRDiscardCount.M(1))
		n, err := io.CopyN(io.Discard, p.br, off-p.rAt)
		p.rAt += n
		if err != nil {
			return 0, xerrors.Errorf("discarding read gap: %w", err)
@@ -131,7 +151,14 @@ func (p *pieceReader) ReadAt(b []byte, off int64) (n int, err error) {
	}
	// 4. Read!
	n, err = p.r.Read(b)
	n, err = io.ReadFull(p.br, b)
	if n < len(b) {
		log.Debugw("pieceReader short read", "piece", p.pieceCid, "at", p.rAt, "toEnd", int64(p.len)-p.rAt, "n", len(b), "read", n, "err", err)
	}
	if err == io.ErrUnexpectedEOF {
		err = io.EOF
	}
	p.rAt += int64(n)
	return n, err
}
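One subtlety in the block above: io.ReadFull reports io.ErrUnexpectedEOF when it fills only part of the buffer, but io.ReaderAt callers conventionally expect io.EOF at the end of the source, so ReadAt translates the error. A minimal, hypothetical sketch of that end-of-piece behaviour (not the Lotus code; the short payload stands in for the tail of an unsealed piece):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// Pretend these are the last 5 bytes of a piece, read into an 8-byte buffer.
	tail := bytes.NewReader([]byte("piece"))
	buf := make([]byte, 8)

	n, err := io.ReadFull(tail, buf)
	if err == io.ErrUnexpectedEOF {
		// Report the partial read with io.EOF, matching io.ReaderAt semantics.
		err = io.EOF
	}
	fmt.Println(n, err) // 5 EOF
}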


@@ -128,6 +128,15 @@ var (
	StorageLimitUsedBytes = stats.Int64("storage/path_limit_used_bytes", "used optional storage limit bytes", stats.UnitBytes)
	StorageLimitMaxBytes = stats.Int64("storage/path_limit_max_bytes", "optional storage limit", stats.UnitBytes)
	DagStorePRInitCount = stats.Int64("dagstore/pr_init_count", "PieceReader init count", stats.UnitDimensionless)
	DagStorePRBytesRequested = stats.Int64("dagstore/pr_requested_bytes", "PieceReader requested bytes", stats.UnitBytes)
	DagStorePRBytesDiscarded = stats.Int64("dagstore/pr_discarded_bytes", "PieceReader discarded bytes", stats.UnitBytes)
	DagStorePRDiscardCount = stats.Int64("dagstore/pr_discard_count", "PieceReader discard count", stats.UnitDimensionless)
	DagStorePRSeekBackCount = stats.Int64("dagstore/pr_seek_back_count", "PieceReader seek back count", stats.UnitDimensionless)
	DagStorePRSeekForwardCount = stats.Int64("dagstore/pr_seek_forward_count", "PieceReader seek forward count", stats.UnitDimensionless)
	DagStorePRSeekBackBytes = stats.Int64("dagstore/pr_seek_back_bytes", "PieceReader seek back bytes", stats.UnitBytes)
	DagStorePRSeekForwardBytes = stats.Int64("dagstore/pr_seek_forward_bytes", "PieceReader seek forward bytes", stats.UnitBytes)
	// splitstore
	SplitstoreMiss = stats.Int64("splitstore/miss", "Number of misses in hotstre access", stats.UnitDimensionless)
	SplitstoreCompactionTimeSeconds = stats.Float64("splitstore/compaction_time", "Compaction time in seconds", stats.UnitSeconds)
@@ -383,6 +392,39 @@ var (
		TagKeys: []tag.Key{StorageID},
	}
	DagStorePRInitCountView = &view.View{
		Measure: DagStorePRInitCount,
		Aggregation: view.Count(),
	}
	DagStorePRBytesRequestedView = &view.View{
		Measure: DagStorePRBytesRequested,
		Aggregation: view.Sum(),
	}
	DagStorePRBytesDiscardedView = &view.View{
		Measure: DagStorePRBytesDiscarded,
		Aggregation: view.Sum(),
	}
	DagStorePRDiscardCountView = &view.View{
		Measure: DagStorePRDiscardCount,
		Aggregation: view.Count(),
	}
	DagStorePRSeekBackCountView = &view.View{
		Measure: DagStorePRSeekBackCount,
		Aggregation: view.Count(),
	}
	DagStorePRSeekForwardCountView = &view.View{
		Measure: DagStorePRSeekForwardCount,
		Aggregation: view.Count(),
	}
	DagStorePRSeekBackBytesView = &view.View{
		Measure: DagStorePRSeekBackBytes,
		Aggregation: view.Sum(),
	}
	DagStorePRSeekForwardBytesView = &view.View{
		Measure: DagStorePRSeekForwardBytes,
		Aggregation: view.Sum(),
	}
	// splitstore
	SplitstoreMissView = &view.View{
		Measure: SplitstoreMiss,
@@ -539,6 +581,14 @@ var MinerNodeViews = append([]*view.View{
	StorageReservedBytesView,
	StorageLimitUsedBytesView,
	StorageLimitMaxBytesView,
	DagStorePRInitCountView,
	DagStorePRBytesRequestedView,
	DagStorePRBytesDiscardedView,
	DagStorePRDiscardCountView,
	DagStorePRSeekBackCountView,
	DagStorePRSeekForwardCountView,
	DagStorePRSeekBackBytesView,
	DagStorePRSeekForwardBytesView,
}, DefaultViews...)
// SinceInMilliseconds returns the duration of time since the provide time as a float64.
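The new measures pair Count aggregations for how often each event happens (init, discard, seek forward/back) with Sum aggregations for the byte totals, so requested vs. discarded bytes can be read next to their event counts. Below is a minimal sketch of how these views could be surfaced, assuming the OpenCensus Prometheus exporter; Lotus wires up view registration elsewhere, and the namespace and port here are arbitrary illustrative choices.

package main

import (
	"log"
	"net/http"

	ocprom "contrib.go.opencensus.io/exporter/prometheus"
	"go.opencensus.io/stats/view"

	"github.com/filecoin-project/lotus/metrics"
)

func main() {
	// Register the miner views (which now include the DagStorePR* views
	// added above) so the measures start being aggregated.
	if err := view.Register(metrics.MinerNodeViews...); err != nil {
		log.Fatal(err)
	}

	// Expose the aggregated views on a Prometheus scrape endpoint.
	exporter, err := ocprom.NewExporter(ocprom.Options{Namespace: "lotus"})
	if err != nil {
		log.Fatal(err)
	}
	http.Handle("/metrics", exporter)
	log.Fatal(http.ListenAndServe(":9100", nil))
}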