diff --git a/cmd/write.go b/cmd/write.go index bd5eaec..58d8686 100644 --- a/cmd/write.go +++ b/cmd/write.go @@ -16,13 +16,18 @@ package cmd import ( - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "github.com/spf13/viper" + "fmt" + "net/http" + "strconv" + "sync" + "time" gethsd "github.com/ethereum/go-ethereum/statediff" ind "github.com/ethereum/go-ethereum/statediff/indexer" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" sd "github.com/vulcanize/eth-statediff-service/pkg" ) @@ -36,15 +41,20 @@ var writeCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { subCommand = cmd.CalledAs() logWithCommand = *logrus.WithField("SubCommand", subCommand) - write() + + addr, _ := cmd.Flags().GetString("serve") + write(addr) }, } +type blockRange [2]uint64 + func init() { rootCmd.AddCommand(writeCmd) + writeCmd.Flags().String("serve", ":8888", "starts a server which handles write request through endpoints") } -func write() { +func write(addr string) { logWithCommand.Info("Starting statediff writer") // load params @@ -85,7 +95,7 @@ func write() { } // Read all defined block ranges, write statediffs to database - var blockRanges [][2]uint64 + var blockRanges []blockRange diffParams := gethsd.Params{ // todo: configurable? IntermediateStateNodes: true, IntermediateStorageNodes: true, @@ -97,13 +107,71 @@ func write() { viper.UnmarshalKey("write.ranges", &blockRanges) viper.UnmarshalKey("write.params", &diffParams) - for _, rng := range blockRanges { - if rng[1] < rng[0] { - logWithCommand.Fatal("range ending block number needs to be greater than starting block number") + blockRangesCh := make(chan blockRange, 100) + go func() { + for _, r := range blockRanges { + blockRangesCh <- r } - logrus.Infof("Writing statediffs from block %d to %d", rng[0], rng[1]) - for height := rng[0]; height <= rng[1]; height++ { - statediffService.WriteStateDiffAt(height, diffParams) + if addr == "" { + close(blockRangesCh) + return } - } + startServer(addr, blockRangesCh) + }() + + processRanges(statediffService, diffParams, blockRangesCh) +} + +func startServer(addr string, blockRangesCh chan<- blockRange) { + handler := func(w http.ResponseWriter, req *http.Request) { + start, err := strconv.Atoi(req.URL.Query().Get("start")) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse start value: %v", err), http.StatusInternalServerError) + return + } + + end, err := strconv.Atoi(req.URL.Query().Get("end")) + if err != nil { + http.Error(w, fmt.Sprintf("failed to parse end value: %v", err), http.StatusInternalServerError) + return + } + + select { + case blockRangesCh <- blockRange{uint64(start), uint64(end)}: + case <-time.After(time.Millisecond * 200): + http.Error(w, "server is busy", http.StatusInternalServerError) + return + } + + fmt.Fprintf(w, "added block range to the queue\n") + } + + http.HandleFunc("/writeDiff", handler) + logrus.Fatal(http.ListenAndServe(addr, nil)) +} + +type diffService interface { + WriteStateDiffAt(blockNumber uint64, params gethsd.Params) error +} + +func processRanges(sds diffService, param gethsd.Params, blockRangesCh chan blockRange) { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for rng := range blockRangesCh { + if rng[1] < rng[0] { + logWithCommand.Fatal("range ending block number needs to be greater than starting block number") + } + logrus.Infof("Writing statediffs from block %d to %d", rng[0], rng[1]) + for height := rng[0]; height <= rng[1]; height++ { + err := sds.WriteStateDiffAt(height, param) + if err != nil { + logrus.Errorf("failed to write state diff for range: %v %v", rng, err) + } + } + } + }() + + wg.Wait() } diff --git a/cmd/write_test.go b/cmd/write_test.go new file mode 100644 index 0000000..bcbfadc --- /dev/null +++ b/cmd/write_test.go @@ -0,0 +1,76 @@ +package cmd + +import ( + "fmt" + "net/http" + "testing" + "time" + + gethsd "github.com/ethereum/go-ethereum/statediff" + "github.com/stretchr/testify/require" +) + +type mockService struct { + reqCount int +} + +func (ms *mockService) WriteStateDiffAt(_ uint64, _ gethsd.Params) error { + ms.reqCount++ + return nil +} + +func TestProcessRanges(t *testing.T) { + blockRangesCh := make(chan blockRange) + srv := new(mockService) + + go func() { + blockRangesCh <- blockRange{uint64(1), uint64(5)} + blockRangesCh <- blockRange{uint64(8), uint64(10)} + blockRangesCh <- blockRange{uint64(6), uint64(7)} + blockRangesCh <- blockRange{uint64(50), uint64(100)} + blockRangesCh <- blockRange{uint64(5), uint64(8)} + close(blockRangesCh) + }() + + processRanges(srv, gethsd.Params{}, blockRangesCh) + require.Equal(t, 65, srv.reqCount) +} + +func TestHttpEndpoint(t *testing.T) { + addr := ":11111" + queueSize := 5 + blockRangesCh := make(chan blockRange, queueSize) + srv := new(mockService) + + go startServer(addr, blockRangesCh) + + go func() { + br := []blockRange{ + {uint64(1), uint64(5)}, + {uint64(8), uint64(10)}, + {uint64(6), uint64(7)}, + {uint64(50), uint64(100)}, + {uint64(5), uint64(8)}, + // Below request should fail since server has queue size of 5 + {uint64(52), uint64(328)}, + {uint64(35), uint64(428)}, + {uint64(45), uint64(844)}, + } + + for idx, r := range br { + res, err := http.Get(fmt.Sprintf("http://localhost:11111/writeDiff?start=%d&end=%d", r[0], r[1])) + require.NoError(t, err) + require.NotNil(t, res) + if idx < queueSize { + require.Equal(t, res.StatusCode, 200) + } else { + require.Equal(t, res.StatusCode, 500) + } + require.NoError(t, res.Body.Close()) + } + processRanges(srv, gethsd.Params{}, blockRangesCh) + }() + + time.Sleep(1 * time.Second) + require.Equal(t, 65, srv.reqCount) +} diff --git a/go.mod b/go.mod index f804631..b0776e7 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/sirupsen/logrus v1.7.0 github.com/spf13/cobra v1.1.1 github.com/spf13/viper v1.7.1 + github.com/stretchr/testify v1.7.0 github.com/vulcanize/go-eth-state-node-iterator v0.0.1-alpha.0.20211014064906-d23d01ed8191 )