fix: close chain head subscription when the reader is slow

The reader can just re-subscribe when they're ready to catch up. This
prevents a slow reader from bogging down the entire system.
This commit is contained in:
Steven Allen 2021-08-02 14:12:00 -07:00
parent 14754f1b18
commit 43bbde1e6b
3 changed files with 70 additions and 11 deletions

View File

@ -293,27 +293,36 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange
}}
go func() {
defer close(out)
var unsubOnce sync.Once
defer func() {
// Tell the caller we're done first, the following may block for a bit.
close(out)
// Unsubscribe.
cs.bestTips.Unsub(subch)
// Drain the channel.
for range subch {
}
}()
for {
select {
case val, ok := <-subch:
if !ok {
log.Warn("chain head sub exit loop")
// Shutting down.
return
}
select {
case out <- val.([]*api.HeadChange):
default:
log.Errorf("closing head change subscription due to slow reader")
return
}
if len(out) > 5 {
log.Warnf("head change sub is slow, has %d buffered entries", len(out))
}
select {
case out <- val.([]*api.HeadChange):
case <-ctx.Done():
}
case <-ctx.Done():
unsubOnce.Do(func() {
go cs.bestTips.Unsub(subch)
})
return
}
}
}()

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
logging "github.com/ipfs/go-log/v2"
)
func TestAPI(t *testing.T) {
@ -39,6 +40,7 @@ func runAPITest(t *testing.T, opts ...interface{}) {
t.Run("testConnectTwo", ts.testConnectTwo)
t.Run("testMining", ts.testMining)
t.Run("testMiningReal", ts.testMiningReal)
t.Run("testSlowNotify", ts.testSlowNotify)
t.Run("testSearchMsg", ts.testSearchMsg)
t.Run("testNonGenesisMiner", ts.testNonGenesisMiner)
}
@ -169,6 +171,51 @@ func (ts *apiSuite) testMiningReal(t *testing.T) {
ts.testMining(t)
}
// testSlowNotify subscribes to chain-head notifications many times without
// reading from the subscriptions, mines past 100 new epochs, and then checks
// that every neglected subscription channel has been closed. Finally it
// verifies that a fresh subscription still delivers head changes.
func (ts *apiSuite) testSlowNotify(t *testing.T) {
	// Best effort: a failure to adjust the log level must not fail the test.
	_ = logging.SetLogLevel("rpc", "ERROR")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	full, miner, _ := kit.EnsembleMinimal(t, ts.opts...)

	// Subscribe a bunch of times to make sure we fill up any RPC buffers.
	const subscribers = 100
	newHeadsChans := make([]<-chan []*lapi.HeadChange, 0, subscribers)
	for i := 0; i < subscribers; i++ {
		newHeads, err := full.ChainNotify(ctx)
		require.NoError(t, err)
		newHeadsChans = append(newHeadsChans, newHeads)
	}

	// Read exactly one notification so we know the height we started from;
	// after this, none of the subscriptions are read until the check below.
	initHead := (<-newHeadsChans[0])[0]
	baseHeight := initHead.Val.Height()

	bm := kit.NewBlockMiner(t, miner)
	bm.MineBlocks(ctx, time.Microsecond)

	full.WaitTillChain(ctx, kit.HeightAtLeast(baseHeight+100))

	// Make sure they were all closed: drain whatever is still buffered in each
	// channel until the receive reports it closed.
	//
	// ok must start out true. Starting from the zero value (false) would skip
	// the loop body entirely and the test would assert nothing.
	//
	// The wait is bounded rather than non-blocking so that a close which is
	// still propagating to the reader does not cause a spurious failure.
	closeDeadline := time.After(time.Minute)
	for _, ch := range newHeadsChans {
		ok := true
		for ok {
			select {
			case _, ok = <-ch:
			case <-closeDeadline:
				t.Fatal("expected new heads channel to be closed")
			}
		}
	}

	// Make sure we can resubscribe and everything still works.
	newHeads, err := full.ChainNotify(ctx)
	require.NoError(t, err)
	for i := 0; i < 10; i++ {
		_, ok := <-newHeads
		require.True(t, ok, "notify channel closed")
	}
}
func (ts *apiSuite) testNonGenesisMiner(t *testing.T) {
ctx := context.Background()

View File

@ -139,7 +139,10 @@ func GetTips(ctx context.Context, api v0api.FullNode, lastHeight abi.ChainEpoch,
for {
select {
case changes := <-notif:
case changes, ok := <-notif:
if !ok {
return
}
for _, change := range changes {
log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)