From 14c4a8bee6c0c8dc5b346891ccba3678a05e19ac Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Tue, 8 Oct 2019 14:51:34 +0900 Subject: [PATCH] implement looking in the past for messages --- api/api.go | 2 +- api/struct.go | 10 +-- chain/deals/handler_states.go | 2 +- chain/events/tscache.go | 1 + chain/stmgr/stmgr.go | 149 ++++++++++++++++++++++++++++++++ chain/store/store.go | 74 ++-------------- cli/createminer.go | 2 +- cli/paych.go | 2 +- cmd/lotus-storage-miner/init.go | 5 +- node/impl/full/chain.go | 14 --- node/impl/full/state.go | 14 +++ paych/paych.go | 9 +- paych/simple.go | 4 +- storage/miner.go | 4 +- storage/post.go | 2 +- 15 files changed, 195 insertions(+), 99 deletions(-) diff --git a/api/api.go b/api/api.go index af42b0177..13e564818 100644 --- a/api/api.go +++ b/api/api.go @@ -51,7 +51,6 @@ type FullNode interface { ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) - ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error) ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error) ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error) ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error) @@ -124,6 +123,7 @@ type FullNode interface { StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error) + StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) PaychList(context.Context) ([]address.Address, error) diff --git a/api/struct.go b/api/struct.go index 49cd0b222..ed2e8e933 100644 --- a/api/struct.go +++ b/api/struct.go @@ -41,7 +41,6 @@ type FullNodeStruct struct { ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"` ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"` - ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"` ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"` ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"` @@ -92,6 +91,7 @@ type FullNodeStruct struct { StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"` StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"` StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"` + StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"` PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"` PaychList func(context.Context) ([]address.Address, error) `perm:"read"` @@ -235,10 +235,6 @@ func (c *FullNodeStruct) ChainGetRandomness(ctx context.Context, pts *types.TipS return c.Internal.ChainGetRandomness(ctx, pts, ticks, lb) } -func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { - return c.Internal.ChainWaitMsg(ctx, msgc) -} - func (c *FullNodeStruct) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) { return c.Internal.ChainGetTipSetByHeight(ctx, h, ts) } @@ -354,6 +350,10 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, ts *types.Ti return c.Internal.StatePledgeCollateral(ctx, ts) } +func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) { + return c.Internal.StateWaitMsg(ctx, msgc) +} + func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) { return c.Internal.PaychGet(ctx, from, to, ensureFunds) } diff --git a/chain/deals/handler_states.go b/chain/deals/handler_states.go index 4ab32b146..573b8d202 100644 --- a/chain/deals/handler_states.go +++ b/chain/deals/handler_states.go @@ -153,7 +153,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal), if deal.Proposal.Payment.ChannelMessage != nil { log.Info("waiting for channel message to appear on chain") - if _, err := h.full.ChainWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil { + if _, err := h.full.StateWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil { return nil, xerrors.Errorf("waiting for paych message: %w", err) } } diff --git a/chain/events/tscache.go b/chain/events/tscache.go index 871113fc8..7279d4ab8 100644 --- a/chain/events/tscache.go +++ b/chain/events/tscache.go @@ -2,6 +2,7 @@ package events import ( "context" + "golang.org/x/xerrors" "github.com/filecoin-project/go-lotus/chain/types" diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 59f71413a..6ef9838a8 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -2,6 +2,7 @@ package stmgr import ( "context" + "fmt" "sync" amt "github.com/filecoin-project/go-amt-ipld" @@ -274,3 +275,151 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres copy(pubk[:], kaddr.Payload()) return pubk, nil } + +func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + msg, err := sm.cs.GetCMessage(mcid) + if err != nil { + return nil, nil, fmt.Errorf("failed to load message: %w", err) + } + + tsub := sm.cs.SubHeadChanges(ctx) + + head, ok := <-tsub + if !ok { + return nil, nil, fmt.Errorf("SubHeadChanges stream was invalid") + } + + if len(head) != 1 { + return nil, nil, fmt.Errorf("SubHeadChanges first entry should have been one item") + } + + if head[0].Type != store.HCCurrent { + return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type) + } + + r, err := sm.tipsetExecutedMessage(head[0].Val, mcid) + if err != nil { + return nil, nil, err + } + + if r != nil { + return head[0].Val, r, nil + } + + var backTs *types.TipSet + var backRcp *types.MessageReceipt + backSearchWait := make(chan struct{}) + go func() { + fts, r, err := sm.searchBackForMsg(ctx, head[0].Val, msg) + if err != nil { + log.Warnf("failed to look back through chain for message: %w", err) + return + } + + backTs = fts + backRcp = r + close(backSearchWait) + }() + + for { + select { + case notif, ok := <-tsub: + if !ok { + return nil, nil, ctx.Err() + } + for _, val := range notif { + switch val.Type { + case store.HCRevert: + continue + case store.HCApply: + r, err := sm.tipsetExecutedMessage(val.Val, mcid) + if err != nil { + return nil, nil, err + } + if r != nil { + return val.Val, r, nil + } + } + } + case <-backSearchWait: + if backTs != nil { + return backTs, backRcp, nil + } + backSearchWait = nil + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } +} + +func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m store.ChainMsg) (*types.TipSet, *types.MessageReceipt, error) { + + cur := from + for { + select { + case <-ctx.Done(): + return nil, nil, nil + default: + } + + act, err := sm.GetActor(m.VMMessage().From, cur) + if err != nil { + return nil, nil, err + } + + if act.Nonce < m.VMMessage().Nonce { + // nonce on chain is before message nonce we're looking for, its + // not going to be here + return nil, nil, nil + } + + ts, err := sm.cs.LoadTipSet(cur.Parents()) + if err != nil { + return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err) + } + + r, err := sm.tipsetExecutedMessage(ts, m.Cid()) + if err != nil { + return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err) + } + + if r != nil { + return ts, r, nil + } + + if ts.Height() == 0 { + // it ain't here! + return nil, nil, nil + } + + cur = ts + } +} + +func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) { + // The genesis block did not execute any messages + if ts.Height() == 0 { + return nil, nil + } + + pts, err := sm.cs.LoadTipSet(ts.Parents()) + if err != nil { + return nil, err + } + + cm, err := sm.cs.MessagesForTipset(pts) + if err != nil { + return nil, err + } + + for i, m := range cm { + if m.Cid() == msg { + return sm.cs.GetParentReceipt(ts.Blocks()[0], i) + } + } + + return nil, nil +} diff --git a/chain/store/store.go b/chain/store/store.go index 6d74bedc0..033f763a6 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -463,6 +463,15 @@ func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) { return types.DecodeBlock(genb.RawData()) } +func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) { + m, err := cs.GetMessage(c) + if err == nil { + return m, nil + } + + return cs.GetSignedMessage(c) +} + func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) { sb, err := cs.bs.Get(c) if err != nil { @@ -642,71 +651,6 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe return msgs, nil } -func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) { - tsub := cs.SubHeadChanges(ctx) - - head := cs.GetHeaviestTipSet() - - r, err := cs.tipsetExecutedMessage(head, mcid) - if err != nil { - return nil, nil, err - } - - if r != nil { - return head, r, nil - } - - for { - select { - case notif, ok := <-tsub: - if !ok { - return nil, nil, ctx.Err() - } - for _, val := range notif { - switch val.Type { - case HCRevert: - continue - case HCApply: - r, err := cs.tipsetExecutedMessage(val.Val, mcid) - if err != nil { - return nil, nil, err - } - if r != nil { - return val.Val, r, nil - } - } - } - case <-ctx.Done(): - return nil, nil, ctx.Err() - } - } -} - -func (cs *ChainStore) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) { - // The genesis block did not execute any messages - if ts.Height() == 0 { - return nil, nil - } - - pts, err := cs.LoadTipSet(ts.Parents()) - if err != nil { - return nil, err - } - - cm, err := cs.MessagesForTipset(pts) - if err != nil { - return nil, err - } - - for i, m := range cm { - if m.Cid() == msg { - return cs.GetParentReceipt(ts.Blocks()[0], i) - } - } - - return nil, nil -} - func (cs *ChainStore) Blockstore() blockstore.Blockstore { return cs.bs } diff --git a/cli/createminer.go b/cli/createminer.go index 49afaa2f8..c1c7cdffe 100644 --- a/cli/createminer.go +++ b/cli/createminer.go @@ -83,7 +83,7 @@ var createMinerCmd = &cli.Command{ return xerrors.Errorf("failed to push signed message: %w", err) } - mwait, err := api.ChainWaitMsg(ctx, smsg.Cid()) + mwait, err := api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { return xerrors.Errorf("failed waiting for message inclusion: %w", err) } diff --git a/cli/paych.go b/cli/paych.go index 1b11d809c..834c35cc0 100644 --- a/cli/paych.go +++ b/cli/paych.go @@ -350,7 +350,7 @@ var paychVoucherSubmitCmd = &cli.Command{ return err } - mwait, err := api.ChainWaitMsg(ctx, mcid) + mwait, err := api.StateWaitMsg(ctx, mcid) if err != nil { return err } diff --git a/cmd/lotus-storage-miner/init.go b/cmd/lotus-storage-miner/init.go index 28f9d9bd7..7f25309be 100644 --- a/cmd/lotus-storage-miner/init.go +++ b/cmd/lotus-storage-miner/init.go @@ -3,6 +3,7 @@ package main import ( "context" "crypto/rand" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/ipfs/go-datastore" @@ -232,7 +233,7 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A } log.Info("Waiting for message: ", smsg.Cid()) - ret, err := api.ChainWaitMsg(ctx, smsg.Cid()) + ret, err := api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { return err } @@ -303,7 +304,7 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID, c log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid()) log.Infof("Waiting for confirmation") - mw, err := api.ChainWaitMsg(ctx, signed.Cid()) + mw, err := api.StateWaitMsg(ctx, signed.Cid()) if err != nil { return address.Undef, err } diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 1641bfd29..927053b8e 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -46,20 +46,6 @@ func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ti return a.Chain.GetRandomness(ctx, pts.Cids(), tickets, int64(lb)) } -func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { - // TODO: consider using event system for this, expose confidence - - ts, recpt, err := a.Chain.WaitForMessage(ctx, msg) - if err != nil { - return nil, err - } - - return &api.MsgWait{ - Receipt: *recpt, - TipSet: ts, - }, nil -} - func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) { return a.Chain.GetBlock(msg) } diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 13dbf5cb7..efa6eeebd 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -193,3 +193,17 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, p return &out, nil } + +func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) { + // TODO: consider using event system for this, expose confidence + + ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg) + if err != nil { + return nil, err + } + + return &api.MsgWait{ + Receipt: *recpt, + TipSet: ts, + }, nil +} diff --git a/paych/paych.go b/paych/paych.go index 11e374888..60b1e0eac 100644 --- a/paych/paych.go +++ b/paych/paych.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "fmt" - "golang.org/x/xerrors" "math" "strconv" + "golang.org/x/xerrors" + logging "github.com/ipfs/go-log" "go.uber.org/fx" @@ -25,7 +26,7 @@ type ManagerApi struct { full.MpoolAPI full.WalletAPI - full.ChainAPI + full.StateAPI } type Manager struct { @@ -34,7 +35,7 @@ type Manager struct { mpool full.MpoolAPI wallet full.WalletAPI - chain full.ChainAPI + state full.StateAPI } func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager { @@ -44,7 +45,7 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage mpool: api.MpoolAPI, wallet: api.WalletAPI, - chain: api.ChainAPI, + state: api.StateAPI, } } diff --git a/paych/simple.go b/paych/simple.go index 018604b6c..0e057a535 100644 --- a/paych/simple.go +++ b/paych/simple.go @@ -44,7 +44,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am // TODO: wait outside the store lock! // (tricky because we need to setup channel tracking before we know it's address) - mwait, err := pm.chain.ChainWaitMsg(ctx, mcid) + mwait, err := pm.state.StateWaitMsg(ctx, mcid) if err != nil { return address.Undef, cid.Undef, err } @@ -85,7 +85,7 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres return err } - mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock! + mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock! if err != nil { return err } diff --git a/storage/miner.go b/storage/miner.go index 1e9a92928..3ade17757 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -54,11 +54,11 @@ type storageMinerApi interface { StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error) + StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) ChainHead(context.Context) (*types.TipSet, error) - ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) @@ -161,7 +161,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal } go func() { - _, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) + _, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { return } diff --git a/storage/post.go b/storage/post.go index 8f404668f..8f4e9686c 100644 --- a/storage/post.go +++ b/storage/post.go @@ -149,7 +149,7 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro log.Infof("Waiting for post %s to appear on chain", smsg.Cid()) // make sure it succeeds... - rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid()) + rec, err := m.api.StateWaitMsg(ctx, smsg.Cid()) if err != nil { return err }