lotus/chain/events/events_height.go

210 lines
4.3 KiB
Go
Raw Normal View History

2019-09-05 07:40:50 +00:00
package events
2019-09-04 16:09:08 +00:00
import (
2019-11-05 14:03:59 +00:00
"context"
"sync"
2019-09-05 07:40:50 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2019-11-08 20:11:56 +00:00
"go.opencensus.io/trace"
2020-09-07 03:49:10 +00:00
"golang.org/x/xerrors"
2019-11-08 20:11:56 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-09-04 16:09:08 +00:00
)
type heightEvents struct {
lk sync.Mutex
tsc *tipSetCache
2020-02-08 02:18:32 +00:00
gcConfidence abi.ChainEpoch
ctr triggerID
heightTriggers map[triggerID]*heightHandler
htTriggerHeights map[triggerH][]triggerID
htHeights map[msgH][]triggerID
2019-11-05 14:03:59 +00:00
ctx context.Context
}
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
2019-11-05 14:36:44 +00:00
ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
span.AddAttributes(trace.Int64Attribute("reverts", int64(len(rev))))
span.AddAttributes(trace.Int64Attribute("applies", int64(len(app))))
e.lk.Lock()
defer e.lk.Unlock()
2019-09-04 16:09:08 +00:00
for _, ts := range rev {
// TODO: log error if h below gcconfidence
// revert height-based triggers
2020-02-08 02:18:32 +00:00
revert := func(h abi.ChainEpoch, ts *types.TipSet) {
2019-10-04 22:43:04 +00:00
for _, tid := range e.htHeights[h] {
2019-11-05 14:36:44 +00:00
ctx, span := trace.StartSpan(ctx, "events.HeightRevert")
rev := e.heightTriggers[tid].revert
e.lk.Unlock()
err := rev(ctx, ts)
e.lk.Lock()
2019-12-04 12:41:22 +00:00
e.heightTriggers[tid].called = false
2019-11-05 14:36:44 +00:00
span.End()
2019-10-04 22:43:04 +00:00
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
2019-09-04 16:09:08 +00:00
}
2019-10-04 22:43:04 +00:00
}
revert(ts.Height(), ts)
2019-09-04 16:09:08 +00:00
2019-10-04 22:43:04 +00:00
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
2019-09-04 16:09:08 +00:00
if err != nil {
2019-10-04 22:43:04 +00:00
return err
}
if cts != nil {
break
2019-09-04 16:09:08 +00:00
}
2019-10-04 22:43:04 +00:00
2019-10-04 23:49:18 +00:00
revert(subh, ts)
2019-10-04 22:43:04 +00:00
subh--
2019-09-04 16:09:08 +00:00
}
if err := e.tsc.revert(ts); err != nil {
return err
}
}
2019-09-05 07:57:12 +00:00
for i := range app {
ts := app[i]
2019-09-05 07:57:12 +00:00
2019-09-04 16:09:08 +00:00
if err := e.tsc.add(ts); err != nil {
return err
}
// height triggers
2020-02-08 02:18:32 +00:00
apply := func(h abi.ChainEpoch, ts *types.TipSet) error {
2019-10-04 22:43:04 +00:00
for _, tid := range e.htTriggerHeights[h] {
hnd := e.heightTriggers[tid]
2019-12-04 12:41:22 +00:00
if hnd.called {
return nil
}
2020-02-08 02:18:32 +00:00
triggerH := h - abi.ChainEpoch(hnd.confidence)
2019-10-04 22:43:04 +00:00
incTs, err := e.tsc.getNonNull(triggerH)
2019-10-04 22:43:04 +00:00
if err != nil {
return err
}
2019-11-05 14:36:44 +00:00
ctx, span := trace.StartSpan(ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", false))
handle := hnd.handle
e.lk.Unlock()
err = handle(ctx, incTs, h)
e.lk.Lock()
2020-06-29 11:13:28 +00:00
hnd.called = true
2019-11-05 14:36:44 +00:00
span.End()
if err != nil {
2019-11-24 16:35:50 +00:00
log.Errorf("chain trigger (@H %d, called @ %d) failed: %+v", triggerH, ts.Height(), err)
2019-10-04 22:43:04 +00:00
}
}
return nil
}
2019-09-04 16:09:08 +00:00
2019-10-04 22:43:04 +00:00
if err := apply(ts.Height(), ts); err != nil {
return err
}
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
2019-09-04 16:09:08 +00:00
if err != nil {
return err
}
2019-10-04 22:43:04 +00:00
if cts != nil {
break
}
2019-10-04 23:49:18 +00:00
if err := apply(subh, ts); err != nil {
2019-10-04 22:43:04 +00:00
return err
2019-09-04 16:09:08 +00:00
}
2019-10-04 22:43:04 +00:00
subh--
2019-09-04 16:09:08 +00:00
}
2019-10-04 22:43:04 +00:00
2019-09-04 16:09:08 +00:00
}
return nil
}
2019-09-05 08:27:08 +00:00
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
2019-10-04 23:49:18 +00:00
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
2020-02-08 02:18:32 +00:00
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {
e.lk.Lock() // Tricky locking, check your locks if you modify this function!
2019-09-04 16:09:08 +00:00
best, err := e.tsc.best()
if err != nil {
return xerrors.Errorf("error getting best tipset: %w", err)
}
2019-09-04 16:09:08 +00:00
bestH := best.Height()
2020-02-08 02:18:32 +00:00
if bestH >= h+abi.ChainEpoch(confidence) {
ts, err := e.tsc.getNonNull(h)
2019-09-04 16:09:08 +00:00
if err != nil {
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
}
e.lk.Unlock()
2019-11-05 14:36:44 +00:00
ctx, span := trace.StartSpan(e.ctx, "events.HeightApply")
span.AddAttributes(trace.BoolAttribute("immediate", true))
err = hnd(ctx, ts, bestH)
span.End()
if err != nil {
2019-09-04 16:09:08 +00:00
return err
}
2019-11-05 14:36:44 +00:00
e.lk.Lock()
best, err = e.tsc.best()
if err != nil {
return xerrors.Errorf("error getting best tipset: %w", err)
}
bestH = best.Height()
2019-09-04 16:09:08 +00:00
}
defer e.lk.Unlock()
2020-02-08 02:18:32 +00:00
if bestH >= h+abi.ChainEpoch(confidence)+e.gcConfidence {
2019-09-04 16:09:08 +00:00
return nil
}
2020-02-08 02:18:32 +00:00
triggerAt := h + abi.ChainEpoch(confidence)
2019-09-04 16:09:08 +00:00
id := e.ctr
e.ctr++
e.heightTriggers[id] = &heightHandler{
confidence: confidence,
handle: hnd,
revert: rev,
}
e.htHeights[h] = append(e.htHeights[h], id)
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
return nil
}