Merge pull request #285 from filecoin-project/fix/event-null-blocks
events: Handle null blocks correctly
This commit is contained in:
commit
3c68e66b4c
@ -27,16 +27,34 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||
// TODO: log error if h below gcconfidence
|
||||
// revert height-based triggers
|
||||
|
||||
for _, tid := range e.htHeights[ts.Height()] {
|
||||
// don't revert if newH is above this ts
|
||||
if newH >= ts.Height() {
|
||||
continue
|
||||
revert := func(h uint64, ts *types.TipSet) {
|
||||
for _, tid := range e.htHeights[h] {
|
||||
// don't revert if newH is above this ts
|
||||
if newH >= h {
|
||||
continue
|
||||
}
|
||||
|
||||
err := e.heightTriggers[tid].revert(ts)
|
||||
if err != nil {
|
||||
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
revert(ts.Height(), ts)
|
||||
|
||||
subh := ts.Height() - 1
|
||||
for {
|
||||
cts, err := e.tsc.get(subh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err := e.heightTriggers[tid].revert(ts)
|
||||
if err != nil {
|
||||
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
|
||||
if cts != nil {
|
||||
break
|
||||
}
|
||||
|
||||
revert(subh, ts)
|
||||
subh--
|
||||
}
|
||||
|
||||
if err := e.tsc.revert(ts); err != nil {
|
||||
@ -54,19 +72,44 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||
|
||||
// height triggers
|
||||
|
||||
for _, tid := range e.htTriggerHeights[ts.Height()] {
|
||||
hnd := e.heightTriggers[tid]
|
||||
triggerH := ts.Height() - uint64(hnd.confidence)
|
||||
apply := func(h uint64, ts *types.TipSet) error {
|
||||
for _, tid := range e.htTriggerHeights[h] {
|
||||
hnd := e.heightTriggers[tid]
|
||||
triggerH := h - uint64(hnd.confidence)
|
||||
|
||||
incTs, err := e.tsc.get(triggerH)
|
||||
incTs, err := e.tsc.getNonNull(triggerH)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := hnd.handle(incTs, h); err != nil {
|
||||
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := apply(ts.Height(), ts); err != nil {
|
||||
return err
|
||||
}
|
||||
subh := ts.Height() - 1
|
||||
for {
|
||||
cts, err := e.tsc.get(subh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := hnd.handle(incTs, ts.Height()); err != nil {
|
||||
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
|
||||
if cts != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if err := apply(subh, ts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
subh--
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -75,6 +118,8 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
|
||||
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
|
||||
// specified height+confidence threshold. If the chain is rolled-back under the
|
||||
// specified height, `RevertHandler` will be called.
|
||||
//
|
||||
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
|
||||
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h uint64) error {
|
||||
e.lk.Lock()
|
||||
defer e.lk.Unlock()
|
||||
|
@ -114,21 +114,28 @@ func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
|
||||
return c
|
||||
}
|
||||
|
||||
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow msgs
|
||||
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { // todo: allow msgs
|
||||
if fcs.sub == nil {
|
||||
fcs.t.Fatal("sub not be nil")
|
||||
}
|
||||
|
||||
nullm := map[int]struct{}{}
|
||||
for _, v := range nulls {
|
||||
nullm[v] = struct{}{}
|
||||
}
|
||||
|
||||
var revs []*types.TipSet
|
||||
for i := 0; i < rev; i++ {
|
||||
ts := fcs.tsc.best()
|
||||
|
||||
revs = append(revs, ts)
|
||||
if _, ok := nullm[int(ts.Height())]; !ok {
|
||||
revs = append(revs, ts)
|
||||
require.NoError(fcs.t, fcs.tsc.revert(ts))
|
||||
}
|
||||
fcs.h--
|
||||
require.NoError(fcs.t, fcs.tsc.revert(ts))
|
||||
}
|
||||
|
||||
apps := make([]*types.TipSet, app)
|
||||
var apps []*types.TipSet
|
||||
for i := 0; i < app; i++ {
|
||||
fcs.h++
|
||||
|
||||
@ -137,6 +144,10 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
|
||||
mc = dummyCid
|
||||
}
|
||||
|
||||
if _, ok := nullm[int(fcs.h)]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
ts := makeTs(fcs.t, fcs.h, mc)
|
||||
require.NoError(fcs.t, fcs.tsc.add(ts))
|
||||
|
||||
@ -144,7 +155,11 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
|
||||
fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc
|
||||
}
|
||||
|
||||
apps[app-i-1] = ts
|
||||
apps = append(apps, ts)
|
||||
}
|
||||
|
||||
for i, j := 0, len(apps)-1; i < j; i, j = i+1, j-1 {
|
||||
apps[i], apps[j] = apps[j], apps[i]
|
||||
}
|
||||
|
||||
fcs.sub(revs, apps)
|
||||
@ -212,6 +227,79 @@ func TestAt(t *testing.T) {
|
||||
require.Equal(t, false, reverted)
|
||||
}
|
||||
|
||||
func TestAtNullTrigger(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||
|
||||
events := NewEvents(context.Background(), fcs)
|
||||
|
||||
var applied bool
|
||||
var reverted bool
|
||||
|
||||
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
|
||||
require.Equal(t, uint64(6), ts.Height())
|
||||
require.Equal(t, 8, int(curH))
|
||||
applied = true
|
||||
return nil
|
||||
}, func(ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, 3, 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
fcs.advance(0, 6, nil, 5)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 3, nil)
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
applied = false
|
||||
}
|
||||
|
||||
func TestAtNullConf(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
h: 1,
|
||||
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
|
||||
}
|
||||
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
|
||||
|
||||
events := NewEvents(context.Background(), fcs)
|
||||
|
||||
var applied bool
|
||||
var reverted bool
|
||||
|
||||
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
|
||||
require.Equal(t, 5, int(ts.Height()))
|
||||
require.Equal(t, 8, int(curH))
|
||||
applied = true
|
||||
return nil
|
||||
}, func(ts *types.TipSet) error {
|
||||
reverted = true
|
||||
return nil
|
||||
}, 3, 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
fcs.advance(0, 6, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
|
||||
fcs.advance(0, 3, nil, 8)
|
||||
require.Equal(t, true, applied)
|
||||
require.Equal(t, false, reverted)
|
||||
applied = false
|
||||
|
||||
fcs.advance(7, 1, nil)
|
||||
require.Equal(t, false, applied)
|
||||
require.Equal(t, true, reverted)
|
||||
reverted = false
|
||||
}
|
||||
|
||||
func TestAtStart(t *testing.T) {
|
||||
fcs := &fakeCS{
|
||||
t: t,
|
||||
|
@ -31,11 +31,26 @@ func newTSCache(cap int, storage tsByHFunc) *tipSetCache {
|
||||
|
||||
func (tsc *tipSetCache) add(ts *types.TipSet) error {
|
||||
if tsc.len > 0 {
|
||||
if tsc.cache[tsc.start].Height()+1 != ts.Height() {
|
||||
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||
if tsc.cache[tsc.start].Height() >= ts.Height() {
|
||||
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
|
||||
}
|
||||
}
|
||||
|
||||
nextH := ts.Height()
|
||||
if tsc.len > 0 {
|
||||
nextH = tsc.cache[tsc.start].Height() + 1
|
||||
}
|
||||
|
||||
// fill null blocks
|
||||
for nextH != ts.Height() {
|
||||
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
|
||||
tsc.cache[tsc.start] = nil
|
||||
if tsc.len < len(tsc.cache) {
|
||||
tsc.len++
|
||||
}
|
||||
nextH++
|
||||
}
|
||||
|
||||
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
|
||||
tsc.cache[tsc.start] = ts
|
||||
if tsc.len < len(tsc.cache) {
|
||||
@ -56,9 +71,24 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error {
|
||||
tsc.cache[tsc.start] = nil
|
||||
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
|
||||
tsc.len--
|
||||
|
||||
_ = tsc.revert(nil) // revert null block gap
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) getNonNull(height uint64) (*types.TipSet, error) {
|
||||
for {
|
||||
ts, err := tsc.get(height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ts != nil {
|
||||
return ts, nil
|
||||
}
|
||||
height++
|
||||
}
|
||||
}
|
||||
|
||||
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
|
||||
if tsc.len == 0 {
|
||||
return nil, xerrors.New("tipSetCache.get: cache is empty")
|
||||
@ -71,7 +101,13 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
|
||||
}
|
||||
|
||||
clen := len(tsc.cache)
|
||||
tail := tsc.cache[normalModulo(tsc.start-tsc.len+1, clen)]
|
||||
var tail *types.TipSet
|
||||
for i := 1; i <= tsc.len; i++ {
|
||||
tail = tsc.cache[normalModulo(tsc.start-tsc.len+i, clen)]
|
||||
if tail != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if height < tail.Height() {
|
||||
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())
|
||||
|
@ -2,7 +2,7 @@ package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/go-lotus/chain/types"
|
||||
@ -33,8 +33,6 @@ func TestTsCache(t *testing.T) {
|
||||
}
|
||||
|
||||
for i := 0; i < 9000; i++ {
|
||||
fmt.Printf("i=%d; tl=%d; tcl=%d\n", i, tsc.len, len(tsc.cache))
|
||||
|
||||
if i%90 > 60 {
|
||||
if err := tsc.revert(tsc.best()); err != nil {
|
||||
t.Fatal(err, "; i:", i)
|
||||
@ -47,3 +45,65 @@ func TestTsCache(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestTsCacheNulls(t *testing.T) {
|
||||
tsc := newTSCache(50, func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
|
||||
t.Fatal("storage call")
|
||||
return &types.TipSet{}, nil
|
||||
})
|
||||
|
||||
h := uint64(75)
|
||||
|
||||
add := func() {
|
||||
ts, err := types.NewTipSet([]*types.BlockHeader{{
|
||||
Height: h,
|
||||
ParentStateRoot: dummyCid,
|
||||
Messages: dummyCid,
|
||||
ParentMessageReceipts: dummyCid,
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := tsc.add(ts); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
h++
|
||||
}
|
||||
|
||||
add()
|
||||
add()
|
||||
add()
|
||||
h += 5
|
||||
|
||||
add()
|
||||
add()
|
||||
|
||||
require.Equal(t, h-1, tsc.best().Height())
|
||||
|
||||
ts, err := tsc.get(h - 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, h-1, ts.Height())
|
||||
|
||||
ts, err = tsc.get(h - 2)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, h-2, ts.Height())
|
||||
|
||||
ts, err = tsc.get(h - 3)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, ts)
|
||||
|
||||
ts, err = tsc.get(h - 8)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, h-8, ts.Height())
|
||||
|
||||
require.NoError(t, tsc.revert(tsc.best()))
|
||||
require.NoError(t, tsc.revert(tsc.best()))
|
||||
require.Equal(t, h-8, tsc.best().Height())
|
||||
|
||||
h += 50
|
||||
add()
|
||||
|
||||
ts, err = tsc.get(h - 1)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, h-1, ts.Height())
|
||||
}
|
||||
|
@ -104,6 +104,13 @@ func (ts *TipSet) Blocks() []*BlockHeader {
|
||||
}
|
||||
|
||||
func (ts *TipSet) Equals(ots *TipSet) bool {
|
||||
if ts == nil && ots == nil {
|
||||
return true
|
||||
}
|
||||
if ts == nil || ots == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if len(ts.blks) != len(ots.blks) {
|
||||
return false
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ var runCmd = &cli.Command{
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user