lp tester fix: done after compute
This commit is contained in:
parent
5120add25a
commit
cef565f08b
@ -2,7 +2,6 @@ package lpmessage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -19,10 +18,6 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("lpmessage")
|
var log = logging.Logger("lpmessage")
|
||||||
|
|
||||||
type str string // makes ctx value collissions impossible
|
|
||||||
|
|
||||||
var CtxTestTaskID str = "task_id"
|
|
||||||
|
|
||||||
type SenderAPI interface {
|
type SenderAPI interface {
|
||||||
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
|
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
|
||||||
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
|
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
|
||||||
@ -104,14 +99,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
|
|
||||||
var sigMsg *types.SignedMessage
|
var sigMsg *types.SignedMessage
|
||||||
|
|
||||||
var idCount int
|
|
||||||
err = s.db.QueryRow(ctx, `SELECT COUNT(*) FROM harmony_test WHERE task_id=$1`,
|
|
||||||
ctx.Value(CtxTestTaskID)).Scan(&idCount)
|
|
||||||
if err != nil {
|
|
||||||
return cid.Undef, xerrors.Errorf("reading harmony_test: %w", err)
|
|
||||||
}
|
|
||||||
noSend := idCount == 1
|
|
||||||
|
|
||||||
// start db tx
|
// start db tx
|
||||||
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
|
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
|
||||||
@ -149,28 +136,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
return false, xerrors.Errorf("marshaling message: %w", err)
|
return false, xerrors.Errorf("marshaling message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if noSend {
|
|
||||||
|
|
||||||
data, err := json.MarshalIndent(map[string]any{
|
|
||||||
"from_key": fromA.String(),
|
|
||||||
"nonce": msg.Nonce,
|
|
||||||
"to_addr": msg.To.String(),
|
|
||||||
"signed_data": data,
|
|
||||||
"signed_json": string(jsonBytes),
|
|
||||||
"signed_cid": sigMsg.Cid(),
|
|
||||||
"send_reason": reason,
|
|
||||||
}, "", " ")
|
|
||||||
if err != nil {
|
|
||||||
return false, xerrors.Errorf("marshaling message: %w", err)
|
|
||||||
}
|
|
||||||
id := ctx.Value(CtxTestTaskID)
|
|
||||||
_, err = tx.Exec(`UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), id)
|
|
||||||
if err != nil {
|
|
||||||
return false, xerrors.Errorf("updating harmony_test: %w", err)
|
|
||||||
}
|
|
||||||
log.Infof("SKIPPED sending test message to chain. Query harmony_test WHERE task_id= %v", id)
|
|
||||||
return true, nil // nothing committed
|
|
||||||
}
|
|
||||||
// write to db
|
// write to db
|
||||||
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
|
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
|
||||||
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
|
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
|
||||||
@ -187,9 +152,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
|
|||||||
if err != nil || !c {
|
if err != nil || !c {
|
||||||
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
|
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
|
||||||
}
|
}
|
||||||
if noSend {
|
|
||||||
return sigMsg.Cid(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// push to mpool
|
// push to mpool
|
||||||
_, err = s.api.MpoolPush(ctx, sigMsg)
|
_, err = s.api.MpoolPush(ctx, sigMsg)
|
||||||
|
@ -3,6 +3,7 @@ package lpwindow
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
@ -161,13 +162,32 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
|
|||||||
return false, xerrors.Errorf("marshaling PoSt: %w", err)
|
return false, xerrors.Errorf("marshaling PoSt: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
testTaskID := 0
|
|
||||||
testTaskIDCt := 0
|
testTaskIDCt := 0
|
||||||
if err = t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(&testTaskIDCt); err != nil {
|
if err = t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(&testTaskIDCt); err != nil {
|
||||||
return false, xerrors.Errorf("querying for test task: %w", err)
|
return false, xerrors.Errorf("querying for test task: %w", err)
|
||||||
}
|
}
|
||||||
if testTaskIDCt == 1 {
|
if testTaskIDCt == 1 {
|
||||||
testTaskID = int(taskID)
|
// Do not send test tasks to the chain but to harmony_test & stdout.
|
||||||
|
|
||||||
|
data, err := json.MarshalIndent(map[string]any{
|
||||||
|
"sp_id": spID,
|
||||||
|
"proving_period_start": pps,
|
||||||
|
"deadline": deadline.Index,
|
||||||
|
"partition": partIdx,
|
||||||
|
"submit_at_epoch": deadline.Open,
|
||||||
|
"submit_by_epoch": deadline.Close,
|
||||||
|
"proof_params": msgbuf.Bytes(),
|
||||||
|
}, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("marshaling message: %w", err)
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
_, err = t.db.Exec(ctx, `UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), taskID)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("updating harmony_test: %w", err)
|
||||||
|
}
|
||||||
|
log.Infof("SKIPPED sending test message to chain. SELECT * FROM harmony_test WHERE task_id= %v", taskID)
|
||||||
|
return true, nil // nothing committed
|
||||||
}
|
}
|
||||||
// Insert into wdpost_proofs table
|
// Insert into wdpost_proofs table
|
||||||
n, err := t.db.Exec(context.Background(),
|
n, err := t.db.Exec(context.Background(),
|
||||||
@ -178,9 +198,8 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
|
|||||||
partition,
|
partition,
|
||||||
submit_at_epoch,
|
submit_at_epoch,
|
||||||
submit_by_epoch,
|
submit_by_epoch,
|
||||||
proof_params,
|
proof_params)
|
||||||
test_task_id)
|
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
|
|
||||||
spID,
|
spID,
|
||||||
pps,
|
pps,
|
||||||
deadline.Index,
|
deadline.Index,
|
||||||
@ -188,7 +207,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
|
|||||||
deadline.Open,
|
deadline.Open,
|
||||||
deadline.Close,
|
deadline.Close,
|
||||||
msgbuf.Bytes(),
|
msgbuf.Bytes(),
|
||||||
testTaskID,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -78,12 +78,11 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
|
|||||||
var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch
|
var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch
|
||||||
var earlyParamBytes []byte
|
var earlyParamBytes []byte
|
||||||
var dbTask uint64
|
var dbTask uint64
|
||||||
var testTaskID uint64
|
|
||||||
|
|
||||||
err = w.db.QueryRow(
|
err = w.db.QueryRow(
|
||||||
context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_params, submit_task_id, test_task_id
|
context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_params, submit_task_id
|
||||||
FROM wdpost_proofs WHERE submit_task_id = $1`, taskID,
|
FROM wdpost_proofs WHERE submit_task_id = $1`, taskID,
|
||||||
).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask, &testTaskID)
|
).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("query post proof: %w", err)
|
return false, xerrors.Errorf("query post proof: %w", err)
|
||||||
}
|
}
|
||||||
@ -150,7 +149,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
|
|||||||
return false, xerrors.Errorf("preparing proof message: %w", err)
|
return false, xerrors.Errorf("preparing proof message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.WithValue(context.Background(), lpmessage.CtxTestTaskID, testTaskID)
|
ctx := context.Background()
|
||||||
smsg, err := w.sender.Send(ctx, msg, mss, "wdpost")
|
smsg, err := w.sender.Send(ctx, msg, mss, "wdpost")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("sending proof message: %w", err)
|
return false, xerrors.Errorf("sending proof message: %w", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user