dd490220d7
This allows us to create a general purpose head change events manager that can be used for call events and also for state change events.
590 lines
17 KiB
Go
590 lines
17 KiB
Go
package events
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"sync"
|
|
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
"github.com/ipfs/go-cid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
)
|
|
|
|
const NoTimeout = math.MaxInt64
|
|
|
|
type triggerID = uint64
|
|
|
|
// msgH is the block height at which a message was present / event has happened
|
|
type msgH = abi.ChainEpoch
|
|
|
|
// triggerH is the block height at which the listener will be notified about the
|
|
// message (msgH+confidence)
|
|
type triggerH = abi.ChainEpoch
|
|
|
|
type eventData interface{}
|
|
|
|
// EventHandler arguments:
|
|
// `ts` is the tipset, in which the `msg` is included.
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type EventHandler func(data eventData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
|
// wait for has already happened in tipset `ts`
|
|
//
|
|
// If `done` is true, timeout won't be triggered
|
|
// If `more` is false, no messages will be sent to EventHandler (RevertHandler
|
|
// may still be called)
|
|
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
|
|
|
|
// Keep track of information for an event handler
|
|
type handlerInfo struct {
|
|
confidence int
|
|
timeout abi.ChainEpoch
|
|
|
|
disabled bool // TODO: GC after gcConfidence reached
|
|
|
|
handle EventHandler
|
|
revert RevertHandler
|
|
}
|
|
|
|
// When a change occurs, a queuedEvent is created and put into a queue
|
|
// until the required confidence is reached
|
|
type queuedEvent struct {
|
|
trigger triggerID
|
|
|
|
h abi.ChainEpoch
|
|
data eventData
|
|
|
|
called bool
|
|
}
|
|
|
|
// Manages chain head change events, which may be forward (new tipset added to
|
|
// chain) or backward (chain branch discarded in favour of heavier branch)
|
|
type hcEvents struct {
|
|
cs eventAPI
|
|
tsc *tipSetCache
|
|
ctx context.Context
|
|
gcConfidence uint64
|
|
|
|
lastTs *types.TipSet
|
|
|
|
lk sync.Mutex
|
|
|
|
ctr triggerID
|
|
|
|
triggers map[triggerID]*handlerInfo
|
|
|
|
// maps block heights to events
|
|
// [triggerH][msgH][event]
|
|
confQueue map[triggerH]map[msgH][]*queuedEvent
|
|
|
|
// [msgH][triggerH]
|
|
revertQueue map[msgH][]triggerH
|
|
|
|
// [timeoutH+confidence][triggerID]{calls}
|
|
timeouts map[abi.ChainEpoch]map[triggerID]int
|
|
|
|
messageEvents
|
|
watcherEvents
|
|
}
|
|
|
|
func newHCEvents(ctx context.Context, cs eventAPI, tsc *tipSetCache, gcConfidence uint64) hcEvents {
|
|
e := hcEvents{
|
|
ctx: ctx,
|
|
cs: cs,
|
|
tsc: tsc,
|
|
gcConfidence: gcConfidence,
|
|
|
|
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
|
revertQueue: map[msgH][]triggerH{},
|
|
triggers: map[triggerID]*handlerInfo{},
|
|
timeouts: map[abi.ChainEpoch]map[triggerID]int{},
|
|
}
|
|
|
|
e.messageEvents = newMessageEvents(ctx, &e, cs)
|
|
e.watcherEvents = newWatcherEvents(ctx, &e, cs)
|
|
|
|
return e
|
|
}
|
|
|
|
// Called when there is a change to the head with tipsets to be
|
|
// reverted / applied
|
|
func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {
|
|
for _, ts := range rev {
|
|
e.handleReverts(ts)
|
|
e.lastTs = ts
|
|
}
|
|
|
|
for _, ts := range app {
|
|
// Check if the head change caused any state changes that we were
|
|
// waiting for
|
|
stateChanges := e.watcherEvents.checkStateChanges(e.lastTs, ts)
|
|
|
|
// Queue up calls until there have been enough blocks to reach
|
|
// confidence on the state changes
|
|
for tid, data := range stateChanges {
|
|
e.queueForConfidence(tid, data, ts)
|
|
}
|
|
|
|
// Check if the head change included any new message calls
|
|
newCalls, err := e.messageEvents.checkNewCalls(ts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Queue up calls until there have been enough blocks to reach
|
|
// confidence on the message calls
|
|
for tid, data := range newCalls {
|
|
e.queueForConfidence(tid, data, ts)
|
|
}
|
|
|
|
for at := e.lastTs.Height(); at <= ts.Height(); at++ {
|
|
// Apply any events and timeouts that were queued up until the
|
|
// current chain height
|
|
e.applyWithConfidence(ts, at)
|
|
e.applyTimeouts(ts)
|
|
}
|
|
|
|
// Update the latest known tipset
|
|
e.lastTs = ts
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *hcEvents) handleReverts(ts *types.TipSet) {
|
|
reverts, ok := e.revertQueue[ts.Height()]
|
|
if !ok {
|
|
return // nothing to do
|
|
}
|
|
|
|
for _, triggerH := range reverts {
|
|
toRevert := e.confQueue[triggerH][ts.Height()]
|
|
for _, event := range toRevert {
|
|
if !event.called {
|
|
continue // event wasn't apply()-ied yet
|
|
}
|
|
|
|
trigger := e.triggers[event.trigger]
|
|
|
|
if err := trigger.revert(e.ctx, ts); err != nil {
|
|
log.Errorf("reverting chain trigger failed: %s", err)
|
|
// log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
|
|
}
|
|
}
|
|
delete(e.confQueue[triggerH], ts.Height())
|
|
}
|
|
delete(e.revertQueue, ts.Height())
|
|
}
|
|
|
|
// Queue up events until the chain has reached a height that reflects the
|
|
// desired confidence
|
|
func (e *hcEvents) queueForConfidence(trigID uint64, data eventData, ts *types.TipSet) {
|
|
trigger := e.triggers[trigID]
|
|
|
|
appliedH := ts.Height()
|
|
|
|
triggerH := appliedH + abi.ChainEpoch(trigger.confidence)
|
|
|
|
byOrigH, ok := e.confQueue[triggerH]
|
|
if !ok {
|
|
byOrigH = map[abi.ChainEpoch][]*queuedEvent{}
|
|
e.confQueue[triggerH] = byOrigH
|
|
}
|
|
|
|
byOrigH[appliedH] = append(byOrigH[appliedH], &queuedEvent{
|
|
trigger: trigID,
|
|
h: appliedH,
|
|
data: data,
|
|
})
|
|
|
|
e.revertQueue[appliedH] = append(e.revertQueue[appliedH], triggerH)
|
|
}
|
|
|
|
// Apply any events that were waiting for this chain height for confidence
|
|
func (e *hcEvents) applyWithConfidence(ts *types.TipSet, height abi.ChainEpoch) {
|
|
byOrigH, ok := e.confQueue[height]
|
|
if !ok {
|
|
return // no triggers at thin height
|
|
}
|
|
|
|
for origH, events := range byOrigH {
|
|
triggerTs, err := e.tsc.get(origH)
|
|
if err != nil {
|
|
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, height)
|
|
}
|
|
|
|
for _, event := range events {
|
|
if event.called {
|
|
continue
|
|
}
|
|
|
|
trigger := e.triggers[event.trigger]
|
|
if trigger.disabled {
|
|
continue
|
|
}
|
|
|
|
more, err := trigger.handle(event.data, triggerTs, height)
|
|
if err != nil {
|
|
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", origH, height, err)
|
|
// log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, height, err)
|
|
continue // don't revert failed calls
|
|
}
|
|
|
|
event.called = true
|
|
|
|
touts, ok := e.timeouts[trigger.timeout]
|
|
if ok {
|
|
touts[event.trigger]++
|
|
}
|
|
|
|
trigger.disabled = !more
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apply any timeouts that expire at this height
|
|
func (e *hcEvents) applyTimeouts(ts *types.TipSet) {
|
|
triggers, ok := e.timeouts[ts.Height()]
|
|
if !ok {
|
|
return // nothing to do
|
|
}
|
|
|
|
for triggerID, calls := range triggers {
|
|
if calls > 0 {
|
|
continue // don't timeout if the method was called
|
|
}
|
|
trigger := e.triggers[triggerID]
|
|
if trigger.disabled {
|
|
continue
|
|
}
|
|
|
|
timeoutTs, err := e.tsc.get(ts.Height() - abi.ChainEpoch(trigger.confidence))
|
|
if err != nil {
|
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-abi.ChainEpoch(trigger.confidence), ts.Height())
|
|
}
|
|
|
|
// more, err := trigger.handle(nil, nil, timeoutTs, ts.Height())
|
|
more, err := trigger.handle(nil, timeoutTs, ts.Height())
|
|
if err != nil {
|
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
|
continue // don't revert failed calls
|
|
}
|
|
|
|
trigger.disabled = !more // allows messages after timeout
|
|
}
|
|
}
|
|
|
|
// Listen for an event
|
|
// - CheckFunc: immediately checks if the event already occured
|
|
// - EventHandler: called when the event has occurred, after confidence tipsets
|
|
// - RevertHandler: called if the chain head changes causing the event to revert
|
|
// - confidence: wait this many tipsets before calling EventHandler
|
|
// - timeout: at this chain height, timeout on waiting for this event
|
|
func (e *hcEvents) onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error) {
|
|
e.lk.Lock()
|
|
defer e.lk.Unlock()
|
|
|
|
// Check if the even has already occurred
|
|
ts := e.tsc.best()
|
|
done, more, err := check(ts)
|
|
if err != nil {
|
|
return 0, xerrors.Errorf("called check error (h: %d): %w", ts.Height(), err)
|
|
}
|
|
if done {
|
|
timeout = NoTimeout
|
|
}
|
|
|
|
// Create a trigger for the event
|
|
id := e.ctr
|
|
e.ctr++
|
|
|
|
e.triggers[id] = &handlerInfo{
|
|
confidence: confidence,
|
|
timeout: timeout + abi.ChainEpoch(confidence),
|
|
|
|
disabled: !more,
|
|
|
|
handle: hnd,
|
|
revert: rev,
|
|
}
|
|
|
|
// If there's a timeout, set up a timeout check at that height
|
|
if timeout != NoTimeout {
|
|
if e.timeouts[timeout+abi.ChainEpoch(confidence)] == nil {
|
|
e.timeouts[timeout+abi.ChainEpoch(confidence)] = map[uint64]int{}
|
|
}
|
|
e.timeouts[timeout+abi.ChainEpoch(confidence)][id] = 0
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// headChangeAPI is used to allow the composed event APIs to call back to hcEvents
|
|
// to listen for changes
|
|
type headChangeAPI interface {
|
|
onHeadChanged(check CheckFunc, hnd EventHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch) (triggerID, error)
|
|
}
|
|
|
|
// watcherEvents watches for a state change
|
|
type watcherEvents struct {
|
|
ctx context.Context
|
|
cs eventAPI
|
|
hcApi headChangeAPI
|
|
|
|
lk sync.RWMutex
|
|
matchers map[triggerID]StateMatchFunc
|
|
}
|
|
|
|
func newWatcherEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) watcherEvents {
|
|
return watcherEvents{
|
|
ctx: ctx,
|
|
cs: cs,
|
|
hcApi: hcApi,
|
|
matchers: make(map[triggerID]StateMatchFunc),
|
|
}
|
|
}
|
|
|
|
// Run each of the matchers against the previous and current state to see if
|
|
// there's a change
|
|
func (we *watcherEvents) checkStateChanges(oldState, newState *types.TipSet) map[triggerID]eventData {
|
|
we.lk.RLock()
|
|
defer we.lk.RUnlock()
|
|
|
|
res := make(map[triggerID]eventData)
|
|
for tid, matchFn := range we.matchers {
|
|
ok, data, err := matchFn(we.ctx, oldState, newState)
|
|
if err != nil {
|
|
log.Errorf("event diff fn failed: %s", err)
|
|
continue
|
|
}
|
|
|
|
if ok {
|
|
res[tid] = data
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
// Used to store the state change
|
|
type stateData interface{}
|
|
|
|
// StateChangeHandler arguments:
|
|
// `ts` is the tipset, in which the change occured
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type StateChangeHandler func(ctx context.Context, oldState, newState stateData, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
type StateMatchFunc func(ctx context.Context, oldState, newState *types.TipSet) (bool, eventData, error)
|
|
|
|
// StateChanged registers a callback which is triggered when a specified state
|
|
// change occurs or a timeout is reached.
|
|
//
|
|
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
|
// returns two booleans - `done`, and `more`.
|
|
//
|
|
// * `done` should be true when some on-chain state change we are waiting
|
|
// for has happened. When `done` is set to true, timeout trigger is disabled.
|
|
//
|
|
// * `more` should be false when we don't want to receive new notifications
|
|
// through StateChangeHandler. Note that notifications may still be delivered to
|
|
// RevertHandler
|
|
//
|
|
// * `StateChangeHandler` is called when the specified state change was observed
|
|
// on-chain, and a confidence threshold was reached, or the specified `timeout`
|
|
// height was reached with no state change observed. When this callback is
|
|
// invoked on a timeout, `oldState` and `newState` are set to nil.
|
|
// This callback returns a boolean specifying whether further notifications
|
|
// should be sent, like `more` return param from `CheckFunc` above.
|
|
//
|
|
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
|
// containing the message. The tipset passed as the argument is the tipset
|
|
// that is being dropped. Note that the message dropped may be re-applied
|
|
// in a different tipset in small amount of time.
|
|
//
|
|
// * `StateMatchFunc` is called against each tipset state. If there is a match,
|
|
// the state change is queued up until the confidence interval has elapsed (and
|
|
// `StateChangeHandler` is called)
|
|
func (we *watcherEvents) StateChanged(check CheckFunc, scHnd StateChangeHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf StateMatchFunc) error {
|
|
hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
|
states, ok := data.([]interface{})
|
|
if !ok || len(states) != 2 {
|
|
panic("expected 2 element array")
|
|
}
|
|
|
|
return scHnd(we.ctx, states[0], states[1], ts, height)
|
|
}
|
|
|
|
id, err := we.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
we.lk.Lock()
|
|
we.matchers[id] = mf
|
|
defer we.lk.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
// messageEvents watches for message calls to actors
|
|
type messageEvents struct {
|
|
ctx context.Context
|
|
cs eventAPI
|
|
hcApi headChangeAPI
|
|
|
|
lk sync.RWMutex
|
|
matchers map[triggerID][]MsgMatchFunc
|
|
}
|
|
|
|
func newMessageEvents(ctx context.Context, hcApi headChangeAPI, cs eventAPI) messageEvents {
|
|
return messageEvents{
|
|
ctx: ctx,
|
|
cs: cs,
|
|
hcApi: hcApi,
|
|
matchers: map[triggerID][]MsgMatchFunc{},
|
|
}
|
|
}
|
|
|
|
// Check if there are any new actor calls
|
|
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) {
|
|
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
|
|
if err != nil {
|
|
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
|
|
return nil, err
|
|
}
|
|
|
|
res := make(map[triggerID]eventData)
|
|
me.messagesForTs(pts, func(msg *types.Message) {
|
|
me.lk.RLock()
|
|
defer me.lk.RUnlock()
|
|
// TODO: provide receipts
|
|
|
|
for tid, matchFns := range me.matchers {
|
|
var matched bool
|
|
for _, matchFn := range matchFns {
|
|
ok, err := matchFn(msg)
|
|
if err != nil {
|
|
log.Errorf("event matcher failed: %s", err)
|
|
continue
|
|
}
|
|
matched = ok
|
|
|
|
if matched {
|
|
break
|
|
}
|
|
}
|
|
|
|
if matched {
|
|
res[tid] = msg
|
|
break
|
|
}
|
|
}
|
|
})
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// Get the messages in a tipset
|
|
func (me *messageEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
|
|
seen := map[cid.Cid]struct{}{}
|
|
|
|
for _, tsb := range ts.Blocks() {
|
|
|
|
msgs, err := me.cs.ChainGetBlockMessages(context.TODO(), tsb.Cid())
|
|
if err != nil {
|
|
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
|
|
// this is quite bad, but probably better than missing all the other updates
|
|
continue
|
|
}
|
|
|
|
for _, m := range msgs.BlsMessages {
|
|
_, ok := seen[m.Cid()]
|
|
if ok {
|
|
continue
|
|
}
|
|
seen[m.Cid()] = struct{}{}
|
|
|
|
consume(m)
|
|
}
|
|
|
|
for _, m := range msgs.SecpkMessages {
|
|
_, ok := seen[m.Message.Cid()]
|
|
if ok {
|
|
continue
|
|
}
|
|
seen[m.Message.Cid()] = struct{}{}
|
|
|
|
consume(&m.Message)
|
|
}
|
|
}
|
|
}
|
|
|
|
// MsgHandler arguments:
|
|
// `ts` is the tipset, in which the `msg` is included.
|
|
// `curH`-`ts.Height` = `confidence`
|
|
type MsgHandler func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (more bool, err error)
|
|
|
|
type MsgMatchFunc func(msg *types.Message) (bool, error)
|
|
|
|
// Called registers a callback which is triggered when a specified method is
|
|
// called on an actor, or a timeout is reached.
|
|
//
|
|
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
|
// returns two booleans - `done`, and `more`.
|
|
//
|
|
// * `done` should be true when some on-chain action we are waiting for has
|
|
// happened. When `done` is set to true, timeout trigger is disabled.
|
|
//
|
|
// * `more` should be false when we don't want to receive new notifications
|
|
// through MsgHandler. Note that notifications may still be delivered to
|
|
// RevertHandler
|
|
//
|
|
// * `MsgHandler` is called when the specified event was observed on-chain,
|
|
// and a confidence threshold was reached, or the specified `timeout` height
|
|
// was reached with no events observed. When this callback is invoked on a
|
|
// timeout, `msg` is set to nil. This callback returns a boolean specifying
|
|
// whether further notifications should be sent, like `more` return param
|
|
// from `CheckFunc` above.
|
|
//
|
|
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
|
// containing the message. The tipset passed as the argument is the tipset
|
|
// that is being dropped. Note that the message dropped may be re-applied
|
|
// in a different tipset in small amount of time.
|
|
//
|
|
// * `MsgMatchFunc` is called against each message. If there is a match, the
|
|
// message is queued up until the confidence interval has elapsed (and
|
|
// `MsgHandler` is called)
|
|
func (me *messageEvents) Called(check CheckFunc, msgHnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, mf MsgMatchFunc) error {
|
|
hnd := func(data eventData, ts *types.TipSet, height abi.ChainEpoch) (bool, error) {
|
|
msg, ok := data.(*types.Message)
|
|
if data != nil && !ok {
|
|
panic("expected msg")
|
|
}
|
|
|
|
rec, err := me.cs.StateGetReceipt(me.ctx, msg.Cid(), ts.Key())
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return msgHnd(msg, rec, ts, height)
|
|
}
|
|
|
|
id, err := me.hcApi.onHeadChanged(check, hnd, rev, confidence, timeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
me.lk.Lock()
|
|
defer me.lk.Unlock()
|
|
me.matchers[id] = append(me.matchers[id], mf)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Convenience function for checking and matching messages
|
|
func (me *messageEvents) CalledMsg(ctx context.Context, hnd MsgHandler, rev RevertHandler, confidence int, timeout abi.ChainEpoch, msg types.ChainMsg) error {
|
|
return me.Called(me.CheckMsg(ctx, msg, hnd), hnd, rev, confidence, timeout, me.MatchMsg(msg.VMMessage()))
|
|
}
|