Merge pull request #9328 from filecoin-project/fix/ffiwrap-close-ap

fix: ffiwrapper: Close readers in AddPiece
This commit is contained in:
Łukasz Magiera 2022-09-19 17:47:15 +02:00 committed by GitHub
commit 39af6e0365
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 101 additions and 3 deletions

View File

@ -64,7 +64,9 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
return
}
closer.Close() //nolint:errcheck
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in DataCid", "error", err)
}
}()
pieceData = io.LimitReader(io.MultiReader(
@ -186,7 +188,19 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
}, nil
}
func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storiface.Data) (abi.PieceInfo, error) {
func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
origPieceData := pieceData
defer func() {
closer, ok := origPieceData.(io.Closer)
if !ok {
log.Warnf("AddPiece: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
return
}
if err := closer.Close(); err != nil {
log.Warnw("closing pieceData in AddPiece", "error", err)
}
}()
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
@ -252,7 +266,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, exis
pw := fr32.NewPadWriter(w)
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
pr := io.TeeReader(io.LimitReader(pieceData, int64(pieceSize)), pw)
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)

View File

@ -991,3 +991,87 @@ func TestPoStChallengeAssumptions(t *testing.T) {
require.Len(t, c1.Challenges, 3)
}
}
func TestDCAPCloses(t *testing.T) {
sz := abi.PaddedPieceSize(2 << 10).Unpadded()
cdir, err := ioutil.TempDir("", "sbtest-c-")
if err != nil {
t.Fatal(err)
}
miner := abi.ActorID(123)
sp := &basicfs.Provider{
Root: cdir,
}
sb, err := New(sp)
if err != nil {
t.Fatalf("%+v", err)
}
cleanup := func() {
if t.Failed() {
fmt.Printf("not removing %s\n", cdir)
return
}
if err := os.RemoveAll(cdir); err != nil {
t.Error(err)
}
}
t.Cleanup(cleanup)
t.Run("DataCid", func(t *testing.T) {
r := rand.New(rand.NewSource(0x7e5))
clr := &closeAssertReader{
Reader: io.LimitReader(r, int64(sz)),
}
c, err := sb.DataCid(context.TODO(), sz, clr)
if err != nil {
t.Fatal(err)
}
require.Equal(t, "baga6ea4seaqeje7jy4hufnybpo7ckxzujaigqbcxhdjq7ojb4b6xzgqdugkyciq", c.PieceCID.String())
require.True(t, clr.closed)
})
t.Run("AddPiece", func(t *testing.T) {
r := rand.New(rand.NewSource(0x7e5))
clr := &closeAssertReader{
Reader: io.LimitReader(r, int64(sz)),
}
c, err := sb.AddPiece(context.TODO(), storiface.SectorRef{
ID: abi.SectorID{
Miner: miner,
Number: 0,
},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1_1,
}, nil, sz, clr)
if err != nil {
t.Fatal(err)
}
require.Equal(t, "baga6ea4seaqeje7jy4hufnybpo7ckxzujaigqbcxhdjq7ojb4b6xzgqdugkyciq", c.PieceCID.String())
require.True(t, clr.closed)
})
}
type closeAssertReader struct {
io.Reader
closed bool
}
func (c *closeAssertReader) Close() error {
if c.closed {
panic("double close")
}
c.closed = true
return nil
}
var _ io.Closer = &closeAssertReader{}