only subscribe to messages topic if we are synced within 10 epochs

This commit is contained in:
vyzo 2020-09-08 09:16:34 +03:00
parent c966054bf6
commit 32b7797394
2 changed files with 94 additions and 14 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"os" "os"
"strconv" "strconv"
@ -59,6 +60,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
var DefaultTipSetCacheSize = 8192 var DefaultTipSetCacheSize = 8192
var DefaultMsgMetaCacheSize = 2048 var DefaultMsgMetaCacheSize = 2048
var ErrNotifeeDone = errors.New("notifee is done and should be removed")
func init() { func init() {
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
tscs, err := strconv.Atoi(s) tscs, err := strconv.Atoi(s)
@ -404,11 +407,36 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
apply[i], apply[opp] = apply[opp], apply[i] apply[i], apply[opp] = apply[opp], apply[i]
} }
for _, hcf := range notifees { var toremove map[int]struct{}
if err := hcf(revert, apply); err != nil { for i, hcf := range notifees {
err := hcf(revert, apply)
switch err {
case nil:
case ErrNotifeeDone:
if toremove == nil {
toremove = make(map[int]struct{})
}
toremove[i] = struct{}{}
default:
log.Error("head change func errored (BAD): ", err) log.Error("head change func errored (BAD): ", err)
} }
} }
if len(toremove) > 0 {
newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove))
for i, hcf := range notifees {
_, remove := toremove[i]
if remove {
continue
}
newNotifees = append(newNotifees, hcf)
}
notifees = newNotifees
}
case <-ctx.Done(): case <-ctx.Done():
return return
} }

View File

@ -2,6 +2,7 @@ package modules
import ( import (
"context" "context"
"time"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
@ -25,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/peermgr"
marketevents "github.com/filecoin-project/lotus/markets/loggers" marketevents "github.com/filecoin-project/lotus/markets/loggers"
@ -82,14 +84,45 @@ func RunChainExchange(h host.Host, svc exchange.Server) {
h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new
} }
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint
// early check, are we synced at start up?
ts := stmgr.ChainStore().GetHeaviestTipSet()
timestamp := ts.MinTimestamp()
now := uint64(build.Clock.Now().UnixNano())
if timestamp > now-nearsync {
subscribe()
return
}
// we are not synced, subscribe to head changes and wait for sync
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
if len(app) == 0 {
return nil
}
latest := app[0].MinTimestamp()
for _, ts := range app[1:] {
timestamp := ts.MinTimestamp()
if timestamp > latest {
latest = timestamp
}
}
now := uint64(build.Clock.Now().UnixNano())
if latest > now-nearsync {
subscribe()
return store.ErrNotifeeDone
}
return nil
})
}
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
if err != nil {
panic(err)
}
v := sub.NewBlockValidator( v := sub.NewBlockValidator(
h.ID(), chain, stmgr, h.ID(), chain, stmgr,
func(p peer.ID) { func(p peer.ID) {
@ -101,24 +134,43 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
panic(err) panic(err)
} }
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
if err != nil { if err != nil {
panic(err) panic(err)
} }
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
}
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
ctx := helpers.LifecycleCtx(mctx, lc)
v := sub.NewMessageValidator(h.ID(), mpool) v := sub.NewMessageValidator(h.ID(), mpool)
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
panic(err) panic(err)
} }
subscribe := func() {
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
if err != nil {
panic(err)
}
go sub.HandleIncomingMessages(ctx, mpool, msgsub) go sub.HandleIncomingMessages(ctx, mpool, msgsub)
}
if bootstrapper {
subscribe()
return
}
// wait until we are synced within 10 epochs
waitForSync(stmgr, 10, subscribe)
} }
func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) {