diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index db34050b5..a07a85b27 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -161,7 +161,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o return err } - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, nil, deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index efe6cd961..f8325578f 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -42,6 +42,7 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/provider/lpmessage" "github.com/filecoin-project/lotus/provider/lpwinning" "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/paths" @@ -146,14 +147,18 @@ var runCmd = &cli.Command{ } cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore + var activeTasks []harmonytask.TaskInterface + + sender, sendTask := lpmessage.NewSender(full, full, db) + activeTasks = append(activeTasks, sendTask) + /////////////////////////////////////////////////////////////////////// ///// Task Selection /////////////////////////////////////////////////////////////////////// - var activeTasks []harmonytask.TaskInterface { if cfg.Subsystems.EnableWindowPost { - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err diff --git a/lib/harmony/harmonydb/sql/20231103.sql b/lib/harmony/harmonydb/sql/20231103.sql index 50aea400f..e70cf3738 100644 --- a/lib/harmony/harmonydb/sql/20231103.sql +++ b/lib/harmony/harmonydb/sql/20231103.sql @@ -1,22 +1,55 @@ create table message_sends ( - from_key text not null, - nonce bigint not null, - to_addr text not null, - signed_data bytea not null, - signed_json jsonb not null, - signed_cid text not null, - send_time timestamp default CURRENT_TIMESTAMP, - send_reason text, - send_success bool default false not null, + from_key text not null, + to_addr text not null, + send_reason text not null, + send_task_id bigint not null, + + unsigned_data bytea not null, + unsigned_cid text not null, + + nonce bigint, + signed_data bytea, + signed_json jsonb, + signed_cid text, + + send_time timestamp default null, + send_success boolean default null, + send_error text, + constraint message_sends_pk - primary key (from_key, nonce) + primary key (send_task_id, from_key) ); comment on column message_sends.from_key is 'text f[1/3/4]... address'; -comment on column message_sends.nonce is 'assigned message nonce'; comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address'; -comment on column message_sends.signed_data is 'signed message data'; -comment on column message_sends.signed_cid is 'signed message cid'; comment on column message_sends.send_reason is 'optional description of send reason'; -comment on column message_sends.send_success is 'whether this message was broadcasted to the network already'; +comment on column message_sends.send_task_id is 'harmony task id of the send task'; + +comment on column message_sends.unsigned_data is 'unsigned message data'; +comment on column message_sends.unsigned_cid is 'unsigned message cid'; + +comment on column message_sends.nonce is 'assigned message nonce, set while the send task is executing'; +comment on column message_sends.signed_data is 'signed message data, set while the send task is executing'; +comment on column message_sends.signed_cid is 'signed message cid, set while the send task is executing'; + +comment on column message_sends.send_time is 'time when the send task was executed, set after pushing the message to the network'; +comment on column message_sends.send_success is 'whether this message was broadcasted to the network already, null if not yet attempted, true if successful, false if failed'; +comment on column message_sends.send_error is 'error message if send_success is false'; + +create unique index message_sends_success_index + on message_sends (from_key, nonce) + where send_success is not false; + +comment on index message_sends_success_index is +'message_sends_success_index enforces sender/nonce uniqueness, it is a conditional index that only indexes rows where send_success is not false. This allows us to have multiple rows with the same sender/nonce, as long as only one of them was successfully broadcasted (true) to the network or is in the process of being broadcasted (null).'; + +create table message_send_locks +( + from_key text not null, + task_id bigint not null, + claimed_at timestamp not null, + + constraint message_send_locks_pk + primary key (from_key) +); diff --git a/provider/builder.go b/provider/builder.go index e6adc1795..81a1a7a0a 100644 --- a/provider/builder.go +++ b/provider/builder.go @@ -20,7 +20,7 @@ import ( //var log = logging.Logger("provider") func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, - api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, + api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender, as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) { @@ -29,8 +29,6 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co // todo config ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) - sender := lpmessage.NewSender(api, api, db) - computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max) if err != nil { return nil, nil, nil, err diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go index 9f3d9104d..fe47f7f00 100644 --- a/provider/lpmessage/sender.go +++ b/provider/lpmessage/sender.go @@ -1,11 +1,14 @@ package lpmessage import ( + "bytes" "context" + "time" "github.com/google/uuid" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -14,10 +17,15 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" + "github.com/filecoin-project/lotus/lib/promise" ) var log = logging.Logger("lpmessage") +var SendLockedWait = 100 * time.Millisecond + type SenderAPI interface { StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) @@ -32,23 +40,220 @@ type SignerAPI interface { // Sender abstracts away highly-available message sending with coordination through // HarmonyDB. It make sure that nonces are assigned transactionally, and that -// messages are correctly broadcasted to the network. It is not a Task in the sense -// of a HarmonyTask interface, just a helper for tasks which need to send messages -// to the network. +// messages are correctly broadcasted to the network. It ensures that messages +// are sent serially, and that failures to send don't cause nonce gaps. type Sender struct { + api SenderAPI + + sendTask *SendTask + + db *harmonydb.DB +} + +type SendTask struct { + sendTF promise.Promise[harmonytask.AddTaskFunc] + api SenderAPI signer SignerAPI db *harmonydb.DB } +func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + ctx := context.TODO() + + // get message from db + + var dbMsg struct { + FromKey string `db:"from_key"` + ToAddr string `db:"to_addr"` + + UnsignedData []byte `db:"unsigned_data"` + UnsignedCid string `db:"unsigned_cid"` + + // may not be null if we have somehow already signed but failed to send this message + Nonce *uint64 `db:"nonce"` + SignedData []byte `db:"signed_data"` + } + + err = s.db.QueryRow(ctx, `select from_key, nonce, to_addr, unsigned_data, unsigned_cid from message_sends where id = $1`, taskID).Scan(&dbMsg) + if err != nil { + return false, xerrors.Errorf("getting message from db: %w", err) + } + + // deserialize the message + var msg types.Message + err = msg.UnmarshalCBOR(bytes.NewReader(dbMsg.UnsignedData)) + if err != nil { + return false, xerrors.Errorf("unmarshaling unsigned db message: %w", err) + } + + // get db send lock + for { + // check if we still own the task + if !stillOwned() { + return false, xerrors.Errorf("lost ownership of task") + } + + // try to acquire lock + cn, err := s.db.Exec(ctx, `INSERT INTO message_send_locks (from_key, task_id, claimed_at) VALUES ($1, $2, CURRENT_TIMESTAMP) + ON CONFLICT (from_key) DO UPDATE SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID) + if err != nil { + return false, xerrors.Errorf("acquiring send lock: %w", err) + } + + if cn == 1 { + // we got the lock + break + } + + // we didn't get the lock, wait a bit and try again + log.Infow("waiting for send lock", "task_id", taskID, "from", dbMsg.FromKey) + time.Sleep(SendLockedWait) + } + + // defer release db send lock + defer func() { + _, err := s.db.Exec(ctx, `delete from message_send_locks where from_key = $1 and task_id = $2`, dbMsg.FromKey, taskID) + if err != nil { + log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err) + + // make sure harmony retries this task so that we eventually release this lock + done = false + err = multierr.Append(err, xerrors.Errorf("releasing send lock: %w", err)) + } + }() + + // assign nonce IF NOT ASSIGNED (max(api.MpoolGetNonce, db nonce+1)) + var sigMsg *types.SignedMessage + + if dbMsg.Nonce == nil { + msgNonce, err := s.api.MpoolGetNonce(ctx, msg.From) + if err != nil { + return false, xerrors.Errorf("getting nonce from mpool: %w", err) + } + + // get nonce from db + var dbNonce *uint64 + r := s.db.QueryRow(ctx, `select max(nonce) from message_sends where from_key = $1 and send_success = true`, msg.From.String()) + if err := r.Scan(&dbNonce); err != nil { + return false, xerrors.Errorf("getting nonce from db: %w", err) + } + + if dbNonce != nil && *dbNonce+1 > msgNonce { + msgNonce = *dbNonce + 1 + } + + msg.Nonce = msgNonce + + // sign message + sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, &msg) + if err != nil { + return false, xerrors.Errorf("signing message: %w", err) + } + + data, err := sigMsg.Serialize() + if err != nil { + return false, xerrors.Errorf("serializing message: %w", err) + } + + jsonBytes, err := sigMsg.MarshalJSON() + if err != nil { + return false, xerrors.Errorf("marshaling message: %w", err) + } + + // write to db + + n, err := s.db.Exec(ctx, `update message_sends set nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 where send_task_id = $5`, + msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID) + if err != nil { + return false, xerrors.Errorf("updating db record: %w", err) + } + if n != 1 { + log.Errorw("updating db record: expected 1 row to be affected, got %d", n) + return false, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", n) + } + } else { + // Note: this handles an unlikely edge-case: + // We have previously signed the message but either failed to send it or failed to update the db + // note that when that happens the likely cause is the provider process losing its db connection + // or getting killed before it can update the db. In that case the message lock will still be held + // so it will be safe to rebroadcast the signed message + + // deserialize the signed message + sigMsg = new(types.SignedMessage) + err = sigMsg.UnmarshalCBOR(bytes.NewReader(dbMsg.SignedData)) + if err != nil { + return false, xerrors.Errorf("unmarshaling signed db message: %w", err) + } + } + + // send! + _, err = s.api.MpoolPush(ctx, sigMsg) + + // persist send result + var sendSuccess = err == nil + var sendError string + if err != nil { + sendError = err.Error() + } + + _, err = s.db.Exec(ctx, `update message_sends set send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP where send_task_id = $3`, sendSuccess, sendError, taskID) + if err != nil { + return false, xerrors.Errorf("updating db record: %w", err) + } + + return true, nil +} + +func (s *SendTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + if len(ids) == 0 { + // probably can't happen, but panicking is bad + return nil, nil + } + + if s.signer == nil { + // can't sign messages here + return nil, nil + } + + return &ids[0], nil +} + +func (s *SendTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 1024, + Name: "SendMessage", + Cost: resources.Resources{ + Cpu: 0, + Gpu: 0, + Ram: 1 << 20, + }, + MaxFailures: 1000, + Follows: nil, + } +} + +func (s *SendTask) Adder(taskFunc harmonytask.AddTaskFunc) { + s.sendTF.Set(taskFunc) +} + +var _ harmonytask.TaskInterface = &SendTask{} + // NewSender creates a new Sender. -func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender { - return &Sender{ +func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) (*Sender, *SendTask) { + st := &SendTask{ api: api, signer: signer, db: db, } + + return &Sender{ + api: api, + db: db, + + sendTask: st, + }, st } // Send atomically assigns a nonce, signs, and pushes a message @@ -97,80 +302,70 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds) } - var sigMsg *types.SignedMessage + // push the task + taskAdder := s.sendTask.sendTF.Val(ctx) - // start db tx - c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { - // assign nonce (max(api.MpoolGetNonce, db nonce+1)) - msgNonce, err := s.api.MpoolGetNonce(ctx, fromA) - if err != nil { - return false, xerrors.Errorf("getting nonce from mpool: %w", err) - } + unsBytes := new(bytes.Buffer) + err = msg.MarshalCBOR(unsBytes) + if err != nil { + return cid.Undef, xerrors.Errorf("marshaling message: %w", err) + } - // get nonce from db - var dbNonce *uint64 - r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) - if err := r.Scan(&dbNonce); err != nil { - return false, xerrors.Errorf("getting nonce from db: %w", err) - } - - if dbNonce != nil && *dbNonce+1 > msgNonce { - msgNonce = *dbNonce + 1 - } - - msg.Nonce = msgNonce - - // sign message - sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg) - if err != nil { - return false, xerrors.Errorf("signing message: %w", err) - } - - data, err := sigMsg.Serialize() - if err != nil { - return false, xerrors.Errorf("serializing message: %w", err) - } - - jsonBytes, err := sigMsg.MarshalJSON() - if err != nil { - return false, xerrors.Errorf("marshaling message: %w", err) - } - - // write to db - c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`, - fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason) + taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + _, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`, + msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id) if err != nil { return false, xerrors.Errorf("inserting message into db: %w", err) } - if c != 1 { - return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c) - } - // commit return true, nil }) - if err != nil || !c { - return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) + + // wait for exec + var ( + pollInterval = 50 * time.Millisecond + pollIntervalMul = 2 + maxPollInterval = 5 * time.Second + pollLoops = 0 + + sigCid cid.Cid + sendErr error + ) + + for { + var err error + var sigCidStr, sendError string + var sendSuccess *bool + + err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError) + if err != nil { + return cid.Undef, xerrors.Errorf("getting cid for task: %w", err) + } + + if sendSuccess == nil { + time.Sleep(pollInterval) + pollLoops++ + pollInterval *= time.Duration(pollIntervalMul) + if pollInterval > maxPollInterval { + pollInterval = maxPollInterval + } + + continue + } + + if !*sendSuccess { + sendErr = xerrors.Errorf("send error: %s", sendError) + } else { + sigCid, err = cid.Parse(sigCidStr) + if err != nil { + return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err) + } + } + + break } - // push to mpool - _, err = s.api.MpoolPush(ctx, sigMsg) - if err != nil { - // TODO: We may get nonce gaps here.. + log.Infow("sent message", "cid", sigCid, "task_id", taskAdder, "send_error", sendErr, "poll_loops", pollLoops) - return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) - } - - // update db recocd to say it was pushed (set send_success to true) - cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce) - if err != nil { - return cid.Undef, xerrors.Errorf("updating db record: %w", err) - } - if cn != 1 { - return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", cn) - } - - log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit) - - return sigMsg.Cid(), nil + return sigCid, sendErr }