lpseal: Use pooled buffers for TreeD

Łukasz Magiera 2023-12-23 15:42:24 +01:00
parent 6af3757136
commit a2c906fe4f
2 changed files with 44 additions and 7 deletions
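For background, the pool package imported in this commit, github.com/libp2p/go-buffer-pool, exposes package-level Get and Put helpers backed by power-of-two size classes over sync.Pool. A minimal standalone sketch of the Get/Put pattern the commit adopts (illustrative only, not part of the diff):

package main

import (
	"fmt"

	pool "github.com/libp2p/go-buffer-pool"
)

func main() {
	// Get returns a []byte of the requested length, reusing a pooled
	// allocation from a matching size class when one is available.
	buf := pool.Get(1 << 20)

	// ... use buf as scratch space ...
	copy(buf, "scratch data")

	// Put returns the buffer to the pool so later Get calls can reuse it;
	// the caller must not touch buf after this point.
	pool.Put(buf)

	fmt.Println("done")
}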

View File

@@ -5,6 +5,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
"github.com/minio/sha256-simd"
"golang.org/x/xerrors"
"io"
@@ -24,6 +25,8 @@ func hashChunk(data [][]byte) {
d := sha256.New()
+ sumBuf := make([]byte, nodeSize)
for i := 0; i < l1Nodes; i++ {
levels := bits.TrailingZeros(^uint(i)) + 1
@@ -34,7 +37,7 @@ func hashChunk(data [][]byte) {
d.Reset()
inNodeData := data[l][inNode*nodeSize : (inNode+2)*nodeSize]
d.Write(inNodeData)
- copy(data[l+1][outNode*nodeSize:(outNode+1)*nodeSize], d.Sum(nil))
+ copy(data[l+1][outNode*nodeSize:(outNode+1)*nodeSize], d.Sum(sumBuf[:0]))
// set top bits to 00
data[l+1][outNode*nodeSize+nodeSize-1] &= 0x3f
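The switch from d.Sum(nil) to d.Sum(sumBuf[:0]) relies on the standard hash.Hash contract: Sum appends the digest to the slice it is given, so a zero-length slice over the preallocated sumBuf receives the 32 bytes in place, whereas d.Sum(nil) allocates a fresh slice for every node. A rough standalone illustration of the same trick, using the standard library sha256 here instead of the minio implementation used in this file (both satisfy hash.Hash):

package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	d := sha256.New()
	sumBuf := make([]byte, sha256.Size) // reused scratch buffer, like the commit's sumBuf

	for _, msg := range []string{"node-a", "node-b"} {
		d.Reset()
		d.Write([]byte(msg))
		// Sum appends into sumBuf's backing array, so no slice is allocated per iteration.
		digest := d.Sum(sumBuf[:0])
		fmt.Printf("%x\n", digest)
	}
}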
@@ -61,8 +64,8 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) {
// setup buffers
maxThreads := int64(size) / threadChunkSize
- if maxThreads > int64(runtime.NumCPU()) {
- maxThreads = int64(runtime.NumCPU())
+ if maxThreads > int64(runtime.NumCPU())*15/10 {
+ maxThreads = int64(runtime.NumCPU()) * 15 / 10
}
if maxThreads < 1 {
maxThreads = 1
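For scale: the cap on maxThreads rises from the CPU count to 1.5x the CPU count, computed with integer arithmetic (multiply before divide), e.g. 16 cores now allow up to 16*15/10 = 24 worker threads instead of 16, and 6 cores allow 9 instead of 6.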
@@ -79,11 +82,11 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) {
if bottomBufSize > int64(size) {
bottomBufSize = int64(size)
}
- workerBuffer[0] = make([]byte, bottomBufSize)
+ workerBuffer[0] = pool.Get(int(bottomBufSize))
// append levels until we get to a 32 byte level
for len(workerBuffer[len(workerBuffer)-1]) > 32 {
- newLevel := make([]byte, len(workerBuffer[len(workerBuffer)-1])/2)
+ newLevel := pool.Get(len(workerBuffer[len(workerBuffer)-1]) / 2)
workerBuffer = append(workerBuffer, newLevel)
}
workerBuffers[i] = workerBuffer
@@ -98,13 +101,25 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) {
}
apexBuf = make([][]byte, 1)
- apexBuf[0] = make([]byte, apexBottomSize*nodeSize)
+ apexBuf[0] = pool.Get(int(apexBottomSize * nodeSize))
for len(apexBuf[len(apexBuf)-1]) > 32 {
- newLevel := make([]byte, len(apexBuf[len(apexBuf)-1])/2)
+ newLevel := pool.Get(len(apexBuf[len(apexBuf)-1]) / 2)
apexBuf = append(apexBuf, newLevel)
}
}
+ // defer free pool buffers
+ defer func() {
+ for _, workerBuffer := range workerBuffers {
+ for _, level := range workerBuffer {
+ pool.Put(level)
+ }
+ }
+ for _, level := range apexBuf {
+ pool.Put(level)
+ }
+ }()
// start processing
var processed uint64
var workWg sync.WaitGroup

View File

@@ -5,10 +5,12 @@ import (
"crypto/rand"
"fmt"
"github.com/filecoin-project/go-state-types/abi"
pool "github.com/libp2p/go-buffer-pool"
"github.com/stretchr/testify/require"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
)
@@ -273,6 +275,26 @@ func BenchmarkBuildTreeD512M(b *testing.B) {
b.Fatalf("Failed to generate random data: %v", err)
}
+ // preallocate NumCPU+1 1MiB/512k/256k/...
+ // with Pool.Get / Pool.Put, so that they are in the pool
+ {
+ nc := runtime.NumCPU()
+ bufs := [][]byte{}
+ for i := 0; i < nc+1; i++ {
+ for sz := 1 << 20; sz > 32; sz >>= 1 {
+ b := pool.Get(sz)
+ bufs = append(bufs, b)
+ }
+ }
+ for _, b := range bufs {
+ pool.Put(b)
+ }
+ }
+ if b.N == 1 {
+ b.N = 10
+ }
b.SetBytes(int64(dataSize)) // Set the number of bytes for the benchmark
for i := 0; i < b.N; i++ {