ethrpc: Buffer sub messages if subscriber is slow

This commit is contained in:
Łukasz Magiera 2023-03-15 18:34:10 +01:00
parent a44f598cc2
commit 470dd8da3d
3 changed files with 96 additions and 29 deletions

2
go.mod
View File

@ -304,6 +304,7 @@ require (
github.com/rivo/uniseg v0.1.0 // indirect github.com/rivo/uniseg v0.1.0 // indirect
github.com/rs/cors v1.7.0 // indirect github.com/rs/cors v1.7.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/sirupsen/logrus v1.9.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect
@ -319,6 +320,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/zondax/hid v0.9.1 // indirect github.com/zondax/hid v0.9.1 // indirect
github.com/zondax/ledger-go v0.12.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect
github.com/zyedidia/generic v1.2.1 // indirect
go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/metric v0.33.0 // indirect
go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect go.opentelemetry.io/otel/trace v1.12.0 // indirect

4
go.sum
View File

@ -1516,6 +1516,8 @@ github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM=
@ -1705,6 +1707,8 @@ github.com/zondax/hid v0.9.1 h1:gQe66rtmyZ8VeGFcOpbuH3r7erYtNEAezCAYu8LdkJo=
github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM=
github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU= github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU=
github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo= github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo=
github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc=
github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis=
go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs=
go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw=
go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ=

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/zyedidia/generic/queue"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -1352,7 +1353,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
} }
sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription) sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription, e.uninstallFilter)
if err != nil { if err != nil {
return ethtypes.EthSubscriptionID{}, err return ethtypes.EthSubscriptionID{}, err
} }
@ -1418,18 +1419,11 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti
return false, api.ErrNotSupported return false, api.ErrNotSupported
} }
filters, err := e.SubManager.StopSubscription(ctx, id) err := e.SubManager.StopSubscription(ctx, id)
if err != nil { if err != nil {
return false, nil return false, nil
} }
for _, f := range filters {
if err := e.uninstallFilter(ctx, f); err != nil {
// this will leave the filter a zombie, collecting events up to the maximum allowed
log.Warnf("failed to remove filter when unsubscribing: %v", err)
}
}
return true, nil return true, nil
} }
@ -1615,7 +1609,7 @@ type EthSubscriptionManager struct {
subs map[ethtypes.EthSubscriptionID]*ethSubscription subs map[ethtypes.EthSubscriptionID]*ethSubscription
} }
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint
rawid, err := uuid.NewRandom() rawid, err := uuid.NewRandom()
if err != nil { if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err) return nil, xerrors.Errorf("new uuid: %w", err)
@ -1626,13 +1620,17 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS
ctx, quit := context.WithCancel(ctx) ctx, quit := context.WithCancel(ctx)
sub := &ethSubscription{ sub := &ethSubscription{
Chain: e.Chain, Chain: e.Chain,
StateAPI: e.StateAPI, StateAPI: e.StateAPI,
ChainAPI: e.ChainAPI, ChainAPI: e.ChainAPI,
id: id, uninstallFilter: dropFilter,
in: make(chan interface{}, 200), id: id,
out: out, in: make(chan interface{}, 200),
quit: quit, out: out,
quit: quit,
toSend: queue.New[[]byte](),
sendCond: make(chan struct{}, 1),
} }
e.mu.Lock() e.mu.Lock()
@ -1643,37 +1641,46 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS
e.mu.Unlock() e.mu.Unlock()
go sub.start(ctx) go sub.start(ctx)
go sub.startOut(ctx)
return sub, nil return sub, nil
} }
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) ([]filter.Filter, error) { func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
sub, ok := e.subs[id] sub, ok := e.subs[id]
if !ok { if !ok {
return nil, xerrors.Errorf("subscription not found") return xerrors.Errorf("subscription not found")
} }
sub.stop() sub.stop()
delete(e.subs, id) delete(e.subs, id)
return sub.filters, nil return nil
} }
type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error
const maxSendQueue = 20000
type ethSubscription struct { type ethSubscription struct {
Chain *store.ChainStore Chain *store.ChainStore
StateAPI StateAPI StateAPI StateAPI
ChainAPI ChainAPI ChainAPI ChainAPI
id ethtypes.EthSubscriptionID uninstallFilter func(context.Context, filter.Filter) error
in chan interface{} id ethtypes.EthSubscriptionID
out ethSubscriptionCallback in chan interface{}
out ethSubscriptionCallback
mu sync.Mutex mu sync.Mutex
filters []filter.Filter filters []filter.Filter
quit func() quit func()
sendLk sync.Mutex
sendQueueLen int
toSend *queue.Queue[[]byte]
sendCond chan struct{}
} }
func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
@ -1684,6 +1691,36 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
e.filters = append(e.filters, f) e.filters = append(e.filters, f)
} }
// sendOut processes the final subscription queue. It's here in case the subscriber
// is slow, and we need to buffer the messages.
func (e *ethSubscription) startOut(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-e.sendCond:
e.sendLk.Lock()
for !e.toSend.Empty() {
front := e.toSend.Dequeue()
e.sendQueueLen--
e.sendLk.Unlock()
if err := e.out(ctx, front); err != nil {
log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err)
e.stop()
return
}
e.sendLk.Lock()
}
e.sendLk.Unlock()
}
}
}
func (e *ethSubscription) send(ctx context.Context, v interface{}) { func (e *ethSubscription) send(ctx context.Context, v interface{}) {
resp := ethtypes.EthSubscriptionResponse{ resp := ethtypes.EthSubscriptionResponse{
SubscriptionID: e.id, SubscriptionID: e.id,
@ -1696,10 +1733,22 @@ func (e *ethSubscription) send(ctx context.Context, v interface{}) {
return return
} }
if err := e.out(ctx, outParam); err != nil { e.sendLk.Lock()
log.Warnw("sending subscription response", "sub", e.id, "error", err) defer e.sendLk.Unlock()
e.toSend.Enqueue(outParam)
e.sendQueueLen++
if e.sendQueueLen > maxSendQueue {
log.Warnw("subscription send queue full, killing subscription", "sub", e.id)
e.stop()
return return
} }
select {
case e.sendCond <- struct{}{}:
default: // already signalled, and we're holding the lock so we know that the event will be processed
}
} }
func (e *ethSubscription) start(ctx context.Context) { func (e *ethSubscription) start(ctx context.Context) {
@ -1743,11 +1792,23 @@ func (e *ethSubscription) start(ctx context.Context) {
func (e *ethSubscription) stop() { func (e *ethSubscription) stop() {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() if e.quit == nil {
e.mu.Unlock()
return
}
if e.quit != nil { if e.quit != nil {
e.quit() e.quit()
e.quit = nil e.quit = nil
e.mu.Unlock()
for _, f := range e.filters {
// note: the context in actually unused in uninstallFilter
if err := e.uninstallFilter(context.TODO(), f); err != nil {
// this will leave the filter a zombie, collecting events up to the maximum allowed
log.Warnf("failed to remove filter when unsubscribing: %v", err)
}
}
} }
} }