2019-09-05 07:40:50 +00:00
|
|
|
package events
|
2019-09-03 17:45:55 +00:00
|
|
|
|
|
|
|
import (
|
2019-09-18 11:01:52 +00:00
|
|
|
"context"
|
2019-09-03 17:45:55 +00:00
|
|
|
"sync"
|
2019-09-18 11:01:52 +00:00
|
|
|
"time"
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2020-02-08 02:18:32 +00:00
|
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
2019-09-18 11:01:52 +00:00
|
|
|
"github.com/ipfs/go-cid"
|
2020-01-08 19:10:57 +00:00
|
|
|
logging "github.com/ipfs/go-log/v2"
|
2019-09-04 16:09:08 +00:00
|
|
|
"golang.org/x/xerrors"
|
|
|
|
|
2019-12-19 20:13:17 +00:00
|
|
|
"github.com/filecoin-project/go-address"
|
2019-10-18 04:47:41 +00:00
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
|
|
"github.com/filecoin-project/lotus/chain/store"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types"
|
2019-09-03 17:45:55 +00:00
|
|
|
)
|
|
|
|
|
2019-09-05 07:40:50 +00:00
|
|
|
var log = logging.Logger("events")
|
|
|
|
|
2020-06-02 14:29:39 +00:00
|
|
|
// HeightHandler `curH`-`ts.Height` = `confidence`
|
2020-02-08 02:18:32 +00:00
|
|
|
type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error
|
2019-11-05 14:03:59 +00:00
|
|
|
type RevertHandler func(ctx context.Context, ts *types.TipSet) error
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2019-09-04 16:09:08 +00:00
|
|
|
type heightHandler struct {
|
2019-09-03 17:45:55 +00:00
|
|
|
confidence int
|
2019-12-04 12:41:22 +00:00
|
|
|
called bool
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2019-09-04 16:09:08 +00:00
|
|
|
handle HeightHandler
|
|
|
|
revert RevertHandler
|
2019-09-03 17:45:55 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 14:29:39 +00:00
|
|
|
type eventAPI interface {
|
2020-04-23 22:15:00 +00:00
|
|
|
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
2019-09-18 11:01:52 +00:00
|
|
|
ChainGetBlockMessages(context.Context, cid.Cid) (*api.BlockMessages, error)
|
2020-02-24 17:32:02 +00:00
|
|
|
ChainGetTipSetByHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
2020-02-11 23:29:45 +00:00
|
|
|
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
2020-05-13 22:48:36 +00:00
|
|
|
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
|
2019-11-19 21:27:25 +00:00
|
|
|
|
2020-02-11 23:29:45 +00:00
|
|
|
StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) // optional / for CalledMsg
|
2019-09-03 17:45:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Events struct {
|
2020-06-02 14:29:39 +00:00
|
|
|
api eventAPI
|
2019-09-03 17:45:55 +00:00
|
|
|
|
|
|
|
tsc *tipSetCache
|
|
|
|
lk sync.Mutex
|
|
|
|
|
2019-09-18 11:01:52 +00:00
|
|
|
ready sync.WaitGroup
|
|
|
|
readyOnce sync.Once
|
|
|
|
|
2019-09-05 07:36:11 +00:00
|
|
|
heightEvents
|
2020-06-26 19:42:44 +00:00
|
|
|
*hcEvents
|
2019-09-03 17:45:55 +00:00
|
|
|
}
|
|
|
|
|
2020-06-02 14:29:39 +00:00
|
|
|
func NewEvents(ctx context.Context, api eventAPI) *Events {
|
2019-09-03 17:45:55 +00:00
|
|
|
gcConfidence := 2 * build.ForkLengthThreshold
|
|
|
|
|
2019-09-18 13:32:21 +00:00
|
|
|
tsc := newTSCache(gcConfidence, api.ChainGetTipSetByHeight)
|
2019-09-04 16:09:08 +00:00
|
|
|
|
2019-09-03 17:45:55 +00:00
|
|
|
e := &Events{
|
2019-09-18 11:01:52 +00:00
|
|
|
api: api,
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2019-09-04 16:09:08 +00:00
|
|
|
tsc: tsc,
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2019-09-05 07:36:11 +00:00
|
|
|
heightEvents: heightEvents{
|
|
|
|
tsc: tsc,
|
2019-11-05 14:03:59 +00:00
|
|
|
ctx: ctx,
|
2020-05-27 20:53:20 +00:00
|
|
|
gcConfidence: gcConfidence,
|
2019-09-05 07:36:11 +00:00
|
|
|
|
|
|
|
heightTriggers: map[uint64]*heightHandler{},
|
2020-02-08 02:18:32 +00:00
|
|
|
htTriggerHeights: map[abi.ChainEpoch][]uint64{},
|
|
|
|
htHeights: map[abi.ChainEpoch][]uint64{},
|
2019-09-05 07:36:11 +00:00
|
|
|
},
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2020-06-26 19:42:44 +00:00
|
|
|
hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)),
|
2019-09-03 17:45:55 +00:00
|
|
|
}
|
|
|
|
|
2019-09-18 11:01:52 +00:00
|
|
|
e.ready.Add(1)
|
|
|
|
|
2019-09-18 18:07:39 +00:00
|
|
|
go e.listenHeadChanges(ctx)
|
2019-09-18 11:01:52 +00:00
|
|
|
|
|
|
|
e.ready.Wait()
|
2019-09-03 17:45:55 +00:00
|
|
|
|
2019-09-03 17:59:32 +00:00
|
|
|
// TODO: cleanup/gc goroutine
|
2019-09-03 17:45:55 +00:00
|
|
|
|
|
|
|
return e
|
|
|
|
}
|
|
|
|
|
2019-09-18 18:07:39 +00:00
|
|
|
func (e *Events) listenHeadChanges(ctx context.Context) {
|
|
|
|
for {
|
|
|
|
if err := e.listenHeadChangesOnce(ctx); err != nil {
|
|
|
|
log.Errorf("listen head changes errored: %s", err)
|
|
|
|
} else {
|
|
|
|
log.Warn("listenHeadChanges quit")
|
|
|
|
}
|
2019-09-18 11:01:52 +00:00
|
|
|
if ctx.Err() != nil {
|
|
|
|
log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
log.Info("restarting listenHeadChanges")
|
2019-09-18 18:07:39 +00:00
|
|
|
}
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
2019-09-18 18:07:39 +00:00
|
|
|
func (e *Events) listenHeadChangesOnce(ctx context.Context) error {
|
2019-09-27 11:37:44 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
defer cancel()
|
|
|
|
|
2019-09-18 11:01:52 +00:00
|
|
|
notifs, err := e.api.ChainNotify(ctx)
|
|
|
|
if err != nil {
|
|
|
|
// TODO: retry
|
2019-09-18 18:07:39 +00:00
|
|
|
return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err)
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
cur, ok := <-notifs // TODO: timeout?
|
|
|
|
if !ok {
|
2019-09-18 18:07:39 +00:00
|
|
|
return xerrors.Errorf("notification channel closed")
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if len(cur) != 1 {
|
2019-09-18 18:07:39 +00:00
|
|
|
return xerrors.Errorf("unexpected initial head notification length: %d", len(cur))
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if cur[0].Type != store.HCCurrent {
|
2019-09-18 18:07:39 +00:00
|
|
|
return xerrors.Errorf("expected first head notification type to be 'current', was '%s'", cur[0].Type)
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := e.tsc.add(cur[0].Val); err != nil {
|
2019-09-18 18:07:39 +00:00
|
|
|
log.Warn("tsc.add: adding current tipset failed: %w", err)
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
e.readyOnce.Do(func() {
|
2020-06-24 18:10:52 +00:00
|
|
|
e.lastTs = cur[0].Val
|
2020-05-08 08:24:17 +00:00
|
|
|
|
2019-09-18 11:01:52 +00:00
|
|
|
e.ready.Done()
|
|
|
|
})
|
|
|
|
|
|
|
|
for notif := range notifs {
|
|
|
|
var rev, app []*types.TipSet
|
|
|
|
for _, notif := range notif {
|
|
|
|
switch notif.Type {
|
|
|
|
case store.HCRevert:
|
|
|
|
rev = append(rev, notif.Val)
|
|
|
|
case store.HCApply:
|
|
|
|
app = append(app, notif.Val)
|
|
|
|
default:
|
|
|
|
log.Warnf("unexpected head change notification type: '%s'", notif.Type)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := e.headChange(rev, app); err != nil {
|
|
|
|
log.Warnf("headChange failed: %s", err)
|
|
|
|
}
|
2020-05-13 22:48:36 +00:00
|
|
|
|
|
|
|
// sync with fake chainstore (for tests)
|
2020-05-15 09:17:13 +00:00
|
|
|
if fcs, ok := e.api.(interface{ notifDone() }); ok {
|
2020-05-13 22:48:36 +00:00
|
|
|
fcs.notifDone()
|
|
|
|
}
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
2019-09-18 18:07:39 +00:00
|
|
|
return nil
|
2019-09-18 11:01:52 +00:00
|
|
|
}
|
|
|
|
|
2019-09-03 17:45:55 +00:00
|
|
|
func (e *Events) headChange(rev, app []*types.TipSet) error {
|
2019-09-03 17:59:32 +00:00
|
|
|
if len(app) == 0 {
|
|
|
|
return xerrors.New("events.headChange expected at least one applied tipset")
|
|
|
|
}
|
|
|
|
|
2019-09-03 17:45:55 +00:00
|
|
|
e.lk.Lock()
|
|
|
|
defer e.lk.Unlock()
|
|
|
|
|
2019-09-03 17:59:32 +00:00
|
|
|
if err := e.headChangeAt(rev, app); err != nil {
|
|
|
|
return err
|
2019-09-03 17:45:55 +00:00
|
|
|
}
|
|
|
|
|
2020-06-24 18:10:52 +00:00
|
|
|
return e.processHeadChangeEvent(rev, app)
|
2019-09-03 17:59:32 +00:00
|
|
|
}
|