From ddf2e05dd0bf2e31ea1cce9ec69854ce053baecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 19 Nov 2019 22:27:25 +0100 Subject: [PATCH] post: More correct 'wait' logic --- api/api_full.go | 1 + api/struct.go | 5 +++++ chain/deals/client.go | 4 ++-- chain/deals/client_states.go | 2 +- chain/events/events.go | 4 ++++ chain/events/events_called.go | 16 +++++++++++++--- chain/events/events_test.go | 16 ++++++++++++---- chain/events/utils.go | 35 +++++++++++++++++++++++++++++++++++ chain/stmgr/stmgr.go | 23 +++++++++++++++++++++++ chain/types/message.go | 4 ++++ node/impl/full/state.go | 4 ++++ storage/miner.go | 7 +++---- storage/post.go | 27 ++++++++++++++++++--------- 13 files changed, 125 insertions(+), 23 deletions(-) create mode 100644 chain/events/utils.go diff --git a/api/api_full.go b/api/api_full.go index cf67d8442..1afd74fd9 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -114,6 +114,7 @@ type FullNode interface { StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) StateLookupID(context.Context, address.Address, *types.TipSet) (address.Address, error) StateChangedActors(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) + StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) MarketEnsureAvailable(context.Context, address.Address, types.BigInt) error // MarketFreeBalance diff --git a/api/struct.go b/api/struct.go index 7ea1b5835..8276f91f2 100644 --- a/api/struct.go +++ b/api/struct.go @@ -108,6 +108,7 @@ type FullNodeStruct struct { StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"` StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"` StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"` + StateGetReceipt func(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"` MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"` @@ -439,6 +440,10 @@ func (c *FullNodeStruct) StateChangedActors(ctx context.Context, olnstate cid.Ci return c.Internal.StateChangedActors(ctx, olnstate, newstate) } +func (c *FullNodeStruct) StateGetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) { + return c.Internal.StateGetReceipt(ctx, msg, ts) +} + func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error { return c.Internal.MarketEnsureAvailable(ctx, addr, amt) } diff --git a/chain/deals/client.go b/chain/deals/client.go index 720926274..9346f2905 100644 --- a/chain/deals/client.go +++ b/chain/deals/client.go @@ -74,7 +74,7 @@ type clientDealUpdate struct { mut func(*ClientDeal) } -func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI) *Client { +func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client { c := &Client{ sm: sm, chain: chain, @@ -84,7 +84,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w * dag: dag, discovery: discovery, fm: fm, - events: events.NewEvents(context.TODO(), &chainapi), + events: events.NewEvents(context.TODO(), &struct {full.ChainAPI; full.StateAPI}{chainapi, stateapi}), deals: deals, conns: map[cid.Cid]inet.Stream{}, diff --git a/chain/deals/client_states.go b/chain/deals/client_states.go index c6438c236..bac7aab47 100644 --- a/chain/deals/client_states.go +++ b/chain/deals/client_states.go @@ -153,7 +153,7 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal return false, true, nil } - called := func(msg *types.Message, ts *types.TipSet, curH uint64) (more bool, err error) { + called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { defer func() { if err != nil { select { diff --git a/chain/events/events.go b/chain/events/events.go index fb0297ead..80028965b 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) @@ -32,6 +33,9 @@ type eventApi interface { ChainNotify(context.Context) (<-chan []*store.HeadChange, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) + StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) + + StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) // optional / for CalledMsg } type Events struct { diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 321eca840..e38b01035 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -23,7 +23,7 @@ type triggerH = uint64 // `ts` is the tipset, in which the `msg` is included. // `curH`-`ts.Height` = `confidence` -type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (more bool, err error) +type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) // CheckFunc is used for atomicity guarantees. If the condition the callbacks // wait for has already happened in tipset `ts` @@ -186,7 +186,13 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet) { continue } - more, err := trigger.handle(event.msg, triggerTs, ts.Height()) + rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts) + if err != nil { + log.Error(err) + return + } + + more, err := trigger.handle(event.msg, rec, triggerTs, ts.Height()) if err != nil { log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err) continue // don't revert failed calls @@ -224,7 +230,7 @@ func (e *calledEvents) applyTimeouts(ts *types.TipSet) { log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-uint64(trigger.confidence), ts.Height()) } - more, err := trigger.handle(nil, timeoutTs, ts.Height()) + more, err := trigger.handle(nil, nil, timeoutTs, ts.Height()) if err != nil { log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err) continue // don't revert failed calls @@ -328,3 +334,7 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand return nil } + +func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg *types.Message) error { + return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg)) +} diff --git a/chain/events/events_test.go b/chain/events/events_test.go index c4ecabe50..fa3d116af 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -40,6 +40,14 @@ type fakeCS struct { sub func(rev, app []*types.TipSet) } +func (fcs *fakeCS) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) { + return nil, nil +} + +func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) { + panic("Not Implemented") +} + func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) { panic("Not Implemented") } @@ -514,7 +522,7 @@ func TestCalled(t *testing.T) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { + }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) { require.Equal(t, false, applied) applied = true appliedMsg = msg @@ -709,7 +717,7 @@ func TestCalledTimeout(t *testing.T) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { + }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) { called = true require.Nil(t, msg) require.Equal(t, uint64(20), ts.Height()) @@ -744,7 +752,7 @@ func TestCalledTimeout(t *testing.T) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil - }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { + }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) { called = true require.Nil(t, msg) require.Equal(t, uint64(20), ts.Height()) @@ -783,7 +791,7 @@ func TestCalledOrder(t *testing.T) { err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil - }, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) { + }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) { switch at { case 0: require.Equal(t, uint64(1), msg.Nonce) diff --git a/chain/events/utils.go b/chain/events/utils.go new file mode 100644 index 000000000..3dd34a1db --- /dev/null +++ b/chain/events/utils.go @@ -0,0 +1,35 @@ +package events + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/types" +) + +func (e *calledEvents) CheckMsg(ctx context.Context, msg *types.Message, hnd CalledHandler) CheckFunc { + return func(ts *types.TipSet) (done bool, more bool, err error) { + fa, err := e.cs.StateGetActor(ctx, msg.From, ts) + if err != nil { + return false, true, err + } + + // TODO: probably want to look at the chain to make sure it's + // the right message, but this is probably good enough for now + done = fa.Nonce >= msg.Nonce + + rec, err := e.cs.StateGetReceipt(ctx, msg.Cid(), ts) + if err != nil { + return false, true, err + } + + more, err = hnd(msg, rec, ts, ts.Height()) + + return done, more, err + } +} + +func(e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc { + return func(msg *types.Message) (bool, error) { + return inmsg.Equals(msg), nil + } +} diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 5b6eafb81..eaaa4fc13 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -321,6 +321,29 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres return pubk, nil } +func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) { + r, err := sm.tipsetExecutedMessage(ts, msg) + if err != nil { + return nil, err + } + + if r != nil { + return r, nil + } + + m, err := sm.cs.GetCMessage(msg) + if err != nil { + return nil, fmt.Errorf("failed to load message: %w", err) + } + + _, r, err = sm.searchBackForMsg(ctx, ts, m) + if err != nil { + return nil, fmt.Errorf("failed to look back through chain for message: %w", err) + } + + return r, nil +} + func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/chain/types/message.go b/chain/types/message.go index d2bf27358..aee5300cf 100644 --- a/chain/types/message.go +++ b/chain/types/message.go @@ -77,3 +77,7 @@ func (m *Message) RequiredFunds() BigInt { func (m *Message) VMMessage() *Message { return m } + +func (m *Message) Equals(o *Message) bool { + return m.Cid() == o.Cid() +} diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 2454083b1..9724528ca 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -218,6 +218,10 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, }, nil } +func (a *StateAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) { + return a.StateManager.GetReceipt(ctx, msg, ts) +} + func (a *StateAPI) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { var state actors.StoragePowerState if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil { diff --git a/storage/miner.go b/storage/miner.go index af8bf5a6d..c2ebb0570 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -48,9 +48,6 @@ type Miner struct { } type storageMinerApi interface { - // I think I want this... but this is tricky - //ReadState(ctx context.Context, addr address.Address) (????, error) - // Call a read only method on actors (no interaction with the chain required) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error) @@ -58,7 +55,9 @@ type storageMinerApi interface { StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error) StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error) - StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) + StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme + StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) + StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/post.go b/storage/post.go index ab94d719a..14a61d7c7 100644 --- a/storage/post.go +++ b/storage/post.go @@ -15,6 +15,8 @@ import ( "github.com/filecoin-project/lotus/lib/sectorbuilder" ) +const postMsgTimeout = 20 + func (m *Miner) beginPosting(ctx context.Context) { ts, err := m.api.ChainHead(context.TODO()) if err != nil { @@ -113,6 +115,7 @@ type post struct { proof []byte // commit + msg *types.Message smsg cid.Cid } @@ -220,7 +223,7 @@ func (p *post) commitPost(ctx context.Context) error { return xerrors.Errorf("could not serialize submit post parameters: %w", aerr) } - msg := &types.Message{ + p.msg = &types.Message{ To: p.m.maddr, From: p.m.worker, Method: actors.MAMethods.SubmitPoSt, @@ -232,7 +235,7 @@ func (p *post) commitPost(ctx context.Context) error { log.Info("mpush") - smsg, err := p.m.api.MpoolPushMessage(ctx, msg) + smsg, err := p.m.api.MpoolPushMessage(ctx, p.msg) if err != nil { return xerrors.Errorf("pushing message to mpool: %w", err) } @@ -247,16 +250,22 @@ func (p *post) waitCommit(ctx context.Context) error { log.Infof("Waiting for post %s to appear on chain", p.smsg) - // make sure it succeeds... - rec, err := p.m.api.StateWaitMsg(ctx, p.smsg) + err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) { + if rec.ExitCode != 0 { + log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode) + } + + log.Infof("Post made it on chain! (height=%d)", ts.Height()) + + return false, nil + }, func(ctx context.Context, ts *types.TipSet) error { + log.Warn("post message reverted") + return nil + }, 3, postMsgTimeout, p.msg) if err != nil { return err } - if rec.Receipt.ExitCode != 0 { - log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode) - // TODO: Do something - } - log.Infof("Post made it on chain! (height=%d)", rec.TipSet.Height()) + return nil }