chain events: Restructure the logic
This commit is contained in:
parent
4d0d51b58c
commit
289470a466
@ -1,39 +1,23 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
"golang.org/x/xerrors"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-lotus/build"
|
"github.com/filecoin-project/go-lotus/build"
|
||||||
"github.com/filecoin-project/go-lotus/chain/address"
|
|
||||||
"github.com/filecoin-project/go-lotus/chain/types"
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CheckFunc is used before one-shoot callbacks for atomicity
|
|
||||||
// guarantees. If the condition the callbacks wait for has already happened in
|
|
||||||
// tipset `ts`, this function MUST return true
|
|
||||||
type CheckFunc func(ts *types.TipSet) (bool, error)
|
|
||||||
|
|
||||||
// `ts` is the tipset, in which the `msg` is included.
|
|
||||||
// `curH`-`ts.Height` = `confidence`
|
// `curH`-`ts.Height` = `confidence`
|
||||||
type HandleFunc func(msg *types.Message, ts *types.TipSet, curH uint64) error
|
type HeightHandler func(ts *types.TipSet, curH uint64) error
|
||||||
type RevertFunc func(ts *types.TipSet) error
|
type RevertHandler func(ts *types.TipSet) error
|
||||||
|
|
||||||
type handler struct {
|
type heightHandler struct {
|
||||||
confidence int
|
confidence int
|
||||||
|
|
||||||
handle HandleFunc
|
handle HeightHandler
|
||||||
revert RevertFunc
|
revert RevertHandler
|
||||||
|
|
||||||
msg *types.Message
|
|
||||||
disable bool
|
|
||||||
}
|
|
||||||
|
|
||||||
type callTuple struct {
|
|
||||||
actor address.Address
|
|
||||||
method uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type eventChainStore interface {
|
type eventChainStore interface {
|
||||||
@ -54,33 +38,38 @@ type Events struct {
|
|||||||
|
|
||||||
// ChainAt
|
// ChainAt
|
||||||
|
|
||||||
heightTriggers map[uint64]*handler
|
heightTriggers map[uint64]*heightHandler
|
||||||
|
|
||||||
htTriggerHeights map[uint64][]uint64
|
htTriggerHeights map[uint64][]uint64
|
||||||
htHeights map[uint64][]uint64
|
htHeights map[uint64][]uint64
|
||||||
|
|
||||||
// Called
|
calledEvents
|
||||||
|
|
||||||
calledTriggers map[uint64]handler
|
|
||||||
|
|
||||||
ctTriggers map[callTuple][]uint64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEvents(cs eventChainStore) *Events {
|
func NewEvents(cs eventChainStore) *Events {
|
||||||
gcConfidence := 2 * build.ForkLengthThreshold
|
gcConfidence := 2 * build.ForkLengthThreshold
|
||||||
|
|
||||||
|
tsc := newTSCache(gcConfidence)
|
||||||
|
|
||||||
e := &Events{
|
e := &Events{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
gcConfidence: uint64(gcConfidence),
|
gcConfidence: uint64(gcConfidence),
|
||||||
|
|
||||||
tsc: newTSCache(gcConfidence),
|
tsc: tsc,
|
||||||
|
|
||||||
heightTriggers: map[uint64]*handler{},
|
heightTriggers: map[uint64]*heightHandler{},
|
||||||
htTriggerHeights: map[uint64][]uint64{},
|
htTriggerHeights: map[uint64][]uint64{},
|
||||||
htHeights: map[uint64][]uint64{},
|
htHeights: map[uint64][]uint64{},
|
||||||
|
|
||||||
calledTriggers: map[uint64]handler{},
|
calledEvents: calledEvents{
|
||||||
ctTriggers: map[callTuple][]uint64{},
|
cs: cs,
|
||||||
|
tsc: tsc,
|
||||||
|
|
||||||
|
confQueue: map[uint64]map[uint64][]queuedEvent{},
|
||||||
|
revertQueue: map[uint64][]uint64{},
|
||||||
|
triggers: map[uint64]callHandler{},
|
||||||
|
callTuples: map[callTuple][]uint64{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = e.tsc.add(cs.GetHeaviestTipSet())
|
_ = e.tsc.add(cs.GetHeaviestTipSet())
|
||||||
@ -105,226 +94,3 @@ func (e *Events) headChange(rev, app []*types.TipSet) error {
|
|||||||
|
|
||||||
return e.headChangeCalled(rev, app)
|
return e.headChangeCalled(rev, app)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Events) headChangeAt(rev, app []*types.TipSet) error {
|
|
||||||
// highest tipset is always the first (see cs.ReorgOps)
|
|
||||||
newH := app[0].Height()
|
|
||||||
|
|
||||||
for _, ts := range rev {
|
|
||||||
// TODO: log error if h below gcconfidence
|
|
||||||
// revert height-based triggers
|
|
||||||
|
|
||||||
for _, tid := range e.htHeights[ts.Height()] {
|
|
||||||
// don't revert if newH is above this ts
|
|
||||||
if newH >= ts.Height() {
|
|
||||||
if e.heightTriggers[tid].msg != nil {
|
|
||||||
// TODO: optimization: don't revert if app[newH - ts.Height()] contains the msg
|
|
||||||
} else {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err := e.heightTriggers[tid].revert(ts)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := e.tsc.revert(ts); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ts := range app {
|
|
||||||
if err := e.tsc.add(ts); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// height triggers
|
|
||||||
|
|
||||||
for _, tid := range e.htTriggerHeights[ts.Height()] {
|
|
||||||
hnd := e.heightTriggers[tid]
|
|
||||||
if hnd.disable {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
triggerH := ts.Height() - uint64(hnd.confidence)
|
|
||||||
|
|
||||||
incTs, err := e.tsc.get(triggerH)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := hnd.handle(hnd.msg, incTs, ts.Height()); err != nil {
|
|
||||||
msgInfo := ""
|
|
||||||
if hnd.msg != nil {
|
|
||||||
msgInfo = fmt.Sprintf("call %s(%d), ", hnd.msg.To, hnd.msg.Method)
|
|
||||||
}
|
|
||||||
log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err)
|
|
||||||
}
|
|
||||||
hnd.disable = hnd.msg != nil // special case for Called
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) headChangeCalled(rev, app []*types.TipSet) error {
|
|
||||||
for _, ts := range rev {
|
|
||||||
_ = ts
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, ts := range app {
|
|
||||||
// called triggers
|
|
||||||
|
|
||||||
err := e.messagesForTs(ts, func(msg *types.Message) error {
|
|
||||||
// TODO: do we have to verify the receipt, or are messages on chain
|
|
||||||
// guaranteed to be successful?
|
|
||||||
|
|
||||||
ct := callTuple{
|
|
||||||
actor: msg.To,
|
|
||||||
method: msg.Method,
|
|
||||||
}
|
|
||||||
|
|
||||||
triggers, ok := e.ctTriggers[ct]
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tid := range triggers {
|
|
||||||
trigger := e.calledTriggers[tid]
|
|
||||||
|
|
||||||
err := e.chainAt(trigger.handle, trigger.revert, msg, trigger.confidence, ts.Height())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("chain trigger (call %s(%d), msg found @ %d) failed: %s", msg.To, msg.Method, ts.Height(), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) messagesForTs(ts *types.TipSet, consume func(*types.Message) error) error {
|
|
||||||
seen := map[cid.Cid]struct{}{}
|
|
||||||
|
|
||||||
for _, tsb := range ts.Blocks() {
|
|
||||||
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range bmsgs {
|
|
||||||
_, ok := seen[m.Cid()]
|
|
||||||
if ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
seen[m.Cid()] = struct{}{}
|
|
||||||
|
|
||||||
if err := consume(m); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range smsgs {
|
|
||||||
_, ok := seen[m.Message.Cid()]
|
|
||||||
if ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
seen[m.Message.Cid()] = struct{}{}
|
|
||||||
|
|
||||||
if err := consume(&m.Message); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) CalledOnce(check CheckFunc, hnd HandleFunc, rev RevertFunc, confidence int, actor address.Address, method uint64) error {
|
|
||||||
e.lk.Lock()
|
|
||||||
defer e.lk.Unlock()
|
|
||||||
|
|
||||||
// TODO: this should use older tipset, and take reverts into account
|
|
||||||
done, err := check(e.tsc.best())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if done {
|
|
||||||
// Already happened, don't bother registering callback
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
id := e.ctr
|
|
||||||
e.ctr++
|
|
||||||
|
|
||||||
e.calledTriggers[id] = handler{
|
|
||||||
confidence: confidence,
|
|
||||||
|
|
||||||
handle: hnd,
|
|
||||||
revert: rev,
|
|
||||||
}
|
|
||||||
|
|
||||||
ct := callTuple{
|
|
||||||
actor: actor,
|
|
||||||
method: method,
|
|
||||||
}
|
|
||||||
|
|
||||||
e.ctTriggers[ct] = append(e.ctTriggers[ct], id)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) NotCalledBy(check CheckFunc, hnd HandleFunc, rev RevertFunc, confidence int, actor address.Address, method uint64, h uint64) {
|
|
||||||
panic("impl")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) ChainAt(hnd HandleFunc, rev RevertFunc, confidence int, h uint64) error {
|
|
||||||
e.lk.Lock()
|
|
||||||
defer e.lk.Unlock()
|
|
||||||
|
|
||||||
return e.chainAt(hnd, rev, nil, confidence, h)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Events) chainAt(hnd HandleFunc, rev RevertFunc, msg *types.Message, confidence int, h uint64) error {
|
|
||||||
bestH := e.tsc.best().Height()
|
|
||||||
|
|
||||||
if bestH >= h+uint64(confidence) {
|
|
||||||
ts, err := e.tsc.get(h)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := hnd(msg, ts, bestH); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if bestH >= h+uint64(confidence)+e.gcConfidence {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
triggerAt := h + uint64(confidence)
|
|
||||||
|
|
||||||
id := e.ctr
|
|
||||||
e.ctr++
|
|
||||||
|
|
||||||
e.heightTriggers[id] = &handler{
|
|
||||||
confidence: confidence,
|
|
||||||
|
|
||||||
handle: hnd,
|
|
||||||
revert: rev,
|
|
||||||
|
|
||||||
msg: msg,
|
|
||||||
}
|
|
||||||
|
|
||||||
e.htHeights[h] = append(e.htHeights[h], id)
|
|
||||||
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
227
chain/store/events_called.go
Normal file
227
chain/store/events_called.go
Normal file
@ -0,0 +1,227 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// continous
|
||||||
|
// with reverts
|
||||||
|
// no timout
|
||||||
|
|
||||||
|
// `ts` is the tipset, in which the `msg` is included.
|
||||||
|
// `curH`-`ts.Height` = `confidence`
|
||||||
|
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) error
|
||||||
|
|
||||||
|
// CheckFunc is used before one-shoot callbacks for atomicity
|
||||||
|
// guarantees. If the condition the callbacks wait for has already happened in
|
||||||
|
// tipset `ts`, this function MUST return true
|
||||||
|
type CheckFunc func(ts *types.TipSet) (bool, error)
|
||||||
|
|
||||||
|
type callHandler struct {
|
||||||
|
confidence int
|
||||||
|
timeout uint64
|
||||||
|
|
||||||
|
handle CalledHandler
|
||||||
|
revert RevertHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
type queuedEvent struct {
|
||||||
|
trigger uint64
|
||||||
|
|
||||||
|
h uint64
|
||||||
|
msg *types.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
type calledEvents struct {
|
||||||
|
cs eventChainStore
|
||||||
|
tsc *tipSetCache
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
ctr uint64
|
||||||
|
|
||||||
|
// maps block heights to events
|
||||||
|
// [triggerH][msgH][event]
|
||||||
|
confQueue map[uint64]map[uint64][]queuedEvent
|
||||||
|
|
||||||
|
// [msgH][triggerH]
|
||||||
|
revertQueue map[uint64][]uint64
|
||||||
|
|
||||||
|
triggers map[uint64]callHandler
|
||||||
|
callTuples map[callTuple][]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type callTuple struct {
|
||||||
|
actor address.Address
|
||||||
|
method uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
||||||
|
for _, ts := range rev {
|
||||||
|
e.handleReverts(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ts := range app {
|
||||||
|
// called triggers
|
||||||
|
|
||||||
|
err := e.checkNewCalls(ts)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
e.applyWithConfidence(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) handleReverts(ts *types.TipSet) {
|
||||||
|
reverts, ok := e.revertQueue[ts.Height()]
|
||||||
|
if !ok {
|
||||||
|
return // nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, triggerH := range reverts {
|
||||||
|
toRevert := e.confQueue[triggerH][ts.Height()]
|
||||||
|
for _, event := range toRevert {
|
||||||
|
trigger := e.triggers[event.trigger]
|
||||||
|
if err := trigger.revert(ts); err != nil {
|
||||||
|
log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) checkNewCalls(ts *types.TipSet) error {
|
||||||
|
return e.messagesForTs(ts, func(msg *types.Message) {
|
||||||
|
// TODO: do we have to verify the receipt, or are messages on chain
|
||||||
|
// guaranteed to be successful?
|
||||||
|
|
||||||
|
ct := callTuple{
|
||||||
|
actor: msg.To,
|
||||||
|
method: msg.Method,
|
||||||
|
}
|
||||||
|
|
||||||
|
triggers, ok := e.callTuples[ct]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tid := range triggers {
|
||||||
|
e.queueForConfidence(tid, msg, ts)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) queueForConfidence(triggerId uint64, msg *types.Message, ts *types.TipSet) {
|
||||||
|
trigger := e.triggers[triggerId]
|
||||||
|
triggerH := ts.Height() + uint64(trigger.confidence)
|
||||||
|
|
||||||
|
byOrigH, ok := e.confQueue[triggerH]
|
||||||
|
if !ok {
|
||||||
|
byOrigH = map[uint64][]queuedEvent{}
|
||||||
|
e.confQueue[triggerH] = byOrigH
|
||||||
|
}
|
||||||
|
|
||||||
|
byOrigH[ts.Height()] = append(byOrigH[ts.Height()], queuedEvent{
|
||||||
|
trigger: triggerId,
|
||||||
|
h: ts.Height(),
|
||||||
|
msg: msg,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) applyWithConfidence(ts *types.TipSet) {
|
||||||
|
byOrigH, ok := e.confQueue[ts.Height()]
|
||||||
|
if !ok {
|
||||||
|
return // no triggers at thin height
|
||||||
|
}
|
||||||
|
|
||||||
|
for origH, events := range byOrigH {
|
||||||
|
triggerTs, err := e.tsc.get(origH)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, ts.Height())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
trigger := e.triggers[event.trigger]
|
||||||
|
if err := trigger.handle(event.msg, triggerTs, ts.Height()); err != nil {
|
||||||
|
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err)
|
||||||
|
continue // don't revert failed calls
|
||||||
|
}
|
||||||
|
|
||||||
|
e.revertQueue[origH] = append(e.revertQueue[origH], ts.Height())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) error {
|
||||||
|
seen := map[cid.Cid]struct{}{}
|
||||||
|
|
||||||
|
for _, tsb := range ts.Blocks() {
|
||||||
|
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range bmsgs {
|
||||||
|
_, ok := seen[m.Cid()]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[m.Cid()] = struct{}{}
|
||||||
|
|
||||||
|
consume(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range smsgs {
|
||||||
|
_, ok := seen[m.Message.Cid()]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[m.Message.Cid()] = struct{}{}
|
||||||
|
|
||||||
|
consume(&m.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, actor address.Address, method uint64) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
// TODO: this should use older tipset, and take reverts into account
|
||||||
|
done, err := check(e.tsc.best())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
// Already happened, don't bother registering callback
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
id := e.ctr
|
||||||
|
e.ctr++
|
||||||
|
|
||||||
|
e.triggers[id] = callHandler{
|
||||||
|
confidence: confidence,
|
||||||
|
timeout: math.MaxUint64, // TODO
|
||||||
|
|
||||||
|
handle: hnd,
|
||||||
|
revert: rev,
|
||||||
|
}
|
||||||
|
|
||||||
|
ct := callTuple{
|
||||||
|
actor: actor,
|
||||||
|
method: method,
|
||||||
|
}
|
||||||
|
|
||||||
|
e.callTuples[ct] = append(e.callTuples[ct], id)
|
||||||
|
return nil
|
||||||
|
}
|
95
chain/store/events_height.go
Normal file
95
chain/store/events_height.go
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (e *Events) headChangeAt(rev, app []*types.TipSet) error {
|
||||||
|
// highest tipset is always the first (see cs.ReorgOps)
|
||||||
|
newH := app[0].Height()
|
||||||
|
|
||||||
|
for _, ts := range rev {
|
||||||
|
// TODO: log error if h below gcconfidence
|
||||||
|
// revert height-based triggers
|
||||||
|
|
||||||
|
for _, tid := range e.htHeights[ts.Height()] {
|
||||||
|
// don't revert if newH is above this ts
|
||||||
|
if newH >= ts.Height() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := e.heightTriggers[tid].revert(ts)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.tsc.revert(ts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ts := range app {
|
||||||
|
if err := e.tsc.add(ts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// height triggers
|
||||||
|
|
||||||
|
for _, tid := range e.htTriggerHeights[ts.Height()] {
|
||||||
|
hnd := e.heightTriggers[tid]
|
||||||
|
triggerH := ts.Height() - uint64(hnd.confidence)
|
||||||
|
|
||||||
|
incTs, err := e.tsc.get(triggerH)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := hnd.handle(incTs, ts.Height()); err != nil {
|
||||||
|
msgInfo := ""
|
||||||
|
log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Events) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h uint64) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
bestH := e.tsc.best().Height()
|
||||||
|
|
||||||
|
if bestH >= h+uint64(confidence) {
|
||||||
|
ts, err := e.tsc.get(h)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := hnd(ts, bestH); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bestH >= h+uint64(confidence)+e.gcConfidence {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
triggerAt := h + uint64(confidence)
|
||||||
|
|
||||||
|
id := e.ctr
|
||||||
|
e.ctr++
|
||||||
|
|
||||||
|
e.heightTriggers[id] = &heightHandler{
|
||||||
|
confidence: confidence,
|
||||||
|
|
||||||
|
handle: hnd,
|
||||||
|
revert: rev,
|
||||||
|
}
|
||||||
|
|
||||||
|
e.htHeights[h] = append(e.htHeights[h], id)
|
||||||
|
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -131,10 +131,9 @@ func TestAt(t *testing.T) {
|
|||||||
var applied bool
|
var applied bool
|
||||||
var reverted bool
|
var reverted bool
|
||||||
|
|
||||||
err := events.ChainAt(func(msg *types.Message, ts *types.TipSet, curH uint64) error {
|
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
|
||||||
require.Equal(t, 5, int(ts.Height()))
|
require.Equal(t, 5, int(ts.Height()))
|
||||||
require.Equal(t, 8, int(curH))
|
require.Equal(t, 8, int(curH))
|
||||||
require.Nil(t, msg)
|
|
||||||
applied = true
|
applied = true
|
||||||
return nil
|
return nil
|
||||||
}, func(ts *types.TipSet) error {
|
}, func(ts *types.TipSet) error {
|
||||||
@ -198,7 +197,7 @@ func TestCalled(t *testing.T) {
|
|||||||
var appliedTs *types.TipSet
|
var appliedTs *types.TipSet
|
||||||
var appliedH uint64
|
var appliedH uint64
|
||||||
|
|
||||||
err = events.CalledOnce(func(ts *types.TipSet) (b bool, e error) {
|
err = events.Called(func(ts *types.TipSet) (b bool, e error) {
|
||||||
return false, nil
|
return false, nil
|
||||||
}, func(msg *types.Message, ts *types.TipSet, curH uint64) error {
|
}, func(msg *types.Message, ts *types.TipSet, curH uint64) error {
|
||||||
applied = true
|
applied = true
|
||||||
@ -310,6 +309,6 @@ func TestCalled(t *testing.T) {
|
|||||||
0: n2msg,
|
0: n2msg,
|
||||||
})
|
})
|
||||||
|
|
||||||
// require.Equal(t, false, applied) TODO: FIX!
|
require.Equal(t, false, applied)
|
||||||
require.Equal(t, false, reverted)
|
require.Equal(t, false, reverted)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user