Merge pull request #3643 from filecoin-project/revert-3602-feat/delayed-pubsub-join
Revert "only subscribe to pubsub topics once we are synced"
This commit is contained in:
commit
f985f42e93
@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -52,8 +51,6 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
|
|||||||
var DefaultTipSetCacheSize = 8192
|
var DefaultTipSetCacheSize = 8192
|
||||||
var DefaultMsgMetaCacheSize = 2048
|
var DefaultMsgMetaCacheSize = 2048
|
||||||
|
|
||||||
var ErrNotifieeDone = errors.New("notifee is done and should be removed")
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
|
if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" {
|
||||||
tscs, err := strconv.Atoi(s)
|
tscs, err := strconv.Atoi(s)
|
||||||
@ -361,33 +358,11 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
|
|||||||
apply[i], apply[opp] = apply[opp], apply[i]
|
apply[i], apply[opp] = apply[opp], apply[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
var toremove map[int]struct{}
|
for _, hcf := range notifees {
|
||||||
for i, hcf := range notifees {
|
if err := hcf(revert, apply); err != nil {
|
||||||
err := hcf(revert, apply)
|
log.Error("head change func errored (BAD): ", err)
|
||||||
if err != nil {
|
|
||||||
if err == ErrNotifieeDone {
|
|
||||||
if toremove == nil {
|
|
||||||
toremove = make(map[int]struct{})
|
|
||||||
}
|
|
||||||
toremove[i] = struct{}{}
|
|
||||||
} else {
|
|
||||||
log.Error("head change func errored (BAD): ", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(toremove) > 0 {
|
|
||||||
newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove))
|
|
||||||
for i, hcf := range notifees {
|
|
||||||
_, remove := toremove[i]
|
|
||||||
if remove {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
newNotifees = append(newNotifees, hcf)
|
|
||||||
}
|
|
||||||
notifees = newNotifees
|
|
||||||
}
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/node"
|
"github.com/filecoin-project/lotus/node"
|
||||||
"github.com/filecoin-project/lotus/node/impl"
|
"github.com/filecoin-project/lotus/node/impl"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -234,7 +233,6 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
|
|||||||
node.Repo(sourceRepo),
|
node.Repo(sourceRepo),
|
||||||
node.MockHost(tu.mn),
|
node.MockHost(tu.mn),
|
||||||
node.Test(),
|
node.Test(),
|
||||||
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
|
|
||||||
|
|
||||||
node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)),
|
node.Override(new(modules.Genesis), modules.LoadGenesis(genesis)),
|
||||||
)
|
)
|
||||||
@ -267,7 +265,6 @@ func (tu *syncTestUtil) addClientNode() int {
|
|||||||
node.Repo(repo.NewMemory(nil)),
|
node.Repo(repo.NewMemory(nil)),
|
||||||
node.MockHost(tu.mn),
|
node.MockHost(tu.mn),
|
||||||
node.Test(),
|
node.Test(),
|
||||||
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
|
|
||||||
|
|
||||||
node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)),
|
node.Override(new(modules.Genesis), modules.LoadGenesis(tu.genesis)),
|
||||||
)
|
)
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package modules
|
package modules
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
eventbus "github.com/libp2p/go-eventbus"
|
eventbus "github.com/libp2p/go-eventbus"
|
||||||
@ -24,7 +22,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/lib/peermgr"
|
"github.com/filecoin-project/lotus/lib/peermgr"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
@ -76,45 +73,14 @@ func RunBlockSync(h host.Host, svc *blocksync.BlockSyncService) {
|
|||||||
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
h.SetStreamHandler(blocksync.BlockSyncProtocolID, svc.HandleStream)
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) {
|
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) {
|
||||||
nearsync := uint64(epochs) * uint64(build.BlockDelaySecs) * uint64(time.Second) //nolint
|
|
||||||
|
|
||||||
// early check, are we synced at start up?
|
|
||||||
ts := stmgr.ChainStore().GetHeaviestTipSet()
|
|
||||||
timestamp := ts.MinTimestamp()
|
|
||||||
now := uint64(build.Clock.Now().UnixNano())
|
|
||||||
if timestamp > now-nearsync {
|
|
||||||
subscribe()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// we are not synced, subscribe to head changes and wait for sync
|
|
||||||
stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
|
|
||||||
if len(app) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
latest := app[0].MinTimestamp()
|
|
||||||
for _, ts := range app[1:] {
|
|
||||||
timestamp := ts.MinTimestamp()
|
|
||||||
if timestamp > latest {
|
|
||||||
latest = timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
now := uint64(build.Clock.Now().UnixNano())
|
|
||||||
if latest > now-nearsync {
|
|
||||||
subscribe()
|
|
||||||
return store.ErrNotifieeDone
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
|
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
v := sub.NewBlockValidator(
|
v := sub.NewBlockValidator(
|
||||||
h.ID(), chain, stmgr,
|
h.ID(), chain, stmgr,
|
||||||
func(p peer.ID) {
|
func(p peer.ID) {
|
||||||
@ -126,53 +92,24 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe := func() {
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn))
|
|
||||||
|
|
||||||
blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
|
||||||
}
|
|
||||||
|
|
||||||
if bootstrapper {
|
|
||||||
subscribe()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait until we are synced within 10 blocks
|
|
||||||
waitForSync(stmgr, 10, subscribe)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
v := sub.NewMessageValidator(h.ID(), mpool)
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe := func() {
|
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
||||||
log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn))
|
|
||||||
|
|
||||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go sub.HandleIncomingMessages(ctx, mpool, msgsub)
|
|
||||||
}
|
|
||||||
|
|
||||||
if bootstrapper {
|
|
||||||
subscribe()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait until we are synced within 1 block
|
|
||||||
waitForSync(stmgr, 1, subscribe)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
func NewLocalDiscovery(ds dtypes.MetadataDS) *discovery.Local {
|
||||||
|
@ -33,7 +33,6 @@ import (
|
|||||||
miner2 "github.com/filecoin-project/lotus/miner"
|
miner2 "github.com/filecoin-project/lotus/miner"
|
||||||
"github.com/filecoin-project/lotus/node"
|
"github.com/filecoin-project/lotus/node"
|
||||||
"github.com/filecoin-project/lotus/node/modules"
|
"github.com/filecoin-project/lotus/node/modules"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
|
||||||
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
||||||
"github.com/filecoin-project/lotus/node/repo"
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
"github.com/filecoin-project/lotus/storage/mockstorage"
|
"github.com/filecoin-project/lotus/storage/mockstorage"
|
||||||
@ -371,8 +370,6 @@ func MockSbBuilder(t *testing.T, nFull int, storage []test.StorageMiner) ([]test
|
|||||||
|
|
||||||
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
|
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
|
||||||
|
|
||||||
node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)),
|
|
||||||
|
|
||||||
genesis,
|
genesis,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user