diff --git a/chain/store/store.go b/chain/store/store.go index a8e378e64..1c90b7e0c 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -293,27 +293,36 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan []*api.HeadChange }} go func() { - defer close(out) - var unsubOnce sync.Once + defer func() { + // Tell the caller we're done first, the following may block for a bit. + close(out) + + // Unsubscribe. + cs.bestTips.Unsub(subch) + + // Drain the channel. + for range subch { + } + }() for { select { case val, ok := <-subch: if !ok { - log.Warn("chain head sub exit loop") + // Shutting down. + return + } + select { + case out <- val.([]*api.HeadChange): + default: + log.Errorf("closing head change subscription due to slow reader") return } if len(out) > 5 { log.Warnf("head change sub is slow, has %d buffered entries", len(out)) } - select { - case out <- val.([]*api.HeadChange): - case <-ctx.Done(): - } case <-ctx.Done(): - unsubOnce.Do(func() { - go cs.bestTips.Unsub(subch) - }) + return } } }() diff --git a/itests/api_test.go b/itests/api_test.go index 01e006fed..9a21c9dfc 100644 --- a/itests/api_test.go +++ b/itests/api_test.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" + logging "github.com/ipfs/go-log/v2" ) func TestAPI(t *testing.T) { @@ -39,6 +40,7 @@ func runAPITest(t *testing.T, opts ...interface{}) { t.Run("testConnectTwo", ts.testConnectTwo) t.Run("testMining", ts.testMining) t.Run("testMiningReal", ts.testMiningReal) + t.Run("testSlowNotify", ts.testSlowNotify) t.Run("testSearchMsg", ts.testSearchMsg) t.Run("testNonGenesisMiner", ts.testNonGenesisMiner) } @@ -169,6 +171,51 @@ func (ts *apiSuite) testMiningReal(t *testing.T) { ts.testMining(t) } +func (ts *apiSuite) testSlowNotify(t *testing.T) { + _ = logging.SetLogLevel("rpc", "ERROR") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + full, miner, _ := kit.EnsembleMinimal(t, ts.opts...) + + // Subscribe a bunch of times to make sure we fill up any RPC buffers. + var newHeadsChans []<-chan []*lapi.HeadChange + for i := 0; i < 100; i++ { + newHeads, err := full.ChainNotify(ctx) + require.NoError(t, err) + newHeadsChans = append(newHeadsChans, newHeads) + } + + initHead := (<-newHeadsChans[0])[0] + baseHeight := initHead.Val.Height() + + bm := kit.NewBlockMiner(t, miner) + bm.MineBlocks(ctx, time.Microsecond) + + full.WaitTillChain(ctx, kit.HeightAtLeast(baseHeight+100)) + + // Make sure they were all closed. + for _, ch := range newHeadsChans { + var ok bool + for ok { + select { + case _, ok = <-ch: + default: + t.Fatal("expected new heads channel to be closed") + } + } + } + + // Make sure we can resubscribe and everything still works. + newHeads, err := full.ChainNotify(ctx) + require.NoError(t, err) + for i := 0; i < 10; i++ { + _, ok := <-newHeads + require.True(t, ok, "notify channel closed") + } +} + func (ts *apiSuite) testNonGenesisMiner(t *testing.T) { ctx := context.Background() diff --git a/tools/stats/rpc.go b/tools/stats/rpc.go index 0aa3d141e..4e503cb39 100644 --- a/tools/stats/rpc.go +++ b/tools/stats/rpc.go @@ -139,7 +139,10 @@ func GetTips(ctx context.Context, api v0api.FullNode, lastHeight abi.ChainEpoch, for { select { - case changes := <-notif: + case changes, ok := <-notif: + if !ok { + return + } for _, change := range changes { log.Infow("Head event", "height", change.Val.Height(), "type", change.Type)