diff --git a/lib/commp/writer.go b/lib/commp/writer.go new file mode 100644 index 000000000..4c5e3350c --- /dev/null +++ b/lib/commp/writer.go @@ -0,0 +1,113 @@ +package commp + +import ( + "bytes" + "math/bits" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" +) + +const commPBufPad = abi.PaddedPieceSize(8 << 20) +const CommPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const + +type Writer struct { + len int64 + buf [CommPBuf]byte + leaves []cid.Cid +} + +func (w *Writer) Write(p []byte) (int, error) { + n := len(p) + for len(p) > 0 { + buffered := int(w.len % int64(len(w.buf))) + toBuffer := len(w.buf) - buffered + if toBuffer > len(p) { + toBuffer = len(p) + } + + copied := copy(w.buf[buffered:], p[:toBuffer]) + p = p[copied:] + w.len += int64(copied) + + if copied > 0 && w.len%int64(len(w.buf)) == 0 { + leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), CommPBuf) + if err != nil { + return 0, err + } + w.leaves = append(w.leaves, leaf) + } + } + return n, nil +} + +func (w *Writer) Sum() (api.DataCIDSize, error) { + // process last non-zero leaf if exists + lastLen := w.len % int64(len(w.buf)) + rawLen := w.len + + // process remaining bit of data + if lastLen != 0 { + if len(w.leaves) != 0 { + copy(w.buf[lastLen:], make([]byte, int(int64(CommPBuf)-lastLen))) + lastLen = int64(CommPBuf) + } + + r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen)) + p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz) + if err != nil { + return api.DataCIDSize{}, err + } + + if sz < CommPBuf { // special case for pieces smaller than 16MiB + return api.DataCIDSize{ + PayloadSize: w.len, + PieceSize: sz.Padded(), + PieceCID: p, + }, nil + } + + w.leaves = append(w.leaves, p) + } + + // pad with zero pieces to power-of-two size + fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves) + for i := 0; i < fillerLeaves; i++ { + w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(CommPBuf)) + } + + if len(w.leaves) == 1 { + return api.DataCIDSize{ + PayloadSize: rawLen, + PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, + PieceCID: w.leaves[0], + }, nil + } + + pieces := make([]abi.PieceInfo, len(w.leaves)) + for i, leaf := range w.leaves { + pieces[i] = abi.PieceInfo{ + Size: commPBufPad, + PieceCID: leaf, + } + } + + p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces) + if err != nil { + return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err) + } + + return api.DataCIDSize{ + PayloadSize: rawLen, + PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, + PieceCID: p, + }, nil +} diff --git a/lib/commp/writer_test.go b/lib/commp/writer_test.go new file mode 100644 index 000000000..284648e4e --- /dev/null +++ b/lib/commp/writer_test.go @@ -0,0 +1,88 @@ +package commp + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" + "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" +) + +func TestWriterZero(t *testing.T) { + for i, s := range []struct { + writes []int + expect abi.PaddedPieceSize + }{ + {writes: []int{200}, expect: 256}, + {writes: []int{200, 200}, expect: 512}, + + {writes: []int{int(CommPBuf)}, expect: commPBufPad}, + {writes: []int{int(CommPBuf) * 2}, expect: 2 * commPBufPad}, + {writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 4 * commPBufPad}, + {writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 16 * commPBufPad}, + + {writes: []int{200, int(CommPBuf)}, expect: 2 * commPBufPad}, + } { + s := s + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + w := &Writer{} + var rawSum int64 + for _, write := range s.writes { + rawSum += int64(write) + _, err := w.Write(make([]byte, write)) + require.NoError(t, err) + } + + p, err := w.Sum() + require.NoError(t, err) + require.Equal(t, rawSum, p.PayloadSize) + require.Equal(t, s.expect, p.PieceSize) + require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String()) + }) + } +} + +func TestWriterData(t *testing.T) { + dataLen := float64(CommPBuf) * 6.78 + data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen))) + + pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen)) + exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz) + require.NoError(t, err) + + w := &Writer{} + _, err = io.Copy(w, bytes.NewReader(data)) + require.NoError(t, err) + + res, err := w.Sum() + require.NoError(t, err) + + require.Equal(t, exp.String(), res.PieceCID.String()) +} + +func BenchmarkWriterZero(b *testing.B) { + buf := make([]byte, int(CommPBuf)*b.N) + b.SetBytes(int64(CommPBuf)) + b.ResetTimer() + + w := &Writer{} + + _, err := w.Write(buf) + require.NoError(b, err) + o, err := w.Sum() + + b.StopTimer() + + require.NoError(b, err) + require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String()) + require.Equal(b, int64(CommPBuf)*int64(b.N), o.PayloadSize) +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 63158e581..aa6a81dad 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -2,18 +2,16 @@ package client import ( "bufio" - "bytes" "context" "fmt" "io" - "math/bits" "os" - "github.com/filecoin-project/go-state-types/dline" - - "github.com/filecoin-project/go-state-types/big" "golang.org/x/xerrors" + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/dline" "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" @@ -34,7 +32,6 @@ import ( mh "github.com/multiformats/go-multihash" "go.uber.org/fx" - ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/discovery" @@ -44,17 +41,16 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-multistore" - "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" marketevents "github.com/filecoin-project/lotus/markets/loggers" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/commp" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" "github.com/filecoin-project/lotus/node/impl/paych" @@ -714,107 +710,11 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e }, nil } -const commPBufPad = abi.PaddedPieceSize(8 << 20) -const commPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const - -type commPWriter struct { - len int64 - buf [commPBuf]byte - leaves []cid.Cid -} - -func (w *commPWriter) Write(p []byte) (int, error) { - n := len(p) - for len(p) > 0 { - buffered := int(w.len % int64(len(w.buf))) - toBuffer := len(w.buf) - buffered - if toBuffer > len(p) { - toBuffer = len(p) - } - - copied := copy(w.buf[buffered:], p[:toBuffer]) - p = p[copied:] - w.len += int64(copied) - - if copied > 0 && w.len%int64(len(w.buf)) == 0 { - leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), commPBuf) - if err != nil { - return 0, err - } - w.leaves = append(w.leaves, leaf) - } - } - return n, nil -} - -func (w *commPWriter) Sum() (api.DataCIDSize, error) { - // process last non-zero leaf if exists - lastLen := w.len % int64(len(w.buf)) - rawLen := w.len - - // process remaining bit of data - if lastLen != 0 { - if len(w.leaves) != 0 { - copy(w.buf[lastLen:], make([]byte, int(int64(commPBuf)-lastLen))) - lastLen = int64(commPBuf) - } - - r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen)) - p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz) - if err != nil { - return api.DataCIDSize{}, err - } - - if sz < commPBuf { // special case for pieces smaller than 16MiB - return api.DataCIDSize{ - PayloadSize: w.len, - PieceSize: sz.Padded(), - PieceCID: p, - }, nil - } - - w.leaves = append(w.leaves, p) - } - - // pad with zero pieces to power-of-two size - fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves) - for i := 0; i < fillerLeaves; i++ { - w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(commPBuf)) - } - - if len(w.leaves) == 1 { - return api.DataCIDSize{ - PayloadSize: rawLen, - PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, - PieceCID: w.leaves[0], - }, nil - } - - pieces := make([]abi.PieceInfo, len(w.leaves)) - for i, leaf := range w.leaves { - pieces[i] = abi.PieceInfo{ - Size: commPBufPad, - PieceCID: leaf, - } - } - - p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces) - if err != nil { - return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err) - } - - return api.DataCIDSize{ - PayloadSize: rawLen, - PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad, - PieceCID: p, - }, nil -} - func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) { dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore))) - w := &commPWriter{} - bw := bufio.NewWriterSize(w, int(commPBuf)) + w := &commp.Writer{} + bw := bufio.NewWriterSize(w, int(commp.CommPBuf)) err := car.WriteCar(ctx, dag, []cid.Cid{root}, w) if err != nil { diff --git a/node/impl/client/client_test.go b/node/impl/client/client_test.go index 00e7b828c..da13c8ef3 100644 --- a/node/impl/client/client_test.go +++ b/node/impl/client/client_test.go @@ -1,88 +1 @@ package client - -import ( - "bytes" - "crypto/rand" - "fmt" - "io" - "io/ioutil" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-padreader" - "github.com/filecoin-project/go-state-types/abi" - - "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" - "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" -) - -func TestClientDealPieceCIDZero(t *testing.T) { - for i, s := range []struct { - writes []int - expect abi.PaddedPieceSize - }{ - {writes: []int{200}, expect: 256}, - {writes: []int{200, 200}, expect: 512}, - - {writes: []int{int(commPBuf)}, expect: commPBufPad}, - {writes: []int{int(commPBuf) * 2}, expect: 2 * commPBufPad}, - {writes: []int{int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 4 * commPBufPad}, - {writes: []int{int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf), int(commPBuf)}, expect: 16 * commPBufPad}, - - {writes: []int{200, int(commPBuf)}, expect: 2 * commPBufPad}, - } { - s := s - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - w := &commPWriter{} - var rawSum int64 - for _, write := range s.writes { - rawSum += int64(write) - _, err := w.Write(make([]byte, write)) - require.NoError(t, err) - } - - p, err := w.Sum() - require.NoError(t, err) - require.Equal(t, rawSum, p.PayloadSize) - require.Equal(t, s.expect, p.PieceSize) - require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String()) - }) - } -} - -func TestClientDealPieceCIDData(t *testing.T) { - dataLen := float64(commPBuf) * 6.78 - data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen))) - - pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen)) - exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz) - require.NoError(t, err) - - w := &commPWriter{} - _, err = io.Copy(w, bytes.NewReader(data)) - require.NoError(t, err) - - res, err := w.Sum() - require.NoError(t, err) - - require.Equal(t, exp.String(), res.PieceCID.String()) -} - -func BenchmarkClientDealPieceCIDZero(b *testing.B) { - buf := make([]byte, int(commPBuf)*b.N) - b.SetBytes(int64(commPBuf)) - b.ResetTimer() - - w := &commPWriter{} - - _, err := w.Write(buf) - require.NoError(b, err) - o, err := w.Sum() - - b.StopTimer() - - require.NoError(b, err) - require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String()) - require.Equal(b, int64(commPBuf)*int64(b.N), o.PayloadSize) -}