laconicd/ethereum/rpc/pubsub/pubsub_test.go

80 lines
1.4 KiB
Go
Raw Normal View History

2021-04-18 16:39:15 +00:00
package pubsub
import (
"log"
"sync"
"testing"
"time"
2021-04-21 13:41:30 +00:00
"github.com/stretchr/testify/require"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
2021-04-18 16:39:15 +00:00
)
func TestAddTopic(t *testing.T) {
q := NewEventBus()
2021-04-21 13:41:30 +00:00
err := q.AddTopic("kek", make(<-chan coretypes.ResultEvent))
require.NoError(t, err)
err = q.AddTopic("lol", make(<-chan coretypes.ResultEvent))
require.NoError(t, err)
err = q.AddTopic("lol", make(<-chan coretypes.ResultEvent))
require.Error(t, err)
require.EqualValues(t, []string{"kek", "lol"}, q.Topics())
2021-04-18 16:39:15 +00:00
}
func TestSubscribe(t *testing.T) {
q := NewEventBus()
2021-05-05 13:10:21 +00:00
kekSrc := make(chan coretypes.ResultEvent)
2021-04-18 16:39:15 +00:00
q.AddTopic("kek", kekSrc)
2021-05-05 13:10:21 +00:00
lolSrc := make(chan coretypes.ResultEvent)
2021-04-18 16:39:15 +00:00
q.AddTopic("lol", lolSrc)
kekSubC, err := q.Subscribe("kek")
2021-04-21 13:41:30 +00:00
require.NoError(t, err)
2021-04-18 16:39:15 +00:00
lolSubC, err := q.Subscribe("lol")
2021-04-21 13:41:30 +00:00
require.NoError(t, err)
2021-04-18 16:39:15 +00:00
lol2SubC, err := q.Subscribe("lol")
2021-04-21 13:41:30 +00:00
require.NoError(t, err)
2021-04-18 16:39:15 +00:00
wg := new(sync.WaitGroup)
wg.Add(4)
go func() {
defer wg.Done()
msg := <-kekSubC
log.Println("kek:", msg)
2021-04-21 13:41:30 +00:00
require.EqualValues(t, 1, msg)
2021-04-18 16:39:15 +00:00
}()
go func() {
defer wg.Done()
msg := <-lolSubC
log.Println("lol:", msg)
2021-04-21 13:41:30 +00:00
require.EqualValues(t, 1, msg)
2021-04-18 16:39:15 +00:00
}()
go func() {
defer wg.Done()
msg := <-lol2SubC
log.Println("lol2:", msg)
2021-04-21 13:41:30 +00:00
require.EqualValues(t, 1, msg)
2021-04-18 16:39:15 +00:00
}()
go func() {
defer wg.Done()
time.Sleep(time.Second)
2021-05-05 13:10:21 +00:00
close(kekSrc)
close(lolSrc)
2021-04-18 16:39:15 +00:00
}()
wg.Wait()
time.Sleep(time.Second)
}