sectorbuilder: Use StandaloneWriteWithAlignment

This commit is contained in:
Łukasz Magiera 2019-11-07 21:40:46 +01:00
parent f6a49ab9f9
commit 34846c538e
7 changed files with 23 additions and 14 deletions

View File

@ -36,7 +36,7 @@ func TestDealFlow(t *testing.T, b APIBuilder) {
} }
time.Sleep(time.Second) time.Sleep(time.Second)
r := io.LimitReader(rand.New(rand.NewSource(17)), 9000000) r := io.LimitReader(rand.New(rand.NewSource(17)), 1000)
fcid, err := client.ClientImportLocal(ctx, r) fcid, err := client.ClientImportLocal(ctx, r)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -19,7 +19,7 @@ func (sb *SectorBuilder) stagedSectorPath(sectorID uint64) string {
} }
func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) { func (sb *SectorBuilder) stagedSectorFile(sectorID uint64) (*os.File, error) {
return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) return os.OpenFile(sb.stagedSectorPath(sectorID), os.O_RDWR|os.O_CREATE, 0644)
} }
func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) { func (sb *SectorBuilder) sealedSectorPath(sectorID uint64) (string, error) {

View File

@ -124,7 +124,7 @@ func (sb *SectorBuilder) AcquireSectorId() (uint64, error) {
return sectorbuilder.AcquireSectorId(sb.handle) return sectorbuilder.AcquireSectorId(sb.handle)
} }
func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader) (PublicPieceInfo, error) { func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Reader, existingPieceSizes []uint64) (PublicPieceInfo, error) {
f, werr, err := toReadableFile(file, int64(pieceSize)) f, werr, err := toReadableFile(file, int64(pieceSize))
if err != nil { if err != nil {
return PublicPieceInfo{}, err return PublicPieceInfo{}, err
@ -138,13 +138,13 @@ func (sb *SectorBuilder) AddPiece(pieceSize uint64, sectorId uint64, file io.Rea
return PublicPieceInfo{}, err return PublicPieceInfo{}, err
} }
writeUnpadded, commP, err := sectorbuilder.StandaloneWriteWithoutAlignment(f, pieceSize, stagedFile) _, _, commP, err := sectorbuilder.StandaloneWriteWithAlignment(f, pieceSize, stagedFile, existingPieceSizes)
if err != nil { if err != nil {
return PublicPieceInfo{}, err return PublicPieceInfo{}, err
} }
if writeUnpadded != pieceSize { /*if writeUnpadded != pieceSize {
return PublicPieceInfo{}, xerrors.Errorf("writeUnpadded != pieceSize: %d != %d", writeUnpadded, pieceSize) return PublicPieceInfo{}, xerrors.Errorf("writeUnpadded != pieceSize: %d != %d", writeUnpadded, pieceSize)
} }*/
if err := stagedFile.Close(); err != nil { if err := stagedFile.Close(); err != nil {
return PublicPieceInfo{}, err return PublicPieceInfo{}, err
@ -188,7 +188,7 @@ func (sb *SectorBuilder) SealPreCommit(sectorID uint64, ticket SealTicket, piece
} }
ussize := UserBytesForSectorSize(sb.ssize) ussize := UserBytesForSectorSize(sb.ssize)
if sum != ussize { if sum != ussize {
return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize - sum)) return RawSealPreCommitOutput{}, xerrors.Errorf("aggregated piece sizes don't match sector size: %d != %d (%d)", sum, ussize, int64(ussize-sum))
} }
stagedPath := sb.stagedSectorPath(sectorID) stagedPath := sb.stagedSectorPath(sectorID)

View File

@ -37,7 +37,7 @@ func TestSealAndVerify(t *testing.T) {
} }
r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen)) r := io.LimitReader(rand.New(rand.NewSource(42)), int64(dlen))
ppi, err := sb.AddPiece(dlen, sid, r) ppi, err := sb.AddPiece(dlen, sid, r, []uint64{})
if err != nil { if err != nil {
t.Fatalf("%+v", err) t.Fatalf("%+v", err)
} }

View File

@ -16,8 +16,7 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder" "github.com/filecoin-project/lotus/lib/sectorbuilder"
) )
// TODO: expected sector ID func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, existingPieceSizes []uint64, sizes ...uint64) ([]Piece, error) {
func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, sizes ...uint64) ([]Piece, error) {
if len(sizes) == 0 { if len(sizes) == 0 {
return nil, nil return nil, nil
} }
@ -95,11 +94,13 @@ func (m *Miner) storeGarbage(ctx context.Context, sectorID uint64, sizes ...uint
for i, size := range sizes { for i, size := range sizes {
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000)) name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size))) ppi, err := m.sb.AddPiece(size, sectorID, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), existingPieceSizes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
existingPieceSizes = append(existingPieceSizes, size)
out[i] = Piece{ out[i] = Piece{
DealID: resp.DealIDs[i], DealID: resp.DealIDs[i],
Ref: name, Ref: name,
@ -122,7 +123,7 @@ func (m *Miner) StoreGarbageData(_ context.Context) error {
return return
} }
pieces, err := m.storeGarbage(ctx, sid, size) pieces, err := m.storeGarbage(ctx, sid, []uint64{}, size)
if err != nil { if err != nil {
log.Errorf("%+v", err) log.Errorf("%+v", err)
return return

View File

@ -95,6 +95,14 @@ func (t *SectorInfo) refs() []string {
return out return out
} }
func (t *SectorInfo) existingPieces() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.Size
}
return out
}
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput { func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput var out sectorbuilder.RawSealPreCommitOutput
@ -203,7 +211,7 @@ func (m *Miner) SealPiece(ctx context.Context, ref string, size uint64, r io.Rea
return 0, xerrors.Errorf("acquiring sector ID: %w", err) return 0, xerrors.Errorf("acquiring sector ID: %w", err)
} }
ppi, err := m.sb.AddPiece(size, sid, r) ppi, err := m.sb.AddPiece(size, sid, r, []uint64{})
if err != nil { if err != nil {
return 0, xerrors.Errorf("adding piece to sector: %w", err) return 0, xerrors.Errorf("adding piece to sector: %w", err)
} }

View File

@ -58,7 +58,7 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID) log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
} }
pieces, err := m.storeGarbage(ctx, sector.SectorID, fillerSizes...) pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
if err != nil { if err != nil {
return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
} }