Merge pull request #1730 from filecoin-project/fix/events-api-revert-issue-1723

events: Fix revert+apply of msg execution, but not the message
This commit is contained in:
Łukasz Magiera 2020-05-14 01:48:47 +02:00 committed by GitHub
commit 32dad4d43b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 164 additions and 33 deletions

View File

@ -36,6 +36,7 @@ type eventApi interface {
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
} }
@ -163,6 +164,11 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
if err := e.headChange(rev, app); err != nil { if err := e.headChange(rev, app); err != nil {
log.Warnf("headChange failed: %s", err) log.Warnf("headChange failed: %s", err)
} }
// sync with fake chainstore (for tests)
if fcs, ok := e.api.(interface{notifDone()}); ok {
fcs.notifDone()
}
} }
return nil return nil

View File

@ -125,7 +125,13 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) {
} }
func (e *calledEvents) checkNewCalls(ts *types.TipSet) { func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
e.messagesForTs(ts, func(msg *types.Message) { pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
return
}
e.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts // TODO: provide receipts
for tid, matchFns := range e.matchers { for tid, matchFns := range e.matchers {
@ -154,8 +160,7 @@ func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
func (e *calledEvents) queueForConfidence(triggerId uint64, msg *types.Message, ts *types.TipSet) { func (e *calledEvents) queueForConfidence(triggerId uint64, msg *types.Message, ts *types.TipSet) {
trigger := e.triggers[triggerId] trigger := e.triggers[triggerId]
// messages are not applied in the tipset they are included in appliedH := ts.Height()
appliedH := ts.Height() + 1
triggerH := appliedH + abi.ChainEpoch(trigger.confidence) triggerH := appliedH + abi.ChainEpoch(trigger.confidence)

View File

@ -3,21 +3,20 @@ package events
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"testing" "testing"
"time"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -40,9 +39,17 @@ type fakeCS struct {
msgs map[cid.Cid]fakeMsg msgs map[cid.Cid]fakeMsg
blkMsgs map[cid.Cid]cid.Cid blkMsgs map[cid.Cid]cid.Cid
sync sync.Mutex
tipsets map[types.TipSetKey]*types.TipSet
sub func(rev, app []*types.TipSet) sub func(rev, app []*types.TipSet)
} }
func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) {
return fcs.tipsets[key], nil
}
func (fcs *fakeCS) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) { func (fcs *fakeCS) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) {
return nil, nil return nil, nil
} }
@ -55,7 +62,7 @@ func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types
panic("Not Implemented") panic("Not Implemented")
} }
func makeTs(t *testing.T, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet { func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
a, _ := address.NewFromString("t00") a, _ := address.NewFromString("t00")
b, _ := address.NewFromString("t02") b, _ := address.NewFromString("t02")
var ts, err = types.NewTipSet([]*types.BlockHeader{ var ts, err = types.NewTipSet([]*types.BlockHeader{
@ -63,6 +70,8 @@ func makeTs(t *testing.T, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
Height: h, Height: h,
Miner: a, Miner: a,
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}},
ParentStateRoot: dummyCid, ParentStateRoot: dummyCid,
@ -76,6 +85,8 @@ func makeTs(t *testing.T, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
Height: h, Height: h,
Miner: b, Miner: b,
Parents: parents,
Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}},
ParentStateRoot: dummyCid, ParentStateRoot: dummyCid,
@ -87,6 +98,11 @@ func makeTs(t *testing.T, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet {
}, },
}) })
if fcs.tipsets == nil {
fcs.tipsets = map[types.TipSetKey]*types.TipSet{}
}
fcs.tipsets[ts.Key()] = ts
require.NoError(t, err) require.NoError(t, err)
return ts return ts
@ -180,7 +196,7 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
continue continue
} }
ts := makeTs(fcs.t, fcs.h, mc) ts := fcs.makeTs(fcs.t, fcs.tsc.best().Key().Cids(), fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts)) require.NoError(fcs.t, fcs.tsc.add(ts))
if hasMsgs { if hasMsgs {
@ -190,8 +206,17 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { /
apps = append(apps, ts) apps = append(apps, ts)
} }
fcs.sync.Lock()
fcs.sub(revs, apps) fcs.sub(revs, apps)
time.Sleep(100 * time.Millisecond) // TODO: :c
fcs.sync.Lock()
fcs.sync.Unlock()
}
func (fcs *fakeCS) notifDone() {
fcs.sync.Unlock()
} }
var _ eventApi = &fakeCS{} var _ eventApi = &fakeCS{}
@ -202,7 +227,7 @@ func TestAt(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -267,7 +292,7 @@ func TestAtDoubleTrigger(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -309,7 +334,7 @@ func TestAtNullTrigger(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -343,7 +368,7 @@ func TestAtNullConf(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -382,7 +407,7 @@ func TestAtStart(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -416,7 +441,7 @@ func TestAtStartConfidence(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -446,7 +471,7 @@ func TestAtChained(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -480,7 +505,7 @@ func TestAtChainedConfidence(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -514,7 +539,7 @@ func TestAtChainedConfidenceNull(t *testing.T) {
h: 1, h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -552,7 +577,7 @@ func TestCalled(t *testing.T) {
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -628,7 +653,7 @@ func TestCalled(t *testing.T) {
// revert the message // revert the message
fcs.advance(2, 1, nil) // H=7, we reverted ts with the msg fcs.advance(2, 1, nil) // H=7, we reverted ts with the msg execution, but not the msg itself
require.Equal(t, false, applied) require.Equal(t, false, applied)
require.Equal(t, true, reverted) require.Equal(t, true, reverted)
@ -642,10 +667,17 @@ func TestCalled(t *testing.T) {
}, },
}) })
fcs.advance(0, 5, map[int]cid.Cid{ // (confidence=3) fcs.advance(0, 3, map[int]cid.Cid{ // (n2msg confidence=1)
0: n2msg, 0: n2msg,
}) })
require.Equal(t, true, applied) // msg from H=7, which had reverted execution
require.Equal(t, false, reverted)
require.Equal(t, abi.ChainEpoch(10), appliedH)
applied = false
fcs.advance(0, 2, nil) // (confidence=3)
require.Equal(t, true, applied) require.Equal(t, true, applied)
require.Equal(t, false, reverted) require.Equal(t, false, reverted)
applied = false applied = false
@ -659,7 +691,7 @@ func TestCalled(t *testing.T) {
// revert and apply at different height // revert and apply at different height
fcs.advance(4, 6, map[int]cid.Cid{ // (confidence=3) fcs.advance(8, 6, map[int]cid.Cid{ // (confidence=3)
1: n2msg, 1: n2msg,
}) })
@ -671,9 +703,9 @@ func TestCalled(t *testing.T) {
reverted = false reverted = false
applied = false applied = false
require.Equal(t, abi.ChainEpoch(11), appliedTs.Height()) require.Equal(t, abi.ChainEpoch(7), appliedTs.Height())
require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String()) require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String())
require.Equal(t, abi.ChainEpoch(14), appliedH) require.Equal(t, abi.ChainEpoch(10), appliedH)
require.Equal(t, t0123, appliedMsg.To) require.Equal(t, t0123, appliedMsg.To)
require.Equal(t, uint64(2), appliedMsg.Nonce) require.Equal(t, uint64(2), appliedMsg.Nonce)
require.Equal(t, abi.MethodNum(5), appliedMsg.Method) require.Equal(t, abi.MethodNum(5), appliedMsg.Method)
@ -697,7 +729,7 @@ func TestCalled(t *testing.T) {
}), }),
}) })
fcs.advance(1, 4, nil) // H=19, but message reverted fcs.advance(2, 5, nil) // H=19, but message reverted
require.Equal(t, false, applied) require.Equal(t, false, applied)
require.Equal(t, false, reverted) require.Equal(t, false, reverted)
@ -757,7 +789,7 @@ func TestCalledTimeout(t *testing.T) {
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -797,7 +829,7 @@ func TestCalledTimeout(t *testing.T) {
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events = NewEvents(context.Background(), fcs) events = NewEvents(context.Background(), fcs)
@ -831,7 +863,7 @@ func TestCalledOrder(t *testing.T) {
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -894,7 +926,7 @@ func TestCalledNull(t *testing.T) {
blkMsgs: map[cid.Cid]cid.Cid{}, blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil), tsc: newTSCache(2*build.ForkLengthThreshold, nil),
} }
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid))) require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs) events := NewEvents(context.Background(), fcs)
@ -949,3 +981,90 @@ func TestCalledNull(t *testing.T) {
require.Equal(t, false, applied) require.Equal(t, false, applied)
require.Equal(t, true, reverted) require.Equal(t, true, reverted)
} }
func TestRemoveTriggersOnMessage(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
t0123, err := address.NewFromString("t0123")
require.NoError(t, err)
more := true
var applied, reverted bool
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
require.Equal(t, false, applied)
fmt.Println(msg == nil)
fmt.Println(curH)
applied = true
return more, nil
}, func(_ context.Context, ts *types.TipSet) error {
reverted = true
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)
// create few blocks to make sure nothing get's randomly called
fcs.advance(0, 4, nil) // H=5
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=5, applied at H=6; H=8 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// revert applied TS & message TS
fcs.advance(3, 1, nil) // H=6 (tipset message applied in reverted, AND message reverted)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional blocks so we are above confidence threshold, but message not applied
// as it was reverted
fcs.advance(0, 5, nil) // H=11 (confidence=3, apply)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create blocks with message again (but below confidence threshold)
fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=12, applied at H=13; H=15 (confidence=2)
0: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
}),
})
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// revert applied height TS, but don't remove message trigger
fcs.advance(2, 1, nil) // H=13 (tipset message applied in reverted, by tipset with message not reverted)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
// create additional blocks so we are above confidence threshold
fcs.advance(0, 4, nil) // H=18 (confidence=3, apply)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
}

View File

@ -68,6 +68,7 @@ type storageMinerApi interface {
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error) ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainHasObj(context.Context, cid.Cid) (bool, error) ChainHasObj(context.Context, cid.Cid) (bool, error)
ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error)
WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error) WalletSign(context.Context, address.Address, []byte) (*crypto.Signature, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error) WalletBalance(context.Context, address.Address) (types.BigInt, error)