bd10bdf99a
* build: Bump version to v1.17.3-dev * build: set version to v1.18.0-dev * chore: actors: Allow builtin-actors to return a map of methods (#9342) * Allow builtin-actors to return a map of methods * go mod * Fix tests * Fix tests, check carefully please * Delete lotus-pond (#9352) * feat: add StateNetworkVersion to mpool API * chore: refactor: rename NewestNetworkVersion * feat: actors: Integrate datacap actor into lotus (#9348) * Integrate datacap actor * Implement datacap actor in chain/builtin * feat: support typed errors over RPC * chore: deps: update to go-jsonrpc 0.1.8 * remove duplicate import * fix: itest: check for closed connection * chore: refactor: move retry test to API * address magik supernit * Add ability to only have single partition per msg for partitions with recovery sectors * doc gen * Address comments * Return beneficiary info from miner state Info() * Update builtin-actors to dev/20220922-v9 which includes FIP-0045 changes in progress * Integrate verifreg changes to lotus * Setup datacap actor * Update builtin-actors to dev/20220922-v9-1 * Update datacap actor to query datacap instead of verifreg * update gst * update markets * update actors with hamt fix * update gst * Update datacap to parse tokens * Update bundles * datacap and verifreg actors use ID addresses without protocol byte * update builtin-actors to rc1 * update go-fil-markets * Update bundles to rc2 * Integrate the v9 migration * Add api for getting allocation * Add upgrade epoch for butterfly * Tweak PreSeal struct to be infra-friendly * docsgen * More tweaking of PreSeal for genesis * review fixes * Use fake cid for test * add butterfly artifacts for oct 5 upgrade * check datacaps for v8 verifreg match v9 datacap actor * Remove print statements * Update to go-state-types master * Update to go-state-types v0.9.0-rc1 * review fixes * use go-fil-markets v1.24.0-v17 * Add accessors for allocations and claims maps * fix: missing permissions tag * butterfly * update butterfly artifacts * sealing pipeline: Prepare deal assigning logic for FIP-45 * sealing pipeline: Get allocationId with StateApi * use NoAllocationID instead of nil AllocationId * address review * Add datacap actor to registry.go * Add cli for listing allocations and removing expired allocations * Update to go-state-types master * deps: upgrade go-merkledag to 0.8.0 * shark params * Update cli/filplus.go Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * revert change to verifreg util * docsgen-cli * miss the stuff * Update FFI * Update go-state-types to v0.9.0 * Update builtin-actors to v9.0.0 * add calib upgrade epcoh * update the upgrade envvar * kill shark * Remove fvm splash banner from nv17 upgrade * check invariance for pending deals and allocations * check pending verified deal proposal migrated to allocation * Add check for unsealed CID in precommit sectors * Fix counting of allocations in nv17 migration test * make gen * pass state trees as pointers * Add assertion that migrations with & without cache are the same * compare allocation to verified deal proposal * Fix miner state precommit info * fix migration test tool * add changelog * Update to go-state-types v0.9.1 * Integrate builtin-actors v9.0.1 * chore: ver: bump version for rc3 (#9512) * Bump version to 1.18.0-rc3 * Update CHANGELOG.md * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Migration: Use autobatch bs * Fix autobatch Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> * Invoker: Use MethodMeta from go-state-types * Add a second premigration for nv17 * Add more shed tools for migration checking * address review * Lotus release v1.18.0-rc4 * fix: ci: fix app-image build on ci (#9527) * Remove old go version first * Add GO_VERSION file * Use GO_VERSION to set / verify go version * mv GO_VERSION GO_VERSION_MIN * Use GO_VERSION_MIN in Makefile check Co-authored-by: Ian Davis <jungziege@gmail.com> * Update to latest go-state-types for migration fixes * go mod tidy * fix: use api.ErrActorNotFound instead of types.ErrActorNotFound * fix: add fields to ForkUpgradeParams * docs: update actors_version_checklist.md * chore: fix lint * update to go state type v0.9.6 with market migration fix (#9545) * update go-state-types to v-0.9.7 * Add invariant checks to migration * fix invariant check: number of entries in datacap actor should include verifreg * Invariant checks: Only include not-activated deals * test: nv17 migration * Address review * add lotus-shed invariance method * Migration cli takes a stateroot cid and a height * make gen * Update to builtin-actors v9.0.2 * Failing test that shows that notaries can remove datacap from the verifreg actor * Test that should pass when the problem is solved * make gen * Review fixes * statemanager call function will return call information even if call errors * update go-state-types * update builtin-actors * bubble up errors properly from ApplyImplicitMessage * bump to rc5 * set new upgrade heights for calibnet * set new upgrade height for butterfly * tweak calibnet upgrade schedule * clarify changelog note about calibnet * butterfly * update calibnet artifacts * Allow setting local bundles for Debug FVM for av 9+ * fix: autobatch: remove potential deadlock when a block is missing Check the _underlying_ blockstore instead of recursing. Also, drop the lock before we do that. * fix imports * build: set shark mainnet epoch (#9640) * chore: build: Lotus release v1.18.0 (#9641) * Lotus release v1.18.0 * add changelog * address review * changelog improvement Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Łukasz Magiera <magik6k@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Geoff Stuart <geoff.vball@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: simlecode <69969590+simlecode@users.noreply.github.com> Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Ian Davis <jungziege@gmail.com> Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com> Co-authored-by: Steven Allen <steven@stebalien.com>
447 lines
13 KiB
Go
447 lines
13 KiB
Go
package storageadapter
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"go.uber.org/fx"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/go-state-types/big"
|
|
"github.com/filecoin-project/go-state-types/builtin"
|
|
"github.com/filecoin-project/go-state-types/builtin/v9/market"
|
|
"github.com/filecoin-project/go-state-types/exitcode"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/node/config"
|
|
"github.com/filecoin-project/lotus/storage/ctladdr"
|
|
)
|
|
|
|
type dealPublisherAPI interface {
|
|
ChainHead(context.Context) (*types.TipSet, error)
|
|
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
|
|
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
|
|
|
WalletBalance(context.Context, address.Address) (types.BigInt, error)
|
|
WalletHas(context.Context, address.Address) (bool, error)
|
|
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
|
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
|
|
}
|
|
|
|
// DealPublisher batches deal publishing so that many deals can be included in
|
|
// a single publish message. This saves gas for miners that publish deals
|
|
// frequently.
|
|
// When a deal is submitted, the DealPublisher waits a configurable amount of
|
|
// time for other deals to be submitted before sending the publish message.
|
|
// There is a configurable maximum number of deals that can be included in one
|
|
// message. When the limit is reached the DealPublisher immediately submits a
|
|
// publish message with all deals in the queue.
|
|
type DealPublisher struct {
|
|
api dealPublisherAPI
|
|
as *ctladdr.AddressSelector
|
|
|
|
ctx context.Context
|
|
Shutdown context.CancelFunc
|
|
|
|
maxDealsPerPublishMsg uint64
|
|
publishPeriod time.Duration
|
|
publishSpec *api.MessageSendSpec
|
|
|
|
lk sync.Mutex
|
|
pending []*pendingDeal
|
|
cancelWaitForMoreDeals context.CancelFunc
|
|
publishPeriodStart time.Time
|
|
startEpochSealingBuffer abi.ChainEpoch
|
|
}
|
|
|
|
// A deal that is queued to be published
|
|
type pendingDeal struct {
|
|
ctx context.Context
|
|
deal market.ClientDealProposal
|
|
Result chan publishResult
|
|
}
|
|
|
|
// The result of publishing a deal
|
|
type publishResult struct {
|
|
msgCid cid.Cid
|
|
err error
|
|
}
|
|
|
|
func newPendingDeal(ctx context.Context, deal market.ClientDealProposal) *pendingDeal {
|
|
return &pendingDeal{
|
|
ctx: ctx,
|
|
deal: deal,
|
|
Result: make(chan publishResult),
|
|
}
|
|
}
|
|
|
|
type PublishMsgConfig struct {
|
|
// The amount of time to wait for more deals to arrive before
|
|
// publishing
|
|
Period time.Duration
|
|
// The maximum number of deals to include in a single PublishStorageDeals
|
|
// message
|
|
MaxDealsPerMsg uint64
|
|
// Minimum start epoch buffer to give time for sealing of sector with deal
|
|
StartEpochSealingBuffer uint64
|
|
}
|
|
|
|
func NewDealPublisher(
|
|
feeConfig *config.MinerFeeConfig,
|
|
publishMsgCfg PublishMsgConfig,
|
|
) func(lc fx.Lifecycle, full api.FullNode, as *ctladdr.AddressSelector) *DealPublisher {
|
|
return func(lc fx.Lifecycle, full api.FullNode, as *ctladdr.AddressSelector) *DealPublisher {
|
|
maxFee := abi.NewTokenAmount(0)
|
|
if feeConfig != nil {
|
|
maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee)
|
|
}
|
|
publishSpec := &api.MessageSendSpec{MaxFee: maxFee}
|
|
dp := newDealPublisher(full, as, publishMsgCfg, publishSpec)
|
|
lc.Append(fx.Hook{
|
|
OnStop: func(ctx context.Context) error {
|
|
dp.Shutdown()
|
|
return nil
|
|
},
|
|
})
|
|
return dp
|
|
}
|
|
}
|
|
|
|
func newDealPublisher(
|
|
dpapi dealPublisherAPI,
|
|
as *ctladdr.AddressSelector,
|
|
publishMsgCfg PublishMsgConfig,
|
|
publishSpec *api.MessageSendSpec,
|
|
) *DealPublisher {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &DealPublisher{
|
|
api: dpapi,
|
|
as: as,
|
|
ctx: ctx,
|
|
Shutdown: cancel,
|
|
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
|
|
publishPeriod: publishMsgCfg.Period,
|
|
startEpochSealingBuffer: abi.ChainEpoch(publishMsgCfg.StartEpochSealingBuffer),
|
|
publishSpec: publishSpec,
|
|
}
|
|
}
|
|
|
|
// PendingDeals returns the list of deals that are queued up to be published
|
|
func (p *DealPublisher) PendingDeals() api.PendingDealInfo {
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
|
|
// Filter out deals whose context has been cancelled
|
|
deals := make([]*pendingDeal, 0, len(p.pending))
|
|
for _, dl := range p.pending {
|
|
if dl.ctx.Err() == nil {
|
|
deals = append(deals, dl)
|
|
}
|
|
}
|
|
|
|
pending := make([]market.ClientDealProposal, len(deals))
|
|
for i, deal := range deals {
|
|
pending[i] = deal.deal
|
|
}
|
|
|
|
return api.PendingDealInfo{
|
|
Deals: pending,
|
|
PublishPeriodStart: p.publishPeriodStart,
|
|
PublishPeriod: p.publishPeriod,
|
|
}
|
|
}
|
|
|
|
// ForcePublishPendingDeals publishes all pending deals without waiting for
|
|
// the publish period to elapse
|
|
func (p *DealPublisher) ForcePublishPendingDeals() {
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
|
|
log.Infof("force publishing deals")
|
|
p.publishAllDeals()
|
|
}
|
|
|
|
func (p *DealPublisher) Publish(ctx context.Context, deal market.ClientDealProposal) (cid.Cid, error) {
|
|
pdeal := newPendingDeal(ctx, deal)
|
|
|
|
// Add the deal to the queue
|
|
p.processNewDeal(pdeal)
|
|
|
|
// Wait for the deal to be submitted
|
|
select {
|
|
case <-ctx.Done():
|
|
return cid.Undef, ctx.Err()
|
|
case res := <-pdeal.Result:
|
|
return res.msgCid, res.err
|
|
}
|
|
}
|
|
|
|
func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
|
|
// Filter out any cancelled deals
|
|
p.filterCancelledDeals()
|
|
|
|
// If all deals have been cancelled, clear the wait-for-deals timer
|
|
if len(p.pending) == 0 && p.cancelWaitForMoreDeals != nil {
|
|
p.cancelWaitForMoreDeals()
|
|
p.cancelWaitForMoreDeals = nil
|
|
}
|
|
|
|
// Make sure the new deal hasn't been cancelled
|
|
if pdeal.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
// Add the new deal to the queue
|
|
p.pending = append(p.pending, pdeal)
|
|
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
|
|
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)
|
|
|
|
// If the maximum number of deals per message has been reached or we're not batching, send a
|
|
// publish message
|
|
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
|
|
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
|
|
p.publishAllDeals()
|
|
return
|
|
}
|
|
|
|
// Otherwise wait for more deals to arrive or the timeout to be reached
|
|
p.waitForMoreDeals()
|
|
}
|
|
|
|
func (p *DealPublisher) waitForMoreDeals() {
|
|
// Check if we're already waiting for deals
|
|
if !p.publishPeriodStart.IsZero() {
|
|
elapsed := build.Clock.Since(p.publishPeriodStart)
|
|
log.Infof("%s elapsed of / %s until publish deals queue is published",
|
|
elapsed, p.publishPeriod)
|
|
return
|
|
}
|
|
|
|
// Set a timeout to wait for more deals to arrive
|
|
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
|
|
ctx, cancel := context.WithCancel(p.ctx)
|
|
|
|
// Create the timer _before_ taking the current time so publishPeriod+timeout is always >=
|
|
// the actual timer timeout.
|
|
timer := build.Clock.Timer(p.publishPeriod)
|
|
|
|
p.publishPeriodStart = build.Clock.Now()
|
|
p.cancelWaitForMoreDeals = cancel
|
|
|
|
go func() {
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
case <-timer.C:
|
|
p.lk.Lock()
|
|
defer p.lk.Unlock()
|
|
|
|
// The timeout has expired so publish all pending deals
|
|
log.Infof("publish deals queue period of %s has expired, publishing deals", p.publishPeriod)
|
|
p.publishAllDeals()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (p *DealPublisher) publishAllDeals() {
|
|
// If the timeout hasn't yet been cancelled, cancel it
|
|
if p.cancelWaitForMoreDeals != nil {
|
|
p.cancelWaitForMoreDeals()
|
|
p.cancelWaitForMoreDeals = nil
|
|
p.publishPeriodStart = time.Time{}
|
|
}
|
|
|
|
// Filter out any deals that have been cancelled
|
|
p.filterCancelledDeals()
|
|
deals := p.pending
|
|
p.pending = nil
|
|
|
|
// Send the publish message
|
|
go p.publishReady(deals)
|
|
}
|
|
|
|
func (p *DealPublisher) publishReady(ready []*pendingDeal) {
|
|
if len(ready) == 0 {
|
|
return
|
|
}
|
|
|
|
// onComplete is called when the publish message has been sent or there
|
|
// was an error
|
|
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
|
|
// Send the publish result on the pending deal's Result channel
|
|
res := publishResult{
|
|
msgCid: msgCid,
|
|
err: err,
|
|
}
|
|
select {
|
|
case <-p.ctx.Done():
|
|
case <-pd.ctx.Done():
|
|
case pd.Result <- res:
|
|
}
|
|
}
|
|
|
|
// Validate each deal to make sure it can be published
|
|
validated := make([]*pendingDeal, 0, len(ready))
|
|
deals := make([]market.ClientDealProposal, 0, len(ready))
|
|
for _, pd := range ready {
|
|
// Validate the deal
|
|
if err := p.validateDeal(pd.deal); err != nil {
|
|
// Validation failed, complete immediately with an error
|
|
go onComplete(pd, cid.Undef, xerrors.Errorf("publish validation failed: %w", err))
|
|
continue
|
|
}
|
|
|
|
validated = append(validated, pd)
|
|
deals = append(deals, pd.deal)
|
|
}
|
|
|
|
// Send the publish message
|
|
msgCid, err := p.publishDealProposals(deals)
|
|
|
|
// Signal that each deal has been published
|
|
for _, pd := range validated {
|
|
go onComplete(pd, msgCid, err)
|
|
}
|
|
}
|
|
|
|
// validateDeal checks that the deal proposal start epoch hasn't already
|
|
// elapsed
|
|
func (p *DealPublisher) validateDeal(deal market.ClientDealProposal) error {
|
|
start := time.Now()
|
|
|
|
pcid, err := deal.Proposal.Cid()
|
|
if err != nil {
|
|
return xerrors.Errorf("computing proposal cid: %w", err)
|
|
}
|
|
|
|
head, err := p.api.ChainHead(p.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if head.Height()+p.startEpochSealingBuffer > deal.Proposal.StartEpoch {
|
|
return xerrors.Errorf(
|
|
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
|
|
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
|
|
}
|
|
|
|
mi, err := p.api.StateMinerInfo(p.ctx, deal.Proposal.Provider, types.EmptyTSK)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting provider info: %w", err)
|
|
}
|
|
|
|
params, err := actors.SerializeParams(&market.PublishStorageDealsParams{
|
|
Deals: []market.ClientDealProposal{deal},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
|
}
|
|
|
|
addr, _, err := p.as.AddressFor(p.ctx, p.api, mi, api.DealPublishAddr, big.Zero(), big.Zero())
|
|
if err != nil {
|
|
return xerrors.Errorf("selecting address for publishing deals: %w", err)
|
|
}
|
|
|
|
res, err := p.api.StateCall(p.ctx, &types.Message{
|
|
To: builtin.StorageMarketActorAddr,
|
|
From: addr,
|
|
Value: types.NewInt(0),
|
|
Method: builtin.MethodsMarket.PublishStorageDeals,
|
|
Params: params,
|
|
}, head.Key())
|
|
if err != nil {
|
|
return xerrors.Errorf("simulating deal publish message: %w", err)
|
|
}
|
|
if res.MsgRct.ExitCode != exitcode.Ok {
|
|
return xerrors.Errorf("simulating deal publish message: non-zero exitcode %s; message: %s", res.MsgRct.ExitCode, res.Error)
|
|
}
|
|
|
|
took := time.Now().Sub(start)
|
|
log.Infow("validating deal", "took", took, "proposal", pcid)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Sends the publish message
|
|
func (p *DealPublisher) publishDealProposals(deals []market.ClientDealProposal) (cid.Cid, error) {
|
|
if len(deals) == 0 {
|
|
return cid.Undef, nil
|
|
}
|
|
|
|
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
|
|
|
|
provider := deals[0].Proposal.Provider
|
|
for _, dl := range deals {
|
|
if dl.Proposal.Provider != provider {
|
|
msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) +
|
|
"not all deals are for same provider: " +
|
|
fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) +
|
|
fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider)
|
|
return cid.Undef, xerrors.Errorf(msg)
|
|
}
|
|
}
|
|
|
|
mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK)
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
|
|
params, err := actors.SerializeParams(&market.PublishStorageDealsParams{
|
|
Deals: deals,
|
|
})
|
|
|
|
if err != nil {
|
|
return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
|
}
|
|
|
|
addr, _, err := p.as.AddressFor(p.ctx, p.api, mi, api.DealPublishAddr, big.Zero(), big.Zero())
|
|
if err != nil {
|
|
return cid.Undef, xerrors.Errorf("selecting address for publishing deals: %w", err)
|
|
}
|
|
|
|
smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{
|
|
To: builtin.StorageMarketActorAddr,
|
|
From: addr,
|
|
Value: types.NewInt(0),
|
|
Method: builtin.MethodsMarket.PublishStorageDeals,
|
|
Params: params,
|
|
}, p.publishSpec)
|
|
|
|
if err != nil {
|
|
return cid.Undef, err
|
|
}
|
|
return smsg.Cid(), nil
|
|
}
|
|
|
|
func pieceCids(deals []market.ClientDealProposal) string {
|
|
cids := make([]string, 0, len(deals))
|
|
for _, dl := range deals {
|
|
cids = append(cids, dl.Proposal.PieceCID.String())
|
|
}
|
|
return strings.Join(cids, ", ")
|
|
}
|
|
|
|
// filter out deals that have been cancelled
|
|
func (p *DealPublisher) filterCancelledDeals() {
|
|
filtered := p.pending[:0]
|
|
for _, pd := range p.pending {
|
|
if pd.ctx.Err() != nil {
|
|
continue
|
|
}
|
|
filtered = append(filtered, pd)
|
|
}
|
|
p.pending = filtered
|
|
}
|