diff --git a/cmd/superNode.go b/cmd/superNode.go index 02f35420..a72dd6a9 100644 --- a/cmd/superNode.go +++ b/cmd/superNode.go @@ -16,6 +16,8 @@ package cmd import ( + "os" + "os/signal" "sync" "github.com/ethereum/go-ethereum/rpc" @@ -85,15 +87,23 @@ func superNode() { logWithCommand.Fatal(err) } } + var backFiller super_node.BackFillInterface if superNodeConfig.BackFill { logWithCommand.Debug("initializing new super node backfill service") - backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) + backFiller, err = super_node.NewBackFillService(superNodeConfig, forwardPayloadChan) if err != nil { logWithCommand.Fatal(err) } logWithCommand.Info("starting up super node backfill process") backFiller.BackFill(wg) } + shutdown := make(chan os.Signal) + signal.Notify(shutdown, os.Interrupt) + <-shutdown + if superNodeConfig.BackFill { + backFiller.Stop() + } + superNode.Stop() wg.Wait() } diff --git a/go.mod b/go.mod index 116cca35..6ba1ac6b 100644 --- a/go.mod +++ b/go.mod @@ -98,4 +98,4 @@ replace github.com/ipfs/go-ipfs v0.4.22 => github.com/vulcanize/go-ipfs v0.4.22- replace github.com/ipfs/go-ipfs-config v0.0.3 => github.com/vulcanize/go-ipfs-config v0.0.8-alpha -replace github.com/ethereum/go-ethereum v1.9.1 => github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290 +replace github.com/ethereum/go-ethereum v1.9.1 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 diff --git a/go.sum b/go.sum index ea4249a2..7000ae8f 100644 --- a/go.sum +++ b/go.sum @@ -692,6 +692,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290 h1:uMWt+x6JhVT7GyL983weZSxv1zDBxvGlI9HNkcTnUeg= github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= +github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 h1:ebv2bWocCmNKGnpHtRjSWoTpkgyEbRBb028PanH43H8= +github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ= github.com/vulcanize/go-ipfs v0.4.22-alpha h1:W+6njT14KWllMhABRFtPndqHw8SHCt5SqD4YX528kxM= github.com/vulcanize/go-ipfs v0.4.22-alpha/go.mod h1:uaekWWeoaA0A9Dv1LObOKCSh9kIzTpZ5RbKW4g5CQHE= github.com/vulcanize/go-ipfs-config v0.0.8-alpha h1:peaFvbEcPShF6ymOd8flqKkFz4YfcrNr/UOO7FmbWoQ= diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index f535c410..ca6f76f6 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -45,7 +45,7 @@ func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) { ethStatediffPayloadChan := fetcher.StatediffPayloadChan - clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) + clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan, statediff.Params{}) if clientSubErr != nil { errs <- clientSubErr panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr)) @@ -55,8 +55,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD for { diff := <-ethStatediffPayloadChan logrus.Trace("received a statediff") - stateDiff := new(statediff.StateDiff) - decodeErr := 
rlp.DecodeBytes(diff.StateDiffRlp, stateDiff) + stateDiff := new(statediff.StateObject) + decodeErr := rlp.DecodeBytes(diff.StateObjectRlp, stateDiff) if decodeErr != nil { logrus.Warn("Error decoding state diff into RLP: ", decodeErr) errs <- decodeErr @@ -65,8 +65,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD accounts := utils.GetAccountsFromDiff(*stateDiff) logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber)) for _, account := range accounts { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex())) - for _, storage := range account.Storage { + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex())) + for _, storage := range account.StorageNodes { diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) if formatErr != nil { logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey)) diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go index 4e5b9d8f..76b14121 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -33,13 +33,14 @@ import ( type MockStoragediffStreamer struct { subscribeError error PassedPayloadChan chan statediff.Payload + PassedParams statediff.Params streamPayloads []statediff.Payload } -func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { +func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) { clientSubscription := rpc.ClientSubscription{} streamer.PassedPayloadChan = statediffPayloadChan - + streamer.PassedParams = params go func() { for _, payload := range streamer.streamPayloads { streamer.PassedPayloadChan <- payload @@ -148,19 +149,19 @@ var _ = Describe("Geth RPC Storage Fetcher", func() { It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) { accountDiffs := test_data.CreatedAccountDiffs - accountDiffs[0].Storage = []statediff.StorageDiff{test_data.StorageWithBadValue} + accountDiffs[0].StorageNodes = []statediff.StorageNode{test_data.StorageWithBadValue} - stateDiff := statediff.StateDiff{ - BlockNumber: test_data.BlockNumber, - BlockHash: common.HexToHash(test_data.BlockHash), - CreatedAccounts: accountDiffs, + stateDiff := statediff.StateObject{ + BlockNumber: test_data.BlockNumber, + BlockHash: common.HexToHash(test_data.BlockHash), + Nodes: accountDiffs, } stateDiffRlp, err := rlp.EncodeToBytes(stateDiff) Expect(err).NotTo(HaveOccurred()) badStatediffPayload := statediff.Payload{ - StateDiffRlp: stateDiffRlp, + StateObjectRlp: stateDiffRlp, } streamer.SetPayloads([]statediff.Payload{badStatediffPayload}) diff --git a/libraries/shared/mocks/batch_client.go b/libraries/shared/mocks/batch_client.go index e81555af..4ad2d12b 100644 --- a/libraries/shared/mocks/batch_client.go +++ b/libraries/shared/mocks/batch_client.go @@ -51,12 +51,12 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error { return errors.New("mockclient needs to be 
initialized with statediff payloads and errors") } for _, batchElem := range batch { - if len(batchElem.Args) != 1 { - return errors.New("expected batch elem to contain single argument") + if len(batchElem.Args) < 1 { + return errors.New("expected batch elem to contain an argument(s)") } blockHeight, ok := batchElem.Args[0].(uint64) if !ok { - return errors.New("expected batch elem argument to be a uint64") + return errors.New("expected first batch elem argument to be a uint64") } err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) if err != nil { @@ -72,12 +72,12 @@ func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.Ba return errors.New("mockclient needs to be initialized with statediff payloads and errors") } for _, batchElem := range batch { - if len(batchElem.Args) != 1 { - return errors.New("expected batch elem to contain single argument") + if len(batchElem.Args) < 1 { + return errors.New("expected batch elem to contain an argument(s)") } blockHeight, ok := batchElem.Args[0].(uint64) if !ok { - return errors.New("expected batch elem argument to be a uint64") + return errors.New("expected batch elem first argument to be a uint64") } err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) if err != nil { diff --git a/libraries/shared/storage/backfiller.go b/libraries/shared/storage/backfiller.go index 08e3fdea..42bf8c98 100644 --- a/libraries/shared/storage/backfiller.go +++ b/libraries/shared/storage/backfiller.go @@ -120,16 +120,16 @@ func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.S errChan <- fetchErr } for _, payload := range payloads { - stateDiff := new(statediff.StateDiff) - stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) + stateDiff := new(statediff.StateObject) + stateDiffDecodeErr := rlp.DecodeBytes(payload.StateObjectRlp, stateDiff) if stateDiffDecodeErr != nil { errChan <- stateDiffDecodeErr continue } accounts := utils.GetAccountsFromDiff(*stateDiff) for _, account := range accounts { - logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex())) - for _, storage := range account.Storage { + logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex())) + for _, storage := range account.StorageNodes { diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) if formatErr != nil { logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey)) diff --git a/libraries/shared/storage/utils/diff.go b/libraries/shared/storage/utils/diff.go index e6ad4c78..9010dd64 100644 --- a/libraries/shared/storage/utils/diff.go +++ b/libraries/shared/storage/utils/diff.go @@ -57,7 +57,7 @@ func FromParityCsvRow(csvRow []string) (StorageDiffInput, error) { }, nil } -func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiffInput, error) { +func FromGethStateDiff(account statediff.StateNode, stateDiff *statediff.StateObject, storage statediff.StorageNode) (StorageDiffInput, error) { var decodedValue []byte err := rlp.DecodeBytes(storage.NodeValue, &decodedValue) if err != nil { @@ -84,7 +84,6 @@ func HexToKeccak256Hash(hex string) common.Hash { return crypto.Keccak256Hash(common.FromHex(hex)) } 
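Reviewer note on the statediff type rename running through the hunks above (StateDiff becomes StateObject, AccountDiff becomes StateNode, Storage becomes StorageNodes): the sketch below is illustrative only and not part of this diff. It shows how a consumer now decodes Payload.StateObjectRlp and walks the single flattened Nodes slice, mirroring the loops in geth_rpc_storage_fetcher.go and storage/backfiller.go. It assumes the statediff package from the vulcanize/go-ethereum fork pinned in go.mod, and the walkPayload helper plus the main round-trip are made up for the example.

```go
// Illustrative sketch (not part of this change): decoding a statediff.Payload
// with the renamed types from the pinned vulcanize/go-ethereum fork.
package main

import (
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/statediff"
)

// walkPayload (hypothetical helper) mirrors the loops in
// geth_rpc_storage_fetcher.go and storage/backfiller.go.
func walkPayload(payload statediff.Payload) error {
	// StateDiffRlp is now StateObjectRlp and decodes into a StateObject.
	stateObject := new(statediff.StateObject)
	if err := rlp.DecodeBytes(payload.StateObjectRlp, stateObject); err != nil {
		return err
	}
	// Created/Updated/DeletedAccounts are collapsed into one Nodes slice,
	// so GetAccountsFromDiff reduces to returning stateObject.Nodes.
	for _, stateNode := range stateObject.Nodes {
		fmt.Printf("state node %s with %d storage nodes\n",
			common.BytesToHash(stateNode.LeafKey).Hex(), len(stateNode.StorageNodes))
		// AccountDiff.Storage is now StateNode.StorageNodes.
		for _, storageNode := range stateNode.StorageNodes {
			fmt.Printf("\tstorage node %s (%d byte value)\n",
				common.BytesToHash(storageNode.LeafKey).Hex(), len(storageNode.NodeValue))
		}
	}
	return nil
}

func main() {
	// Round-trip an empty StateObject just to exercise the walker.
	objRlp, err := rlp.EncodeToBytes(statediff.StateObject{BlockNumber: big.NewInt(1)})
	if err != nil {
		log.Fatal(err)
	}
	if err := walkPayload(statediff.Payload{StateObjectRlp: objRlp}); err != nil {
		log.Fatal(err)
	}
}
```

Because the fork no longer distinguishes created, updated, and deleted accounts, GetAccountsFromDiff simply returns stateObject.Nodes, which is why the hunk below drops the old append logic.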
-func GetAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff { - accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...) - return append(accounts, stateDiff.DeletedAccounts...) +func GetAccountsFromDiff(stateDiff statediff.StateObject) []statediff.StateNode { + return stateDiff.Nodes } diff --git a/libraries/shared/storage/utils/diff_test.go b/libraries/shared/storage/utils/diff_test.go index ad47a6a7..93ad31f8 100644 --- a/libraries/shared/storage/utils/diff_test.go +++ b/libraries/shared/storage/utils/diff_test.go @@ -67,8 +67,8 @@ var _ = Describe("Storage row parsing", func() { Describe("FromGethStateDiff", func() { var ( - accountDiff = statediff.AccountDiff{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}} - stateDiff = &statediff.StateDiff{ + accountDiff = statediff.StateNode{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}} + stateDiff = &statediff.StateObject{ BlockNumber: big.NewInt(rand.Int63()), BlockHash: fakes.FakeHash, } @@ -79,7 +79,7 @@ var _ = Describe("Storage row parsing", func() { storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) Expect(encodeErr).NotTo(HaveOccurred()) - storageDiff := statediff.StorageDiff{ + storageDiff := statediff.StorageNode{ LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, NodeValue: storageValueRlp, NodeType: statediff.Leaf, @@ -104,7 +104,7 @@ var _ = Describe("Storage row parsing", func() { storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) Expect(encodeErr).NotTo(HaveOccurred()) - storageDiff := statediff.StorageDiff{ + storageDiff := statediff.StorageNode{ LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, NodeValue: storageValueRlp, NodeType: statediff.Leaf, diff --git a/libraries/shared/streamer/statediff_streamer.go b/libraries/shared/streamer/statediff_streamer.go index 7ba3abea..b5e371e0 100644 --- a/libraries/shared/streamer/statediff_streamer.go +++ b/libraries/shared/streamer/statediff_streamer.go @@ -26,7 +26,7 @@ import ( // Streamer is the interface for streaming a statediff subscription type Streamer interface { - Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) + Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) } // StateDiffStreamer is the underlying struct for the StateDiffStreamer interface @@ -42,7 +42,7 @@ func NewStateDiffStreamer(client core.RPCClient) Streamer { } // Stream is the main loop for subscribing to data from the Geth state diff process -func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { +func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) { logrus.Info("streaming diffs from geth") - return sds.Client.Subscribe("statediff", payloadChan, "stream") + return sds.Client.Subscribe("statediff", payloadChan, "stream", params) } diff --git a/libraries/shared/streamer/statediff_streamer_test.go b/libraries/shared/streamer/statediff_streamer_test.go index c92afd41..425a27b7 100644 --- a/libraries/shared/streamer/statediff_streamer_test.go +++ b/libraries/shared/streamer/statediff_streamer_test.go @@ -28,9 +28,14 @@ var _ = Describe("StateDiff Streamer", func() { client := &fakes.MockRPCClient{} streamer := streamer.NewStateDiffStreamer(client) payloadChan := make(chan statediff.Payload) - _, err := streamer.Stream(payloadChan) + params := statediff.Params{ + IncludeBlock: true, + IncludeTD: true, + IncludeReceipts: true, + } + _, err := 
streamer.Stream(payloadChan, params) Expect(err).NotTo(HaveOccurred()) - client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) + client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream", params}) }) }) diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go index a7f15758..7a6f2a13 100644 --- a/libraries/shared/test_data/statediff.go +++ b/libraries/shared/test_data/statediff.go @@ -40,7 +40,7 @@ var ( StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() SmallStorageValue = common.Hex2Bytes("03") SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue) - storageWithSmallValue = []statediff.StorageDiff{{ + storageWithSmallValue = []statediff.StorageNode{{ LeafKey: StorageKey, NodeValue: SmallStorageValueRlp, NodeType: statediff.Leaf, @@ -48,13 +48,13 @@ var ( }} LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000") LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue) - storageWithLargeValue = []statediff.StorageDiff{{ + storageWithLargeValue = []statediff.StorageNode{{ LeafKey: StorageKey, NodeValue: LargeStorageValueRlp, Path: StoragePath, NodeType: statediff.Leaf, }} - StorageWithBadValue = statediff.StorageDiff{ + StorageWithBadValue = statediff.StorageNode{ LeafKey: StorageKey, NodeValue: []byte{0, 1, 2}, NodeType: statediff.Leaf, @@ -74,44 +74,40 @@ var ( CodeHash: CodeHash, } valueBytes, _ = rlp.EncodeToBytes(testAccount) - CreatedAccountDiffs = []statediff.AccountDiff{ + CreatedAccountDiffs = []statediff.StateNode{ { - LeafKey: ContractLeafKey.Bytes(), - NodeValue: valueBytes, - Storage: storageWithSmallValue, + LeafKey: ContractLeafKey.Bytes(), + NodeValue: valueBytes, + StorageNodes: storageWithSmallValue, }, } - UpdatedAccountDiffs = []statediff.AccountDiff{{ - LeafKey: AnotherContractLeafKey.Bytes(), - NodeValue: valueBytes, - Storage: storageWithLargeValue, + UpdatedAccountDiffs = []statediff.StateNode{{ + LeafKey: AnotherContractLeafKey.Bytes(), + NodeValue: valueBytes, + StorageNodes: storageWithLargeValue, }} - UpdatedAccountDiffs2 = []statediff.AccountDiff{{ - LeafKey: AnotherContractLeafKey.Bytes(), - NodeValue: valueBytes, - Storage: storageWithSmallValue, + UpdatedAccountDiffs2 = []statediff.StateNode{{ + LeafKey: AnotherContractLeafKey.Bytes(), + NodeValue: valueBytes, + StorageNodes: storageWithSmallValue, }} - DeletedAccountDiffs = []statediff.AccountDiff{{ - LeafKey: AnotherContractLeafKey.Bytes(), - NodeValue: valueBytes, - Storage: storageWithSmallValue, + DeletedAccountDiffs = []statediff.StateNode{{ + LeafKey: AnotherContractLeafKey.Bytes(), + NodeValue: valueBytes, + StorageNodes: storageWithSmallValue, }} - MockStateDiff = statediff.StateDiff{ - BlockNumber: BlockNumber, - BlockHash: common.HexToHash(BlockHash), - CreatedAccounts: CreatedAccountDiffs, - DeletedAccounts: DeletedAccountDiffs, - UpdatedAccounts: UpdatedAccountDiffs, + MockStateDiff = statediff.StateObject{ + BlockNumber: BlockNumber, + BlockHash: common.HexToHash(BlockHash), + Nodes: append(append(CreatedAccountDiffs, UpdatedAccountDiffs...), DeletedAccountDiffs...), } - MockStateDiff2 = statediff.StateDiff{ - BlockNumber: BlockNumber2, - BlockHash: common.HexToHash(BlockHash2), - CreatedAccounts: nil, - DeletedAccounts: nil, - UpdatedAccounts: UpdatedAccountDiffs2, + MockStateDiff2 = statediff.StateObject{ + BlockNumber: BlockNumber2, + BlockHash: common.HexToHash(BlockHash2), + Nodes: UpdatedAccountDiffs2, } 
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2) @@ -144,12 +140,12 @@ var ( MockBlockRlp2, _ = rlp.EncodeToBytes(MockBlock2) MockStatediffPayload = statediff.Payload{ - BlockRlp: MockBlockRlp, - StateDiffRlp: MockStateDiffBytes, + BlockRlp: MockBlockRlp, + StateObjectRlp: MockStateDiffBytes, } MockStatediffPayload2 = statediff.Payload{ - BlockRlp: MockBlockRlp2, - StateDiffRlp: MockStateDiff2Bytes, + BlockRlp: MockBlockRlp2, + StateObjectRlp: MockStateDiff2Bytes, } CreatedExpectedStorageDiff = utils.StorageDiffInput{ diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 6355065e..588a9961 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -17,9 +17,7 @@ package super_node import ( - "fmt" "sync" - "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -37,6 +35,7 @@ const ( type BackFillInterface interface { // Method for the super node to periodically check for and fill in gaps in its data using an archival node BackFill(wg *sync.WaitGroup) + Stop() error } // BackFillService for filling in gaps in the super node @@ -62,7 +61,7 @@ type BackFillService struct { // Channel for receiving quit signal QuitChan chan bool // Chain type - Chain shared.ChainType + chain shared.ChainType // Headers with times_validated lower than this will be resynced validationLevel int } @@ -107,8 +106,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert BatchSize: batchSize, BatchNumber: int64(batchNumber), ScreenAndServeChan: screenAndServeChan, - QuitChan: settings.Quit, - Chain: settings.Chain, + QuitChan: make(chan bool), + chain: settings.Chain, validationLevel: settings.ValidationLevel, }, nil } @@ -116,119 +115,97 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert // BackFill periodically checks for and fills in gaps in the super node db func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) { ticker := time.NewTicker(bfs.GapCheckFrequency) - wg.Add(1) - go func() { + wg.Add(1) + defer wg.Done() for { select { case <-bfs.QuitChan: - log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String()) - wg.Done() + log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String()) return case <-ticker.C: - log.Infof("searching for gaps in the %s super node database", bfs.Chain.String()) - startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber() - if err != nil { - log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err) - continue - } - if startingBlock != 0 { - log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String()) - if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil { - log.Error(err) - } - } gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel) if err != nil { - log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err) + log.Errorf("%s super node db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err) continue } + // spin up worker goroutines for this search pass + // we start and kill a new batch of workers for each pass + // so that we know each of the previous workers is done before we search for new gaps + heightsChan := make(chan []uint64) + for i := 1; i <= int(bfs.BatchNumber); i++ { + go bfs.backFill(wg, i, heightsChan) + } for _, gap := range gaps { - if err := bfs.backFill(gap.Start, gap.Stop); err != nil { - log.Error(err) - } - } - } - 
} - }() - log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String()) -} - -// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks -// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently -func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error { - log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock) - if endingBlock < startingBlock { - return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String()) - } - // break the range up into bins of smaller ranges - blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize) - if err != nil { - return err - } - // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have - var activeCount int64 - // channel for processing goroutines to signal when they are done - processingDone := make(chan bool) - forwardDone := make(chan bool) - - // for each block range bin spin up a goroutine to batch fetch and process data for that range - go func() { - for _, blockHeights := range blockRangeBins { - // if we have reached our limit of active goroutines - // wait for one to finish before starting the next - if atomic.AddInt64(&activeCount, 1) > bfs.BatchNumber { - // this blocks until a process signals it has finished - <-forwardDone - } - go func(blockHeights []uint64) { - payloads, err := bfs.Fetcher.FetchAt(blockHeights) - if err != nil { - log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error()) - } - for _, payload := range payloads { - ipldPayload, err := bfs.Converter.Convert(payload) + log.Infof("backFilling %s data from %d to %d", bfs.chain.String(), gap.Start, gap.Stop) + blockRangeBins, err := utils.GetBlockHeightBins(gap.Start, gap.Stop, bfs.BatchSize) if err != nil { - log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error()) - } - // If there is a ScreenAndServe process listening, forward payload to it - select { - case bfs.ScreenAndServeChan <- ipldPayload: - default: - } - cidPayload, err := bfs.Publisher.Publish(ipldPayload) - if err != nil { - log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error()) + log.Errorf("%s super node db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err) continue } - if err := bfs.Indexer.Index(cidPayload); err != nil { - log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error()) + for _, heights := range blockRangeBins { + select { + case <-bfs.QuitChan: + log.Infof("quiting %s BackFill process", bfs.chain.String()) + return + default: + heightsChan <- heights + } } } - // when this goroutine is done, send out a signal - log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) - processingDone <- true - }(blockHeights) + // send a quit signal to each worker + // this blocks until each worker has finished its current task and is free to receive from the quit channel + for i := 1; i <= int(bfs.BatchNumber); i++ { + bfs.QuitChan <- true + } + } } }() + log.Infof("%s BackFill goroutine successfully spun up", bfs.chain.String()) +} - // listen on the processingDone chan - // keeps track of the number of processing goroutines that have finished - // when they have all finished, return - 
goroutinesFinished := 0 +func (bfs *BackFillService) backFill(wg *sync.WaitGroup, id int, heightChan chan []uint64) { + wg.Add(1) + defer wg.Done() for { select { - case <-processingDone: - atomic.AddInt64(&activeCount, -1) - select { - // if we are waiting for a process to finish, signal that one has - case forwardDone <- true: - default: + case heights := <-heightChan: + log.Debugf("%s backFill worker %d processing section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1]) + payloads, err := bfs.Fetcher.FetchAt(heights) + if err != nil { + log.Errorf("%s backFill worker %d fetcher error: %s", bfs.chain.String(), id, err.Error()) } - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - return nil + for _, payload := range payloads { + ipldPayload, err := bfs.Converter.Convert(payload) + if err != nil { + log.Errorf("%s backFill worker %d converter error: %s", bfs.chain.String(), id, err.Error()) + } + // If there is a ScreenAndServe process listening, forward converted payload to it + select { + case bfs.ScreenAndServeChan <- ipldPayload: + log.Debugf("%s backFill worker %d forwarded converted payload to server", bfs.chain.String(), id) + default: + log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id) + } + cidPayload, err := bfs.Publisher.Publish(ipldPayload) + if err != nil { + log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error()) + continue + } + if err := bfs.Indexer.Index(cidPayload); err != nil { + log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error()) + } } + log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1]) + case <-bfs.QuitChan: + log.Infof("%s backFill worker %d shutting down", bfs.chain.String(), id) + return } } } + +func (bfs *BackFillService) Stop() error { + log.Infof("Stopping %s backFill service", bfs.chain.String()) + close(bfs.QuitChan) + return nil +} diff --git a/pkg/super_node/backfiller_test.go b/pkg/super_node/backfiller_test.go index 1df2db68..e0e804ff 100644 --- a/pkg/super_node/backfiller_test.go +++ b/pkg/super_node/backfiller_test.go @@ -69,7 +69,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) @@ -125,7 +124,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) @@ -156,7 +154,12 @@ var _ = Describe("BackFiller", func() { } mockRetriever := &mocks2.CIDRetriever{ FirstBlockNumberToReturn: 3, - GapsToRetrieve: []shared.Gap{}, + GapsToRetrieve: []shared.Gap{ + { + Start: 0, + Stop: 2, + }, + }, } mockFetcher := &mocks2.PayloadFetcher{ PayloadsToReturn: map[uint64]shared.RawChainData{ @@ -175,7 +178,6 @@ var _ = Describe("BackFiller", func() { BatchSize: super_node.DefaultMaxBatchSize, BatchNumber: super_node.DefaultMaxBatchNumber, QuitChan: quitChan, - Chain: shared.Ethereum, } wg := &sync.WaitGroup{} backfiller.BackFill(wg) diff --git a/pkg/super_node/btc/cid_retriever.go b/pkg/super_node/btc/cid_retriever.go index ce6f7630..f097d642 100644 --- a/pkg/super_node/btc/cid_retriever.go +++ b/pkg/super_node/btc/cid_retriever.go @@ -44,21 
+44,21 @@ func NewCIDRetriever(db *postgres.DB) *CIDRetriever { } // RetrieveFirstBlockNumber is used to retrieve the first block number in the db -func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { +func (bcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) { var blockNumber int64 - err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") + err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1") return blockNumber, err } // RetrieveLastBlockNumber is used to retrieve the latest block number in the db -func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { +func (bcr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) { var blockNumber int64 - err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ") + err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ") return blockNumber, err } // Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters -func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { +func (bcr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) { streamFilter, ok := filter.(*SubscriptionSettings) if !ok { return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter) @@ -66,7 +66,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe log.Debug("retrieving cids") // Begin new db tx - tx, err := ecr.db.Beginx() + tx, err := bcr.db.Beginx() if err != nil { return nil, true, err } @@ -82,7 +82,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe }() // Retrieve cached header CIDs - headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + headers, err := bcr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { log.Error("header cid retrieval error") return nil, true, err @@ -98,7 +98,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe } // Retrieve cached trx CIDs if !streamFilter.TxFilter.Off { - cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) + cw.Transactions, err = bcr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID) if err != nil { log.Error("transaction cid retrieval error") return nil, true, err @@ -114,7 +114,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe } // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight -func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { +func (bcr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) { log.Debug("retrieving header cids for block ", blockNumber) headers := make([]HeaderModel, 0) pgStr := `SELECT * FROM btc.header_cids @@ -124,7 +124,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // also returns the ids for the returned transaction cids -func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) { +func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, 
txFilter TxFilter, headerID int64) ([]TxModel, error) { log.Debug("retrieving transaction cids for header id ", headerID) args := make([]interface{}, 0, 3) results := make([]TxModel, 0) @@ -168,7 +168,22 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID } // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db -func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { +func (bcr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { + log.Info("searching for gaps in the btc super node database") + startingBlock, err := bcr.RetrieveFirstBlockNumber() + if err != nil { + return nil, fmt.Errorf("btc CIDRetriever RetrieveFirstBlockNumber error: %v", err) + } + var initialGap []shared.Gap + if startingBlock != 0 { + stop := uint64(startingBlock - 1) + log.Infof("found gap at the beginning of the btc sync from 0 to %d", stop) + initialGap = []shared.Gap{{ + Start: 0, + Stop: stop, + }} + } + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1 LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number @@ -178,7 +193,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, Start uint64 `db:"start"` Stop uint64 `db:"stop"` }, 0) - if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { + if err := bcr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows { return nil, err } emptyGaps := make([]shared.Gap, len(results)) @@ -195,18 +210,18 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, WHERE times_validated < $1 ORDER BY block_number` var heights []uint64 - if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { + if err := bcr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { return nil, err } - return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil + return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash -func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { +func (bcr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) { log.Debug("retrieving block cids for block hash ", blockHash.String()) // Begin new db tx - tx, err := ecr.db.Beginx() + tx, err := bcr.db.Beginx() if err != nil { return HeaderModel{}, nil, err } @@ -221,12 +236,12 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel } }() - headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash) + headerCID, err := bcr.RetrieveHeaderCIDByHash(tx, blockHash) if err != nil { log.Error("header cid retrieval error") return HeaderModel{}, nil, err } - txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) + txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) if err != nil { log.Error("tx cid retrieval error") } @@ -234,11 +249,11 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel } // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number -func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, 
[]TxModel, error) { +func (bcr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) { log.Debug("retrieving block cids for block number ", blockNumber) // Begin new db tx - tx, err := ecr.db.Beginx() + tx, err := bcr.db.Beginx() if err != nil { return HeaderModel{}, nil, err } @@ -253,7 +268,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, } }() - headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber) + headerCID, err := bcr.RetrieveHeaderCIDs(tx, blockNumber) if err != nil { log.Error("header cid retrieval error") return HeaderModel{}, nil, err @@ -261,7 +276,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, if len(headerCID) < 1 { return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) } - txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) + txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) if err != nil { log.Error("tx cid retrieval error") } @@ -269,7 +284,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, } // RetrieveHeaderCIDByHash returns the header for the given block hash -func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { +func (bcr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) { log.Debug("retrieving header cids for block hash ", blockHash.String()) pgStr := `SELECT * FROM btc.header_cids WHERE block_hash = $1` @@ -278,7 +293,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H } // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id -func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { +func (bcr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) { log.Debug("retrieving tx cids for block id ", headerID) pgStr := `SELECT * FROM btc.transaction_cids WHERE header_id = $1` diff --git a/pkg/super_node/btc/publishAndIndexer.go b/pkg/super_node/btc/publish_and_indexer.go similarity index 100% rename from pkg/super_node/btc/publishAndIndexer.go rename to pkg/super_node/btc/publish_and_indexer.go diff --git a/pkg/super_node/btc/publishAndIndexer_test.go b/pkg/super_node/btc/publish_and_indexer_test.go similarity index 100% rename from pkg/super_node/btc/publishAndIndexer_test.go rename to pkg/super_node/btc/publish_and_indexer_test.go diff --git a/pkg/super_node/config.go b/pkg/super_node/config.go index ba1547b7..ba582160 100644 --- a/pkg/super_node/config.go +++ b/pkg/super_node/config.go @@ -66,7 +66,6 @@ type Config struct { IPFSPath string IPFSMode shared.IPFSMode DBConfig config.Database - Quit chan bool // Server fields Serve bool ServeDBConn *postgres.DB @@ -182,8 +181,6 @@ func NewSuperNodeConfig() (*Config, error) { } } - c.Quit = make(chan bool) - return c, nil } diff --git a/pkg/super_node/eth/cid_retriever.go b/pkg/super_node/eth/cid_retriever.go index e7acf571..d87541c7 100644 --- a/pkg/super_node/eth/cid_retriever.go +++ b/pkg/super_node/eth/cid_retriever.go @@ -443,6 +443,21 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF // RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db // it finds the union of heights where no data exists and where the times_validated is lower than the validation level func (ecr *CIDRetriever) 
RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) { + log.Info("searching for gaps in the eth super node database") + startingBlock, err := ecr.RetrieveFirstBlockNumber() + if err != nil { + return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err) + } + var initialGap []shared.Gap + if startingBlock != 0 { + stop := uint64(startingBlock - 1) + log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop) + initialGap = []shared.Gap{{ + Start: 0, + Stop: stop, + }} + } + pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1 LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number @@ -472,7 +487,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows { return nil, err } - return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil + return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil } // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash diff --git a/pkg/super_node/eth/cid_retriever_test.go b/pkg/super_node/eth/cid_retriever_test.go index 7a2a4a7c..ecbf7311 100644 --- a/pkg/super_node/eth/cid_retriever_test.go +++ b/pkg/super_node/eth/cid_retriever_test.go @@ -474,54 +474,64 @@ var _ = Describe("Retriever", func() { Describe("RetrieveGapsInData", func() { It("Doesn't return gaps if there are none", func() { + payload0 := *mocks.MockCIDPayload + payload0.HeaderCID.BlockNumber = "0" payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "2" + payload1.HeaderCID.BlockNumber = "1" payload2 := payload1 - payload2.HeaderCID.BlockNumber = "3" - err := repo.Index(mocks.MockCIDPayload) + payload2.HeaderCID.BlockNumber = "2" + payload3 := payload2 + payload3.HeaderCID.BlockNumber = "3" + err := repo.Index(&payload0) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload3) + Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(0)) }) - It("Doesn't return the gap from 0 to the earliest block", func() { + It("Returns the gap from 0 to the earliest block", func() { payload := *mocks.MockCIDPayload payload.HeaderCID.BlockNumber = "5" err := repo.Index(&payload) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(0)) + Expect(len(gaps)).To(Equal(1)) + Expect(gaps[0].Start).To(Equal(uint64(0))) + Expect(gaps[0].Stop).To(Equal(uint64(4))) }) It("Can handle single block gaps", func() { + payload0 := *mocks.MockCIDPayload + payload0.HeaderCID.BlockNumber = "0" payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "2" - payload2 := payload1 - payload2.HeaderCID.BlockNumber = "4" - err := repo.Index(mocks.MockCIDPayload) + payload1.HeaderCID.BlockNumber = "1" + payload3 := payload1 + payload3.HeaderCID.BlockNumber = "3" + err := repo.Index(&payload0) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + err = repo.Index(&payload3) Expect(err).ToNot(HaveOccurred()) gaps, err := 
retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(1)) - Expect(gaps[0].Start).To(Equal(uint64(3))) - Expect(gaps[0].Stop).To(Equal(uint64(3))) + Expect(gaps[0].Start).To(Equal(uint64(2))) + Expect(gaps[0].Stop).To(Equal(uint64(2))) }) It("Finds gap between two entries", func() { payload1 := *mocks.MockCIDPayload payload1.HeaderCID.BlockNumber = "1010101" payload2 := payload1 - payload2.HeaderCID.BlockNumber = "5" + payload2.HeaderCID.BlockNumber = "0" err := repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) @@ -529,13 +539,15 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) Expect(len(gaps)).To(Equal(1)) - Expect(gaps[0].Start).To(Equal(uint64(6))) + Expect(gaps[0].Start).To(Equal(uint64(1))) Expect(gaps[0].Stop).To(Equal(uint64(1010100))) }) It("Finds gaps between multiple entries", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload := *mocks.MockCIDPayload + payload.HeaderCID.BlockNumber = "1010101" + payload1 := payload + payload1.HeaderCID.BlockNumber = "1" payload2 := payload1 payload2.HeaderCID.BlockNumber = "5" payload3 := payload2 @@ -554,7 +566,9 @@ var _ = Describe("Retriever", func() { payload9.HeaderCID.BlockNumber = "106" payload10 := payload5 payload10.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload1) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) @@ -577,15 +591,19 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(3)) + Expect(len(gaps)).To(Equal(5)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue()) }) It("Finds validation level gaps", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload := *mocks.MockCIDPayload + payload.HeaderCID.BlockNumber = "1010101" + payload1 := payload + payload1.HeaderCID.BlockNumber = "1" payload2 := payload1 payload2.HeaderCID.BlockNumber = "5" payload3 := payload2 @@ -610,7 +628,9 @@ var _ = Describe("Retriever", func() { payload12.HeaderCID.BlockNumber = "109" payload13 := payload5 payload13.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload1) + err := repo.Index(&payload) + Expect(err).ToNot(HaveOccurred()) + err = repo.Index(&payload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(&payload2) Expect(err).ToNot(HaveOccurred()) @@ -643,7 +663,9 @@ var _ = Describe("Retriever", func() { gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) - Expect(len(gaps)).To(Equal(6)) + Expect(len(gaps)).To(Equal(8)) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue()) + Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue()) Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue()) 
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue()) diff --git a/pkg/super_node/eth/cleaner_test.go b/pkg/super_node/eth/cleaner_test.go index 3ced56ac..b7b8ad2f 100644 --- a/pkg/super_node/eth/cleaner_test.go +++ b/pkg/super_node/eth/cleaner_test.go @@ -124,8 +124,8 @@ var ( storageCID = "mockStorageCID1" storagePath = []byte{'\x01'} storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000")) - storageModels1 = map[common.Hash][]eth2.StorageNodeModel{ - crypto.Keccak256Hash(state1Path): { + storageModels1 = map[string][]eth2.StorageNodeModel{ + common.Bytes2Hex(state1Path): { { CID: storageCID, StorageKey: storageKey.String(), diff --git a/pkg/super_node/eth/converter.go b/pkg/super_node/eth/converter.go index d6287293..ca2b587f 100644 --- a/pkg/super_node/eth/converter.go +++ b/pkg/super_node/eth/converter.go @@ -61,7 +61,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert Receipts: make(types.Receipts, 0, trxLen), ReceiptMetaData: make([]ReceiptModel, 0, trxLen), StateNodes: make([]TrieNode, 0), - StorageNodes: make(map[common.Hash][]TrieNode), + StorageNodes: make(map[string][]TrieNode), } signer := types.MakeSigner(pc.chainConfig, block.Number()) transactions := block.Transactions() @@ -100,10 +100,12 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert } mappedContracts[log.Address.String()] = true } + // These are the contracts seen in the logs logContracts := make([]string, 0, len(mappedContracts)) for addr := range mappedContracts { logContracts = append(logContracts, addr) } + // This is the contract address if this receipt is for a contract creation tx contract := shared.HandleNullAddr(receipt.ContractAddress) var contractHash string if contract != "" { @@ -124,60 +126,27 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert } // Unpack state diff rlp to access fields - stateDiff := new(statediff.StateDiff) - if err := rlp.DecodeBytes(stateDiffPayload.StateDiffRlp, stateDiff); err != nil { + stateDiff := new(statediff.StateObject) + if err := rlp.DecodeBytes(stateDiffPayload.StateObjectRlp, stateDiff); err != nil { return nil, err } - for _, createdAccount := range stateDiff.CreatedAccounts { - statePathHash := crypto.Keccak256Hash(createdAccount.Path) + for _, stateNode := range stateDiff.Nodes { + statePath := common.Bytes2Hex(stateNode.Path) convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{ - Path: createdAccount.Path, - Value: createdAccount.NodeValue, - Type: createdAccount.NodeType, - LeafKey: common.BytesToHash(createdAccount.LeafKey), + Path: stateNode.Path, + Value: stateNode.NodeValue, + Type: stateNode.NodeType, + LeafKey: common.BytesToHash(stateNode.LeafKey), }) - for _, storageDiff := range createdAccount.Storage { - convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{ - Path: storageDiff.Path, - Value: storageDiff.NodeValue, - Type: storageDiff.NodeType, - LeafKey: common.BytesToHash(storageDiff.LeafKey), - }) - } - } - for _, deletedAccount := range stateDiff.DeletedAccounts { - statePathHash := crypto.Keccak256Hash(deletedAccount.Path) - convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{ - Path: deletedAccount.Path, - Value: deletedAccount.NodeValue, - Type: deletedAccount.NodeType, - LeafKey: common.BytesToHash(deletedAccount.LeafKey), - }) - for _, storageDiff := 
range deletedAccount.Storage { - convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{ - Path: storageDiff.Path, - Value: storageDiff.NodeValue, - Type: storageDiff.NodeType, - LeafKey: common.BytesToHash(storageDiff.LeafKey), - }) - } - } - for _, updatedAccount := range stateDiff.UpdatedAccounts { - statePathHash := crypto.Keccak256Hash(updatedAccount.Path) - convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{ - Path: updatedAccount.Path, - Value: updatedAccount.NodeValue, - Type: updatedAccount.NodeType, - LeafKey: common.BytesToHash(updatedAccount.LeafKey), - }) - for _, storageDiff := range updatedAccount.Storage { - convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{ - Path: storageDiff.Path, - Value: storageDiff.NodeValue, - Type: storageDiff.NodeType, - LeafKey: common.BytesToHash(storageDiff.LeafKey), + for _, storageNode := range stateNode.StorageNodes { + convertedPayload.StorageNodes[statePath] = append(convertedPayload.StorageNodes[statePath], TrieNode{ + Path: storageNode.Path, + Value: storageNode.NodeValue, + Type: storageNode.NodeType, + LeafKey: common.BytesToHash(storageNode.LeafKey), }) } } + return convertedPayload, nil } diff --git a/pkg/super_node/eth/filterer.go b/pkg/super_node/eth/filterer.go index 6ae7d691..0407ac7c 100644 --- a/pkg/super_node/eth/filterer.go +++ b/pkg/super_node/eth/filterer.go @@ -291,7 +291,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag } } if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { - for _, storageNode := range payload.StorageNodes[crypto.Keccak256Hash(stateNode.Path)] { + for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256) if err != nil { diff --git a/pkg/super_node/eth/helpers.go b/pkg/super_node/eth/helpers.go index c46cf430..7d9021ba 100644 --- a/pkg/super_node/eth/helpers.go +++ b/pkg/super_node/eth/helpers.go @@ -26,6 +26,8 @@ func ResolveFromNodeType(nodeType statediff.NodeType) int { return 1 case statediff.Leaf: return 2 + case statediff.Removed: + return 3 default: return -1 } @@ -39,6 +41,8 @@ func ResolveToNodeType(nodeType int) statediff.NodeType { return statediff.Extension case 2: return statediff.Leaf + case 3: + return statediff.Removed default: return statediff.Unknown } diff --git a/pkg/super_node/eth/indexer.go b/pkg/super_node/eth/indexer.go index 0bf179c2..a35231d5 100644 --- a/pkg/super_node/eth/indexer.go +++ b/pkg/super_node/eth/indexer.go @@ -20,7 +20,6 @@ import ( "fmt" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" @@ -159,13 +158,13 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, } // If we have a state leaf node, index the associated account and storage nodes if stateCID.NodeType == 2 { - pathKey := crypto.Keccak256Hash(stateCID.Path) - for _, storageCID := range payload.StorageNodeCIDs[pathKey] { + statePath := common.Bytes2Hex(stateCID.Path) + for _, storageCID := range payload.StorageNodeCIDs[statePath] { if err := in.indexStorageCID(tx, storageCID, stateID); err != nil { return err } } - if stateAccount, ok := payload.StateAccounts[pathKey]; ok { + if stateAccount, ok := 
payload.StateAccounts[statePath]; ok { if err := in.indexStateAccount(tx, stateAccount, stateID); err != nil { return err } diff --git a/pkg/super_node/eth/mocks/test_data.go b/pkg/super_node/eth/mocks/test_data.go index af00a171..4c1eea14 100644 --- a/pkg/super_node/eth/mocks/test_data.go +++ b/pkg/super_node/eth/mocks/test_data.go @@ -218,7 +218,7 @@ var ( nonce1 = uint64(1) ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0" ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea") - contractPathHash = crypto.Keccak256Hash([]byte{'\x06'}) + contractPath = common.Bytes2Hex([]byte{'\x06'}) ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress) ContractAccount, _ = rlp.EncodeToBytes(state.Account{ Nonce: nonce1, @@ -235,7 +235,7 @@ var ( nonce0 = uint64(0) AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") - accountPathHash = crypto.Keccak256Hash([]byte{'\x0c'}) + accountPath = common.Bytes2Hex([]byte{'\x0c'}) AccountAddresss = common.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e") AccountLeafKey = testhelpers.Account2LeafKey Account, _ = rlp.EncodeToBytes(state.Account{ @@ -250,13 +250,13 @@ var ( Account, }) - CreatedAccountDiffs = []statediff.AccountDiff{ + StateDiffs = []statediff.StateNode{ { Path: []byte{'\x06'}, NodeType: statediff.Leaf, LeafKey: ContractLeafKey, NodeValue: ContractLeafNode, - Storage: []statediff.StorageDiff{ + StorageNodes: []statediff.StorageNode{ { Path: []byte{}, NodeType: statediff.Leaf, @@ -266,18 +266,18 @@ var ( }, }, { - Path: []byte{'\x0c'}, - NodeType: statediff.Leaf, - LeafKey: AccountLeafKey, - NodeValue: AccountLeafNode, - Storage: []statediff.StorageDiff{}, + Path: []byte{'\x0c'}, + NodeType: statediff.Leaf, + LeafKey: AccountLeafKey, + NodeValue: AccountLeafNode, + StorageNodes: []statediff.StorageNode{}, }, } - MockStateDiff = statediff.StateDiff{ - BlockNumber: BlockNumber, - BlockHash: MockBlock.Hash(), - CreatedAccounts: CreatedAccountDiffs, + MockStateDiff = statediff.StateObject{ + BlockNumber: BlockNumber, + BlockHash: MockBlock.Hash(), + Nodes: StateDiffs, } MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateNodes = []eth.TrieNode{ @@ -308,8 +308,8 @@ var ( StateKey: common.BytesToHash(AccountLeafKey).Hex(), }, } - MockStorageNodes = map[common.Hash][]eth.TrieNode{ - contractPathHash: { + MockStorageNodes = map[string][]eth.TrieNode{ + contractPath: { { LeafKey: common.BytesToHash(StorageLeafKey), Value: StorageLeafNode, @@ -322,7 +322,7 @@ var ( // aggregate payloads MockStateDiffPayload = statediff.Payload{ BlockRlp: MockBlockRlp, - StateDiffRlp: MockStateDiffBytes, + StateObjectRlp: MockStateDiffBytes, ReceiptsRlp: ReceiptsRlp, TotalDifficulty: MockBlock.Difficulty(), } @@ -360,8 +360,8 @@ var ( MockTransactions[2].Hash(): MockRctMetaPostPublish[2], }, StateNodeCIDs: MockStateMetaPostPublish, - StorageNodeCIDs: map[common.Hash][]eth.StorageNodeModel{ - contractPathHash: { + StorageNodeCIDs: map[string][]eth.StorageNodeModel{ + contractPath: { { CID: StorageCID.String(), Path: []byte{}, @@ -370,14 +370,14 @@ var ( }, }, }, - StateAccounts: map[common.Hash]eth.StateAccountModel{ - contractPathHash: { + StateAccounts: map[string]eth.StateAccountModel{ + contractPath: { Balance: big.NewInt(0).String(), Nonce: nonce1, CodeHash: ContractCodeHash.Bytes(), StorageRoot: 
common.HexToHash(ContractRoot).String(), }, - accountPathHash: { + accountPath: { Balance: big.NewInt(1000).String(), Nonce: nonce0, CodeHash: AccountCodeHash.Bytes(), diff --git a/pkg/super_node/eth/payload_fetcher.go b/pkg/super_node/eth/payload_fetcher.go index b1f33649..f771ab46 100644 --- a/pkg/super_node/eth/payload_fetcher.go +++ b/pkg/super_node/eth/payload_fetcher.go @@ -38,34 +38,41 @@ type PayloadFetcher struct { // http.Client is thread-safe client BatchClient timeout time.Duration + params statediff.Params } const method = "statediff_stateDiffAt" -// NewStateDiffFetcher returns a PayloadFetcher +// NewPayloadFetcher returns a PayloadFetcher func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher { return &PayloadFetcher{ client: bc, timeout: timeout, + params: statediff.Params{ + IncludeReceipts: true, + IncludeTD: true, + IncludeBlock: true, + IntermediateStateNodes: true, + IntermediateStorageNodes: true, + }, } } // FetchAt fetches the statediff payloads at the given block heights -// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) +// Calls StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { batch := make([]rpc.BatchElem, 0) for _, height := range blockHeights { batch = append(batch, rpc.BatchElem{ Method: method, - Args: []interface{}{height}, + Args: []interface{}{height, fetcher.params}, Result: new(statediff.Payload), }) } ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout) defer cancel() - batchErr := fetcher.client.BatchCallContext(ctx, batch) - if batchErr != nil { - return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error()) + if err := fetcher.client.BatchCallContext(ctx, batch); err != nil { + return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], err.Error()) } results := make([]shared.RawChainData, 0, len(blockHeights)) for _, batchElem := range batch { diff --git a/pkg/super_node/eth/payload_fetcher_test.go b/pkg/super_node/eth/payload_fetcher_test.go index fb7b49b2..572c706a 100644 --- a/pkg/super_node/eth/payload_fetcher_test.go +++ b/pkg/super_node/eth/payload_fetcher_test.go @@ -36,10 +36,10 @@ var _ = Describe("StateDiffFetcher", func() { ) BeforeEach(func() { mc = new(mocks.BackFillerClient) - setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) - Expect(setDiffAtErr1).ToNot(HaveOccurred()) - setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) - Expect(setDiffAtErr2).ToNot(HaveOccurred()) + err := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) + Expect(err).ToNot(HaveOccurred()) + err = mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) + Expect(err).ToNot(HaveOccurred()) stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60) }) It("Batch calls statediff_stateDiffAt", func() { @@ -47,8 +47,8 @@ var _ = Describe("StateDiffFetcher", func() { test_data.BlockNumber.Uint64(), test_data.BlockNumber2.Uint64(), } - stateDiffPayloads, fetchErr := stateDiffFetcher.FetchAt(blockHeights) - Expect(fetchErr).ToNot(HaveOccurred()) + stateDiffPayloads, err := stateDiffFetcher.FetchAt(blockHeights) + 
Expect(err).ToNot(HaveOccurred()) Expect(len(stateDiffPayloads)).To(Equal(2)) payload1, ok := stateDiffPayloads[0].(statediff.Payload) Expect(ok).To(BeTrue()) diff --git a/pkg/super_node/eth/publishAndIndexer.go b/pkg/super_node/eth/publish_and_indexer.go similarity index 97% rename from pkg/super_node/eth/publishAndIndexer.go rename to pkg/super_node/eth/publish_and_indexer.go index 3a1232cd..54f844a5 100644 --- a/pkg/super_node/eth/publishAndIndexer.go +++ b/pkg/super_node/eth/publish_and_indexer.go @@ -19,8 +19,8 @@ package eth import ( "fmt" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/jmoiron/sqlx" @@ -195,8 +195,7 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil { return err } - statePathHash := crypto.Keccak256Hash(stateNode.Path) - for _, storageNode := range ipldPayload.StorageNodes[statePathHash] { + for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value) if err != nil { return err diff --git a/pkg/super_node/eth/publishAndIndexer_test.go b/pkg/super_node/eth/publish_and_indexer_test.go similarity index 100% rename from pkg/super_node/eth/publishAndIndexer_test.go rename to pkg/super_node/eth/publish_and_indexer_test.go diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 08b967ea..a4d839b1 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -22,7 +22,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" @@ -206,9 +205,9 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr return rctCids, nil } -func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[common.Hash]StateAccountModel, error) { +func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[string]StateAccountModel, error) { stateNodeCids := make([]StateNodeModel, 0, len(stateNodes)) - stateAccounts := make(map[common.Hash]StateAccountModel) + stateAccounts := make(map[string]StateAccountModel) for _, stateNode := range stateNodes { node, err := ipld.FromStateTrieRLP(stateNode.Value) if err != nil { @@ -238,8 +237,8 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM return nil, nil, err } // Map state account to the state path hash - statePathHash := crypto.Keccak256Hash(stateNode.Path) - stateAccounts[statePathHash] = StateAccountModel{ + statePath := common.Bytes2Hex(stateNode.Path) + stateAccounts[statePath] = StateAccountModel{ Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, @@ -250,10 +249,10 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM return stateNodeCids, stateAccounts, nil } -func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]TrieNode) (map[common.Hash][]StorageNodeModel, error) { - storageLeafCids := make(map[common.Hash][]StorageNodeModel) - for pathHash, storageTrie := range 
storageNodes { - storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie)) +func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode) (map[string][]StorageNodeModel, error) { + storageLeafCids := make(map[string][]StorageNodeModel) + for path, storageTrie := range storageNodes { + storageLeafCids[path] = make([]StorageNodeModel, 0, len(storageTrie)) for _, storageNode := range storageTrie { node, err := ipld.FromStorageTrieRLP(storageNode.Value) if err != nil { @@ -264,7 +263,7 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]Tri return nil, err } // Map storage node cids to the state path hash - storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{ + storageLeafCids[path] = append(storageLeafCids[path], StorageNodeModel{ Path: storageNode.Path, StorageKey: storageNode.LeafKey.Hex(), CID: cid, diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index 31c60655..704b15d7 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -38,12 +38,20 @@ type StreamClient interface { // PayloadStreamer satisfies the PayloadStreamer interface for ethereum type PayloadStreamer struct { Client StreamClient + params statediff.Params } // NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum func NewPayloadStreamer(client StreamClient) *PayloadStreamer { return &PayloadStreamer{ Client: client, + params: statediff.Params{ + IncludeBlock: true, + IncludeTD: true, + IncludeReceipts: true, + IntermediateStorageNodes: true, + IntermediateStateNodes: true, + }, } } @@ -60,5 +68,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared. 
} } }() - return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream") + return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream", ps.params) } diff --git a/pkg/super_node/eth/types.go b/pkg/super_node/eth/types.go index 03244814..79c43a7d 100644 --- a/pkg/super_node/eth/types.go +++ b/pkg/super_node/eth/types.go @@ -37,7 +37,7 @@ type ConvertedPayload struct { Receipts types.Receipts ReceiptMetaData []ReceiptModel StateNodes []TrieNode - StorageNodes map[common.Hash][]TrieNode + StorageNodes map[string][]TrieNode } // Height satisfies the StreamedIPLDs interface @@ -62,8 +62,8 @@ type CIDPayload struct { TransactionCIDs []TxModel ReceiptCIDs map[common.Hash]ReceiptModel StateNodeCIDs []StateNodeModel - StateAccounts map[common.Hash]StateAccountModel - StorageNodeCIDs map[common.Hash][]StorageNodeModel + StateAccounts map[string]StateAccountModel + StorageNodeCIDs map[string][]StorageNodeModel } // CIDWrapper is used to direct fetching of IPLDs from IPFS diff --git a/pkg/super_node/resync/config.go b/pkg/super_node/resync/config.go index e43460a6..22b77b83 100644 --- a/pkg/super_node/resync/config.go +++ b/pkg/super_node/resync/config.go @@ -60,8 +60,6 @@ type Config struct { BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing) Timeout time.Duration // HTTP connection timeout in seconds BatchNumber uint64 - - Quit chan bool // Channel for shutting down } // NewReSyncConfig fills and returns a resync config from toml parameters @@ -136,7 +134,6 @@ func NewReSyncConfig() (*Config, error) { db := utils.LoadPostgres(c.DBConfig, c.NodeInfo) c.DB = &db - c.Quit = make(chan bool) c.BatchSize = uint64(viper.GetInt64("resync.batchSize")) c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber")) return c, nil diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index b5c43f0f..4a613a20 100644 --- a/pkg/super_node/resync/service.go +++ b/pkg/super_node/resync/service.go @@ -18,8 +18,6 @@ package resync import ( "fmt" - "sync/atomic" - "github.com/sirupsen/logrus" utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities" @@ -49,7 +47,7 @@ type Service struct { // Number of goroutines BatchNumber int64 // Channel for receiving quit signal - QuitChan chan bool + quitChan chan bool // Chain type chain shared.ChainType // Resync data type @@ -105,7 +103,7 @@ func NewResyncService(settings *Config) (Resync, error) { Cleaner: cleaner, BatchSize: batchSize, BatchNumber: int64(batchNumber), - QuitChan: settings.Quit, + quitChan: make(chan bool), chain: settings.Chain, ranges: settings.Ranges, data: settings.ResyncType, @@ -127,81 +125,60 @@ func (rs *Service) Resync() error { return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err) } } + // spin up worker goroutines + heightsChan := make(chan []uint64) + for i := 1; i <= int(rs.BatchNumber); i++ { + go rs.resync(i, heightsChan) + } for _, rng := range rs.ranges { - if err := rs.resync(rng[0], rng[1]); err != nil { - return fmt.Errorf("%s %s data resync initialization error: %v", rs.chain.String(), rs.data.String(), err) + if rng[1] < rng[0] { + logrus.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String()) + continue } + logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), rng[0], rng[1]) + // break the range up into bins of smaller ranges + blockRangeBins, err := utils.GetBlockHeightBins(rng[0], rng[1], 
rs.BatchSize) + if err != nil { + return err + } + for _, heights := range blockRangeBins { + heightsChan <- heights + } + } + // send a quit signal to each worker + // this blocks until each worker has finished its current task and can receive from the quit channel + for i := 1; i <= int(rs.BatchNumber); i++ { + rs.quitChan <- true } return nil } -func (rs *Service) resync(startingBlock, endingBlock uint64) error { - logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), startingBlock, endingBlock) - if endingBlock < startingBlock { - return fmt.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String()) - } - // break the range up into bins of smaller ranges - blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, rs.BatchSize) - if err != nil { - return err - } - // int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have - var activeCount int64 - // channel for processing goroutines to signal when they are done - processingDone := make(chan bool) - forwardDone := make(chan bool) - - // for each block range bin spin up a goroutine to batch fetch and process state diffs for that range - go func() { - for _, blockHeights := range blockRangeBins { - // if we have reached our limit of active goroutines - // wait for one to finish before starting the next - if atomic.AddInt64(&activeCount, 1) > rs.BatchNumber { - // this blocks until a process signals it has finished - <-forwardDone - } - go func(blockHeights []uint64) { - payloads, err := rs.Fetcher.FetchAt(blockHeights) - if err != nil { - logrus.Errorf("%s resync fetcher error: %s", rs.chain.String(), err.Error()) - } - for _, payload := range payloads { - ipldPayload, err := rs.Converter.Convert(payload) - if err != nil { - logrus.Errorf("%s resync converter error: %s", rs.chain.String(), err.Error()) - } - cidPayload, err := rs.Publisher.Publish(ipldPayload) - if err != nil { - logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error()) - } - if err := rs.Indexer.Index(cidPayload); err != nil { - logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error()) - } - } - // when this goroutine is done, send out a signal - logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1]) - processingDone <- true - }(blockHeights) - } - }() - - // listen on the processingDone chan and - // keep track of the number of processing goroutines that have finished - // when they have all finished, sends the final signal out - goroutinesFinished := 0 +func (rs *Service) resync(id int, heightChan chan []uint64) { for { select { - case <-processingDone: - atomic.AddInt64(&activeCount, -1) - select { - // if we are waiting for a process to finish, signal that one has - case forwardDone <- true: - default: + case heights := <-heightChan: + logrus.Debugf("%s resync worker %d processing section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1]) + payloads, err := rs.Fetcher.FetchAt(heights) + if err != nil { + logrus.Errorf("%s resync worker %d fetcher error: %s", rs.chain.String(), id, err.Error()) } - goroutinesFinished++ - if goroutinesFinished >= len(blockRangeBins) { - return nil + for _, payload := range payloads { + ipldPayload, err := rs.Converter.Convert(payload) + if err != nil { + logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error()) + } + cidPayload, 
err := rs.Publisher.Publish(ipldPayload) + if err != nil { + logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error()) + } + if err := rs.Indexer.Index(cidPayload); err != nil { + logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error()) + } } + logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1]) + case <-rs.quitChan: + logrus.Infof("%s resync worker %d goroutine shutting down", rs.chain.String(), id) + return } } } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 1f09ea50..611eb85a 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -93,6 +93,8 @@ type Service struct { ipfsPath string // Underlying db db *postgres.DB + // wg for syncing serve processes + serveWg *sync.WaitGroup } // NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct @@ -134,7 +136,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) { } sn.db = settings.ServeDBConn } - sn.QuitChan = settings.Quit + sn.QuitChan = make(chan bool) sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription) sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.WorkerPoolSize = settings.Workers @@ -195,16 +197,15 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared if err != nil { return err } - wg.Add(1) - - // Channels for forwarding data to the publishAndIndex workers + // spin up publishAndIndex worker goroutines publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) - // publishAndIndex worker pool to handle publishing and indexing concurrently, while - // limiting the number of Postgres connections we can possibly open so as to prevent error - for i := 0; i < sap.WorkerPoolSize; i++ { - sap.publishAndIndex(i, publishAndIndexPayload) + for i := 1; i <= sap.WorkerPoolSize; i++ { + go sap.publishAndIndex(wg, i, publishAndIndexPayload) + log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i) } go func() { + wg.Add(1) + defer wg.Done() for { select { case payload := <-sap.PayloadChan: @@ -230,8 +231,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared case err := <-sub.Err(): log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err) case <-sap.QuitChan: - log.Infof("quiting %s SyncAndPublish process", sap.chain.String()) - wg.Done() + log.Infof("quiting %s Sync process", sap.chain.String()) return } } @@ -242,41 +242,44 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared // publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres -func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) { - go func() { - for { - select { - case payload := <-publishAndIndexPayload: - log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height()) - cidPayload, err := sap.Publisher.Publish(payload) - if err != nil { - log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err) - continue - } - log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height()) - if err := sap.Indexer.Index(cidPayload); err != nil { - log.Errorf("super node 
publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err) - } +func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) { + wg.Add(1) + defer wg.Done() + for { + select { + case payload := <-publishAndIndexPayload: + log.Debugf("%s super node publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height()) + cidPayload, err := sap.Publisher.Publish(payload) + if err != nil { + log.Errorf("%s super node publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err) + continue } + log.Debugf("%s super node publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height()) + if err := sap.Indexer.Index(cidPayload); err != nil { + log.Errorf("%s super node publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err) + } + case <-sap.QuitChan: + log.Infof("%s super node publishAndIndex worker %d shutting down", sap.chain.String(), id) + return } - }() - log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String()) + } } -// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process +// Serve listens for incoming converter data off the screenAndServePayload from the Sync process // It filters and sends this data to any subscribers to the service -// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process +// This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) { - wg.Add(1) + sap.serveWg = wg go func() { + wg.Add(1) + defer wg.Done() for { select { case payload := <-screenAndServePayload: sap.filterAndServe(payload) case <-sap.QuitChan: - log.Infof("quiting %s ScreenAndServe process", sap.chain.String()) - wg.Done() + log.Infof("quiting %s Serve process", sap.chain.String()) return } } @@ -286,8 +289,11 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share // filterAndServe filters the payload according to each subscription type and sends to the subscriptions func (sap *Service) filterAndServe(payload shared.ConvertedData) { - log.Debugf("Sending %s payload to subscriptions", sap.chain.String()) + log.Debugf("sending %s payload to subscriptions", sap.chain.String()) sap.Lock() + sap.serveWg.Add(1) + defer sap.Unlock() + defer sap.serveWg.Done() for ty, subs := range sap.Subscriptions { // Retrieve the subscription parameters for this subscription type subConfig, ok := sap.SubscriptionTypes[ty] @@ -322,12 +328,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) { } } } - sap.Unlock() } // Subscribe is used by the API to remotely subscribe to the service loop // The params must be rlp serializable and satisfy the SubscriptionSettings() interface func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) { + sap.serveWg.Add(1) + defer sap.serveWg.Done() log.Infof("New %s subscription %s", sap.chain.String(), id) subscription := Subscription{ ID: id, @@ -361,7 +368,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha // Otherwise we only filter new data as it is streamed in 
from the state diffing geth node if params.HistoricalData() || params.HistoricalDataOnly() { if err := sap.sendHistoricalData(subscription, id, params); err != nil { - sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err)) + sendNonBlockingErr(subscription, fmt.Errorf("%s super node subscriber backfill error: %v", sap.chain.String(), err)) sendNonBlockingQuit(subscription) return } @@ -392,10 +399,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64()) log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock) go func() { + sap.serveWg.Add(1) + defer sap.serveWg.Done() for i := startingBlock; i <= endingBlock; i++ { + select { + case <-sap.QuitChan: + log.Infof("%s super node historical data feed to subscription %s closed", sap.chain.String(), id) + return + default: + } cidWrappers, empty, err := sap.Retriever.Retrieve(params, i) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf(" %s super node CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error())) continue } if empty { @@ -404,7 +419,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share for _, cids := range cidWrappers { response, err := sap.IPLDFetcher.Fetch(cids) if err != nil { - sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) + sendNonBlockingErr(sub, fmt.Errorf("%s super node IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error())) continue } responseRLP, err := rlp.EncodeToBytes(response) @@ -416,16 +431,16 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}: log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id) default: - log.Infof("unable to send back-fill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id) + log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id) } } } // when we are done backfilling send an empty payload signifying so in the msg select { case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}: - log.Debugf("sending backfill completion notice to %s subscription %s", sap.chain.String(), id) + log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id) default: - log.Infof("unable to send backfill completion notice to %s subscription %s", sap.chain.String(), id) + log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id) } }() return nil diff --git a/pkg/super_node/service_test.go b/pkg/super_node/service_test.go index face19e7..bc32849a 100644 --- a/pkg/super_node/service_test.go +++ b/pkg/super_node/service_test.go @@ -66,7 +66,7 @@ var _ = Describe("Service", func() { err := processor.Sync(wg, nil) Expect(err).ToNot(HaveOccurred()) time.Sleep(2 * time.Second) - quitChan <- true + close(quitChan) wg.Wait() Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) 
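Editor's note on the shutdown change in this service_test.go hunk: the test now calls close(quitChan) instead of sending a single value. Sync and Serve spin up several worker goroutines that all select on the same QuitChan, and a lone `quitChan <- true` would wake only one of them, whereas closing the channel releases every receiver at once. Below is a minimal, self-contained sketch of that broadcast pattern; it is not the vulcanizedb implementation, and the worker body, worker count, and block heights are illustrative only.

package main

import (
	"fmt"
	"sync"
)

// worker drains the work channel until quit is closed. Receiving on a closed
// channel never blocks, so close(quit) reaches every worker simultaneously.
func worker(id int, work <-chan []uint64, quit <-chan bool, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case heights := <-work:
			fmt.Printf("worker %d processing heights %v\n", id, heights)
		case <-quit:
			fmt.Printf("worker %d shutting down\n", id)
			return
		}
	}
}

func main() {
	work := make(chan []uint64)
	quit := make(chan bool)
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1) // register before spawning so Wait cannot return early
		go worker(i, work, quit, &wg)
	}
	work <- []uint64{100, 101, 102} // hand one batch to whichever worker is free
	close(quit)                     // broadcast shutdown; a single send would stop only one worker
	wg.Wait()
}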
 			Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
diff --git a/pkg/watcher/eth/converter.go b/pkg/watcher/eth/converter.go
index b08dbb31..f0fc99ce 100644
--- a/pkg/watcher/eth/converter.go
+++ b/pkg/watcher/eth/converter.go
@@ -56,7 +56,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
 	cids.TransactionCIDs = make([]eth.TxModel, numTxs)
 	cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs)
 	cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes))
-	cids.StorageNodeCIDs = make(map[common.Hash][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
+	cids.StorageNodeCIDs = make(map[string][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
 	// Unpack header
 	var header types.Header
@@ -164,7 +164,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
 	}
 	// Storage data
 	for _, storageIPLD := range ethIPLDs.StorageNodes {
-		cids.StorageNodeCIDs[storageIPLD.StateLeafKey] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey], eth.StorageNodeModel{
+		cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()], eth.StorageNodeModel{
 			CID:        storageIPLD.IPLD.CID,
 			NodeType:   eth.ResolveFromNodeType(storageIPLD.Type),
 			StorageKey: storageIPLD.StorageLeafKey.String(),
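Editor's note on the keying change that recurs through test_data.go, publisher.go, publish_and_indexer.go, types.go, and converter.go: maps that were keyed by a common.Hash (the Keccak hash of a state path, or a state leaf key) are now keyed by a plain hex string produced with common.Bytes2Hex or Hash.Hex(). The sketch below contrasts the two keying schemes; trieNode is an illustrative stand-in for eth.TrieNode, and the go-ethereum common and crypto packages are assumed (both are already dependencies of this repository).

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// trieNode is an illustrative stand-in for eth.TrieNode.
type trieNode struct {
	Path  []byte
	Value []byte
}

func main() {
	statePath := []byte{'\x06'}
	storage := trieNode{Path: []byte{}, Value: []byte{0x01}}

	// Old scheme: storage nodes keyed by the Keccak hash of the state path.
	byHash := map[common.Hash][]trieNode{
		crypto.Keccak256Hash(statePath): {storage},
	}

	// New scheme: keyed by the hex-encoded path itself, so a lookup only needs
	// the raw statediff.StateNode.Path bytes, with no extra hashing step.
	byPath := map[string][]trieNode{
		common.Bytes2Hex(statePath): {storage},
	}

	fmt.Println(len(byHash[crypto.Keccak256Hash(statePath)])) // 1
	fmt.Println(len(byPath[common.Bytes2Hex(statePath)]))     // 1
}

This mirrors the read side in publish_and_indexer.go above, where storage nodes are fetched with ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)]; keying by the encoded path keeps the write and read sides symmetric and drops the Keccak computation both previously needed.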