fix: when retrying Add Piece, first seek to start of reader
This commit is contained in:
parent
a4728d3c72
commit
839fc8df7d
2
go.mod
2
go.mod
@ -36,7 +36,7 @@ require (
|
|||||||
github.com/filecoin-project/go-data-transfer v1.12.0
|
github.com/filecoin-project/go-data-transfer v1.12.0
|
||||||
github.com/filecoin-project/go-fil-commcid v0.1.0
|
github.com/filecoin-project/go-fil-commcid v0.1.0
|
||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
||||||
github.com/filecoin-project/go-fil-markets v1.13.5
|
github.com/filecoin-project/go-fil-markets v1.13.6-0.20211217102747-e95a23480585
|
||||||
github.com/filecoin-project/go-jsonrpc v0.1.5
|
github.com/filecoin-project/go-jsonrpc v0.1.5
|
||||||
github.com/filecoin-project/go-padreader v0.0.1
|
github.com/filecoin-project/go-padreader v0.0.1
|
||||||
github.com/filecoin-project/go-paramfetch v0.0.2
|
github.com/filecoin-project/go-paramfetch v0.0.2
|
||||||
|
4
go.sum
4
go.sum
@ -322,8 +322,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq
|
|||||||
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
|
||||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
|
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
|
||||||
github.com/filecoin-project/go-fil-markets v1.13.5 h1:NLeF8rI5ZPOJNYEgA6NHrvnuh5QE/2dwuU/7wPW7zP0=
|
github.com/filecoin-project/go-fil-markets v1.13.6-0.20211217102747-e95a23480585 h1:kDzqA/WyUOVVt2en/r81nvS21s0sMThv4qlKoveooDU=
|
||||||
github.com/filecoin-project/go-fil-markets v1.13.5/go.mod h1:vXOHH3q2+zLk929W+lIq3etuDFTyJJ8nG2DwGHG2R1E=
|
github.com/filecoin-project/go-fil-markets v1.13.6-0.20211217102747-e95a23480585/go.mod h1:vXOHH3q2+zLk929W+lIq3etuDFTyJJ8nG2DwGHG2R1E=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||||
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
||||||
|
@ -4,7 +4,6 @@ package storageadapter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -88,7 +87,7 @@ func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemark
|
|||||||
return n.dealPublisher.Publish(ctx, deal.ClientDealProposal)
|
return n.dealPublisher.Publish(ctx, deal.ClientDealProposal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) {
|
func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData shared.ReadSeekStarter) (*storagemarket.PackingResult, error) {
|
||||||
if deal.PublishCid == nil {
|
if deal.PublishCid == nil {
|
||||||
return nil, xerrors.Errorf("deal.PublishCid can't be nil")
|
return nil, xerrors.Errorf("deal.PublishCid can't be nil")
|
||||||
}
|
}
|
||||||
@ -104,17 +103,32 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
|
|||||||
KeepUnsealed: deal.FastRetrieval,
|
KeepUnsealed: deal.FastRetrieval,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Attempt to add the piece to the sector
|
||||||
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
|
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
|
||||||
curTime := build.Clock.Now()
|
curTime := build.Clock.Now()
|
||||||
for build.Clock.Since(curTime) < addPieceRetryTimeout {
|
for build.Clock.Since(curTime) < addPieceRetryTimeout {
|
||||||
|
// Check if there was an error because of too many sectors being sealed
|
||||||
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
|
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
|
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// There was either a fatal error or no error. In either case
|
||||||
|
// don't retry AddPiece
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The piece could not be added to the sector because there are too
|
||||||
|
// many sectors being sealed, back-off for a while before trying again
|
||||||
select {
|
select {
|
||||||
case <-build.Clock.After(addPieceRetryWait):
|
case <-build.Clock.After(addPieceRetryWait):
|
||||||
|
// Reset the reader to the start
|
||||||
|
err = pieceData.SeekStart()
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to reset piece reader to start before retrying AddPiece for deal %d: %w", deal.DealID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to add the piece again
|
||||||
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
|
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, xerrors.New("context expired while waiting to retry AddPiece")
|
return nil, xerrors.New("context expired while waiting to retry AddPiece")
|
||||||
|
Loading…
Reference in New Issue
Block a user