2023-01-16 14:28:55 +00:00
|
|
|
package gateway
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/filecoin-project/go-jsonrpc"
|
|
|
|
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
|
|
"github.com/filecoin-project/lotus/chain/types/ethtypes"
|
|
|
|
)
|
|
|
|
|
|
|
|
type EthSubHandler struct {
|
|
|
|
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
|
|
|
|
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error
|
|
|
|
|
|
|
|
lk sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewEthSubHandler() *EthSubHandler {
|
|
|
|
return &EthSubHandler{
|
|
|
|
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
|
|
|
|
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-26 14:20:49 +00:00
|
|
|
func (e *EthSubHandler) AddSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
|
2023-01-16 14:28:55 +00:00
|
|
|
e.lk.Lock()
|
|
|
|
defer e.lk.Unlock()
|
|
|
|
|
|
|
|
for _, p := range e.queued[id] {
|
2023-01-26 14:20:49 +00:00
|
|
|
p := p // copy
|
2023-01-16 14:28:55 +00:00
|
|
|
if err := sink(ctx, &p); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
delete(e.queued, id)
|
|
|
|
e.sinks[id] = sink
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2023-01-26 14:20:49 +00:00
|
|
|
func (e *EthSubHandler) RemoveSub(id ethtypes.EthSubscriptionID) {
|
2023-01-16 14:28:55 +00:00
|
|
|
e.lk.Lock()
|
|
|
|
defer e.lk.Unlock()
|
|
|
|
|
|
|
|
delete(e.sinks, id)
|
|
|
|
delete(e.queued, id)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
|
|
|
|
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
e.lk.Lock()
|
|
|
|
|
|
|
|
sink := e.sinks[p.SubscriptionID]
|
|
|
|
|
|
|
|
if sink == nil {
|
|
|
|
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
|
|
|
|
e.lk.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
e.lk.Unlock()
|
|
|
|
|
|
|
|
return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
|
|
|
|
}
|
|
|
|
|
|
|
|
var _ api.EthSubscriber = (*EthSubHandler)(nil)
|