chainstore: execute reorg ops in separate goroutine

This commit is contained in:
Łukasz Magiera 2019-10-11 01:50:49 +02:00
parent 4c5157d270
commit 1a5bf0afe3
2 changed files with 73 additions and 37 deletions

View File

@ -3,10 +3,9 @@ package metrics
import (
"context"
"encoding/json"
"go.uber.org/fx"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.uber.org/fx"
"github.com/filecoin-project/go-lotus/node/impl/full"
"github.com/filecoin-project/go-lotus/node/modules/helpers"
@ -23,35 +22,40 @@ type Update struct {
func SendHeadNotifs(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
ctx := helpers.LifecycleCtx(mctx, lc)
gen, err := chain.Chain.GetGenesis()
if err != nil {
return err
}
topic := baseTopic + gen.Cid().String()
go func() {
if err := sendHeadNotifs(ctx, ps, topic, chain); err != nil {
log.Error("consensus metrics error", err)
return
}
}()
go func() {
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
defer sub.Cancel()
for {
if _, err := sub.Next(ctx); err != nil {
return
lc.Append(fx.Hook{
OnStart: func(_ context.Context) error {
gen, err := chain.Chain.GetGenesis()
if err != nil {
return err
}
}
}()
topic := baseTopic + gen.Cid().String()
return err
go func() {
if err := sendHeadNotifs(ctx, ps, topic, chain); err != nil {
log.Error("consensus metrics error", err)
return
}
}()
go func() {
sub, err := ps.Subscribe(topic)
if err != nil {
return
}
defer sub.Cancel()
for {
if _, err := sub.Next(ctx); err != nil {
return
}
}
}()
return nil
},
})
return nil
}
func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI) error {

View File

@ -45,6 +45,7 @@ type ChainStore struct {
tstLk sync.Mutex
tipsets map[uint64][]cid.Cid
reorgCh chan<- reorg
headChangeNotifs []func(rev, app []*types.TipSet) error
}
@ -56,6 +57,8 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
tipsets: make(map[uint64][]cid.Cid),
}
cs.reorgCh = cs.reorgWorker(context.TODO())
hcnf := func(rev, app []*types.TipSet) error {
cs.pubLk.Lock()
defer cs.pubLk.Unlock()
@ -217,17 +220,46 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
return nil
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil {
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
type reorg struct {
old *types.TipSet
new *types.TipSet
}
func (cs *ChainStore) reorgWorker(ctx context.Context) chan<- reorg {
out := make(chan reorg, 32)
go func() {
defer log.Warn("reorgWorker quit")
for {
select {
case r := <-out:
revert, apply, err := cs.ReorgOps(r.old, r.new)
if err != nil {
log.Error("computing reorg ops failed: ", err)
continue
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
log.Error("head change func errored (BAD): ", err)
}
}
case <-ctx.Done():
return
}
}
}()
return out
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
}
cs.reorgCh <- reorg{
old: cs.heaviest,
new: ts,
}
} else {
log.Warn("no heaviest tipset found, using %s", ts.Cids())
}