lotus/gateway/eth_sub.go

72 lines
1.7 KiB
Go
Raw Permalink Normal View History

2023-01-16 14:28:55 +00:00
package gateway
import (
"context"
"sync"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)
type EthSubHandler struct {
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error
lk sync.Mutex
}
func NewEthSubHandler() *EthSubHandler {
return &EthSubHandler{
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
}
}
2023-01-26 14:20:49 +00:00
func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
2023-01-16 14:28:55 +00:00
e.lk.Lock()
defer e.lk.Unlock()
for _, p := range e.queued[id] {
2023-01-26 14:20:49 +00:00
p := p // copy
2023-01-16 14:28:55 +00:00
if err := sink(ctx, &p); err != nil {
return err
}
}
delete(e.queued, id)
e.sinks[id] = sink
return nil
}
2023-01-26 14:20:49 +00:00
func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) {
2023-01-16 14:28:55 +00:00
e.lk.Lock()
defer e.lk.Unlock()
delete(e.sinks, id)
delete(e.queued, id)
}
func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
if err != nil {
return err
}
e.lk.Lock()
sink := e.sinks[p.SubscriptionID]
if sink == nil {
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
e.lk.Unlock()
return nil
}
e.lk.Unlock()
return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
}
var _ api.EthSubscriber = (*EthSubHandler)(nil)