retry add piece

This commit is contained in:
Aarsh Shah 2020-09-11 16:58:09 +05:30
parent 4410da98be
commit 62b2963781
2 changed files with 18 additions and 3 deletions

View File

@ -28,6 +28,8 @@ import (
const SectorStorePrefix = "/sectors" const SectorStorePrefix = "/sectors"
var ErrTooManySectorsSealing = xerrors.New("too many sectors sealing")
var log = logging.Logger("sectors") var log = logging.Logger("sectors")
type SectorLocation struct { type SectorLocation struct {
@ -280,7 +282,7 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
if cfg.MaxSealingSectorsForDeals > 0 { if cfg.MaxSealingSectorsForDeals > 0 {
if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals {
return 0, xerrors.Errorf("too many sectors sealing") return 0, ErrTooManySectorsSealing
} }
} }

View File

@ -6,6 +6,7 @@ import (
"bytes" "bytes"
"context" "context"
"io" "io"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -35,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/storage/sectorblocks"
) )
var addPieceRetryWait = 5 * time.Minute
var log = logging.Logger("storageadapter") var log = logging.Logger("storageadapter")
type ProviderNodeAdapter struct { type ProviderNodeAdapter struct {
@ -91,7 +93,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
return nil, xerrors.Errorf("deal.PublishCid can't be nil") return nil, xerrors.Errorf("deal.PublishCid can't be nil")
} }
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sealing.DealInfo{ sdInfo := sealing.DealInfo{
DealID: deal.DealID, DealID: deal.DealID,
PublishCid: deal.PublishCid, PublishCid: deal.PublishCid,
DealSchedule: sealing.DealSchedule{ DealSchedule: sealing.DealSchedule{
@ -99,7 +101,18 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch,
}, },
KeepUnsealed: deal.FastRetrieval, KeepUnsealed: deal.FastRetrieval,
}) }
p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
if xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
select {
case <-time.After(addPieceRetryWait):
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return nil, xerrors.New("context expired while waiting to retry AddPiece")
}
}
if err != nil { if err != nil {
return nil, xerrors.Errorf("AddPiece failed: %s", err) return nil, xerrors.Errorf("AddPiece failed: %s", err)
} }