lotus/provider/lpmessage/sender.go

212 lines
6.7 KiB
Go
Raw Normal View History

package lpmessage
import (
"context"
2023-11-22 04:10:25 +00:00
"encoding/json"
2023-11-04 10:04:46 +00:00
"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
2023-11-04 10:04:46 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
2023-11-03 21:40:28 +00:00
var log = logging.Logger("lpmessage")
2023-11-21 00:05:59 +00:00
type str string // makes ctx value collissions impossible
2023-11-22 04:43:13 +00:00
var CtxTestTaskID str = "task_id"
2023-11-21 00:05:59 +00:00
type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
2023-11-03 21:40:28 +00:00
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
}
type SignerAPI interface {
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
}
// Sender abstracts away highly-available message sending with coordination through
// HarmonyDB. It make sure that nonces are assigned transactionally, and that
// messages are correctly broadcasted to the network. It is not a Task in the sense
// of a HarmonyTask interface, just a helper for tasks which need to send messages
// to the network.
type Sender struct {
api SenderAPI
signer SignerAPI
2023-11-21 00:05:59 +00:00
db *harmonydb.DB
}
// NewSender creates a new Sender.
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
return &Sender{
api: api,
signer: signer,
2023-11-17 00:22:03 +00:00
db: db,
}
}
// Send atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
//
// When maxFee is set to 0, Send will guess appropriate fee
// based on current chain conditions
//
// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates
// through HarmonyDB, making it safe to broadcast messages from multiple independent
// API nodes
//
// Send is also currently more strict about required parameters than MpoolPushMessage
func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) {
if mss == nil {
return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil")
}
if (mss.MsgUuid != uuid.UUID{}) {
return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero")
}
fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("getting key address: %w", err)
}
2023-11-04 11:32:27 +00:00
msg.From = fromA
if msg.Nonce != 0 {
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
}
msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}
b, err := s.api.WalletBalance(ctx, msg.From)
if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
if b.LessThan(requiredFunds) {
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
}
var sigMsg *types.SignedMessage
2023-11-21 00:05:59 +00:00
var idCount int
err = s.db.QueryRow(ctx, `SELECT COUNT(*) FROM harmony_test WHERE task_id=$1`,
2023-11-22 04:43:13 +00:00
ctx.Value(CtxTestTaskID)).Scan(&idCount)
2023-11-21 00:05:59 +00:00
if err != nil {
return cid.Undef, xerrors.Errorf("reading harmony_test: %w", err)
}
noSend := idCount == 1
// start db tx
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA)
if err != nil {
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}
// get nonce from db
2023-11-04 11:32:27 +00:00
var dbNonce *uint64
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}
2023-11-04 11:32:27 +00:00
if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = *dbNonce + 1
}
msg.Nonce = msgNonce
// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}
data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}
jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
2023-11-21 00:05:59 +00:00
if noSend {
2023-11-22 04:10:25 +00:00
data, err := json.MarshalIndent(map[string]any{
"from_key": fromA.String(),
"nonce": msg.Nonce,
"to_addr": msg.To.String(),
"signed_data": data,
"signed_json": string(jsonBytes),
"signed_cid": sigMsg.Cid(),
"send_reason": reason,
}, "", " ")
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
2023-11-22 04:43:13 +00:00
id := ctx.Value(CtxTestTaskID)
2023-11-22 04:10:25 +00:00
tx.Exec(`UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), id)
log.Infof("SKIPPED sending test message to chain. Query harmony_test WHERE task_id= %v", id)
2023-11-17 00:22:03 +00:00
return true, nil // nothing committed
}
// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}
if c != 1 {
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
}
// commit
return true, nil
})
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
}
2023-11-21 00:05:59 +00:00
if noSend {
2023-11-17 00:22:03 +00:00
return sigMsg.Cid(), nil
}
// push to mpool
2023-11-03 21:40:28 +00:00
_, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil {
// TODO: We may get nonce gaps here..
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
}
// update db recocd to say it was pushed (set send_success to true)
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
if err != nil {
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
2023-11-15 04:58:43 +00:00
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", cn)
}
2023-11-03 21:40:28 +00:00
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
2023-11-04 11:32:27 +00:00
return sigMsg.Cid(), nil
}