diff --git a/curiosrc/market/lmrpc/lmrpc.go b/curiosrc/market/lmrpc/lmrpc.go index b760b7e1e..0f58be45c 100644 --- a/curiosrc/market/lmrpc/lmrpc.go +++ b/curiosrc/market/lmrpc/lmrpc.go @@ -37,6 +37,8 @@ import ( var log = logging.Logger("lmrpc") +const backpressureWaitTime = 30 * time.Second + func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error { return forEachMarketRPC(cfg, func(maddr string, listen string) error { addr, err := address.NewFromString(maddr) @@ -248,46 +250,72 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr var refID int64 var pieceWasCreated bool - comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - var pieceID int64 - // Attempt to select the piece ID first - err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + for { + var backpressureWait bool - if err != nil { - if err == pgx.ErrNoRows { - // Piece does not exist, attempt to insert - err = tx.QueryRow(` + comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // BACKPRESSURE + wait, err := maybeApplyBackpressure(tx, conf.Ingest) + if err != nil { + return false, xerrors.Errorf("backpressure checks: %w", err) + } + if wait { + backpressureWait = true + return false, nil + } + + var pieceID int64 + // Attempt to select the piece ID first + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + + if err != nil { + if err == pgx.ErrNoRows { + // Piece does not exist, attempt to insert + err = tx.QueryRow(` INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) VALUES ($1, $2, $3) ON CONFLICT (piece_cid) DO NOTHING RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) - if err != nil { - return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + if err != nil { + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + } + pieceWasCreated = true // New piece was created + } else { + // Some other error occurred during select + return false, xerrors.Errorf("checking existing parked piece: %w", err) } - pieceWasCreated = true // New piece was created } else { - // Some other error occurred during select - return false, xerrors.Errorf("checking existing parked piece: %w", err) + pieceWasCreated = false // Piece already exists, no new piece was created } - } else { - pieceWasCreated = false // Piece already exists, no new piece was created - } - // Add parked_piece_ref - err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) + // Add parked_piece_ref + err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + if err != nil { + return false, xerrors.Errorf("inserting parked piece ref: %w", err) + } + + // If everything went well, commit the transaction + return true, nil // This will commit the transaction + }, harmonydb.OptionRetry()) if err != nil { - return false, xerrors.Errorf("inserting parked piece ref: %w", err) + return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) + } + if !comm { + if backpressureWait { + // Backpressure was applied, wait and try again + select { + case <-time.After(backpressureWaitTime): + case <-ctx.Done(): + return api.SectorOffset{}, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err()) + } + continue + } + + return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") } - // If everything went well, commit the transaction - return true, nil // This will commit the transaction - }, harmonydb.OptionRetry()) - if err != nil { - return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err) - } - if !comm { - return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit") + break } // wait for piece to be parked @@ -420,3 +448,55 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr return server.ListenAndServe() } + +func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig) (wait bool, err error) { + var bufferedSDR, bufferedTrees, bufferedPoRep int + err = tx.QueryRow(`WITH BufferedSDR AS ( + SELECT SUM(buffered_count) AS buffered_sdr_count + FROM ( + SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_count + FROM sectors_sdr_pipeline p + LEFT JOIN harmony_task t ON p.task_id_sdr = t.id + WHERE p.after_sdr = false + UNION ALL + SELECT COUNT(1) AS buffered_count + FROM parked_pieces + WHERE complete = false + ) AS subquery +), + BufferedTrees AS ( + SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count + FROM sectors_sdr_pipeline p + LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id + WHERE p.after_sdr = true AND p.after_tree_r = false + ), + BufferedPoRep AS ( + SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count + FROM sectors_sdr_pipeline p + LEFT JOIN harmony_task t ON p.task_id_porep = t.id + WHERE p.after_tree_r = true AND p.after_porep = false + ) +SELECT + (SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered, + (SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count, + (SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count +`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep) + if err != nil { + return false, xerrors.Errorf("counting parked pieces: %w", err) + } + + if cfg.MaxQueueSDR != 0 && bufferedSDR > cfg.MaxQueueSDR { + log.Debugw("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR) + return true, nil + } + if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees { + log.Debugw("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees) + return true, nil + } + if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep { + log.Debugw("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep) + return true, nil + } + + return false, nil +} diff --git a/documentation/en/default-curio-config.toml b/documentation/en/default-curio-config.toml index 3ecadeb52..afcb7608a 100644 --- a/documentation/en/default-curio-config.toml +++ b/documentation/en/default-curio-config.toml @@ -318,6 +318,37 @@ #SingleRecoveringPartitionPerPostMessage = false +[Ingest] + # Maximum number of sectors that can be queued waiting for SDR to start processing. + # 0 = unlimited + # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + # The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue + # will also impact the maximum number of ParkPiece tasks which can run concurrently. + # + # SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism. + # + # type: int + #MaxQueueSDR = 8 + + # Maximum number of sectors that can be queued waiting for SDRTrees to start processing. + # 0 = unlimited + # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + # In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only + # applied to sectors entering the pipeline. + # + # type: int + #MaxQueueTrees = 0 + + # Maximum number of sectors that can be queued waiting for PoRep to start processing. + # 0 = unlimited + # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + # Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only + # applied to sectors entering the pipeline. + # + # type: int + #MaxQueuePoRep = 0 + + [Journal] # Events of the form: "system1:event1,system1:event2[,...]" # diff --git a/node/config/def.go b/node/config/def.go index 0307001b8..999f015e5 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -362,5 +362,10 @@ func DefaultCurioConfig() *CurioConfig { PartitionCheckTimeout: Duration(20 * time.Minute), SingleCheckTimeout: Duration(10 * time.Minute), }, + Ingest: CurioIngestConfig{ + MaxQueueSDR: 8, // default to 8 sectors before sdr + MaxQueueTrees: 0, // default don't use this limit + MaxQueuePoRep: 0, // default don't use this limit + }, } } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index e9bfb6fba..74549163e 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -208,6 +208,12 @@ over the worker address if this flag is set.`, Comment: ``, }, + { + Name: "Ingest", + Type: "CurioIngestConfig", + + Comment: ``, + }, { Name: "Journal", Type: "JournalConfig", @@ -271,6 +277,40 @@ over the worker address if this flag is set.`, Comment: ``, }, }, + "CurioIngestConfig": { + { + Name: "MaxQueueSDR", + Type: "int", + + Comment: `Maximum number of sectors that can be queued waiting for SDR to start processing. +0 = unlimited +Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. +The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue +will also impact the maximum number of ParkPiece tasks which can run concurrently. + +SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.`, + }, + { + Name: "MaxQueueTrees", + Type: "int", + + Comment: `Maximum number of sectors that can be queued waiting for SDRTrees to start processing. +0 = unlimited +Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. +In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only +applied to sectors entering the pipeline.`, + }, + { + Name: "MaxQueuePoRep", + Type: "int", + + Comment: `Maximum number of sectors that can be queued waiting for PoRep to start processing. +0 = unlimited +Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. +Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only +applied to sectors entering the pipeline.`, + }, + }, "CurioProvingConfig": { { Name: "ParallelCheckLimit", diff --git a/node/config/types.go b/node/config/types.go index d133fcadd..c15df320f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -74,6 +74,7 @@ type CurioConfig struct { // Addresses of wallets per MinerAddress (one of the fields). Addresses []CurioAddresses Proving CurioProvingConfig + Ingest CurioIngestConfig Journal JournalConfig Apis ApisConfig } @@ -826,6 +827,31 @@ type CurioProvingConfig struct { SingleRecoveringPartitionPerPostMessage bool } +type CurioIngestConfig struct { + // Maximum number of sectors that can be queued waiting for SDR to start processing. + // 0 = unlimited + // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + // The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue + // will also impact the maximum number of ParkPiece tasks which can run concurrently. + // + // SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism. + MaxQueueSDR int + + // Maximum number of sectors that can be queued waiting for SDRTrees to start processing. + // 0 = unlimited + // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + // In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only + // applied to sectors entering the pipeline. + MaxQueueTrees int + + // Maximum number of sectors that can be queued waiting for PoRep to start processing. + // 0 = unlimited + // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem. + // Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only + // applied to sectors entering the pipeline. + MaxQueuePoRep int +} + // API contains configs for API endpoint type API struct { // Binding address for the Lotus API