Merge pull request #1303 from filecoin-project/feat/stats-head-buffer

stats: add small head change buffer to reduce collecting extraneous stats from small reorgs
This commit is contained in:
Łukasz Magiera 2020-02-28 04:13:50 +01:00 committed by GitHub
commit a97e7ea52a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 102 additions and 3 deletions

View File

@ -0,0 +1,47 @@
package main
import (
"container/list"
"github.com/filecoin-project/lotus/chain/store"
)
type headBuffer struct {
buffer *list.List
size int
}
func NewHeadBuffer(size int) *headBuffer {
buffer := list.New()
buffer.Init()
return &headBuffer{
buffer: buffer,
size: size,
}
}
func (h *headBuffer) Push(hc *store.HeadChange) (rethc *store.HeadChange) {
if h.buffer.Len() == h.size {
var ok bool
el := h.buffer.Front()
rethc, ok = el.Value.(*store.HeadChange)
if !ok {
panic("Value from list is not the correct type")
}
h.buffer.Remove(el)
}
h.buffer.PushBack(hc)
return
}
func (h *headBuffer) Pop() {
el := h.buffer.Back()
if el != nil {
h.buffer.Remove(el)
}
}

View File

@ -0,0 +1,43 @@
package main
import (
"testing"
"github.com/filecoin-project/lotus/chain/store"
"github.com/stretchr/testify/require"
)
func TestHeadBuffer(t *testing.T) {
t.Run("Straight push through", func(t *testing.T) {
hb := NewHeadBuffer(5)
require.Nil(t, hb.Push(&store.HeadChange{Type: "1"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "2"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "3"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "4"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "5"}))
hc := hb.Push(&store.HeadChange{Type: "6"})
require.Equal(t, hc.Type, "1")
})
t.Run("Reverts", func(t *testing.T) {
hb := NewHeadBuffer(5)
require.Nil(t, hb.Push(&store.HeadChange{Type: "1"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "2"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "3"}))
hb.Pop()
require.Nil(t, hb.Push(&store.HeadChange{Type: "3a"}))
hb.Pop()
require.Nil(t, hb.Push(&store.HeadChange{Type: "3b"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "4"}))
require.Nil(t, hb.Push(&store.HeadChange{Type: "5"}))
hc := hb.Push(&store.HeadChange{Type: "6"})
require.Equal(t, hc.Type, "1")
hc = hb.Push(&store.HeadChange{Type: "7"})
require.Equal(t, hc.Type, "2")
hc = hb.Push(&store.HeadChange{Type: "8"})
require.Equal(t, hc.Type, "3b")
})
}

View File

@ -22,10 +22,12 @@ func main() {
var database string = "lotus"
var reset bool = false
var height int64 = 0
var headlag int = 3
flag.StringVar(&repo, "repo", repo, "lotus repo path")
flag.StringVar(&database, "database", database, "influx database")
flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)")
flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs")
flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering")
flag.Parse()
@ -66,7 +68,7 @@ func main() {
log.Fatal(err)
}
tipsetsCh, err := GetTips(ctx, api, uint64(height))
tipsetsCh, err := GetTips(ctx, api, uint64(height), headlag)
if err != nil {
log.Fatal(err)
}
@ -75,6 +77,7 @@ func main() {
defer wq.Close()
for tipset := range tipsetsCh {
log.Infow("Collect stats", "height", tipset.Height())
pl := NewPointList()
height := tipset.Height()

View File

@ -120,9 +120,11 @@ sync_complete:
}
}
func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64) (<-chan *types.TipSet, error) {
func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64, headlag int) (<-chan *types.TipSet, error) {
chmain := make(chan *types.TipSet)
hb := NewHeadBuffer(headlag)
notif, err := api.ChainNotify(ctx)
if err != nil {
return nil, err
@ -151,7 +153,11 @@ func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64) (<-chan *
chmain <- tipset
}
case store.HCApply:
chmain <- change.Val
if out := hb.Push(change); out != nil {
chmain <- out.Val
}
case store.HCRevert:
hb.Pop()
}
}
case <-ping: