From 7f731db9dd51d7f72ed09f784497f0143c7cc8da Mon Sep 17 00:00:00 2001 From: Travis Person Date: Fri, 28 Feb 2020 02:59:43 +0000 Subject: [PATCH] stats: add small head change buffer to reduce collecting extraneous stats from small reorgs --- tools/stats/head_buffer.go | 47 +++++++++++++++++++++++++++++++++ tools/stats/head_buffer_test.go | 43 ++++++++++++++++++++++++++++++ tools/stats/main.go | 5 +++- tools/stats/rpc.go | 10 +++++-- 4 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 tools/stats/head_buffer.go create mode 100644 tools/stats/head_buffer_test.go diff --git a/tools/stats/head_buffer.go b/tools/stats/head_buffer.go new file mode 100644 index 000000000..77102e034 --- /dev/null +++ b/tools/stats/head_buffer.go @@ -0,0 +1,47 @@ +package main + +import ( + "container/list" + + "github.com/filecoin-project/lotus/chain/store" +) + +type headBuffer struct { + buffer *list.List + size int +} + +func NewHeadBuffer(size int) *headBuffer { + buffer := list.New() + buffer.Init() + + return &headBuffer{ + buffer: buffer, + size: size, + } +} + +func (h *headBuffer) Push(hc *store.HeadChange) (rethc *store.HeadChange) { + if h.buffer.Len() == h.size { + var ok bool + + el := h.buffer.Front() + rethc, ok = el.Value.(*store.HeadChange) + if !ok { + panic("Value from list is not the correct type") + } + + h.buffer.Remove(el) + } + + h.buffer.PushBack(hc) + + return +} + +func (h *headBuffer) Pop() { + el := h.buffer.Back() + if el != nil { + h.buffer.Remove(el) + } +} diff --git a/tools/stats/head_buffer_test.go b/tools/stats/head_buffer_test.go new file mode 100644 index 000000000..3ae7d749c --- /dev/null +++ b/tools/stats/head_buffer_test.go @@ -0,0 +1,43 @@ +package main + +import ( + "testing" + + "github.com/filecoin-project/lotus/chain/store" + "github.com/stretchr/testify/require" +) + +func TestHeadBuffer(t *testing.T) { + + t.Run("Straight push through", func(t *testing.T) { + hb := NewHeadBuffer(5) + require.Nil(t, hb.Push(&store.HeadChange{Type: "1"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "2"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "3"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "4"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "5"})) + + hc := hb.Push(&store.HeadChange{Type: "6"}) + require.Equal(t, hc.Type, "1") + }) + + t.Run("Reverts", func(t *testing.T) { + hb := NewHeadBuffer(5) + require.Nil(t, hb.Push(&store.HeadChange{Type: "1"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "2"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "3"})) + hb.Pop() + require.Nil(t, hb.Push(&store.HeadChange{Type: "3a"})) + hb.Pop() + require.Nil(t, hb.Push(&store.HeadChange{Type: "3b"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "4"})) + require.Nil(t, hb.Push(&store.HeadChange{Type: "5"})) + + hc := hb.Push(&store.HeadChange{Type: "6"}) + require.Equal(t, hc.Type, "1") + hc = hb.Push(&store.HeadChange{Type: "7"}) + require.Equal(t, hc.Type, "2") + hc = hb.Push(&store.HeadChange{Type: "8"}) + require.Equal(t, hc.Type, "3b") + }) +} diff --git a/tools/stats/main.go b/tools/stats/main.go index fb383e8e1..e03845133 100644 --- a/tools/stats/main.go +++ b/tools/stats/main.go @@ -22,10 +22,12 @@ func main() { var database string = "lotus" var reset bool = false var height int64 = 0 + var headlag int = 3 flag.StringVar(&repo, "repo", repo, "lotus repo path") flag.StringVar(&database, "database", database, "influx database") flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)") + flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs") flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering") flag.Parse() @@ -66,7 +68,7 @@ func main() { log.Fatal(err) } - tipsetsCh, err := GetTips(ctx, api, uint64(height)) + tipsetsCh, err := GetTips(ctx, api, uint64(height), headlag) if err != nil { log.Fatal(err) } @@ -75,6 +77,7 @@ func main() { defer wq.Close() for tipset := range tipsetsCh { + log.Infow("Collect stats", "height", tipset.Height()) pl := NewPointList() height := tipset.Height() diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index 860ffc95f..2d634a587 100644 --- a/tools/stats/rpc.go +++ b/tools/stats/rpc.go @@ -120,9 +120,11 @@ sync_complete: } } -func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64) (<-chan *types.TipSet, error) { +func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64, headlag int) (<-chan *types.TipSet, error) { chmain := make(chan *types.TipSet) + hb := NewHeadBuffer(headlag) + notif, err := api.ChainNotify(ctx) if err != nil { return nil, err @@ -151,7 +153,11 @@ func GetTips(ctx context.Context, api api.FullNode, lastHeight uint64) (<-chan * chmain <- tipset } case store.HCApply: - chmain <- change.Val + if out := hb.Push(change); out != nil { + chmain <- out.Val + } + case store.HCRevert: + hb.Pop() } } case <-ping: