Merge pull request #642 from filecoin-project/fix/post-wait

post: More correct 'wait' logic
This commit is contained in:
Łukasz Magiera 2019-11-20 11:08:54 -06:00 committed by GitHub
commit 65c669b0f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 136 additions and 24 deletions

View File

@ -114,6 +114,7 @@ type FullNode interface {
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
StateLookupID(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateChangedActors(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
MarketEnsureAvailable(context.Context, address.Address, types.BigInt) error
// MarketFreeBalance

View File

@ -108,6 +108,7 @@ type FullNodeStruct struct {
StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"`
StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"`
StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"`
StateGetReceipt func(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"`
MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"`
@ -439,6 +440,10 @@ func (c *FullNodeStruct) StateChangedActors(ctx context.Context, olnstate cid.Ci
return c.Internal.StateChangedActors(ctx, olnstate, newstate)
}
func (c *FullNodeStruct) StateGetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) {
return c.Internal.StateGetReceipt(ctx, msg, ts)
}
func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error {
return c.Internal.MarketEnsureAvailable(ctx, addr, amt)
}

View File

@ -74,7 +74,12 @@ type clientDealUpdate struct {
mut func(*ClientDeal)
}
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI) *Client {
type clientApi struct {
full.ChainAPI
full.StateAPI
}
func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *wallet.Wallet, dag dtypes.ClientDAG, dataTransfer dtypes.ClientDataTransfer, discovery *discovery.Local, fm *market.FundMgr, deals dtypes.ClientDealStore, chainapi full.ChainAPI, stateapi full.StateAPI) *Client {
c := &Client{
sm: sm,
chain: chain,
@ -84,7 +89,7 @@ func NewClient(sm *stmgr.StateManager, chain *store.ChainStore, h host.Host, w *
dag: dag,
discovery: discovery,
fm: fm,
events: events.NewEvents(context.TODO(), &chainapi),
events: events.NewEvents(context.TODO(), &clientApi{chainapi, stateapi}),
deals: deals,
conns: map[cid.Cid]inet.Stream{},

View File

@ -153,7 +153,7 @@ func (c *Client) sealing(ctx context.Context, deal ClientDeal) (func(*ClientDeal
return false, true, nil
}
called := func(msg *types.Message, ts *types.TipSet, curH uint64) (more bool, err error) {
called := func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
defer func() {
if err != nil {
select {

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -32,6 +33,9 @@ type eventApi interface {
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) // optional / for CalledMsg
}
type Events struct {

View File

@ -23,7 +23,7 @@ type triggerH = uint64
// `ts` is the tipset, in which the `msg` is included.
// `curH`-`ts.Height` = `confidence`
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (more bool, err error)
type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error)
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
// wait for has already happened in tipset `ts`
@ -186,7 +186,13 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet) {
continue
}
more, err := trigger.handle(event.msg, triggerTs, ts.Height())
rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts)
if err != nil {
log.Error(err)
return
}
more, err := trigger.handle(event.msg, rec, triggerTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err)
continue // don't revert failed calls
@ -224,7 +230,7 @@ func (e *calledEvents) applyTimeouts(ts *types.TipSet) {
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-uint64(trigger.confidence), ts.Height())
}
more, err := trigger.handle(nil, timeoutTs, ts.Height())
more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
if err != nil {
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
continue // don't revert failed calls
@ -328,3 +334,7 @@ func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHand
return nil
}
func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, msg *types.Message) error {
return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg))
}

View File

@ -40,6 +40,14 @@ type fakeCS struct {
sub func(rev, app []*types.TipSet)
}
func (fcs *fakeCS) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) {
return nil, nil
}
func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) {
panic("Not Implemented")
}
func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
panic("Not Implemented")
}
@ -514,7 +522,7 @@ func TestCalled(t *testing.T) {
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) {
require.Equal(t, false, applied)
applied = true
appliedMsg = msg
@ -709,7 +717,7 @@ func TestCalledTimeout(t *testing.T) {
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, uint64(20), ts.Height())
@ -744,7 +752,7 @@ func TestCalledTimeout(t *testing.T) {
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return true, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) {
called = true
require.Nil(t, msg)
require.Equal(t, uint64(20), ts.Height())
@ -783,7 +791,7 @@ func TestCalledOrder(t *testing.T) {
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (bool, error) {
switch at {
case 0:
require.Equal(t, uint64(1), msg.Nonce)

41
chain/events/utils.go Normal file
View File

@ -0,0 +1,41 @@
package events
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
)
func (e *calledEvents) CheckMsg(ctx context.Context, msg *types.Message, hnd CalledHandler) CheckFunc {
return func(ts *types.TipSet) (done bool, more bool, err error) {
fa, err := e.cs.StateGetActor(ctx, msg.From, ts)
if err != nil {
return false, true, err
}
// TODO: probably want to look at the chain to make sure it's
// the right message, but this is probably good enough for now
done = fa.Nonce >= msg.Nonce
rec, err := e.cs.StateGetReceipt(ctx, msg.Cid(), ts)
if err != nil {
return false, true, err
}
more, err = hnd(msg, rec, ts, ts.Height())
return done, more, err
}
}
func(e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc {
return func(msg *types.Message) (bool, error) {
if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %s", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)
}
return inmsg.Equals(msg), nil
}
}

View File

@ -321,6 +321,29 @@ func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Addres
return pubk, nil
}
func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) {
r, err := sm.tipsetExecutedMessage(ts, msg)
if err != nil {
return nil, err
}
if r != nil {
return r, nil
}
m, err := sm.cs.GetCMessage(msg)
if err != nil {
return nil, fmt.Errorf("failed to load message: %w", err)
}
_, r, err = sm.searchBackForMsg(ctx, ts, m)
if err != nil {
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
}
return r, nil
}
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

View File

@ -77,3 +77,7 @@ func (m *Message) RequiredFunds() BigInt {
func (m *Message) VMMessage() *Message {
return m
}
func (m *Message) Equals(o *Message) bool {
return m.Cid() == o.Cid()
}

View File

@ -56,7 +56,6 @@ func (s *Signature) TypeCode() int {
case KTBLS:
return IKTBLS
default:
log.Errorf("called TypeCode on signature with unknown Type: %q", s.Type)
return IKTUnknown
}
}

View File

@ -218,6 +218,10 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait,
}, nil
}
func (a *StateAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, ts *types.TipSet) (*types.MessageReceipt, error) {
return a.StateManager.GetReceipt(ctx, msg, ts)
}
func (a *StateAPI) StateListMiners(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {
var state actors.StoragePowerState
if _, err := a.StateManager.LoadActorState(ctx, actors.StoragePowerAddress, &state, ts); err != nil {

View File

@ -48,9 +48,6 @@ type Miner struct {
}
type storageMinerApi interface {
// I think I want this... but this is tricky
//ReadState(ctx context.Context, addr address.Address) (????, error)
// Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
@ -58,7 +55,9 @@ type storageMinerApi interface {
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
StateMinerSectorSize(context.Context, address.Address, *types.TipSet) (uint64, error)
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)

View File

@ -15,6 +15,8 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
const postMsgTimeout = 20
func (m *Miner) beginPosting(ctx context.Context) {
ts, err := m.api.ChainHead(context.TODO())
if err != nil {
@ -113,6 +115,7 @@ type post struct {
proof []byte
// commit
msg *types.Message
smsg cid.Cid
}
@ -220,7 +223,7 @@ func (p *post) commitPost(ctx context.Context) error {
return xerrors.Errorf("could not serialize submit post parameters: %w", aerr)
}
msg := &types.Message{
p.msg = &types.Message{
To: p.m.maddr,
From: p.m.worker,
Method: actors.MAMethods.SubmitPoSt,
@ -232,7 +235,7 @@ func (p *post) commitPost(ctx context.Context) error {
log.Info("mpush")
smsg, err := p.m.api.MpoolPushMessage(ctx, msg)
smsg, err := p.m.api.MpoolPushMessage(ctx, p.msg)
if err != nil {
return xerrors.Errorf("pushing message to mpool: %w", err)
}
@ -247,16 +250,22 @@ func (p *post) waitCommit(ctx context.Context) error {
log.Infof("Waiting for post %s to appear on chain", p.smsg)
// make sure it succeeds...
rec, err := p.m.api.StateWaitMsg(ctx, p.smsg)
err := p.m.events.CalledMsg(ctx, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH uint64) (more bool, err error) {
if rec.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.ExitCode)
}
log.Infof("Post made it on chain! (height=%d)", ts.Height())
return false, nil
}, func(ctx context.Context, ts *types.TipSet) error {
log.Warn("post message reverted")
return nil
}, 3, postMsgTimeout, p.msg)
if err != nil {
return err
}
if rec.Receipt.ExitCode != 0 {
log.Warnf("SubmitPoSt EXIT: %d", rec.Receipt.ExitCode)
// TODO: Do something
}
log.Infof("Post made it on chain! (height=%d)", rec.TipSet.Height())
return nil
}