From a44f598cc21ab7610db227080b9faf7d612c14b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 Mar 2023 17:19:32 +0100 Subject: [PATCH 1/3] update go-jsonrpc to v0.2.3 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 00b4cdabc..ebb33ffd4 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.27.0-rc1 - github.com/filecoin-project/go-jsonrpc v0.2.2 + github.com/filecoin-project/go-jsonrpc v0.2.3 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.10.0 diff --git a/go.sum b/go.sum index b655bcc91..7d0d05dd7 100644 --- a/go.sum +++ b/go.sum @@ -332,8 +332,8 @@ github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0/go.mod h1:7aWZdaQ1b16BVoQUYR+ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= -github.com/filecoin-project/go-jsonrpc v0.2.2 h1:yo7Ga5qaSFfAukjyI6pdFBxzUVbQoHjKdYMpf2vMvh4= -github.com/filecoin-project/go-jsonrpc v0.2.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= +github.com/filecoin-project/go-jsonrpc v0.2.3 h1:xdixRQfLbD8gt56dpBNKiya9gvI/79nNM13IHKTfm5E= +github.com/filecoin-project/go-jsonrpc v0.2.3/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= From 470dd8da3dfdc8fe4de9e5c838894efcb28be0db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 Mar 2023 18:34:10 +0100 Subject: [PATCH 2/3] ethrpc: Buffer sub messages if subscriber is slow --- go.mod | 2 + go.sum | 4 ++ node/impl/full/eth.go | 119 ++++++++++++++++++++++++++++++++---------- 3 files changed, 96 insertions(+), 29 deletions(-) diff --git a/go.mod b/go.mod index ebb33ffd4..48c33cfde 100644 --- a/go.mod +++ b/go.mod @@ -304,6 +304,7 @@ require ( github.com/rivo/uniseg v0.1.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/segmentio/fasthash v1.0.3 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -319,6 +320,7 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zondax/hid v0.9.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect + github.com/zyedidia/generic v1.2.1 // indirect go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect go.opentelemetry.io/otel/trace v1.12.0 // indirect diff --git a/go.sum b/go.sum index 7d0d05dd7..94927236b 100644 --- a/go.sum +++ b/go.sum @@ -1516,6 +1516,8 @@ github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= +github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= @@ -1705,6 +1707,8 @@ github.com/zondax/hid v0.9.1 h1:gQe66rtmyZ8VeGFcOpbuH3r7erYtNEAezCAYu8LdkJo= github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU= github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index b4a5c75d6..91a20c27e 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/zyedidia/generic/queue" "sort" "strconv" "sync" @@ -1352,7 +1353,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") } - sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription) + sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription, e.uninstallFilter) if err != nil { return ethtypes.EthSubscriptionID{}, err } @@ -1418,18 +1419,11 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti return false, api.ErrNotSupported } - filters, err := e.SubManager.StopSubscription(ctx, id) + err := e.SubManager.StopSubscription(ctx, id) if err != nil { return false, nil } - for _, f := range filters { - if err := e.uninstallFilter(ctx, f); err != nil { - // this will leave the filter a zombie, collecting events up to the maximum allowed - log.Warnf("failed to remove filter when unsubscribing: %v", err) - } - } - return true, nil } @@ -1615,7 +1609,7 @@ type EthSubscriptionManager struct { subs map[ethtypes.EthSubscriptionID]*ethSubscription } -func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint +func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint rawid, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -1626,13 +1620,17 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ - Chain: e.Chain, - StateAPI: e.StateAPI, - ChainAPI: e.ChainAPI, - id: id, - in: make(chan interface{}, 200), - out: out, - quit: quit, + Chain: e.Chain, + StateAPI: e.StateAPI, + ChainAPI: e.ChainAPI, + uninstallFilter: dropFilter, + id: id, + in: make(chan interface{}, 200), + out: out, + quit: quit, + + toSend: queue.New[[]byte](), + sendCond: make(chan struct{}, 1), } e.mu.Lock() @@ -1643,37 +1641,46 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS e.mu.Unlock() go sub.start(ctx) + go sub.startOut(ctx) return sub, nil } -func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error { e.mu.Lock() defer e.mu.Unlock() sub, ok := e.subs[id] if !ok { - return nil, xerrors.Errorf("subscription not found") + return xerrors.Errorf("subscription not found") } sub.stop() delete(e.subs, id) - return sub.filters, nil + return nil } type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error +const maxSendQueue = 20000 + type ethSubscription struct { - Chain *store.ChainStore - StateAPI StateAPI - ChainAPI ChainAPI - id ethtypes.EthSubscriptionID - in chan interface{} - out ethSubscriptionCallback + Chain *store.ChainStore + StateAPI StateAPI + ChainAPI ChainAPI + uninstallFilter func(context.Context, filter.Filter) error + id ethtypes.EthSubscriptionID + in chan interface{} + out ethSubscriptionCallback mu sync.Mutex filters []filter.Filter quit func() + + sendLk sync.Mutex + sendQueueLen int + toSend *queue.Queue[[]byte] + sendCond chan struct{} } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -1684,6 +1691,36 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { e.filters = append(e.filters, f) } +// sendOut processes the final subscription queue. It's here in case the subscriber +// is slow, and we need to buffer the messages. +func (e *ethSubscription) startOut(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-e.sendCond: + e.sendLk.Lock() + + for !e.toSend.Empty() { + front := e.toSend.Dequeue() + e.sendQueueLen-- + + e.sendLk.Unlock() + + if err := e.out(ctx, front); err != nil { + log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err) + e.stop() + return + } + + e.sendLk.Lock() + } + + e.sendLk.Unlock() + } + } +} + func (e *ethSubscription) send(ctx context.Context, v interface{}) { resp := ethtypes.EthSubscriptionResponse{ SubscriptionID: e.id, @@ -1696,10 +1733,22 @@ func (e *ethSubscription) send(ctx context.Context, v interface{}) { return } - if err := e.out(ctx, outParam); err != nil { - log.Warnw("sending subscription response", "sub", e.id, "error", err) + e.sendLk.Lock() + defer e.sendLk.Unlock() + + e.toSend.Enqueue(outParam) + + e.sendQueueLen++ + if e.sendQueueLen > maxSendQueue { + log.Warnw("subscription send queue full, killing subscription", "sub", e.id) + e.stop() return } + + select { + case e.sendCond <- struct{}{}: + default: // already signalled, and we're holding the lock so we know that the event will be processed + } } func (e *ethSubscription) start(ctx context.Context) { @@ -1743,11 +1792,23 @@ func (e *ethSubscription) start(ctx context.Context) { func (e *ethSubscription) stop() { e.mu.Lock() - defer e.mu.Unlock() + if e.quit == nil { + e.mu.Unlock() + return + } if e.quit != nil { e.quit() e.quit = nil + e.mu.Unlock() + + for _, f := range e.filters { + // note: the context in actually unused in uninstallFilter + if err := e.uninstallFilter(context.TODO(), f); err != nil { + // this will leave the filter a zombie, collecting events up to the maximum allowed + log.Warnf("failed to remove filter when unsubscribing: %v", err) + } + } } } From 720e95991c19140037e2270e624451c5636a5124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 15 Mar 2023 18:51:46 +0100 Subject: [PATCH 3/3] make lint happy --- go.mod | 3 +-- go.sum | 2 -- node/impl/full/eth.go | 2 +- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index 48c33cfde..27d488458 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/xeipuuv/gojsonschema v1.2.0 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 + github.com/zyedidia/generic v1.2.1 go.opencensus.io v0.24.0 go.opentelemetry.io/otel v1.12.0 go.opentelemetry.io/otel/bridge/opencensus v0.33.0 @@ -304,7 +305,6 @@ require ( github.com/rivo/uniseg v0.1.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/segmentio/fasthash v1.0.3 // indirect github.com/shirou/gopsutil v2.18.12+incompatible // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -320,7 +320,6 @@ require ( github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zondax/hid v0.9.1 // indirect github.com/zondax/ledger-go v0.12.1 // indirect - github.com/zyedidia/generic v1.2.1 // indirect go.opentelemetry.io/otel/metric v0.33.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect go.opentelemetry.io/otel/trace v1.12.0 // indirect diff --git a/go.sum b/go.sum index 94927236b..5e20e850f 100644 --- a/go.sum +++ b/go.sum @@ -1516,8 +1516,6 @@ github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM= -github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shirou/gopsutil v2.18.12+incompatible h1:1eaJvGomDnH74/5cF4CTmTbLHAriGFsTZppLXDX93OM= diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 91a20c27e..64592a4c6 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/zyedidia/generic/queue" "sort" "strconv" "sync" @@ -15,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "github.com/zyedidia/generic/queue" "go.uber.org/fx" "golang.org/x/xerrors"