Merge pull request #2944 from filecoin-project/feat/stmgr-wait-ignore-gas

stmgr: Allow replacing gas values in WaitMsg
This commit is contained in:
Łukasz Magiera 2020-08-11 17:51:28 +02:00 committed by GitHub
commit 0409c5a482
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 149 additions and 48 deletions

View File

@ -465,6 +465,7 @@ type DealInfo struct {
}
type MsgLookup struct {
Message cid.Cid // Can be different than requested, in case it was replaced, but only gas values changed
Receipt types.MessageReceipt
ReturnDec interface{}
TipSet types.TipSetKey

View File

@ -441,7 +441,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
return nil, fmt.Errorf("failed to load message: %w", err)
}
r, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage())
r, _, err := sm.tipsetExecutedMessage(ts, msg, m.VMMessage())
if err != nil {
return nil, err
}
@ -450,7 +450,7 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
return r, nil
}
_, r, err = sm.searchBackForMsg(ctx, ts, m)
_, r, _, err = sm.searchBackForMsg(ctx, ts, m)
if err != nil {
return nil, fmt.Errorf("failed to look back through chain for message: %w", err)
}
@ -461,44 +461,45 @@ func (sm *StateManager) GetReceipt(ctx context.Context, msg cid.Cid, ts *types.T
// WaitForMessage blocks until a message appears on chain. It looks backwards in the chain to see if this has already
// happened. It guarantees that the message has been on chain for at least confidence epochs without being reverted
// before returning.
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, error) {
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msg, err := sm.cs.GetCMessage(mcid)
if err != nil {
return nil, nil, fmt.Errorf("failed to load message: %w", err)
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
}
tsub := sm.cs.SubHeadChanges(ctx)
head, ok := <-tsub
if !ok {
return nil, nil, fmt.Errorf("SubHeadChanges stream was invalid")
return nil, nil, cid.Undef, fmt.Errorf("SubHeadChanges stream was invalid")
}
if len(head) != 1 {
return nil, nil, fmt.Errorf("SubHeadChanges first entry should have been one item")
return nil, nil, cid.Undef, fmt.Errorf("SubHeadChanges first entry should have been one item")
}
if head[0].Type != store.HCCurrent {
return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
return nil, nil, cid.Undef, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
}
r, err := sm.tipsetExecutedMessage(head[0].Val, mcid, msg.VMMessage())
r, foundMsg, err := sm.tipsetExecutedMessage(head[0].Val, mcid, msg.VMMessage())
if err != nil {
return nil, nil, err
return nil, nil, cid.Undef, err
}
if r != nil {
return head[0].Val, r, nil
return head[0].Val, r, foundMsg, nil
}
var backTs *types.TipSet
var backRcp *types.MessageReceipt
var backFm cid.Cid
backSearchWait := make(chan struct{})
go func() {
fts, r, err := sm.searchBackForMsg(ctx, head[0].Val, msg)
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head[0].Val, msg)
if err != nil {
log.Warnf("failed to look back through chain for message: %w", err)
return
@ -506,11 +507,13 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
backTs = fts
backRcp = r
backFm = foundMsg
close(backSearchWait)
}()
var candidateTs *types.TipSet
var candidateRcp *types.MessageReceipt
var candidateFm cid.Cid
heightOfHead := head[0].Val.Height()
reverts := map[types.TipSetKey]bool{}
@ -518,7 +521,7 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
select {
case notif, ok := <-tsub:
if !ok {
return nil, nil, ctx.Err()
return nil, nil, cid.Undef, ctx.Err()
}
for _, val := range notif {
switch val.Type {
@ -526,24 +529,26 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
if val.Val.Equals(candidateTs) {
candidateTs = nil
candidateRcp = nil
candidateFm = cid.Undef
}
if backSearchWait != nil {
reverts[val.Val.Key()] = true
}
case store.HCApply:
if candidateTs != nil && val.Val.Height() >= candidateTs.Height()+abi.ChainEpoch(confidence) {
return candidateTs, candidateRcp, nil
return candidateTs, candidateRcp, candidateFm, nil
}
r, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage())
r, foundMsg, err := sm.tipsetExecutedMessage(val.Val, mcid, msg.VMMessage())
if err != nil {
return nil, nil, err
return nil, nil, cid.Undef, err
}
if r != nil {
if confidence == 0 {
return val.Val, r, err
return val.Val, r, foundMsg, err
}
candidateTs = val.Val
candidateRcp = r
candidateFm = foundMsg
}
heightOfHead = val.Val.Height()
}
@ -553,111 +558,112 @@ func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confid
if backTs != nil && !reverts[backTs.Key()] {
// if head is at or past confidence interval, return immediately
if heightOfHead >= backTs.Height()+abi.ChainEpoch(confidence) {
return backTs, backRcp, nil
return backTs, backRcp, backFm, nil
}
// wait for confidence interval
candidateTs = backTs
candidateRcp = backRcp
candidateFm = backFm
}
reverts = nil
backSearchWait = nil
case <-ctx.Done():
return nil, nil, ctx.Err()
return nil, nil, cid.Undef, ctx.Err()
}
}
}
func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
func (sm *StateManager) SearchForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
msg, err := sm.cs.GetCMessage(mcid)
if err != nil {
return nil, nil, fmt.Errorf("failed to load message: %w", err)
return nil, nil, cid.Undef, fmt.Errorf("failed to load message: %w", err)
}
head := sm.cs.GetHeaviestTipSet()
r, err := sm.tipsetExecutedMessage(head, mcid, msg.VMMessage())
r, foundMsg, err := sm.tipsetExecutedMessage(head, mcid, msg.VMMessage())
if err != nil {
return nil, nil, err
return nil, nil, cid.Undef, err
}
if r != nil {
return head, r, nil
return head, r, foundMsg, nil
}
fts, r, err := sm.searchBackForMsg(ctx, head, msg)
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg)
if err != nil {
log.Warnf("failed to look back through chain for message %s", mcid)
return nil, nil, err
return nil, nil, cid.Undef, err
}
if fts == nil {
return nil, nil, nil
return nil, nil, cid.Undef, nil
}
return fts, r, nil
return fts, r, foundMsg, nil
}
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, error) {
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
cur := from
for {
if cur.Height() == 0 {
// it ain't here!
return nil, nil, nil
return nil, nil, cid.Undef, nil
}
select {
case <-ctx.Done():
return nil, nil, nil
return nil, nil, cid.Undef, nil
default:
}
var act types.Actor
err := sm.WithParentState(cur, sm.WithActor(m.VMMessage().From, GetActor(&act)))
if err != nil {
return nil, nil, err
return nil, nil, cid.Undef, err
}
// we either have no messages from the sender, or the latest message we found has a lower nonce than the one being searched for,
// either way, no reason to lookback, it ain't there
if act.Nonce == 0 || act.Nonce < m.VMMessage().Nonce {
return nil, nil, nil
return nil, nil, cid.Undef, nil
}
ts, err := sm.cs.LoadTipSet(cur.Parents())
if err != nil {
return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
return nil, nil, cid.Undef, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
}
r, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage())
r, foundMsg, err := sm.tipsetExecutedMessage(ts, m.Cid(), m.VMMessage())
if err != nil {
return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err)
return nil, nil, cid.Undef, fmt.Errorf("checking for message execution during lookback: %w", err)
}
if r != nil {
return ts, r, nil
return ts, r, foundMsg, nil
}
cur = ts
}
}
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm *types.Message) (*types.MessageReceipt, error) {
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm *types.Message) (*types.MessageReceipt, cid.Cid, error) {
// The genesis block did not execute any messages
if ts.Height() == 0 {
return nil, nil
return nil, cid.Undef, nil
}
pts, err := sm.cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
return nil, cid.Undef, err
}
cm, err := sm.cs.MessagesForTipset(pts)
if err != nil {
return nil, err
return nil, cid.Undef, err
}
for ii := range cm {
@ -667,21 +673,30 @@ func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid, vmm
if m.VMMessage().From == vmm.From { // cheaper to just check origin first
if m.VMMessage().Nonce == vmm.Nonce {
if m.Cid() == msg {
return sm.cs.GetParentReceipt(ts.Blocks()[0], i)
if m.VMMessage().EqualCall(vmm) {
if m.Cid() != msg {
log.Warnw("found message with equal nonce and call params but different CID",
"wanted", msg, "found", m.Cid(), "nonce", vmm.Nonce, "from", vmm.From)
}
pr, err := sm.cs.GetParentReceipt(ts.Blocks()[0], i)
if err != nil {
return nil, cid.Undef, err
}
return pr, m.Cid(), nil
}
// this should be that message
return nil, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)",
return nil, cid.Undef, xerrors.Errorf("found message with equal nonce as the one we are looking for (F:%s n %d, TS: %s n%d)",
msg, vmm.Nonce, m.Cid(), m.VMMessage().Nonce)
}
if m.VMMessage().Nonce < vmm.Nonce {
return nil, nil // don't bother looking further
return nil, cid.Undef, nil // don't bother looking further
}
}
}
return nil, nil
return nil, cid.Undef, nil
}
func (sm *StateManager) ListAllActors(ctx context.Context, ts *types.TipSet) ([]address.Address, error) {

View File

@ -118,6 +118,17 @@ func (m *Message) Equals(o *Message) bool {
return m.Cid() == o.Cid()
}
func (m *Message) EqualCall(o *Message) bool {
m1 := *m
m2 := *o
m1.GasLimit, m2.GasLimit = 0, 0
m1.GasFeeCap, m2.GasFeeCap = big.Zero(), big.Zero()
m1.GasPremium, m2.GasPremium = big.Zero(), big.Zero()
return (&m1).Equals(&m2)
}
func (m *Message) ValidForBlockInclusion(minGas int64) error {
if m.Version != 0 {
return xerrors.New("'Version' unsupported")

View File

@ -0,0 +1,72 @@
package types
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
)
func TestEqualCall(t *testing.T) {
m1 := &Message{
To: builtin.StoragePowerActorAddr,
From: builtin.SystemActorAddr,
Nonce: 34,
Value: big.Zero(),
GasLimit: 123,
GasFeeCap: big.NewInt(234),
GasPremium: big.NewInt(234),
Method: 6,
Params: []byte("hai"),
}
m2 := &Message{
To: builtin.StoragePowerActorAddr,
From: builtin.SystemActorAddr,
Nonce: 34,
Value: big.Zero(),
GasLimit: 1236, // changed
GasFeeCap: big.NewInt(234),
GasPremium: big.NewInt(234),
Method: 6,
Params: []byte("hai"),
}
m3 := &Message{
To: builtin.StoragePowerActorAddr,
From: builtin.SystemActorAddr,
Nonce: 34,
Value: big.Zero(),
GasLimit: 123,
GasFeeCap: big.NewInt(4524), // changed
GasPremium: big.NewInt(234),
Method: 6,
Params: []byte("hai"),
}
m4 := &Message{
To: builtin.StoragePowerActorAddr,
From: builtin.SystemActorAddr,
Nonce: 34,
Value: big.Zero(),
GasLimit: 123,
GasFeeCap: big.NewInt(4524),
GasPremium: big.NewInt(234),
Method: 5, // changed
Params: []byte("hai"),
}
require.True(t, m1.EqualCall(m2))
require.True(t, m1.EqualCall(m3))
require.False(t, m1.EqualCall(m4))
}

View File

@ -209,7 +209,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
}
// TODO: timeout
_, ret, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence)
_, ret, _, err := c.sm.WaitForMessage(ctx, *deal.PublishMessage, build.MessageConfidence)
if err != nil {
return 0, xerrors.Errorf("waiting for deal publish message: %w", err)
}

View File

@ -426,7 +426,7 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, bt *api.BlockTemplate)
}
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) {
ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg, confidence)
ts, recpt, found, err := a.StateManager.WaitForMessage(ctx, msg, confidence)
if err != nil {
return nil, err
}
@ -453,6 +453,7 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uin
}
return &api.MsgLookup{
Message: found,
Receipt: *recpt,
ReturnDec: returndec,
TipSet: ts.Key(),
@ -461,13 +462,14 @@ func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uin
}
func (a *StateAPI) StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) {
ts, recpt, err := a.StateManager.SearchForMessage(ctx, msg)
ts, recpt, found, err := a.StateManager.SearchForMessage(ctx, msg)
if err != nil {
return nil, err
}
if ts != nil {
return &api.MsgLookup{
Message: found,
Receipt: *recpt,
TipSet: ts.Key(),
Height: ts.Height(),