feat: ethrpc: Support newPendingTransactions in eth_subscribe (#10269)
This commit is contained in:
parent
86b4aeef1a
commit
7150243cda
@ -136,6 +136,113 @@ func TestEthNewPendingTransactionFilter(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEthNewPendingTransactionSub(t *testing.T) {
|
||||||
|
require := require.New(t)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
kit.QuietAllLogsExcept("events", "messagepool")
|
||||||
|
|
||||||
|
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC())
|
||||||
|
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// create a new address where to send funds.
|
||||||
|
addr, err := client.WalletNew(ctx, types.KTBLS)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// get the existing balance from the default wallet to then split it.
|
||||||
|
bal, err := client.WalletBalance(ctx, client.DefaultKey.Address)
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
// install filter
|
||||||
|
subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newPendingTransactions"})).Assert(require.NoError))
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
var subResponses []ethtypes.EthSubscriptionResponse
|
||||||
|
err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
|
||||||
|
subResponses = append(subResponses, *resp)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(err)
|
||||||
|
|
||||||
|
const iterations = 100
|
||||||
|
|
||||||
|
// we'll send half our balance (saving the other half for gas),
|
||||||
|
// in `iterations` increments.
|
||||||
|
toSend := big.Div(bal, big.NewInt(2))
|
||||||
|
each := big.Div(toSend, big.NewInt(iterations))
|
||||||
|
|
||||||
|
waitAllCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
headChangeCh, err := client.ChainNotify(ctx)
|
||||||
|
require.NoError(err)
|
||||||
|
<-headChangeCh // skip hccurrent
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
close(waitAllCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case headChanges := <-headChangeCh:
|
||||||
|
for _, change := range headChanges {
|
||||||
|
if change.Type == store.HCApply {
|
||||||
|
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
|
||||||
|
require.NoError(err)
|
||||||
|
count += len(msgs)
|
||||||
|
if count == iterations {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var sms []*types.SignedMessage
|
||||||
|
for i := 0; i < iterations; i++ {
|
||||||
|
msg := &types.Message{
|
||||||
|
From: client.DefaultKey.Address,
|
||||||
|
To: addr,
|
||||||
|
Value: each,
|
||||||
|
}
|
||||||
|
|
||||||
|
sm, err := client.MpoolPushMessage(ctx, msg, nil)
|
||||||
|
require.NoError(err)
|
||||||
|
require.EqualValues(i, sm.Message.Nonce)
|
||||||
|
|
||||||
|
sms = append(sms, sm)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-waitAllCh:
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Errorf("timeout waiting to pack messages")
|
||||||
|
}
|
||||||
|
|
||||||
|
expected := make(map[string]bool)
|
||||||
|
for _, sm := range sms {
|
||||||
|
hash, err := ethtypes.EthHashFromCid(sm.Cid())
|
||||||
|
require.NoError(err)
|
||||||
|
expected[hash.String()] = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// expect to have seen iteration number of mpool messages
|
||||||
|
require.Equal(len(subResponses), len(expected), "expected number of filter results to equal number of messages")
|
||||||
|
|
||||||
|
for _, txid := range subResponses {
|
||||||
|
expected[txid.Result.(string)] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, found := range expected {
|
||||||
|
require.True(found)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestEthNewBlockFilter(t *testing.T) {
|
func TestEthNewBlockFilter(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
@ -1273,6 +1273,7 @@ func (e *EthEvent) uninstallFilter(ctx context.Context, f filter.Filter) error {
|
|||||||
const (
|
const (
|
||||||
EthSubscribeEventTypeHeads = "newHeads"
|
EthSubscribeEventTypeHeads = "newHeads"
|
||||||
EthSubscribeEventTypeLogs = "logs"
|
EthSubscribeEventTypeLogs = "logs"
|
||||||
|
EthSubscribeEventTypePendingTransactions = "newPendingTransactions"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
|
func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes.EthSubscriptionID, error) {
|
||||||
@ -1334,6 +1335,15 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethty
|
|||||||
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
||||||
return ethtypes.EthSubscriptionID{}, err
|
return ethtypes.EthSubscriptionID{}, err
|
||||||
}
|
}
|
||||||
|
sub.addFilter(ctx, f)
|
||||||
|
case EthSubscribeEventTypePendingTransactions:
|
||||||
|
f, err := e.MemPoolFilterManager.Install(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// clean up any previous filters added and stop the sub
|
||||||
|
_, _ = e.EthUnsubscribe(ctx, sub.id)
|
||||||
|
return ethtypes.EthSubscriptionID{}, err
|
||||||
|
}
|
||||||
|
|
||||||
sub.addFilter(ctx, f)
|
sub.addFilter(ctx, f)
|
||||||
default:
|
default:
|
||||||
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType)
|
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("unsupported event type: %s", params.EventType)
|
||||||
@ -1654,6 +1664,15 @@ func (e *ethSubscription) start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
e.send(ctx, ev)
|
e.send(ctx, ev)
|
||||||
|
case *types.SignedMessage: // mpool txid
|
||||||
|
evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}, e.StateAPI)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range evs.Results {
|
||||||
|
e.send(ctx, r)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
log.Warnf("unexpected subscription value type: %T", vt)
|
log.Warnf("unexpected subscription value type: %T", vt)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user