From 4b1445e3b2ad863a5f10342a7c55c611ee258c24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 6 Dec 2023 20:53:50 +0100 Subject: [PATCH 1/3] fix: lotus-provider: Wait for the correct taskID --- provider/lpmessage/sender.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index bee22ff69..5d03cfd02 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -324,6 +324,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("marshaling message: %w", err) } + var sendTaskID *harmonytask.TaskID taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { _, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`, msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id) @@ -331,9 +332,15 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return false, xerrors.Errorf("inserting message into db: %w", err) } + sendTaskID = &id + return true, nil }) + if sendTaskID == nil { + return cid.Undef, xerrors.Errorf("failed to add task") + } + // wait for exec var ( pollInterval = 50 * time.Millisecond @@ -350,7 +357,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS var sigCidStr, sendError string var sendSuccess *bool - err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError) + err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError) if err != nil { return cid.Undef, xerrors.Errorf("getting cid for task: %w", err) } From 93fd40826740b7caa122144f6d0e535345c8e2e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 6 Dec 2023 22:54:46 +0100 Subject: [PATCH 2/3] lotus-provider: additional fixes to make recover work --- lib/harmony/harmonytask/harmonytask.go | 5 +++++ provider/lpmessage/sender.go | 7 ++++++- provider/lpwindow/recover_task.go | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/harmony/harmonytask/harmonytask.go b/lib/harmony/harmonytask/harmonytask.go index 595e5b63a..31b632975 100644 --- a/lib/harmony/harmonytask/harmonytask.go +++ b/lib/harmony/harmonytask/harmonytask.go @@ -146,6 +146,11 @@ func New( TaskTypeDetails: c.TypeDetails(), TaskEngine: e, } + + if len(h.Name) > 16 { + return nil, fmt.Errorf("task name too long: %s, max 16 characters", h.Name) + } + e.handlers = append(e.handlers, &h) e.taskMap[h.TaskTypeDetails.Name] = &h } diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 5d03cfd02..5467e5d2f 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -354,7 +354,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS for { var err error - var sigCidStr, sendError string + var sigCidStr, sendError *string var sendSuccess *bool err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, &sendTaskID).Scan(&sigCidStr, &sendSuccess, &sendError) @@ -373,6 +373,11 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS continue } + if sigCidStr == nil || sendError == nil { + // should never happen because sendSuccess is already not null here + return cid.Undef, xerrors.Errorf("got null values for sigCidStr or sendError, this should never happen") + } + if !*sendSuccess { sendErr = xerrors.Errorf("send error: %s", sendError) } else { diff --git a/provider/lpwindow/recover_task.go b/provider/lpwindow/recover_task.go index 6006f3c35..12f8522b5 100644 --- a/provider/lpwindow/recover_task.go +++ b/provider/lpwindow/recover_task.go @@ -217,7 +217,7 @@ func (w *WdPostRecoverDeclareTask) CanAccept(ids []harmonytask.TaskID, engine *h func (w *WdPostRecoverDeclareTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, - Name: "WdPostRecoverDeclare", + Name: "WdPostRecover", Cost: resources.Resources{ Cpu: 1, Gpu: 0, From bf5132e50bedd405948c85d51365d29c0956c9c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 6 Dec 2023 23:00:53 +0100 Subject: [PATCH 3/3] more sender fixes --- provider/lpmessage/sender.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 5467e5d2f..0db0c0b51 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -379,9 +379,9 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS } if !*sendSuccess { - sendErr = xerrors.Errorf("send error: %s", sendError) + sendErr = xerrors.Errorf("send error: %s", *sendError) } else { - sigCid, err = cid.Parse(sigCidStr) + sigCid, err = cid.Parse(*sigCidStr) if err != nil { return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err) }