diff --git a/cmd/lotus-shed/lpdeal.go b/cmd/lotus-shed/lpdeal.go index 0a585bf66..b6237e13f 100644 --- a/cmd/lotus-shed/lpdeal.go +++ b/cmd/lotus-shed/lpdeal.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "database/sql" "fmt" "io" "net" @@ -452,6 +453,8 @@ var lpBoostProxyCmd = &cli.Command{ pieceUUID := uuid.New() + color.Blue("%s %s piece assign request with id %s", deal.DealProposal.PieceCID, deal.DealProposal.Provider, pieceUUID) + pieceInfoLk.Lock() pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi) pieceInfoLk.Unlock() @@ -464,23 +467,36 @@ var lpBoostProxyCmd = &cli.Command{ // add piece entry var refID int64 + var pieceWasCreated bool comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - // Add parked_piece, on conflict do nothing var pieceID int64 - err = tx.QueryRow(` - INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) - VALUES ($1, $2, $3) - ON CONFLICT (piece_cid) DO UPDATE - SET piece_cid = EXCLUDED.piece_cid - RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + // Attempt to select the piece ID first + err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID) + if err != nil { - return false, xerrors.Errorf("upserting parked piece and getting id: %w", err) + if err == sql.ErrNoRows { + // Piece does not exist, attempt to insert + err = tx.QueryRow(` + INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) + VALUES ($1, $2, $3) + ON CONFLICT (piece_cid) DO NOTHING + RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID) + if err != nil { + return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err) + } + pieceWasCreated = true // New piece was created + } else { + // Some other error occurred during select + return false, xerrors.Errorf("checking existing parked piece: %w", err) + } + } else { + pieceWasCreated = false // Piece already exists, no new piece was created } // Add parked_piece_ref err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url) - VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) + VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID) if err != nil { return false, xerrors.Errorf("inserting parked piece ref: %w", err) } @@ -496,8 +512,28 @@ var lpBoostProxyCmd = &cli.Command{ } // wait for piece to be parked + if pieceWasCreated { + <-pi.done + } else { + // If the piece was not created, we need to close the done channel + close(pi.done) - <-pi.done + go func() { + // close the data reader (drain to eof if it's not a closer) + if closer, ok := pieceData.(io.Closer); ok { + if err := closer.Close(); err != nil { + log.Warnw("closing pieceData in DataCid", "error", err) + } + } else { + log.Warnw("pieceData is not an io.Closer", "type", fmt.Sprintf("%T", pieceData)) + + _, err := io.Copy(io.Discard, pieceData) + if err != nil { + log.Warnw("draining pieceData in DataCid", "error", err) + } + } + }() + } pieceIDUrl := url.URL{ Scheme: "pieceref", diff --git a/lib/harmony/harmonydb/sql/20240228-piece-park.sql b/lib/harmony/harmonydb/sql/20240228-piece-park.sql index 8b6719dc5..95792628a 100644 --- a/lib/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/lib/harmony/harmonydb/sql/20240228-piece-park.sql @@ -4,7 +4,7 @@ create table parked_pieces ( piece_cid text not null, piece_padded_size bigint not null, - piece_raw_size text not null, + piece_raw_size bigint not null, complete boolean not null default false, task_id bigint default null,