Use EthHash compatible type for subscription and filter IDs
This commit is contained in:
parent
ce91e4261d
commit
56259c40fc
@ -378,8 +378,11 @@ func init() {
|
||||
ethFeeHistoryReward := [][]api.EthBigInt{}
|
||||
addExample(ðFeeHistoryReward)
|
||||
|
||||
addExample(api.EthFilterID("c5564560217c43e4bc0484df655e9019"))
|
||||
addExample(api.EthSubscriptionID("b62df77831484129adf6682332ad0725"))
|
||||
filterid, _ := api.EthHashFromHex("0x5CbEeC012345673f25E309Cc264f240bb0664031")
|
||||
addExample(api.EthFilterID(filterid))
|
||||
|
||||
subid, _ := api.EthHashFromHex("0x5CbEeCF99d3fDB301234567c264f240bb0664031")
|
||||
addExample(api.EthSubscriptionID(subid))
|
||||
|
||||
pstring := func(s string) *string { return &s }
|
||||
addExample(&api.EthFilterSpec{
|
||||
|
@ -418,10 +418,10 @@ type EthFeeHistory struct {
|
||||
}
|
||||
|
||||
// An opaque identifier generated by the Lotus node to refer to an installed filter.
|
||||
type EthFilterID string
|
||||
type EthFilterID EthHash
|
||||
|
||||
// An opaque identifier generated by the Lotus node to refer to an active subscription.
|
||||
type EthSubscriptionID string
|
||||
type EthSubscriptionID EthHash
|
||||
|
||||
type EthFilterSpec struct {
|
||||
// Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first,
|
||||
|
@ -176,7 +176,7 @@ func TestEthFilterResultMarshalJSON(t *testing.T) {
|
||||
BlockHash: hash2,
|
||||
BlockNumber: 53,
|
||||
Topics: []EthBytes{hash1[:]},
|
||||
Data: hash1[:],
|
||||
Data: []EthBytes{hash1[:]},
|
||||
Address: addr,
|
||||
}
|
||||
logjson, err := json.Marshal(log)
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
@ -24,7 +23,7 @@ import (
|
||||
const indexed uint8 = 0x01
|
||||
|
||||
type EventFilter struct {
|
||||
id string
|
||||
id FilterID
|
||||
minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum
|
||||
maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum
|
||||
tipsetCid cid.Cid
|
||||
@ -51,7 +50,7 @@ type CollectedEvent struct {
|
||||
MsgCid cid.Cid // cid of message that produced event
|
||||
}
|
||||
|
||||
func (f *EventFilter) ID() string {
|
||||
func (f *EventFilter) ID() FilterID {
|
||||
return f.id
|
||||
}
|
||||
|
||||
@ -290,7 +289,7 @@ type EventFilterManager struct {
|
||||
EventIndex *EventIndex
|
||||
|
||||
mu sync.Mutex // guards mutations to filters
|
||||
filters map[string]*EventFilter
|
||||
filters map[FilterID]*EventFilter
|
||||
currentHeight abi.ChainEpoch
|
||||
}
|
||||
|
||||
@ -365,13 +364,13 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
|
||||
return nil, xerrors.Errorf("historic event index disabled")
|
||||
}
|
||||
|
||||
id, err := uuid.NewRandom()
|
||||
id, err := newFilterID()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("new uuid: %w", err)
|
||||
return nil, xerrors.Errorf("new filter id: %w", err)
|
||||
}
|
||||
|
||||
f := &EventFilter{
|
||||
id: id.String(),
|
||||
id: id,
|
||||
minHeight: minHeight,
|
||||
maxHeight: maxHeight,
|
||||
tipsetCid: tipsetCid,
|
||||
@ -389,15 +388,15 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
|
||||
|
||||
m.mu.Lock()
|
||||
if m.filters == nil {
|
||||
m.filters = make(map[string]*EventFilter)
|
||||
m.filters = make(map[FilterID]*EventFilter)
|
||||
}
|
||||
m.filters[id.String()] = f
|
||||
m.filters[id] = f
|
||||
m.mu.Unlock()
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (m *EventFilterManager) Remove(ctx context.Context, id string) error {
|
||||
func (m *EventFilterManager) Remove(ctx context.Context, id FilterID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if _, found := m.filters[id]; !found {
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -14,7 +13,7 @@ import (
|
||||
)
|
||||
|
||||
type MemPoolFilter struct {
|
||||
id string
|
||||
id FilterID
|
||||
maxResults int // maximum number of results to collect, 0 is unlimited
|
||||
ch chan<- interface{}
|
||||
|
||||
@ -25,7 +24,7 @@ type MemPoolFilter struct {
|
||||
|
||||
var _ Filter = (*MemPoolFilter)(nil)
|
||||
|
||||
func (f *MemPoolFilter) ID() string {
|
||||
func (f *MemPoolFilter) ID() FilterID {
|
||||
return f.id
|
||||
}
|
||||
|
||||
@ -79,7 +78,7 @@ type MemPoolFilterManager struct {
|
||||
MaxFilterResults int
|
||||
|
||||
mu sync.Mutex // guards mutations to filters
|
||||
filters map[string]*MemPoolFilter
|
||||
filters map[FilterID]*MemPoolFilter
|
||||
}
|
||||
|
||||
func (m *MemPoolFilterManager) WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate) {
|
||||
@ -113,27 +112,27 @@ func (m *MemPoolFilterManager) processUpdate(ctx context.Context, u api.MpoolUpd
|
||||
}
|
||||
|
||||
func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, error) {
|
||||
id, err := uuid.NewRandom()
|
||||
id, err := newFilterID()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("new uuid: %w", err)
|
||||
return nil, xerrors.Errorf("new filter id: %w", err)
|
||||
}
|
||||
|
||||
f := &MemPoolFilter{
|
||||
id: id.String(),
|
||||
id: id,
|
||||
maxResults: m.MaxFilterResults,
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
if m.filters == nil {
|
||||
m.filters = make(map[string]*MemPoolFilter)
|
||||
m.filters = make(map[FilterID]*MemPoolFilter)
|
||||
}
|
||||
m.filters[id.String()] = f
|
||||
m.filters[id] = f
|
||||
m.mu.Unlock()
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (m *MemPoolFilterManager) Remove(ctx context.Context, id string) error {
|
||||
func (m *MemPoolFilterManager) Remove(ctx context.Context, id FilterID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if _, found := m.filters[id]; !found {
|
||||
|
@ -5,10 +5,13 @@ import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type Filter interface {
|
||||
ID() string
|
||||
ID() FilterID
|
||||
LastTaken() time.Time
|
||||
SetSubChannel(chan<- interface{})
|
||||
ClearSubChannel()
|
||||
@ -16,8 +19,8 @@ type Filter interface {
|
||||
|
||||
type FilterStore interface {
|
||||
Add(context.Context, Filter) error
|
||||
Get(context.Context, string) (Filter, error)
|
||||
Remove(context.Context, string) error
|
||||
Get(context.Context, FilterID) (Filter, error)
|
||||
Remove(context.Context, FilterID) error
|
||||
NotTakenSince(when time.Time) []Filter // returns a list of filters that have not had their collected results taken
|
||||
}
|
||||
|
||||
@ -27,10 +30,22 @@ var (
|
||||
ErrMaximumNumberOfFilters = errors.New("maximum number of filters registered")
|
||||
)
|
||||
|
||||
type FilterID [32]byte // compatible with EthHash
|
||||
|
||||
func newFilterID() (FilterID, error) {
|
||||
rawid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return FilterID{}, xerrors.Errorf("new uuid: %w", err)
|
||||
}
|
||||
id := FilterID{}
|
||||
copy(id[:], rawid[:]) // uuid is 16 bytes
|
||||
return id, nil
|
||||
}
|
||||
|
||||
type memFilterStore struct {
|
||||
max int
|
||||
mu sync.Mutex
|
||||
filters map[string]Filter
|
||||
filters map[FilterID]Filter
|
||||
}
|
||||
|
||||
var _ FilterStore = (*memFilterStore)(nil)
|
||||
@ -38,7 +53,7 @@ var _ FilterStore = (*memFilterStore)(nil)
|
||||
func NewMemFilterStore(maxFilters int) FilterStore {
|
||||
return &memFilterStore{
|
||||
max: maxFilters,
|
||||
filters: make(map[string]Filter),
|
||||
filters: make(map[FilterID]Filter),
|
||||
}
|
||||
}
|
||||
|
||||
@ -57,7 +72,7 @@ func (m *memFilterStore) Add(_ context.Context, f Filter) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) {
|
||||
func (m *memFilterStore) Get(_ context.Context, id FilterID) (Filter, error) {
|
||||
m.mu.Lock()
|
||||
f, found := m.filters[id]
|
||||
m.mu.Unlock()
|
||||
@ -67,7 +82,7 @@ func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (m *memFilterStore) Remove(_ context.Context, id string) error {
|
||||
func (m *memFilterStore) Remove(_ context.Context, id FilterID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -5,14 +5,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
type TipSetFilter struct {
|
||||
id string
|
||||
id FilterID
|
||||
maxResults int // maximum number of results to collect, 0 is unlimited
|
||||
ch chan<- interface{}
|
||||
|
||||
@ -23,7 +22,7 @@ type TipSetFilter struct {
|
||||
|
||||
var _ Filter = (*TipSetFilter)(nil)
|
||||
|
||||
func (f *TipSetFilter) ID() string {
|
||||
func (f *TipSetFilter) ID() FilterID {
|
||||
return f.id
|
||||
}
|
||||
|
||||
@ -77,7 +76,7 @@ type TipSetFilterManager struct {
|
||||
MaxFilterResults int
|
||||
|
||||
mu sync.Mutex // guards mutations to filters
|
||||
filters map[string]*TipSetFilter
|
||||
filters map[FilterID]*TipSetFilter
|
||||
}
|
||||
|
||||
func (m *TipSetFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error {
|
||||
@ -100,27 +99,27 @@ func (m *TipSetFilterManager) Revert(ctx context.Context, from, to *types.TipSet
|
||||
}
|
||||
|
||||
func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error) {
|
||||
id, err := uuid.NewRandom()
|
||||
id, err := newFilterID()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("new uuid: %w", err)
|
||||
return nil, xerrors.Errorf("new filter id: %w", err)
|
||||
}
|
||||
|
||||
f := &TipSetFilter{
|
||||
id: id.String(),
|
||||
id: id,
|
||||
maxResults: m.MaxFilterResults,
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
if m.filters == nil {
|
||||
m.filters = make(map[string]*TipSetFilter)
|
||||
m.filters = make(map[FilterID]*TipSetFilter)
|
||||
}
|
||||
m.filters[id.String()] = f
|
||||
m.filters[id] = f
|
||||
m.mu.Unlock()
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (m *TipSetFilterManager) Remove(ctx context.Context, id string) error {
|
||||
func (m *TipSetFilterManager) Remove(ctx context.Context, id FilterID) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if _, found := m.filters[id]; !found {
|
||||
|
@ -294,11 +294,11 @@ func TestEthNewFilterCatchAll(t *testing.T) {
|
||||
// expect to have seen iteration number of events
|
||||
require.Equal(iterations, len(res.Results))
|
||||
|
||||
topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11})
|
||||
topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22})
|
||||
topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33})
|
||||
topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44})
|
||||
data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})
|
||||
topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
|
||||
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
|
||||
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
|
||||
topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44}))
|
||||
data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}))
|
||||
|
||||
for _, r := range res.Results {
|
||||
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
|
||||
@ -323,13 +323,12 @@ func TestEthNewFilterCatchAll(t *testing.T) {
|
||||
require.Equal(tsCidHash, elog.BlockHash, "block hash")
|
||||
|
||||
require.Equal(4, len(elog.Topics), "number of topics")
|
||||
require.Equal(topic1Hash, elog.Topics[0], "topic1")
|
||||
require.Equal(topic2Hash, elog.Topics[1], "topic2")
|
||||
require.Equal(topic3Hash, elog.Topics[2], "topic3")
|
||||
require.Equal(topic4Hash, elog.Topics[3], "topic4")
|
||||
require.Equal(topic1, elog.Topics[0], "topic1")
|
||||
require.Equal(topic2, elog.Topics[1], "topic2")
|
||||
require.Equal(topic3, elog.Topics[2], "topic3")
|
||||
require.Equal(topic4, elog.Topics[3], "topic4")
|
||||
|
||||
require.Equal(1, len(elog.Data), "number of data")
|
||||
require.Equal(data1Hash, elog.Data[0], "data1")
|
||||
require.Equal(data1, elog.Data, "data1")
|
||||
|
||||
}
|
||||
}
|
||||
@ -368,67 +367,76 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) {
|
||||
}
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf(k + " not a string")
|
||||
return nil, xerrors.Errorf(k + ": not a string")
|
||||
}
|
||||
el.Removed, err = strconv.ParseBool(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "address":
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf(k + " not a string")
|
||||
return nil, xerrors.Errorf(k + ": not a string")
|
||||
}
|
||||
el.Address, err = api.EthAddressFromHex(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "logIndex":
|
||||
el.LogIndex, err = ethUint64(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "transactionIndex":
|
||||
el.TransactionIndex, err = ethUint64(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "blockNumber":
|
||||
el.BlockNumber, err = ethUint64(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "transactionHash":
|
||||
el.TransactionHash, err = ethHash(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "blockHash":
|
||||
el.BlockHash, err = ethHash(k, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
case "data":
|
||||
sl, ok := v.([]interface{})
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf(k + " not a slice")
|
||||
return nil, xerrors.Errorf(k + ": not a string")
|
||||
}
|
||||
for _, s := range sl {
|
||||
data, err := hex.DecodeString(s.(string))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
el.Data = data
|
||||
data, err := hex.DecodeString(s[2:])
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
el.Data = data
|
||||
|
||||
case "topics":
|
||||
s, ok := v.(string)
|
||||
if ok {
|
||||
topic, err := hex.DecodeString(s[2:])
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
el.Topics = append(el.Topics, topic)
|
||||
continue
|
||||
}
|
||||
|
||||
sl, ok := v.([]interface{})
|
||||
if !ok {
|
||||
return nil, xerrors.Errorf(k + " not a slice")
|
||||
return nil, xerrors.Errorf(k + ": not a slice")
|
||||
}
|
||||
for _, s := range sl {
|
||||
topic, err := hex.DecodeString(s.(string))
|
||||
topic, err := hex.DecodeString(s.(string)[2:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, xerrors.Errorf("%s: %w", k, err)
|
||||
}
|
||||
el.Topics = append(el.Topics, topic)
|
||||
}
|
||||
@ -542,11 +550,11 @@ func TestEthGetLogsAll(t *testing.T) {
|
||||
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
|
||||
require.NoError(err)
|
||||
|
||||
topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11})
|
||||
topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22})
|
||||
topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33})
|
||||
topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44})
|
||||
data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})
|
||||
topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
|
||||
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
|
||||
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
|
||||
topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44}))
|
||||
data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}))
|
||||
|
||||
pstring := func(s string) *string { return &s }
|
||||
|
||||
@ -582,13 +590,12 @@ func TestEthGetLogsAll(t *testing.T) {
|
||||
require.Equal(tsCidHash, elog.BlockHash, "block hash")
|
||||
|
||||
require.Equal(4, len(elog.Topics), "number of topics")
|
||||
require.Equal(topic1Hash, elog.Topics[0], "topic1")
|
||||
require.Equal(topic2Hash, elog.Topics[1], "topic2")
|
||||
require.Equal(topic3Hash, elog.Topics[2], "topic3")
|
||||
require.Equal(topic4Hash, elog.Topics[3], "topic4")
|
||||
require.Equal(topic1, elog.Topics[0], "topic1")
|
||||
require.Equal(topic2, elog.Topics[1], "topic2")
|
||||
require.Equal(topic3, elog.Topics[2], "topic3")
|
||||
require.Equal(topic4, elog.Topics[3], "topic4")
|
||||
|
||||
require.Equal(1, len(elog.Data), "number of data")
|
||||
require.Equal(data1Hash, elog.Data[0], "data1")
|
||||
require.Equal(data1, elog.Data, "data1")
|
||||
|
||||
}
|
||||
}
|
||||
@ -706,3 +713,13 @@ func TestEthSubscribeLogs(t *testing.T) {
|
||||
// expect to have seen all logs
|
||||
require.Equal(len(received), len(subResponses))
|
||||
}
|
||||
|
||||
func leftpad32(orig []byte) []byte {
|
||||
needed := 32 - len(orig)
|
||||
if needed <= 0 {
|
||||
return orig
|
||||
}
|
||||
ret := make([]byte, 32)
|
||||
copy(ret[needed:], orig)
|
||||
return ret
|
||||
}
|
||||
|
@ -696,7 +696,7 @@ func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID)
|
||||
return nil, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.FilterStore.Get(ctx, string(id))
|
||||
f, err := e.FilterStore.Get(ctx, filter.FilterID(id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -718,7 +718,7 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*a
|
||||
return nil, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.FilterStore.Get(ctx, string(id))
|
||||
f, err := e.FilterStore.Get(ctx, filter.FilterID(id))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -823,22 +823,22 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *api.Eth
|
||||
|
||||
func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSpec) (api.EthFilterID, error) {
|
||||
if e.FilterStore == nil || e.EventFilterManager == nil {
|
||||
return "", api.ErrNotSupported
|
||||
return api.EthFilterID{}, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.installEthFilterSpec(ctx, filterSpec)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
if err := e.FilterStore.Add(ctx, f); err != nil {
|
||||
// Could not record in store, attempt to delete filter to clean up
|
||||
err2 := e.TipSetFilterManager.Remove(ctx, f.ID())
|
||||
if err2 != nil {
|
||||
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
}
|
||||
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
return api.EthFilterID(f.ID()), nil
|
||||
@ -846,22 +846,22 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSp
|
||||
|
||||
func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) {
|
||||
if e.FilterStore == nil || e.TipSetFilterManager == nil {
|
||||
return "", api.ErrNotSupported
|
||||
return api.EthFilterID{}, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.TipSetFilterManager.Install(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
if err := e.FilterStore.Add(ctx, f); err != nil {
|
||||
// Could not record in store, attempt to delete filter to clean up
|
||||
err2 := e.TipSetFilterManager.Remove(ctx, f.ID())
|
||||
if err2 != nil {
|
||||
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
}
|
||||
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
return api.EthFilterID(f.ID()), nil
|
||||
@ -869,22 +869,22 @@ func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, erro
|
||||
|
||||
func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) {
|
||||
if e.FilterStore == nil || e.MemPoolFilterManager == nil {
|
||||
return "", api.ErrNotSupported
|
||||
return api.EthFilterID{}, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.MemPoolFilterManager.Install(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
if err := e.FilterStore.Add(ctx, f); err != nil {
|
||||
// Could not record in store, attempt to delete filter to clean up
|
||||
err2 := e.MemPoolFilterManager.Remove(ctx, f.ID())
|
||||
if err2 != nil {
|
||||
return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err)
|
||||
}
|
||||
|
||||
return "", err
|
||||
return api.EthFilterID{}, err
|
||||
}
|
||||
|
||||
return api.EthFilterID(f.ID()), nil
|
||||
@ -895,7 +895,7 @@ func (e *EthEvent) EthUninstallFilter(ctx context.Context, id api.EthFilterID) (
|
||||
return false, api.ErrNotSupported
|
||||
}
|
||||
|
||||
f, err := e.FilterStore.Get(ctx, string(id))
|
||||
f, err := e.FilterStore.Get(ctx, filter.FilterID(id))
|
||||
if err != nil {
|
||||
if errors.Is(err, filter.ErrFilterNotFound) {
|
||||
return false, nil
|
||||
@ -956,7 +956,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a
|
||||
f, err := e.TipSetFilterManager.Install(ctx)
|
||||
if err != nil {
|
||||
// clean up any previous filters added and stop the sub
|
||||
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
|
||||
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
||||
return nil, err
|
||||
}
|
||||
sub.addFilter(ctx, f)
|
||||
@ -978,7 +978,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a
|
||||
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
|
||||
if err != nil {
|
||||
// clean up any previous filters added and stop the sub
|
||||
_, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id))
|
||||
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
||||
return nil, err
|
||||
}
|
||||
sub.addFilter(ctx, f)
|
||||
@ -994,7 +994,7 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID)
|
||||
return false, api.ErrNotSupported
|
||||
}
|
||||
|
||||
filters, err := e.SubManager.StopSubscription(ctx, string(id))
|
||||
filters, err := e.SubManager.StopSubscription(ctx, id)
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
@ -1130,14 +1130,16 @@ type EthSubscriptionManager struct {
|
||||
StateAPI StateAPI
|
||||
ChainAPI ChainAPI
|
||||
mu sync.Mutex
|
||||
subs map[string]*ethSubscription
|
||||
subs map[api.EthSubscriptionID]*ethSubscription
|
||||
}
|
||||
|
||||
func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { // nolint
|
||||
id, err := uuid.NewRandom()
|
||||
rawid, err := uuid.NewRandom()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("new uuid: %w", err)
|
||||
}
|
||||
id := api.EthSubscriptionID{}
|
||||
copy(id[:], rawid[:]) // uuid is 16 bytes
|
||||
|
||||
ctx, quit := context.WithCancel(ctx)
|
||||
|
||||
@ -1145,7 +1147,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
|
||||
Chain: e.Chain,
|
||||
StateAPI: e.StateAPI,
|
||||
ChainAPI: e.ChainAPI,
|
||||
id: id.String(),
|
||||
id: id,
|
||||
in: make(chan interface{}, 200),
|
||||
out: make(chan api.EthSubscriptionResponse, 20),
|
||||
quit: quit,
|
||||
@ -1153,7 +1155,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
|
||||
|
||||
e.mu.Lock()
|
||||
if e.subs == nil {
|
||||
e.subs = make(map[string]*ethSubscription)
|
||||
e.subs = make(map[api.EthSubscriptionID]*ethSubscription)
|
||||
}
|
||||
e.subs[sub.id] = sub
|
||||
e.mu.Unlock()
|
||||
@ -1163,7 +1165,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) {
|
||||
func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id api.EthSubscriptionID) ([]filter.Filter, error) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
@ -1181,7 +1183,7 @@ type ethSubscription struct {
|
||||
Chain *store.ChainStore
|
||||
StateAPI StateAPI
|
||||
ChainAPI ChainAPI
|
||||
id string
|
||||
id api.EthSubscriptionID
|
||||
in chan interface{}
|
||||
out chan api.EthSubscriptionResponse
|
||||
|
||||
@ -1205,7 +1207,7 @@ func (e *ethSubscription) start(ctx context.Context) {
|
||||
return
|
||||
case v := <-e.in:
|
||||
resp := api.EthSubscriptionResponse{
|
||||
SubscriptionID: api.EthSubscriptionID(e.id),
|
||||
SubscriptionID: e.id,
|
||||
}
|
||||
|
||||
var err error
|
||||
|
Loading…
Reference in New Issue
Block a user