diff --git a/go.mod b/go.mod index 24366a162..e3e6bd8ad 100644 --- a/go.mod +++ b/go.mod @@ -168,6 +168,7 @@ require ( require ( github.com/GeertJohan/go.incremental v1.0.0 // indirect + github.com/KarpelesLab/reflink v1.0.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/StackExchange/wmi v1.2.1 // indirect diff --git a/go.sum b/go.sum index 57eb9bcd1..26cd5e8a3 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZ github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4= github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K172oDhSKU0dJ/miJramo9NITOMyZQ= github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY= +github.com/KarpelesLab/reflink v1.0.1 h1:d+tdjliwOCqvub9bl0Y02GxahWkNqejNb3TZTTUcQWA= +github.com/KarpelesLab/reflink v1.0.1/go.mod h1:WGkTOKNjd1FsJKBw3mu4JvrPEDJyJJ+JPtxBkbPoCok= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa h1:1PPxEyGdIGVkX/kqMvLJ95a1dGS1Sz7tpNEgehEYYt0= diff --git a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index 1df390dfb..43c0c8498 100644 --- a/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/lib/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -77,7 +77,7 @@ create table sectors_sdr_initial_pieces ( piece_index bigint not null, piece_cid text not null, - piece_size bigint not null, + piece_size bigint not null, -- padded size -- data source data_url text not null, @@ -98,6 +98,8 @@ create table sectors_sdr_initial_pieces ( primary key (sp_id, sector_number, piece_index) ); +comment on column sectors_sdr_initial_pieces.piece_size is 'padded size of the piece'; + create table sectors_allocated_numbers ( sp_id bigint not null primary key, allocated jsonb not null diff --git a/provider/lpffi/sdr_funcs.go b/provider/lpffi/sdr_funcs.go index 7ac01ed08..1fdc28144 100644 --- a/provider/lpffi/sdr_funcs.go +++ b/provider/lpffi/sdr_funcs.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/KarpelesLab/reflink" + proof2 "github.com/filecoin-project/go-state-types/proof" "io" "os" "path/filepath" @@ -18,7 +20,6 @@ import ( "github.com/filecoin-project/lotus/provider/lpproof" "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader" "github.com/filecoin-project/lotus/storage/sealer/proofpaths" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -115,7 +116,7 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef return nil } -func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader) (cid.Cid, error) { +func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, error) { maybeUns := storiface.FTNone // todo sectors with data @@ -125,7 +126,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size } defer releaseSector() - return lpproof.BuildTreeD(data, filepath.Join(paths.Cache, proofpaths.TreeDName), size) + return lpproof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size) } func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) { @@ -134,8 +135,6 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) } - log.Errorw("phase1 output", "p1o", p1o) - paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) if err != nil { return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) @@ -149,20 +148,38 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) } - // paths.Sealed is a string filepath - f, err := os.Create(paths.Sealed) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("creating sealed sector file: %w", err) - } - if err := f.Truncate(int64(ssize)); err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("truncating sealed sector file: %w", err) - } + { + // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector - if os.Getenv("SEAL_WRITE_UNSEALED") == "1" { - // expliticly write zeros to unsealed sector - _, err := io.CopyN(f, nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize)), int64(ssize)) - if err != nil { - return cid.Undef, cid.Undef, xerrors.Errorf("writing zeros to sealed sector file: %w", err) + // first try reflink + truncate, that should be way faster + err := reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed) + if err == nil { + err = os.Truncate(paths.Sealed, int64(ssize)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err) + } + } else { + log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed) + + // fallback to slow copy, copy ssize bytes from treed to sealed + dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err) + } + src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName)) + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err) + } + + _, err = io.CopyN(dst, src, int64(ssize)) + derr := dst.Close() + _ = src.Close() + if err != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err) + } + if derr != nil { + return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr) + } } } } @@ -180,7 +197,29 @@ func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sea return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err) } - return ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) + proof, err := ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) + if err != nil { + return nil, xerrors.Errorf("computing seal proof failed: %w", err) + } + + ok, err := ffi.VerifySeal(proof2.SealVerifyInfo{ + SealProof: sn.ProofType, + SectorID: sn.ID, + DealIDs: nil, + Randomness: ticket, + InteractiveRandomness: seed, + Proof: proof, + SealedCID: sealed, + UnsealedCID: unsealed, + }) + if err != nil { + return nil, xerrors.Errorf("failed to verify proof: %w", err) + } + if !ok { + return nil, xerrors.Errorf("porep failed to validate") + } + + return proof, nil } func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) { diff --git a/provider/lpproof/treed_build.go b/provider/lpproof/treed_build.go index a7cc83d4b..07e4d12fd 100644 --- a/provider/lpproof/treed_build.go +++ b/provider/lpproof/treed_build.go @@ -1,6 +1,7 @@ package lpproof import ( + "github.com/filecoin-project/lotus/storage/sealer/fr32" "io" "math/bits" "os" @@ -50,7 +51,7 @@ func hashChunk(data [][]byte) { } } -func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) { +func BuildTreeD(data io.Reader, unpaddedData bool, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) { out, err := os.Create(outPath) if err != nil { return cid.Undef, err @@ -156,6 +157,11 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C // size, and if it's smaller than a single buffer, we only have one // smaller buffer + processedSize := uint64(len(workBuffer[0])) + if unpaddedData { + workBuffer[0] = workBuffer[0][:abi.PaddedPieceSize(len(workBuffer[0])).Unpadded()] + } + _, err := io.ReadFull(data, workBuffer[0]) if err != nil && err != io.EOF { return cid.Undef, err @@ -164,6 +170,12 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C // start processing workWg.Add(1) go func(startOffset uint64) { + if unpaddedData { + paddedBuf := pool.Get(int(abi.UnpaddedPieceSize(len(workBuffer[0])).Padded())) + fr32.PadSingle(workBuffer[0], paddedBuf) + pool.Put(workBuffer[0]) + workBuffer[0] = paddedBuf + } hashChunk(workBuffer) // persist apex @@ -200,7 +212,7 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C workWg.Done() }(processed) - processed += uint64(len(workBuffer[0])) + processed += processedSize } workWg.Wait() diff --git a/provider/lpproof/treed_build_test.go b/provider/lpproof/treed_build_test.go index e3687cfcf..daf742a35 100644 --- a/provider/lpproof/treed_build_test.go +++ b/provider/lpproof/treed_build_test.go @@ -99,7 +99,7 @@ func Test2K(t *testing.T) { tempFile := filepath.Join(t.TempDir(), "tree.dat") - commd, err := BuildTreeD(bytes.NewReader(data), tempFile, 2048) + commd, err := BuildTreeD(bytes.NewReader(data), false, tempFile, 2048) require.NoError(t, err) fmt.Println(commd) @@ -123,23 +123,7 @@ func Test2K(t *testing.T) { } -func Test8MiB(t *testing.T) { - data := make([]byte, 8<<20) - data[0] = 0x01 - - tempFile := filepath.Join(t.TempDir(), "tree.dat") - - commd, err := BuildTreeD(bytes.NewReader(data), tempFile, 8<<20) - require.NoError(t, err) - fmt.Println(commd) - - // dump tree.dat - dat, err := os.ReadFile(tempFile) - require.NoError(t, err) - - actualD := hexPrint32LDedup(dat) - - expectD := `00000000: 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 +const expectD8M = `00000000: 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00000020: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 * 00800000: 16 ab ab 34 1f b7 f3 70 e2 7e 4d ad cf 81 76 6d d0 df d0 ae 64 46 94 77 bb 2c f6 61 49 38 b2 2f @@ -195,10 +179,45 @@ func Test8MiB(t *testing.T) { 00ffffc0: 23 40 4a 88 80 f9 cb c7 20 39 cb 86 14 35 9c 28 34 84 55 70 fe 95 19 0b bd 4d 93 41 42 e8 25 2c ` +func Test8MiB(t *testing.T) { + data := make([]byte, 8<<20) + data[0] = 0x01 + + tempFile := filepath.Join(t.TempDir(), "tree.dat") + + commd, err := BuildTreeD(bytes.NewReader(data), false, tempFile, 8<<20) + require.NoError(t, err) + fmt.Println(commd) + + // dump tree.dat + dat, err := os.ReadFile(tempFile) + require.NoError(t, err) + + actualD := hexPrint32LDedup(dat) fmt.Println(actualD) - require.EqualValues(t, expectD, actualD) + require.EqualValues(t, expectD8M, actualD) + require.Equal(t, "baga6ea4seaqcgqckrcapts6hea44xbqugwocqneekvyp5fizbo6u3e2biluckla", commd.String()) +} +func Test8MiBUnpad(t *testing.T) { + data := make([]byte, abi.PaddedPieceSize(8<<20).Unpadded()) + data[0] = 0x01 + + tempFile := filepath.Join(t.TempDir(), "tree.dat") + + commd, err := BuildTreeD(bytes.NewReader(data), true, tempFile, 8<<20) + require.NoError(t, err) + fmt.Println(commd) + + // dump tree.dat + dat, err := os.ReadFile(tempFile) + require.NoError(t, err) + + actualD := hexPrint32LDedup(dat) + fmt.Println(actualD) + + require.EqualValues(t, expectD8M, actualD) require.Equal(t, "baga6ea4seaqcgqckrcapts6hea44xbqugwocqneekvyp5fizbo6u3e2biluckla", commd.String()) } @@ -312,7 +331,7 @@ func BenchmarkBuildTreeD512M(b *testing.B) { b.StartTimer() // Start the timer for the BuildTreeD operation - _, err = BuildTreeD(bytes.NewReader(data), tempFilePath, dataSize) + _, err = BuildTreeD(bytes.NewReader(data), false, tempFilePath, dataSize) if err != nil { b.Fatalf("BuildTreeD failed: %v", err) } diff --git a/provider/lpseal/task_sdr.go b/provider/lpseal/task_sdr.go index e8098ddcc..06f852260 100644 --- a/provider/lpseal/task_sdr.go +++ b/provider/lpseal/task_sdr.go @@ -3,6 +3,8 @@ package lpseal import ( "bytes" "context" + "github.com/filecoin-project/go-commp-utils/nonffi" + "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -84,17 +86,34 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo return false, xerrors.Errorf("getting pieces: %w", err) } - if len(pieces) > 0 { - // todo sdr with data - return false, xerrors.Errorf("todo sdr with data") - } - ssize, err := sectorParams.RegSealProof.SectorSize() if err != nil { return false, xerrors.Errorf("getting sector size: %w", err) } - commd := zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + var commd cid.Cid + + if len(pieces) > 0 { + pieceInfos := make([]abi.PieceInfo, len(pieces)) + for i, p := range pieces { + c, err := cid.Parse(p.PieceCID) + if err != nil { + return false, xerrors.Errorf("parsing piece cid: %w", err) + } + + pieceInfos[i] = abi.PieceInfo{ + Size: abi.PaddedPieceSize(p.PieceSize), + PieceCID: c, + } + } + + commd, err = nonffi.GenerateUnsealedCID(sectorParams.RegSealProof, pieceInfos) + if err != nil { + return false, xerrors.Errorf("computing CommD: %w", err) + } + } else { + commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + } sref := storiface.SectorRef{ ID: abi.SectorID{ diff --git a/provider/lpseal/task_submit_precommit.go b/provider/lpseal/task_submit_precommit.go index e1d98df50..68f04c6c8 100644 --- a/provider/lpseal/task_submit_precommit.go +++ b/provider/lpseal/task_submit_precommit.go @@ -102,9 +102,8 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), SealedCID: sealedCID, SealRandEpoch: sectorParams.TicketEpoch, - DealIDs: nil, // todo + DealIDs: nil, Expiration: expiration, - //UnsealedCid: unsealedCID, // todo with deals }) var pbuf bytes.Buffer if err := params.MarshalCBOR(&pbuf); err != nil { diff --git a/provider/lpseal/task_trees.go b/provider/lpseal/task_trees.go index c382f75c0..dc96d1637 100644 --- a/provider/lpseal/task_trees.go +++ b/provider/lpseal/task_trees.go @@ -2,6 +2,12 @@ package lpseal import ( "context" + "github.com/filecoin-project/go-commp-utils/nonffi" + "github.com/filecoin-project/go-padreader" + "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader" + "github.com/ipfs/go-cid" + "io" + "net/http" "golang.org/x/xerrors" @@ -12,7 +18,6 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/provider/lpffi" - "github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -60,27 +65,68 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done PieceIndex int64 `db:"piece_index"` PieceCID string `db:"piece_cid"` PieceSize int64 `db:"piece_size"` + + DataUrl *string `db:"data_url"` + DataHeaders *[]byte `db:"data_headers"` + DataRawSize *int64 `db:"data_raw_size"` } err = t.db.Select(ctx, &pieces, ` - SELECT piece_index, piece_cid, piece_size + SELECT piece_index, piece_cid, piece_size, data_url, data_headers, data_raw_size FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("getting pieces: %w", err) } - if len(pieces) > 0 { - // todo sectors with data - return false, xerrors.Errorf("todo sectors with data") - } - ssize, err := sectorParams.RegSealProof.SectorSize() if err != nil { return false, xerrors.Errorf("getting sector size: %w", err) } - commd := zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + var commd cid.Cid + var dataReader io.Reader + var unpaddedData bool + + if len(pieces) > 0 { + pieceInfos := make([]abi.PieceInfo, len(pieces)) + pieceReaders := make([]io.Reader, len(pieces)) + + for i, p := range pieces { + // make pieceInfo + c, err := cid.Parse(p.PieceCID) + if err != nil { + return false, xerrors.Errorf("parsing piece cid: %w", err) + } + + pieceInfos[i] = abi.PieceInfo{ + Size: abi.PaddedPieceSize(p.PieceSize), + PieceCID: c, + } + + // make pieceReader + if p.DataUrl != nil { + pieceReaders[i], _ = padreader.New(&UrlPieceReader{ + Url: *p.DataUrl, + RawSize: *p.DataRawSize, + }, uint64(*p.DataRawSize)) + } else { // padding piece (w/o fr32 padding, added in TreeD) + pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded()) + } + } + + commd, err = nonffi.GenerateUnsealedCID(sectorParams.RegSealProof, pieceInfos) + if err != nil { + return false, xerrors.Errorf("computing CommD: %w", err) + } + + dataReader = io.MultiReader(pieceReaders...) + unpaddedData = true + } else { + commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) + dataReader = nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize)) + unpaddedData = false // nullreader includes fr32 zero bits + } sref := storiface.SectorRef{ ID: abi.SectorID{ @@ -91,7 +137,7 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // D - treeUnsealed, err := t.sc.TreeD(ctx, sref, abi.PaddedPieceSize(ssize), nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize))) + treeUnsealed, err := t.sc.TreeD(ctx, sref, abi.PaddedPieceSize(ssize), dataReader, unpaddedData) if err != nil { return false, xerrors.Errorf("computing tree d: %w", err) } @@ -152,4 +198,61 @@ func (t *TreesTask) Adder(taskFunc harmonytask.AddTaskFunc) { t.sp.pollers[pollerTrees].Set(taskFunc) } +type UrlPieceReader struct { + Url string + RawSize int64 // the exact number of bytes read, if we read more or less that's an error + + readSoFar int64 + active io.ReadCloser // auto-closed on EOF +} + +func (u *UrlPieceReader) Read(p []byte) (n int, err error) { + // Check if we have already read the required amount of data + if u.readSoFar >= u.RawSize { + return 0, io.EOF + } + + // If 'active' is nil, initiate the HTTP request + if u.active == nil { + resp, err := http.Get(u.Url) + if err != nil { + return 0, err + } + + // Set 'active' to the response body + u.active = resp.Body + } + + // Calculate the maximum number of bytes we can read without exceeding RawSize + toRead := u.RawSize - u.readSoFar + if int64(len(p)) > toRead { + p = p[:toRead] + } + + n, err = u.active.Read(p) + + // Update the number of bytes read so far + u.readSoFar += int64(n) + + // If the number of bytes read exceeds RawSize, return an error + if u.readSoFar > u.RawSize { + return n, xerrors.New("read beyond the specified RawSize") + } + + // If EOF is reached, close the reader + if err == io.EOF { + cerr := u.active.Close() + if cerr != nil { + log.Errorf("error closing http piece reader: %s", cerr) + } + + // if we're below the RawSize, return an unexpected EOF error + if u.readSoFar < u.RawSize { + return n, io.ErrUnexpectedEOF + } + } + + return n, err +} + var _ harmonytask.TaskInterface = &TreesTask{} diff --git a/provider/lpseal/task_trees_test.go b/provider/lpseal/task_trees_test.go new file mode 100644 index 000000000..d9fd6c41e --- /dev/null +++ b/provider/lpseal/task_trees_test.go @@ -0,0 +1,69 @@ +package lpseal + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" +) + +// TestUrlPieceReader_Read tests various scenarios of reading data from UrlPieceReader +func TestUrlPieceReader_Read(t *testing.T) { + // Create a test server + testData := "This is a test string." + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.WriteString(w, testData) + })) + defer ts.Close() + + tests := []struct { + name string + rawSize int64 + expected string + expectError bool + expectEOF bool + }{ + {"ReadExact", int64(len(testData)), testData, false, true}, + {"ReadLess", 10, testData[:10], false, false}, + {"ReadMore", int64(len(testData)) + 10, "", true, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reader := UrlPieceReader{ + Url: ts.URL, + RawSize: tt.rawSize, + } + buffer, err := io.ReadAll(&reader) + if err != nil { + if (err != io.EOF && !tt.expectError) || (err == io.EOF && !tt.expectEOF) { + t.Errorf("Read() error = %v, expectError %v, expectEOF %v", err, tt.expectError, tt.expectEOF) + } + } else { + if got := string(buffer); got != tt.expected { + t.Errorf("Read() got = %v, expected %v", got, tt.expected) + } + } + }) + } +} + +// TestUrlPieceReader_Read_Error tests the error handling of UrlPieceReader +func TestUrlPieceReader_Read_Error(t *testing.T) { + // Simulate a server that returns an error + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "error", http.StatusInternalServerError) + })) + defer ts.Close() + + reader := UrlPieceReader{ + Url: ts.URL, + RawSize: 100, + } + buffer := make([]byte, 200) + + _, err := reader.Read(buffer) + if err == nil { + t.Errorf("Expected an error, but got nil") + } +} diff --git a/storage/sealer/fr32/fr32.go b/storage/sealer/fr32/fr32.go index 6f5be65b7..83a20597f 100644 --- a/storage/sealer/fr32/fr32.go +++ b/storage/sealer/fr32/fr32.go @@ -69,6 +69,10 @@ func Pad(in, out []byte) { pad(in, out) } +func PadSingle(in, out []byte) { + pad(in, out) +} + func pad(in, out []byte) { chunks := len(out) / 128 for chunk := 0; chunk < chunks; chunk++ {