diff --git a/lib/harmony/harmonydb/sql/20231103.sql b/lib/harmony/harmonydb/sql/20231103.sql new file mode 100644 index 000000000..669f122f4 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20231103.sql @@ -0,0 +1,22 @@ +create table message_sends +( + from_key text not null, + nonce bigint not null, + to_addr text not null, + signed_data bytea not null, + signed_json jsonb not null, + signed_cid text not null, + send_time date default CURRENT_TIMESTAMP, + send_reason text, + send_success bool default false not null, + constraint message_sends_pk + primary key (from_key, nonce) +); + +comment on column message_sends.from_key is 'text f[1/3/4]... address'; +comment on column message_sends.nonce is 'assigned message nonce'; +comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address'; +comment on column message_sends.signed_data is 'signed message data'; +comment on column message_sends.signed_cid is 'signed message cid'; +comment on column message_sends.send_reason is 'optional description of send reason'; +comment on column message_sends.send_success is 'whether this message was broadcasted to the network already'; diff --git a/provider/lpmessage/sender.go b/provider/lpmessage/sender.go new file mode 100644 index 000000000..4e9a78aae --- /dev/null +++ b/provider/lpmessage/sender.go @@ -0,0 +1,165 @@ +package lpmessage + +import ( + "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type SenderAPI interface { + StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error) + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) + WalletBalance(ctx context.Context, addr address.Address) (big.Int, error) + MpoolGetNonce(context.Context, address.Address) (uint64, error) + MpoolSend(context.Context, *types.SignedMessage) (cid.Cid, error) +} + +type SignerAPI interface { + WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error) +} + +// Sender abstracts away highly-available message sending with coordination through +// HarmonyDB. It make sure that nonces are assigned transactionally, and that +// messages are correctly broadcasted to the network. It is not a Task in the sense +// of a HarmonyTask interface, just a helper for tasks which need to send messages +// to the network. +type Sender struct { + api SenderAPI + signer SignerAPI + + db *harmonydb.DB +} + +// NewSender creates a new Sender. +func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender { + return &Sender{ + api: api, + signer: signer, + + db: db, + } +} + +// Send atomically assigns a nonce, signs, and pushes a message +// to mempool. +// maxFee is only used when GasFeeCap/GasPremium fields aren't specified +// +// When maxFee is set to 0, Send will guess appropriate fee +// based on current chain conditions +// +// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates +// through HarmonyDB, making it safe to broadcast messages from multiple independent +// API nodes +// +// Send is also currently more strict about required parameters than MpoolPushMessage +func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) { + if mss == nil { + return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil") + } + if (mss.MsgUuid != uuid.UUID{}) { + return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero") + } + + fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK) + if err != nil { + return cid.Undef, xerrors.Errorf("getting key address: %w", err) + } + + if msg.Nonce != 0 { + return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce) + } + + msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK) + if err != nil { + return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err) + } + + b, err := s.api.WalletBalance(ctx, msg.From) + if err != nil { + return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err) + } + + requiredFunds := big.Add(msg.Value, msg.RequiredFunds()) + if b.LessThan(requiredFunds) { + return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds) + } + + var sigMsg *types.SignedMessage + + // start db tx + c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + // assign nonce (max(api.MpoolGetNonce, db nonce+1)) + msgNonce, err := s.api.MpoolGetNonce(ctx, fromA) + if err != nil { + return false, xerrors.Errorf("getting nonce from mpool: %w", err) + } + + // get nonce from db + var dbNonce uint64 + r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String()) + if err := r.Scan(&dbNonce); err != nil { + return false, xerrors.Errorf("getting nonce from db: %w", err) + } + + if dbNonce+1 > msgNonce { + msgNonce = dbNonce + 1 + } + + msg.Nonce = msgNonce + + // sign message + sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg) + if err != nil { + return false, xerrors.Errorf("signing message: %w", err) + } + + data, err := sigMsg.Serialize() + if err != nil { + return false, xerrors.Errorf("serializing message: %w", err) + } + + jsonBytes, err := sigMsg.MarshalJSON() + if err != nil { + return false, xerrors.Errorf("marshaling message: %w", err) + } + + // write to db + c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`, + fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason) + if err != nil { + return false, xerrors.Errorf("inserting message into db: %w", err) + } + if c != 1 { + return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c) + } + + // commit + return true, nil + }) + if err != nil || !c { + return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err) + } + + // push to mpool + _, err = s.api.MpoolSend(ctx, sigMsg) + if err != nil { + return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err) + } + + // update db recocd to say it was pushed (set send_success to true) + cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce) + if err != nil { + return cid.Undef, xerrors.Errorf("updating db record: %w", err) + } + if cn != 1 { + return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c) + } + + return cid.Undef, nil +} diff --git a/provider/lpwindow/compute_task.go b/provider/lpwindow/compute_task.go index e283dbd18..0e59d5d2b 100644 --- a/provider/lpwindow/compute_task.go +++ b/provider/lpwindow/compute_task.go @@ -4,14 +4,12 @@ import ( "bytes" "context" "fmt" - "sort" - "strings" - "time" - "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/storage/sealer" + "sort" + "strings" logging "github.com/ipfs/go-log/v2" "github.com/samber/lo" @@ -83,11 +81,7 @@ type wdTaskIdentity struct { } func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { - - log.Errorw("WDPOST DO", "taskID", taskID) - - time.Sleep(5 * time.Second) - log.Errorf("WdPostTask.Do() called with taskID: %v", taskID) + log.Debugw("WdPostTask.Do() called with taskID: %v", taskID) var spID, pps, dlIdx, partIdx uint64 @@ -200,9 +194,6 @@ func entToStr[T any](t T, i int) string { } func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { - - log.Errorw("WDPOST CANACCEPT", "ids", ids) - // GetEpoch ts, err := t.api.ChainHead(context.Background()) diff --git a/provider/lpwindow/submit_task.go b/provider/lpwindow/submit_task.go new file mode 100644 index 000000000..de0ef94c9 --- /dev/null +++ b/provider/lpwindow/submit_task.go @@ -0,0 +1,40 @@ +package lpwindow + +import ( + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/lib/harmony/resources" +) + +type WdPostSubmitTask struct { +} + +func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { + //TODO implement me + panic("implement me") +} + +func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { + //TODO implement me + panic("implement me") +} + +func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails { + return harmonytask.TaskTypeDetails{ + Max: 128, + Name: "WdPostSubmit", + Cost: resources.Resources{ + Cpu: 0, + Gpu: 0, + Ram: 0, + }, + MaxFailures: 10, + Follows: nil, // ?? + } +} + +func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) { + //TODO implement me + panic("implement me") +} + +var _ harmonytask.TaskInterface = &WdPostSubmitTask{}