v1.27.0-a #10
@@ -4,6 +4,8 @@ import (
     "bytes"
     "context"
     "fmt"
+    "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+    "github.com/google/uuid"
     "io"
     "net"
     "net/http"
@@ -14,7 +16,6 @@ import (
     "time"
 
     "github.com/fatih/color"
-    "github.com/ipfs/go-cid"
     "github.com/mitchellh/go-homedir"
     manet "github.com/multiformats/go-multiaddr/net"
     "github.com/urfave/cli/v2"
@@ -427,7 +428,7 @@ var lpBoostProxyCmd = &cli.Command{
         }
 
         pieceInfoLk := new(sync.Mutex)
-        pieceInfos := map[cid.Cid][]pieceInfo{}
+        pieceInfos := map[uuid.UUID][]pieceInfo{}
 
         ast.Internal.SectorAddPieceToAny = func(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
             origPieceData := pieceData
@@ -449,25 +450,68 @@ var lpBoostProxyCmd = &cli.Command{
                 done: make(chan struct{}),
             }
 
+            pieceUUID := uuid.New()
+
             pieceInfoLk.Lock()
-            pieceInfos[deal.DealProposal.PieceCID] = append(pieceInfos[deal.DealProposal.PieceCID], pi)
+            pieceInfos[pieceUUID] = append(pieceInfos[pieceUUID], pi)
             pieceInfoLk.Unlock()
 
             // /piece?piece_cid=xxxx
             dataUrl := rootUrl
             dataUrl.Path = "/piece"
-            dataUrl.RawQuery = "piece_cid=" + deal.DealProposal.PieceCID.String()
+            dataUrl.RawQuery = "piece_id=" + pieceUUID.String()
 
+            // add piece entry
+            var refID int64
+
+            comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
+                // Add parked_piece, on conflict do nothing
+                var pieceID int64
+                err = tx.QueryRow(`
+                    INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
+                    VALUES ($1, $2, $3)
+                    ON CONFLICT (piece_cid) DO UPDATE
+                    SET piece_cid = EXCLUDED.piece_cid
+                    RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
+                if err != nil {
+                    return false, xerrors.Errorf("upserting parked piece and getting id: %w", err)
+                }
+
+                // Add parked_piece_ref
+                err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
+                    VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
+                if err != nil {
+                    return false, xerrors.Errorf("inserting parked piece ref: %w", err)
+                }
+
+                // If everything went well, commit the transaction
+                return true, nil // This will commit the transaction
+            }, harmonydb.OptionRetry())
+            if err != nil {
+                return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
+            }
+            if !comm {
+                return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
+            }
+
+            // wait for piece to be parked
+            <-pi.done
+
+            pieceIDUrl := url.URL{
+                Scheme: "pieceref",
+                Opaque: fmt.Sprintf("%d", refID),
+            }
+
             // make a sector
-            so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), dataUrl, nil)
+            so, err := pin.AllocatePieceToSector(ctx, maddr, deal, int64(pieceSize), pieceIDUrl, nil)
             if err != nil {
                 return api.SectorOffset{}, err
             }
 
             color.Blue("%s piece assigned to sector f0%d:%d @ %d", deal.DealProposal.PieceCID, mid, so.Sector, so.Offset)
 
-            <-pi.done
-
             return so, nil
         }
 
@@ -484,10 +528,12 @@ var lpBoostProxyCmd = &cli.Command{
         ast.Internal.StorageGetLocks = si.StorageGetLocks
 
         var pieceHandler http.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
-            // /piece?piece_cid=xxxx
-            pieceCid, err := cid.Decode(r.URL.Query().Get("piece_cid"))
+            // /piece?piece_id=xxxx
+            pieceUUID := r.URL.Query().Get("piece_id")
+
+            pu, err := uuid.Parse(pieceUUID)
             if err != nil {
-                http.Error(w, "bad piece_cid", http.StatusBadRequest)
+                http.Error(w, "bad piece id", http.StatusBadRequest)
                 return
             }
 
@@ -496,13 +542,13 @@ var lpBoostProxyCmd = &cli.Command{
                 return
             }
 
-            fmt.Printf("%s request for piece from %s\n", pieceCid, r.RemoteAddr)
+            fmt.Printf("%s request for piece from %s\n", pieceUUID, r.RemoteAddr)
 
             pieceInfoLk.Lock()
-            pis, ok := pieceInfos[pieceCid]
+            pis, ok := pieceInfos[pu]
             if !ok {
                 http.Error(w, "piece not found", http.StatusNotFound)
-                color.Red("%s not found", pieceCid)
+                color.Red("%s not found", pu)
                 pieceInfoLk.Unlock()
                 return
             }
@@ -511,7 +557,10 @@ var lpBoostProxyCmd = &cli.Command{
             pi := pis[0]
             pis = pis[1:]
 
-            pieceInfos[pieceCid] = pis
+            pieceInfos[pu] = pis
+            if len(pis) == 0 {
+                delete(pieceInfos, pu)
+            }
 
             pieceInfoLk.Unlock()
 
@@ -533,7 +582,7 @@ var lpBoostProxyCmd = &cli.Command{
                 return
             }
 
-            color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pieceCid, float64(n)/(1024*1024), took, mbps)
+            color.Green("%s served %.3f MiB in %s (%.2f MiB/s)", pu, float64(n)/(1024*1024), took, mbps)
         }
 
         finalApi := proxy.LoggingAPI[api.StorageMiner, api.StorageMinerStruct](&ast)
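Instead of handing AllocatePieceToSector the HTTP data URL directly, the proxy now passes an opaque `pieceref:<ref_id>` URL that points at the parked_piece_refs row; the HTTP URL travels in the ref's data_url column instead. Below is a minimal, self-contained sketch of building and decoding such a reference URL with the standard library; the helper names are illustrative and not part of this patch.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// makePieceRefURL builds the opaque "pieceref:<ref_id>" URL the proxy passes
// to AllocatePieceToSector in place of the raw HTTP data URL.
func makePieceRefURL(refID int64) url.URL {
	return url.URL{Scheme: "pieceref", Opaque: fmt.Sprintf("%d", refID)}
}

// parsePieceRefURL recovers the parked_piece_refs ref_id from such a URL, so
// a consumer can look up the parked piece (and its data_url) in the DB.
func parsePieceRefURL(raw string) (int64, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return 0, err
	}
	if u.Scheme != "pieceref" {
		return 0, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	return strconv.ParseInt(u.Opaque, 10, 64)
}

func main() {
	u := makePieceRefURL(42)
	id, err := parsePieceRefURL(u.String())
	fmt.Println(u.String(), id, err) // pieceref:42 42 <nil>
}
```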
@@ -2,11 +2,14 @@ create table parked_pieces (
     id bigserial primary key,
     created_at timestamp default current_timestamp,
 
-    piece_cid text not null,
+    piece_cid text not null constraint parked_pieces_piece_cid_key unique,
     piece_padded_size bigint not null,
     piece_raw_size text not null,
 
-    complete boolean not null default false
+    complete boolean not null default false,
+    task_id bigint default null,
+
+    foreign key (task_id) references harmony_task (id) on delete set null
 );
 
 /*
@@ -14,23 +17,15 @@ create table parked_pieces (
  * so that we can delete them when they are no longer needed.
  *
  * All references into the parked_pieces table should be done through this table.
+ *
+ * data_url is optional for refs which also act as data sources.
  */
 create table parked_piece_refs (
     ref_id bigserial primary key,
     piece_id bigint not null,
 
+    data_url text,
+    data_headers jsonb not null default '{}',
+
     foreign key (piece_id) references parked_pieces(id) on delete cascade
 );
-
-create table park_piece_tasks (
-    task_id bigint not null
-        constraint park_piece_tasks_pk
-            primary key,
-
-    piece_ref_id bigint not null,
-
-    data_url text not null,
-    data_headers jsonb not null default '{}',
-    data_raw_size bigint not null,
-    data_delete_on_finalize bool not null
-);
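Per the comment in the migration, refs are the handles that keep parked data alive: deleting a parked_pieces row cascades to its refs, and once the last ref for a piece is gone the piece row (and, separately, its on-disk copy) can be dropped. A hypothetical cleanup helper, not part of this patch, sketching how that relationship could be used with the same harmonydb handle the rest of the change uses; the placement, name, and query are illustrative only.

```go
package lppiece // hypothetical placement, for illustration only

import (
	"context"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)

// cleanupUnreferencedPieces removes completed parked pieces that no row in
// parked_piece_refs references any more. Removing the on-disk copy of the
// piece would be a separate step and is not shown here.
func cleanupUnreferencedPieces(ctx context.Context, db *harmonydb.DB) error {
	_, err := db.Exec(ctx, `
		DELETE FROM parked_pieces p
		WHERE p.complete = TRUE
		  AND NOT EXISTS (
		    SELECT 1 FROM parked_piece_refs r WHERE r.piece_id = p.id
		  )`)
	if err != nil {
		return xerrors.Errorf("deleting unreferenced parked pieces: %w", err)
	}
	return nil
}
```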
@@ -3,15 +3,12 @@ package lppiece
 import (
     "context"
     "encoding/json"
-    "net/http"
+    "strconv"
     "time"
 
-    "github.com/ipfs/go-cid"
     logging "github.com/ipfs/go-log/v2"
     "golang.org/x/xerrors"
 
-    "github.com/filecoin-project/go-state-types/abi"
-
     "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
     "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
     "github.com/filecoin-project/lotus/lib/harmony/resources"
@@ -22,6 +19,7 @@ import (
 )
 
 var log = logging.Logger("lppiece")
+var PieceParkPollInterval = time.Second * 15
 
 // ParkPieceTask gets a piece from some origin, and parks it in storage
 // Pieces are always f00, piece ID is mapped to pieceCID in the DB
@@ -35,103 +33,129 @@ type ParkPieceTask struct {
 }
 
 func NewParkPieceTask(db *harmonydb.DB, sc *lpffi.SealCalls, max int) *ParkPieceTask {
-    return &ParkPieceTask{
+    pt := &ParkPieceTask{
         db: db,
         sc: sc,
 
         max: max,
     }
+    go pt.pollPieceTasks(context.Background())
+    return pt
 }
 
-func (p *ParkPieceTask) PullPiece(ctx context.Context, pieceCID cid.Cid, rawSize int64, paddedSize abi.PaddedPieceSize, dataUrl string, headers http.Header) error {
-    p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
-        var pieceID int
-        err = tx.QueryRow(`INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size) VALUES ($1, $2, $3) RETURNING id`, pieceCID.String(), int64(paddedSize), rawSize).Scan(&pieceID)
-        if err != nil {
-            return false, xerrors.Errorf("inserting parked piece: %w", err)
-        }
-
-        var refID int
-        err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id) VALUES ($1) RETURNING ref_id`, pieceID).Scan(&refID)
-        if err != nil {
-            return false, xerrors.Errorf("inserting parked piece ref: %w", err)
-        }
-
-        headersJson, err := json.Marshal(headers)
-        if err != nil {
-            return false, xerrors.Errorf("marshaling headers: %w", err)
-        }
-
-        _, err = tx.Exec(`INSERT INTO park_piece_tasks (task_id, piece_ref_id, data_url, data_headers, data_raw_size, data_delete_on_finalize)
-            VALUES ($1, $2, $3, $4, $5, $6)`, id, refID, dataUrl, headersJson, rawSize, false)
-        if err != nil {
-            return false, xerrors.Errorf("inserting park piece task: %w", err)
-        }
-
-        return true, nil
-    })
-
-    return nil
+func (p *ParkPieceTask) pollPieceTasks(ctx context.Context) {
+    for {
+        // select parked pieces with no task_id
+        var pieceIDs []struct {
+            ID storiface.PieceNumber `db:"id"`
+        }
+
+        err := p.db.Select(ctx, &pieceIDs, `SELECT id FROM parked_pieces WHERE complete = FALSE AND task_id IS NULL`)
+        if err != nil {
+            log.Errorf("failed to get parked pieces: %s", err)
+            time.Sleep(PieceParkPollInterval)
+            continue
+        }
+
+        if len(pieceIDs) == 0 {
+            time.Sleep(PieceParkPollInterval)
+            continue
+        }
+
+        for _, pieceID := range pieceIDs {
+            // create a task for each piece
+            p.TF.Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
+                // update
+                n, err := tx.Exec(`UPDATE parked_pieces SET task_id = $1 WHERE id = $2 AND complete = FALSE AND task_id IS NULL`, id, pieceID.ID)
+                if err != nil {
+                    return false, xerrors.Errorf("updating parked piece: %w", err)
+                }
+
+                // commit only if we updated the piece
+                return n > 0, nil
+            })
+        }
+    }
 }
 
 func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
     ctx := context.Background()
 
-    var taskData []struct {
-        PieceID              int       `db:"id"`
+    // Define a struct to hold piece data.
+    var piecesData []struct {
+        PieceID         int64     `db:"id"`
         PieceCreatedAt  time.Time `db:"created_at"`
         PieceCID        string    `db:"piece_cid"`
         Complete        bool      `db:"complete"`
-        DataURL              string `db:"data_url"`
-        DataHeaders          string `db:"data_headers"`
-        DataRawSize          int64  `db:"data_raw_size"`
-        DataDeleteOnFinalize bool   `db:"data_delete_on_finalize"`
+        PiecePaddedSize int64     `db:"piece_padded_size"`
+        PieceRawSize    string    `db:"piece_raw_size"`
     }
 
-    err = p.db.Select(ctx, &taskData, `
-    select
-        pp.id,
-        pp.created_at,
-        pp.piece_cid,
-        pp.complete,
-        ppt.data_url,
-        ppt.data_headers,
-        ppt.data_raw_size,
-        ppt.data_delete_on_finalize
-    from park_piece_tasks ppt
-        join parked_piece_refs ppr on ppt.piece_ref_id = ppr.ref_id
-        join parked_pieces pp on ppr.piece_id = pp.id
-    where ppt.task_id = $1
+    // Select the piece data using the task ID.
+    err = p.db.Select(ctx, &piecesData, `
+        SELECT id, created_at, piece_cid, complete, piece_padded_size, piece_raw_size
+        FROM parked_pieces
+        WHERE task_id = $1
     `, taskID)
     if err != nil {
-        return false, err
+        return false, xerrors.Errorf("fetching piece data: %w", err)
     }
 
-    if len(taskData) != 1 {
-        return false, xerrors.Errorf("expected 1 task, got %d", len(taskData))
+    if len(piecesData) == 0 {
+        return false, xerrors.Errorf("no piece data found for task_id: %d", taskID)
     }
 
-    if taskData[0].Complete {
-        log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", taskData[0].PieceCID)
+    pieceData := piecesData[0]
+
+    if pieceData.Complete {
+        log.Warnw("park piece task already complete", "task_id", taskID, "piece_cid", pieceData.PieceCID)
         return true, nil
     }
 
+    // Define a struct for reference data.
+    var refData []struct {
+        DataURL     string          `db:"data_url"`
+        DataHeaders json.RawMessage `db:"data_headers"`
+    }
+
+    // Now, select the first reference data that has a URL.
+    err = p.db.Select(ctx, &refData, `
+        SELECT data_url, data_headers
+        FROM parked_piece_refs
+        WHERE piece_id = $1 AND data_url IS NOT NULL
+        LIMIT 1
+    `, pieceData.PieceID)
+    if err != nil {
+        return false, xerrors.Errorf("fetching reference data: %w", err)
+    }
+
+    if len(refData) == 0 {
+        return false, xerrors.Errorf("no refs found for piece_id: %d", pieceData.PieceID)
+    }
+
+    // Convert piece_raw_size from string to int64.
+    pieceRawSize, err := strconv.ParseInt(pieceData.PieceRawSize, 10, 64)
+    if err != nil {
+        return false, xerrors.Errorf("parsing piece raw size: %w", err)
+    }
+
+    if refData[0].DataURL != "" {
         upr := &lpseal.UrlPieceReader{
-            Url:     taskData[0].DataURL,
-            RawSize: taskData[0].DataRawSize,
+            Url:     refData[0].DataURL,
+            RawSize: pieceRawSize,
         }
         defer func() {
             _ = upr.Close()
         }()
 
-    pnum := storiface.PieceNumber(taskData[0].PieceID)
+        pnum := storiface.PieceNumber(pieceData.PieceID)
 
-    if err := p.sc.WritePiece(ctx, pnum, taskData[0].DataRawSize, upr); err != nil {
-        return false, xerrors.Errorf("write piece: %w", err)
-    }
+        if err := p.sc.WritePiece(ctx, pnum, pieceRawSize, upr); err != nil {
+            return false, xerrors.Errorf("write piece: %w", err)
+        }
 
-    _, err = p.db.Exec(ctx, `update parked_pieces set complete = true where id = $1`, taskData[0].PieceID)
-    if err != nil {
-        return false, xerrors.Errorf("marking piece as complete: %w", err)
-    }
+        // Update the piece as complete after a successful write.
+        _, err = p.db.Exec(ctx, `UPDATE parked_pieces SET complete = TRUE WHERE id = $1`, pieceData.PieceID)
+        if err != nil {
+            return false, xerrors.Errorf("marking piece as complete: %w", err)
+        }
@@ -139,6 +163,10 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
         return true, nil
     }
 
+    // If no URL is found, this indicates an issue since at least one URL is expected.
+    return false, xerrors.Errorf("no data URL found for piece_id: %d", pieceData.PieceID)
+}
+
 func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
     id := ids[0]
     return &id, nil
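The Do method streams the piece from the ref's data_url through lpseal.UrlPieceReader and writes at most piece_raw_size bytes. The following is a rough, runnable sketch of what such a URL-backed reader involves, assuming a plain HTTP GET; it is illustrative only and not the actual lpseal implementation.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// urlPieceReader streams at most RawSize bytes from a data URL, lazily
// opening the HTTP response on first Read, in the spirit of the reader the
// park-piece task feeds into WritePiece.
type urlPieceReader struct {
	Url     string
	RawSize int64 // expected size, e.g. parked_pieces.piece_raw_size

	body io.ReadCloser // lazily opened HTTP body
	rd   io.Reader     // body limited to RawSize
}

func (u *urlPieceReader) Read(p []byte) (int, error) {
	if u.rd == nil {
		resp, err := http.Get(u.Url)
		if err != nil {
			return 0, err
		}
		if resp.StatusCode != http.StatusOK {
			_ = resp.Body.Close()
			return 0, fmt.Errorf("unexpected status %d fetching piece", resp.StatusCode)
		}
		u.body = resp.Body
		u.rd = io.LimitReader(resp.Body, u.RawSize)
	}
	return u.rd.Read(p)
}

func (u *urlPieceReader) Close() error {
	if u.body != nil {
		return u.body.Close()
	}
	return nil
}

func main() {
	// Serve some bytes locally to exercise the reader.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("example piece bytes"))
	}))
	defer srv.Close()

	upr := &urlPieceReader{Url: srv.URL, RawSize: 7}
	defer func() { _ = upr.Close() }()

	data, _ := io.ReadAll(upr)
	fmt.Printf("%q\n", data) // "example" — truncated to RawSize
}
```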