diff --git a/chain/events/events.go b/chain/events/events.go index a325b5410..e11507795 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -51,7 +51,7 @@ type Events struct { readyOnce sync.Once heightEvents - calledEvents + *hcEvents } func NewEvents(ctx context.Context, api eventAPI) *Events { @@ -74,18 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { htHeights: map[abi.ChainEpoch][]uint64{}, }, - calledEvents: calledEvents{ - cs: api, - tsc: tsc, - ctx: ctx, - gcConfidence: uint64(gcConfidence), - - confQueue: map[triggerH]map[msgH][]*queuedEvent{}, - revertQueue: map[msgH][]triggerH{}, - triggers: map[triggerID]*callHandler{}, - matchers: map[triggerID][]MatchFunc{}, - timeouts: map[abi.ChainEpoch]map[triggerID]int{}, - }, + hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), } e.ready.Add(1) @@ -143,7 +132,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { } e.readyOnce.Do(func() { - e.at = cur[0].Val.Height() + e.lastTs = cur[0].Val e.ready.Done() }) @@ -186,5 +175,5 @@ func (e *Events) headChange(rev, app []*types.TipSet) error { return err } - return e.headChangeCalled(rev, app) + return e.processHeadChangeEvent(rev, app) } diff --git a/chain/events/events_called.go b/chain/events/events_called.go index 0bae99404..3d8e05c02 100644 --- a/chain/events/events_called.go +++ b/chain/events/events_called.go @@ -13,6 +13,7 @@ import ( ) const NoTimeout = math.MaxInt64 +const NoHeight = abi.ChainEpoch(-1) type triggerID = uint64 @@ -23,54 +24,60 @@ type msgH = abi.ChainEpoch // message (msgH+confidence) type triggerH = abi.ChainEpoch -// CalledHandler arguments: -// `ts` is the tipset, in which the `msg` is included. +type eventData interface{} + +// EventHandler arguments: +// `prevTs` is the previous tipset, eg the "from" tipset for a state change. +// `ts` is the event tipset, eg the tipset in which the `msg` is included. // `curH`-`ts.Height` = `confidence` -type CalledHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) +type EventHandler func(data eventData, prevTs, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) // CheckFunc is used for atomicity guarantees. If the condition the callbacks // wait for has already happened in tipset `ts` // // If `done` is true, timeout won't be triggered -// If `more` is false, no messages will be sent to CalledHandler (RevertHandler +// If `more` is false, no messages will be sent to EventHandler (RevertHandler // may still be called) type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error) -type MatchFunc func(msg *types.Message) (bool, error) - -type callHandler struct { +// Keep track of information for an event handler +type handlerInfo struct { confidence int timeout abi.ChainEpoch disabled bool // TODO: GC after gcConfidence reached - handle CalledHandler + handle EventHandler revert RevertHandler } +// When a change occurs, a queuedEvent is created and put into a queue +// until the required confidence is reached type queuedEvent struct { trigger triggerID - h abi.ChainEpoch - msg *types.Message + prevH abi.ChainEpoch + h abi.ChainEpoch + data eventData called bool } -type calledEvents struct { +// Manages chain head change events, which may be forward (new tipset added to +// chain) or backward (chain branch discarded in favour of heavier branch) +type hcEvents struct { cs eventAPI tsc *tipSetCache ctx context.Context gcConfidence uint64 - at abi.ChainEpoch + lastTs *types.TipSet lk sync.Mutex ctr triggerID - triggers map[triggerID]*callHandler - matchers map[triggerID][]MatchFunc + triggers map[triggerID]*handlerInfo // maps block heights to events // [triggerH][msgH][event] @@ -81,30 +88,79 @@ type calledEvents struct { // [timeoutH+confidence][triggerID]{calls} timeouts map[abi.ChainEpoch]map[triggerID]int + + messageEvents + watcherEvents } -func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error { +func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents { + e := hcEvents{ + ctx: ctx, + cs: cs, + tsc: tsc, + gcConfidence: gcConfidence, + + confQueue: map[triggerH]map[msgH][]*queuedEvent{}, + revertQueue: map[msgH][]triggerH{}, + triggers: map[triggerID]*handlerInfo{}, + timeouts: map[abi.ChainEpoch]map[triggerID]int{}, + } + + e.messageEvents = newMessageEvents(ctx, &e, cs) + e.watcherEvents = newWatcherEvents(ctx, &e, cs) + + return &e +} + +// Called when there is a change to the head with tipsets to be +// reverted / applied +func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error { e.lk.Lock() defer e.lk.Unlock() for _, ts := range rev { e.handleReverts(ts) - e.at = ts.Height() + e.lastTs = ts } for _, ts := range app { - // called triggers - e.checkNewCalls(ts) - for ; e.at <= ts.Height(); e.at++ { - e.applyWithConfidence(ts, e.at) + // Check if the head change caused any state changes that we were + // waiting for + stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts) + + // Queue up calls until there have been enough blocks to reach + // confidence on the state changes + for tid, data := range stateChanges { + e.queueForConfidence(tid, data, e.lastTs, ts) + } + + // Check if the head change included any new message calls + newCalls, err := e.messageEvents.checkNewCalls(ts) + if err != nil { + return err + } + + // Queue up calls until there have been enough blocks to reach + // confidence on the message calls + for tid, data := range newCalls { + e.queueForConfidence(tid, data, nil, ts) + } + + for at := e.lastTs.Height(); at <= ts.Height(); at++ { + // Apply any queued events and timeouts that were targeted at the + // current chain height + e.applyWithConfidence(ts, at) e.applyTimeouts(ts) } + + // Update the latest known tipset + e.lastTs = ts } return nil } -func (e *calledEvents) handleReverts(ts *types.TipSet) { +func (e *hcEvents) handleReverts(ts *types.TipSet) { reverts, ok := e.revertQueue[ts.Height()] if !ok { return // nothing to do @@ -120,7 +176,7 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) { trigger := e.triggers[event.trigger] if err := trigger.revert(e.ctx, ts); err != nil { - log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err) + log.Errorf("reverting chain trigger (@H %d, triggered @ %d) failed: %s", ts.Height(), triggerH, err) } } delete(e.confQueue[triggerH], ts.Height()) @@ -128,41 +184,15 @@ func (e *calledEvents) handleReverts(ts *types.TipSet) { delete(e.revertQueue, ts.Height()) } -func (e *calledEvents) checkNewCalls(ts *types.TipSet) { - pts, err := e.cs.ChainGetTipSet(e.ctx, ts.Parents()) // we actually care about messages in the parent tipset here - if err != nil { - log.Errorf("getting parent tipset in checkNewCalls: %s", err) - return - } - - e.messagesForTs(pts, func(msg *types.Message) { - // TODO: provide receipts - for tid, matchFns := range e.matchers { - var matched bool - for _, matchFn := range matchFns { - ok, err := matchFn(msg) - if err != nil { - log.Errorf("event matcher failed: %s", err) - continue - } - matched = ok - - if matched { - break - } - } - - if matched { - e.queueForConfidence(tid, msg, ts) - break - } - } - }) -} - -func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts *types.TipSet) { +// Queue up events until the chain has reached a height that reflects the +// desired confidence +func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, prevTs, ts *types.TipSet) { trigger := e.triggers[trigID] + prevH := NoHeight + if prevTs != nil { + prevH = prevTs.Height() + } appliedH := ts.Height() triggerH := appliedH + abi.ChainEpoch(trigger.confidence) @@ -175,17 +205,19 @@ func (e *calledEvents) queueForConfidence(trigID uint64, msg *types.Message, ts byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{ trigger: trigID, + prevH: prevH, h: appliedH, - msg: msg, + data: data, }) e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH) } -func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { +// Apply any events that were waiting for this chain height for confidence +func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) { byOrigH, ok := e.confQueue[height] if !ok { - return // no triggers at thin height + return // no triggers at this height } for origH, events := range byOrigH { @@ -204,15 +236,20 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo continue } - rec, err := e.cs.StateGetReceipt(e.ctx, event.msg.Cid(), ts.Key()) - if err != nil { - log.Error(err) - return + // Previous tipset - this is relevant for example in a state change + // from one tipset to another + var prevTs *types.TipSet + if event.prevH != NoHeight { + prevTs, err = e.tsc.get(event.prevH) + if err != nil { + log.Errorf("events: applyWithConfidence didn't find tipset for previous event; wanted %d; current %d", event.prevH, height) + continue + } } - more, err := trigger.handle(event.msg, rec, triggerTs, height) + more, err := trigger.handle(event.data, prevTs, triggerTs, height) if err != nil { - log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err) + log.Errorf("chain trigger (@H %d, triggered @ %d) failed: %s", origH, height, err) continue // don't revert failed calls } @@ -228,7 +265,8 @@ func (e *calledEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpo } } -func (e *calledEvents) applyTimeouts(ts *types.TipSet) { +// Apply any timeouts that expire at this height +func (e *hcEvents) applyTimeouts(ts *types.TipSet) { triggers, ok := e.timeouts[ts.Height()] if !ok { return // nothing to do @@ -258,12 +296,224 @@ func (e *calledEvents) applyTimeouts(ts *types.TipSet) { } } -func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { +// Listen for an event +// - CheckFunc: immediately checks if the event already occurred +// - EventHandler: called when the event has occurred, after confidence tipsets +// - RevertHandler: called if the chain head changes causing the event to revert +// - confidence: wait this many tipsets before calling EventHandler +// - timeout: at this chain height, timeout on waiting for this event +func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) { + e.lk.Lock() + defer e.lk.Unlock() + + // Check if the event has already occurred + ts := e.tsc.best() + done, more, err := check(ts) + if err != nil { + return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) + } + if done { + timeout = NoTimeout + } + + // Create a trigger for the event + id := e.ctr + e.ctr++ + + e.triggers[id] = &handlerInfo{ + confidence: confidence, + timeout: timeout + abi.ChainEpoch(confidence), + + disabled: !more, + + handle: hnd, + revert: rev, + } + + // If there's a timeout, set up a timeout check at that height + if timeout != NoTimeout { + if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { + e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} + } + e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 + } + + return id, nil +} + +// headChangeAPI is used to allow the composed event APIs to call back to hcEvents +// to listen for changes +type headChangeAPI interface { + onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) +} + +// watcherEvents watches for a state change +type watcherEvents struct { + ctx context.Context + cs eventAPI + hcAPI headChangeAPI + + lk sync.RWMutex + matchers map[triggerID]StateMatchFunc +} + +func newWatcherEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) watcherEvents { + return watcherEvents{ + ctx: ctx, + cs: cs, + hcAPI: hcAPI, + matchers: make(map[triggerID]StateMatchFunc), + } +} + +// Run each of the matchers against the previous and current state to see if +// there's a change +func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData { + we.lk.RLock() + defer we.lk.RUnlock() + + res := make(map[triggerID]eventData) + for tid, matchFn := range we.matchers { + ok, data, err := matchFn(oldState, newState) + if err != nil { + log.Errorf("event diff fn failed: %s", err) + continue + } + + if ok { + res[tid] = data + } + } + return res +} + +// StateChange represents a change in state +type StateChange interface{} + +// StateChangeHandler arguments: +// `oldTs` is the state "from" tipset +// `newTs` is the state "to" tipset +// `states` is the change in state +// `curH`-`ts.Height` = `confidence` +type StateChangeHandler func(oldTs, newTs *types.TipSet, states StateChange, curH abi.ChainEpoch) (more bool, err error) + +type StateMatchFunc func(oldTs, newTs *types.TipSet) (bool, StateChange, error) + +// StateChanged registers a callback which is triggered when a specified state +// change occurs or a timeout is reached. +// +// * `CheckFunc` callback is invoked immediately with a recent tipset, it +// returns two booleans - `done`, and `more`. +// +// * `done` should be true when some on-chain state change we are waiting +// for has happened. When `done` is set to true, timeout trigger is disabled. +// +// * `more` should be false when we don't want to receive new notifications +// through StateChangeHandler. Note that notifications may still be delivered to +// RevertHandler +// +// * `StateChangeHandler` is called when the specified state change was observed +// on-chain, and a confidence threshold was reached, or the specified `timeout` +// height was reached with no state change observed. When this callback is +// invoked on a timeout, `oldState` and `newState` are set to nil. +// This callback returns a boolean specifying whether further notifications +// should be sent, like `more` return param from `CheckFunc` above. +// +// * `RevertHandler` is called after apply handler, when we drop the tipset +// containing the message. The tipset passed as the argument is the tipset +// that is being dropped. Note that the event dropped may be re-applied +// in a different tipset in small amount of time. +// +// * `StateMatchFunc` is called against each tipset state. If there is a match, +// the state change is queued up until the confidence interval has elapsed (and +// `StateChangeHandler` is called) +func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error { + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + states, ok := data.(StateChange) + if data != nil && !ok { + panic("expected StateChange") + } + + return scHnd(prevTs, ts, states, height) + } + + id, err := we.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + we.lk.Lock() + we.matchers[id] = mf + defer we.lk.Unlock() + + return nil +} + +// messageEvents watches for message calls to actors +type messageEvents struct { + ctx context.Context + cs eventAPI + hcAPI headChangeAPI + + lk sync.RWMutex + matchers map[triggerID][]MsgMatchFunc +} + +func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) messageEvents { + return messageEvents{ + ctx: ctx, + cs: cs, + hcAPI: hcAPI, + matchers: map[triggerID][]MsgMatchFunc{}, + } +} + +// Check if there are any new actor calls +func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) { + pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here + if err != nil { + log.Errorf("getting parent tipset in checkNewCalls: %s", err) + return nil, err + } + + res := make(map[triggerID]eventData) + me.messagesForTs(pts, func(msg *types.Message) { + me.lk.RLock() + defer me.lk.RUnlock() + // TODO: provide receipts + + for tid, matchFns := range me.matchers { + var matched bool + for _, matchFn := range matchFns { + ok, err := matchFn(msg) + if err != nil { + log.Errorf("event matcher failed: %s", err) + continue + } + matched = ok + + if matched { + break + } + } + + if matched { + res[tid] = msg + break + } + } + }) + + return res, nil +} + +// Get the messages in a tipset +func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) { seen := map[cid.Cid]struct{}{} for _, tsb := range ts.Blocks() { - msgs, err := e.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) + msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid()) if err != nil { log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err) // this is quite bad, but probably better than missing all the other updates @@ -292,20 +542,27 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa } } -// Called registers a callbacks which are triggered when a specified method is +// MsgHandler arguments: +// `ts` is the tipset, in which the `msg` is included. +// `curH`-`ts.Height` = `confidence` +type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error) + +type MsgMatchFunc func(msg *types.Message) (bool, error) + +// Called registers a callback which is triggered when a specified method is // called on an actor, or a timeout is reached. // // * `CheckFunc` callback is invoked immediately with a recent tipset, it // returns two booleans - `done`, and `more`. // -// * `done` should be true when some on-chain action we are waiting for has -// happened. When `done` is set to true, timeout trigger is disabled. +// * `done` should be true when some on-chain action we are waiting for has +// happened. When `done` is set to true, timeout trigger is disabled. // -// * `more` should be false when we don't want to receive new notifications -// through CalledHandler. Note that notifications may still be delivered to -// RevertHandler +// * `more` should be false when we don't want to receive new notifications +// through MsgHandler. Note that notifications may still be delivered to +// RevertHandler // -// * `CalledHandler` is called when the specified event was observed on-chain, +// * `MsgHandler` is called when the specified event was observed on-chain, // and a confidence threshold was reached, or the specified `timeout` height // was reached with no events observed. When this callback is invoked on a // timeout, `msg` is set to nil. This callback returns a boolean specifying @@ -316,44 +573,38 @@ func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Messa // containing the message. The tipset passed as the argument is the tipset // that is being dropped. Note that the message dropped may be re-applied // in a different tipset in small amount of time. -func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MatchFunc) error { - e.lk.Lock() - defer e.lk.Unlock() - - ts := e.tsc.best() - done, more, err := check(ts) - if err != nil { - return xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err) - } - if done { - timeout = NoTimeout - } - - id := e.ctr - e.ctr++ - - e.triggers[id] = &callHandler{ - confidence: confidence, - timeout: timeout + abi.ChainEpoch(confidence), - - disabled: !more, - - handle: hnd, - revert: rev, - } - - e.matchers[id] = append(e.matchers[id], mf) - - if timeout != NoTimeout { - if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil { - e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{} +// +// * `MsgMatchFunc` is called against each message. If there is a match, the +// message is queued up until the confidence interval has elapsed (and +// `MsgHandler` is called) +func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error { + hnd := func(data eventData, prevTs, ts *types.TipSet, height abi.ChainEpoch) (bool, error) { + msg, ok := data.(*types.Message) + if data != nil && !ok { + panic("expected msg") } - e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0 + + rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key()) + if err != nil { + return false, err + } + + return msgHnd(msg, rec, ts, height) } + id, err := me.hcAPI.onHeadChanged(check, hnd, rev, confidence, timeout) + if err != nil { + return err + } + + me.lk.Lock() + defer me.lk.Unlock() + me.matchers[id] = append(me.matchers[id], mf) + return nil } -func (e *calledEvents) CalledMsg(ctx context.Context, hnd CalledHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { - return e.Called(e.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, e.MatchMsg(msg.VMMessage())) +// Convenience function for checking and matching messages +func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error { + return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage())) } diff --git a/chain/events/events_test.go b/chain/events/events_test.go index a048789ec..5798fb75c 100644 --- a/chain/events/events_test.go +++ b/chain/events/events_test.go @@ -1004,8 +1004,6 @@ func TestRemoveTriggersOnMessage(t *testing.T) { return false, true, nil }, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) { require.Equal(t, false, applied) - fmt.Println(msg == nil) - fmt.Println(curH) applied = true return more, nil }, func(_ context.Context, ts *types.TipSet) error { @@ -1067,3 +1065,250 @@ func TestRemoveTriggersOnMessage(t *testing.T) { require.Equal(t, true, applied) require.Equal(t, false, reverted) } + +type testStateChange struct { + from string + to string +} + +func TestStateChanged(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var appliedData StateChange + var appliedOldTs *types.TipSet + var appliedNewTs *types.TipSet + var appliedH abi.ChainEpoch + var matchData StateChange + + confidence := 3 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + appliedData = data + appliedOldTs = oldTs + appliedNewTs = newTs + appliedH = curH + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + // create few blocks to make sure nothing get's randomly called + + fcs.advance(0, 4, nil) // H=5 + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create state change (but below confidence threshold) + matchData = testStateChange{from: "a", to: "b"} + fcs.advance(0, 3, nil) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // create additional block so we are above confidence threshold + + fcs.advance(0, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // dip below confidence (should not apply again) + fcs.advance(2, 2, nil) // H=10 (confidence=3, apply) + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Change happens from 5 -> 6 + require.Equal(t, abi.ChainEpoch(5), appliedOldTs.Height()) + require.Equal(t, abi.ChainEpoch(6), appliedNewTs.Height()) + + // Actually applied (with confidence) at 9 + require.Equal(t, abi.ChainEpoch(9), appliedH) + + // Make sure the state change was correctly passed through + rcvd := appliedData.(testStateChange) + require.Equal(t, "a", rcvd.from) + require.Equal(t, "b", rcvd.to) +} + +func TestStateChangedRevert(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + more := true + var applied, reverted bool + var matchData StateChange + + confidence := 1 + timeout := abi.ChainEpoch(20) + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + require.Equal(t, false, applied) + applied = true + return more, nil + }, func(_ context.Context, ts *types.TipSet) error { + reverted = true + return nil + }, confidence, timeout, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + if matchData == nil { + return false, matchData, nil + } + + d := matchData + matchData = nil + return true, d, nil + }) + require.NoError(t, err) + + fcs.advance(0, 2, nil) // H=3 + + // Make a state change from TS at height 3 to TS at height 4 + matchData = testStateChange{from: "a", to: "b"} + fcs.advance(0, 1, nil) // H=4 + + // Haven't yet reached confidence + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Advance to reach confidence level + fcs.advance(0, 1, nil) // H=5 + + // Should now have called the handler + require.Equal(t, true, applied) + require.Equal(t, false, reverted) + applied = false + + // Advance 3 more TS + fcs.advance(0, 3, nil) // H=8 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress but not so far as to cause a revert + fcs.advance(3, 1, nil) // H=6 + + require.Equal(t, false, applied) + require.Equal(t, false, reverted) + + // Regress back to state where change happened + fcs.advance(3, 1, nil) // H=4 + + // Expect revert to have happened + require.Equal(t, false, applied) + require.Equal(t, true, reverted) +} + +func TestStateChangedTimeout(t *testing.T) { + fcs := &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events := NewEvents(context.Background(), fcs) + + called := false + + err := events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return false, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.True(t, called) + called = false + + // with check func reporting done + + fcs = &fakeCS{ + t: t, + h: 1, + + msgs: map[cid.Cid]fakeMsg{}, + blkMsgs: map[cid.Cid]cid.Cid{}, + tsc: newTSCache(2*build.ForkLengthThreshold, nil), + } + require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid))) + + events = NewEvents(context.Background(), fcs) + + err = events.StateChanged(func(ts *types.TipSet) (d bool, m bool, e error) { + return true, true, nil + }, func(oldTs, newTs *types.TipSet, data StateChange, curH abi.ChainEpoch) (bool, error) { + called = true + require.Nil(t, data) + require.Equal(t, abi.ChainEpoch(20), newTs.Height()) + require.Equal(t, abi.ChainEpoch(23), curH) + return false, nil + }, func(_ context.Context, ts *types.TipSet) error { + t.Fatal("revert on timeout") + return nil + }, 3, 20, func(oldTs, newTs *types.TipSet) (bool, StateChange, error) { + return false, nil, nil + }) + require.NoError(t, err) + + fcs.advance(0, 21, nil) + require.False(t, called) + + fcs.advance(0, 5, nil) + require.False(t, called) +} diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go new file mode 100644 index 000000000..3245d5c03 --- /dev/null +++ b/chain/events/state/predicates.go @@ -0,0 +1,137 @@ +package state + +import ( + "context" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" +) + +// UserData is the data returned from the DiffFunc +type UserData interface{} + +// ChainAPI abstracts out calls made by this class to external APIs +type ChainAPI interface { + apibstore.ChainIO + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) +} + +// StatePredicates has common predicates for responding to state changes +type StatePredicates struct { + api ChainAPI + cst *cbor.BasicIpldStore +} + +func NewStatePredicates(api ChainAPI) *StatePredicates { + return &StatePredicates{ + api: api, + cst: cbor.NewCborStore(apibstore.NewAPIBlockstore(api)), + } +} + +// DiffFunc check if there's a change form oldState to newState, and returns +// - changed: was there a change +// - user: user-defined data representing the state change +// - err +type DiffFunc func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) + +type DiffStateFunc func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) + +// OnActorStateChanged calls diffStateFunc when the state changes for the given actor +func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFunc DiffStateFunc) DiffFunc { + return func(ctx context.Context, oldState, newState *types.TipSet) (changed bool, user UserData, err error) { + oldActor, err := sp.api.StateGetActor(ctx, addr, oldState.Key()) + if err != nil { + return false, nil, err + } + newActor, err := sp.api.StateGetActor(ctx, addr, newState.Key()) + if err != nil { + return false, nil, err + } + + if oldActor.Head.Equals(newActor.Head) { + return false, nil, nil + } + return diffStateFunc(ctx, oldActor.Head, newActor.Head) + } +} + +type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) + +// OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor +func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffFunc { + return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorStateHead, newActorStateHead cid.Cid) (changed bool, user UserData, err error) { + var oldState market.State + if err := sp.cst.Get(ctx, oldActorStateHead, &oldState); err != nil { + return false, nil, err + } + var newState market.State + if err := sp.cst.Get(ctx, newActorStateHead, &newState); err != nil { + return false, nil, err + } + return diffStorageMarketState(ctx, &oldState, &newState) + }) +} + +type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) + +// OnDealStateChanged calls diffDealStates when the market state changes +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { + if oldState.States.Equals(newState.States) { + return false, nil, nil + } + + oldRoot, err := amt.LoadAMT(ctx, sp.cst, oldState.States) + if err != nil { + return false, nil, err + } + newRoot, err := amt.LoadAMT(ctx, sp.cst, newState.States) + if err != nil { + return false, nil, err + } + + return diffDealStates(ctx, oldRoot, newRoot) + } +} + +// ChangedDeals is a set of changes to deal state +type ChangedDeals map[abi.DealID]DealStateChange + +// DealStateChange is a change in deal state from -> to +type DealStateChange struct { + From market.DealState + To market.DealState +} + +// DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { + return func(ctx context.Context, oldDealStateRoot *amt.Root, newDealStateRoot *amt.Root) (changed bool, user UserData, err error) { + changedDeals := make(ChangedDeals) + for _, dealID := range dealIds { + var oldDeal, newDeal market.DealState + err := oldDealStateRoot.Get(ctx, uint64(dealID), &oldDeal) + if err != nil { + return false, nil, err + } + err = newDealStateRoot.Get(ctx, uint64(dealID), &newDeal) + if err != nil { + return false, nil, err + } + if oldDeal != newDeal { + changedDeals[dealID] = DealStateChange{oldDeal, newDeal} + } + } + if len(changedDeals) > 0 { + return true, changedDeals, nil + } + return false, nil, nil + } +} diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go new file mode 100644 index 000000000..1c10209a8 --- /dev/null +++ b/chain/events/state/predicates_test.go @@ -0,0 +1,177 @@ +package state + +import ( + "context" + "testing" + + "github.com/filecoin-project/specs-actors/actors/crypto" + "github.com/ipfs/go-hamt-ipld" + + "github.com/filecoin-project/go-amt-ipld/v2" + "github.com/filecoin-project/specs-actors/actors/builtin/market" + ds "github.com/ipfs/go-datastore" + ds_sync "github.com/ipfs/go-datastore/sync" + bstore "github.com/ipfs/go-ipfs-blockstore" + cbornode "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/abi" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +type mockAPI struct { + ts map[types.TipSetKey]*types.Actor + bs bstore.Blockstore +} + +func newMockAPI(bs bstore.Blockstore) *mockAPI { + return &mockAPI{ + bs: bs, + ts: make(map[types.TipSetKey]*types.Actor), + } +} + +func (m mockAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + return m.bs.Has(c) +} + +func (m mockAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + blk, err := m.bs.Get(c) + if err != nil { + return nil, xerrors.Errorf("blockstore get: %w", err) + } + + return blk.RawData(), nil +} + +func (m mockAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + return m.ts[tsk], nil +} + +func (m mockAPI) setActor(tsk types.TipSetKey, act *types.Actor) { + m.ts[tsk] = act +} + +func TestPredicates(t *testing.T) { + ctx := context.Background() + + bs := bstore.NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore())) + store := cbornode.NewCborStore(bs) + + oldDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 5, + SlashEpoch: 0, + }, + } + oldStateC := createMarketState(ctx, t, store, oldDeals) + + newDeals := map[abi.DealID]*market.DealState{ + abi.DealID(1): { + SectorStartEpoch: 1, + LastUpdatedEpoch: 3, + SlashEpoch: 0, + }, + abi.DealID(2): { + SectorStartEpoch: 4, + LastUpdatedEpoch: 6, + SlashEpoch: 6, + }, + } + newStateC := createMarketState(ctx, t, store, newDeals) + + miner, err := address.NewFromString("t00") + require.NoError(t, err) + oldState, err := mockTipset(miner, 1) + require.NoError(t, err) + newState, err := mockTipset(miner, 2) + require.NoError(t, err) + + api := newMockAPI(bs) + api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) + api.setActor(newState.Key(), &types.Actor{Head: newStateC}) + + preds := NewStatePredicates(api) + + dealIds := []abi.DealID{abi.DealID(1), abi.DealID(2)} + diffFn := preds.OnStorageMarketActorChanged(preds.OnDealStateChanged(preds.DealStateChangedForIDs(dealIds))) + + // Diff a state against itself: expect no change + changed, _, err := diffFn(ctx, oldState, oldState) + require.NoError(t, err) + require.False(t, changed) + + // Diff old state against new state + changed, val, err := diffFn(ctx, oldState, newState) + require.NoError(t, err) + require.True(t, changed) + + changedDeals, ok := val.(ChangedDeals) + require.True(t, ok) + require.Len(t, changedDeals, 2) + require.Contains(t, changedDeals, abi.DealID(1)) + require.Contains(t, changedDeals, abi.DealID(2)) + deal1 := changedDeals[abi.DealID(1)] + if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } + deal2 := changedDeals[abi.DealID(2)] + if deal2.From.SlashEpoch != 0 || deal2.To.SlashEpoch != 6 { + t.Fatal("Unexpected change to LastUpdatedEpoch") + } +} + +func mockTipset(miner address.Address, timestamp uint64) (*types.TipSet, error) { + return types.NewTipSet([]*types.BlockHeader{{ + Miner: miner, + Height: 5, + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + Timestamp: timestamp, + }}) +} + +func createMarketState(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + rootCid := createAMT(ctx, t, store, deals) + + emptyArrayCid, err := amt.NewAMT(store).Flush(context.TODO()) + require.NoError(t, err) + emptyMap, err := store.Put(context.TODO(), hamt.NewNode(store, hamt.UseTreeBitWidth(5))) + require.NoError(t, err) + state := market.ConstructState(emptyArrayCid, emptyMap, emptyMap) + state.States = rootCid + + stateC, err := store.Put(ctx, state) + require.NoError(t, err) + return stateC +} + +func createAMT(ctx context.Context, t *testing.T, store *cbornode.BasicIpldStore, deals map[abi.DealID]*market.DealState) cid.Cid { + root := amt.NewAMT(store) + for dealID, dealState := range deals { + err := root.Set(ctx, uint64(dealID), dealState) + require.NoError(t, err) + } + rootCid, err := root.Flush(ctx) + require.NoError(t, err) + return rootCid +} diff --git a/chain/events/utils.go b/chain/events/utils.go index d525e5368..40556c9ff 100644 --- a/chain/events/utils.go +++ b/chain/events/utils.go @@ -8,11 +8,11 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd CalledHandler) CheckFunc { +func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc { msg := smsg.VMMessage() return func(ts *types.TipSet) (done bool, more bool, err error) { - fa, err := e.cs.StateGetActor(ctx, msg.From, ts.Key()) + fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key()) if err != nil { return false, true, err } @@ -22,7 +22,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca return false, true, nil } - rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) + rec, err := me.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key()) if err != nil { return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err) } @@ -33,7 +33,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca } } -func (e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc { +func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc { return func(msg *types.Message) (bool, error) { if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) { return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)