diff --git a/go.mod b/go.mod index ebb33ffd4..48c33cfde 100644 --- a/go.mod +++ b/go.mod @@ -304,6 +304,7 @@ require ( github.com/rivo/uniseg v0.1.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/segmentio/fasthash v1.0.3 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -319,6 +320,7 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zondax/hid v0.9.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect + github.com/zyedidia/generic v1.2.1 // indirect go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect go.opentelemetry.io/otel/trace v1.12.0 // indirect diff --git a/go.sum b/go.sum index 7d0d05dd7..94927236b 100644 --- a/go.sum +++ b/go.sum @@ -1516,6 +1516,8 @@ github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= @@ -1705,6 +1707,8 @@ github.com/zondax/hid v0.9.1 h1:gQe66rtmyZ8VeGFcOpbuH3r7erYtNEAezCAYu8LdkJo= github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU= github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index b4a5c75d6..91a20c27e 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/zyedidia/generic/queue" "sort" "strconv" "sync" @@ -1352,7 +1353,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") } - sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription) + sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription, e.uninstallFilter) if err != nil { return ethtypes.EthSubscriptionID{}, err } @@ -1418,18 +1419,11 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti return false, api.ErrNotSupported } - filters, err := e.SubManager.StopSubscription(ctx, id) + err := e.SubManager.StopSubscription(ctx, id) if err != nil { return false, nil } - for _, f := range filters { - if err := e.uninstallFilter(ctx, f); err != nil { - // this will leave the filter a zombie, collecting events up to the maximum allowed - log.Warnf("failed to remove filter when unsubscribing: %v", err) - } - } - return true, nil } @@ -1615,7 +1609,7 @@ type EthSubscriptionManager struct { subs map[ethtypes.EthSubscriptionID]*ethSubscription } -func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint +func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint rawid, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -1626,13 +1620,17 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ - Chain: e.Chain, - StateAPI: e.StateAPI, - ChainAPI: e.ChainAPI, - id: id, - in: make(chan interface{}, 200), - out: out, - quit: quit, + Chain: e.Chain, + StateAPI: e.StateAPI, + ChainAPI: e.ChainAPI, + uninstallFilter: dropFilter, + id: id, + in: make(chan interface{}, 200), + out: out, + quit: quit, + + toSend: queue.New[[]byte](), + sendCond: make(chan struct{}, 1), } e.mu.Lock() @@ -1643,37 +1641,46 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS e.mu.Unlock() go sub.start(ctx) + go sub.startOut(ctx) return sub, nil } -func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error { e.mu.Lock() defer e.mu.Unlock() sub, ok := e.subs[id] if !ok { - return nil, xerrors.Errorf("subscription not found") + return xerrors.Errorf("subscription not found") } sub.stop() delete(e.subs, id) - return sub.filters, nil + return nil } type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error +const maxSendQueue = 20000 + type ethSubscription struct { - Chain *store.ChainStore - StateAPI StateAPI - ChainAPI ChainAPI - id ethtypes.EthSubscriptionID - in chan interface{} - out ethSubscriptionCallback + Chain *store.ChainStore + StateAPI StateAPI + ChainAPI ChainAPI + uninstallFilter func(context.Context, filter.Filter) error + id ethtypes.EthSubscriptionID + in chan interface{} + out ethSubscriptionCallback mu sync.Mutex filters []filter.Filter quit func() + + sendLk sync.Mutex + sendQueueLen int + toSend *queue.Queue[[]byte] + sendCond chan struct{} } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -1684,6 +1691,36 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { e.filters = append(e.filters, f) } +// sendOut processes the final subscription queue. It's here in case the subscriber +// is slow, and we need to buffer the messages. +func (e *ethSubscription) startOut(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-e.sendCond: + e.sendLk.Lock() + + for !e.toSend.Empty() { + front := e.toSend.Dequeue() + e.sendQueueLen-- + + e.sendLk.Unlock() + + if err := e.out(ctx, front); err != nil { + log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err) + e.stop() + return + } + + e.sendLk.Lock() + } + + e.sendLk.Unlock() + } + } +} + func (e *ethSubscription) send(ctx context.Context, v interface{}) { resp := ethtypes.EthSubscriptionResponse{ SubscriptionID: e.id, @@ -1696,10 +1733,22 @@ func (e *ethSubscription) send(ctx context.Context, v interface{}) { return } - if err := e.out(ctx, outParam); err != nil { - log.Warnw("sending subscription response", "sub", e.id, "error", err) + e.sendLk.Lock() + defer e.sendLk.Unlock() + + e.toSend.Enqueue(outParam) + + e.sendQueueLen++ + if e.sendQueueLen > maxSendQueue { + log.Warnw("subscription send queue full, killing subscription", "sub", e.id) + e.stop() return } + + select { + case e.sendCond <- struct{}{}: + default: // already signalled, and we're holding the lock so we know that the event will be processed + } } func (e *ethSubscription) start(ctx context.Context) { @@ -1743,11 +1792,23 @@ func (e *ethSubscription) start(ctx context.Context) { func (e *ethSubscription) stop() { e.mu.Lock() - defer e.mu.Unlock() + if e.quit == nil { + e.mu.Unlock() + return + } if e.quit != nil { e.quit() e.quit = nil + e.mu.Unlock() + + for _, f := range e.filters { + // note: the context in actually unused in uninstallFilter + if err := e.uninstallFilter(context.TODO(), f); err != nil { + // this will leave the filter a zombie, collecting events up to the maximum allowed + log.Warnf("failed to remove filter when unsubscribing: %v", err) + } + } } }