diff --git a/provider/lpmessage/watch.go b/provider/lpmessage/watch.go index 3f18e5add..a97c76e48 100644 --- a/provider/lpmessage/watch.go +++ b/provider/lpmessage/watch.go @@ -2,13 +2,19 @@ package lpmessage import ( "context" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/provider/chainsched" + "github.com/ipfs/go-cid" "sync/atomic" ) +const MinConfidence = 6 + /* create table message_waits ( @@ -51,9 +57,17 @@ create table message_sends ); */ +type MessageWaiterApi interface { + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) + ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) + StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) +} + type MessageWaiter struct { - db *harmonydb.DB - ht *harmonytask.TaskEngine + db *harmonydb.DB + ht *harmonytask.TaskEngine + api MessageWaiterApi stopping, stopped chan struct{} @@ -65,6 +79,7 @@ func NewMessageWaiter(db *harmonydb.DB, ht *harmonytask.TaskEngine, pcs *chainsc mw := &MessageWaiter{ db: db, ht: ht, + api: api, stopping: make(chan struct{}), stopped: make(chan struct{}), updateCh: make(chan struct{}), @@ -95,6 +110,19 @@ func (mw *MessageWaiter) update() { tsk := *mw.bestTs.Load() + ts, err := mw.api.ChainGetTipSet(ctx, tsk) + if err != nil { + log.Errorf("failed to get tipset: %+v", err) + return + } + + lbts, err := mw.api.ChainGetTipSetByHeight(ctx, ts.Height()-MinConfidence, tsk) + if err != nil { + log.Errorf("failed to get tipset: %+v", err) + return + } + lbtsk := lbts.Key() + machineID := mw.ht.ResourcesAvailable().MachineID // first if we see pending messages with null owner, assign them to ourselves @@ -111,16 +139,53 @@ func (mw *MessageWaiter) update() { // get messages assigned to us var msgs []struct { - Cid string - Nonce uint64 + Cid string `db:"signed_message_cid"` + From string `db:"from_key"` + Nonce uint64 `db:"nonce"` + + FromAddr address.Address `db:"-"` } // really large limit in case of things getting stuck and backlogging severely - err := mw.db.Select(ctx, &msgs, `SELECT signed_message_cid FROM message_waits WHERE owner_machine_id = $1 LIMIT 10000`, machineID) + err = mw.db.Select(ctx, &msgs, `SELECT signed_message_cid, from_key, nonce FROM message_wait + JOIN message_sends ON signed_message_cid = signed_cid + WHERE owner_machine_id = $1 LIMIT 10000`, machineID) if err != nil { log.Errorf("failed to get assigned messages: %+v", err) return } + + // get address/nonce set to check + toCheck := make(map[address.Address]uint64) + + for i := range msgs { + msgs[i].FromAddr, err = address.NewFromString(msgs[i].From) + if err != nil { + log.Errorf("failed to parse from address: %+v", err) + return + } + toCheck[msgs[i].FromAddr] = 0 + } + + // get the nonce for each address + for addr := range toCheck { + act, err := mw.api.StateGetActor(ctx, addr, lbtsk) + if err != nil { + log.Errorf("failed to get actor: %+v", err) + return + } + + toCheck[addr] = act.Nonce + } + + // check if any of the messages we have assigned to us are now on chain, and have been for MinConfidence epochs + for _, msg := range msgs { + look, err := mw.api.StateSearchMsg(ctx, lbtsk, cid.MustParse(msg.Cid), api.LookbackNoLimit, false) + if err != nil { + log.Errorf("failed to search for message: %+v", err) + return + } + } } func (mw *MessageWaiter) Stop(ctx context.Context) error {