Merge pull request #364 from cerc-io/roy/v5-dev
More fixes for async statediff
This commit is contained in:
commit
99b29292e5
@ -178,10 +178,8 @@ func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscript
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil {
|
||||||
if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil {
|
log.Error("Failed to unsubscribe from job status stream: " + err.Error())
|
||||||
log.Error("Failed to unsubscribe from job status stream: " + err.Error())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// loop and await payloads and relay them to the subscriber with the notifier
|
// loop and await payloads and relay them to the subscriber with the notifier
|
||||||
|
@ -890,9 +890,6 @@ func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error {
|
|||||||
sds.Lock()
|
sds.Lock()
|
||||||
close(sds.jobStatusSubs[id].quitChan)
|
close(sds.jobStatusSubs[id].quitChan)
|
||||||
delete(sds.jobStatusSubs, id)
|
delete(sds.jobStatusSubs, id)
|
||||||
if len(sds.jobStatusSubs) == 0 {
|
|
||||||
sds.jobStatusSubs = nil
|
|
||||||
}
|
|
||||||
sds.Unlock()
|
sds.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -302,29 +302,26 @@ func TestGetStateDiffAt(t *testing.T) {
|
|||||||
type writeSub struct {
|
type writeSub struct {
|
||||||
sub *rpc.ClientSubscription
|
sub *rpc.ClientSubscription
|
||||||
statusChan <-chan statediff.JobStatus
|
statusChan <-chan statediff.JobStatus
|
||||||
client *rpc.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws writeSub) close() {
|
func makeClient(svc *statediff.Service) *rpc.Client {
|
||||||
ws.sub.Unsubscribe()
|
|
||||||
ws.client.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
// awaitStatus awaits status update for writeStateDiffAt job
|
|
||||||
func subscribeWrites(ctx context.Context, svc *statediff.Service) (writeSub, error) {
|
|
||||||
server := rpc.NewServer()
|
server := rpc.NewServer()
|
||||||
api := statediff.NewPublicStateDiffAPI(svc)
|
api := statediff.NewPublicStateDiffAPI(svc)
|
||||||
err := server.RegisterName("statediff", api)
|
err := server.RegisterName("statediff", api)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return writeSub{}, err
|
panic(err)
|
||||||
}
|
}
|
||||||
client := rpc.DialInProc(server)
|
return rpc.DialInProc(server)
|
||||||
statusChan := make(chan statediff.JobStatus)
|
|
||||||
sub, err := client.Subscribe(ctx, "statediff", statusChan, "streamWrites")
|
|
||||||
return writeSub{sub, statusChan, client}, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func awaitJob(ws writeSub, job statediff.JobID, timeout time.Duration) (bool, error) {
|
// awaitStatus awaits status update for writeStateDiffAt job
|
||||||
|
func subscribeWrites(client *rpc.Client) (writeSub, error) {
|
||||||
|
statusChan := make(chan statediff.JobStatus)
|
||||||
|
sub, err := client.Subscribe(context.Background(), "statediff", statusChan, "streamWrites")
|
||||||
|
return writeSub{sub, statusChan}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ws writeSub) await(job statediff.JobID, timeout time.Duration) (bool, error) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case err := <-ws.sub.Err():
|
case err := <-ws.sub.Err():
|
||||||
@ -358,13 +355,15 @@ func TestWriteStateDiffAt(t *testing.T) {
|
|||||||
// delay to avoid subscription request being sent after statediff is written,
|
// delay to avoid subscription request being sent after statediff is written,
|
||||||
// and timeout to prevent hanging just in case it still happens
|
// and timeout to prevent hanging just in case it still happens
|
||||||
writeDelay := 100 * time.Millisecond
|
writeDelay := 100 * time.Millisecond
|
||||||
jobTimeout := time.Second
|
jobTimeout := 200 * time.Millisecond
|
||||||
ws, err := subscribeWrites(context.Background(), service)
|
client := makeClient(service)
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
ws, err := subscribeWrites(client)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer ws.close()
|
|
||||||
time.Sleep(writeDelay)
|
time.Sleep(writeDelay)
|
||||||
job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
|
job := service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
|
||||||
ok, err := awaitJob(ws, job, jobTimeout)
|
ok, err := ws.await(job, jobTimeout)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
@ -372,6 +371,27 @@ func TestWriteStateDiffAt(t *testing.T) {
|
|||||||
require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
|
require.Equal(t, testBlock1.Hash(), builder.Args.BlockHash)
|
||||||
require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
|
require.Equal(t, parentBlock1.Root(), builder.Args.OldStateRoot)
|
||||||
require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
|
require.Equal(t, testBlock1.Root(), builder.Args.NewStateRoot)
|
||||||
|
|
||||||
|
// unsubscribe and verify we get nothing
|
||||||
|
// TODO - StreamWrites receives EOF error after unsubscribing. Doesn't seem to impact
|
||||||
|
// anything but would be good to know why.
|
||||||
|
ws.sub.Unsubscribe()
|
||||||
|
time.Sleep(writeDelay)
|
||||||
|
job = service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
|
||||||
|
ok, _ = ws.await(job, jobTimeout)
|
||||||
|
require.False(t, ok)
|
||||||
|
|
||||||
|
client.Close()
|
||||||
|
client = makeClient(service)
|
||||||
|
|
||||||
|
// re-subscribe and test again
|
||||||
|
ws, err = subscribeWrites(client)
|
||||||
|
require.NoError(t, err)
|
||||||
|
time.Sleep(writeDelay)
|
||||||
|
job = service.WriteStateDiffAt(testBlock1.NumberU64(), defaultParams)
|
||||||
|
ok, err = ws.await(job, jobTimeout)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWaitForSync(t *testing.T) {
|
func TestWaitForSync(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user