Merge pull request #179 from filecoin-project/feat/events
Chain event helper
This commit is contained in:
commit
c034f0d763
@ -7,4 +7,6 @@ const UnixfsLinksPerLevel = 1024
|
|||||||
|
|
||||||
const SectorSize = 1024
|
const SectorSize = 1024
|
||||||
|
|
||||||
|
const ForkLengthThreshold = 20
|
||||||
|
|
||||||
// TODO: Move other important consts here
|
// TODO: Move other important consts here
|
||||||
|
96
chain/events/events.go
Normal file
96
chain/events/events.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/build"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
var log = logging.Logger("events")
|
||||||
|
|
||||||
|
// `curH`-`ts.Height` = `confidence`
|
||||||
|
type HeightHandler func(ts *types.TipSet, curH uint64) error
|
||||||
|
type RevertHandler func(ts *types.TipSet) error
|
||||||
|
|
||||||
|
type heightHandler struct {
|
||||||
|
confidence int
|
||||||
|
|
||||||
|
handle HeightHandler
|
||||||
|
revert RevertHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventChainStore interface {
|
||||||
|
SubscribeHeadChanges(f func(rev, app []*types.TipSet) error)
|
||||||
|
|
||||||
|
GetHeaviestTipSet() *types.TipSet
|
||||||
|
MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type Events struct {
|
||||||
|
cs eventChainStore
|
||||||
|
|
||||||
|
tsc *tipSetCache
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
heightEvents
|
||||||
|
calledEvents
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEvents(cs eventChainStore) *Events {
|
||||||
|
gcConfidence := 2 * build.ForkLengthThreshold
|
||||||
|
|
||||||
|
tsc := newTSCache(gcConfidence)
|
||||||
|
|
||||||
|
e := &Events{
|
||||||
|
cs: cs,
|
||||||
|
|
||||||
|
tsc: tsc,
|
||||||
|
|
||||||
|
heightEvents: heightEvents{
|
||||||
|
tsc: tsc,
|
||||||
|
gcConfidence: uint64(gcConfidence),
|
||||||
|
|
||||||
|
heightTriggers: map[uint64]*heightHandler{},
|
||||||
|
htTriggerHeights: map[uint64][]uint64{},
|
||||||
|
htHeights: map[uint64][]uint64{},
|
||||||
|
},
|
||||||
|
|
||||||
|
calledEvents: calledEvents{
|
||||||
|
cs: cs,
|
||||||
|
tsc: tsc,
|
||||||
|
gcConfidence: uint64(gcConfidence),
|
||||||
|
|
||||||
|
confQueue: map[triggerH]map[msgH][]*queuedEvent{},
|
||||||
|
revertQueue: map[msgH][]triggerH{},
|
||||||
|
triggers: map[triggerId]*callHandler{},
|
||||||
|
callTuples: map[callTuple][]triggerId{},
|
||||||
|
timeouts: map[uint64]map[triggerId]int{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = e.tsc.add(cs.GetHeaviestTipSet())
|
||||||
|
cs.SubscribeHeadChanges(e.headChange)
|
||||||
|
|
||||||
|
// TODO: cleanup/gc goroutine
|
||||||
|
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Events) headChange(rev, app []*types.TipSet) error {
|
||||||
|
if len(app) == 0 {
|
||||||
|
return xerrors.New("events.headChange expected at least one applied tipset")
|
||||||
|
}
|
||||||
|
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
if err := e.headChangeAt(rev, app); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.headChangeCalled(rev, app)
|
||||||
|
}
|
330
chain/events/events_called.go
Normal file
330
chain/events/events_called.go
Normal file
@ -0,0 +1,330 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const NoTimeout = math.MaxUint64
|
||||||
|
|
||||||
|
type triggerId = uint64
|
||||||
|
|
||||||
|
// msgH is the block height at which a message was present / event has happened
|
||||||
|
type msgH = uint64
|
||||||
|
|
||||||
|
// triggerH is the block height at which the listener will be notified about the
|
||||||
|
// message (msgH+confidence)
|
||||||
|
type triggerH = uint64
|
||||||
|
|
||||||
|
// `ts` is the tipset, in which the `msg` is included.
|
||||||
|
// `curH`-`ts.Height` = `confidence`
|
||||||
|
type CalledHandler func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error)
|
||||||
|
|
||||||
|
// CheckFunc is used for atomicity guarantees. If the condition the callbacks
|
||||||
|
// wait for has already happened in tipset `ts`
|
||||||
|
//
|
||||||
|
// If `done` is true, timeout won't be triggered
|
||||||
|
// If `more` is false, no messages will be sent to CalledHandler (RevertHandler
|
||||||
|
// may still be called)
|
||||||
|
type CheckFunc func(ts *types.TipSet) (done bool, more bool, err error)
|
||||||
|
|
||||||
|
type callHandler struct {
|
||||||
|
confidence int
|
||||||
|
timeout uint64
|
||||||
|
|
||||||
|
disabled bool // TODO: GC after gcConfidence reached
|
||||||
|
|
||||||
|
handle CalledHandler
|
||||||
|
revert RevertHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
type queuedEvent struct {
|
||||||
|
trigger triggerId
|
||||||
|
|
||||||
|
h uint64
|
||||||
|
msg *types.Message
|
||||||
|
|
||||||
|
called bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type calledEvents struct {
|
||||||
|
cs eventChainStore
|
||||||
|
tsc *tipSetCache
|
||||||
|
gcConfidence uint64
|
||||||
|
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
ctr triggerId
|
||||||
|
|
||||||
|
triggers map[triggerId]*callHandler
|
||||||
|
callTuples map[callTuple][]triggerId
|
||||||
|
|
||||||
|
// maps block heights to events
|
||||||
|
// [triggerH][msgH][event]
|
||||||
|
confQueue map[triggerH]map[msgH][]*queuedEvent
|
||||||
|
|
||||||
|
// [msgH][triggerH]
|
||||||
|
revertQueue map[msgH][]triggerH
|
||||||
|
|
||||||
|
// [timeoutH+confidence][triggerId]{calls}
|
||||||
|
timeouts map[uint64]map[triggerId]int
|
||||||
|
}
|
||||||
|
|
||||||
|
type callTuple struct {
|
||||||
|
actor address.Address
|
||||||
|
method uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) headChangeCalled(rev, app []*types.TipSet) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
for _, ts := range rev {
|
||||||
|
e.handleReverts(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
tail := len(app) - 1
|
||||||
|
for i := range app {
|
||||||
|
ts := app[tail-i]
|
||||||
|
// called triggers
|
||||||
|
|
||||||
|
e.checkNewCalls(ts)
|
||||||
|
e.applyWithConfidence(ts)
|
||||||
|
e.applyTimeouts(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) handleReverts(ts *types.TipSet) {
|
||||||
|
reverts, ok := e.revertQueue[ts.Height()]
|
||||||
|
if !ok {
|
||||||
|
return // nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, triggerH := range reverts {
|
||||||
|
toRevert := e.confQueue[triggerH][ts.Height()]
|
||||||
|
for _, event := range toRevert {
|
||||||
|
if !event.called {
|
||||||
|
continue // event wasn't apply()-ied yet
|
||||||
|
}
|
||||||
|
|
||||||
|
trigger := e.triggers[event.trigger]
|
||||||
|
|
||||||
|
if err := trigger.revert(ts); err != nil {
|
||||||
|
log.Errorf("reverting chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, ts.Height(), triggerH, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(e.confQueue[triggerH], ts.Height())
|
||||||
|
}
|
||||||
|
delete(e.revertQueue, ts.Height())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) checkNewCalls(ts *types.TipSet) {
|
||||||
|
e.messagesForTs(ts, func(msg *types.Message) {
|
||||||
|
// TODO: do we have to verify the receipt, or are messages on chain
|
||||||
|
// guaranteed to be successful?
|
||||||
|
|
||||||
|
ct := callTuple{
|
||||||
|
actor: msg.To,
|
||||||
|
method: msg.Method,
|
||||||
|
}
|
||||||
|
|
||||||
|
triggers, ok := e.callTuples[ct]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tid := range triggers {
|
||||||
|
e.queueForConfidence(tid, msg, ts)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) queueForConfidence(triggerId uint64, msg *types.Message, ts *types.TipSet) {
|
||||||
|
trigger := e.triggers[triggerId]
|
||||||
|
triggerH := ts.Height() + uint64(trigger.confidence)
|
||||||
|
|
||||||
|
byOrigH, ok := e.confQueue[triggerH]
|
||||||
|
if !ok {
|
||||||
|
byOrigH = map[uint64][]*queuedEvent{}
|
||||||
|
e.confQueue[triggerH] = byOrigH
|
||||||
|
}
|
||||||
|
|
||||||
|
byOrigH[ts.Height()] = append(byOrigH[ts.Height()], &queuedEvent{
|
||||||
|
trigger: triggerId,
|
||||||
|
h: ts.Height(),
|
||||||
|
msg: msg,
|
||||||
|
})
|
||||||
|
|
||||||
|
e.revertQueue[ts.Height()] = append(e.revertQueue[ts.Height()], triggerH) // todo: dedupe?
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) applyWithConfidence(ts *types.TipSet) {
|
||||||
|
byOrigH, ok := e.confQueue[ts.Height()]
|
||||||
|
if !ok {
|
||||||
|
return // no triggers at thin height
|
||||||
|
}
|
||||||
|
|
||||||
|
for origH, events := range byOrigH {
|
||||||
|
triggerTs, err := e.tsc.get(origH)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("events: applyWithConfidence didn't find tipset for event; wanted %d; current %d", origH, ts.Height())
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, event := range events {
|
||||||
|
trigger := e.triggers[event.trigger]
|
||||||
|
if trigger.disabled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
more, err := trigger.handle(event.msg, triggerTs, ts.Height())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("chain trigger (call %s.%d() @H %d, called @ %d) failed: %s", event.msg.To, event.msg.Method, origH, ts.Height(), err)
|
||||||
|
continue // don't revert failed calls
|
||||||
|
}
|
||||||
|
|
||||||
|
event.called = true
|
||||||
|
|
||||||
|
touts, ok := e.timeouts[trigger.timeout]
|
||||||
|
if ok {
|
||||||
|
touts[event.trigger]++
|
||||||
|
}
|
||||||
|
|
||||||
|
trigger.disabled = !more
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) applyTimeouts(ts *types.TipSet) {
|
||||||
|
triggers, ok := e.timeouts[ts.Height()]
|
||||||
|
if !ok {
|
||||||
|
return // nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
for triggerId, calls := range triggers {
|
||||||
|
if calls > 0 {
|
||||||
|
continue // don't timeout if the method was called
|
||||||
|
}
|
||||||
|
trigger := e.triggers[triggerId]
|
||||||
|
if trigger.disabled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
timeoutTs, err := e.tsc.get(ts.Height() - uint64(trigger.confidence))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("events: applyTimeouts didn't find tipset for event; wanted %d; current %d", ts.Height()-uint64(trigger.confidence), ts.Height())
|
||||||
|
}
|
||||||
|
|
||||||
|
more, err := trigger.handle(nil, timeoutTs, ts.Height())
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("chain trigger (call @H %d, called @ %d) failed: %s", timeoutTs.Height(), ts.Height(), err)
|
||||||
|
continue // don't revert failed calls
|
||||||
|
}
|
||||||
|
|
||||||
|
trigger.disabled = !more // allows messages after timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *calledEvents) messagesForTs(ts *types.TipSet, consume func(*types.Message)) {
|
||||||
|
seen := map[cid.Cid]struct{}{}
|
||||||
|
|
||||||
|
for _, tsb := range ts.Blocks() {
|
||||||
|
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("messagesForTs MessagesForBlock failed (ts.H=%d, Bcid:%s, B.Mcid:%s): %s", ts.Height(), tsb.Cid(), tsb.Messages, err)
|
||||||
|
// this is quite bad, but probably better than missing all the other updates
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range bmsgs {
|
||||||
|
_, ok := seen[m.Cid()]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[m.Cid()] = struct{}{}
|
||||||
|
|
||||||
|
consume(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range smsgs {
|
||||||
|
_, ok := seen[m.Message.Cid()]
|
||||||
|
if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[m.Message.Cid()] = struct{}{}
|
||||||
|
|
||||||
|
consume(&m.Message)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called registers a callbacks which are triggered when a specified method is
|
||||||
|
// called on an actor, or a timeout is reached.
|
||||||
|
//
|
||||||
|
// * `CheckFunc` callback is invoked immediately with a recent tipset, it
|
||||||
|
// returns two booleans - `done`, and `more`.
|
||||||
|
//
|
||||||
|
// * `done` should be true when some on-chain action we are waiting for has
|
||||||
|
// happened. When `done` is set to true, timeout trigger is disabled.
|
||||||
|
//
|
||||||
|
// * `more` should be false when we don't want to receive new notifications
|
||||||
|
// through CalledHandler. Note that notifications may still be delivered to
|
||||||
|
// RevertHandler
|
||||||
|
//
|
||||||
|
// * `CalledHandler` is called when the specified event was observed on-chain,
|
||||||
|
// and a confidence threshold was reached, or the specified `timeout` height
|
||||||
|
// was reached with no events observed. When this callback is invoked on a
|
||||||
|
// timeout, `msg` is set to nil. This callback returns a boolean specifying
|
||||||
|
// whether further notifications should be sent, like `more` return param
|
||||||
|
// from `CheckFunc` above.
|
||||||
|
//
|
||||||
|
// * `RevertHandler` is called after apply handler, when we drop the tipset
|
||||||
|
// containing the message. The tipset passed as the argument is the tipset
|
||||||
|
// that is being dropped. Note that the message dropped may be re-applied
|
||||||
|
// in a different tipset in small amount of time.
|
||||||
|
func (e *calledEvents) Called(check CheckFunc, hnd CalledHandler, rev RevertHandler, confidence int, timeout uint64, actor address.Address, method uint64) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
done, more, err := check(e.tsc.best())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if done {
|
||||||
|
timeout = NoTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
id := e.ctr
|
||||||
|
e.ctr++
|
||||||
|
|
||||||
|
e.triggers[id] = &callHandler{
|
||||||
|
confidence: confidence,
|
||||||
|
timeout: timeout + uint64(confidence),
|
||||||
|
|
||||||
|
disabled: !more,
|
||||||
|
|
||||||
|
handle: hnd,
|
||||||
|
revert: rev,
|
||||||
|
}
|
||||||
|
|
||||||
|
ct := callTuple{
|
||||||
|
actor: actor,
|
||||||
|
method: method,
|
||||||
|
}
|
||||||
|
|
||||||
|
e.callTuples[ct] = append(e.callTuples[ct], id)
|
||||||
|
if timeout != NoTimeout {
|
||||||
|
if e.timeouts[timeout+uint64(confidence)] == nil {
|
||||||
|
e.timeouts[timeout+uint64(confidence)] = map[uint64]int{}
|
||||||
|
}
|
||||||
|
e.timeouts[timeout+uint64(confidence)][id] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
119
chain/events/events_height.go
Normal file
119
chain/events/events_height.go
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type heightEvents struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
tsc *tipSetCache
|
||||||
|
gcConfidence uint64
|
||||||
|
|
||||||
|
ctr triggerId
|
||||||
|
|
||||||
|
heightTriggers map[triggerId]*heightHandler
|
||||||
|
|
||||||
|
htTriggerHeights map[triggerH][]triggerId
|
||||||
|
htHeights map[msgH][]triggerId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
// highest tipset is always the first (see cs.ReorgOps)
|
||||||
|
newH := app[0].Height()
|
||||||
|
|
||||||
|
for _, ts := range rev {
|
||||||
|
// TODO: log error if h below gcconfidence
|
||||||
|
// revert height-based triggers
|
||||||
|
|
||||||
|
for _, tid := range e.htHeights[ts.Height()] {
|
||||||
|
// don't revert if newH is above this ts
|
||||||
|
if newH >= ts.Height() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := e.heightTriggers[tid].revert(ts)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.tsc.revert(ts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tail := len(app) - 1
|
||||||
|
for i := range app {
|
||||||
|
ts := app[tail-i]
|
||||||
|
|
||||||
|
if err := e.tsc.add(ts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// height triggers
|
||||||
|
|
||||||
|
for _, tid := range e.htTriggerHeights[ts.Height()] {
|
||||||
|
hnd := e.heightTriggers[tid]
|
||||||
|
triggerH := ts.Height() - uint64(hnd.confidence)
|
||||||
|
|
||||||
|
incTs, err := e.tsc.get(triggerH)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := hnd.handle(incTs, ts.Height()); err != nil {
|
||||||
|
msgInfo := ""
|
||||||
|
log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
|
||||||
|
// specified height+confidence threshold. If the chain is rolled-back under the
|
||||||
|
// specified height, `RevertHandler` will be called.
|
||||||
|
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h uint64) error {
|
||||||
|
e.lk.Lock()
|
||||||
|
defer e.lk.Unlock()
|
||||||
|
|
||||||
|
bestH := e.tsc.best().Height()
|
||||||
|
|
||||||
|
if bestH >= h+uint64(confidence) {
|
||||||
|
ts, err := e.tsc.get(h)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := hnd(ts, bestH); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if bestH >= h+uint64(confidence)+e.gcConfidence {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
triggerAt := h + uint64(confidence)
|
||||||
|
|
||||||
|
id := e.ctr
|
||||||
|
e.ctr++
|
||||||
|
|
||||||
|
e.heightTriggers[id] = &heightHandler{
|
||||||
|
confidence: confidence,
|
||||||
|
|
||||||
|
handle: hnd,
|
||||||
|
revert: rev,
|
||||||
|
}
|
||||||
|
|
||||||
|
e.htHeights[h] = append(e.htHeights[h], id)
|
||||||
|
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
511
chain/events/events_test.go
Normal file
511
chain/events/events_test.go
Normal file
@ -0,0 +1,511 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/build"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/address"
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
var dummyCid cid.Cid
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
dummyCid, _ = cid.Parse("bafkqaaa")
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeMsg struct {
|
||||||
|
bmsgs []*types.Message
|
||||||
|
smsgs []*types.SignedMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeCS struct {
|
||||||
|
t *testing.T
|
||||||
|
h uint64
|
||||||
|
tsc *tipSetCache
|
||||||
|
|
||||||
|
msgs map[cid.Cid]fakeMsg
|
||||||
|
|
||||||
|
sub func(rev, app []*types.TipSet) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
|
||||||
|
ts, err := types.NewTipSet([]*types.BlockHeader{
|
||||||
|
{
|
||||||
|
Height: h,
|
||||||
|
|
||||||
|
StateRoot: dummyCid,
|
||||||
|
Messages: msgcid,
|
||||||
|
MessageReceipts: dummyCid,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
|
||||||
|
if fcs.sub != nil {
|
||||||
|
fcs.t.Fatal("sub should be nil")
|
||||||
|
}
|
||||||
|
fcs.sub = f
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet {
|
||||||
|
return fcs.tsc.best()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
ms, ok := fcs.msgs[b.Messages]
|
||||||
|
if ok {
|
||||||
|
return ms.bmsgs, ms.smsgs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return []*types.Message{}, []*types.SignedMessage{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
|
||||||
|
n := len(fcs.msgs)
|
||||||
|
c, err := cid.Prefix{
|
||||||
|
Version: 1,
|
||||||
|
Codec: cid.Raw,
|
||||||
|
MhType: multihash.IDENTITY,
|
||||||
|
MhLength: -1,
|
||||||
|
}.Sum([]byte(fmt.Sprintf("%d", n)))
|
||||||
|
require.NoError(fcs.t, err)
|
||||||
|
|
||||||
|
fcs.msgs[c] = m
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow msgs
|
||||||
|
if fcs.sub == nil {
|
||||||
|
fcs.t.Fatal("sub not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
var revs []*types.TipSet
|
||||||
|
for i := 0; i < rev; i++ {
|
||||||
|
ts := fcs.tsc.best()
|
||||||
|
|
||||||
|
revs = append(revs, ts)
|
||||||
|
fcs.h--
|
||||||
|
require.NoError(fcs.t, fcs.tsc.revert(ts))
|
||||||
|
}
|
||||||
|
|
||||||
|
apps := make([]*types.TipSet, app)
|
||||||
|
for i := 0; i < app; i++ {
|
||||||
|
fcs.h++
|
||||||
|
|
||||||
|
mc, _ := msgs[i]
|
||||||
|
if mc == cid.Undef {
|
||||||
|
mc = dummyCid
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := makeTs(fcs.t, fcs.h, mc)
|
||||||
|
require.NoError(fcs.t, fcs.tsc.add(ts))
|
||||||
|
|
||||||
|
apps[app-i-1] = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
err := fcs.sub(revs, apps)
|
||||||
|
require.NoError(fcs.t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ eventChainStore = &fakeCS{}
|
||||||
|
|
||||||
|
func TestAt(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(fcs)
|
||||||
|
|
||||||
|
var applied bool
|
||||||
|
var reverted bool
|
||||||
|
|
||||||
|
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
|
||||||
|
require.Equal(t, 5, int(ts.Height()))
|
||||||
|
require.Equal(t, 8, int(curH))
|
||||||
|
applied = true
|
||||||
|
return nil
|
||||||
|
}, func(ts *types.TipSet) error {
|
||||||
|
reverted = true
|
||||||
|
return nil
|
||||||
|
}, 3, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 3, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(0, 3, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(0, 3, nil)
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
fcs.advance(0, 3, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(10, 1, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, true, reverted)
|
||||||
|
reverted = false
|
||||||
|
|
||||||
|
fcs.advance(0, 1, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(0, 2, nil)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(0, 1, nil) // 8
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCalled(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(fcs)
|
||||||
|
|
||||||
|
t0123, err := address.NewFromString("t0123")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
more := true
|
||||||
|
var applied, reverted bool
|
||||||
|
var appliedMsg *types.Message
|
||||||
|
var appliedTs *types.TipSet
|
||||||
|
var appliedH uint64
|
||||||
|
|
||||||
|
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
|
||||||
|
applied = true
|
||||||
|
appliedMsg = msg
|
||||||
|
appliedTs = ts
|
||||||
|
appliedH = curH
|
||||||
|
return more, nil
|
||||||
|
}, func(ts *types.TipSet) error {
|
||||||
|
reverted = true
|
||||||
|
return nil
|
||||||
|
}, 3, 20, t0123, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// create few blocks to make sure nothing get's randomly called
|
||||||
|
|
||||||
|
fcs.advance(0, 4, nil) // H=5
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// create blocks with message (but below confidence threshold)
|
||||||
|
|
||||||
|
fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2)
|
||||||
|
0: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 1},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// create additional block so we are above confidence threshold
|
||||||
|
|
||||||
|
fcs.advance(0, 1, nil) // H=9 (confidence=3, apply)
|
||||||
|
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
require.Equal(t, uint64(6), appliedTs.Height())
|
||||||
|
require.Equal(t, "bafkqaajq", appliedTs.Blocks()[0].Messages.String())
|
||||||
|
require.Equal(t, uint64(9), appliedH)
|
||||||
|
require.Equal(t, t0123, appliedMsg.To)
|
||||||
|
require.Equal(t, uint64(1), appliedMsg.Nonce)
|
||||||
|
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||||
|
|
||||||
|
// revert some blocks, keep the message
|
||||||
|
|
||||||
|
fcs.advance(3, 1, nil) // H=7 (confidence=1)
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// revert the message
|
||||||
|
|
||||||
|
fcs.advance(2, 1, nil) // H=6, we reverted ts with the msg
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, true, reverted)
|
||||||
|
reverted = false
|
||||||
|
|
||||||
|
// send new message on different height
|
||||||
|
|
||||||
|
n2msg := fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 2},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=7; H=10 (confidence=3)
|
||||||
|
0: n2msg,
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
require.Equal(t, uint64(7), appliedTs.Height())
|
||||||
|
require.Equal(t, "bafkqaajr", appliedTs.Blocks()[0].Messages.String())
|
||||||
|
require.Equal(t, uint64(10), appliedH)
|
||||||
|
require.Equal(t, t0123, appliedMsg.To)
|
||||||
|
require.Equal(t, uint64(2), appliedMsg.Nonce)
|
||||||
|
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||||
|
|
||||||
|
// revert and apply at different height
|
||||||
|
|
||||||
|
fcs.advance(4, 5, map[int]cid.Cid{ // msg at H=8; H=11 (confidence=3)
|
||||||
|
1: n2msg,
|
||||||
|
})
|
||||||
|
|
||||||
|
// TODO: We probably don't want to call revert/apply, as restarting certain
|
||||||
|
// actions may be expensive, and in this case the message is still
|
||||||
|
// on-chain, just at different height
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, true, reverted)
|
||||||
|
reverted = false
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
require.Equal(t, uint64(8), appliedTs.Height())
|
||||||
|
require.Equal(t, "bafkqaajr", appliedTs.Blocks()[0].Messages.String())
|
||||||
|
require.Equal(t, uint64(11), appliedH)
|
||||||
|
require.Equal(t, t0123, appliedMsg.To)
|
||||||
|
require.Equal(t, uint64(2), appliedMsg.Nonce)
|
||||||
|
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||||
|
|
||||||
|
// call method again
|
||||||
|
|
||||||
|
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=12; H=15
|
||||||
|
0: n2msg,
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
// send and revert below confidence, then cross confidence
|
||||||
|
fcs.advance(0, 1, map[int]cid.Cid{ // msg at H=16; H=16
|
||||||
|
0: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 3},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
fcs.advance(1, 4, nil) // H=19, but message reverted
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// test timeout (it's set to 20 in the call to `events.Called` above)
|
||||||
|
|
||||||
|
fcs.advance(0, 6, nil) // H=25
|
||||||
|
|
||||||
|
require.Equal(t, false, applied) // not calling timeout as we received messages
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// test unregistering with more
|
||||||
|
|
||||||
|
more = false
|
||||||
|
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29
|
||||||
|
0: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 4}, // this signals we don't want more
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, true, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
applied = false
|
||||||
|
|
||||||
|
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=26; H=29
|
||||||
|
0: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 5},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Equal(t, false, applied) // should not get any further notifications
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
// revert after disabled
|
||||||
|
|
||||||
|
fcs.advance(5, 1, nil) // try reverting msg sent after disabling
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, false, reverted)
|
||||||
|
|
||||||
|
fcs.advance(5, 1, nil) // try reverting msg sent before disabling
|
||||||
|
|
||||||
|
require.Equal(t, false, applied)
|
||||||
|
require.Equal(t, true, reverted)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCalledTimeout(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(fcs)
|
||||||
|
|
||||||
|
t0123, err := address.NewFromString("t0123")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
called := false
|
||||||
|
|
||||||
|
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
|
||||||
|
called = true
|
||||||
|
require.Nil(t, msg)
|
||||||
|
require.Equal(t, uint64(20), ts.Height())
|
||||||
|
require.Equal(t, uint64(23), curH)
|
||||||
|
return false, nil
|
||||||
|
}, func(ts *types.TipSet) error {
|
||||||
|
t.Fatal("revert on timeout")
|
||||||
|
return nil
|
||||||
|
}, 3, 20, t0123, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 21, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
|
||||||
|
fcs.advance(0, 5, nil)
|
||||||
|
require.True(t, called)
|
||||||
|
called = false
|
||||||
|
|
||||||
|
// with check func reporting done
|
||||||
|
|
||||||
|
fcs = &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||||
|
|
||||||
|
events = NewEvents(fcs)
|
||||||
|
|
||||||
|
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return true, true, nil
|
||||||
|
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
|
||||||
|
called = true
|
||||||
|
require.Nil(t, msg)
|
||||||
|
require.Equal(t, uint64(20), ts.Height())
|
||||||
|
require.Equal(t, uint64(23), curH)
|
||||||
|
return false, nil
|
||||||
|
}, func(ts *types.TipSet) error {
|
||||||
|
t.Fatal("revert on timeout")
|
||||||
|
return nil
|
||||||
|
}, 3, 20, t0123, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 21, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
|
||||||
|
fcs.advance(0, 5, nil)
|
||||||
|
require.False(t, called)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCalledOrder(t *testing.T) {
|
||||||
|
fcs := &fakeCS{
|
||||||
|
t: t,
|
||||||
|
h: 1,
|
||||||
|
|
||||||
|
msgs: map[cid.Cid]fakeMsg{},
|
||||||
|
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||||
|
}
|
||||||
|
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||||
|
|
||||||
|
events := NewEvents(fcs)
|
||||||
|
|
||||||
|
t0123, err := address.NewFromString("t0123")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
at := 0
|
||||||
|
|
||||||
|
err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
|
||||||
|
return false, true, nil
|
||||||
|
}, func(msg *types.Message, ts *types.TipSet, curH uint64) (bool, error) {
|
||||||
|
switch at {
|
||||||
|
case 0:
|
||||||
|
require.Equal(t, uint64(1), msg.Nonce)
|
||||||
|
require.Equal(t, uint64(3), ts.Height())
|
||||||
|
case 1:
|
||||||
|
require.Equal(t, uint64(2), msg.Nonce)
|
||||||
|
require.Equal(t, uint64(4), ts.Height())
|
||||||
|
default:
|
||||||
|
t.Fatal("apply should only get called twice, at: ", at)
|
||||||
|
}
|
||||||
|
at++
|
||||||
|
return true, nil
|
||||||
|
}, func(ts *types.TipSet) error {
|
||||||
|
switch at {
|
||||||
|
case 2:
|
||||||
|
require.Equal(t, uint64(4), ts.Height())
|
||||||
|
case 3:
|
||||||
|
require.Equal(t, uint64(3), ts.Height())
|
||||||
|
default:
|
||||||
|
t.Fatal("revert should only get called twice, at: ", at)
|
||||||
|
}
|
||||||
|
at++
|
||||||
|
return nil
|
||||||
|
}, 3, 20, t0123, 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fcs.advance(0, 10, map[int]cid.Cid{
|
||||||
|
1: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 1},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
2: fcs.fakeMsgs(fakeMsg{
|
||||||
|
bmsgs: []*types.Message{
|
||||||
|
{To: t0123, Method: 5, Nonce: 2},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
|
||||||
|
fcs.advance(9, 1, nil)
|
||||||
|
|
||||||
|
}
|
79
chain/events/tscache.go
Normal file
79
chain/events/tscache.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package events
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
||||||
|
// tipsets
|
||||||
|
type tipSetCache struct {
|
||||||
|
cache []*types.TipSet
|
||||||
|
start int
|
||||||
|
len int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTSCache(cap int) *tipSetCache {
|
||||||
|
return &tipSetCache{
|
||||||
|
cache: make([]*types.TipSet, cap),
|
||||||
|
start: 0,
|
||||||
|
len: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||||
|
if tsc.len > 0 {
|
||||||
|
if tsc.cache[tsc.start].Height()+1 != ts.Height() {
|
||||||
|
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsc.start = (tsc.start + 1) % len(tsc.cache)
|
||||||
|
tsc.cache[tsc.start] = ts
|
||||||
|
if tsc.len < len(tsc.cache) {
|
||||||
|
tsc.len++
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||||
|
if tsc.len == 0 {
|
||||||
|
return xerrors.New("tipSetCache.revert: nothing to revert; cache is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !tsc.cache[tsc.start].Equals(ts) {
|
||||||
|
return xerrors.New("tipSetCache.revert: revert tipset didn't match cache head")
|
||||||
|
}
|
||||||
|
|
||||||
|
tsc.cache[tsc.start] = nil
|
||||||
|
tsc.start = (tsc.start - 1) % len(tsc.cache)
|
||||||
|
tsc.len--
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
|
||||||
|
if tsc.len == 0 {
|
||||||
|
return nil, xerrors.New("tipSetCache.get: cache is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
headH := tsc.cache[tsc.start].Height()
|
||||||
|
|
||||||
|
if height > headH {
|
||||||
|
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
|
||||||
|
}
|
||||||
|
|
||||||
|
tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height()
|
||||||
|
|
||||||
|
if height < tailH {
|
||||||
|
// TODO: we can try to walk parents, but that shouldn't happen in
|
||||||
|
// practice, so it's probably not worth implementing
|
||||||
|
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tsc *tipSetCache) best() *types.TipSet {
|
||||||
|
return tsc.cache[tsc.start]
|
||||||
|
}
|
@ -26,8 +26,6 @@ import (
|
|||||||
"github.com/whyrusleeping/sharray"
|
"github.com/whyrusleeping/sharray"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ForkLengthThreshold = 20
|
|
||||||
|
|
||||||
var log = logging.Logger("chain")
|
var log = logging.Logger("chain")
|
||||||
|
|
||||||
type Syncer struct {
|
type Syncer struct {
|
||||||
@ -237,7 +235,7 @@ func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if sel.Height()-nca.Height() > ForkLengthThreshold {
|
if sel.Height()-nca.Height() > build.ForkLengthThreshold {
|
||||||
// TODO: handle this better than refusing to sync
|
// TODO: handle this better than refusing to sync
|
||||||
return nil, fmt.Errorf("Conflict exists in heads set")
|
return nil, fmt.Errorf("Conflict exists in heads set")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user