From 3f9aae7031fa1244141abe84fdedd7393cfbd1b7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 21 Dec 2023 17:28:26 +0100
Subject: [PATCH] lpseal: Implement TreeD build function in Go

---
 provider/lpproof/treed_build.go      | 241 +++++++++++++++++++++++++++
 provider/lpproof/treed_build_test.go | 116 +++++++++++++
 2 files changed, 357 insertions(+)
 create mode 100644 provider/lpproof/treed_build.go
 create mode 100644 provider/lpproof/treed_build_test.go

diff --git a/provider/lpproof/treed_build.go b/provider/lpproof/treed_build.go
new file mode 100644
index 000000000..ff52968e5
--- /dev/null
+++ b/provider/lpproof/treed_build.go
@@ -0,0 +1,241 @@
+package lpproof
+
+import (
+	commcid "github.com/filecoin-project/go-fil-commcid"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/hashicorp/go-multierror"
+	"github.com/ipfs/go-cid"
+	"github.com/minio/sha256-simd"
+	"golang.org/x/xerrors"
+	"io"
+	"math/bits"
+	"os"
+	"runtime"
+	"sync"
+	"time"
+)
+
+// nodeSize is the size of one merkle-tree node: a single SHA-256 digest.
+const nodeSize = 32
+// threadChunkSize is the bottom-layer byte span handled by one worker buffer.
+const threadChunkSize = 1 << 20
+// NOTE(review): nodesPerChunk is not referenced anywhere in this patch.
+const nodesPerChunk = threadChunkSize / nodeSize
+
+// hashChunk computes, in place, every tree level above data[0].
+// data[l] holds level l as packed 32-byte nodes; each level must be exactly
+// half the byte size of the one below it. Node pairs are hashed with SHA-256
+// and the top two bits of the last digest byte are cleared (see the
+// "set top bits to 00" step below).
+func hashChunk(data [][]byte) {
+	l1Nodes := len(data[0]) / nodeSize / 2
+
+	d := sha256.New()
+
+	for i := 0; i < l1Nodes; i++ {
+		// number of parent levels completed by pair i: a pair whose index
+		// ends in k one-bits closes k+1 parents (classic streaming-merkle trick)
+		levels := bits.TrailingZeros(^uint(i)) + 1
+
+		inNode := i * 2 // at level 0
+		outNode := i
+
+		for l := 0; l < levels; l++ {
+			d.Reset()
+			inNodeData := data[l][inNode*nodeSize : (inNode+2)*nodeSize]
+			d.Write(inNodeData)
+			copy(data[l+1][outNode*nodeSize:(outNode+1)*nodeSize], d.Sum(nil))
+			// set top bits to 00
+			data[l+1][outNode*nodeSize+nodeSize-1] &= 0x3f
+
+			// move up one level: input becomes the even-aligned parent pair
+			inNode--
+			inNode >>= 1
+			outNode >>= 1
+		}
+	}
+}
+
+// BuildTreeD streams `size` bytes of padded piece data from `data`, writes
+// the flattened TreeD (data layer followed by each higher layer, see
+// layerOffset) into the file at outPath, and returns the data commitment CID.
+func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) {
+	out, err := os.Create(outPath)
+	// NOTE(review): `out` is never closed on any path in this function;
+	// add `defer out.Close()` (and check its error before trusting the result).
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	outSize := treeSize(size)
+
+	// allocate space for the tree
+	err = out.Truncate(int64(outSize))
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	// setup 
buffers
+	// one worker (and one buffer) per full chunk, capped at NumCPU, min 1
+	maxThreads := int64(size) / threadChunkSize
+	if maxThreads > int64(runtime.NumCPU()) {
+		maxThreads = int64(runtime.NumCPU())
+	}
+	if maxThreads < 1 {
+		maxThreads = 1
+	}
+
+	// allocate buffers
+	var bufLk sync.Mutex
+	workerBuffers := make([][][]byte, maxThreads) // [worker][level][levelSize]
+
+	for i := range workerBuffers {
+		workerBuffer := make([][]byte, 1)
+
+		// bottom level covers one chunk, or the whole piece if smaller
+		bottomBufSize := int64(threadChunkSize)
+		if bottomBufSize > int64(size) {
+			bottomBufSize = int64(size)
+		}
+		workerBuffer[0] = make([]byte, bottomBufSize)
+
+		// append levels until we get to a 32 byte level
+		for len(workerBuffer[len(workerBuffer)-1]) > 32 {
+			newLevel := make([]byte, len(workerBuffer[len(workerBuffer)-1])/2)
+			workerBuffer = append(workerBuffer, newLevel)
+		}
+		workerBuffers[i] = workerBuffer
+	}
+
+	// prepare apex buffer
+	// apexBottomSize is the number of worker chunks, i.e. how many 32-byte
+	// chunk-top hashes will be fed into the apex bottom layer.
+	apexBottomSize := uint64(size) / uint64(len(workerBuffers[0][0]))
+	var apexBuf [][]byte
+	threadLayers := 1
+
+	if apexBottomSize > 1 {
+		apexBuf = make([][]byte, 1)
+		// NOTE(review): workers later copy nodeSize (32) bytes per chunk into
+		// apexBuf[0], so this allocation looks like it must be
+		// apexBottomSize*nodeSize bytes, not apexBottomSize — verify.
+		apexBuf[0] = make([]byte, apexBottomSize)
+		for len(apexBuf[len(apexBuf)-1]) > 32 {
+			newLevel := make([]byte, len(apexBuf[len(apexBuf)-1])/2)
+			apexBuf = append(apexBuf, newLevel)
+			// NOTE(review): threadLayers ends up equal to len(apexBuf) (apex
+			// depth), but both later uses (hashPos shift, apex layer index)
+			// are only consistent with the worker-buffer depth
+			// len(workerBuffers[0])-1 — looks wrong, verify.
+			threadLayers++
+		}
+	}
+
+	// start processing
+	var processed uint64
+	var workWg sync.WaitGroup
+	var errLock sync.Mutex
+	var oerr error
+
+	for processed < uint64(size) {
+		// get a buffer
+		bufLk.Lock()
+		if len(workerBuffers) == 0 {
+			// all buffers checked out: poll-sleep until a worker returns one
+			bufLk.Unlock()
+			time.Sleep(50 * time.Microsecond)
+			continue
+		}
+
+		// pop last
+		workBuffer := workerBuffers[len(workerBuffers)-1]
+		workerBuffers = workerBuffers[:len(workerBuffers)-1]
+
+		bufLk.Unlock()
+
+		// before reading check that we didn't get a write error
+		errLock.Lock()
+		if oerr != nil {
+			errLock.Unlock()
+			return cid.Undef, oerr
+		}
+		errLock.Unlock()
+
+		// read data into the bottom level
+		// note: the bottom level will never be too big; data is power of two
+		// size, and if it's smaller than a single buffer, we only have one
+		// smaller buffer
+
+		// NOTE(review): io.ReadFull returns io.EOF only when ZERO bytes were
+		// read; in that case the stale contents of workBuffer[0] from a
+		// previous round get hashed. Presumably `data` always yields exactly
+		// `size` bytes — confirm, or zero the buffer on EOF.
+		_, err := io.ReadFull(data, workBuffer[0])
+		if err != nil && err != io.EOF {
+			return cid.Undef, err
+		}
+
+		// start processing
+		workWg.Add(1)
+		go func(startOffset uint64) {
+			// NOTE(review): the early `return` on write error below skips
+			// workWg.Done(), so a write failure can deadlock workWg.Wait();
+			// use `defer workWg.Done()` instead of the trailing call.
+			hashChunk(workBuffer)
+
+			// persist apex if needed
+			if len(apexBuf) > 0 {
+				// the chunk's root is the last (single-node) level
+				apexHash := workBuffer[len(workBuffer)-1]
+				// NOTE(review): the target byte position should be
+				// (startOffset/threadChunkSize)*nodeSize, i.e. a shift by
+				// len(workBuffer)-1; threadLayers is computed from the apex
+				// depth instead (see setup above) — verify.
+				hashPos := startOffset >> threadLayers
+
+				copy(apexBuf[0][hashPos:hashPos+nodeSize], apexHash)
+			}
+
+			// write results
+			offsetInLayer := startOffset
+			for layer, layerData := range workBuffer {
+
+				// layerOff is outSize:bits[most significant bit - layer]
+				layerOff := layerOffset(uint64(size), layer)
+				dataOff := offsetInLayer + layerOff
+				// each layer is half the size, so halve the intra-layer offset
+				offsetInLayer /= 2
+
+				_, werr := out.WriteAt(layerData, int64(dataOff))
+				if werr != nil {
+					errLock.Lock()
+					oerr = multierror.Append(oerr, werr)
+					errLock.Unlock()
+					return
+				}
+			}
+
+			// return buffer
+			bufLk.Lock()
+			workerBuffers = append(workerBuffers, workBuffer)
+			bufLk.Unlock()
+
+			workWg.Done()
+		}(processed)
+
+		processed += uint64(len(workBuffer[0]))
+	}
+
+	workWg.Wait()
+
+	if oerr != nil {
+		return cid.Undef, oerr
+	}
+
+	if len(apexBuf) > 0 {
+		// hash the apex
+		hashChunk(apexBuf)
+
+		// write apex
+		for apexLayer, layerData := range apexBuf {
+			// apex layer 0 sits at the same file layer as the workers' top
+			// level; see the threadLayers note above
+			layer := apexLayer + threadLayers
+
+			layerOff := layerOffset(uint64(size), layer)
+			_, werr := out.WriteAt(layerData, int64(layerOff))
+			if werr != nil {
+				return cid.Undef, xerrors.Errorf("write apex: %w", werr)
+			}
+		}
+	}
+
+	var commp [32]byte
+	// NOTE(review): hashChunk leaves the root in the LAST level of each
+	// buffer, yet both branches read level 0 (the bottom layer). Except for
+	// the degenerate single-node piece this copies a leaf, not the root;
+	// expected workerBuffers[0][len(workerBuffers[0])-1] and
+	// apexBuf[len(apexBuf)-1]. Also, with maxThreads==1 but multiple chunks
+	// the first branch is taken even though the root lives in the apex.
+	if len(workerBuffers) == 1 {
+		copy(commp[:], workerBuffers[0][0])
+	} else {
+		copy(commp[:], apexBuf[0])
+	}
+
+	commCid, err := commcid.DataCommitmentV1ToCID(commp[:])
+	if err != nil {
+		return cid.Undef, err
+	}
+
+	return commCid, nil
+}
+
+// treeSize returns the total byte size of the flattened tree file for a
+// padded piece: the data layer plus every halving layer down to one
+// 32-byte root node.
+func treeSize(data abi.PaddedPieceSize) uint64 {
+	bytesToAlloc := uint64(data)
+
+	// append bytes until we get to nodeSize
+	for todo := bytesToAlloc; todo > nodeSize; todo /= 2 {
+		bytesToAlloc += todo / 2
+	}
+
+	return bytesToAlloc
+}
+
+// layerOffset returns the byte offset of the given layer in the output file:
+// layer 0 (the data layer) starts at 0; layer n starts after the sum of all
+// lower layers, size*(2^n - 1)/2^(n-1).
+func layerOffset(size uint64, layer
int) uint64 {
+	layerBits := uint64(1) << uint64(layer)
+	layerBits--
+	// for layer==0 the shift count wraps past 63; Go defines such shifts as
+	// producing 0, which is the correct offset for the data layer
+	layerOff := (size * layerBits) >> uint64(layer-1)
+	return layerOff
+}
diff --git a/provider/lpproof/treed_build_test.go b/provider/lpproof/treed_build_test.go
new file mode 100644
index 000000000..f1834de3e
--- /dev/null
+++ b/provider/lpproof/treed_build_test.go
@@ -0,0 +1,116 @@
+package lpproof
+
+import (
+	"crypto/rand"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+// TestTreeSize checks flattened-tree sizes for small power-of-two pieces.
+func TestTreeSize(t *testing.T) {
+	require.Equal(t, uint64(32), treeSize(abi.PaddedPieceSize(32)))
+	require.Equal(t, uint64(64+32), treeSize(abi.PaddedPieceSize(64)))
+	require.Equal(t, uint64(128+64+32), treeSize(abi.PaddedPieceSize(128)))
+	require.Equal(t, uint64(256+128+64+32), treeSize(abi.PaddedPieceSize(256)))
+}
+
+// TestTreeLayerOffset checks per-layer start offsets for a 128-byte data layer.
+func TestTreeLayerOffset(t *testing.T) {
+	require.Equal(t, uint64(0), layerOffset(128, 0))
+	require.Equal(t, uint64(128), layerOffset(128, 1))
+	require.Equal(t, uint64(128+64), layerOffset(128, 2))
+	require.Equal(t, uint64(128+64+32), layerOffset(128, 3))
+}
+
+// TestHashChunk hashes a single 64-byte leaf pair and pins the level-1 node.
+func TestHashChunk(t *testing.T) {
+	chunk := make([]byte, 64)
+	chunk[0] = 0x01
+
+	out := make([]byte, 32)
+
+	data := [][]byte{chunk, out}
+	hashChunk(data)
+
+	// 16 ab ab 34 1f b7 f3 70 e2 7e 4d ad cf 81 76 6d
+	// d0 df d0 ae 64 46 94 77 bb 2c f6 61 49 38 b2 2f
+	expect := []byte{
+		0x16, 0xab, 0xab, 0x34, 0x1f, 0xb7, 0xf3, 0x70,
+		0xe2, 0x7e, 0x4d, 0xad, 0xcf, 0x81, 0x76, 0x6d,
+		0xd0, 0xdf, 0xd0, 0xae, 0x64, 0x46, 0x94, 0x77,
+		0xbb, 0x2c, 0xf6, 0x61, 0x49, 0x38, 0xb2, 0x2f,
+	}
+
+	require.Equal(t, expect, out)
+}
+
+// TestHashChunk2L builds two levels over 128 bytes and pins both the
+// intermediate nodes and the root.
+func TestHashChunk2L(t *testing.T) {
+	data0 := make([]byte, 128)
+	data0[0] = 0x01
+
+	l1 := make([]byte, 64)
+	l2 := make([]byte, 32)
+
+	data := [][]byte{data0, l1, l2}
+	hashChunk(data)
+
+	// 16 ab ab 34 1f b7 f3 70 e2 7e 4d ad cf 81 76 6d
+	// d0 df d0 ae 64 46 94 77 bb 2c f6 61 49 38 b2 2f
+	expectL1Left := []byte{
+		0x16, 0xab, 0xab, 0x34, 0x1f, 0xb7,
0xf3, 0x70,
+		0xe2, 0x7e, 0x4d, 0xad, 0xcf, 0x81, 0x76, 0x6d,
+		0xd0, 0xdf, 0xd0, 0xae, 0x64, 0x46, 0x94, 0x77,
+		0xbb, 0x2c, 0xf6, 0x61, 0x49, 0x38, 0xb2, 0x2f,
+	}
+
+	// f5 a5 fd 42 d1 6a 20 30 27 98 ef 6e d3 09 97 9b
+	// 43 00 3d 23 20 d9 f0 e8 ea 98 31 a9 27 59 fb 0b
+	// (the second l1 node covers the all-zero second leaf pair)
+	expectL1Rest := []byte{
+		0xf5, 0xa5, 0xfd, 0x42, 0xd1, 0x6a, 0x20, 0x30,
+		0x27, 0x98, 0xef, 0x6e, 0xd3, 0x09, 0x97, 0x9b,
+		0x43, 0x00, 0x3d, 0x23, 0x20, 0xd9, 0xf0, 0xe8,
+		0xea, 0x98, 0x31, 0xa9, 0x27, 0x59, 0xfb, 0x0b,
+	}
+
+	require.Equal(t, expectL1Left, l1[:32])
+	require.Equal(t, expectL1Rest, l1[32:])
+
+	// 0d d6 da e4 1c 2f 75 55 01 29 59 4f b6 44 e4 a8
+	// 42 cf af b3 16 a2 d5 93 21 e3 88 fe 84 a1 ec 2f
+	expectL2 := []byte{
+		0x0d, 0xd6, 0xda, 0xe4, 0x1c, 0x2f, 0x75, 0x55,
+		0x01, 0x29, 0x59, 0x4f, 0xb6, 0x44, 0xe4, 0xa8,
+		0x42, 0xcf, 0xaf, 0xb3, 0x16, 0xa2, 0xd5, 0x93,
+		0x21, 0xe3, 0x88, 0xfe, 0x84, 0xa1, 0xec, 0x2f,
+	}
+
+	require.Equal(t, expectL2, l2)
+}
+
+// BenchmarkHashChunk measures hashChunk throughput over a full 1 MiB
+// bottom layer with all levels allocated (same shape as a worker buffer).
+func BenchmarkHashChunk(b *testing.B) {
+	const benchSize = 1024 * 1024
+
+	// Generate 1 MiB of random data
+	randomData := make([]byte, benchSize)
+	if _, err := rand.Read(randomData); err != nil {
+		b.Fatalf("Failed to generate random data: %v", err)
+	}
+
+	// Prepare data structure for hashChunk
+	data := make([][]byte, 1)
+	data[0] = randomData
+
+	// append levels until we get to a 32 byte level
+	for len(data[len(data)-1]) > 32 {
+		newLevel := make([]byte, len(data[len(data)-1])/2)
+		data = append(data, newLevel)
+	}
+
+	b.SetBytes(benchSize) // Set the number of bytes for the benchmark
+
+	b.ResetTimer() // Start the timer after setup
+
+	for i := 0; i < b.N; i++ {
+		hashChunk(data)
+		// Use the result in some way to avoid compiler optimization
+		_ = data[1]
+	}
+}