From bc381fc053527cfc0e005ddb8ccea4c4681bd13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 10 Aug 2020 14:55:27 +0200 Subject: [PATCH] stmgr: Allow changing gas values in WaitMsg --- api/api_full.go | 1 + chain/stmgr/stmgr.go | 105 ++++++++++++++++++------------- chain/types/message.go | 11 ++++ chain/types/message_test.go | 72 +++++++++++++++++++++ markets/storageadapter/client.go | 2 +- node/impl/full/state.go | 6 +- 6 files changed, 149 insertions(+), 48 deletions(-) create mode 100644 chain/types/message_test.go diff --git a/api/api_full.go b/api/api_full.go index 1a3850cc6..1c01fee27 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -465,6 +465,7 @@ type DealInfo struct { } type MsgLookup struct { + Message cid.Cid // Can be different than requested, in case it was replaced, but only gas values changed Receipt types.MessageReceipt ReturnDec interface{} TipSet types.TipSetKey diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index 566692b38..67ef58800 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -441,7 +441,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T return nil, fmt.Errorf("failed to load message: %w", err) } - r, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage()) + r, _, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage()) if err != nil { return nil, err } @@ -450,7 +450,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T return r, nil } - _, r, err = sm.searchBackForMsg(ctx, ts, m) + _, r, _, err = sm.searchBackForMsg(ctx, ts, m) if err != nil { return nil, fmt.Errorf("failed to look back through chain for message: %w", err) } @@ -461,44 +461,45 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T // WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already // happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted // before returning. -func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, error) { +func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() msg, err := sm.cs.GetCMessage(mcid) if err != nil { - return nil, nil, fmt.Errorf("failed to load message: %w", err) + return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err) } tsub := sm.cs.SubHeadChanges(ctx) head, ok := <-tsub if !ok { - return nil, nil, fmt.Errorf("SubHeadChanges stream was invalid") + return nil, nil, cid.Undef, fmt.Errorf("SubHeadChanges stream was invalid") } if len(head) != 1 { - return nil, nil, fmt.Errorf("SubHeadChanges first entry should have been one item") + return nil, nil, cid.Undef, fmt.Errorf("SubHeadChanges first entry should have been one item") } if head[0].Type != store.HCCurrent { - return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type) + return nil, nil, cid.Undef, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type) } - r, err := sm.tipsetExecutedMessage(head[0].Val, mcid, msg.VMMessage()) + r, foundMsg, err := sm.tipsetExecutedMessage(head[0].Val, mcid, msg.VMMessage()) if err != nil { - return nil, nil, err + return nil, nil, cid.Undef, err } if r != nil { - return head[0].Val, r, nil + return head[0].Val, r, foundMsg, nil } var backTs *types.TipSet var backRcp *types.MessageReceipt + var backFm cid.Cid backSearchWait := make(chan struct{}) go func() { - fts, r, err := sm.searchBackForMsg(ctx, head[0].Val, msg) + fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg) if err != nil { log.Warnf("failed to look back through chain for message: %w", err) return @@ -506,11 +507,13 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid backTs = fts backRcp = r + backFm = foundMsg close(backSearchWait) }() var candidateTs *types.TipSet var candidateRcp *types.MessageReceipt + var candidateFm cid.Cid heightOfHead := head[0].Val.Height() reverts := map[types.TipSetKey]bool{} @@ -518,7 +521,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid select { case notif, ok := <-tsub: if !ok { - return nil, nil, ctx.Err() + return nil, nil, cid.Undef, ctx.Err() } for _, val := range notif { switch val.Type { @@ -526,24 +529,26 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid if val.Val.Equals(candidateTs) { candidateTs = nil candidateRcp = nil + candidateFm = cid.Undef } if backSearchWait != nil { reverts[val.Val.Key()] = true } case store.HCApply: if candidateTs != nil && val.Val.Height() >= candidateTs.Height()+abi.ChainEpoch(confidence) { - return candidateTs, candidateRcp, nil + return candidateTs, candidateRcp, candidateFm, nil } - r, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage()) + r, foundMsg, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage()) if err != nil { - return nil, nil, err + return nil, nil, cid.Undef, err } if r != nil { if confidence == 0 { - return val.Val, r, err + return val.Val, r, foundMsg, err } candidateTs = val.Val candidateRcp = r + candidateFm = foundMsg } heightOfHead = val.Val.Height() } @@ -553,111 +558,112 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid if backTs != nil && !reverts[backTs.Key()] { // if head is at or past confidence interval, return immediately if heightOfHead >= backTs.Height()+abi.ChainEpoch(confidence) { - return backTs, backRcp, nil + return backTs, backRcp, backFm, nil } // wait for confidence interval candidateTs = backTs candidateRcp = backRcp + candidateFm = backFm } reverts = nil backSearchWait = nil case <-ctx.Done(): - return nil, nil, ctx.Err() + return nil, nil, cid.Undef, ctx.Err() } } } -func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) { +func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { msg, err := sm.cs.GetCMessage(mcid) if err != nil { - return nil, nil, fmt.Errorf("failed to load message: %w", err) + return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err) } head := sm.cs.GetHeaviestTipSet() - r, err := sm.tipsetExecutedMessage(head, mcid, msg.VMMessage()) + r, foundMsg, err := sm.tipsetExecutedMessage(head, mcid, msg.VMMessage()) if err != nil { - return nil, nil, err + return nil, nil, cid.Undef, err } if r != nil { - return head, r, nil + return head, r, foundMsg, nil } - fts, r, err := sm.searchBackForMsg(ctx, head, msg) + fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg) if err != nil { log.Warnf("failed to look back through chain for message %s", mcid) - return nil, nil, err + return nil, nil, cid.Undef, err } if fts == nil { - return nil, nil, nil + return nil, nil, cid.Undef, nil } - return fts, r, nil + return fts, r, foundMsg, nil } -func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, error) { +func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { cur := from for { if cur.Height() == 0 { // it ain't here! - return nil, nil, nil + return nil, nil, cid.Undef, nil } select { case <-ctx.Done(): - return nil, nil, nil + return nil, nil, cid.Undef, nil default: } var act types.Actor err := sm.WithParentState(cur, sm.WithActor(m.VMMessage().From, GetActor(&act))) if err != nil { - return nil, nil, err + return nil, nil, cid.Undef, err } // we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for, // either way, no reason to lookback, it ain't there if act.Nonce == 0 || act.Nonce < m.VMMessage().Nonce { - return nil, nil, nil + return nil, nil, cid.Undef, nil } ts, err := sm.cs.LoadTipSet(cur.Parents()) if err != nil { - return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err) + return nil, nil, cid.Undef, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err) } - r, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage()) + r, foundMsg, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage()) if err != nil { - return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err) + return nil, nil, cid.Undef, fmt.Errorf("checking for message execution during lookback: %w", err) } if r != nil { - return ts, r, nil + return ts, r, foundMsg, nil } cur = ts } } -func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm *types.Message) (*types.MessageReceipt, error) { +func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm *types.Message) (*types.MessageReceipt, cid.Cid, error) { // The genesis block did not execute any messages if ts.Height() == 0 { - return nil, nil + return nil, cid.Undef, nil } pts, err := sm.cs.LoadTipSet(ts.Parents()) if err != nil { - return nil, err + return nil, cid.Undef, err } cm, err := sm.cs.MessagesForTipset(pts) if err != nil { - return nil, err + return nil, cid.Undef, err } for ii := range cm { @@ -667,21 +673,30 @@ func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm if m.VMMessage().From == vmm.From { // cheaper to just check origin first if m.VMMessage().Nonce == vmm.Nonce { - if m.Cid() == msg { - return sm.cs.GetParentReceipt(ts.Blocks()[0], i) + if m.VMMessage().EqualCall(vmm) { + if m.Cid() != msg { + log.Warnw("found message with equal nonce and call params but different CID", + "wanted", msg, "found", m.Cid(), "nonce", vmm.Nonce, "from", vmm.From) + } + + pr, err := sm.cs.GetParentReceipt(ts.Blocks()[0], i) + if err != nil { + return nil, cid.Undef, err + } + return pr, m.Cid(), nil } // this should be that message - return nil, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)", + return nil, cid.Undef, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)", msg, vmm.Nonce, m.Cid(), m.VMMessage().Nonce) } if m.VMMessage().Nonce < vmm.Nonce { - return nil, nil // don't bother looking further + return nil, cid.Undef, nil // don't bother looking further } } } - return nil, nil + return nil, cid.Undef, nil } func (sm *StateManager) ListAllActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) { diff --git a/chain/types/message.go b/chain/types/message.go index eb03edd67..aab3020d5 100644 --- a/chain/types/message.go +++ b/chain/types/message.go @@ -118,6 +118,17 @@ func (m *Message) Equals(o *Message) bool { return m.Cid() == o.Cid() } +func (m *Message) EqualCall(o *Message) bool { + m1 := *m + m2 := *o + + m1.GasLimit, m2.GasLimit = 0, 0 + m1.GasFeeCap, m2.GasFeeCap = big.Zero(), big.Zero() + m1.GasPremium, m2.GasPremium = big.Zero(), big.Zero() + + return (&m1).Equals(&m2) +} + func (m *Message) ValidForBlockInclusion(minGas int64) error { if m.Version != 0 { return xerrors.New("'Version' unsupported") diff --git a/chain/types/message_test.go b/chain/types/message_test.go new file mode 100644 index 000000000..a7b4927e5 --- /dev/null +++ b/chain/types/message_test.go @@ -0,0 +1,72 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi/big" + "github.com/filecoin-project/specs-actors/actors/builtin" +) + +func TestEqualCall(t *testing.T) { + m1 := &Message{ + To: builtin.StoragePowerActorAddr, + From: builtin.SystemActorAddr, + Nonce: 34, + Value: big.Zero(), + + GasLimit: 123, + GasFeeCap: big.NewInt(234), + GasPremium: big.NewInt(234), + + Method: 6, + Params: []byte("hai"), + } + + m2 := &Message{ + To: builtin.StoragePowerActorAddr, + From: builtin.SystemActorAddr, + Nonce: 34, + Value: big.Zero(), + + GasLimit: 1236, // changed + GasFeeCap: big.NewInt(234), + GasPremium: big.NewInt(234), + + Method: 6, + Params: []byte("hai"), + } + + m3 := &Message{ + To: builtin.StoragePowerActorAddr, + From: builtin.SystemActorAddr, + Nonce: 34, + Value: big.Zero(), + + GasLimit: 123, + GasFeeCap: big.NewInt(4524), // changed + GasPremium: big.NewInt(234), + + Method: 6, + Params: []byte("hai"), + } + + m4 := &Message{ + To: builtin.StoragePowerActorAddr, + From: builtin.SystemActorAddr, + Nonce: 34, + Value: big.Zero(), + + GasLimit: 123, + GasFeeCap: big.NewInt(4524), + GasPremium: big.NewInt(234), + + Method: 5, // changed + Params: []byte("hai"), + } + + require.True(t, m1.EqualCall(m2)) + require.True(t, m1.EqualCall(m3)) + require.False(t, m1.EqualCall(m4)) +} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index a97bcf758..74e3f8394 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -209,7 +209,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor } // TODO: timeout - _, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence) + _, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence) if err != nil { return 0, xerrors.Errorf("waiting for deal publish message: %w", err) } diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 5495581f1..73bd4a398 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -426,7 +426,7 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate) } func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) { - ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg, confidence) + ts, recpt, found, err := a.StateManager.WaitForMessage(ctx, msg, confidence) if err != nil { return nil, err } @@ -453,6 +453,7 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uin } return &api.MsgLookup{ + Message: found, Receipt: *recpt, ReturnDec: returndec, TipSet: ts.Key(), @@ -461,13 +462,14 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uin } func (a *StateAPI) StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) { - ts, recpt, err := a.StateManager.SearchForMessage(ctx, msg) + ts, recpt, found, err := a.StateManager.SearchForMessage(ctx, msg) if err != nil { return nil, err } if ts != nil { return &api.MsgLookup{ + Message: found, Receipt: *recpt, TipSet: ts.Key(), Height: ts.Height(),