v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
11 changed files with 328 additions and 59 deletions
Showing only changes of commit 12db86c0aa - Show all commits

1
go.mod
View File

@ -168,6 +168,7 @@ require (
require ( require (
github.com/GeertJohan/go.incremental v1.0.0 // indirect github.com/GeertJohan/go.incremental v1.0.0 // indirect
github.com/KarpelesLab/reflink v1.0.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect github.com/StackExchange/wmi v1.2.1 // indirect

2
go.sum
View File

@ -57,6 +57,8 @@ github.com/GeertJohan/go.rice v1.0.3 h1:k5viR+xGtIhF61125vCE1cmJ5957RQGXG6dmbaWZ
github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4= github.com/GeertJohan/go.rice v1.0.3/go.mod h1:XVdrU4pW00M4ikZed5q56tPf1v2KwnIKeIdc9CBYNt4=
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K172oDhSKU0dJ/miJramo9NITOMyZQ= github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee h1:8doiS7ib3zi6/K172oDhSKU0dJ/miJramo9NITOMyZQ=
github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY= github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee/go.mod h1:W0GbEAA4uFNYOGG2cJpmFJ04E6SD1NLELPYZB57/7AY=
github.com/KarpelesLab/reflink v1.0.1 h1:d+tdjliwOCqvub9bl0Y02GxahWkNqejNb3TZTTUcQWA=
github.com/KarpelesLab/reflink v1.0.1/go.mod h1:WGkTOKNjd1FsJKBw3mu4JvrPEDJyJJ+JPtxBkbPoCok=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa h1:1PPxEyGdIGVkX/kqMvLJ95a1dGS1Sz7tpNEgehEYYt0= github.com/Kubuxu/imtui v0.0.0-20210401140320-41663d68d0fa h1:1PPxEyGdIGVkX/kqMvLJ95a1dGS1Sz7tpNEgehEYYt0=

View File

@ -77,7 +77,7 @@ create table sectors_sdr_initial_pieces (
piece_index bigint not null, piece_index bigint not null,
piece_cid text not null, piece_cid text not null,
piece_size bigint not null, piece_size bigint not null, -- padded size
-- data source -- data source
data_url text not null, data_url text not null,
@ -98,6 +98,8 @@ create table sectors_sdr_initial_pieces (
primary key (sp_id, sector_number, piece_index) primary key (sp_id, sector_number, piece_index)
); );
comment on column sectors_sdr_initial_pieces.piece_size is 'padded size of the piece';
create table sectors_allocated_numbers ( create table sectors_allocated_numbers (
sp_id bigint not null primary key, sp_id bigint not null primary key,
allocated jsonb not null allocated jsonb not null

View File

@ -4,6 +4,8 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/KarpelesLab/reflink"
proof2 "github.com/filecoin-project/go-state-types/proof"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@ -18,7 +20,6 @@ import (
"github.com/filecoin-project/lotus/provider/lpproof" "github.com/filecoin-project/lotus/provider/lpproof"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/proofpaths" "github.com/filecoin-project/lotus/storage/sealer/proofpaths"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
@ -115,7 +116,7 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, sector storiface.SectorRef
return nil return nil
} }
func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader) (cid.Cid, error) { func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, error) {
maybeUns := storiface.FTNone maybeUns := storiface.FTNone
// todo sectors with data // todo sectors with data
@ -125,7 +126,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size
} }
defer releaseSector() defer releaseSector()
return lpproof.BuildTreeD(data, filepath.Join(paths.Cache, proofpaths.TreeDName), size) return lpproof.BuildTreeD(data, unpaddedData, filepath.Join(paths.Cache, proofpaths.TreeDName), size)
} }
func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) { func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, unsealed cid.Cid) (cid.Cid, cid.Cid, error) {
@ -134,8 +135,6 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("make phase1 output: %w", err)
} }
log.Errorw("phase1 output", "p1o", p1o)
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing) paths, releaseSector, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, storiface.FTSealed, storiface.PathSealing)
if err != nil { if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
@ -149,20 +148,38 @@ func (sb *SealCalls) TreeRC(ctx context.Context, sector storiface.SectorRef, uns
return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err) return cid.Undef, cid.Undef, xerrors.Errorf("getting sector size: %w", err)
} }
// paths.Sealed is a string filepath {
f, err := os.Create(paths.Sealed) // copy TreeD prefix to sealed sector, SealPreCommitPhase2 will mutate it in place into the sealed sector
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("creating sealed sector file: %w", err)
}
if err := f.Truncate(int64(ssize)); err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating sealed sector file: %w", err)
}
if os.Getenv("SEAL_WRITE_UNSEALED") == "1" { // first try reflink + truncate, that should be way faster
// expliticly write zeros to unsealed sector err := reflink.Always(filepath.Join(paths.Cache, proofpaths.TreeDName), paths.Sealed)
_, err := io.CopyN(f, nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize)), int64(ssize)) if err == nil {
if err != nil { err = os.Truncate(paths.Sealed, int64(ssize))
return cid.Undef, cid.Undef, xerrors.Errorf("writing zeros to sealed sector file: %w", err) if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("truncating reflinked sealed file: %w", err)
}
} else {
log.Errorw("reflink treed -> sealed failed, falling back to slow copy, use single scratch btrfs or xfs filesystem", "error", err, "sector", sector, "cache", paths.Cache, "sealed", paths.Sealed)
// fallback to slow copy, copy ssize bytes from treed to sealed
dst, err := os.OpenFile(paths.Sealed, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening sealed sector file: %w", err)
}
src, err := os.Open(filepath.Join(paths.Cache, proofpaths.TreeDName))
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("opening treed sector file: %w", err)
}
_, err = io.CopyN(dst, src, int64(ssize))
derr := dst.Close()
_ = src.Close()
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("copying treed -> sealed: %w", err)
}
if derr != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("closing sealed file: %w", derr)
}
} }
} }
} }
@ -180,7 +197,29 @@ func (sb *SealCalls) PoRepSnark(ctx context.Context, sn storiface.SectorRef, sea
return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err) return nil, xerrors.Errorf("failed to generate vanilla proof: %w", err)
} }
return ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner) proof, err := ffi.SealCommitPhase2(vproof, sn.ID.Number, sn.ID.Miner)
if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err)
}
ok, err := ffi.VerifySeal(proof2.SealVerifyInfo{
SealProof: sn.ProofType,
SectorID: sn.ID,
DealIDs: nil,
Randomness: ticket,
InteractiveRandomness: seed,
Proof: proof,
SealedCID: sealed,
UnsealedCID: unsealed,
})
if err != nil {
return nil, xerrors.Errorf("failed to verify proof: %w", err)
}
if !ok {
return nil, xerrors.Errorf("porep failed to validate")
}
return proof, nil
} }
func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) { func (sb *SealCalls) makePhase1Out(unsCid cid.Cid, spt abi.RegisteredSealProof) ([]byte, error) {

View File

@ -1,6 +1,7 @@
package lpproof package lpproof
import ( import (
"github.com/filecoin-project/lotus/storage/sealer/fr32"
"io" "io"
"math/bits" "math/bits"
"os" "os"
@ -50,7 +51,7 @@ func hashChunk(data [][]byte) {
} }
} }
func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) { func BuildTreeD(data io.Reader, unpaddedData bool, outPath string, size abi.PaddedPieceSize) (cid.Cid, error) {
out, err := os.Create(outPath) out, err := os.Create(outPath)
if err != nil { if err != nil {
return cid.Undef, err return cid.Undef, err
@ -156,6 +157,11 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C
// size, and if it's smaller than a single buffer, we only have one // size, and if it's smaller than a single buffer, we only have one
// smaller buffer // smaller buffer
processedSize := uint64(len(workBuffer[0]))
if unpaddedData {
workBuffer[0] = workBuffer[0][:abi.PaddedPieceSize(len(workBuffer[0])).Unpadded()]
}
_, err := io.ReadFull(data, workBuffer[0]) _, err := io.ReadFull(data, workBuffer[0])
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return cid.Undef, err return cid.Undef, err
@ -164,6 +170,12 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C
// start processing // start processing
workWg.Add(1) workWg.Add(1)
go func(startOffset uint64) { go func(startOffset uint64) {
if unpaddedData {
paddedBuf := pool.Get(int(abi.UnpaddedPieceSize(len(workBuffer[0])).Padded()))
fr32.PadSingle(workBuffer[0], paddedBuf)
pool.Put(workBuffer[0])
workBuffer[0] = paddedBuf
}
hashChunk(workBuffer) hashChunk(workBuffer)
// persist apex // persist apex
@ -200,7 +212,7 @@ func BuildTreeD(data io.Reader, outPath string, size abi.PaddedPieceSize) (cid.C
workWg.Done() workWg.Done()
}(processed) }(processed)
processed += uint64(len(workBuffer[0])) processed += processedSize
} }
workWg.Wait() workWg.Wait()

View File

@ -99,7 +99,7 @@ func Test2K(t *testing.T) {
tempFile := filepath.Join(t.TempDir(), "tree.dat") tempFile := filepath.Join(t.TempDir(), "tree.dat")
commd, err := BuildTreeD(bytes.NewReader(data), tempFile, 2048) commd, err := BuildTreeD(bytes.NewReader(data), false, tempFile, 2048)
require.NoError(t, err) require.NoError(t, err)
fmt.Println(commd) fmt.Println(commd)
@ -123,23 +123,7 @@ func Test2K(t *testing.T) {
} }
func Test8MiB(t *testing.T) { const expectD8M = `00000000: 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
data := make([]byte, 8<<20)
data[0] = 0x01
tempFile := filepath.Join(t.TempDir(), "tree.dat")
commd, err := BuildTreeD(bytes.NewReader(data), tempFile, 8<<20)
require.NoError(t, err)
fmt.Println(commd)
// dump tree.dat
dat, err := os.ReadFile(tempFile)
require.NoError(t, err)
actualD := hexPrint32LDedup(dat)
expectD := `00000000: 01 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
00000020: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00000020: 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
* *
00800000: 16 ab ab 34 1f b7 f3 70 e2 7e 4d ad cf 81 76 6d d0 df d0 ae 64 46 94 77 bb 2c f6 61 49 38 b2 2f 00800000: 16 ab ab 34 1f b7 f3 70 e2 7e 4d ad cf 81 76 6d d0 df d0 ae 64 46 94 77 bb 2c f6 61 49 38 b2 2f
@ -195,10 +179,45 @@ func Test8MiB(t *testing.T) {
00ffffc0: 23 40 4a 88 80 f9 cb c7 20 39 cb 86 14 35 9c 28 34 84 55 70 fe 95 19 0b bd 4d 93 41 42 e8 25 2c 00ffffc0: 23 40 4a 88 80 f9 cb c7 20 39 cb 86 14 35 9c 28 34 84 55 70 fe 95 19 0b bd 4d 93 41 42 e8 25 2c
` `
func Test8MiB(t *testing.T) {
data := make([]byte, 8<<20)
data[0] = 0x01
tempFile := filepath.Join(t.TempDir(), "tree.dat")
commd, err := BuildTreeD(bytes.NewReader(data), false, tempFile, 8<<20)
require.NoError(t, err)
fmt.Println(commd)
// dump tree.dat
dat, err := os.ReadFile(tempFile)
require.NoError(t, err)
actualD := hexPrint32LDedup(dat)
fmt.Println(actualD) fmt.Println(actualD)
require.EqualValues(t, expectD, actualD) require.EqualValues(t, expectD8M, actualD)
require.Equal(t, "baga6ea4seaqcgqckrcapts6hea44xbqugwocqneekvyp5fizbo6u3e2biluckla", commd.String())
}
func Test8MiBUnpad(t *testing.T) {
data := make([]byte, abi.PaddedPieceSize(8<<20).Unpadded())
data[0] = 0x01
tempFile := filepath.Join(t.TempDir(), "tree.dat")
commd, err := BuildTreeD(bytes.NewReader(data), true, tempFile, 8<<20)
require.NoError(t, err)
fmt.Println(commd)
// dump tree.dat
dat, err := os.ReadFile(tempFile)
require.NoError(t, err)
actualD := hexPrint32LDedup(dat)
fmt.Println(actualD)
require.EqualValues(t, expectD8M, actualD)
require.Equal(t, "baga6ea4seaqcgqckrcapts6hea44xbqugwocqneekvyp5fizbo6u3e2biluckla", commd.String()) require.Equal(t, "baga6ea4seaqcgqckrcapts6hea44xbqugwocqneekvyp5fizbo6u3e2biluckla", commd.String())
} }
@ -312,7 +331,7 @@ func BenchmarkBuildTreeD512M(b *testing.B) {
b.StartTimer() // Start the timer for the BuildTreeD operation b.StartTimer() // Start the timer for the BuildTreeD operation
_, err = BuildTreeD(bytes.NewReader(data), tempFilePath, dataSize) _, err = BuildTreeD(bytes.NewReader(data), false, tempFilePath, dataSize)
if err != nil { if err != nil {
b.Fatalf("BuildTreeD failed: %v", err) b.Fatalf("BuildTreeD failed: %v", err)
} }

View File

@ -3,6 +3,8 @@ package lpseal
import ( import (
"bytes" "bytes"
"context" "context"
"github.com/filecoin-project/go-commp-utils/nonffi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -84,17 +86,34 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
return false, xerrors.Errorf("getting pieces: %w", err) return false, xerrors.Errorf("getting pieces: %w", err)
} }
if len(pieces) > 0 {
// todo sdr with data
return false, xerrors.Errorf("todo sdr with data")
}
ssize, err := sectorParams.RegSealProof.SectorSize() ssize, err := sectorParams.RegSealProof.SectorSize()
if err != nil { if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err) return false, xerrors.Errorf("getting sector size: %w", err)
} }
commd := zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) var commd cid.Cid
if len(pieces) > 0 {
pieceInfos := make([]abi.PieceInfo, len(pieces))
for i, p := range pieces {
c, err := cid.Parse(p.PieceCID)
if err != nil {
return false, xerrors.Errorf("parsing piece cid: %w", err)
}
pieceInfos[i] = abi.PieceInfo{
Size: abi.PaddedPieceSize(p.PieceSize),
PieceCID: c,
}
}
commd, err = nonffi.GenerateUnsealedCID(sectorParams.RegSealProof, pieceInfos)
if err != nil {
return false, xerrors.Errorf("computing CommD: %w", err)
}
} else {
commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
}
sref := storiface.SectorRef{ sref := storiface.SectorRef{
ID: abi.SectorID{ ID: abi.SectorID{

View File

@ -102,9 +102,8 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo
SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), SectorNumber: abi.SectorNumber(sectorParams.SectorNumber),
SealedCID: sealedCID, SealedCID: sealedCID,
SealRandEpoch: sectorParams.TicketEpoch, SealRandEpoch: sectorParams.TicketEpoch,
DealIDs: nil, // todo DealIDs: nil,
Expiration: expiration, Expiration: expiration,
//UnsealedCid: unsealedCID, // todo with deals
}) })
var pbuf bytes.Buffer var pbuf bytes.Buffer
if err := params.MarshalCBOR(&pbuf); err != nil { if err := params.MarshalCBOR(&pbuf); err != nil {

View File

@ -2,6 +2,12 @@ package lpseal
import ( import (
"context" "context"
"github.com/filecoin-project/go-commp-utils/nonffi"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/ipfs/go-cid"
"io"
"net/http"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -12,7 +18,6 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources" "github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/provider/lpffi" "github.com/filecoin-project/lotus/provider/lpffi"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/sealer/storiface"
) )
@ -60,27 +65,68 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
PieceIndex int64 `db:"piece_index"` PieceIndex int64 `db:"piece_index"`
PieceCID string `db:"piece_cid"` PieceCID string `db:"piece_cid"`
PieceSize int64 `db:"piece_size"` PieceSize int64 `db:"piece_size"`
DataUrl *string `db:"data_url"`
DataHeaders *[]byte `db:"data_headers"`
DataRawSize *int64 `db:"data_raw_size"`
} }
err = t.db.Select(ctx, &pieces, ` err = t.db.Select(ctx, &pieces, `
SELECT piece_index, piece_cid, piece_size SELECT piece_index, piece_cid, piece_size, data_url, data_headers, data_raw_size
FROM sectors_sdr_initial_pieces FROM sectors_sdr_initial_pieces
WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber) WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber)
if err != nil { if err != nil {
return false, xerrors.Errorf("getting pieces: %w", err) return false, xerrors.Errorf("getting pieces: %w", err)
} }
if len(pieces) > 0 {
// todo sectors with data
return false, xerrors.Errorf("todo sectors with data")
}
ssize, err := sectorParams.RegSealProof.SectorSize() ssize, err := sectorParams.RegSealProof.SectorSize()
if err != nil { if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err) return false, xerrors.Errorf("getting sector size: %w", err)
} }
commd := zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded()) var commd cid.Cid
var dataReader io.Reader
var unpaddedData bool
if len(pieces) > 0 {
pieceInfos := make([]abi.PieceInfo, len(pieces))
pieceReaders := make([]io.Reader, len(pieces))
for i, p := range pieces {
// make pieceInfo
c, err := cid.Parse(p.PieceCID)
if err != nil {
return false, xerrors.Errorf("parsing piece cid: %w", err)
}
pieceInfos[i] = abi.PieceInfo{
Size: abi.PaddedPieceSize(p.PieceSize),
PieceCID: c,
}
// make pieceReader
if p.DataUrl != nil {
pieceReaders[i], _ = padreader.New(&UrlPieceReader{
Url: *p.DataUrl,
RawSize: *p.DataRawSize,
}, uint64(*p.DataRawSize))
} else { // padding piece (w/o fr32 padding, added in TreeD)
pieceReaders[i] = nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded())
}
}
commd, err = nonffi.GenerateUnsealedCID(sectorParams.RegSealProof, pieceInfos)
if err != nil {
return false, xerrors.Errorf("computing CommD: %w", err)
}
dataReader = io.MultiReader(pieceReaders...)
unpaddedData = true
} else {
commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
dataReader = nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize))
unpaddedData = false // nullreader includes fr32 zero bits
}
sref := storiface.SectorRef{ sref := storiface.SectorRef{
ID: abi.SectorID{ ID: abi.SectorID{
@ -91,7 +137,7 @@ func (t *TreesTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
} }
// D // D
treeUnsealed, err := t.sc.TreeD(ctx, sref, abi.PaddedPieceSize(ssize), nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize))) treeUnsealed, err := t.sc.TreeD(ctx, sref, abi.PaddedPieceSize(ssize), dataReader, unpaddedData)
if err != nil { if err != nil {
return false, xerrors.Errorf("computing tree d: %w", err) return false, xerrors.Errorf("computing tree d: %w", err)
} }
@ -152,4 +198,61 @@ func (t *TreesTask) Adder(taskFunc harmonytask.AddTaskFunc) {
t.sp.pollers[pollerTrees].Set(taskFunc) t.sp.pollers[pollerTrees].Set(taskFunc)
} }
type UrlPieceReader struct {
Url string
RawSize int64 // the exact number of bytes read, if we read more or less that's an error
readSoFar int64
active io.ReadCloser // auto-closed on EOF
}
func (u *UrlPieceReader) Read(p []byte) (n int, err error) {
// Check if we have already read the required amount of data
if u.readSoFar >= u.RawSize {
return 0, io.EOF
}
// If 'active' is nil, initiate the HTTP request
if u.active == nil {
resp, err := http.Get(u.Url)
if err != nil {
return 0, err
}
// Set 'active' to the response body
u.active = resp.Body
}
// Calculate the maximum number of bytes we can read without exceeding RawSize
toRead := u.RawSize - u.readSoFar
if int64(len(p)) > toRead {
p = p[:toRead]
}
n, err = u.active.Read(p)
// Update the number of bytes read so far
u.readSoFar += int64(n)
// If the number of bytes read exceeds RawSize, return an error
if u.readSoFar > u.RawSize {
return n, xerrors.New("read beyond the specified RawSize")
}
// If EOF is reached, close the reader
if err == io.EOF {
cerr := u.active.Close()
if cerr != nil {
log.Errorf("error closing http piece reader: %s", cerr)
}
// if we're below the RawSize, return an unexpected EOF error
if u.readSoFar < u.RawSize {
return n, io.ErrUnexpectedEOF
}
}
return n, err
}
var _ harmonytask.TaskInterface = &TreesTask{} var _ harmonytask.TaskInterface = &TreesTask{}

View File

@ -0,0 +1,69 @@
package lpseal
import (
"io"
"net/http"
"net/http/httptest"
"testing"
)
// TestUrlPieceReader_Read tests various scenarios of reading data from UrlPieceReader
func TestUrlPieceReader_Read(t *testing.T) {
// Create a test server
testData := "This is a test string."
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, testData)
}))
defer ts.Close()
tests := []struct {
name string
rawSize int64
expected string
expectError bool
expectEOF bool
}{
{"ReadExact", int64(len(testData)), testData, false, true},
{"ReadLess", 10, testData[:10], false, false},
{"ReadMore", int64(len(testData)) + 10, "", true, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reader := UrlPieceReader{
Url: ts.URL,
RawSize: tt.rawSize,
}
buffer, err := io.ReadAll(&reader)
if err != nil {
if (err != io.EOF && !tt.expectError) || (err == io.EOF && !tt.expectEOF) {
t.Errorf("Read() error = %v, expectError %v, expectEOF %v", err, tt.expectError, tt.expectEOF)
}
} else {
if got := string(buffer); got != tt.expected {
t.Errorf("Read() got = %v, expected %v", got, tt.expected)
}
}
})
}
}
// TestUrlPieceReader_Read_Error tests the error handling of UrlPieceReader
func TestUrlPieceReader_Read_Error(t *testing.T) {
// Simulate a server that returns an error
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "error", http.StatusInternalServerError)
}))
defer ts.Close()
reader := UrlPieceReader{
Url: ts.URL,
RawSize: 100,
}
buffer := make([]byte, 200)
_, err := reader.Read(buffer)
if err == nil {
t.Errorf("Expected an error, but got nil")
}
}

View File

@ -69,6 +69,10 @@ func Pad(in, out []byte) {
pad(in, out) pad(in, out)
} }
func PadSingle(in, out []byte) {
pad(in, out)
}
func pad(in, out []byte) { func pad(in, out []byte) {
chunks := len(out) / 128 chunks := len(out) / 128
for chunk := 0; chunk < chunks; chunk++ { for chunk := 0; chunk < chunks; chunk++ {