Merge pull request #11485 from filecoin-project/sqlGptHard
fix: sql Scan cannot write to an object
This commit is contained in:
commit
e0a8fe3f18
@ -76,7 +76,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
SignedData []byte `db:"signed_data"`
|
SignedData []byte `db:"signed_data"`
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.db.QueryRow(ctx, `select from_key, nonce, to_addr, unsigned_data, unsigned_cid from message_sends where send_task_id = $1`, taskID).Scan(&dbMsg)
|
err = s.db.QueryRow(ctx, `
|
||||||
|
SELECT from_key, nonce, to_addr, unsigned_data, unsigned_cid
|
||||||
|
FROM message_sends
|
||||||
|
WHERE send_task_id = $1`, taskID).Scan(
|
||||||
|
&dbMsg.FromKey, &dbMsg.Nonce, &dbMsg.ToAddr, &dbMsg.UnsignedData, &dbMsg.UnsignedCid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting message from db: %w", err)
|
return false, xerrors.Errorf("getting message from db: %w", err)
|
||||||
}
|
}
|
||||||
@ -96,8 +100,11 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
}
|
}
|
||||||
|
|
||||||
// try to acquire lock
|
// try to acquire lock
|
||||||
cn, err := s.db.Exec(ctx, `INSERT INTO message_send_locks (from_key, task_id, claimed_at) VALUES ($1, $2, CURRENT_TIMESTAMP)
|
cn, err := s.db.Exec(ctx, `
|
||||||
ON CONFLICT (from_key) DO UPDATE SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID)
|
INSERT INTO message_send_locks (from_key, task_id, claimed_at)
|
||||||
|
VALUES ($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (from_key) DO UPDATE
|
||||||
|
SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP
|
||||||
|
WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("acquiring send lock: %w", err)
|
return false, xerrors.Errorf("acquiring send lock: %w", err)
|
||||||
}
|
}
|
||||||
@ -114,7 +121,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// defer release db send lock
|
// defer release db send lock
|
||||||
defer func() {
|
defer func() {
|
||||||
_, err2 := s.db.Exec(ctx, `delete from message_send_locks where from_key = $1 and task_id = $2`, dbMsg.FromKey, taskID)
|
_, err2 := s.db.Exec(ctx, `
|
||||||
|
DELETE from message_send_locks WHERE from_key = $1 AND task_id = $2`, dbMsg.FromKey, taskID)
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err2)
|
log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err2)
|
||||||
|
|
||||||
@ -135,7 +143,8 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// get nonce from db
|
// get nonce from db
|
||||||
var dbNonce *uint64
|
var dbNonce *uint64
|
||||||
r := s.db.QueryRow(ctx, `select max(nonce) from message_sends where from_key = $1 and send_success = true`, msg.From.String())
|
r := s.db.QueryRow(ctx, `
|
||||||
|
SELECT MAX(nonce) FROM message_sends WHERE from_key = $1 AND send_success = true`, msg.From.String())
|
||||||
if err := r.Scan(&dbNonce); err != nil {
|
if err := r.Scan(&dbNonce); err != nil {
|
||||||
return false, xerrors.Errorf("getting nonce from db: %w", err)
|
return false, xerrors.Errorf("getting nonce from db: %w", err)
|
||||||
}
|
}
|
||||||
@ -164,7 +173,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
|
|
||||||
// write to db
|
// write to db
|
||||||
|
|
||||||
n, err := s.db.Exec(ctx, `update message_sends set nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 where send_task_id = $5`,
|
n, err := s.db.Exec(ctx, `
|
||||||
|
UPDATE message_sends SET nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4
|
||||||
|
WHERE send_task_id = $5`,
|
||||||
msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID)
|
msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("updating db record: %w", err)
|
return false, xerrors.Errorf("updating db record: %w", err)
|
||||||
@ -198,7 +209,9 @@ func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done b
|
|||||||
sendError = err.Error()
|
sendError = err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.db.Exec(ctx, `update message_sends set send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP where send_task_id = $3`, sendSuccess, sendError, taskID)
|
_, err = s.db.Exec(ctx, `
|
||||||
|
UPDATE message_sends SET send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP
|
||||||
|
WHERE send_task_id = $3`, sendSuccess, sendError, taskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("updating db record: %w", err)
|
return false, xerrors.Errorf("updating db record: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user