fix: ffiwrapper: Close readers in AddPiece
This commit is contained in:
parent
4abc38dacc
commit
d68bb937d8
@ -60,7 +60,9 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
|
|||||||
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
|
log.Warnf("DataCid: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
closer.Close() //nolint:errcheck
|
if err := closer.Close(); err != nil {
|
||||||
|
log.Warnw("closing pieceData in DataCid", "error", err)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
pieceData = io.LimitReader(io.MultiReader(
|
pieceData = io.LimitReader(io.MultiReader(
|
||||||
@ -182,7 +184,19 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storiface.Data) (abi.PieceInfo, error) {
|
func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error) {
|
||||||
|
origPieceData := pieceData
|
||||||
|
defer func() {
|
||||||
|
closer, ok := origPieceData.(io.Closer)
|
||||||
|
if !ok {
|
||||||
|
log.Warnf("AddPiece: cannot close pieceData reader %T because it is not an io.Closer", origPieceData)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := closer.Close(); err != nil {
|
||||||
|
log.Warnw("closing pieceData in AddPiece", "error", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// TODO: allow tuning those:
|
// TODO: allow tuning those:
|
||||||
chunk := abi.PaddedPieceSize(4 << 20)
|
chunk := abi.PaddedPieceSize(4 << 20)
|
||||||
parallel := runtime.NumCPU()
|
parallel := runtime.NumCPU()
|
||||||
@ -248,7 +262,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector storiface.SectorRef, exis
|
|||||||
|
|
||||||
pw := fr32.NewPadWriter(w)
|
pw := fr32.NewPadWriter(w)
|
||||||
|
|
||||||
pr := io.TeeReader(io.LimitReader(file, int64(pieceSize)), pw)
|
pr := io.TeeReader(io.LimitReader(pieceData, int64(pieceSize)), pw)
|
||||||
|
|
||||||
throttle := make(chan []byte, parallel)
|
throttle := make(chan []byte, parallel)
|
||||||
piecePromises := make([]func() (abi.PieceInfo, error), 0)
|
piecePromises := make([]func() (abi.PieceInfo, error), 0)
|
||||||
|
@ -991,3 +991,87 @@ func TestPoStChallengeAssumptions(t *testing.T) {
|
|||||||
require.Len(t, c1.Challenges, 3)
|
require.Len(t, c1.Challenges, 3)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDCAPCloses(t *testing.T) {
|
||||||
|
sz := abi.PaddedPieceSize(2 << 10).Unpadded()
|
||||||
|
|
||||||
|
cdir, err := ioutil.TempDir("", "sbtest-c-")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
miner := abi.ActorID(123)
|
||||||
|
|
||||||
|
sp := &basicfs.Provider{
|
||||||
|
Root: cdir,
|
||||||
|
}
|
||||||
|
sb, err := New(sp)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("%+v", err)
|
||||||
|
}
|
||||||
|
cleanup := func() {
|
||||||
|
if t.Failed() {
|
||||||
|
fmt.Printf("not removing %s\n", cdir)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(cdir); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Cleanup(cleanup)
|
||||||
|
|
||||||
|
t.Run("DataCid", func(t *testing.T) {
|
||||||
|
r := rand.New(rand.NewSource(0x7e5))
|
||||||
|
|
||||||
|
clr := &closeAssertReader{
|
||||||
|
Reader: io.LimitReader(r, int64(sz)),
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := sb.DataCid(context.TODO(), sz, clr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, "baga6ea4seaqeje7jy4hufnybpo7ckxzujaigqbcxhdjq7ojb4b6xzgqdugkyciq", c.PieceCID.String())
|
||||||
|
require.True(t, clr.closed)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("AddPiece", func(t *testing.T) {
|
||||||
|
r := rand.New(rand.NewSource(0x7e5))
|
||||||
|
|
||||||
|
clr := &closeAssertReader{
|
||||||
|
Reader: io.LimitReader(r, int64(sz)),
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := sb.AddPiece(context.TODO(), storiface.SectorRef{
|
||||||
|
ID: abi.SectorID{
|
||||||
|
Miner: miner,
|
||||||
|
Number: 0,
|
||||||
|
},
|
||||||
|
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1_1,
|
||||||
|
}, nil, sz, clr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, "baga6ea4seaqeje7jy4hufnybpo7ckxzujaigqbcxhdjq7ojb4b6xzgqdugkyciq", c.PieceCID.String())
|
||||||
|
require.True(t, clr.closed)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type closeAssertReader struct {
|
||||||
|
io.Reader
|
||||||
|
closed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *closeAssertReader) Close() error {
|
||||||
|
if c.closed {
|
||||||
|
panic("double close")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.closed = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ io.Closer = &closeAssertReader{}
|
||||||
|
Loading…
Reference in New Issue
Block a user