feat: curio/lmrpc: Ingest backpressure (#11865)
This commit is contained in:
parent 3cc62e04e9
commit 3931710c72
@@ -37,6 +37,8 @@ import (
 
 var log = logging.Logger("lmrpc")
 
+const backpressureWaitTime = 30 * time.Second
+
 func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
     return forEachMarketRPC(cfg, func(maddr string, listen string) error {
         addr, err := address.NewFromString(maddr)
@@ -248,46 +250,72 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
     var refID int64
     var pieceWasCreated bool
 
-    comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
-        var pieceID int64
-        // Attempt to select the piece ID first
-        err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
+    for {
+        var backpressureWait bool
 
-        if err != nil {
-            if err == pgx.ErrNoRows {
-                // Piece does not exist, attempt to insert
-                err = tx.QueryRow(`
-                    INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
-                    VALUES ($1, $2, $3)
-                    ON CONFLICT (piece_cid) DO NOTHING
-                    RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
-                if err != nil {
-                    return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
-                }
-                pieceWasCreated = true // New piece was created
-            } else {
-                // Some other error occurred during select
-                return false, xerrors.Errorf("checking existing parked piece: %w", err)
-            }
-        } else {
-            pieceWasCreated = false // Piece already exists, no new piece was created
-        }
+        comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
+            // BACKPRESSURE
+            wait, err := maybeApplyBackpressure(tx, conf.Ingest)
+            if err != nil {
+                return false, xerrors.Errorf("backpressure checks: %w", err)
+            }
+            if wait {
+                backpressureWait = true
+                return false, nil
+            }
+
+            var pieceID int64
+            // Attempt to select the piece ID first
+            err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
+
+            if err != nil {
+                if err == pgx.ErrNoRows {
+                    // Piece does not exist, attempt to insert
+                    err = tx.QueryRow(`
+                        INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
+                        VALUES ($1, $2, $3)
+                        ON CONFLICT (piece_cid) DO NOTHING
+                        RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
+                    if err != nil {
+                        return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
+                    }
+                    pieceWasCreated = true // New piece was created
+                } else {
+                    // Some other error occurred during select
+                    return false, xerrors.Errorf("checking existing parked piece: %w", err)
+                }
+            } else {
+                pieceWasCreated = false // Piece already exists, no new piece was created
+            }
 
-        // Add parked_piece_ref
-        err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
-            VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
-        if err != nil {
-            return false, xerrors.Errorf("inserting parked piece ref: %w", err)
-        }
+            // Add parked_piece_ref
+            err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
+                VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
+            if err != nil {
+                return false, xerrors.Errorf("inserting parked piece ref: %w", err)
+            }
 
-        // If everything went well, commit the transaction
-        return true, nil // This will commit the transaction
-    }, harmonydb.OptionRetry())
-    if err != nil {
-        return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
-    }
-    if !comm {
-        return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
+            // If everything went well, commit the transaction
+            return true, nil // This will commit the transaction
+        }, harmonydb.OptionRetry())
+        if err != nil {
+            return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
+        }
+        if !comm {
+            if backpressureWait {
+                // Backpressure was applied, wait and try again
+                select {
+                case <-time.After(backpressureWaitTime):
+                case <-ctx.Done():
+                    return api.SectorOffset{}, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
+                }
+                continue
+            }
+
+            return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
+        }
+
+        break
     }
 
     // wait for piece to be parked
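A note on the shape of the new loop: the backpressure check runs inside the transaction, but the 30-second sleep happens outside it, so no transaction is held open while waiting. Below is a minimal, self-contained Go sketch of that pattern; names such as tryIngest are illustrative stand-ins for the transaction body above, not identifiers from the commit.

package lmrpcsketch

import (
    "context"
    "errors"
    "time"
)

const backpressureWaitTime = 30 * time.Second

// backoffLoop distills the loop above: tryIngest runs one transaction
// attempt and reports (committed, shouldWait). The sleep happens outside
// the transaction, so nothing is held while backing off.
func backoffLoop(ctx context.Context, tryIngest func() (committed, shouldWait bool, err error)) error {
    for {
        committed, shouldWait, err := tryIngest()
        if err != nil {
            return err
        }
        if committed {
            return nil // piece and ref are in the DB
        }
        if !shouldWait {
            return errors.New("piece tx didn't commit")
        }
        select {
        case <-time.After(backpressureWaitTime): // backpressure: retry later
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}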
@@ -420,3 +448,55 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
 
     return server.ListenAndServe()
 }
+
+func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig) (wait bool, err error) {
+    var bufferedSDR, bufferedTrees, bufferedPoRep int
+    err = tx.QueryRow(`WITH BufferedSDR AS (
+    SELECT SUM(buffered_count) AS buffered_sdr_count
+    FROM (
+        SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_count
+        FROM sectors_sdr_pipeline p
+        LEFT JOIN harmony_task t ON p.task_id_sdr = t.id
+        WHERE p.after_sdr = false
+        UNION ALL
+        SELECT COUNT(1) AS buffered_count
+        FROM parked_pieces
+        WHERE complete = false
+    ) AS subquery
+),
+BufferedTrees AS (
+    SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count
+    FROM sectors_sdr_pipeline p
+    LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id
+    WHERE p.after_sdr = true AND p.after_tree_r = false
+),
+BufferedPoRep AS (
+    SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count
+    FROM sectors_sdr_pipeline p
+    LEFT JOIN harmony_task t ON p.task_id_porep = t.id
+    WHERE p.after_tree_r = true AND p.after_porep = false
+)
+SELECT
+    (SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered,
+    (SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count,
+    (SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count
+`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep)
+    if err != nil {
+        return false, xerrors.Errorf("counting parked pieces: %w", err)
+    }
+
+    if cfg.MaxQueueSDR != 0 && bufferedSDR > cfg.MaxQueueSDR {
+        log.Debugw("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR)
+        return true, nil
+    }
+    if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees {
+        log.Debugw("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees)
+        return true, nil
+    }
+    if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep {
+        log.Debugw("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep)
+        return true, nil
+    }
+
+    return false, nil
+}
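For intuition about the counts: each CTE measures work that exists but has not yet been claimed by a worker (COUNT(p.task_id_*) - COUNT(t.owner_id) is the number of created-but-unowned tasks), and the SDR figure additionally counts incomplete parked pieces. All three limit checks share one rule: a limit of 0 disables the check, and backpressure starts only once the buffered count strictly exceeds the limit. A tiny runnable sketch of that rule, using a hypothetical helper that is not part of the commit:

package main

import "fmt"

// overLimit mirrors the checks in maybeApplyBackpressure: a zero limit
// means "unlimited", and backpressure only applies once the buffered
// count strictly exceeds the limit.
func overLimit(buffered, limit int) bool {
    return limit != 0 && buffered > limit
}

func main() {
    fmt.Println(overLimit(8, 8))  // false: at the limit, still ingesting
    fmt.Println(overLimit(9, 8))  // true: backpressure kicks in
    fmt.Println(overLimit(50, 0)) // false: 0 = unlimited
}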
@@ -318,6 +318,37 @@
   #SingleRecoveringPartitionPerPostMessage = false
 
 
+[Ingest]
+  # Maximum number of sectors that can be queued waiting for SDR to start processing.
+  # 0 = unlimited
+  # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+  # The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
+  # will also impact the maximum number of ParkPiece tasks which can run concurrently.
+  #
+  # SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
+  #
+  # type: int
+  #MaxQueueSDR = 8
+
+  # Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
+  # 0 = unlimited
+  # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+  # In the case of the trees tasks this queue can grow above this limit, as the backpressure is only
+  # applied to sectors entering the pipeline.
+  #
+  # type: int
+  #MaxQueueTrees = 0
+
+  # Maximum number of sectors that can be queued waiting for PoRep to start processing.
+  # 0 = unlimited
+  # Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+  # As with the trees tasks, this queue can grow above this limit, as the backpressure is only
+  # applied to sectors entering the pipeline.
+  #
+  # type: int
+  #MaxQueuePoRep = 0
+
+
 [Journal]
   # Events of the form: "system1:event1,system1:event2[,...]"
   #
@@ -362,5 +362,10 @@ func DefaultCurioConfig() *CurioConfig {
             PartitionCheckTimeout: Duration(20 * time.Minute),
             SingleCheckTimeout:    Duration(10 * time.Minute),
         },
+        Ingest: CurioIngestConfig{
+            MaxQueueSDR:   8, // default to 8 sectors before SDR
+            MaxQueueTrees: 0, // default don't use this limit
+            MaxQueuePoRep: 0, // default don't use this limit
+        },
     }
 }
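Only MaxQueueSDR is limited out of the box; the tree and PoRep queues stay unlimited unless an operator opts in. The following hypothetical helper (assumed to sit alongside DefaultCurioConfig in the same config package) sketches how the knobs compose:

// tunedIngestConfig is illustrative only: it starts from the defaults
// above and opts in to backpressure on the tree queue as well.
func tunedIngestConfig() *CurioConfig {
    cfg := DefaultCurioConfig()
    cfg.Ingest.MaxQueueSDR = 4    // tighten the primary backpressure point
    cfg.Ingest.MaxQueueTrees = 16 // cap sectors waiting for tree building
    return cfg
}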
@@ -208,6 +208,12 @@ over the worker address if this flag is set.`,
 
             Comment: ``,
         },
+        {
+            Name: "Ingest",
+            Type: "CurioIngestConfig",
+
+            Comment: ``,
+        },
         {
             Name: "Journal",
             Type: "JournalConfig",
@@ -271,6 +277,40 @@ over the worker address if this flag is set.`,
             Comment: ``,
         },
     },
+    "CurioIngestConfig": {
+        {
+            Name: "MaxQueueSDR",
+            Type: "int",
+
+            Comment: `Maximum number of sectors that can be queued waiting for SDR to start processing.
+0 = unlimited
+Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
+will also impact the maximum number of ParkPiece tasks which can run concurrently.
+
+SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.`,
+        },
+        {
+            Name: "MaxQueueTrees",
+            Type: "int",
+
+            Comment: `Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
+0 = unlimited
+Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+In the case of the trees tasks this queue can grow above this limit, as the backpressure is only
+applied to sectors entering the pipeline.`,
+        },
+        {
+            Name: "MaxQueuePoRep",
+            Type: "int",
+
+            Comment: `Maximum number of sectors that can be queued waiting for PoRep to start processing.
+0 = unlimited
+Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+As with the trees tasks, this queue can grow above this limit, as the backpressure is only
+applied to sectors entering the pipeline.`,
+        },
+    },
     "CurioProvingConfig": {
         {
             Name: "ParallelCheckLimit",
@@ -74,6 +74,7 @@ type CurioConfig struct {
     // Addresses of wallets per MinerAddress (one of the fields).
     Addresses []CurioAddresses
     Proving   CurioProvingConfig
+    Ingest    CurioIngestConfig
     Journal   JournalConfig
     Apis      ApisConfig
 }
@@ -826,6 +827,31 @@ type CurioProvingConfig struct {
     SingleRecoveringPartitionPerPostMessage bool
 }
 
+type CurioIngestConfig struct {
+    // Maximum number of sectors that can be queued waiting for SDR to start processing.
+    // 0 = unlimited
+    // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+    // The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
+    // will also impact the maximum number of ParkPiece tasks which can run concurrently.
+    //
+    // SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
+    MaxQueueSDR int
+
+    // Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
+    // 0 = unlimited
+    // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+    // In the case of the trees tasks this queue can grow above this limit, as the backpressure is only
+    // applied to sectors entering the pipeline.
+    MaxQueueTrees int
+
+    // Maximum number of sectors that can be queued waiting for PoRep to start processing.
+    // 0 = unlimited
+    // Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
+    // As with the trees tasks, this queue can grow above this limit, as the backpressure is only
+    // applied to sectors entering the pipeline.
+    MaxQueuePoRep int
+}
+
 // API contains configs for API endpoint
 type API struct {
     // Binding address for the Lotus API