Merge pull request #11467 from filecoin-project/feat/correct-lp-message-send

feat: lpmessage: Correct message sending
This commit is contained in:
Łukasz Magiera 2023-11-30 19:18:10 +01:00 committed by GitHub
commit 20d3b2674a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 321 additions and 90 deletions

View File

@ -161,7 +161,7 @@ It will not send any messages to the chain. Since it can compute any deadline, o
return err return err
} }
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, nil,
deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks) deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks)
if err != nil { if err != nil {
return err return err

View File

@ -42,6 +42,7 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider" "github.com/filecoin-project/lotus/provider"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/provider/lpwinning" "github.com/filecoin-project/lotus/provider/lpwinning"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
@ -146,14 +147,18 @@ var runCmd = &cli.Command{
} }
cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore
var activeTasks []harmonytask.TaskInterface
sender, sendTask := lpmessage.NewSender(full, full, db)
activeTasks = append(activeTasks, sendTask)
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
///// Task Selection ///// Task Selection
/////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////
var activeTasks []harmonytask.TaskInterface
{ {
if cfg.Subsystems.EnableWindowPost { if cfg.Subsystems.EnableWindowPost {
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil { if err != nil {
return err return err

View File

@ -1,22 +1,55 @@
create table message_sends create table message_sends
( (
from_key text not null, from_key text not null,
nonce bigint not null, to_addr text not null,
to_addr text not null, send_reason text not null,
signed_data bytea not null, send_task_id bigint not null,
signed_json jsonb not null,
signed_cid text not null, unsigned_data bytea not null,
send_time timestamp default CURRENT_TIMESTAMP, unsigned_cid text not null,
send_reason text,
send_success bool default false not null, nonce bigint,
signed_data bytea,
signed_json jsonb,
signed_cid text,
send_time timestamp default null,
send_success boolean default null,
send_error text,
constraint message_sends_pk constraint message_sends_pk
primary key (from_key, nonce) primary key (send_task_id, from_key)
); );
comment on column message_sends.from_key is 'text f[1/3/4]... address'; comment on column message_sends.from_key is 'text f[1/3/4]... address';
comment on column message_sends.nonce is 'assigned message nonce';
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address'; comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
comment on column message_sends.signed_data is 'signed message data';
comment on column message_sends.signed_cid is 'signed message cid';
comment on column message_sends.send_reason is 'optional description of send reason'; comment on column message_sends.send_reason is 'optional description of send reason';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already'; comment on column message_sends.send_task_id is 'harmony task id of the send task';
comment on column message_sends.unsigned_data is 'unsigned message data';
comment on column message_sends.unsigned_cid is 'unsigned message cid';
comment on column message_sends.nonce is 'assigned message nonce, set while the send task is executing';
comment on column message_sends.signed_data is 'signed message data, set while the send task is executing';
comment on column message_sends.signed_cid is 'signed message cid, set while the send task is executing';
comment on column message_sends.send_time is 'time when the send task was executed, set after pushing the message to the network';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already, null if not yet attempted, true if successful, false if failed';
comment on column message_sends.send_error is 'error message if send_success is false';
create unique index message_sends_success_index
on message_sends (from_key, nonce)
where send_success is not false;
comment on index message_sends_success_index is
'message_sends_success_index enforces sender/nonce uniqueness, it is a conditional index that only indexes rows where send_success is not false. This allows us to have multiple rows with the same sender/nonce, as long as only one of them was successfully broadcasted (true) to the network or is in the process of being broadcasted (null).';
create table message_send_locks
(
from_key text not null,
task_id bigint not null,
claimed_at timestamp not null,
constraint message_send_locks_pk
primary key (from_key)
);

View File

@ -20,7 +20,7 @@ import (
//var log = logging.Logger("provider") //var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, sender *lpmessage.Sender,
as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB, as *ctladdr.AddressSelector, addresses []dtypes.MinerAddress, db *harmonydb.DB,
stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) { stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, *lpwindow.WdPostRecoverDeclareTask, error) {
@ -29,8 +29,6 @@ func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc co
// todo config // todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
sender := lpmessage.NewSender(api, api, db)
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max) computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, addresses, max)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err

View File

@ -1,11 +1,14 @@
package lpmessage package lpmessage
import ( import (
"bytes"
"context" "context"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"go.uber.org/multierr"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
@ -14,10 +17,15 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
) )
var log = logging.Logger("lpmessage") var log = logging.Logger("lpmessage")
var SendLockedWait = 100 * time.Millisecond
type SenderAPI interface { type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
@ -32,23 +40,220 @@ type SignerAPI interface {
// Sender abstracts away highly-available message sending with coordination through // Sender abstracts away highly-available message sending with coordination through
// HarmonyDB. It make sure that nonces are assigned transactionally, and that // HarmonyDB. It make sure that nonces are assigned transactionally, and that
// messages are correctly broadcasted to the network. It is not a Task in the sense // messages are correctly broadcasted to the network. It ensures that messages
// of a HarmonyTask interface, just a helper for tasks which need to send messages // are sent serially, and that failures to send don't cause nonce gaps.
// to the network.
type Sender struct { type Sender struct {
api SenderAPI
sendTask *SendTask
db *harmonydb.DB
}
type SendTask struct {
sendTF promise.Promise[harmonytask.AddTaskFunc]
api SenderAPI api SenderAPI
signer SignerAPI signer SignerAPI
db *harmonydb.DB db *harmonydb.DB
} }
func (s *SendTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
ctx := context.TODO()
// get message from db
var dbMsg struct {
FromKey string `db:"from_key"`
ToAddr string `db:"to_addr"`
UnsignedData []byte `db:"unsigned_data"`
UnsignedCid string `db:"unsigned_cid"`
// may not be null if we have somehow already signed but failed to send this message
Nonce *uint64 `db:"nonce"`
SignedData []byte `db:"signed_data"`
}
err = s.db.QueryRow(ctx, `select from_key, nonce, to_addr, unsigned_data, unsigned_cid from message_sends where id = $1`, taskID).Scan(&dbMsg)
if err != nil {
return false, xerrors.Errorf("getting message from db: %w", err)
}
// deserialize the message
var msg types.Message
err = msg.UnmarshalCBOR(bytes.NewReader(dbMsg.UnsignedData))
if err != nil {
return false, xerrors.Errorf("unmarshaling unsigned db message: %w", err)
}
// get db send lock
for {
// check if we still own the task
if !stillOwned() {
return false, xerrors.Errorf("lost ownership of task")
}
// try to acquire lock
cn, err := s.db.Exec(ctx, `INSERT INTO message_send_locks (from_key, task_id, claimed_at) VALUES ($1, $2, CURRENT_TIMESTAMP)
ON CONFLICT (from_key) DO UPDATE SET task_id = EXCLUDED.task_id, claimed_at = CURRENT_TIMESTAMP WHERE message_send_locks.task_id = $2;`, dbMsg.FromKey, taskID)
if err != nil {
return false, xerrors.Errorf("acquiring send lock: %w", err)
}
if cn == 1 {
// we got the lock
break
}
// we didn't get the lock, wait a bit and try again
log.Infow("waiting for send lock", "task_id", taskID, "from", dbMsg.FromKey)
time.Sleep(SendLockedWait)
}
// defer release db send lock
defer func() {
_, err := s.db.Exec(ctx, `delete from message_send_locks where from_key = $1 and task_id = $2`, dbMsg.FromKey, taskID)
if err != nil {
log.Errorw("releasing send lock", "task_id", taskID, "from", dbMsg.FromKey, "error", err)
// make sure harmony retries this task so that we eventually release this lock
done = false
err = multierr.Append(err, xerrors.Errorf("releasing send lock: %w", err))
}
}()
// assign nonce IF NOT ASSIGNED (max(api.MpoolGetNonce, db nonce+1))
var sigMsg *types.SignedMessage
if dbMsg.Nonce == nil {
msgNonce, err := s.api.MpoolGetNonce(ctx, msg.From)
if err != nil {
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}
// get nonce from db
var dbNonce *uint64
r := s.db.QueryRow(ctx, `select max(nonce) from message_sends where from_key = $1 and send_success = true`, msg.From.String())
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}
if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = *dbNonce + 1
}
msg.Nonce = msgNonce
// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, &msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}
data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}
jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
// write to db
n, err := s.db.Exec(ctx, `update message_sends set nonce = $1, signed_data = $2, signed_json = $3, signed_cid = $4 where send_task_id = $5`,
msg.Nonce, data, string(jsonBytes), sigMsg.Cid().String(), taskID)
if err != nil {
return false, xerrors.Errorf("updating db record: %w", err)
}
if n != 1 {
log.Errorw("updating db record: expected 1 row to be affected, got %d", n)
return false, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", n)
}
} else {
// Note: this handles an unlikely edge-case:
// We have previously signed the message but either failed to send it or failed to update the db
// note that when that happens the likely cause is the provider process losing its db connection
// or getting killed before it can update the db. In that case the message lock will still be held
// so it will be safe to rebroadcast the signed message
// deserialize the signed message
sigMsg = new(types.SignedMessage)
err = sigMsg.UnmarshalCBOR(bytes.NewReader(dbMsg.SignedData))
if err != nil {
return false, xerrors.Errorf("unmarshaling signed db message: %w", err)
}
}
// send!
_, err = s.api.MpoolPush(ctx, sigMsg)
// persist send result
var sendSuccess = err == nil
var sendError string
if err != nil {
sendError = err.Error()
}
_, err = s.db.Exec(ctx, `update message_sends set send_success = $1, send_error = $2, send_time = CURRENT_TIMESTAMP where send_task_id = $3`, sendSuccess, sendError, taskID)
if err != nil {
return false, xerrors.Errorf("updating db record: %w", err)
}
return true, nil
}
func (s *SendTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
if len(ids) == 0 {
// probably can't happen, but panicking is bad
return nil, nil
}
if s.signer == nil {
// can't sign messages here
return nil, nil
}
return &ids[0], nil
}
func (s *SendTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 1024,
Name: "SendMessage",
Cost: resources.Resources{
Cpu: 0,
Gpu: 0,
Ram: 1 << 20,
},
MaxFailures: 1000,
Follows: nil,
}
}
func (s *SendTask) Adder(taskFunc harmonytask.AddTaskFunc) {
s.sendTF.Set(taskFunc)
}
var _ harmonytask.TaskInterface = &SendTask{}
// NewSender creates a new Sender. // NewSender creates a new Sender.
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender { func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) (*Sender, *SendTask) {
return &Sender{ st := &SendTask{
api: api, api: api,
signer: signer, signer: signer,
db: db, db: db,
} }
return &Sender{
api: api,
db: db,
sendTask: st,
}, st
} }
// Send atomically assigns a nonce, signs, and pushes a message // Send atomically assigns a nonce, signs, and pushes a message
@ -97,80 +302,70 @@ func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageS
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds) return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
} }
var sigMsg *types.SignedMessage // push the task
taskAdder := s.sendTask.sendTF.Val(ctx)
// start db tx unsBytes := new(bytes.Buffer)
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = msg.MarshalCBOR(unsBytes)
// assign nonce (max(api.MpoolGetNonce, db nonce+1)) if err != nil {
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA) return cid.Undef, xerrors.Errorf("marshaling message: %w", err)
if err != nil { }
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}
// get nonce from db taskAdder(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
var dbNonce *uint64 _, err := tx.Exec(`insert into message_sends (from_key, to_addr, send_reason, unsigned_data, unsigned_cid, send_task_id) values ($1, $2, $3, $4, $5, $6)`,
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) msg.From.String(), msg.To.String(), reason, unsBytes.Bytes(), msg.Cid().String(), id)
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}
if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = *dbNonce + 1
}
msg.Nonce = msgNonce
// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}
data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}
jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
if err != nil { if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err) return false, xerrors.Errorf("inserting message into db: %w", err)
} }
if c != 1 {
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
}
// commit
return true, nil return true, nil
}) })
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) // wait for exec
var (
pollInterval = 50 * time.Millisecond
pollIntervalMul = 2
maxPollInterval = 5 * time.Second
pollLoops = 0
sigCid cid.Cid
sendErr error
)
for {
var err error
var sigCidStr, sendError string
var sendSuccess *bool
err = s.db.QueryRow(ctx, `select signed_cid, send_success, send_error from message_sends where send_task_id = $1`, taskAdder).Scan(&sigCidStr, &sendSuccess, &sendError)
if err != nil {
return cid.Undef, xerrors.Errorf("getting cid for task: %w", err)
}
if sendSuccess == nil {
time.Sleep(pollInterval)
pollLoops++
pollInterval *= time.Duration(pollIntervalMul)
if pollInterval > maxPollInterval {
pollInterval = maxPollInterval
}
continue
}
if !*sendSuccess {
sendErr = xerrors.Errorf("send error: %s", sendError)
} else {
sigCid, err = cid.Parse(sigCidStr)
if err != nil {
return cid.Undef, xerrors.Errorf("parsing signed cid: %w", err)
}
}
break
} }
// push to mpool log.Infow("sent message", "cid", sigCid, "task_id", taskAdder, "send_error", sendErr, "poll_loops", pollLoops)
_, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil {
// TODO: We may get nonce gaps here..
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) return sigCid, sendErr
}
// update db recocd to say it was pushed (set send_success to true)
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
if err != nil {
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", cn)
}
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
return sigMsg.Cid(), nil
} }