diff --git a/provider/lpproof/treed_build.go b/provider/lpproof/treed_build.go index 8064d5045..a59ef6624 100644 --- a/provider/lpproof/treed_build.go +++ b/provider/lpproof/treed_build.go @@ -5,6 +5,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" + pool "github.com/libp2p/go-buffer-pool" "github.com/minio/sha256-simd" "golang.org/x/xerrors" "io" @@ -24,6 +25,8 @@ func hashChunk(data [][]byte) { d := sha256.New() + sumBuf := make([]byte, nodeSize) + for i := 0; i < l1Nodes; i++ { levels := bits.TrailingZeros(^uint(i)) + 1 @@ -34,7 +37,7 @@ func hashChunk(data [][]byte) { d.Reset() inNodeData := data[l][inNode*nodeSize : (inNode+2)*nodeSize] d.Write(inNodeData) - copy(data[l+1][outNode*nodeSize:(outNode+1)*nodeSize], d.Sum(nil)) + copy(data[l+1][outNode*nodeSize:(outNode+1)*nodeSize], d.Sum(sumBuf[:0])) // set top bits to 00 data[l+1][outNode*nodeSize+nodeSize-1] &= 0x3f @@ -61,8 +64,8 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C // setup buffers maxThreads := int64(size) / threadChunkSize - if maxThreads > int64(runtime.NumCPU()) { - maxThreads = int64(runtime.NumCPU()) + if maxThreads > int64(runtime.NumCPU())*15/10 { + maxThreads = int64(runtime.NumCPU()) * 15 / 10 } if maxThreads < 1 { maxThreads = 1 @@ -79,11 +82,11 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C if bottomBufSize > int64(size) { bottomBufSize = int64(size) } - workerBuffer[0] = make([]byte, bottomBufSize) + workerBuffer[0] = pool.Get(int(bottomBufSize)) // append levels until we get to a 32 byte level for len(workerBuffer[len(workerBuffer)-1]) > 32 { - newLevel := make([]byte, len(workerBuffer[len(workerBuffer)-1])/2) + newLevel := pool.Get(len(workerBuffer[len(workerBuffer)-1]) / 2) workerBuffer = append(workerBuffer, newLevel) } workerBuffers[i] = workerBuffer @@ -98,13 +101,25 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C } apexBuf = make([][]byte, 1) - apexBuf[0] = make([]byte, apexBottomSize*nodeSize) + apexBuf[0] = pool.Get(int(apexBottomSize * nodeSize)) for len(apexBuf[len(apexBuf)-1]) > 32 { - newLevel := make([]byte, len(apexBuf[len(apexBuf)-1])/2) + newLevel := pool.Get(len(apexBuf[len(apexBuf)-1]) / 2) apexBuf = append(apexBuf, newLevel) } } + // defer free pool buffers + defer func() { + for _, workerBuffer := range workerBuffers { + for _, level := range workerBuffer { + pool.Put(level) + } + } + for _, level := range apexBuf { + pool.Put(level) + } + }() + // start processing var processed uint64 var workWg sync.WaitGroup diff --git a/provider/lpproof/treed_build_test.go b/provider/lpproof/treed_build_test.go index 5fd28bbd3..6c69c4ed1 100644 --- a/provider/lpproof/treed_build_test.go +++ b/provider/lpproof/treed_build_test.go @@ -5,10 +5,12 @@ import ( "crypto/rand" "fmt" "github.com/filecoin-project/go-state-types/abi" + pool "github.com/libp2p/go-buffer-pool" "github.com/stretchr/testify/require" "io/ioutil" "os" "path/filepath" + "runtime" "testing" ) @@ -273,6 +275,26 @@ func BenchmarkBuildTreeD512M(b *testing.B) { b.Fatalf("Failed to generate random data: %v", err) } + // preallocate NumCPU+1 1MiB/512k/256k/... + // with Pool.Get / Pool.Put, so that they are in the pool + { + nc := runtime.NumCPU() + bufs := [][]byte{} + for i := 0; i < nc+1; i++ { + for sz := 1 << 20; sz > 32; sz >>= 1 { + b := pool.Get(sz) + bufs = append(bufs, b) + } + } + for _, b := range bufs { + pool.Put(b) + } + } + + if b.N == 1 { + b.N = 10 + } + b.SetBytes(int64(dataSize)) // Set the number of bytes for the benchmark for i := 0; i < b.N; i++ {