diff --git a/provider/lpmessage/watch.go b/provider/lpmessage/watch.go index a97c76e48..5f98a9a0c 100644 --- a/provider/lpmessage/watch.go +++ b/provider/lpmessage/watch.go @@ -2,6 +2,7 @@ package lpmessage import ( "context" + "encoding/json" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" @@ -15,53 +16,12 @@ import ( const MinConfidence = 6 -/* -create table message_waits ( - - signed_message_cid text primary key references message_sends (signed_cid), - waiter_machine_id int references harmony_machines (id) on delete set null, - - executed_tsk_cid text, - executed_tsk_epoch bigint, - executed_msg_cid text, - executed_msg_data jsonb, - - executed_rcpt_exitcode bigint, - executed_rcpt_return bytea, - executed_rcpt_gas_used bigint - -) - -create table message_sends -( - - from_key text not null, - to_addr text not null, - send_reason text not null, - send_task_id bigint not null, - - unsigned_data bytea not null, - unsigned_cid text not null, - - nonce bigint, - signed_data bytea, - signed_json jsonb, - signed_cid text, - - send_time timestamp default null, - send_success boolean default null, - send_error text, - - constraint message_sends_pk - primary key (send_task_id, from_key) - -); -*/ type MessageWaiterApi interface { StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) + ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) } type MessageWaiter struct { @@ -75,7 +35,7 @@ type MessageWaiter struct { bestTs atomic.Pointer[types.TipSetKey] } -func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched) (*MessageWaiter, error) { +func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsched.ProviderChainSched, api MessageWaiterApi) (*MessageWaiter, error) { mw := &MessageWaiter{ db: db, ht: ht, @@ -127,7 +87,7 @@ func (mw *MessageWaiter) update() { // first if we see pending messages with null owner, assign them to ourselves { - n, err := mw.db.Exec(ctx, `UPDATE message_waits SET owner_machine_id = $1 WHERE owner_machine_id IS NULL AND executed_tsk_cid IS NULL`, machineID) + n, err := mw.db.Exec(ctx, `UPDATE message_waits SET waiter_machine_id = $1 WHERE waiter_machine_id IS NULL AND executed_tsk_cid IS NULL`, machineID) if err != nil { log.Errorf("failed to assign pending messages: %+v", err) return @@ -149,7 +109,7 @@ func (mw *MessageWaiter) update() { // really large limit in case of things getting stuck and backlogging severely err = mw.db.Select(ctx, &msgs, `SELECT signed_message_cid, from_key, nonce FROM message_wait JOIN message_sends ON signed_message_cid = signed_cid - WHERE owner_machine_id = $1 LIMIT 10000`, machineID) + WHERE waiter_machine_id = $1 LIMIT 10000`, machineID) if err != nil { log.Errorf("failed to get assigned messages: %+v", err) return @@ -180,11 +140,52 @@ func (mw *MessageWaiter) update() { // check if any of the messages we have assigned to us are now on chain, and have been for MinConfidence epochs for _, msg := range msgs { + if msg.Nonce > toCheck[msg.FromAddr] { + continue // definitely not on chain yet + } + look, err := mw.api.StateSearchMsg(ctx, lbtsk, cid.MustParse(msg.Cid), api.LookbackNoLimit, false) if err != nil { log.Errorf("failed to search for message: %+v", err) return } + + if look == nil { + continue // not on chain yet (or not executed yet) + } + + tskCid, err := look.TipSet.Cid() + if err != nil { + log.Errorf("failed to get tipset cid: %+v", err) + return + } + + emsg, err := mw.api.ChainGetMessage(ctx, look.Message) + if err != nil { + log.Errorf("failed to get message: %+v", err) + return + } + + execMsg, err := json.Marshal(emsg) + if err != nil { + log.Errorf("failed to marshal message: %+v", err) + return + } + + // record in db + _, err = mw.db.Exec(ctx, `UPDATE message_waits SET + waiter_machine_id = NULL, + executed_tsk_cid = $1, executed_tsk_epoch = $2, + executed_msg_cid = $3, executed_msg_data = $4, + executed_rcpt_exitcode = $5, executed_rcpt_return = $6, executed_rcpt_gas_used = $7 + WHERE signed_message_cid = $8`, tskCid, look.Height, + look.Message, execMsg, + look.Receipt.ExitCode, look.Receipt.Return, look.Receipt.GasUsed, + msg.Cid) + if err != nil { + log.Errorf("failed to update message wait: %+v", err) + return + } } }