diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index bbe816e80..db34050b5 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "os" + "time" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -36,6 +37,11 @@ var wdPostCmd = &cli.Command{ }, } +// wdPostTaskCmd writes to harmony_task and wdpost_partition_tasks, then waits for the result. +// It is intended to be used to test the windowpost scheduler. +// The end of the compute task puts the task_id onto wdpost_proofs, which is read by the submit task. +// The submit task will not send test tasks to the chain, and instead will write the result to harmony_test. +// The result is read by this command, and printed to stdout. var wdPostTaskCmd = &cli.Command{ Name: "task", Aliases: []string{"scheduled", "schedule", "async", "asynchronous"}, @@ -74,17 +80,13 @@ var wdPostTaskCmd = &cli.Command{ if err != nil { return xerrors.Errorf("cannot get miner id %w", err) } - did, err := deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - _, err = tx.Exec(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123)`) + var id int64 + _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) if err != nil { log.Error("inserting harmony_task: ", err) return false, xerrors.Errorf("inserting harmony_task: %w", err) } - var id int64 - if err = tx.QueryRow(`SELECT id FROM harmony_task ORDER BY update_time DESC LIMIT 1`).Scan(&id); err != nil { - log.Error("getting inserted id: ", err) - return false, xerrors.Errorf("getting inserted id: %w", err) - } _, err = tx.Exec(`INSERT INTO wdpost_partition_tasks (task_id, sp_id, proving_period_start, deadline_index, partition_index) VALUES ($1, $2, $3, $4, $5)`, id, maddr, ht, cctx.Uint64("deadline"), 0) @@ -101,12 +103,27 @@ var wdPostTaskCmd = &cli.Command{ if err != nil { return xerrors.Errorf("writing SQL transaction: %w", err) } - log.Infof("Inserted task %v", did) - log.Infof("Check your lotus-provider logs for more details.") + fmt.Printf("Inserted task %v. Waiting for success ", id) + var result string + for { + time.Sleep(time.Second) + err = deps.db.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, id).Scan(&result) + if err != nil { + return xerrors.Errorf("reading result from harmony_test: %w", err) + } + if result != "" { + break + } + fmt.Print(".") + } + log.Infof("Result:", result) return nil }, } +// This command is intended to be used to verify PoSt compute performance. +// It will not send any messages to the chain. Since it can compute any deadline, output may be incorrectly timed for the chain. +// The entire processing happens in this process while you wait. It does not use the scheduler. var wdPostHereCmd = &cli.Command{ Name: "here", Aliases: []string{"cli"}, diff --git a/lib/harmony/harmonydb/sql/20231120-testing1.sql b/lib/harmony/harmonydb/sql/20231120-testing1.sql index 0aeb4fc58..71daaef69 100644 --- a/lib/harmony/harmonydb/sql/20231120-testing1.sql +++ b/lib/harmony/harmonydb/sql/20231120-testing1.sql @@ -2,5 +2,7 @@ CREATE TABLE harmony_test ( task_id bigint constraint harmony_test_pk primary key, - options text -); \ No newline at end of file + options text, + result text +); +ALTER TABLE wdpost_proofs ADD COLUMN test_task_id bigint; \ No newline at end of file diff --git a/provider/builder.go b/provider/builder.go index 928eaaac7..e6adc1795 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -4,8 +4,6 @@ import ( "context" "time" - logging "github.com/ipfs/go-log/v2" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" @@ -19,7 +17,7 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -var log = logging.Logger("provider") +//var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index f75f2a4b8..9f3d9104d 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -18,10 +18,6 @@ import ( var log = logging.Logger("lpmessage") -type str string // makes ctx value collissions impossible - -var CtxTaskID str = "task_id" - type SenderAPI interface { StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) @@ -103,14 +99,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS var sigMsg *types.SignedMessage - var idCount int - err = s.db.QueryRow(ctx, `SELECT COUNT(*) FROM harmony_test WHERE task_id=$1`, - ctx.Value(CtxTaskID)).Scan(&idCount) - if err != nil { - return cid.Undef, xerrors.Errorf("reading harmony_test: %w", err) - } - noSend := idCount == 1 - // start db tx c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // assign nonce (max(api.MpoolGetNonce, db nonce+1)) @@ -148,18 +136,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return false, xerrors.Errorf("marshaling message: %w", err) } - if noSend { - log.Errorw("SKIPPED SENDING MESSAGE PER ENVIRONMENT VARIABLE - NOT PRODUCTION SAFE", - "from_key", fromA.String(), - "nonce", msg.Nonce, - "to_addr", msg.To.String(), - "signed_data", data, - "signed_json", string(jsonBytes), - "signed_cid", sigMsg.Cid(), - "send_reason", reason, - ) - return true, nil // nothing committed - } // write to db c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`, fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason) @@ -176,9 +152,6 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS if err != nil || !c { return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) } - if noSend { - return sigMsg.Cid(), nil - } // push to mpool _, err = s.api.MpoolPush(ctx, sigMsg) diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index c3385533c..e9d582704 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -3,6 +3,7 @@ package lpwindow import ( "bytes" "context" + "encoding/json" "fmt" "sort" "strings" @@ -161,6 +162,33 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("marshaling PoSt: %w", err) } + testTaskIDCt := 0 + if err = t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(&testTaskIDCt); err != nil { + return false, xerrors.Errorf("querying for test task: %w", err) + } + if testTaskIDCt == 1 { + // Do not send test tasks to the chain but to harmony_test & stdout. + + data, err := json.MarshalIndent(map[string]any{ + "sp_id": spID, + "proving_period_start": pps, + "deadline": deadline.Index, + "partition": partIdx, + "submit_at_epoch": deadline.Open, + "submit_by_epoch": deadline.Close, + "proof_params": msgbuf.Bytes(), + }, "", " ") + if err != nil { + return false, xerrors.Errorf("marshaling message: %w", err) + } + ctx := context.Background() + _, err = t.db.Exec(ctx, `UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), taskID) + if err != nil { + return false, xerrors.Errorf("updating harmony_test: %w", err) + } + log.Infof("SKIPPED sending test message to chain. SELECT * FROM harmony_test WHERE task_id= %v", taskID) + return true, nil // nothing committed + } // Insert into wdpost_proofs table n, err := t.db.Exec(context.Background(), `INSERT INTO wdpost_proofs ( @@ -178,7 +206,8 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done partIdx, deadline.Open, deadline.Close, - msgbuf.Bytes()) + msgbuf.Bytes(), + ) if err != nil { log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index fab3a2708..72f2499f6 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -149,7 +149,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("preparing proof message: %w", err) } - ctx := context.WithValue(context.Background(), lpmessage.CtxTaskID, taskID) + ctx := context.Background() smsg, err := w.sender.Send(ctx, msg, mss, "wdpost") if err != nil { return false, xerrors.Errorf("sending proof message: %w", err) diff --git a/storage/sealer/fr32/fr32_test.go b/storage/sealer/fr32/fr32_test.go index 437fa4e43..d5f094df4 100644 --- a/storage/sealer/fr32/fr32_test.go +++ b/storage/sealer/fr32/fr32_test.go @@ -2,8 +2,8 @@ package fr32_test import ( "bytes" + "crypto/rand" "io" - "math/rand" "os" "testing" @@ -70,10 +70,13 @@ func TestPadChunkFFI(t *testing.T) { } func TestPadChunkRandEqFFI(t *testing.T) { + for i := 0; i < 200; i++ { var input [127]byte - rand.Read(input[:]) - + _, err := rand.Read(input[:]) + if err != nil { + panic(err) + } var buf [128]byte fr32.Pad(input[:], buf[:]) @@ -109,8 +112,10 @@ func TestRoundtrip(t *testing.T) { func TestRoundtripChunkRand(t *testing.T) { for i := 0; i < 200; i++ { var input [127]byte - rand.Read(input[:]) - + _, err := rand.Read(input[:]) + if err != nil { + panic(err) + } var buf [128]byte copy(buf[:], input[:]) @@ -127,8 +132,10 @@ func TestRoundtrip16MRand(t *testing.T) { up := abi.PaddedPieceSize(16 << 20).Unpadded() input := make([]byte, up) - rand.Read(input[:]) - + _, err := rand.Read(input[:]) + if err != nil { + panic(err) + } buf := make([]byte, 16<<20) fr32.Pad(input, buf)