lotus/chain/events/events.go

234 lines
5.6 KiB
Go
Raw Permalink Normal View History

2019-09-05 07:40:50 +00:00
package events
2019-09-03 17:45:55 +00:00
import (
2019-09-18 11:01:52 +00:00
"context"
2019-09-03 17:45:55 +00:00
"sync"
2019-09-18 11:01:52 +00:00
"time"
2019-09-03 17:45:55 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2019-09-18 11:01:52 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2019-09-04 16:09:08 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2019-09-03 17:45:55 +00:00
)
2019-09-05 07:40:50 +00:00
var log = logging.Logger("events")
// HeightHandler is the callback signature for height-triggered events.
// Contract noted by the original authors: `curH`-`ts.Height` = `confidence`.
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error

// RevertHandler is the callback signature stored next to a HeightHandler in
// heightHandler. It presumably receives the tipset being reverted; the code
// that invokes it is outside this section — confirm there.
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
2019-09-03 17:45:55 +00:00
2019-09-04 16:09:08 +00:00
type heightHandler struct {
2019-09-03 17:45:55 +00:00
confidence int
2019-12-04 12:41:22 +00:00
called bool
2019-09-03 17:45:55 +00:00
2019-09-04 16:09:08 +00:00
handle HeightHandler
revert RevertHandler
2019-09-03 17:45:55 +00:00
}
2021-04-05 11:23:46 +00:00
type EventAPI interface {
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
2019-09-18 11:01:52 +00:00
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
ChainHead(context.Context) (*types.TipSet, error)
2021-04-05 17:56:53 +00:00
StateSearchMsg(ctx context.Context, from types.TipSetKey, msg cid.Cid, limit abi.ChainEpoch, allowReplaced bool) (*api.MsgLookup, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
2019-11-19 21:27:25 +00:00
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
2019-09-03 17:45:55 +00:00
}
type Events struct {
2021-04-05 11:23:46 +00:00
api EventAPI
2019-09-03 17:45:55 +00:00
tsc *tipSetCache
lk sync.Mutex
ready chan struct{}
2019-09-18 11:01:52 +00:00
readyOnce sync.Once
heightEvents
*hcEvents
2019-09-03 17:45:55 +00:00
observers []TipSetObserver
}
2019-09-03 17:45:55 +00:00
2021-04-05 11:23:46 +00:00
func NewEventsWithConfidence(ctx context.Context, api EventAPI, gcConfidence abi.ChainEpoch) *Events {
tsc := newTSCache(gcConfidence, api)
2019-09-04 16:09:08 +00:00
2019-09-03 17:45:55 +00:00
e := &Events{
2019-09-18 11:01:52 +00:00
api: api,
2019-09-03 17:45:55 +00:00
2019-09-04 16:09:08 +00:00
tsc: tsc,
2019-09-03 17:45:55 +00:00
heightEvents: heightEvents{
tsc: tsc,
2019-11-05 14:03:59 +00:00
ctx: ctx,
gcConfidence: gcConfidence,
heightTriggers: map[uint64]*heightHandler{},
2020-02-08 02:18:32 +00:00
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
htHeights: map[abi.ChainEpoch][]uint64{},
},
2019-09-03 17:45:55 +00:00
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
ready: make(chan struct{}),
observers: []TipSetObserver{},
2019-09-03 17:45:55 +00:00
}
go e.listenHeadChanges(ctx)
2019-09-18 11:01:52 +00:00
// Wait for the first tipset to be seen or bail if shutting down
select {
case <-e.ready:
case <-ctx.Done():
}
2019-09-03 17:45:55 +00:00
return e
}
2021-04-05 11:23:46 +00:00
// NewEvents is NewEventsWithConfidence with the default garbage-collection
// confidence of twice build.ForkLengthThreshold.
func NewEvents(ctx context.Context, api EventAPI) *Events {
	return NewEventsWithConfidence(ctx, api, 2*build.ForkLengthThreshold)
}
func (e *Events) listenHeadChanges(ctx context.Context) {
for {
if err := e.listenHeadChangesOnce(ctx); err != nil {
log.Errorf("listen head changes errored: %s", err)
} else {
log.Warn("listenHeadChanges quit")
}
2021-02-02 17:43:49 +00:00
select {
case <-build.Clock.After(time.Second):
case <-ctx.Done():
2019-09-18 11:01:52 +00:00
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
return
}
2021-02-02 17:43:49 +00:00
2019-09-18 11:01:52 +00:00
log.Info("restarting listenHeadChanges")
}
2019-09-18 11:01:52 +00:00
}
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
2019-09-27 11:37:44 +00:00
ctx, cancel := context.WithCancel(ctx)
defer cancel()
2019-09-18 11:01:52 +00:00
notifs, err := e.api.ChainNotify(ctx)
if err != nil {
// Retry is handled by caller
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
2019-09-18 11:01:52 +00:00
}
var cur []*api.HeadChange
var ok bool
// Wait for first tipset or bail
select {
case cur, ok = <-notifs:
if !ok {
return xerrors.Errorf("notification channel closed")
}
case <-ctx.Done():
return ctx.Err()
2019-09-18 11:01:52 +00:00
}
if len(cur) != 1 {
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
2019-09-18 11:01:52 +00:00
}
if cur[0].Type != store.HCCurrent {
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
2019-09-18 11:01:52 +00:00
}
if err := e.tsc.add(cur[0].Val); err != nil {
2020-11-24 16:51:19 +00:00
log.Warnf("tsc.add: adding current tipset failed: %v", err)
2019-09-18 11:01:52 +00:00
}
e.readyOnce.Do(func() {
e.lastTs = cur[0].Val
// Signal that we have seen first tipset
close(e.ready)
2019-09-18 11:01:52 +00:00
})
for notif := range notifs {
var rev, app []*types.TipSet
for _, notif := range notif {
switch notif.Type {
case store.HCRevert:
rev = append(rev, notif.Val)
case store.HCApply:
app = append(app, notif.Val)
default:
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
}
}
if err := e.headChange(ctx, rev, app); err != nil {
2019-09-18 11:01:52 +00:00
log.Warnf("headChange failed: %s", err)
}
// sync with fake chainstore (for tests)
2020-05-15 09:17:13 +00:00
if fcs, ok := e.api.(interface{ notifDone() }); ok {
fcs.notifDone()
}
2019-09-18 11:01:52 +00:00
}
return nil
2019-09-18 11:01:52 +00:00
}
func (e *Events) headChange(ctx context.Context, rev, app []*types.TipSet) error {
2019-09-03 17:59:32 +00:00
if len(app) == 0 {
return xerrors.New("events.headChange expected at least one applied tipset")
}
2019-09-03 17:45:55 +00:00
e.lk.Lock()
defer e.lk.Unlock()
2019-09-03 17:59:32 +00:00
if err := e.headChangeAt(rev, app); err != nil {
return err
2019-09-03 17:45:55 +00:00
}
if err := e.observeChanges(ctx, rev, app); err != nil {
return err
}
return e.processHeadChangeEvent(rev, app)
2019-09-03 17:59:32 +00:00
}
// A TipSetObserver receives notifications of tipsets.
//
// Observers registered with Events.Observe are called on every head change:
// Revert for each reverted tipset first, then Apply for each applied tipset.
// An error returned by one observer does not stop the remaining notifications.
type TipSetObserver interface {
// Apply is called with each tipset applied by a head change.
Apply(ctx context.Context, ts *types.TipSet) error
// Revert is called with each tipset reverted by a head change.
Revert(ctx context.Context, ts *types.TipSet) error
}
// Observe registers obs to be notified of every subsequent head change.
// The observer list is updated under e.lk; the returned error is always nil.
//
// TODO: add a confidence level so we can have observers with different levels of confidence
func (e *Events) Observe(obs TipSetObserver) error {
	e.lk.Lock()
	e.observers = append(e.observers, obs)
	e.lk.Unlock()

	return nil
}
// observeChanges notifies every registered observer of a head change: Revert
// is called for each tipset in rev, then Apply for each tipset in app.
//
// The caller must hold e.lk. Notification is best-effort: an observer error
// is logged and the remaining observers and tipsets are still processed, so
// the returned error is always nil.
func (e *Events) observeChanges(ctx context.Context, rev, app []*types.TipSet) error {
	for _, ts := range rev {
		for _, o := range e.observers {
			if err := o.Revert(ctx, ts); err != nil {
				log.Warnf("tipset observer revert failed: %s", err)
			}
		}
	}

	for _, ts := range app {
		for _, o := range e.observers {
			if err := o.Apply(ctx, ts); err != nil {
				log.Warnf("tipset observer apply failed: %s", err)
			}
		}
	}

	return nil
}