diff --git a/go.mod b/go.mod index 00b4cdabc..27d488458 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.27.0-rc1 - github.com/filecoin-project/go-jsonrpc v0.2.2 + github.com/filecoin-project/go-jsonrpc v0.2.3 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.10.0 @@ -147,6 +147,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/xeipuuv/gojsonschema v1.2.0 github.com/xorcare/golden v0.6.1-0.20191112154924-b87f686d7542 + github.com/zyedidia/generic v1.2.1 go.opencensus.io v0.24.0 go.opentelemetry.io/otel v1.12.0 go.opentelemetry.io/otel/bridge/opencensus v0.33.0 diff --git a/go.sum b/go.sum index b655bcc91..5e20e850f 100644 --- a/go.sum +++ b/go.sum @@ -332,8 +332,8 @@ github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0/go.mod h1:7aWZdaQ1b16BVoQUYR+ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGyDjJjYSRX7hp/FGOStdqrWyDI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= -github.com/filecoin-project/go-jsonrpc v0.2.2 h1:yo7Ga5qaSFfAukjyI6pdFBxzUVbQoHjKdYMpf2vMvh4= -github.com/filecoin-project/go-jsonrpc v0.2.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= +github.com/filecoin-project/go-jsonrpc v0.2.3 h1:xdixRQfLbD8gt56dpBNKiya9gvI/79nNM13IHKTfm5E= +github.com/filecoin-project/go-jsonrpc v0.2.3/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= @@ -1705,6 +1705,8 @@ github.com/zondax/hid v0.9.1 h1:gQe66rtmyZ8VeGFcOpbuH3r7erYtNEAezCAYu8LdkJo= github.com/zondax/hid v0.9.1/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM= github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU= github.com/zondax/ledger-go v0.12.1/go.mod h1:KatxXrVDzgWwbssUWsF5+cOJHXPvzQ09YSlzGNuhOEo= +github.com/zyedidia/generic v1.2.1 h1:Zv5KS/N2m0XZZiuLS82qheRG4X1o5gsWreGb0hR7XDc= +github.com/zyedidia/generic v1.2.1/go.mod h1:ly2RBz4mnz1yeuVbQA/VFwGjK3mnHGRj1JuoG336Bis= go.dedis.ch/fixbuf v1.0.3 h1:hGcV9Cd/znUxlusJ64eAlExS+5cJDIyTyEG+otu5wQs= go.dedis.ch/fixbuf v1.0.3/go.mod h1:yzJMt34Wa5xD37V5RTdmp38cz3QhMagdGoem9anUalw= go.dedis.ch/kyber/v3 v3.0.4/go.mod h1:OzvaEnPvKlyrWyp3kGXlFdp7ap1VC6RkZDTaPikqhsQ= diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index b4a5c75d6..64592a4c6 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" + "github.com/zyedidia/generic/queue" "go.uber.org/fx" "golang.org/x/xerrors" @@ -1352,7 +1353,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") } - sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription) + sub, err := e.SubManager.StartSubscription(e.SubscribtionCtx, ethCb.EthSubscription, e.uninstallFilter) if err != nil { return ethtypes.EthSubscriptionID{}, err } @@ -1418,18 +1419,11 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscripti return false, api.ErrNotSupported } - filters, err := e.SubManager.StopSubscription(ctx, id) + err := e.SubManager.StopSubscription(ctx, id) if err != nil { return false, nil } - for _, f := range filters { - if err := e.uninstallFilter(ctx, f); err != nil { - // this will leave the filter a zombie, collecting events up to the maximum allowed - log.Warnf("failed to remove filter when unsubscribing: %v", err) - } - } - return true, nil } @@ -1615,7 +1609,7 @@ type EthSubscriptionManager struct { subs map[ethtypes.EthSubscriptionID]*ethSubscription } -func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback) (*ethSubscription, error) { // nolint +func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethSubscriptionCallback, dropFilter func(context.Context, filter.Filter) error) (*ethSubscription, error) { // nolint rawid, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -1626,13 +1620,17 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ - Chain: e.Chain, - StateAPI: e.StateAPI, - ChainAPI: e.ChainAPI, - id: id, - in: make(chan interface{}, 200), - out: out, - quit: quit, + Chain: e.Chain, + StateAPI: e.StateAPI, + ChainAPI: e.ChainAPI, + uninstallFilter: dropFilter, + id: id, + in: make(chan interface{}, 200), + out: out, + quit: quit, + + toSend: queue.New[[]byte](), + sendCond: make(chan struct{}, 1), } e.mu.Lock() @@ -1643,37 +1641,46 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context, out ethS e.mu.Unlock() go sub.start(ctx) + go sub.startOut(ctx) return sub, nil } -func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id ethtypes.EthSubscriptionID) error { e.mu.Lock() defer e.mu.Unlock() sub, ok := e.subs[id] if !ok { - return nil, xerrors.Errorf("subscription not found") + return xerrors.Errorf("subscription not found") } sub.stop() delete(e.subs, id) - return sub.filters, nil + return nil } type ethSubscriptionCallback func(context.Context, jsonrpc.RawParams) error +const maxSendQueue = 20000 + type ethSubscription struct { - Chain *store.ChainStore - StateAPI StateAPI - ChainAPI ChainAPI - id ethtypes.EthSubscriptionID - in chan interface{} - out ethSubscriptionCallback + Chain *store.ChainStore + StateAPI StateAPI + ChainAPI ChainAPI + uninstallFilter func(context.Context, filter.Filter) error + id ethtypes.EthSubscriptionID + in chan interface{} + out ethSubscriptionCallback mu sync.Mutex filters []filter.Filter quit func() + + sendLk sync.Mutex + sendQueueLen int + toSend *queue.Queue[[]byte] + sendCond chan struct{} } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -1684,6 +1691,36 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { e.filters = append(e.filters, f) } +// sendOut processes the final subscription queue. It's here in case the subscriber +// is slow, and we need to buffer the messages. +func (e *ethSubscription) startOut(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-e.sendCond: + e.sendLk.Lock() + + for !e.toSend.Empty() { + front := e.toSend.Dequeue() + e.sendQueueLen-- + + e.sendLk.Unlock() + + if err := e.out(ctx, front); err != nil { + log.Warnw("error sending subscription response, killing subscription", "sub", e.id, "error", err) + e.stop() + return + } + + e.sendLk.Lock() + } + + e.sendLk.Unlock() + } + } +} + func (e *ethSubscription) send(ctx context.Context, v interface{}) { resp := ethtypes.EthSubscriptionResponse{ SubscriptionID: e.id, @@ -1696,10 +1733,22 @@ func (e *ethSubscription) send(ctx context.Context, v interface{}) { return } - if err := e.out(ctx, outParam); err != nil { - log.Warnw("sending subscription response", "sub", e.id, "error", err) + e.sendLk.Lock() + defer e.sendLk.Unlock() + + e.toSend.Enqueue(outParam) + + e.sendQueueLen++ + if e.sendQueueLen > maxSendQueue { + log.Warnw("subscription send queue full, killing subscription", "sub", e.id) + e.stop() return } + + select { + case e.sendCond <- struct{}{}: + default: // already signalled, and we're holding the lock so we know that the event will be processed + } } func (e *ethSubscription) start(ctx context.Context) { @@ -1743,11 +1792,23 @@ func (e *ethSubscription) start(ctx context.Context) { func (e *ethSubscription) stop() { e.mu.Lock() - defer e.mu.Unlock() + if e.quit == nil { + e.mu.Unlock() + return + } if e.quit != nil { e.quit() e.quit = nil + e.mu.Unlock() + + for _, f := range e.filters { + // note: the context in actually unused in uninstallFilter + if err := e.uninstallFilter(context.TODO(), f); err != nil { + // this will leave the filter a zombie, collecting events up to the maximum allowed + log.Warnf("failed to remove filter when unsubscribing: %v", err) + } + } } }