basic chain event impl
This commit is contained in:
parent
bcedd87cfd
commit
5f88ceb965
@ -7,4 +7,6 @@ const UnixfsLinksPerLevel = 1024
|
||||
|
||||
const SectorSize = 1024
|
||||
|
||||
const ForkLengthThreshold = 20
|
||||
|
||||
// TODO: Move other important consts here
|
||||
|
314
chain/store/events.go
Normal file
314
chain/store/events.go
Normal file
@ -0,0 +1,314 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
)
|
||||
|
||||
// CheckFunc is used before one-shoot callbacks for atomicity
|
||||
// guarantees. If the condition the callbacks wait for has already happened in
|
||||
// tipset `ts`, this function MUST return true
|
||||
type CheckFunc func(ts *types.TipSet) (bool, error)
|
||||
|
||||
// `ts` is the tipset, in which the `msg` is included.
|
||||
// `curH`-`ts.Height` = `confidence`
|
||||
type HandleFunc func(msg *types.Message, ts *types.TipSet, curH uint64) error
|
||||
type RevertFunc func(ts *types.TipSet) error
|
||||
|
||||
type handler struct {
|
||||
confidence int
|
||||
|
||||
handle HandleFunc
|
||||
revert RevertFunc
|
||||
|
||||
msg *types.Message
|
||||
disable bool
|
||||
}
|
||||
|
||||
type callTuple struct {
|
||||
actor address.Address
|
||||
method uint64
|
||||
}
|
||||
|
||||
type eventChainStore interface {
|
||||
SubscribeHeadChanges(f func(rev, app []*types.TipSet) error)
|
||||
|
||||
GetHeaviestTipSet() *types.TipSet
|
||||
MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||
}
|
||||
|
||||
type Events struct {
|
||||
cs eventChainStore
|
||||
gcConfidence uint64
|
||||
|
||||
tsc *tipSetCache
|
||||
lk sync.Mutex
|
||||
|
||||
ctr uint64
|
||||
|
||||
// ChainAt
|
||||
|
||||
heightTriggers map[uint64]*handler
|
||||
|
||||
htTriggerHeights map[uint64][]uint64
|
||||
htHeights map[uint64][]uint64
|
||||
|
||||
// Called
|
||||
|
||||
calledTriggers map[uint64]handler
|
||||
|
||||
ctTriggers map[callTuple][]uint64
|
||||
}
|
||||
|
||||
func NewEvents(cs eventChainStore) *Events {
|
||||
gcConfidence := 2 * build.ForkLengthThreshold
|
||||
|
||||
e := &Events{
|
||||
cs: cs,
|
||||
gcConfidence: uint64(gcConfidence),
|
||||
|
||||
tsc: newTSCache(gcConfidence),
|
||||
|
||||
heightTriggers: map[uint64]*handler{},
|
||||
htTriggerHeights: map[uint64][]uint64{},
|
||||
htHeights: map[uint64][]uint64{},
|
||||
|
||||
calledTriggers: map[uint64]handler{},
|
||||
ctTriggers: map[callTuple][]uint64{},
|
||||
}
|
||||
|
||||
_ = e.tsc.add(cs.GetHeaviestTipSet())
|
||||
cs.SubscribeHeadChanges(e.headChange)
|
||||
|
||||
// TODO: cleanup goroutine
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *Events) headChange(rev, app []*types.TipSet) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
||||
if len(app) == 0 {
|
||||
return xerrors.New("events.headChange expected at least one applied tipset")
|
||||
}
|
||||
|
||||
// highest tipset is always the first (see cs.ReorgOps)
|
||||
newH := app[0].Height()
|
||||
|
||||
for _, ts := range rev {
|
||||
// TODO: log error if h below gcconfidence
|
||||
// revert height-based triggers
|
||||
|
||||
for _, tid := range e.htHeights[ts.Height()] {
|
||||
// don't revert if newH is above this ts
|
||||
if newH >= ts.Height() {
|
||||
if e.heightTriggers[tid].msg != nil {
|
||||
// TODO: optimization: don't revert if app[newH - ts.Height()] contains the msg
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
err := e.heightTriggers[tid].revert(ts)
|
||||
if err != nil {
|
||||
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// todo: called reverts
|
||||
|
||||
if err := e.tsc.revert(ts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, ts := range app {
|
||||
if err := e.tsc.add(ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// height triggers
|
||||
|
||||
for _, tid := range e.htTriggerHeights[ts.Height()] {
|
||||
hnd := e.heightTriggers[tid]
|
||||
if hnd.disable {
|
||||
continue
|
||||
}
|
||||
|
||||
triggerH := ts.Height() - uint64(hnd.confidence)
|
||||
|
||||
incTs, err := e.tsc.get(triggerH)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := hnd.handle(hnd.msg, incTs, ts.Height()); err != nil {
|
||||
msgInfo := ""
|
||||
if hnd.msg != nil {
|
||||
msgInfo = fmt.Sprintf("call %s(%d), ", hnd.msg.To, hnd.msg.Method)
|
||||
}
|
||||
log.Errorf("chain trigger (%s@H %d, called @ %d) failed: %s", msgInfo, triggerH, ts.Height(), err)
|
||||
}
|
||||
hnd.disable = hnd.msg != nil // special case for Called
|
||||
}
|
||||
|
||||
// called triggers
|
||||
|
||||
err := e.messagesForTs(ts, func(msg *types.Message) error {
|
||||
// TODO: do we have to verify the receipt, or are messages on chain
|
||||
// guaranteed to be successful?
|
||||
|
||||
ct := callTuple{
|
||||
actor: msg.To,
|
||||
method: msg.Method,
|
||||
}
|
||||
|
||||
triggers, ok := e.ctTriggers[ct]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, tid := range triggers {
|
||||
trigger := e.calledTriggers[tid]
|
||||
|
||||
err := e.chainAt(trigger.handle, trigger.revert, msg, trigger.confidence, ts.Height())
|
||||
if err != nil {
|
||||
log.Errorf("chain trigger (call %s(%d), msg found @ %d) failed: %s", msg.To, msg.Method, ts.Height(), err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) messagesForTs(ts *types.TipSet, consume func(*types.Message) error) error {
|
||||
seen := map[cid.Cid]struct{}{}
|
||||
|
||||
for _, tsb := range ts.Blocks() {
|
||||
bmsgs, smsgs, err := e.cs.MessagesForBlock(tsb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, m := range bmsgs {
|
||||
_, ok := seen[m.Cid()]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
seen[m.Cid()] = struct{}{}
|
||||
|
||||
if err := consume(m); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range smsgs {
|
||||
_, ok := seen[m.Message.Cid()]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
seen[m.Message.Cid()] = struct{}{}
|
||||
|
||||
if err := consume(&m.Message); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) CalledOnce(check CheckFunc, hnd HandleFunc, rev RevertFunc, confidence int, actor address.Address, method uint64) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
||||
// TODO: this should use older tipset, and take reverts into account
|
||||
done, err := check(e.tsc.best())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if done {
|
||||
// Already happened, don't bother registering callback
|
||||
return nil
|
||||
}
|
||||
|
||||
id := e.ctr
|
||||
e.ctr++
|
||||
|
||||
e.calledTriggers[id] = handler{
|
||||
confidence: confidence,
|
||||
|
||||
handle: hnd,
|
||||
revert: rev,
|
||||
}
|
||||
|
||||
ct := callTuple{
|
||||
actor: actor,
|
||||
method: method,
|
||||
}
|
||||
|
||||
e.ctTriggers[ct] = append(e.ctTriggers[ct], id)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Events) NotCalledBy(check CheckFunc, hnd HandleFunc, rev RevertFunc, confidence int, actor address.Address, method uint64, h uint64) {
|
||||
panic("impl")
|
||||
}
|
||||
|
||||
func (e *Events) ChainAt(hnd HandleFunc, rev RevertFunc, confidence int, h uint64) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
||||
return e.chainAt(hnd, rev, nil, confidence, h)
|
||||
}
|
||||
|
||||
func (e *Events) chainAt(hnd HandleFunc, rev RevertFunc, msg *types.Message, confidence int, h uint64) error {
|
||||
bestH := e.tsc.best().Height()
|
||||
|
||||
if bestH >= h+uint64(confidence) {
|
||||
ts, err := e.tsc.get(h)
|
||||
if err != nil {
|
||||
log.Warnf("events.ChainAt: calling HandleFunc with nil tipset, not found in cache: %s", err)
|
||||
}
|
||||
|
||||
if err := hnd(msg, ts, bestH); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if bestH >= h+uint64(confidence)+e.gcConfidence {
|
||||
return nil
|
||||
}
|
||||
|
||||
triggerAt := h + uint64(confidence)
|
||||
|
||||
id := e.ctr
|
||||
e.ctr++
|
||||
|
||||
e.heightTriggers[id] = &handler{
|
||||
confidence: confidence,
|
||||
|
||||
handle: hnd,
|
||||
revert: rev,
|
||||
|
||||
msg: msg,
|
||||
}
|
||||
|
||||
e.htHeights[h] = append(e.htHeights[h], id)
|
||||
e.htTriggerHeights[triggerAt] = append(e.htTriggerHeights[triggerAt], id)
|
||||
|
||||
return nil
|
||||
}
|
306
chain/store/events_test.go
Normal file
306
chain/store/events_test.go
Normal file
@ -0,0 +1,306 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/filecoin-project/go-lotus/build"
|
||||
"github.com/filecoin-project/go-lotus/chain/address"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var dummyCid cid.Cid
|
||||
|
||||
func init() {
|
||||
dummyCid, _ = cid.Parse("bafkqaaa")
|
||||
}
|
||||
|
||||
type fakeMsg struct {
|
||||
bmsgs []*types.Message
|
||||
smsgs []*types.SignedMessage
|
||||
}
|
||||
|
||||
type fakeCS struct {
|
||||
t *testing.T
|
||||
h uint64
|
||||
tsc *tipSetCache
|
||||
|
||||
msgs map[cid.Cid]fakeMsg
|
||||
|
||||
sub func(rev, app []*types.TipSet) error
|
||||
}
|
||||
|
||||
func makeTs(t *testing.T, h uint64, msgcid cid.Cid) *types.TipSet {
|
||||
ts, err := types.NewTipSet([]*types.BlockHeader{
|
||||
{
|
||||
Height: h,
|
||||
|
||||
StateRoot: dummyCid,
|
||||
Messages: msgcid,
|
||||
MessageReceipts: dummyCid,
|
||||
},
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) SubscribeHeadChanges(f func(rev, app []*types.TipSet) error) {
|
||||
if fcs.sub != nil {
|
||||
fcs.t.Fatal("sub should be nil")
|
||||
}
|
||||
fcs.sub = f
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) GetHeaviestTipSet() *types.TipSet {
|
||||
return fcs.tsc.best()
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) MessagesForBlock(b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||
ms, ok := fcs.msgs[b.Messages]
|
||||
if ok {
|
||||
return ms.bmsgs, ms.smsgs, nil
|
||||
}
|
||||
|
||||
return []*types.Message{}, []*types.SignedMessage{}, nil
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
|
||||
n := len(fcs.msgs)
|
||||
c, err := cid.Prefix{
|
||||
Version: 1,
|
||||
Codec: cid.Raw,
|
||||
MhType: multihash.IDENTITY,
|
||||
MhLength: -1,
|
||||
}.Sum([]byte(fmt.Sprintf("%d", n)))
|
||||
require.NoError(fcs.t, err)
|
||||
|
||||
fcs.msgs[c] = m
|
||||
return c
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow msgs
|
||||
if fcs.sub == nil {
|
||||
fcs.t.Fatal("sub not be nil")
|
||||
}
|
||||
|
||||
var revs []*types.TipSet
|
||||
for i := 0; i < rev; i++ {
|
||||
ts := fcs.tsc.best()
|
||||
|
||||
revs = append(revs, ts)
|
||||
fcs.h--
|
||||
require.NoError(fcs.t, fcs.tsc.revert(ts))
|
||||
}
|
||||
|
||||
var apps []*types.TipSet
|
||||
for i := 0; i < app; i++ {
|
||||
fcs.h++
|
||||
|
||||
mc, _ := msgs[i]
|
||||
if mc == cid.Undef {
|
||||
mc = dummyCid
|
||||
}
|
||||
|
||||
ts := makeTs(fcs.t, fcs.h, mc)
|
||||
require.NoError(fcs.t, fcs.tsc.add(ts))
|
||||
|
||||
apps = append(apps, ts)
|
||||
}
|
||||
|
||||
err := fcs.sub(revs, apps)
|
||||
require.NoError(fcs.t, err)
|
||||
}
|
||||
|
||||
var _ eventChainStore = &fakeCS{}
|
||||
|
||||
func TestAt(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||
|
||||
events := NewEvents(fcs)
|
||||
|
||||
var applied bool
|
||||
var reverted bool
|
||||
|
||||
err := events.ChainAt(func(msg *types.Message, ts *types.TipSet, curH uint64) error {
|
||||
require.Equal(t, 5, int(ts.Height()))
|
||||
require.Equal(t, 8, int(curH))
|
||||
require.Nil(t, msg)
|
||||
applied = true
|
||||
return nil
|
||||
}, func(ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, 3, 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
fcs.advance(0, 3, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 3, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 3, nil)
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
applied = false
|
||||
|
||||
fcs.advance(0, 3, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(10, 1, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, true, reverted)
|
||||
reverted = false
|
||||
|
||||
fcs.advance(0, 1, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 2, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 1, nil) // 8
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
}
|
||||
|
||||
func TestCalled(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
|
||||
msgs: map[cid.Cid]fakeMsg{},
|
||||
tsc: newTSCache(2 * build.ForkLengthThreshold),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||
|
||||
events := NewEvents(fcs)
|
||||
|
||||
t0123, err := address.NewFromString("t0123")
|
||||
require.NoError(t, err)
|
||||
|
||||
var applied, reverted bool
|
||||
var appliedMsg *types.Message
|
||||
var appliedTs *types.TipSet
|
||||
var appliedH uint64
|
||||
|
||||
err = events.CalledOnce(func(ts *types.TipSet) (b bool, e error) {
|
||||
return false, nil
|
||||
}, func(msg *types.Message, ts *types.TipSet, curH uint64) error {
|
||||
applied = true
|
||||
appliedMsg = msg
|
||||
appliedTs = ts
|
||||
appliedH = curH
|
||||
return nil
|
||||
}, func(ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, 3, t0123, 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
// create few blocks to make sure nothing get's randomly called
|
||||
|
||||
fcs.advance(0, 4, nil) // H=5
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
// create blocks with message (but below confidence threshold)
|
||||
|
||||
fcs.advance(0, 3, map[int]cid.Cid{ // msg at H=6; H=8 (confidence=2)
|
||||
0: fcs.fakeMsgs(fakeMsg{
|
||||
bmsgs: []*types.Message{
|
||||
{To: t0123, Method: 5, Nonce: 1},
|
||||
},
|
||||
}),
|
||||
})
|
||||
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
// create additional block so we are above confidence threshold
|
||||
|
||||
fcs.advance(0, 1, nil) // H=9 (confidence=3, apply)
|
||||
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
applied = false
|
||||
|
||||
require.Equal(t, uint64(6), appliedTs.Height())
|
||||
require.Equal(t, "bafkqaajq", appliedTs.Blocks()[0].Messages.String())
|
||||
require.Equal(t, uint64(9), appliedH)
|
||||
require.Equal(t, t0123, appliedMsg.To)
|
||||
require.Equal(t, uint64(1), appliedMsg.Nonce)
|
||||
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||
|
||||
// revert some blocks, keep the message
|
||||
|
||||
fcs.advance(3, 1, nil) // H=7 (confidence=1)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
// revert the message
|
||||
|
||||
fcs.advance(2, 1, nil) // H=6, we reverted ts with the msg
|
||||
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, true, reverted)
|
||||
reverted = false
|
||||
|
||||
// send new message on different height
|
||||
|
||||
n2msg := fcs.fakeMsgs(fakeMsg{
|
||||
bmsgs: []*types.Message{
|
||||
{To: t0123, Method: 5, Nonce: 2},
|
||||
},
|
||||
})
|
||||
|
||||
fcs.advance(0, 4, map[int]cid.Cid{ // msg at H=7; H=10 (confidence=3)
|
||||
0: n2msg,
|
||||
})
|
||||
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
applied = false
|
||||
|
||||
require.Equal(t, uint64(7), appliedTs.Height())
|
||||
require.Equal(t, "bafkqaajr", appliedTs.Blocks()[0].Messages.String())
|
||||
require.Equal(t, uint64(10), appliedH)
|
||||
require.Equal(t, t0123, appliedMsg.To)
|
||||
require.Equal(t, uint64(2), appliedMsg.Nonce)
|
||||
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||
|
||||
// revert and apply at different height
|
||||
|
||||
fcs.advance(4, 5, map[int]cid.Cid{ // msg at H=8; H=11 (confidence=3)
|
||||
1: n2msg,
|
||||
})
|
||||
|
||||
// TODO: We probably don't want to call revert/apply, as restarting certain
|
||||
// actions may be expensive, and in this case the message is still
|
||||
// on-chain, just at different height
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, true, reverted)
|
||||
reverted = false
|
||||
applied = false
|
||||
|
||||
require.Equal(t, uint64(8), appliedTs.Height())
|
||||
require.Equal(t, "bafkqaajr", appliedTs.Blocks()[0].Messages.String())
|
||||
require.Equal(t, uint64(11), appliedH)
|
||||
require.Equal(t, t0123, appliedMsg.To)
|
||||
require.Equal(t, uint64(2), appliedMsg.Nonce)
|
||||
require.Equal(t, uint64(5), appliedMsg.Method)
|
||||
}
|
79
chain/store/tscache.go
Normal file
79
chain/store/tscache.go
Normal file
@ -0,0 +1,79 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
)
|
||||
|
||||
// tipSetCache implements a simple ring-buffer cache to keep track of recent
|
||||
// tipsets
|
||||
type tipSetCache struct {
|
||||
cache []*types.TipSet
|
||||
start int
|
||||
len int
|
||||
}
|
||||
|
||||
func newTSCache(cap int) *tipSetCache {
|
||||
return &tipSetCache{
|
||||
cache: make([]*types.TipSet, cap),
|
||||
start: 0,
|
||||
len: 0,
|
||||
}
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||
if tsc.len > 0 {
|
||||
if tsc.cache[tsc.start].Height()+1 != ts.Height() {
|
||||
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||
}
|
||||
}
|
||||
|
||||
tsc.start = (tsc.start + 1) % len(tsc.cache)
|
||||
tsc.cache[tsc.start] = ts
|
||||
if tsc.len < len(tsc.cache) {
|
||||
tsc.len++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||
if tsc.len == 0 {
|
||||
return xerrors.New("tipSetCache.revert: nothing to revert; cache is empty")
|
||||
}
|
||||
|
||||
if !tsc.cache[tsc.start].Equals(ts) {
|
||||
return xerrors.New("tipSetCache.revert: revert tipset didn't match cache head")
|
||||
}
|
||||
|
||||
tsc.cache[tsc.start] = nil
|
||||
tsc.start = (tsc.start - 1) % len(tsc.cache)
|
||||
tsc.len--
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
|
||||
if tsc.len == 0 {
|
||||
return nil, xerrors.New("tipSetCache.get: cache is empty")
|
||||
}
|
||||
|
||||
headH := tsc.cache[tsc.start].Height()
|
||||
|
||||
if height > headH {
|
||||
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache head: %d)", height, headH)
|
||||
}
|
||||
|
||||
tailH := tsc.cache[(tsc.start-tsc.len+1)%len(tsc.cache)].Height()
|
||||
|
||||
if height < tailH {
|
||||
// TODO: we can try to walk parents, but that shouldn't happen in
|
||||
// practice, so it's probably not worth implementing
|
||||
return nil, xerrors.Errorf("tipSetCache.get: requested tipset not in cache (req: %d, cache tail: %d)", height, tailH)
|
||||
}
|
||||
|
||||
return tsc.cache[int(height-tailH+1)%len(tsc.cache)], nil
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) best() *types.TipSet {
|
||||
return tsc.cache[tsc.start]
|
||||
}
|
@ -26,8 +26,6 @@ import (
|
||||
"github.com/whyrusleeping/sharray"
|
||||
)
|
||||
|
||||
const ForkLengthThreshold = 20
|
||||
|
||||
var log = logging.Logger("chain")
|
||||
|
||||
type Syncer struct {
|
||||
@ -237,7 +235,7 @@ func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if sel.Height()-nca.Height() > ForkLengthThreshold {
|
||||
if sel.Height()-nca.Height() > build.ForkLengthThreshold {
|
||||
// TODO: handle this better than refusing to sync
|
||||
return nil, fmt.Errorf("Conflict exists in heads set")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user