Merge pull request #4535 from filecoin-project/feat/clientdeal-cache-commd
client deal: Cache CommD when creating multiple deals
This commit is contained in:
commit
8c60069bb8
@ -301,6 +301,8 @@ type FullNode interface {
|
||||
ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan marketevents.RetrievalEvent, error)
|
||||
// ClientQueryAsk returns a signed StorageAsk from the specified miner.
|
||||
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error)
|
||||
// ClientCalcCommP calculates the CommP and data size of the specified CID
|
||||
ClientDealPieceCID(ctx context.Context, root cid.Cid) (DataCIDSize, error)
|
||||
// ClientCalcCommP calculates the CommP for a specified file
|
||||
ClientCalcCommP(ctx context.Context, inpath string) (*CommPRet, error)
|
||||
// ClientGenCar generates a CAR file for the specified file.
|
||||
@ -889,6 +891,12 @@ type DataSize struct {
|
||||
PieceSize abi.PaddedPieceSize
|
||||
}
|
||||
|
||||
type DataCIDSize struct {
|
||||
PayloadSize int64
|
||||
PieceSize abi.PaddedPieceSize
|
||||
PieceCID cid.Cid
|
||||
}
|
||||
|
||||
type CommPRet struct {
|
||||
Root cid.Cid
|
||||
Size abi.UnpaddedPieceSize
|
||||
|
@ -166,6 +166,7 @@ type FullNodeStruct struct {
|
||||
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
|
||||
ClientRetrieveWithEvents func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) `perm:"admin"`
|
||||
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) `perm:"read"`
|
||||
ClientDealPieceCID func(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) `perm:"read"`
|
||||
ClientCalcCommP func(ctx context.Context, inpath string) (*api.CommPRet, error) `perm:"read"`
|
||||
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
|
||||
ClientDealSize func(ctx context.Context, root cid.Cid) (api.DataSize, error) `perm:"read"`
|
||||
@ -559,6 +560,11 @@ func (c *FullNodeStruct) ClientRetrieveWithEvents(ctx context.Context, order api
|
||||
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.StorageAsk, error) {
|
||||
return c.Internal.ClientQueryAsk(ctx, p, miner)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
|
||||
return c.Internal.ClientDealPieceCID(ctx, root)
|
||||
}
|
||||
|
||||
func (c *FullNodeStruct) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
|
||||
return c.Internal.ClientCalcCommP(ctx, inpath)
|
||||
}
|
||||
|
@ -493,7 +493,7 @@ func interactiveDeal(cctx *cli.Context) error {
|
||||
var dur time.Duration
|
||||
var epochs abi.ChainEpoch
|
||||
var verified bool
|
||||
var ds lapi.DataSize
|
||||
var ds lapi.DataCIDSize
|
||||
|
||||
// find
|
||||
var candidateAsks []*storagemarket.StorageAsk
|
||||
@ -557,7 +557,7 @@ uiLoop:
|
||||
}
|
||||
|
||||
color.Blue(".. calculating data size\n")
|
||||
ds, err = api.ClientDealSize(ctx, data)
|
||||
ds, err = api.ClientDealPieceCID(ctx, data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -847,6 +847,9 @@ uiLoop:
|
||||
Data: &storagemarket.DataRef{
|
||||
TransferType: storagemarket.TTGraphsync,
|
||||
Root: data,
|
||||
|
||||
PieceCid: &ds.PieceCID,
|
||||
PieceSize: ds.PieceSize.Unpadded(),
|
||||
},
|
||||
Wallet: a,
|
||||
Miner: maddr,
|
||||
|
@ -34,6 +34,7 @@
|
||||
* [ClientCalcCommP](#ClientCalcCommP)
|
||||
* [ClientCancelDataTransfer](#ClientCancelDataTransfer)
|
||||
* [ClientDataTransferUpdates](#ClientDataTransferUpdates)
|
||||
* [ClientDealPieceCID](#ClientDealPieceCID)
|
||||
* [ClientDealSize](#ClientDealSize)
|
||||
* [ClientFindData](#ClientFindData)
|
||||
* [ClientGenCar](#ClientGenCar)
|
||||
@ -890,6 +891,32 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
### ClientDealPieceCID
|
||||
ClientCalcCommP calculates the CommP and data size of the specified CID
|
||||
|
||||
|
||||
Perms: read
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"PayloadSize": 9,
|
||||
"PieceSize": 1032,
|
||||
"PieceCID": {
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### ClientDealSize
|
||||
ClientDealSize calculates real deal data size
|
||||
|
||||
|
113
lib/commp/writer.go
Normal file
113
lib/commp/writer.go
Normal file
@ -0,0 +1,113 @@
|
||||
package commp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/bits"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
ffi "github.com/filecoin-project/filecoin-ffi"
|
||||
"github.com/filecoin-project/go-padreader"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
|
||||
)
|
||||
|
||||
const commPBufPad = abi.PaddedPieceSize(8 << 20)
|
||||
const CommPBuf = abi.UnpaddedPieceSize(commPBufPad - (commPBufPad / 128)) // can't use .Unpadded() for const
|
||||
|
||||
type Writer struct {
|
||||
len int64
|
||||
buf [CommPBuf]byte
|
||||
leaves []cid.Cid
|
||||
}
|
||||
|
||||
func (w *Writer) Write(p []byte) (int, error) {
|
||||
n := len(p)
|
||||
for len(p) > 0 {
|
||||
buffered := int(w.len % int64(len(w.buf)))
|
||||
toBuffer := len(w.buf) - buffered
|
||||
if toBuffer > len(p) {
|
||||
toBuffer = len(p)
|
||||
}
|
||||
|
||||
copied := copy(w.buf[buffered:], p[:toBuffer])
|
||||
p = p[copied:]
|
||||
w.len += int64(copied)
|
||||
|
||||
if copied > 0 && w.len%int64(len(w.buf)) == 0 {
|
||||
leaf, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, bytes.NewReader(w.buf[:]), CommPBuf)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
w.leaves = append(w.leaves, leaf)
|
||||
}
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (w *Writer) Sum() (api.DataCIDSize, error) {
|
||||
// process last non-zero leaf if exists
|
||||
lastLen := w.len % int64(len(w.buf))
|
||||
rawLen := w.len
|
||||
|
||||
// process remaining bit of data
|
||||
if lastLen != 0 {
|
||||
if len(w.leaves) != 0 {
|
||||
copy(w.buf[lastLen:], make([]byte, int(int64(CommPBuf)-lastLen)))
|
||||
lastLen = int64(CommPBuf)
|
||||
}
|
||||
|
||||
r, sz := padreader.New(bytes.NewReader(w.buf[:lastLen]), uint64(lastLen))
|
||||
p, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, r, sz)
|
||||
if err != nil {
|
||||
return api.DataCIDSize{}, err
|
||||
}
|
||||
|
||||
if sz < CommPBuf { // special case for pieces smaller than 16MiB
|
||||
return api.DataCIDSize{
|
||||
PayloadSize: w.len,
|
||||
PieceSize: sz.Padded(),
|
||||
PieceCID: p,
|
||||
}, nil
|
||||
}
|
||||
|
||||
w.leaves = append(w.leaves, p)
|
||||
}
|
||||
|
||||
// pad with zero pieces to power-of-two size
|
||||
fillerLeaves := (1 << (bits.Len(uint(len(w.leaves) - 1)))) - len(w.leaves)
|
||||
for i := 0; i < fillerLeaves; i++ {
|
||||
w.leaves = append(w.leaves, zerocomm.ZeroPieceCommitment(CommPBuf))
|
||||
}
|
||||
|
||||
if len(w.leaves) == 1 {
|
||||
return api.DataCIDSize{
|
||||
PayloadSize: rawLen,
|
||||
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
|
||||
PieceCID: w.leaves[0],
|
||||
}, nil
|
||||
}
|
||||
|
||||
pieces := make([]abi.PieceInfo, len(w.leaves))
|
||||
for i, leaf := range w.leaves {
|
||||
pieces[i] = abi.PieceInfo{
|
||||
Size: commPBufPad,
|
||||
PieceCID: leaf,
|
||||
}
|
||||
}
|
||||
|
||||
p, err := ffi.GenerateUnsealedCID(abi.RegisteredSealProof_StackedDrg32GiBV1, pieces)
|
||||
if err != nil {
|
||||
return api.DataCIDSize{}, xerrors.Errorf("generating unsealed CID: %w", err)
|
||||
}
|
||||
|
||||
return api.DataCIDSize{
|
||||
PayloadSize: rawLen,
|
||||
PieceSize: abi.PaddedPieceSize(len(w.leaves)) * commPBufPad,
|
||||
PieceCID: p,
|
||||
}, nil
|
||||
}
|
88
lib/commp/writer_test.go
Normal file
88
lib/commp/writer_test.go
Normal file
@ -0,0 +1,88 @@
|
||||
package commp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-padreader"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
|
||||
)
|
||||
|
||||
func TestWriterZero(t *testing.T) {
|
||||
for i, s := range []struct {
|
||||
writes []int
|
||||
expect abi.PaddedPieceSize
|
||||
}{
|
||||
{writes: []int{200}, expect: 256},
|
||||
{writes: []int{200, 200}, expect: 512},
|
||||
|
||||
{writes: []int{int(CommPBuf)}, expect: commPBufPad},
|
||||
{writes: []int{int(CommPBuf) * 2}, expect: 2 * commPBufPad},
|
||||
{writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 4 * commPBufPad},
|
||||
{writes: []int{int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf), int(CommPBuf)}, expect: 16 * commPBufPad},
|
||||
|
||||
{writes: []int{200, int(CommPBuf)}, expect: 2 * commPBufPad},
|
||||
} {
|
||||
s := s
|
||||
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
|
||||
w := &Writer{}
|
||||
var rawSum int64
|
||||
for _, write := range s.writes {
|
||||
rawSum += int64(write)
|
||||
_, err := w.Write(make([]byte, write))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
p, err := w.Sum()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, rawSum, p.PayloadSize)
|
||||
require.Equal(t, s.expect, p.PieceSize)
|
||||
require.Equal(t, zerocomm.ZeroPieceCommitment(s.expect.Unpadded()).String(), p.PieceCID.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriterData(t *testing.T) {
|
||||
dataLen := float64(CommPBuf) * 6.78
|
||||
data, _ := ioutil.ReadAll(io.LimitReader(rand.Reader, int64(dataLen)))
|
||||
|
||||
pr, sz := padreader.New(bytes.NewReader(data), uint64(dataLen))
|
||||
exp, err := ffiwrapper.GeneratePieceCIDFromFile(abi.RegisteredSealProof_StackedDrg32GiBV1, pr, sz)
|
||||
require.NoError(t, err)
|
||||
|
||||
w := &Writer{}
|
||||
_, err = io.Copy(w, bytes.NewReader(data))
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := w.Sum()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, exp.String(), res.PieceCID.String())
|
||||
}
|
||||
|
||||
func BenchmarkWriterZero(b *testing.B) {
|
||||
buf := make([]byte, int(CommPBuf)*b.N)
|
||||
b.SetBytes(int64(CommPBuf))
|
||||
b.ResetTimer()
|
||||
|
||||
w := &Writer{}
|
||||
|
||||
_, err := w.Write(buf)
|
||||
require.NoError(b, err)
|
||||
o, err := w.Sum()
|
||||
|
||||
b.StopTimer()
|
||||
|
||||
require.NoError(b, err)
|
||||
require.Equal(b, zerocomm.ZeroPieceCommitment(o.PieceSize.Unpadded()).String(), o.PieceCID.String())
|
||||
require.Equal(b, int64(CommPBuf)*int64(b.N), o.PayloadSize)
|
||||
}
|
@ -1,16 +1,17 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-padreader"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/ipfs/go-blockservice"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-cidutil"
|
||||
@ -40,7 +41,6 @@ import (
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
"github.com/filecoin-project/go-fil-markets/storagemarket"
|
||||
"github.com/filecoin-project/go-multistore"
|
||||
"github.com/filecoin-project/go-padreader"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
@ -50,6 +50,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/commp"
|
||||
"github.com/filecoin-project/lotus/markets/utils"
|
||||
"github.com/filecoin-project/lotus/node/impl/full"
|
||||
"github.com/filecoin-project/lotus/node/impl/paych"
|
||||
@ -709,6 +710,24 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
|
||||
dag := merkledag.NewDAGService(blockservice.New(a.CombinedBstore, offline.Exchange(a.CombinedBstore)))
|
||||
|
||||
w := &commp.Writer{}
|
||||
bw := bufio.NewWriterSize(w, int(commp.CommPBuf))
|
||||
|
||||
err := car.WriteCar(ctx, dag, []cid.Cid{root}, w)
|
||||
if err != nil {
|
||||
return api.DataCIDSize{}, err
|
||||
}
|
||||
|
||||
if err := bw.Flush(); err != nil {
|
||||
return api.DataCIDSize{}, err
|
||||
}
|
||||
|
||||
return w.Sum()
|
||||
}
|
||||
|
||||
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
|
||||
id, st, err := a.imgr().NewStore()
|
||||
if err != nil {
|
||||
|
1
node/impl/client/client_test.go
Normal file
1
node/impl/client/client_test.go
Normal file
@ -0,0 +1 @@
|
||||
package client
|
Loading…
Reference in New Issue
Block a user