Merge pull request #11493 from filecoin-project/fix/lp-send-taskid-wait

fix: lotus-provider: Wait for the correct taskID
This commit is contained in:
Andrew Jackson (Ajax) 2023-12-06 16:16:08 -06:00 committed by GitHub
commit d32b8be99d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 5 deletions

View File

@ -146,6 +146,11 @@ func New(
TaskTypeDetails: c.TypeDetails(), TaskTypeDetails: c.TypeDetails(),
TaskEngine: e, TaskEngine: e,
} }
if len(h.Name) > 16 {
return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name)
}
e.handlers = append(e.handlers, &h) e.handlers = append(e.handlers, &h)
e.taskMap[h.TaskTypeDetails.Name] = &h e.taskMap[h.TaskTypeDetails.Name] = &h
} }

View File

@ -324,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("marshaling message: %w", err) return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
} }
var sendTaskID *harmonytask.TaskID
taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
_, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`, _, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id) msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
@ -331,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return false, xerrors.Errorf("inserting message into db: %w", err) return false, xerrors.Errorf("inserting message into db: %w", err)
} }
sendTaskID = &id
return true, nil return true, nil
}) })
if sendTaskID == nil {
return cid.Undef, xerrors.Errorf("failed to add task")
}
// wait for exec // wait for exec
var ( var (
pollInterval = 50 * time.Millisecond pollInterval = 50 * time.Millisecond
@ -347,10 +354,10 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
for { for {
var err error var err error
var sigCidStr, sendError string var sigCidStr, sendError *string
var sendSuccess *bool var sendSuccess *bool
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError) err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err) return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
} }
@ -366,10 +373,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
continue continue
} }
if sigCidStr == nil || sendError == nil {
// should never happen because sendSuccess is already not null here
return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen")
}
if !*sendSuccess { if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError) sendErr = xerrors.Errorf("send error: %s", *sendError)
} else { } else {
sigCid, err = cid.Parse(sigCidStr) sigCid, err = cid.Parse(*sigCidStr)
if err != nil { if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err) return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
} }

View File

@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h
func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails { func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{ return harmonytask.TaskTypeDetails{
Max: 128, Max: 128,
Name: "WdPostRecoverDeclare", Name: "WdPostRecover",
Cost: resources.Resources{ Cost: resources.Resources{
Cpu: 1, Cpu: 1,
Gpu: 0, Gpu: 0,