lpwindow wip send; minimal lpmessage send
This commit is contained in:
parent
6bda3342df
commit
ebec992ba8
22
lib/harmony/harmonydb/sql/20231103.sql
Normal file
22
lib/harmony/harmonydb/sql/20231103.sql
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
create table message_sends
|
||||||
|
(
|
||||||
|
from_key text not null,
|
||||||
|
nonce bigint not null,
|
||||||
|
to_addr text not null,
|
||||||
|
signed_data bytea not null,
|
||||||
|
signed_json jsonb not null,
|
||||||
|
signed_cid text not null,
|
||||||
|
send_time date default CURRENT_TIMESTAMP,
|
||||||
|
send_reason text,
|
||||||
|
send_success bool default false not null,
|
||||||
|
constraint message_sends_pk
|
||||||
|
primary key (from_key, nonce)
|
||||||
|
);
|
||||||
|
|
||||||
|
comment on column message_sends.from_key is 'text f[1/3/4]... address';
|
||||||
|
comment on column message_sends.nonce is 'assigned message nonce';
|
||||||
|
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
|
||||||
|
comment on column message_sends.signed_data is 'signed message data';
|
||||||
|
comment on column message_sends.signed_cid is 'signed message cid';
|
||||||
|
comment on column message_sends.send_reason is 'optional description of send reason';
|
||||||
|
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already';
|
165
provider/lpmessage/sender.go
Normal file
165
provider/lpmessage/sender.go
Normal file
@ -0,0 +1,165 @@
|
|||||||
|
package lpmessage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SenderAPI interface {
|
||||||
|
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
|
||||||
|
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
|
||||||
|
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
|
||||||
|
MpoolGetNonce(context.Context, address.Address) (uint64, error)
|
||||||
|
MpoolSend(context.Context, *types.SignedMessage) (cid.Cid, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type SignerAPI interface {
|
||||||
|
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sender abstracts away highly-available message sending with coordination through
|
||||||
|
// HarmonyDB. It make sure that nonces are assigned transactionally, and that
|
||||||
|
// messages are correctly broadcasted to the network. It is not a Task in the sense
|
||||||
|
// of a HarmonyTask interface, just a helper for tasks which need to send messages
|
||||||
|
// to the network.
|
||||||
|
type Sender struct {
|
||||||
|
api SenderAPI
|
||||||
|
signer SignerAPI
|
||||||
|
|
||||||
|
db *harmonydb.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSender creates a new Sender.
|
||||||
|
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
|
||||||
|
return &Sender{
|
||||||
|
api: api,
|
||||||
|
signer: signer,
|
||||||
|
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send atomically assigns a nonce, signs, and pushes a message
|
||||||
|
// to mempool.
|
||||||
|
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
|
||||||
|
//
|
||||||
|
// When maxFee is set to 0, Send will guess appropriate fee
|
||||||
|
// based on current chain conditions
|
||||||
|
//
|
||||||
|
// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates
|
||||||
|
// through HarmonyDB, making it safe to broadcast messages from multiple independent
|
||||||
|
// API nodes
|
||||||
|
//
|
||||||
|
// Send is also currently more strict about required parameters than MpoolPushMessage
|
||||||
|
func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) {
|
||||||
|
if mss == nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil")
|
||||||
|
}
|
||||||
|
if (mss.MsgUuid != uuid.UUID{}) {
|
||||||
|
return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero")
|
||||||
|
}
|
||||||
|
|
||||||
|
fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("getting key address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.Nonce != 0 {
|
||||||
|
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := s.api.WalletBalance(ctx, msg.From)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
|
||||||
|
if b.LessThan(requiredFunds) {
|
||||||
|
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
|
||||||
|
}
|
||||||
|
|
||||||
|
var sigMsg *types.SignedMessage
|
||||||
|
|
||||||
|
// start db tx
|
||||||
|
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||||
|
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
|
||||||
|
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// get nonce from db
|
||||||
|
var dbNonce uint64
|
||||||
|
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
|
||||||
|
if err := r.Scan(&dbNonce); err != nil {
|
||||||
|
return false, xerrors.Errorf("getting nonce from db: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dbNonce+1 > msgNonce {
|
||||||
|
msgNonce = dbNonce + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.Nonce = msgNonce
|
||||||
|
|
||||||
|
// sign message
|
||||||
|
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("signing message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := sigMsg.Serialize()
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("serializing message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonBytes, err := sigMsg.MarshalJSON()
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("marshaling message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// write to db
|
||||||
|
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
|
||||||
|
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("inserting message into db: %w", err)
|
||||||
|
}
|
||||||
|
if c != 1 {
|
||||||
|
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err != nil || !c {
|
||||||
|
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// push to mpool
|
||||||
|
_, err = s.api.MpoolSend(ctx, sigMsg)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// update db recocd to say it was pushed (set send_success to true)
|
||||||
|
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
|
||||||
|
if err != nil {
|
||||||
|
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
|
||||||
|
}
|
||||||
|
if cn != 1 {
|
||||||
|
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
|
||||||
|
}
|
||||||
|
|
||||||
|
return cid.Undef, nil
|
||||||
|
}
|
@ -4,14 +4,12 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-bitfield"
|
"github.com/filecoin-project/go-bitfield"
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
"github.com/filecoin-project/go-state-types/network"
|
"github.com/filecoin-project/go-state-types/network"
|
||||||
"github.com/filecoin-project/lotus/storage/sealer"
|
"github.com/filecoin-project/lotus/storage/sealer"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
@ -83,11 +81,7 @@ type wdTaskIdentity struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||||
|
log.Debugw("WdPostTask.Do() called with taskID: %v", taskID)
|
||||||
log.Errorw("WDPOST DO", "taskID", taskID)
|
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
|
|
||||||
|
|
||||||
var spID, pps, dlIdx, partIdx uint64
|
var spID, pps, dlIdx, partIdx uint64
|
||||||
|
|
||||||
@ -200,9 +194,6 @@ func entToStr[T any](t T, i int) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
||||||
|
|
||||||
log.Errorw("WDPOST CANACCEPT", "ids", ids)
|
|
||||||
|
|
||||||
// GetEpoch
|
// GetEpoch
|
||||||
ts, err := t.api.ChainHead(context.Background())
|
ts, err := t.api.ChainHead(context.Background())
|
||||||
|
|
||||||
|
40
provider/lpwindow/submit_task.go
Normal file
40
provider/lpwindow/submit_task.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package lpwindow
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||||
|
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WdPostSubmitTask struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||||
|
return harmonytask.TaskTypeDetails{
|
||||||
|
Max: 128,
|
||||||
|
Name: "WdPostSubmit",
|
||||||
|
Cost: resources.Resources{
|
||||||
|
Cpu: 0,
|
||||||
|
Gpu: 0,
|
||||||
|
Ram: 0,
|
||||||
|
},
|
||||||
|
MaxFailures: 10,
|
||||||
|
Follows: nil, // ??
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) {
|
||||||
|
//TODO implement me
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ harmonytask.TaskInterface = &WdPostSubmitTask{}
|
Loading…
Reference in New Issue
Block a user