diff --git a/chain/events/events.go b/chain/events/events.go
index a325b5410..408c8845e 100644
--- a/chain/events/events.go
+++ b/chain/events/events.go
@@ -51,7 +51,7 @@ type Events struct {
 	readyOnce sync.Once
 
 	heightEvents
-	calledEvents
+	*hcEvents
 }
 
 func NewEvents(ctx context.Context, api eventAPI) *Events {
@@ -74,18 +74,7 @@ func NewEvents(ctx context.Context, api eventAPI) *Events {
 			htHeights: map[abi.ChainEpoch][]uint64{},
 		},
 
-		calledEvents: calledEvents{
-			cs:           api,
-			tsc:          tsc,
-			ctx:          ctx,
-			gcConfidence: uint64(gcConfidence),
-
-			confQueue:   map[triggerH]map[msgH][]*queuedEvent{},
-			revertQueue: map[msgH][]triggerH{},
-			triggers:    map[triggerID]*callHandler{},
-			matchers:    map[triggerID][]MatchFunc{},
-			timeouts:    map[abi.ChainEpoch]map[triggerID]int{},
-		},
+		hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
 	}
 
 	e.ready.Add(1)
@@ -143,7 +132,7 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
 	}
 
 	e.readyOnce.Do(func() {
-		e.at = cur[0].Val.Height()
+		e.lastTs = cur[0].Val
 		e.ready.Done()
 	})
 
@@ -186,5 +175,5 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
 		return err
 	}
 
-	return e.headChangeCalled(rev, app)
+	return e.processHeadChangeEvent(rev, app)
 }
diff --git a/chain/events/events_hc.go b/chain/events/events_hc.go
new file mode 100644
index 000000000..ae42fd85f
--- /dev/null
+++ b/chain/events/events_hc.go
@@ -0,0 +1,628 @@
+package events
+
+import (
+	"context"
+	"math"
+	"sync"
+
+	"github.com/filecoin-project/specs-actors/actors/abi"
+	"github.com/ipfs/go-cid"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/lotus/chain/types"
+)
+
+// NoTimeout, passed as the timeout height, means the trigger never times out.
+const NoTimeout = math.MaxInt64
+
+type triggerID = uint64
+
+// msgH is the block height at which a message was present / event has happened
+type msgH = abi.ChainEpoch
+
+// triggerH is the block height at which the listener will be notified about the
+//  message (msgH+confidence)
+type triggerH = abi.ChainEpoch
+
+type eventData interface{}
+
+// EventHandler arguments:
+// `ts` is the tipset, in which the `msg` is included.
+// `curH`-`ts.Height` = `confidence`
+type EventHandler func(data eventData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
+
+// CheckFunc is used for atomicity guarantees. If the condition the callbacks
+// wait for has already happened in tipset `ts`
+//
+// If `done` is true, timeout won't be triggered
+// If `more` is false, no messages will be sent to EventHandler (RevertHandler
+//  may still be called)
+type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
+
+// Keep track of information for an event handler
+type handlerInfo struct {
+	confidence int
+	timeout    abi.ChainEpoch
+
+	disabled bool // TODO: GC after gcConfidence reached
+
+	handle EventHandler
+	revert RevertHandler
+}
+
+// When a change occurs, a queuedEvent is created and put into a queue
+// until the required confidence is reached
+type queuedEvent struct {
+	trigger triggerID
+
+	h    abi.ChainEpoch
+	data eventData
+
+	called bool
+}
+
+// Manages chain head change events, which may be forward (new tipset added to
+// chain) or backward (chain branch discarded in favour of heavier branch)
+type hcEvents struct {
+	cs           eventAPI
+	tsc          *tipSetCache
+	ctx          context.Context
+	gcConfidence uint64
+
+	lastTs *types.TipSet
+
+	lk sync.Mutex
+
+	ctr triggerID
+
+	triggers map[triggerID]*handlerInfo
+
+	// maps block heights to events
+	// [triggerH][msgH][event]
+	confQueue map[triggerH]map[msgH][]*queuedEvent
+
+	// [msgH][triggerH]
+	revertQueue map[msgH][]triggerH
+
+	// [timeoutH+confidence][triggerID]{calls}
+	timeouts map[abi.ChainEpoch]map[triggerID]int
+
+	messageEvents
+	watcherEvents
+}
+
+// newHCEvents builds the shared head-change machinery. It returns a pointer
+// because the embedded messageEvents / watcherEvents call back into this very
+// instance (via hcApi), and the struct holds a mutex that must not be copied.
+func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) *hcEvents {
+	e := hcEvents{
+		ctx:          ctx,
+		cs:           cs,
+		tsc:          tsc,
+		gcConfidence: gcConfidence,
+
+		confQueue:   map[triggerH]map[msgH][]*queuedEvent{},
+		revertQueue: map[msgH][]triggerH{},
+		triggers:    map[triggerID]*handlerInfo{},
+		timeouts:    map[abi.ChainEpoch]map[triggerID]int{},
+	}
+
+	e.messageEvents = newMessageEvents(ctx, &e, cs)
+	e.watcherEvents = newWatcherEvents(ctx, &e, cs)
+
+	return &e
+}
+
+// Called when there is a change to the head with tipsets to be
+// reverted / applied.
+//
+// e.lk is held for the whole pass because the trigger and timeout maps are
+// also written by onHeadChanged. Handlers run under this lock, so they must
+// not register new triggers synchronously.
+func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
+	e.lk.Lock()
+	defer e.lk.Unlock()
+
+	for _, ts := range rev {
+		e.handleReverts(ts)
+		e.lastTs = ts
+	}
+
+	for _, ts := range app {
+		// Check if the head change caused any state changes that we were
+		// waiting for
+		stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
+
+		// Queue up calls until there have been enough blocks to reach
+		// confidence on the state changes
+		for tid, data := range stateChanges {
+			e.queueForConfidence(tid, data, ts)
+		}
+
+		// Check if the head change included any new message calls
+		newCalls, err := e.messageEvents.checkNewCalls(ts)
+		if err != nil {
+			return err
+		}
+
+		// Queue up calls until there have been enough blocks to reach
+		// confidence on the message calls
+		for tid, data := range newCalls {
+			e.queueForConfidence(tid, data, ts)
+		}
+
+		// NOTE(review): `at` starts at the already-processed lastTs height and
+		// applyTimeouts ignores `at`, so a timeout handler returning more=true
+		// can run more than once for the same tipset - confirm this is intended.
+		for at := e.lastTs.Height(); at <= ts.Height(); at++ {
+			// Apply any events and timeouts that were queued up until the
+			// current chain height
+			e.applyWithConfidence(ts, at)
+			e.applyTimeouts(ts)
+		}
+
+		// Update the latest known tipset
+		e.lastTs = ts
+	}
+
+	return nil
+}
+
+func (e *hcEvents) handleReverts(ts *types.TipSet) {
+	reverts, ok := e.revertQueue[ts.Height()]
+	if !ok {
+		return // nothing to do
+	}
+
+	for _, triggerH := range reverts {
+		toRevert := e.confQueue[triggerH][ts.Height()]
+		for _, event := range toRevert {
+			if !event.called {
+				continue // event wasn't apply()-ied yet
+			}
+
+			trigger := e.triggers[event.trigger]
+
+			if err := trigger.revert(e.ctx, ts); err != nil {
+				log.Errorf("reverting chain trigger failed: %s", err)
+				// log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
+			}
+		}
+		delete(e.confQueue[triggerH], ts.Height())
+	}
+	delete(e.revertQueue, ts.Height())
+}
+
+// Queue up events until the chain has reached a height that reflects the
+// desired confidence
+func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.TipSet) {
+	trigger := e.triggers[trigID]
+
+	appliedH := ts.Height()
+
+	triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
+
+	byOrigH, ok := e.confQueue[triggerH]
+	if !ok {
+		byOrigH = map[abi.ChainEpoch][]*queuedEvent{}
+		e.confQueue[triggerH] = byOrigH
+	}
+
+	byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
+		trigger: trigID,
+		h:       appliedH,
+		data:    data,
+	})
+
+	e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
+}
+
+// Apply any events that were waiting for this chain height for confidence
+func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
+	byOrigH, ok := e.confQueue[height]
+	if !ok {
+		return // no triggers at this height
+	}
+
+	for origH, events := range byOrigH {
+		triggerTs, err := e.tsc.get(origH)
+		if err != nil {
+			log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
+		}
+
+		for _, event := range events {
+			if event.called {
+				continue
+			}
+
+			trigger := e.triggers[event.trigger]
+			if trigger.disabled {
+				continue
+			}
+
+			more, err := trigger.handle(event.data, triggerTs, height)
+			if err != nil {
+				log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", origH, height, err)
+				// log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err)
+				continue // don't revert failed calls
+			}
+
+			event.called = true
+
+			touts, ok := e.timeouts[trigger.timeout]
+			if ok {
+				touts[event.trigger]++
+			}
+
+			trigger.disabled = !more
+		}
+	}
+}
+
+// Apply any timeouts that expire at this height
+func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
+	triggers, ok := e.timeouts[ts.Height()]
+	if !ok {
+		return // nothing to do
+	}
+
+	for triggerID, calls := range triggers {
+		if calls > 0 {
+			continue // don't timeout if the method was called
+		}
+		trigger := e.triggers[triggerID]
+		if trigger.disabled {
+			continue
+		}
+
+		timeoutH := ts.Height() - abi.ChainEpoch(trigger.confidence)
+		timeoutTs, err := e.tsc.get(timeoutH)
+		if err != nil {
+			log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", timeoutH, ts.Height())
+		}
+
+		// A nil payload tells the handler that this is a timeout. timeoutTs may
+		// be nil as well if the cache lookup above failed.
+		more, err := trigger.handle(nil, timeoutTs, ts.Height())
+		if err != nil {
+			// timeoutTs may be nil here, so report the height we asked for
+			log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutH, ts.Height(), err)
+			continue // don't revert failed calls
+		}
+
+		trigger.disabled = !more // allows messages after timeout
+	}
+}
+
+// Listen for an event
+// - CheckFunc: immediately checks if the event already occurred
+// - EventHandler: called when the event has occurred, after confidence tipsets
+// - RevertHandler: called if the chain head changes causing the event to revert
+// - confidence: wait this many tipsets before calling EventHandler
+// - timeout: at this chain height, timeout on waiting for this event
+func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
+	e.lk.Lock()
+	defer e.lk.Unlock()
+
+	// Check if the event has already occurred
+	ts := e.tsc.best()
+	done, more, err := check(ts)
+	if err != nil {
+		return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
+	}
+	if done {
+		timeout = NoTimeout
+	}
+
+	// Create a trigger for the event
+	id := e.ctr
+	e.ctr++
+
+	// The timeout check runs `confidence` epochs after the requested height.
+	// NoTimeout is left as is: adding to math.MaxInt64 would wrap around.
+	timeoutH := timeout
+	if timeout != NoTimeout {
+		timeoutH += abi.ChainEpoch(confidence)
+	}
+
+	e.triggers[id] = &handlerInfo{
+		confidence: confidence,
+		timeout:    timeoutH,
+
+		disabled: !more,
+
+		handle: hnd,
+		revert: rev,
+	}
+
+	// If there's a timeout, set up a timeout check at that height
+	if timeout != NoTimeout {
+		if e.timeouts[timeoutH] == nil {
+			e.timeouts[timeoutH] = map[uint64]int{}
+		}
+		e.timeouts[timeoutH][id] = 0
+	}
+
+	return id, nil
+}
+
+// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
+// to listen for changes
+type headChangeAPI interface {
+	onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
+}
+
+// watcherEvents watches for a state change
+type watcherEvents struct {
+	ctx   context.Context
+	cs    eventAPI
+	hcApi headChangeAPI
+
+	lk       sync.RWMutex
+	matchers map[triggerID]StateMatchFunc
+}
+
+func newWatcherEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) watcherEvents {
+	return watcherEvents{
+		ctx:      ctx,
+		cs:       cs,
+		hcApi:    hcApi,
+		matchers: make(map[triggerID]StateMatchFunc),
+	}
+}
+
+// Run each of the matchers against the previous and current state to see if
+// there's a change
+func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData {
+	we.lk.RLock()
+	defer we.lk.RUnlock()
+
+	res := make(map[triggerID]eventData)
+	for tid, matchFn := range we.matchers {
+		ok, data, err := matchFn(we.ctx, oldState, newState)
+		if err != nil {
+			log.Errorf("event diff fn failed: %s", err)
+			continue
+		}
+
+		if ok {
+			res[tid] = data
+		}
+	}
+	return res
+}
+
+// Used to store the state change
+type stateData interface{}
+
+// StateChangeHandler arguments:
+// `ts` is the tipset, in which the change occurred
+// `curH`-`ts.Height` = `confidence`
+type StateChangeHandler func(ctx context.Context, oldState, newState stateData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
+
+// StateMatchFunc compares two tipsets. On a match it returns true plus the event
+// data, which StateChanged expects to be a two-element []interface{} holding
+// the old and new state.
+type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) (bool, eventData, error)
+
+// StateChanged registers a callback which is triggered when a specified state
+// change occurs or a timeout is reached.
+//
+// * `CheckFunc` callback is invoked immediately with a recent tipset, it
+//   returns two booleans - `done`, and `more`.
+//
+// * `done` should be true when some on-chain state change we are waiting
+//   for has happened. When `done` is set to true, timeout trigger is disabled.
+//
+// * `more` should be false when we don't want to receive new notifications
+//   through StateChangeHandler. Note that notifications may still be delivered to
+//   RevertHandler
+//
+// * `StateChangeHandler` is called when the specified state change was observed
+//   on-chain, and a confidence threshold was reached, or the specified `timeout`
+//   height was reached with no state change observed. When this callback is
+//   invoked on a timeout, `oldState` and `newState` are set to nil.
+//   This callback returns a boolean specifying whether further notifications
+//   should be sent, like `more` return param from `CheckFunc` above.
+//
+// * `RevertHandler` is called after apply handler, when we drop the tipset
+//   containing the message. The tipset passed as the argument is the tipset
+//   that is being dropped. Note that the message dropped may be re-applied
+//   in a different tipset in small amount of time.
+//
+// * `StateMatchFunc` is called against each tipset state. If there is a match,
+//   the state change is queued up until the confidence interval has elapsed (and
+//   `StateChangeHandler` is called)
+func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
+	hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
+		// A nil payload means the timeout fired with no state change observed;
+		// as documented above, both states are then reported as nil.
+		if data == nil {
+			return scHnd(we.ctx, nil, nil, ts, height)
+		}
+
+		states, ok := data.([]interface{})
+		if !ok || len(states) != 2 {
+			panic("expected 2 element array")
+		}
+
+		return scHnd(we.ctx, states[0], states[1], ts, height)
+	}
+
+	id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
+	if err != nil {
+		return err
+	}
+
+	we.lk.Lock()
+	defer we.lk.Unlock()
+	we.matchers[id] = mf
+
+	return nil
+}
+
+// messageEvents watches for message calls to actors
+type messageEvents struct {
+	ctx   context.Context
+	cs    eventAPI
+	hcApi headChangeAPI
+
+	lk       sync.RWMutex
+	matchers map[triggerID][]MsgMatchFunc
+}
+
+func newMessageEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) messageEvents {
+	return messageEvents{
+		ctx:      ctx,
+		cs:       cs,
+		hcApi:    hcApi,
+		matchers: map[triggerID][]MsgMatchFunc{},
+	}
+}
+
+// Check if there are any new actor calls
+func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) {
+	pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
+	if err != nil {
+		log.Errorf("getting parent tipset in checkNewCalls: %s", err)
+		return nil, err
+	}
+
+	res := make(map[triggerID]eventData)
+	me.messagesForTs(pts, func(msg *types.Message) {
+		me.lk.RLock()
+		defer me.lk.RUnlock()
+		// TODO: provide receipts
+
+		for tid, matchFns := range me.matchers {
+			var matched bool
+			for _, matchFn := range matchFns {
+				ok, err := matchFn(msg)
+				if err != nil {
+					log.Errorf("event matcher failed: %s", err)
+					continue
+				}
+				matched = ok
+
+				if matched {
+					break
+				}
+			}
+
+			if matched {
+				res[tid] = msg
+				break
+			}
+		}
+	})
+
+	return res, nil
+}
+
+// Get the messages in a tipset
+func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
+	seen := map[cid.Cid]struct{}{}
+
+	for _, tsb := range ts.Blocks() {
+
+		msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
+		if err != nil {
+			log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
+			// this is quite bad, but probably better than missing all the other updates
+			continue
+		}
+
+		for _, m := range msgs.BlsMessages {
+			_, ok := seen[m.Cid()]
+			if ok {
+				continue
+			}
+			seen[m.Cid()] = struct{}{}
+
+			consume(m)
+		}
+
+		for _, m := range msgs.SecpkMessages {
+			_, ok := seen[m.Message.Cid()]
+			if ok {
+				continue
+			}
+			seen[m.Message.Cid()] = struct{}{}
+
+			consume(&m.Message)
+		}
+	}
+}
+
+// MsgHandler arguments:
+// `ts` is the tipset, in which the `msg` is included.
+// `curH`-`ts.Height` = `confidence`
+type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
+
+// MsgMatchFunc reports whether msg is one that the trigger is waiting for.
+type MsgMatchFunc func(msg *types.Message) (bool, error)
+
+// Called registers a callback which is triggered when a specified method is
+// called on an actor, or a timeout is reached.
+//
+// * `CheckFunc` callback is invoked immediately with a recent tipset, it
+//   returns two booleans - `done`, and `more`.
+//
+// * `done` should be true when some on-chain action we are waiting for has
+//   happened. When `done` is set to true, timeout trigger is disabled.
+//
+// * `more` should be false when we don't want to receive new notifications
+//   through MsgHandler. Note that notifications may still be delivered to
+//   RevertHandler
+//
+// * `MsgHandler` is called when the specified event was observed on-chain,
+//   and a confidence threshold was reached, or the specified `timeout` height
+//   was reached with no events observed. When this callback is invoked on a
+//   timeout, `msg` is set to nil. This callback returns a boolean specifying
+//   whether further notifications should be sent, like `more` return param
+//   from `CheckFunc` above.
+//
+// * `RevertHandler` is called after apply handler, when we drop the tipset
+//   containing the message. The tipset passed as the argument is the tipset
+//   that is being dropped. Note that the message dropped may be re-applied
+//   in a different tipset in small amount of time.
+//
+// * `MsgMatchFunc` is called against each message. If there is a match, the
+//   message is queued up until the confidence interval has elapsed (and
+//   `MsgHandler` is called)
+func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
+	hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
+		msg, ok := data.(*types.Message)
+		if data != nil && !ok {
+			panic("expected msg")
+		}
+
+		// On timeout there is no message and hence no receipt to look up
+		if msg == nil {
+			return msgHnd(nil, nil, ts, height)
+		}
+
+		rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key())
+		if err != nil {
+			return false, err
+		}
+
+		return msgHnd(msg, rec, ts, height)
+	}
+
+	id, err := me.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
+	if err != nil {
+		return err
+	}
+
+	me.lk.Lock()
+	defer me.lk.Unlock()
+	me.matchers[id] = append(me.matchers[id], mf)
+
+	return nil
+}
+
+// Convenience function for checking and matching messages
+func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
+	return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
+}
diff --git a/chain/events/utils.go b/chain/events/utils.go
index d525e5368..40556c9ff 100644
--- a/chain/events/utils.go
+++ b/chain/events/utils.go
@@ -8,11 +8,11 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 )
 
-func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd CalledHandler) CheckFunc {
+func (me *messageEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd MsgHandler) CheckFunc {
 	msg := smsg.VMMessage()
 
 	return func(ts *types.TipSet) (done bool, more bool, err error) {
-		fa, err := e.cs.StateGetActor(ctx, msg.From, ts.Key())
+		fa, err := me.cs.StateGetActor(ctx, msg.From, ts.Key())
 		if err != nil {
 			return false, true, err
 		}
@@ -22,7 +22,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca
 			return false, true, nil
 		}
 
-		rec, err := e.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key())
+		rec, err := me.cs.StateGetReceipt(ctx, smsg.VMMessage().Cid(), ts.Key())
 		if err != nil {
 			return false, true, xerrors.Errorf("getting receipt in CheckMsg: %w", err)
 		}
@@ -33,7 +33,7 @@ func (e *calledEvents) CheckMsg(ctx context.Context, smsg types.ChainMsg, hnd Ca
 	}
 }
 
-func (e *calledEvents) MatchMsg(inmsg *types.Message) MatchFunc {
+func (me *messageEvents) MatchMsg(inmsg *types.Message) MsgMatchFunc {
 	return func(msg *types.Message) (bool, error) {
 		if msg.From == inmsg.From && msg.Nonce == inmsg.Nonce && !inmsg.Equals(msg) {
 			return false, xerrors.Errorf("matching msg %s from %s, nonce %d: got duplicate origin/nonce msg %d", inmsg.Cid(), inmsg.From, inmsg.Nonce, msg.Nonce)