From 62b2963781300ad9ac8bbdc4c6255f6886299a0c Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 11 Sep 2020 16:58:09 +0530 Subject: [PATCH] retry add piece --- extern/storage-sealing/sealing.go | 4 +++- markets/storageadapter/provider.go | 17 +++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 45b95e18c..28df664aa 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -28,6 +28,8 @@ import ( const SectorStorePrefix = "/sectors" +var ErrTooManySectorsSealing = xerrors.New("too many sectors sealing") + var log = logging.Logger("sectors") type SectorLocation struct { @@ -280,7 +282,7 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { if cfg.MaxSealingSectorsForDeals > 0 { if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { - return 0, xerrors.Errorf("too many sectors sealing") + return 0, ErrTooManySectorsSealing } } diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 9c4a5946e..677e21455 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -6,6 +6,7 @@ import ( "bytes" "context" "io" + "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -35,6 +36,7 @@ import ( "github.com/filecoin-project/lotus/storage/sectorblocks" ) +var addPieceRetryWait = 5 * time.Minute var log = logging.Logger("storageadapter") type ProviderNodeAdapter struct { @@ -91,7 +93,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema return nil, xerrors.Errorf("deal.PublishCid can't be nil") } - p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sealing.DealInfo{ + sdInfo := sealing.DealInfo{ DealID: deal.DealID, PublishCid: deal.PublishCid, DealSchedule: sealing.DealSchedule{ @@ -99,7 +101,18 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema EndEpoch: deal.ClientDealProposal.Proposal.EndEpoch, }, KeepUnsealed: deal.FastRetrieval, - }) + } + + p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo) + if xerrors.Is(err, sealing.ErrTooManySectorsSealing) { + select { + case <-time.After(addPieceRetryWait): + p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo) + case <-ctx.Done(): + return nil, xerrors.New("context expired while waiting to retry AddPiece") + } + } + if err != nil { return nil, xerrors.Errorf("AddPiece failed: %s", err) }