From 6bda3342df093b89b8463ca7501634c3d4f297b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Nov 2023 12:42:42 +0100 Subject: [PATCH 01/11] lpwindow: prefix task with compute_ --- provider/lpwindow/{do.go => compute_do.go} | 0 provider/lpwindow/{task.go => compute_task.go} | 0 provider/lpwindow/{task_test.go => compute_task_test.go} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename provider/lpwindow/{do.go => compute_do.go} (100%) rename provider/lpwindow/{task.go => compute_task.go} (100%) rename provider/lpwindow/{task_test.go => compute_task_test.go} (100%) diff --git a/provider/lpwindow/do.go b/provider/lpwindow/compute_do.go similarity index 100% rename from provider/lpwindow/do.go rename to provider/lpwindow/compute_do.go diff --git a/provider/lpwindow/task.go b/provider/lpwindow/compute_task.go similarity index 100% rename from provider/lpwindow/task.go rename to provider/lpwindow/compute_task.go diff --git a/provider/lpwindow/task_test.go b/provider/lpwindow/compute_task_test.go similarity index 100% rename from provider/lpwindow/task_test.go rename to provider/lpwindow/compute_task_test.go From ebec992ba84e4ffb23f5c61174356127b005700d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Nov 2023 13:51:01 +0100 Subject: [PATCH 02/11] lpwindow wip send; minimal lpmessage send --- lib/harmony/harmonydb/sql/20231103.sql | 22 ++++ provider/lpmessage/sender.go | 165 +++++++++++++++++++++++++ provider/lpwindow/compute_task.go | 15 +-- provider/lpwindow/submit_task.go | 40 ++++++ 4 files changed, 230 insertions(+), 12 deletions(-) create mode 100644 lib/harmony/harmonydb/sql/20231103.sql create mode 100644 provider/lpmessage/sender.go create mode 100644 provider/lpwindow/submit_task.go diff --git a/lib/harmony/harmonydb/sql/20231103.sql b/lib/harmony/harmonydb/sql/20231103.sql new file mode 100644 index 000000000..669f122f4 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20231103.sql @@ -0,0 +1,22 @@ +create table message_sends +( + from_key text not null, + nonce bigint not null, + to_addr text not null, + signed_data bytea not null, + signed_json jsonb not null, + signed_cid text not null, + send_time date default CURRENT_TIMESTAMP, + send_reason text, + send_success bool default false not null, + constraint message_sends_pk + primary key (from_key, nonce) +); + +comment on column message_sends.from_key is 'text f[1/3/4]... address'; +comment on column message_sends.nonce is 'assigned message nonce'; +comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address'; +comment on column message_sends.signed_data is 'signed message data'; +comment on column message_sends.signed_cid is 'signed message cid'; +comment on column message_sends.send_reason is 'optional description of send reason'; +comment on column message_sends.send_success is 'whether this message was broadcasted to the network already'; diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go new file mode 100644 index 000000000..4e9a78aae --- /dev/null +++ b/provider/lpmessage/sender.go @@ -0,0 +1,165 @@ +package lpmessage + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type SenderAPI interface { + StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) + WalletBalance(ctx context.Context, addr address.Address) (big.Int, error) + MpoolGetNonce(context.Context, address.Address) (uint64, error) + MpoolSend(context.Context, *types.SignedMessage) (cid.Cid, error) +} + +type SignerAPI interface { + WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error) +} + +// Sender abstracts away highly-available message sending with coordination through +// HarmonyDB. It make sure that nonces are assigned transactionally, and that +// messages are correctly broadcasted to the network. It is not a Task in the sense +// of a HarmonyTask interface, just a helper for tasks which need to send messages +// to the network. +type Sender struct { + api SenderAPI + signer SignerAPI + + db *harmonydb.DB +} + +// NewSender creates a new Sender. +func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender { + return &Sender{ + api: api, + signer: signer, + + db: db, + } +} + +// Send atomically assigns a nonce, signs, and pushes a message +// to mempool. +// maxFee is only used when GasFeeCap/GasPremium fields aren't specified +// +// When maxFee is set to 0, Send will guess appropriate fee +// based on current chain conditions +// +// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates +// through HarmonyDB, making it safe to broadcast messages from multiple independent +// API nodes +// +// Send is also currently more strict about required parameters than MpoolPushMessage +func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) { + if mss == nil { + return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil") + } + if (mss.MsgUuid != uuid.UUID{}) { + return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero") + } + + fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK) + if err != nil { + return cid.Undef, xerrors.Errorf("getting key address: %w", err) + } + + if msg.Nonce != 0 { + return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce) + } + + msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK) + if err != nil { + return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err) + } + + b, err := s.api.WalletBalance(ctx, msg.From) + if err != nil { + return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err) + } + + requiredFunds := big.Add(msg.Value, msg.RequiredFunds()) + if b.LessThan(requiredFunds) { + return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds) + } + + var sigMsg *types.SignedMessage + + // start db tx + c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // assign nonce (max(api.MpoolGetNonce, db nonce+1)) + msgNonce, err := s.api.MpoolGetNonce(ctx, fromA) + if err != nil { + return false, xerrors.Errorf("getting nonce from mpool: %w", err) + } + + // get nonce from db + var dbNonce uint64 + r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) + if err := r.Scan(&dbNonce); err != nil { + return false, xerrors.Errorf("getting nonce from db: %w", err) + } + + if dbNonce+1 > msgNonce { + msgNonce = dbNonce + 1 + } + + msg.Nonce = msgNonce + + // sign message + sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg) + if err != nil { + return false, xerrors.Errorf("signing message: %w", err) + } + + data, err := sigMsg.Serialize() + if err != nil { + return false, xerrors.Errorf("serializing message: %w", err) + } + + jsonBytes, err := sigMsg.MarshalJSON() + if err != nil { + return false, xerrors.Errorf("marshaling message: %w", err) + } + + // write to db + c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`, + fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason) + if err != nil { + return false, xerrors.Errorf("inserting message into db: %w", err) + } + if c != 1 { + return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c) + } + + // commit + return true, nil + }) + if err != nil || !c { + return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) + } + + // push to mpool + _, err = s.api.MpoolSend(ctx, sigMsg) + if err != nil { + return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) + } + + // update db recocd to say it was pushed (set send_success to true) + cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce) + if err != nil { + return cid.Undef, xerrors.Errorf("updating db record: %w", err) + } + if cn != 1 { + return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c) + } + + return cid.Undef, nil +} diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index e283dbd18..0e59d5d2b 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -4,14 +4,12 @@ import ( "bytes" "context" "fmt" - "sort" - "strings" - "time" - "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/storage/sealer" + "sort" + "strings" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -83,11 +81,7 @@ type wdTaskIdentity struct { } func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - - log.Errorw("WDPOST DO", "taskID", taskID) - - time.Sleep(5 * time.Second) - log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) + log.Debugw("WdPostTask.Do() called with taskID: %v", taskID) var spID, pps, dlIdx, partIdx uint64 @@ -200,9 +194,6 @@ func entToStr[T any](t T, i int) string { } func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - - log.Errorw("WDPOST CANACCEPT", "ids", ids) - // GetEpoch ts, err := t.api.ChainHead(context.Background()) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go new file mode 100644 index 000000000..de0ef94c9 --- /dev/null +++ b/provider/lpwindow/submit_task.go @@ -0,0 +1,40 @@ +package lpwindow + +import ( + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" +) + +type WdPostSubmitTask struct { +} + +func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + //TODO implement me + panic("implement me") +} + +func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + //TODO implement me + panic("implement me") +} + +func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 128, + Name: "WdPostSubmit", + Cost: resources.Resources{ + Cpu: 0, + Gpu: 0, + Ram: 0, + }, + MaxFailures: 10, + Follows: nil, // ?? + } +} + +func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { + //TODO implement me + panic("implement me") +} + +var _ harmonytask.TaskInterface = &WdPostSubmitTask{} From 81603a67f38daac3bbc24e1ab308427d306fda17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Nov 2023 21:53:15 +0100 Subject: [PATCH 03/11] lpwindow: Submit task Adder --- lib/harmony/harmonydb/sql/20230823.sql | 7 ++- provider/lpwindow/compute_task.go | 37 --------------- provider/lpwindow/submit_task.go | 65 ++++++++++++++++++++++++-- 3 files changed, 68 insertions(+), 41 deletions(-) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index c16294bf2..fa402dbd3 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -24,5 +24,10 @@ create table wdpost_proofs partition bigint not null, submit_at_epoch bigint not null, submit_by_epoch bigint not null, - proof_message bytea + proof_message bytea, + + submit_task_id bigint not null, + + constraint wdpost_proofs_identity_key + unique (sp_id, deadline, partition) ); diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 0e59d5d2b..05cc9ae32 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -149,43 +149,6 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done deadline.Close, msgbuf.Bytes()) - /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) - if err != nil { - log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err) - return false, err - } - - log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams) - - // Enter an entry for each wdpost message proof into the wdpost_proofs table - for _, params := range submitWdPostParams { - - // Convert submitWdPostParams.Partitions to a byte array using CBOR - buf := new(bytes.Buffer) - scratch := make([]byte, 9) - if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil { - return false, err - } - for _, v := range params.Partitions { - if err := v.MarshalCBOR(buf); err != nil { - return false, err - } - } - - // Insert into wdpost_proofs table - _, err = t.db.Exec(context.Background(), - `INSERT INTO wdpost_proofs ( - deadline, - partitions, - proof_type, - proof_bytes) - VALUES ($1, $2, $3, $4)`, - params.Deadline, - buf.Bytes(), - params.Proofs[0].PoStProof, - params.Proofs[0].ProofBytes) - }*/ - return true, nil } diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index de0ef94c9..2369a6759 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -1,11 +1,21 @@ package lpwindow import ( + "context" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" + "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/filecoin-project/lotus/provider/lpmessage" ) type WdPostSubmitTask struct { + sender *lpmessage.Sender + db *harmonydb.DB + + submitPoStTF promise.Promise[harmonytask.AddTaskFunc] } func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { @@ -18,6 +28,18 @@ func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta panic("implement me") } +func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) { + res := &WdPostSubmitTask{ + sender: send, + } + + if err := pcs.AddHandler(res.processHeadChange); err != nil { + return nil, err + } + + return res, nil +} + func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, @@ -25,7 +47,7 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { Cost: resources.Resources{ Cpu: 0, Gpu: 0, - Ram: 0, + Ram: 10 << 20, }, MaxFailures: 10, Follows: nil, // ?? @@ -33,8 +55,45 @@ func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { } func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { - //TODO implement me - panic("implement me") + w.submitPoStTF.Set(taskFunc) +} + +func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { + tf := w.submitPoStTF.Val(ctx) + + qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) + if err != nil { + return err + } + defer qry.Close() + + for qry.Next() { + var spID int64 + var deadline uint64 + var partition uint64 + var submitAtEpoch uint64 + if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil { + return err + } + + tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { + // update in transaction iff submit_task_id is still null + res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition) + if err != nil { + return false, err + } + if res != 1 { + return false, nil + } + + return true, nil + }) + } + if err := qry.Err(); err != nil { + return err + } + + return nil } var _ harmonytask.TaskInterface = &WdPostSubmitTask{} From c1add7d5eb9c99aa9255fa0659e275981f4b3505 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Nov 2023 22:00:19 +0100 Subject: [PATCH 04/11] lpwindow: Submit CanAccept --- provider/lpwindow/submit_task.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index 2369a6759..5889a74b7 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -24,8 +24,17 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) } func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - //TODO implement me - panic("implement me") + if len(ids) == 0 { + // probably can't happen, but panicking is bad + return nil, nil + } + + if w.sender == nil { + // we can't send messages + return nil, nil + } + + return &ids[0], nil } func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) { From a686a995f6472d188dde8a30e4e230f64a1cb23a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 3 Nov 2023 22:40:28 +0100 Subject: [PATCH 05/11] lpwindow: Submit Do --- cmd/lotus-provider/run.go | 4 +- lib/harmony/harmonydb/sql/20230823.sql | 1 + provider/builder.go | 16 ++- provider/lpmessage/sender.go | 9 +- provider/lpwindow/compute_task.go | 60 ++++----- provider/lpwindow/submit_task.go | 165 ++++++++++++++++++++++--- 6 files changed, 203 insertions(+), 52 deletions(-) diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index e4ae8871e..ca5d9e9d1 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -245,12 +245,12 @@ var runCmd = &cli.Command{ { if cfg.Subsystems.EnableWindowPost { - wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, + wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err } - activeTasks = append(activeTasks, wdPostTask) + activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask) } } taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index fa402dbd3..a9486567c 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -27,6 +27,7 @@ create table wdpost_proofs proof_message bytea, submit_task_id bigint not null, + message_cid text not null, constraint wdpost_proofs_identity_key unique (sp_id, deadline, partition) diff --git a/provider/builder.go b/provider/builder.go index c8944c296..352c533d6 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -2,6 +2,7 @@ package provider import ( "context" + "github.com/filecoin-project/lotus/provider/lpmessage" "time" "github.com/filecoin-project/lotus/storage/paths" @@ -23,19 +24,26 @@ var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, - as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) { + as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) { chainSched := chainsched.New(api) // todo config ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) - task, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max) + sender := lpmessage.NewSender(api, api, db) + + computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max) if err != nil { - return nil, err + return nil, nil, err + } + + submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as) + if err != nil { + return nil, nil, err } go chainSched.Run(ctx) - return task, nil + return computeTask, submitTask, nil } diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 4e9a78aae..012499f45 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -9,15 +9,18 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/google/uuid" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" ) +var log = logging.Logger("lpmessage") + type SenderAPI interface { StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) WalletBalance(ctx context.Context, addr address.Address) (big.Int, error) MpoolGetNonce(context.Context, address.Address) (uint64, error) - MpoolSend(context.Context, *types.SignedMessage) (cid.Cid, error) + MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) } type SignerAPI interface { @@ -147,7 +150,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS } // push to mpool - _, err = s.api.MpoolSend(ctx, sigMsg) + _, err = s.api.MpoolPush(ctx, sigMsg) if err != nil { return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) } @@ -161,5 +164,7 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c) } + log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) + return cid.Undef, nil } diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 05cc9ae32..5a805df07 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -80,8 +80,37 @@ type wdTaskIdentity struct { Partition_index uint64 } +func NewWdPostTask(db *harmonydb.DB, + api WDPoStAPI, + faultTracker sealer.FaultTracker, + prover ProverPoSt, + verifier storiface.Verifier, + + pcs *chainsched.ProviderChainSched, + actors []dtypes.MinerAddress, + max int, +) (*WdPostTask, error) { + t := &WdPostTask{ + db: db, + api: api, + + faultTracker: faultTracker, + prover: prover, + verifier: verifier, + + actors: actors, + max: max, + } + + if err := pcs.AddHandler(t.processHeadChange); err != nil { + return nil, err + } + + return t, nil +} + func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - log.Debugw("WdPostTask.Do() called with taskID: %v", taskID) + log.Debugw("WdPostTask.Do()", "taskID", taskID) var spID, pps, dlIdx, partIdx uint64 @@ -333,35 +362,6 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types return nil } -func NewWdPostTask(db *harmonydb.DB, - api WDPoStAPI, - faultTracker sealer.FaultTracker, - prover ProverPoSt, - verifier storiface.Verifier, - - pcs *chainsched.ProviderChainSched, - actors []dtypes.MinerAddress, - max int, -) (*WdPostTask, error) { - t := &WdPostTask{ - db: db, - api: api, - - faultTracker: faultTracker, - prover: prover, - verifier: verifier, - - actors: actors, - max: max, - } - - if err := pcs.AddHandler(t.processHeadChange); err != nil { - return nil, err - } - - return t, nil -} - func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) { _, err := tx.Exec( diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index 5889a74b7..4cb019a5a 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -1,7 +1,14 @@ package lpwindow import ( + "bytes" "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/builtin" + "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" @@ -9,18 +16,160 @@ import ( "github.com/filecoin-project/lotus/lib/promise" "github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/lpmessage" + "github.com/filecoin-project/lotus/storage/ctladdr" + "golang.org/x/xerrors" ) +type WdPoStSubmitTaskApi interface { + ChainHead(context.Context) (*types.TipSet, error) + + WalletBalance(context.Context, address.Address) (types.BigInt, error) + WalletHas(context.Context, address.Address) (bool, error) + + StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + + GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) + GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) +} + type WdPostSubmitTask struct { sender *lpmessage.Sender db *harmonydb.DB + api WdPoStSubmitTaskApi + + maxWindowPoStGasFee types.FIL + as *ctladdr.AddressSelector submitPoStTF promise.Promise[harmonytask.AddTaskFunc] } +func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender, db *harmonydb.DB, api WdPoStSubmitTaskApi, maxWindowPoStGasFee types.FIL, as *ctladdr.AddressSelector) (*WdPostSubmitTask, error) { + res := &WdPostSubmitTask{ + sender: send, + db: db, + api: api, + + maxWindowPoStGasFee: maxWindowPoStGasFee, + as: as, + } + + if err := pcs.AddHandler(res.processHeadChange); err != nil { + return nil, err + } + + return res, nil +} + func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - //TODO implement me - panic("implement me") + log.Debugw("WdPostSubmitTask.Do", "taskID", taskID) + + var spID uint64 + var deadline uint64 + var partition uint64 + var submitAtEpoch, submitByEpoch abi.ChainEpoch + var proofParamBytes []byte + var dbTask uint64 + + err = w.db.QueryRow( + context.Background(), `select sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id + from wdpost_proofs where sp_id = $1 and deadline = $2 and partition = $3`, spID, deadline, partition, + ).Scan(&spID, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &proofParamBytes, &dbTask) + if err != nil { + return false, xerrors.Errorf("query post proof: %w", err) + } + + if dbTask != uint64(taskID) { + return false, xerrors.Errorf("taskID mismatch: %d != %d", dbTask, taskID) + } + + head, err := w.api.ChainHead(context.Background()) + if err != nil { + return false, xerrors.Errorf("getting chain head: %w", err) + } + + if head.Height() > submitByEpoch { + // we missed the deadline, no point in submitting + log.Errorw("missed submit deadline", "spID", spID, "deadline", deadline, "partition", partition, "submitByEpoch", submitByEpoch, "headHeight", head.Height()) + return true, nil + } + + if head.Height() < submitAtEpoch { + log.Errorw("submit epoch not reached", "spID", spID, "deadline", deadline, "partition", partition, "submitAtEpoch", submitAtEpoch, "headHeight", head.Height()) + return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch) + } + + var params miner.SubmitWindowedPoStParams + if err := params.UnmarshalCBOR(bytes.NewReader(proofParamBytes)); err != nil { + return false, xerrors.Errorf("unmarshaling proof message: %w", err) + } + + maddr, err := address.NewIDAddress(spID) + if err != nil { + return false, xerrors.Errorf("invalid miner address: %w", err) + } + + mi, err := w.api.StateMinerInfo(context.Background(), maddr, types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("error getting miner info: %w", err) + } + + msg := &types.Message{ + To: maddr, + Method: builtin.MethodsMiner.SubmitWindowedPoSt, + Params: proofParamBytes, + Value: big.Zero(), + + From: mi.Worker, // set worker for now for gas estimation + } + + // calculate a more frugal estimation; premium is estimated to guarantee + // inclusion within 5 tipsets, and fee cap is estimated for inclusion + // within 4 tipsets. + minGasFeeMsg := *msg + + minGasFeeMsg.GasPremium, err = w.api.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK) + if err != nil { + log.Errorf("failed to estimate minimum gas premium: %+v", err) + minGasFeeMsg.GasPremium = msg.GasPremium + } + + minGasFeeMsg.GasFeeCap, err = w.api.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK) + if err != nil { + log.Errorf("failed to estimate minimum gas fee cap: %+v", err) + minGasFeeMsg.GasFeeCap = msg.GasFeeCap + } + + // goodFunds = funds needed for optimal inclusion probability. + // minFunds = funds needed for more speculative inclusion probability. + goodFunds := big.Add(msg.RequiredFunds(), msg.Value) + minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) + + from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds) + if err != nil { + return false, xerrors.Errorf("error getting address: %w", err) + } + + msg.From = from + + mss := &api.MessageSendSpec{ + MaxFee: abi.TokenAmount(w.maxWindowPoStGasFee), + } + + smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost") + if err != nil { + return false, xerrors.Errorf("sending proof message: %w", err) + } + + // set message_cid in the wdpost_proofs entry + + _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and deadline = $3 and partition = $4`, smsg.String(), spID, deadline, partition) + if err != nil { + return true, xerrors.Errorf("updating wdpost_proofs: %w", err) + } + + return true, nil } func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { @@ -37,18 +186,6 @@ func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta return &ids[0], nil } -func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender) (*WdPostSubmitTask, error) { - res := &WdPostSubmitTask{ - sender: send, - } - - if err := pcs.AddHandler(res.processHeadChange); err != nil { - return nil, err - } - - return res, nil -} - func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: 128, From 4f9e168017bf9914f3619e1141c0fbb851aa7294 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 4 Nov 2023 11:04:46 +0100 Subject: [PATCH 06/11] harmony: Fix resources.Register --- cmd/lotus-provider/run.go | 10 +++++----- lib/harmony/harmonydb/sql/20230719.sql | 3 +-- lib/harmony/resources/resources.go | 15 ++++++++++----- node/config/doc_gen.go | 20 ++++++++------------ provider/builder.go | 7 +++---- provider/lpmessage/sender.go | 13 ++++++++----- provider/lpwindow/compute_do.go | 17 ++++++++++------- provider/lpwindow/compute_task.go | 8 ++++---- provider/lpwindow/faults_simple.go | 13 ++++++++----- provider/lpwindow/submit_task.go | 5 ++++- 10 files changed, 61 insertions(+), 50 deletions(-) diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index ca5d9e9d1..16683c50e 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -9,21 +9,21 @@ import ( "strings" "time" - "github.com/filecoin-project/go-statestore" "github.com/gbrlsnchs/jwt/v3" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "golang.org/x/xerrors" - "github.com/gin-contrib/pprof" "github.com/gin-gonic/gin" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" "github.com/pkg/errors" "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" + "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" diff --git a/lib/harmony/harmonydb/sql/20230719.sql b/lib/harmony/harmonydb/sql/20230719.sql index d4eb58326..e7b1795c5 100644 --- a/lib/harmony/harmonydb/sql/20230719.sql +++ b/lib/harmony/harmonydb/sql/20230719.sql @@ -6,8 +6,7 @@ CREATE TABLE harmony_machines ( host_and_port varchar(300) NOT NULL, cpu INTEGER NOT NULL, ram BIGINT NOT NULL, - gpu FLOAT NOT NULL, - gpuram BIGINT NOT NULL + gpu FLOAT NOT NULL ); CREATE TABLE harmony_task ( diff --git a/lib/harmony/resources/resources.go b/lib/harmony/resources/resources.go index 140fa3e04..109863173 100644 --- a/lib/harmony/resources/resources.go +++ b/lib/harmony/resources/resources.go @@ -3,6 +3,7 @@ package resources import ( "bytes" "context" + "golang.org/x/xerrors" "os/exec" "regexp" "runtime" @@ -42,7 +43,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { } ctx := context.Background() { // Learn our owner_id while updating harmony_machines - var ownerID int + var ownerID *int // Upsert query with last_contact update, fetch the machine ID // (note this isn't a simple insert .. on conflict because host_and_port isn't unique) @@ -54,7 +55,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { RETURNING id ), inserted AS ( - INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram, last_contact) + INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, last_contact) SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP WHERE NOT EXISTS (SELECT id FROM upsert) RETURNING id @@ -64,10 +65,13 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { SELECT id FROM inserted; `, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&ownerID) if err != nil { - return nil, err + return nil, xerrors.Errorf("inserting machine entry: %w", err) + } + if ownerID == nil { + return nil, xerrors.Errorf("no owner id") } - reg.MachineID = ownerID + reg.MachineID = *ownerID cleaned := CleanupMachines(context.Background(), db) logger.Infow("Cleaned up machines", "count", cleaned) @@ -87,9 +91,10 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) { return ®, nil } + func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, - time.Now().Add(-1*LOOKS_DEAD_TIMEOUT)) + time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC()) if err != nil { logger.Warn("unable to delete old machines: ", err) } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 14c0325c1..ecc95ddea 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -36,6 +36,14 @@ var Doc = map[string][]DocField{ Comment: `FULLNODE_API_INFO is the API endpoint for the Lotus daemon.`, }, + { + Name: "StorageRPCSecret", + Type: "string", + + Comment: `RPC Secret for the storage subsystem. +If integrating with lotus-miner this must match the value from +cat ~/.lotusminer/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU | jq -r .PrivateKey`, + }, }, "Backup": { { @@ -748,18 +756,6 @@ over the worker address if this flag is set.`, Comment: ``, }, - { - Name: "SealingParams", - Type: "SealingConfig", - - Comment: ``, - }, - { - Name: "SealerConfig", - Type: "//", - - Comment: ``, - }, { Name: "Journal", Type: "JournalConfig", diff --git a/provider/builder.go b/provider/builder.go index 352c533d6..37b4afae9 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -2,12 +2,8 @@ package provider import ( "context" - "github.com/filecoin-project/lotus/provider/lpmessage" "time" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer" - logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/lotus/api" @@ -15,8 +11,11 @@ import ( "github.com/filecoin-project/lotus/node/config" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/filecoin-project/lotus/provider/lpmessage" "github.com/filecoin-project/lotus/provider/lpwindow" "github.com/filecoin-project/lotus/storage/ctladdr" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 012499f45..b839f8003 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -2,15 +2,18 @@ package lpmessage import ( "context" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/google/uuid" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/big" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" ) var log = logging.Logger("lpmessage") diff --git a/provider/lpwindow/compute_do.go b/provider/lpwindow/compute_do.go index ffcccc628..1b4d440a0 100644 --- a/provider/lpwindow/compute_do.go +++ b/provider/lpwindow/compute_do.go @@ -3,6 +3,14 @@ package lpwindow import ( "bytes" "context" + "sort" + "sync" + "time" + + "github.com/ipfs/go-cid" + "go.uber.org/multierr" + "golang.org/x/xerrors" + ffi "github.com/filecoin-project/filecoin-ffi" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" @@ -12,16 +20,11 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/proof" + proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof" + "github.com/filecoin-project/lotus/build" types "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/storage/sealer/storiface" - proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof" - "github.com/ipfs/go-cid" - "go.uber.org/multierr" - "golang.org/x/xerrors" - "sort" - "sync" - "time" ) const disablePreChecks = false // todo config diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 5a805df07..a5baeb20a 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -4,10 +4,6 @@ import ( "bytes" "context" "fmt" - "github.com/filecoin-project/go-bitfield" - "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/go-state-types/network" - "github.com/filecoin-project/lotus/storage/sealer" "sort" "strings" @@ -16,9 +12,12 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" @@ -29,6 +28,7 @@ import ( "github.com/filecoin-project/lotus/lib/promise" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/wdpost" diff --git a/provider/lpwindow/faults_simple.go b/provider/lpwindow/faults_simple.go index b79a7dcf7..d43e8ee19 100644 --- a/provider/lpwindow/faults_simple.go +++ b/provider/lpwindow/faults_simple.go @@ -4,13 +4,16 @@ import ( "context" "crypto/rand" "fmt" - ffi "github.com/filecoin-project/filecoin-ffi" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/storiface" - "golang.org/x/xerrors" "sync" "time" + + "golang.org/x/xerrors" + + ffi "github.com/filecoin-project/filecoin-ffi" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) type SimpleFaultTracker struct { diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index 4cb019a5a..dad7ca2f7 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -3,11 +3,15 @@ package lpwindow import ( "bytes" "context" + + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v9/miner" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" @@ -17,7 +21,6 @@ import ( "github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/lpmessage" "github.com/filecoin-project/lotus/storage/ctladdr" - "golang.org/x/xerrors" ) type WdPoStSubmitTaskApi interface { From edd95f6524a0957c6ada0f30c2419e9c44d97186 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 4 Nov 2023 12:32:27 +0100 Subject: [PATCH 07/11] lpwindow: IT SUBMITS --- lib/harmony/harmonydb/sql/20230823.sql | 19 +++---- lib/harmony/harmonytask/task_type_handler.go | 2 +- provider/lpmessage/sender.go | 10 ++-- provider/lpwindow/compute_task.go | 17 +++++-- provider/lpwindow/submit_task.go | 52 ++++++++++++++------ 5 files changed, 69 insertions(+), 31 deletions(-) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index a9486567c..d2d954c7c 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -19,16 +19,17 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit create table wdpost_proofs ( - sp_id bigint not null, - deadline bigint not null, - partition bigint not null, - submit_at_epoch bigint not null, - submit_by_epoch bigint not null, - proof_message bytea, + sp_id bigint not null, + proving_period_start bigint not null, + deadline bigint not null, + partition bigint not null, + submit_at_epoch bigint not null, + submit_by_epoch bigint not null, + proof_message bytea, - submit_task_id bigint not null, - message_cid text not null, + submit_task_id bigint, + message_cid text, constraint wdpost_proofs_identity_key - unique (sp_id, deadline, partition) + unique (sp_id, proving_period_start, deadline, partition) ); diff --git a/lib/harmony/harmonytask/task_type_handler.go b/lib/harmony/harmonytask/task_type_handler.go index 6ab7a031a..09540913d 100644 --- a/lib/harmony/harmonytask/task_type_handler.go +++ b/lib/harmony/harmonytask/task_type_handler.go @@ -115,7 +115,7 @@ top: defer func() { if r := recover(); r != nil { - stackSlice := make([]byte, 512) + stackSlice := make([]byte, 4092) sz := runtime.Stack(stackSlice, false) log.Error("Recovered from a serious error "+ "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r, diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index b839f8003..f6bd0a8a7 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -77,6 +77,8 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("getting key address: %w", err) } + msg.From = fromA + if msg.Nonce != 0 { return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce) } @@ -107,14 +109,14 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS } // get nonce from db - var dbNonce uint64 + var dbNonce *uint64 r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) if err := r.Scan(&dbNonce); err != nil { return false, xerrors.Errorf("getting nonce from db: %w", err) } - if dbNonce+1 > msgNonce { - msgNonce = dbNonce + 1 + if dbNonce != nil && *dbNonce+1 > msgNonce { + msgNonce = *dbNonce + 1 } msg.Nonce = msgNonce @@ -169,5 +171,5 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) - return cid.Undef, nil + return sigMsg.Cid(), nil } diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index a5baeb20a..372de22fc 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -162,22 +162,33 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } // Insert into wdpost_proofs table - _, err = t.db.Exec(context.Background(), + n, err := t.db.Exec(context.Background(), `INSERT INTO wdpost_proofs ( sp_id, + proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message) - VALUES ($1, $2, $3, $4, $5, $6)`, + VALUES ($1, $2, $3, $4, $5, $6, $7)`, spID, + pps, deadline.Index, partIdx, deadline.Open, deadline.Close, msgbuf.Bytes()) + if err != nil { + log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err) + return false, err + } + if n != 1 { + log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err) + return false, err + } + return true, nil } @@ -379,7 +390,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden taskIdent.Partition_index, ) if err != nil { - return false, err + return false, xerrors.Errorf("insert partition task: %w", err) } return true, nil diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index dad7ca2f7..63f4e4651 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -3,6 +3,8 @@ package lpwindow import ( "bytes" "context" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/storage/wdpost" "golang.org/x/xerrors" @@ -32,6 +34,7 @@ type WdPoStSubmitTaskApi interface { StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) + StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error) @@ -71,14 +74,14 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) var spID uint64 var deadline uint64 var partition uint64 - var submitAtEpoch, submitByEpoch abi.ChainEpoch - var proofParamBytes []byte + var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch + var earlyParamBytes []byte var dbTask uint64 err = w.db.QueryRow( - context.Background(), `select sp_id, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id - from wdpost_proofs where sp_id = $1 and deadline = $2 and partition = $3`, spID, deadline, partition, - ).Scan(&spID, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &proofParamBytes, &dbTask) + context.Background(), `select sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id + from wdpost_proofs where submit_task_id = $1`, taskID, + ).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask) if err != nil { return false, xerrors.Errorf("query post proof: %w", err) } @@ -103,11 +106,31 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch) } + dlInfo := wdpost.NewDeadlineInfo(pps, deadline, head.Height()) + var params miner.SubmitWindowedPoStParams - if err := params.UnmarshalCBOR(bytes.NewReader(proofParamBytes)); err != nil { + if err := params.UnmarshalCBOR(bytes.NewReader(earlyParamBytes)); err != nil { return false, xerrors.Errorf("unmarshaling proof message: %w", err) } + commEpoch := dlInfo.Challenge + + commRand, err := w.api.StateGetRandomnessFromTickets(context.Background(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil, head.Key()) + if err != nil { + err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (epoch=%d): %w", commEpoch, err) + log.Errorf("submitPoStMessage failed: %+v", err) + + return false, xerrors.Errorf("getting post commit randomness: %w", err) + } + + params.ChainCommitEpoch = commEpoch + params.ChainCommitRand = commRand + + var pbuf bytes.Buffer + if err := params.MarshalCBOR(&pbuf); err != nil { + return false, xerrors.Errorf("marshaling proof message: %w", err) + } + maddr, err := address.NewIDAddress(spID) if err != nil { return false, xerrors.Errorf("invalid miner address: %w", err) @@ -121,7 +144,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) msg := &types.Message{ To: maddr, Method: builtin.MethodsMiner.SubmitWindowedPoSt, - Params: proofParamBytes, + Params: pbuf.Bytes(), Value: big.Zero(), From: mi.Worker, // set worker for now for gas estimation @@ -146,7 +169,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // goodFunds = funds needed for optimal inclusion probability. // minFunds = funds needed for more speculative inclusion probability. - goodFunds := big.Add(msg.RequiredFunds(), msg.Value) + goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value) minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds) from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds) @@ -167,7 +190,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // set message_cid in the wdpost_proofs entry - _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and deadline = $3 and partition = $4`, smsg.String(), spID, deadline, partition) + _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5`, smsg.String(), spID, pps, deadline, partition) if err != nil { return true, xerrors.Errorf("updating wdpost_proofs: %w", err) } @@ -210,7 +233,7 @@ func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { tf := w.submitPoStTF.Val(ctx) - qry, err := w.db.Query(ctx, `select sp_id, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) + qry, err := w.db.Query(ctx, `select sp_id, proving_period_start, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) if err != nil { return err } @@ -218,18 +241,19 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply for qry.Next() { var spID int64 + var pps int64 var deadline uint64 var partition uint64 var submitAtEpoch uint64 - if err := qry.Scan(&spID, &deadline, &partition, &submitAtEpoch); err != nil { - return err + if err := qry.Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch); err != nil { + return xerrors.Errorf("scan submittable posts: %w", err) } tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { // update in transaction iff submit_task_id is still null - res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and deadline = $3 and partition = $4 and submit_task_id is null`, id, spID, deadline, partition) + res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5 and submit_task_id is null`, id, spID, pps, deadline, partition) if err != nil { - return false, err + return false, xerrors.Errorf("query ready proof: %w", err) } if res != 1 { return false, nil From fa33e8240919c7047a05d81b513a6be3f83a5083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 4 Nov 2023 12:35:04 +0100 Subject: [PATCH 08/11] docsgen --- documentation/en/cli-lotus-miner.md | 4 ---- .../en/default-lotus-provider-config.toml | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 0aae5fbaf..8406b07cc 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -66,10 +66,6 @@ OPTIONS: --no-local-storage don't use storageminer repo for sector storage (default: false) --gas-premium value set gas premium for initialization messages in AttoFIL (default: "0") --from value select which address to send actor creation message from - --db-host value Command separated list of hostnames for yugabyte cluster (default: "yugabyte") [$LOTUS_DB_HOST] - --db-name value (default: "yugabyte") [$LOTUS_DB_NAME] - --db-user value (default: "yugabyte") [$LOTUS_DB_USER] - --db-password value (default: "yugabyte") [$LOTUS_DB_PASSWORD] --help, -h show help ``` diff --git a/documentation/en/default-lotus-provider-config.toml b/documentation/en/default-lotus-provider-config.toml index c9c0c9e78..9d420ff37 100644 --- a/documentation/en/default-lotus-provider-config.toml +++ b/documentation/en/default-lotus-provider-config.toml @@ -2,6 +2,9 @@ # type: bool #EnableWindowPost = false + # type: int + #WindowPostMaxTasks = 0 + # type: bool #EnableWinningPost = false @@ -188,3 +191,19 @@ # type: bool #SingleRecoveringPartitionPerPostMessage = false + +[Journal] + # Events of the form: "system1:event1,system1:event2[,...]" + # + # type: string + #DisabledEvents = "" + + +[Apis] + # RPC Secret for the storage subsystem. + # If integrating with lotus-miner this must match the value from + # cat ~/.lotusminer/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU | jq -r .PrivateKey + # + # type: string + #StorageRPCSecret = "" + From d509cc48e944b64523b748c247d6d2f9a73d2df0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Nov 2023 13:38:00 +0100 Subject: [PATCH 09/11] lpwindow: Make sql queries a bit more readable --- provider/lpwindow/submit_task.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index 63f4e4651..d387a9f06 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -79,8 +79,8 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) var dbTask uint64 err = w.db.QueryRow( - context.Background(), `select sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id - from wdpost_proofs where submit_task_id = $1`, taskID, + context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id + FROM wdpost_proofs WHERE submit_task_id = $1`, taskID, ).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask) if err != nil { return false, xerrors.Errorf("query post proof: %w", err) @@ -190,7 +190,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) // set message_cid in the wdpost_proofs entry - _, err = w.db.Exec(context.Background(), `update wdpost_proofs set message_cid = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5`, smsg.String(), spID, pps, deadline, partition) + _, err = w.db.Exec(context.Background(), `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition) if err != nil { return true, xerrors.Errorf("updating wdpost_proofs: %w", err) } @@ -233,7 +233,7 @@ func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error { tf := w.submitPoStTF.Val(ctx) - qry, err := w.db.Query(ctx, `select sp_id, proving_period_start, deadline, partition, submit_at_epoch from wdpost_proofs where submit_task_id is null and submit_at_epoch <= $1`, apply.Height()) + qry, err := w.db.Query(ctx, `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch FROM wdpost_proofs WHERE submit_task_id IS NULL AND submit_at_epoch <= $1`, apply.Height()) if err != nil { return err } @@ -251,7 +251,7 @@ func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) { // update in transaction iff submit_task_id is still null - res, err := tx.Exec(`update wdpost_proofs set submit_task_id = $1 where sp_id = $2 and proving_period_start = $3 and deadline = $4 and partition = $5 and submit_task_id is null`, id, spID, pps, deadline, partition) + res, err := tx.Exec(`UPDATE wdpost_proofs SET submit_task_id = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5 AND submit_task_id IS NULL`, id, spID, pps, deadline, partition) if err != nil { return false, xerrors.Errorf("query ready proof: %w", err) } From 5c5055884c1b5848fdb600416116c65f3c827dca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Nov 2023 13:38:24 +0100 Subject: [PATCH 10/11] harmony: Change message_sends send_time to timestamp --- lib/harmony/harmonydb/sql/20231103.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/harmony/harmonydb/sql/20231103.sql b/lib/harmony/harmonydb/sql/20231103.sql index 669f122f4..50aea400f 100644 --- a/lib/harmony/harmonydb/sql/20231103.sql +++ b/lib/harmony/harmonydb/sql/20231103.sql @@ -6,7 +6,7 @@ create table message_sends signed_data bytea not null, signed_json jsonb not null, signed_cid text not null, - send_time date default CURRENT_TIMESTAMP, + send_time timestamp default CURRENT_TIMESTAMP, send_reason text, send_success bool default false not null, constraint message_sends_pk From de280a5708ce313b17fd6303105e6540bf55993f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 Nov 2023 14:03:31 +0100 Subject: [PATCH 11/11] lpwindow: proof_message -> proof_params rename --- lib/harmony/harmonydb/sql/20230823.sql | 2 +- provider/lpmessage/sender.go | 2 ++ provider/lpwindow/compute_task.go | 2 +- provider/lpwindow/submit_task.go | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/harmony/harmonydb/sql/20230823.sql b/lib/harmony/harmonydb/sql/20230823.sql index d2d954c7c..750b7ea25 100644 --- a/lib/harmony/harmonydb/sql/20230823.sql +++ b/lib/harmony/harmonydb/sql/20230823.sql @@ -25,7 +25,7 @@ create table wdpost_proofs partition bigint not null, submit_at_epoch bigint not null, submit_by_epoch bigint not null, - proof_message bytea, + proof_params bytea, submit_task_id bigint, message_cid text, diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index f6bd0a8a7..5123410c9 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -157,6 +157,8 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS // push to mpool _, err = s.api.MpoolPush(ctx, sigMsg) if err != nil { + // TODO: We may get nonce gaps here.. + return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) } diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index 372de22fc..949c09365 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -170,7 +170,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done partition, submit_at_epoch, submit_by_epoch, - proof_message) + proof_params) VALUES ($1, $2, $3, $4, $5, $6, $7)`, spID, pps, diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go index d387a9f06..cfae3e292 100644 --- a/provider/lpwindow/submit_task.go +++ b/provider/lpwindow/submit_task.go @@ -79,7 +79,7 @@ func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) var dbTask uint64 err = w.db.QueryRow( - context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_message, submit_task_id + context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_params, submit_task_id FROM wdpost_proofs WHERE submit_task_id = $1`, taskID, ).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask) if err != nil {