Merge pull request #10485 from filecoin-project/fix/ethsub-hang
fix: ethrpc: Don't lock up when eth subscriber goes away
This commit is contained in:
commit
4c80e179ae
3
go.mod
3
go.mod
@ -40,7 +40,7 @@ require (
|
||||
github.com/filecoin-project/go-fil-commcid v0.1.0
|
||||
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
|
||||
github.com/filecoin-project/go-fil-markets v1.27.0-rc1
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.2
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.3
|
||||
github.com/filecoin-project/go-padreader v0.0.1
|
||||
github.com/filecoin-project/go-paramfetch v0.0.4
|
||||
github.com/filecoin-project/go-state-types v0.10.0
|
||||
@ -147,6 +147,7 @@ require (
|
||||
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
|
||||
github.com/xeipuuv/gojsonschema v1.2.0
|
||||
github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542
|
||||
github.com/zyedidia/generic v1.2.1
|
||||
go.opencensus.io v0.24.0
|
||||
go.opentelemetry.io/otel v1.12.0
|
||||
go.opentelemetry.io/otel/bridge/opencensus v0.33.0
|
||||
|
6
go.sum
6
go.sum
@ -332,8 +332,8 @@ github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0/go.mod h1:7aWZdaQ1b16BVoQUYR+
|
||||
github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI=
|
||||
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI=
|
||||
github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g=
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.2 h1:yo7Ga5qaSFfAukjyI6pdFBxzUVbQoHjKdYMpf2vMvh4=
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.3 h1:xdixRQfLbD8gt56dpBNKiya9gvI/79nNM13IHKTfm5E=
|
||||
github.com/filecoin-project/go-jsonrpc v0.2.3/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak=
|
||||
github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs=
|
||||
github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ=
|
||||
@ -1705,6 +1705,8 @@ github.com/zondax/hid v0.9.1 h1:gQe66rtmyZ8VeGFcOpbuH3r7erYtNEAezCAYu8LdkJo=
|
||||
github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM=
|
||||
github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU=
|
||||
github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo=
|
||||
github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc=
|
||||
github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis=
|
||||
go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs=
|
||||
go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw=
|
||||
go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ=
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"github.com/zyedidia/generic/queue"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -1352,7 +1353,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty
|
||||
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
|
||||
}
|
||||
|
||||
sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription)
|
||||
sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription, e.uninstallFilter)
|
||||
if err != nil {
|
||||
return ethtypes.EthSubscriptionID{}, err
|
||||
}
|
||||
@ -1418,18 +1419,11 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti
|
||||
return false, api.ErrNotSupported
|
||||
}
|
||||
|
||||
filters, err := e.SubManager.StopSubscription(ctx, id)
|
||||
err := e.SubManager.StopSubscription(ctx, id)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, f := range filters {
|
||||
if err := e.uninstallFilter(ctx, f); err != nil {
|
||||
// this will leave the filter a zombie, collecting events up to the maximum allowed
|
||||
log.Warnf("failed to remove filter when unsubscribing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@ -1615,7 +1609,7 @@ type EthSubscriptionManager struct {
|
||||
subs map[ethtypes.EthSubscriptionID]*ethSubscription
|
||||
}
|
||||
|
||||
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint
|
||||
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint
|
||||
rawid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("new uuid: %w", err)
|
||||
@ -1629,10 +1623,14 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS
|
||||
Chain: e.Chain,
|
||||
StateAPI: e.StateAPI,
|
||||
ChainAPI: e.ChainAPI,
|
||||
uninstallFilter: dropFilter,
|
||||
id: id,
|
||||
in: make(chan interface{}, 200),
|
||||
out: out,
|
||||
quit: quit,
|
||||
|
||||
toSend: queue.New[[]byte](),
|
||||
sendCond: make(chan struct{}, 1),
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
@ -1643,30 +1641,34 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS
|
||||
e.mu.Unlock()
|
||||
|
||||
go sub.start(ctx)
|
||||
go sub.startOut(ctx)
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) ([]filter.Filter, error) {
|
||||
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
sub, ok := e.subs[id]
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf("subscription not found")
|
||||
return xerrors.Errorf("subscription not found")
|
||||
}
|
||||
sub.stop()
|
||||
delete(e.subs, id)
|
||||
|
||||
return sub.filters, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error
|
||||
|
||||
const maxSendQueue = 20000
|
||||
|
||||
type ethSubscription struct {
|
||||
Chain *store.ChainStore
|
||||
StateAPI StateAPI
|
||||
ChainAPI ChainAPI
|
||||
uninstallFilter func(context.Context, filter.Filter) error
|
||||
id ethtypes.EthSubscriptionID
|
||||
in chan interface{}
|
||||
out ethSubscriptionCallback
|
||||
@ -1674,6 +1676,11 @@ type ethSubscription struct {
|
||||
mu sync.Mutex
|
||||
filters []filter.Filter
|
||||
quit func()
|
||||
|
||||
sendLk sync.Mutex
|
||||
sendQueueLen int
|
||||
toSend *queue.Queue[[]byte]
|
||||
sendCond chan struct{}
|
||||
}
|
||||
|
||||
func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
|
||||
@ -1684,6 +1691,36 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
|
||||
e.filters = append(e.filters, f)
|
||||
}
|
||||
|
||||
// sendOut processes the final subscription queue. It's here in case the subscriber
|
||||
// is slow, and we need to buffer the messages.
|
||||
func (e *ethSubscription) startOut(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-e.sendCond:
|
||||
e.sendLk.Lock()
|
||||
|
||||
for !e.toSend.Empty() {
|
||||
front := e.toSend.Dequeue()
|
||||
e.sendQueueLen--
|
||||
|
||||
e.sendLk.Unlock()
|
||||
|
||||
if err := e.out(ctx, front); err != nil {
|
||||
log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err)
|
||||
e.stop()
|
||||
return
|
||||
}
|
||||
|
||||
e.sendLk.Lock()
|
||||
}
|
||||
|
||||
e.sendLk.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ethSubscription) send(ctx context.Context, v interface{}) {
|
||||
resp := ethtypes.EthSubscriptionResponse{
|
||||
SubscriptionID: e.id,
|
||||
@ -1696,10 +1733,22 @@ func (e *ethSubscription) send(ctx context.Context, v interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := e.out(ctx, outParam); err != nil {
|
||||
log.Warnw("sending subscription response", "sub", e.id, "error", err)
|
||||
e.sendLk.Lock()
|
||||
defer e.sendLk.Unlock()
|
||||
|
||||
e.toSend.Enqueue(outParam)
|
||||
|
||||
e.sendQueueLen++
|
||||
if e.sendQueueLen > maxSendQueue {
|
||||
log.Warnw("subscription send queue full, killing subscription", "sub", e.id)
|
||||
e.stop()
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case e.sendCond <- struct{}{}:
|
||||
default: // already signalled, and we're holding the lock so we know that the event will be processed
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ethSubscription) start(ctx context.Context) {
|
||||
@ -1743,11 +1792,23 @@ func (e *ethSubscription) start(ctx context.Context) {
|
||||
|
||||
func (e *ethSubscription) stop() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
if e.quit == nil {
|
||||
e.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if e.quit != nil {
|
||||
e.quit()
|
||||
e.quit = nil
|
||||
e.mu.Unlock()
|
||||
|
||||
for _, f := range e.filters {
|
||||
// note: the context in actually unused in uninstallFilter
|
||||
if err := e.uninstallFilter(context.TODO(), f); err != nil {
|
||||
// this will leave the filter a zombie, collecting events up to the maximum allowed
|
||||
log.Warnf("failed to remove filter when unsubscribing: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user