package events import ( "context" "fmt" "sync" "testing" "gotest.tools/assert" "github.com/ipfs/go-cid" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) var dummyCid cid.Cid func init() { dummyCid, _ = cid.Parse("bafkqaaa") } type fakeMsg struct { bmsgs []*types.Message smsgs []*types.SignedMessage } type fakeCS struct { t *testing.T h abi.ChainEpoch tsc *tipSetCache msgs map[cid.Cid]fakeMsg blkMsgs map[cid.Cid]cid.Cid sync sync.Mutex tipsets map[types.TipSetKey]*types.TipSet sub func(rev, app []*types.TipSet) callNumberLk sync.Mutex callNumber map[string]int } func (fcs *fakeCS) ChainHead(ctx context.Context) (*types.TipSet, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["ChainHead"] = fcs.callNumber["ChainHead"] + 1 panic("implement me") } func (fcs *fakeCS) ChainGetTipSet(ctx context.Context, key types.TipSetKey) (*types.TipSet, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["ChainGetTipSet"] = fcs.callNumber["ChainGetTipSet"] + 1 return fcs.tipsets[key], nil } func (fcs *fakeCS) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["StateSearchMsg"] = fcs.callNumber["StateSearchMsg"] + 1 return nil, nil } func (fcs *fakeCS) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["StateGetActor"] = fcs.callNumber["StateGetActor"] + 1 panic("Not Implemented") } func (fcs *fakeCS) ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["ChainGetTipSetByHeight"] = fcs.callNumber["ChainGetTipSetByHeight"] + 1 panic("Not Implemented") } func (fcs *fakeCS) makeTs(t *testing.T, parents []cid.Cid, h abi.ChainEpoch, msgcid cid.Cid) *types.TipSet { a, _ := address.NewFromString("t00") b, _ := address.NewFromString("t02") var ts, err = types.NewTipSet([]*types.BlockHeader{ { Height: h, Miner: a, Parents: parents, Ticket: &types.Ticket{VRFProof: []byte{byte(h % 2)}}, ParentStateRoot: dummyCid, Messages: msgcid, ParentMessageReceipts: dummyCid, BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, }, { Height: h, Miner: b, Parents: parents, Ticket: &types.Ticket{VRFProof: []byte{byte((h + 1) % 2)}}, ParentStateRoot: dummyCid, Messages: msgcid, ParentMessageReceipts: dummyCid, BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, }, }) if fcs.tipsets == nil { fcs.tipsets = map[types.TipSetKey]*types.TipSet{} } fcs.tipsets[ts.Key()] = ts require.NoError(t, err) return ts } func (fcs *fakeCS) ChainNotify(context.Context) (<-chan []*api.HeadChange, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["ChainNotify"] = fcs.callNumber["ChainNotify"] + 1 out := make(chan []*api.HeadChange, 1) best, err := fcs.tsc.best() if err != nil { return nil, err } out <- []*api.HeadChange{{Type: store.HCCurrent, Val: best}} fcs.sub = func(rev, app []*types.TipSet) { notif := make([]*api.HeadChange, len(rev)+len(app)) for i, r := range rev { notif[i] = &api.HeadChange{ Type: store.HCRevert, Val: r, } } for i, r := range app { notif[i+len(rev)] = &api.HeadChange{ Type: store.HCApply, Val: r, } } out <- notif } return out, nil } func (fcs *fakeCS) ChainGetBlockMessages(ctx context.Context, blk cid.Cid) (*api.BlockMessages, error) { fcs.callNumberLk.Lock() defer fcs.callNumberLk.Unlock() fcs.callNumber["ChainGetBlockMessages"] = fcs.callNumber["ChainGetBlockMessages"] + 1 messages, ok := fcs.blkMsgs[blk] if !ok { return &api.BlockMessages{}, nil } ms, ok := fcs.msgs[messages] if !ok { return &api.BlockMessages{}, nil } return &api.BlockMessages{BlsMessages: ms.bmsgs, SecpkMessages: ms.smsgs}, nil } func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid { n := len(fcs.msgs) c, err := cid.Prefix{ Version: 1, Codec: cid.Raw, MhType: multihash.IDENTITY, MhLength: -1, }.Sum([]byte(fmt.Sprintf("%d", n))) require.NoError(fcs.t, err) fcs.msgs[c] = m return c } func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { // todo: allow msgs if fcs.sub == nil { fcs.t.Fatal("sub not be nil") } nullm := map[int]struct{}{} for _, v := range nulls { nullm[v] = struct{}{} } var revs []*types.TipSet for i := 0; i < rev; i++ { ts, err := fcs.tsc.best() require.NoError(fcs.t, err) if _, ok := nullm[int(ts.Height())]; !ok { revs = append(revs, ts) require.NoError(fcs.t, fcs.tsc.revert(ts)) } fcs.h-- } var apps []*types.TipSet for i := 0; i < app; i++ { fcs.h++ mc, hasMsgs := msgs[i] if !hasMsgs { mc = dummyCid } if _, ok := nullm[int(fcs.h)]; ok { continue } best, err := fcs.tsc.best() require.NoError(fcs.t, err) ts := fcs.makeTs(fcs.t, best.Key().Cids(), fcs.h, mc) require.NoError(fcs.t, fcs.tsc.add(ts)) if hasMsgs { fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc } apps = append(apps, ts) } fcs.sync.Lock() fcs.sub(revs, apps) fcs.sync.Lock() fcs.sync.Unlock() //nolint:staticcheck } func (fcs *fakeCS) notifDone() { fcs.sync.Unlock() } var _ EventAPI = &fakeCS{} func TestAt(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 5, int(ts.Height())) require.Equal(t, 8, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) fcs.advance(0, 3, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 3, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 3, nil) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false fcs.advance(0, 3, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(10, 10, nil) require.Equal(t, true, applied) require.Equal(t, true, reverted) applied = false reverted = false fcs.advance(10, 1, nil) require.Equal(t, false, applied) require.Equal(t, true, reverted) reverted = false fcs.advance(0, 1, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 2, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 1, nil) // 8 require.Equal(t, true, applied) require.Equal(t, false, reverted) } func TestAtDoubleTrigger(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 5, int(ts.Height())) require.Equal(t, 8, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) fcs.advance(0, 6, nil) require.False(t, applied) require.False(t, reverted) fcs.advance(0, 1, nil) require.True(t, applied) require.False(t, reverted) applied = false fcs.advance(2, 2, nil) require.False(t, applied) require.False(t, reverted) fcs.advance(4, 4, nil) require.True(t, applied) require.True(t, reverted) } func TestAtNullTrigger(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, abi.ChainEpoch(6), ts.Height()) require.Equal(t, 8, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) fcs.advance(0, 6, nil, 5) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 3, nil) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false } func TestAtNullConf(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 5, int(ts.Height())) require.Equal(t, 8, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) fcs.advance(0, 6, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 3, nil, 8) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false fcs.advance(7, 1, nil) require.Equal(t, false, applied) require.Equal(t, true, reverted) reverted = false } func TestAtStart(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) fcs.advance(0, 5, nil) // 6 var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 5, int(ts.Height())) require.Equal(t, 8, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(0, 5, nil) // 11 require.Equal(t, true, applied) require.Equal(t, false, reverted) } func TestAtStartConfidence(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) fcs.advance(0, 10, nil) // 11 var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 5, int(ts.Height())) require.Equal(t, 11, int(curH)) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) require.Equal(t, true, applied) require.Equal(t, false, reverted) } func TestAtChained(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 10, int(ts.Height())) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 10) }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) fcs.advance(0, 15, nil) require.Equal(t, true, applied) require.Equal(t, false, reverted) } func TestAtChainedConfidence(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) fcs.advance(0, 15, nil) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { return events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { require.Equal(t, 10, int(ts.Height())) applied = true return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 10) }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) require.Equal(t, true, applied) require.Equal(t, false, reverted) } func TestAtChainedConfidenceNull(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) fcs.advance(0, 15, nil, 5) var applied bool var reverted bool err := events.ChainAt(func(_ context.Context, ts *types.TipSet, curH abi.ChainEpoch) error { applied = true require.Equal(t, 6, int(ts.Height())) return nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 5) require.NoError(t, err) require.Equal(t, true, applied) require.Equal(t, false, reverted) } func matchAddrMethod(to address.Address, m abi.MethodNum) func(msg *types.Message) (matched bool, err error) { return func(msg *types.Message) (matched bool, err error) { return to == msg.To && m == msg.Method, nil } } func TestCalled(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) more := true var applied, reverted bool var appliedMsg *types.Message var appliedTs *types.TipSet var appliedH abi.ChainEpoch err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true appliedMsg = msg appliedTs = ts appliedH = curH return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) // create few blocks to make sure nothing get's randomly called fcs.advance(0, 4, nil) // H=5 require.Equal(t, false, applied) require.Equal(t, false, reverted) // create blocks with message (but below confidence threshold) fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2) 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 1}, }, }), }) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create additional block so we are above confidence threshold fcs.advance(0, 2, nil) // H=10 (confidence=3, apply) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false // dip below confidence fcs.advance(2, 2, nil) // H=10 (confidence=3, apply) require.Equal(t, false, applied) require.Equal(t, false, reverted) require.Equal(t, abi.ChainEpoch(7), appliedTs.Height()) require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String()) require.Equal(t, abi.ChainEpoch(10), appliedH) require.Equal(t, t0123, appliedMsg.To) require.Equal(t, uint64(1), appliedMsg.Nonce) require.Equal(t, abi.MethodNum(5), appliedMsg.Method) // revert some blocks, keep the message fcs.advance(3, 1, nil) // H=8 (confidence=1) require.Equal(t, false, applied) require.Equal(t, false, reverted) // revert the message fcs.advance(2, 1, nil) // H=7, we reverted ts with the msg execution, but not the msg itself require.Equal(t, false, applied) require.Equal(t, true, reverted) reverted = false // send new message on different height n2msg := fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 2}, }, }) fcs.advance(0, 3, map[int]cid.Cid{ // (n2msg confidence=1) 0: n2msg, }) require.Equal(t, true, applied) // msg from H=7, which had reverted execution require.Equal(t, false, reverted) require.Equal(t, abi.ChainEpoch(10), appliedH) applied = false fcs.advance(0, 2, nil) // (confidence=3) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false require.Equal(t, abi.ChainEpoch(9), appliedTs.Height()) require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String()) require.Equal(t, abi.ChainEpoch(12), appliedH) require.Equal(t, t0123, appliedMsg.To) require.Equal(t, uint64(2), appliedMsg.Nonce) require.Equal(t, abi.MethodNum(5), appliedMsg.Method) // revert and apply at different height fcs.advance(8, 6, map[int]cid.Cid{ // (confidence=3) 1: n2msg, }) // TODO: We probably don't want to call revert/apply, as restarting certain // actions may be expensive, and in this case the message is still // on-chain, just at different height require.Equal(t, true, applied) require.Equal(t, true, reverted) reverted = false applied = false require.Equal(t, abi.ChainEpoch(7), appliedTs.Height()) require.Equal(t, "bafkqaaa", appliedTs.Blocks()[0].Messages.String()) require.Equal(t, abi.ChainEpoch(10), appliedH) require.Equal(t, t0123, appliedMsg.To) require.Equal(t, uint64(2), appliedMsg.Nonce) require.Equal(t, abi.MethodNum(5), appliedMsg.Method) // call method again fcs.advance(0, 5, map[int]cid.Cid{ 0: n2msg, }) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false // send and revert below confidence, then cross confidence fcs.advance(0, 2, map[int]cid.Cid{ 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 3}, }, }), }) fcs.advance(2, 5, nil) // H=19, but message reverted require.Equal(t, false, applied) require.Equal(t, false, reverted) // test timeout (it's set to 20 in the call to `events.Called` above) fcs.advance(0, 6, nil) require.Equal(t, false, applied) // not calling timeout as we received messages require.Equal(t, false, reverted) // test unregistering with more more = false fcs.advance(0, 5, map[int]cid.Cid{ 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 4}, // this signals we don't want more }, }), }) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false fcs.advance(0, 5, map[int]cid.Cid{ 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 5}, }, }), }) require.Equal(t, false, applied) // should not get any further notifications require.Equal(t, false, reverted) // revert after disabled fcs.advance(5, 1, nil) // try reverting msg sent after disabling require.Equal(t, false, applied) require.Equal(t, false, reverted) fcs.advance(5, 1, nil) // try reverting msg sent before disabling require.Equal(t, false, applied) require.Equal(t, true, reverted) } func TestCalledTimeout(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) called := false err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, msg) require.Equal(t, abi.ChainEpoch(20), ts.Height()) require.Equal(t, abi.ChainEpoch(23), curH) return false, nil }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) fcs.advance(0, 21, nil) require.False(t, called) fcs.advance(0, 5, nil) require.True(t, called) called = false // with check func reporting done fcs = &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, callNumber: map[string]int{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events = NewEvents(context.Background(), fcs) err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { called = true require.Nil(t, msg) require.Equal(t, abi.ChainEpoch(20), ts.Height()) require.Equal(t, abi.ChainEpoch(23), curH) return false, nil }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) fcs.advance(0, 21, nil) require.False(t, called) fcs.advance(0, 5, nil) require.False(t, called) } func TestCalledOrder(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) at := 0 err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { switch at { case 0: require.Equal(t, uint64(1), msg.Nonce) require.Equal(t, abi.ChainEpoch(4), ts.Height()) case 1: require.Equal(t, uint64(2), msg.Nonce) require.Equal(t, abi.ChainEpoch(5), ts.Height()) default: t.Fatal("apply should only get called twice, at: ", at) } at++ return true, nil }, func(_ context.Context, ts *types.TipSet) error { switch at { case 2: require.Equal(t, abi.ChainEpoch(5), ts.Height()) case 3: require.Equal(t, abi.ChainEpoch(4), ts.Height()) default: t.Fatal("revert should only get called twice, at: ", at) } at++ return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) fcs.advance(0, 10, map[int]cid.Cid{ 1: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 1}, }, }), 2: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 2}, }, }), }) fcs.advance(9, 1, nil) } func TestCalledNull(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) more := true var applied, reverted bool err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) // create few blocks to make sure nothing get's randomly called fcs.advance(0, 4, nil) // H=5 require.Equal(t, false, applied) require.Equal(t, false, reverted) // create blocks with message (but below confidence threshold) fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2) 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 1}, }, }), }) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create additional blocks so we are above confidence threshold, but with null tipset at the height // of application fcs.advance(0, 3, nil, 10) // H=11 (confidence=3, apply) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false fcs.advance(5, 1, nil, 10) require.Equal(t, false, applied) require.Equal(t, true, reverted) } func TestRemoveTriggersOnMessage(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) more := true var applied, reverted bool err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) // create few blocks to make sure nothing get's randomly called fcs.advance(0, 4, nil) // H=5 require.Equal(t, false, applied) require.Equal(t, false, reverted) // create blocks with message (but below confidence threshold) fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=5, applied at H=6; H=8 (confidence=2) 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 1}, }, }), }) require.Equal(t, false, applied) require.Equal(t, false, reverted) // revert applied TS & message TS fcs.advance(3, 1, nil) // H=6 (tipset message applied in reverted, AND message reverted) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create additional blocks so we are above confidence threshold, but message not applied // as it was reverted fcs.advance(0, 5, nil) // H=11 (confidence=3, apply) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create blocks with message again (but below confidence threshold) fcs.advance(0, 3, map[int]cid.Cid{ // msg occurs at H=12, applied at H=13; H=15 (confidence=2) 0: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 2}, }, }), }) require.Equal(t, false, applied) require.Equal(t, false, reverted) // revert applied height TS, but don't remove message trigger fcs.advance(2, 1, nil) // H=13 (tipset message applied in reverted, by tipset with message not reverted) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create additional blocks so we are above confidence threshold fcs.advance(0, 4, nil) // H=18 (confidence=3, apply) require.Equal(t, true, applied) require.Equal(t, false, reverted) } type testStateChange struct { from string to string } func TestStateChanged(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) more := true var applied, reverted bool var appliedData StateChange var appliedOldTs *types.TipSet var appliedNewTs *types.TipSet var appliedH abi.ChainEpoch var matchData StateChange confidence := 3 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true appliedData = data appliedOldTs = oldTs appliedNewTs = newTs appliedH = curH return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { if matchData == nil { return false, matchData, nil } d := matchData matchData = nil return true, d, nil }) require.NoError(t, err) // create few blocks to make sure nothing get's randomly called fcs.advance(0, 4, nil) // H=5 require.Equal(t, false, applied) require.Equal(t, false, reverted) // create state change (but below confidence threshold) matchData = testStateChange{from: "a", to: "b"} fcs.advance(0, 3, nil) require.Equal(t, false, applied) require.Equal(t, false, reverted) // create additional block so we are above confidence threshold fcs.advance(0, 2, nil) // H=10 (confidence=3, apply) require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false // dip below confidence (should not apply again) fcs.advance(2, 2, nil) // H=10 (confidence=3, apply) require.Equal(t, false, applied) require.Equal(t, false, reverted) // Change happens from 5 -> 6 require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height()) require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height()) // Actually applied (with confidence) at 9 require.Equal(t, abi.ChainEpoch(9), appliedH) // Make sure the state change was correctly passed through rcvd := appliedData.(testStateChange) require.Equal(t, "a", rcvd.from) require.Equal(t, "b", rcvd.to) } func TestStateChangedRevert(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) more := true var applied, reverted bool var matchData StateChange confidence := 1 timeout := abi.ChainEpoch(20) err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { reverted = true return nil }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { if matchData == nil { return false, matchData, nil } d := matchData matchData = nil return true, d, nil }) require.NoError(t, err) fcs.advance(0, 2, nil) // H=3 // Make a state change from TS at height 3 to TS at height 4 matchData = testStateChange{from: "a", to: "b"} fcs.advance(0, 1, nil) // H=4 // Haven't yet reached confidence require.Equal(t, false, applied) require.Equal(t, false, reverted) // Advance to reach confidence level fcs.advance(0, 1, nil) // H=5 // Should now have called the handler require.Equal(t, true, applied) require.Equal(t, false, reverted) applied = false // Advance 3 more TS fcs.advance(0, 3, nil) // H=8 require.Equal(t, false, applied) require.Equal(t, false, reverted) // Regress but not so far as to cause a revert fcs.advance(3, 1, nil) // H=6 require.Equal(t, false, applied) require.Equal(t, false, reverted) // Regress back to state where change happened fcs.advance(3, 1, nil) // H=4 // Expect revert to have happened require.Equal(t, false, applied) require.Equal(t, true, reverted) } func TestStateChangedTimeout(t *testing.T) { timeoutHeight := abi.ChainEpoch(20) confidence := 3 testCases := []struct { name string checkFn CheckFunc nilBlocks []int expectTimeout bool }{{ // Verify that the state changed timeout is called at the expected height name: "state changed timeout", checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, expectTimeout: true, }, { // Verify that the state changed timeout is called even if the timeout // falls on nil block name: "state changed timeout falls on nil block", checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, nilBlocks: []int{20, 21, 22, 23}, expectTimeout: true, }, { // Verify that the state changed timeout is not called if the check // function reports that it's complete name: "no timeout callback if check func reports done", checkFn: func(ts *types.TipSet) (d bool, m bool, e error) { return true, true, nil }, expectTimeout: false, }} for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), callNumber: map[string]int{}, } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) // Track whether the callback was called called := false // Set up state change tracking that will timeout at the given height err := events.StateChanged( tc.checkFn, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { // Expect the callback to be called at the timeout height with nil data called = true require.Nil(t, data) require.Equal(t, timeoutHeight, newTs.Height()) require.Equal(t, timeoutHeight+abi.ChainEpoch(confidence), curH) return false, nil }, func(_ context.Context, ts *types.TipSet) error { t.Fatal("revert on timeout") return nil }, confidence, timeoutHeight, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { return false, nil, nil }) require.NoError(t, err) // Advance to timeout height fcs.advance(0, int(timeoutHeight)+1, nil) require.False(t, called) // Advance past timeout height fcs.advance(0, 5, nil, tc.nilBlocks...) require.Equal(t, tc.expectTimeout, called) called = false }) } } func TestCalledMultiplePerEpoch(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, callNumber: map[string]int{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) events := NewEvents(context.Background(), fcs) t0123, err := address.NewFromString("t0123") require.NoError(t, err) at := 0 err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { switch at { case 0: require.Equal(t, uint64(1), msg.Nonce) require.Equal(t, abi.ChainEpoch(4), ts.Height()) case 1: require.Equal(t, uint64(2), msg.Nonce) require.Equal(t, abi.ChainEpoch(4), ts.Height()) default: t.Fatal("apply should only get called twice, at: ", at) } at++ return true, nil }, func(_ context.Context, ts *types.TipSet) error { switch at { case 2: require.Equal(t, abi.ChainEpoch(4), ts.Height()) case 3: require.Equal(t, abi.ChainEpoch(4), ts.Height()) default: t.Fatal("revert should only get called twice, at: ", at) } at++ return nil }, 3, 20, matchAddrMethod(t0123, 5)) require.NoError(t, err) fcs.advance(0, 10, map[int]cid.Cid{ 1: fcs.fakeMsgs(fakeMsg{ bmsgs: []*types.Message{ {To: t0123, From: t0123, Method: 5, Nonce: 1}, {To: t0123, From: t0123, Method: 5, Nonce: 2}, }, }), }) fcs.advance(9, 1, nil) } func TestCachedSameBlock(t *testing.T) { fcs := &fakeCS{ t: t, h: 1, msgs: map[cid.Cid]fakeMsg{}, blkMsgs: map[cid.Cid]cid.Cid{}, callNumber: map[string]int{}, tsc: newTSCache(2*build.ForkLengthThreshold, nil), } require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) _ = NewEvents(context.Background(), fcs) fcs.advance(0, 10, map[int]cid.Cid{}) assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 20, "expect call ChainGetBlockMessages %d but got ", 20, fcs.callNumber["ChainGetBlockMessages"]) fcs.advance(5, 10, map[int]cid.Cid{}) assert.Assert(t, fcs.callNumber["ChainGetBlockMessages"] == 30, "expect call ChainGetBlockMessages %d but got ", 30, fcs.callNumber["ChainGetBlockMessages"]) }