diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index bacb751e2..221e8e17a 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -378,8 +378,11 @@ func init() { ethFeeHistoryReward := [][]api.EthBigInt{} addExample(ðFeeHistoryReward) - addExample(api.EthFilterID("c5564560217c43e4bc0484df655e9019")) - addExample(api.EthSubscriptionID("b62df77831484129adf6682332ad0725")) + filterid, _ := api.EthHashFromHex("0x5CbEeC012345673f25E309Cc264f240bb0664031") + addExample(api.EthFilterID(filterid)) + + subid, _ := api.EthHashFromHex("0x5CbEeCF99d3fDB301234567c264f240bb0664031") + addExample(api.EthSubscriptionID(subid)) pstring := func(s string) *string { return &s } addExample(&api.EthFilterSpec{ diff --git a/api/eth_types.go b/api/eth_types.go index 57937ad8c..4708239ca 100644 --- a/api/eth_types.go +++ b/api/eth_types.go @@ -418,10 +418,10 @@ type EthFeeHistory struct { } // An opaque identifier generated by the Lotus node to refer to an installed filter. -type EthFilterID string +type EthFilterID EthHash // An opaque identifier generated by the Lotus node to refer to an active subscription. -type EthSubscriptionID string +type EthSubscriptionID EthHash type EthFilterSpec struct { // Interpreted as an epoch or one of "latest" for last mined block, "earliest" for first, diff --git a/api/eth_types_test.go b/api/eth_types_test.go index 1036a6f79..d65a32625 100644 --- a/api/eth_types_test.go +++ b/api/eth_types_test.go @@ -176,7 +176,7 @@ func TestEthFilterResultMarshalJSON(t *testing.T) { BlockHash: hash2, BlockNumber: 53, Topics: []EthBytes{hash1[:]}, - Data: hash1[:], + Data: []EthBytes{hash1[:]}, Address: addr, } logjson, err := json.Marshal(log) diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index f5f2123ad..2f4d3206f 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -24,7 +23,7 @@ import ( const indexed uint8 = 0x01 type EventFilter struct { - id string + id FilterID minHeight abi.ChainEpoch // minimum epoch to apply filter or -1 if no minimum maxHeight abi.ChainEpoch // maximum epoch to apply filter or -1 if no maximum tipsetCid cid.Cid @@ -51,7 +50,7 @@ type CollectedEvent struct { MsgCid cid.Cid // cid of message that produced event } -func (f *EventFilter) ID() string { +func (f *EventFilter) ID() FilterID { return f.id } @@ -290,7 +289,7 @@ type EventFilterManager struct { EventIndex *EventIndex mu sync.Mutex // guards mutations to filters - filters map[string]*EventFilter + filters map[FilterID]*EventFilter currentHeight abi.ChainEpoch } @@ -365,13 +364,13 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a return nil, xerrors.Errorf("historic event index disabled") } - id, err := uuid.NewRandom() + id, err := newFilterID() if err != nil { - return nil, xerrors.Errorf("new uuid: %w", err) + return nil, xerrors.Errorf("new filter id: %w", err) } f := &EventFilter{ - id: id.String(), + id: id, minHeight: minHeight, maxHeight: maxHeight, tipsetCid: tipsetCid, @@ -389,15 +388,15 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a m.mu.Lock() if m.filters == nil { - m.filters = make(map[string]*EventFilter) + m.filters = make(map[FilterID]*EventFilter) } - m.filters[id.String()] = f + m.filters[id] = f m.mu.Unlock() return f, nil } -func (m *EventFilterManager) Remove(ctx context.Context, id string) error { +func (m *EventFilterManager) Remove(ctx context.Context, id FilterID) error { m.mu.Lock() defer m.mu.Unlock() if _, found := m.filters[id]; !found { diff --git a/chain/events/filter/mempool.go b/chain/events/filter/mempool.go index 5be350644..addae7bf1 100644 --- a/chain/events/filter/mempool.go +++ b/chain/events/filter/mempool.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -14,7 +13,7 @@ import ( ) type MemPoolFilter struct { - id string + id FilterID maxResults int // maximum number of results to collect, 0 is unlimited ch chan<- interface{} @@ -25,7 +24,7 @@ type MemPoolFilter struct { var _ Filter = (*MemPoolFilter)(nil) -func (f *MemPoolFilter) ID() string { +func (f *MemPoolFilter) ID() FilterID { return f.id } @@ -79,7 +78,7 @@ type MemPoolFilterManager struct { MaxFilterResults int mu sync.Mutex // guards mutations to filters - filters map[string]*MemPoolFilter + filters map[FilterID]*MemPoolFilter } func (m *MemPoolFilterManager) WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate) { @@ -113,27 +112,27 @@ func (m *MemPoolFilterManager) processUpdate(ctx context.Context, u api.MpoolUpd } func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, error) { - id, err := uuid.NewRandom() + id, err := newFilterID() if err != nil { - return nil, xerrors.Errorf("new uuid: %w", err) + return nil, xerrors.Errorf("new filter id: %w", err) } f := &MemPoolFilter{ - id: id.String(), + id: id, maxResults: m.MaxFilterResults, } m.mu.Lock() if m.filters == nil { - m.filters = make(map[string]*MemPoolFilter) + m.filters = make(map[FilterID]*MemPoolFilter) } - m.filters[id.String()] = f + m.filters[id] = f m.mu.Unlock() return f, nil } -func (m *MemPoolFilterManager) Remove(ctx context.Context, id string) error { +func (m *MemPoolFilterManager) Remove(ctx context.Context, id FilterID) error { m.mu.Lock() defer m.mu.Unlock() if _, found := m.filters[id]; !found { diff --git a/chain/events/filter/store.go b/chain/events/filter/store.go index 2f8a09875..9f201fc38 100644 --- a/chain/events/filter/store.go +++ b/chain/events/filter/store.go @@ -5,10 +5,13 @@ import ( "errors" "sync" "time" + + "github.com/google/uuid" + "golang.org/x/xerrors" ) type Filter interface { - ID() string + ID() FilterID LastTaken() time.Time SetSubChannel(chan<- interface{}) ClearSubChannel() @@ -16,8 +19,8 @@ type Filter interface { type FilterStore interface { Add(context.Context, Filter) error - Get(context.Context, string) (Filter, error) - Remove(context.Context, string) error + Get(context.Context, FilterID) (Filter, error) + Remove(context.Context, FilterID) error NotTakenSince(when time.Time) []Filter // returns a list of filters that have not had their collected results taken } @@ -27,10 +30,22 @@ var ( ErrMaximumNumberOfFilters = errors.New("maximum number of filters registered") ) +type FilterID [32]byte // compatible with EthHash + +func newFilterID() (FilterID, error) { + rawid, err := uuid.NewRandom() + if err != nil { + return FilterID{}, xerrors.Errorf("new uuid: %w", err) + } + id := FilterID{} + copy(id[:], rawid[:]) // uuid is 16 bytes + return id, nil +} + type memFilterStore struct { max int mu sync.Mutex - filters map[string]Filter + filters map[FilterID]Filter } var _ FilterStore = (*memFilterStore)(nil) @@ -38,7 +53,7 @@ var _ FilterStore = (*memFilterStore)(nil) func NewMemFilterStore(maxFilters int) FilterStore { return &memFilterStore{ max: maxFilters, - filters: make(map[string]Filter), + filters: make(map[FilterID]Filter), } } @@ -57,7 +72,7 @@ func (m *memFilterStore) Add(_ context.Context, f Filter) error { return nil } -func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) { +func (m *memFilterStore) Get(_ context.Context, id FilterID) (Filter, error) { m.mu.Lock() f, found := m.filters[id] m.mu.Unlock() @@ -67,7 +82,7 @@ func (m *memFilterStore) Get(_ context.Context, id string) (Filter, error) { return f, nil } -func (m *memFilterStore) Remove(_ context.Context, id string) error { +func (m *memFilterStore) Remove(_ context.Context, id FilterID) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/chain/events/filter/tipset.go b/chain/events/filter/tipset.go index 0e43c96ef..d3ad8b67b 100644 --- a/chain/events/filter/tipset.go +++ b/chain/events/filter/tipset.go @@ -5,14 +5,13 @@ import ( "sync" "time" - "github.com/google/uuid" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/types" ) type TipSetFilter struct { - id string + id FilterID maxResults int // maximum number of results to collect, 0 is unlimited ch chan<- interface{} @@ -23,7 +22,7 @@ type TipSetFilter struct { var _ Filter = (*TipSetFilter)(nil) -func (f *TipSetFilter) ID() string { +func (f *TipSetFilter) ID() FilterID { return f.id } @@ -77,7 +76,7 @@ type TipSetFilterManager struct { MaxFilterResults int mu sync.Mutex // guards mutations to filters - filters map[string]*TipSetFilter + filters map[FilterID]*TipSetFilter } func (m *TipSetFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error { @@ -100,27 +99,27 @@ func (m *TipSetFilterManager) Revert(ctx context.Context, from, to *types.TipSet } func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error) { - id, err := uuid.NewRandom() + id, err := newFilterID() if err != nil { - return nil, xerrors.Errorf("new uuid: %w", err) + return nil, xerrors.Errorf("new filter id: %w", err) } f := &TipSetFilter{ - id: id.String(), + id: id, maxResults: m.MaxFilterResults, } m.mu.Lock() if m.filters == nil { - m.filters = make(map[string]*TipSetFilter) + m.filters = make(map[FilterID]*TipSetFilter) } - m.filters[id.String()] = f + m.filters[id] = f m.mu.Unlock() return f, nil } -func (m *TipSetFilterManager) Remove(ctx context.Context, id string) error { +func (m *TipSetFilterManager) Remove(ctx context.Context, id FilterID) error { m.mu.Lock() defer m.mu.Unlock() if _, found := m.filters[id]; !found { diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 45c70b289..8c900bbf1 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -294,11 +294,11 @@ func TestEthNewFilterCatchAll(t *testing.T) { // expect to have seen iteration number of events require.Equal(iterations, len(res.Results)) - topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11}) - topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22}) - topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33}) - topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44}) - data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}) + topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11})) + topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22})) + topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33})) + topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44})) + data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})) for _, r := range res.Results { // since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results @@ -323,13 +323,12 @@ func TestEthNewFilterCatchAll(t *testing.T) { require.Equal(tsCidHash, elog.BlockHash, "block hash") require.Equal(4, len(elog.Topics), "number of topics") - require.Equal(topic1Hash, elog.Topics[0], "topic1") - require.Equal(topic2Hash, elog.Topics[1], "topic2") - require.Equal(topic3Hash, elog.Topics[2], "topic3") - require.Equal(topic4Hash, elog.Topics[3], "topic4") + require.Equal(topic1, elog.Topics[0], "topic1") + require.Equal(topic2, elog.Topics[1], "topic2") + require.Equal(topic3, elog.Topics[2], "topic3") + require.Equal(topic4, elog.Topics[3], "topic4") - require.Equal(1, len(elog.Data), "number of data") - require.Equal(data1Hash, elog.Data[0], "data1") + require.Equal(data1, elog.Data, "data1") } } @@ -368,67 +367,76 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) { } s, ok := v.(string) if !ok { - return nil, xerrors.Errorf(k + " not a string") + return nil, xerrors.Errorf(k + ": not a string") } el.Removed, err = strconv.ParseBool(s) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "address": s, ok := v.(string) if !ok { - return nil, xerrors.Errorf(k + " not a string") + return nil, xerrors.Errorf(k + ": not a string") } el.Address, err = api.EthAddressFromHex(s) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "logIndex": el.LogIndex, err = ethUint64(k, v) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "transactionIndex": el.TransactionIndex, err = ethUint64(k, v) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "blockNumber": el.BlockNumber, err = ethUint64(k, v) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "transactionHash": el.TransactionHash, err = ethHash(k, v) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "blockHash": el.BlockHash, err = ethHash(k, v) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } case "data": - sl, ok := v.([]interface{}) + s, ok := v.(string) if !ok { - return nil, xerrors.Errorf(k + " not a slice") + return nil, xerrors.Errorf(k + ": not a string") } - for _, s := range sl { - data, err := hex.DecodeString(s.(string)) - if err != nil { - return nil, err - } - el.Data = data + data, err := hex.DecodeString(s[2:]) + if err != nil { + return nil, xerrors.Errorf("%s: %w", k, err) } + el.Data = data + case "topics": + s, ok := v.(string) + if ok { + topic, err := hex.DecodeString(s[2:]) + if err != nil { + return nil, xerrors.Errorf("%s: %w", k, err) + } + el.Topics = append(el.Topics, topic) + continue + } + sl, ok := v.([]interface{}) if !ok { - return nil, xerrors.Errorf(k + " not a slice") + return nil, xerrors.Errorf(k + ": not a slice") } for _, s := range sl { - topic, err := hex.DecodeString(s.(string)) + topic, err := hex.DecodeString(s.(string)[2:]) if err != nil { - return nil, err + return nil, xerrors.Errorf("%s: %w", k, err) } el.Topics = append(el.Topics, topic) } @@ -542,11 +550,11 @@ func TestEthGetLogsAll(t *testing.T) { ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address) require.NoError(err) - topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11}) - topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22}) - topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33}) - topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44}) - data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}) + topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11})) + topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22})) + topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33})) + topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44})) + data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})) pstring := func(s string) *string { return &s } @@ -582,13 +590,12 @@ func TestEthGetLogsAll(t *testing.T) { require.Equal(tsCidHash, elog.BlockHash, "block hash") require.Equal(4, len(elog.Topics), "number of topics") - require.Equal(topic1Hash, elog.Topics[0], "topic1") - require.Equal(topic2Hash, elog.Topics[1], "topic2") - require.Equal(topic3Hash, elog.Topics[2], "topic3") - require.Equal(topic4Hash, elog.Topics[3], "topic4") + require.Equal(topic1, elog.Topics[0], "topic1") + require.Equal(topic2, elog.Topics[1], "topic2") + require.Equal(topic3, elog.Topics[2], "topic3") + require.Equal(topic4, elog.Topics[3], "topic4") - require.Equal(1, len(elog.Data), "number of data") - require.Equal(data1Hash, elog.Data[0], "data1") + require.Equal(data1, elog.Data, "data1") } } @@ -706,3 +713,13 @@ func TestEthSubscribeLogs(t *testing.T) { // expect to have seen all logs require.Equal(len(received), len(subResponses)) } + +func leftpad32(orig []byte) []byte { + needed := 32 - len(orig) + if needed <= 0 { + return orig + } + ret := make([]byte, 32) + copy(ret[needed:], orig) + return ret +} diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index feb7e0bb5..3f5e69110 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -696,7 +696,7 @@ func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID) return nil, api.ErrNotSupported } - f, err := e.FilterStore.Get(ctx, string(id)) + f, err := e.FilterStore.Get(ctx, filter.FilterID(id)) if err != nil { return nil, err } @@ -718,7 +718,7 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*a return nil, api.ErrNotSupported } - f, err := e.FilterStore.Get(ctx, string(id)) + f, err := e.FilterStore.Get(ctx, filter.FilterID(id)) if err != nil { return nil, err } @@ -823,22 +823,22 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *api.Eth func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSpec) (api.EthFilterID, error) { if e.FilterStore == nil || e.EventFilterManager == nil { - return "", api.ErrNotSupported + return api.EthFilterID{}, api.ErrNotSupported } f, err := e.installEthFilterSpec(ctx, filterSpec) if err != nil { - return "", err + return api.EthFilterID{}, err } if err := e.FilterStore.Add(ctx, f); err != nil { // Could not record in store, attempt to delete filter to clean up err2 := e.TipSetFilterManager.Remove(ctx, f.ID()) if err2 != nil { - return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) } - return "", err + return api.EthFilterID{}, err } return api.EthFilterID(f.ID()), nil @@ -846,22 +846,22 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSp func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) { if e.FilterStore == nil || e.TipSetFilterManager == nil { - return "", api.ErrNotSupported + return api.EthFilterID{}, api.ErrNotSupported } f, err := e.TipSetFilterManager.Install(ctx) if err != nil { - return "", err + return api.EthFilterID{}, err } if err := e.FilterStore.Add(ctx, f); err != nil { // Could not record in store, attempt to delete filter to clean up err2 := e.TipSetFilterManager.Remove(ctx, f.ID()) if err2 != nil { - return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) } - return "", err + return api.EthFilterID{}, err } return api.EthFilterID(f.ID()), nil @@ -869,22 +869,22 @@ func (e *EthEvent) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, erro func (e *EthEvent) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) { if e.FilterStore == nil || e.MemPoolFilterManager == nil { - return "", api.ErrNotSupported + return api.EthFilterID{}, api.ErrNotSupported } f, err := e.MemPoolFilterManager.Install(ctx) if err != nil { - return "", err + return api.EthFilterID{}, err } if err := e.FilterStore.Add(ctx, f); err != nil { // Could not record in store, attempt to delete filter to clean up err2 := e.MemPoolFilterManager.Remove(ctx, f.ID()) if err2 != nil { - return "", xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) + return api.EthFilterID{}, xerrors.Errorf("encountered error %v while removing new filter due to %v", err2, err) } - return "", err + return api.EthFilterID{}, err } return api.EthFilterID(f.ID()), nil @@ -895,7 +895,7 @@ func (e *EthEvent) EthUninstallFilter(ctx context.Context, id api.EthFilterID) ( return false, api.ErrNotSupported } - f, err := e.FilterStore.Get(ctx, string(id)) + f, err := e.FilterStore.Get(ctx, filter.FilterID(id)) if err != nil { if errors.Is(err, filter.ErrFilterNotFound) { return false, nil @@ -956,7 +956,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a f, err := e.TipSetFilterManager.Install(ctx) if err != nil { // clean up any previous filters added and stop the sub - _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + _, _ = e.EthUnsubscribe(ctx, sub.id) return nil, err } sub.addFilter(ctx, f) @@ -978,7 +978,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *a f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys) if err != nil { // clean up any previous filters added and stop the sub - _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + _, _ = e.EthUnsubscribe(ctx, sub.id) return nil, err } sub.addFilter(ctx, f) @@ -994,7 +994,7 @@ func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) return false, api.ErrNotSupported } - filters, err := e.SubManager.StopSubscription(ctx, string(id)) + filters, err := e.SubManager.StopSubscription(ctx, id) if err != nil { return false, nil } @@ -1130,14 +1130,16 @@ type EthSubscriptionManager struct { StateAPI StateAPI ChainAPI ChainAPI mu sync.Mutex - subs map[string]*ethSubscription + subs map[api.EthSubscriptionID]*ethSubscription } func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { // nolint - id, err := uuid.NewRandom() + rawid, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) } + id := api.EthSubscriptionID{} + copy(id[:], rawid[:]) // uuid is 16 bytes ctx, quit := context.WithCancel(ctx) @@ -1145,7 +1147,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub Chain: e.Chain, StateAPI: e.StateAPI, ChainAPI: e.ChainAPI, - id: id.String(), + id: id, in: make(chan interface{}, 200), out: make(chan api.EthSubscriptionResponse, 20), quit: quit, @@ -1153,7 +1155,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub e.mu.Lock() if e.subs == nil { - e.subs = make(map[string]*ethSubscription) + e.subs = make(map[api.EthSubscriptionID]*ethSubscription) } e.subs[sub.id] = sub e.mu.Unlock() @@ -1163,7 +1165,7 @@ func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub return sub, nil } -func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id api.EthSubscriptionID) ([]filter.Filter, error) { e.mu.Lock() defer e.mu.Unlock() @@ -1181,7 +1183,7 @@ type ethSubscription struct { Chain *store.ChainStore StateAPI StateAPI ChainAPI ChainAPI - id string + id api.EthSubscriptionID in chan interface{} out chan api.EthSubscriptionResponse @@ -1205,7 +1207,7 @@ func (e *ethSubscription) start(ctx context.Context) { return case v := <-e.in: resp := api.EthSubscriptionResponse{ - SubscriptionID: api.EthSubscriptionID(e.id), + SubscriptionID: e.id, } var err error