forked from cerc-io/ipld-eth-server
Merge pull request #199 from vulcanize/pair_with_new_geth
Pair with new geth version
This commit is contained in:
commit
78e9fbd248
@ -16,6 +16,8 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
@ -85,15 +87,23 @@ func superNode() {
|
|||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
var backFiller super_node.BackFillInterface
|
||||||
if superNodeConfig.BackFill {
|
if superNodeConfig.BackFill {
|
||||||
logWithCommand.Debug("initializing new super node backfill service")
|
logWithCommand.Debug("initializing new super node backfill service")
|
||||||
backFiller, err := super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
|
backFiller, err = super_node.NewBackFillService(superNodeConfig, forwardPayloadChan)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logWithCommand.Fatal(err)
|
logWithCommand.Fatal(err)
|
||||||
}
|
}
|
||||||
logWithCommand.Info("starting up super node backfill process")
|
logWithCommand.Info("starting up super node backfill process")
|
||||||
backFiller.BackFill(wg)
|
backFiller.BackFill(wg)
|
||||||
}
|
}
|
||||||
|
shutdown := make(chan os.Signal)
|
||||||
|
signal.Notify(shutdown, os.Interrupt)
|
||||||
|
<-shutdown
|
||||||
|
if superNodeConfig.BackFill {
|
||||||
|
backFiller.Stop()
|
||||||
|
}
|
||||||
|
superNode.Stop()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
2
go.mod
2
go.mod
@ -98,4 +98,4 @@ replace github.com/ipfs/go-ipfs v0.4.22 => github.com/vulcanize/go-ipfs v0.4.22-
|
|||||||
|
|
||||||
replace github.com/ipfs/go-ipfs-config v0.0.3 => github.com/vulcanize/go-ipfs-config v0.0.8-alpha
|
replace github.com/ipfs/go-ipfs-config v0.0.3 => github.com/vulcanize/go-ipfs-config v0.0.8-alpha
|
||||||
|
|
||||||
replace github.com/ethereum/go-ethereum v1.9.1 => github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290
|
replace github.com/ethereum/go-ethereum v1.9.1 => github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2
|
||||||
|
2
go.sum
2
go.sum
@ -692,6 +692,8 @@ github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljT
|
|||||||
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||||
github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290 h1:uMWt+x6JhVT7GyL983weZSxv1zDBxvGlI9HNkcTnUeg=
|
github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290 h1:uMWt+x6JhVT7GyL983weZSxv1zDBxvGlI9HNkcTnUeg=
|
||||||
github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ=
|
github.com/vulcanize/go-ethereum v1.5.10-0.20200311182536-d07dc803d290/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ=
|
||||||
|
github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2 h1:ebv2bWocCmNKGnpHtRjSWoTpkgyEbRBb028PanH43H8=
|
||||||
|
github.com/vulcanize/go-ethereum v1.9.11-statediff-0.0.2/go.mod h1:7oC0Ni6dosMv5pxMigm6s0hN8g4haJMBnqmmo0D9YfQ=
|
||||||
github.com/vulcanize/go-ipfs v0.4.22-alpha h1:W+6njT14KWllMhABRFtPndqHw8SHCt5SqD4YX528kxM=
|
github.com/vulcanize/go-ipfs v0.4.22-alpha h1:W+6njT14KWllMhABRFtPndqHw8SHCt5SqD4YX528kxM=
|
||||||
github.com/vulcanize/go-ipfs v0.4.22-alpha/go.mod h1:uaekWWeoaA0A9Dv1LObOKCSh9kIzTpZ5RbKW4g5CQHE=
|
github.com/vulcanize/go-ipfs v0.4.22-alpha/go.mod h1:uaekWWeoaA0A9Dv1LObOKCSh9kIzTpZ5RbKW4g5CQHE=
|
||||||
github.com/vulcanize/go-ipfs-config v0.0.8-alpha h1:peaFvbEcPShF6ymOd8flqKkFz4YfcrNr/UOO7FmbWoQ=
|
github.com/vulcanize/go-ipfs-config v0.0.8-alpha h1:peaFvbEcPShF6ymOd8flqKkFz4YfcrNr/UOO7FmbWoQ=
|
||||||
|
@ -45,7 +45,7 @@ func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher
|
|||||||
|
|
||||||
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
|
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
|
||||||
ethStatediffPayloadChan := fetcher.StatediffPayloadChan
|
ethStatediffPayloadChan := fetcher.StatediffPayloadChan
|
||||||
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan)
|
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan, statediff.Params{})
|
||||||
if clientSubErr != nil {
|
if clientSubErr != nil {
|
||||||
errs <- clientSubErr
|
errs <- clientSubErr
|
||||||
panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr))
|
panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr))
|
||||||
@ -55,8 +55,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
|
|||||||
for {
|
for {
|
||||||
diff := <-ethStatediffPayloadChan
|
diff := <-ethStatediffPayloadChan
|
||||||
logrus.Trace("received a statediff")
|
logrus.Trace("received a statediff")
|
||||||
stateDiff := new(statediff.StateDiff)
|
stateDiff := new(statediff.StateObject)
|
||||||
decodeErr := rlp.DecodeBytes(diff.StateDiffRlp, stateDiff)
|
decodeErr := rlp.DecodeBytes(diff.StateObjectRlp, stateDiff)
|
||||||
if decodeErr != nil {
|
if decodeErr != nil {
|
||||||
logrus.Warn("Error decoding state diff into RLP: ", decodeErr)
|
logrus.Warn("Error decoding state diff into RLP: ", decodeErr)
|
||||||
errs <- decodeErr
|
errs <- decodeErr
|
||||||
@ -65,8 +65,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
|
|||||||
accounts := utils.GetAccountsFromDiff(*stateDiff)
|
accounts := utils.GetAccountsFromDiff(*stateDiff)
|
||||||
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
|
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
|
||||||
for _, account := range accounts {
|
for _, account := range accounts {
|
||||||
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex()))
|
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex()))
|
||||||
for _, storage := range account.Storage {
|
for _, storage := range account.StorageNodes {
|
||||||
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
|
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
|
||||||
if formatErr != nil {
|
if formatErr != nil {
|
||||||
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))
|
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))
|
||||||
|
@ -33,13 +33,14 @@ import (
|
|||||||
type MockStoragediffStreamer struct {
|
type MockStoragediffStreamer struct {
|
||||||
subscribeError error
|
subscribeError error
|
||||||
PassedPayloadChan chan statediff.Payload
|
PassedPayloadChan chan statediff.Payload
|
||||||
|
PassedParams statediff.Params
|
||||||
streamPayloads []statediff.Payload
|
streamPayloads []statediff.Payload
|
||||||
}
|
}
|
||||||
|
|
||||||
func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
|
func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) {
|
||||||
clientSubscription := rpc.ClientSubscription{}
|
clientSubscription := rpc.ClientSubscription{}
|
||||||
streamer.PassedPayloadChan = statediffPayloadChan
|
streamer.PassedPayloadChan = statediffPayloadChan
|
||||||
|
streamer.PassedParams = params
|
||||||
go func() {
|
go func() {
|
||||||
for _, payload := range streamer.streamPayloads {
|
for _, payload := range streamer.streamPayloads {
|
||||||
streamer.PassedPayloadChan <- payload
|
streamer.PassedPayloadChan <- payload
|
||||||
@ -148,19 +149,19 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
|
|||||||
|
|
||||||
It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) {
|
It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) {
|
||||||
accountDiffs := test_data.CreatedAccountDiffs
|
accountDiffs := test_data.CreatedAccountDiffs
|
||||||
accountDiffs[0].Storage = []statediff.StorageDiff{test_data.StorageWithBadValue}
|
accountDiffs[0].StorageNodes = []statediff.StorageNode{test_data.StorageWithBadValue}
|
||||||
|
|
||||||
stateDiff := statediff.StateDiff{
|
stateDiff := statediff.StateObject{
|
||||||
BlockNumber: test_data.BlockNumber,
|
BlockNumber: test_data.BlockNumber,
|
||||||
BlockHash: common.HexToHash(test_data.BlockHash),
|
BlockHash: common.HexToHash(test_data.BlockHash),
|
||||||
CreatedAccounts: accountDiffs,
|
Nodes: accountDiffs,
|
||||||
}
|
}
|
||||||
|
|
||||||
stateDiffRlp, err := rlp.EncodeToBytes(stateDiff)
|
stateDiffRlp, err := rlp.EncodeToBytes(stateDiff)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
badStatediffPayload := statediff.Payload{
|
badStatediffPayload := statediff.Payload{
|
||||||
StateDiffRlp: stateDiffRlp,
|
StateObjectRlp: stateDiffRlp,
|
||||||
}
|
}
|
||||||
streamer.SetPayloads([]statediff.Payload{badStatediffPayload})
|
streamer.SetPayloads([]statediff.Payload{badStatediffPayload})
|
||||||
|
|
||||||
|
@ -51,12 +51,12 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
|
|||||||
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
|
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
|
||||||
}
|
}
|
||||||
for _, batchElem := range batch {
|
for _, batchElem := range batch {
|
||||||
if len(batchElem.Args) != 1 {
|
if len(batchElem.Args) < 1 {
|
||||||
return errors.New("expected batch elem to contain single argument")
|
return errors.New("expected batch elem to contain an argument(s)")
|
||||||
}
|
}
|
||||||
blockHeight, ok := batchElem.Args[0].(uint64)
|
blockHeight, ok := batchElem.Args[0].(uint64)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("expected batch elem argument to be a uint64")
|
return errors.New("expected first batch elem argument to be a uint64")
|
||||||
}
|
}
|
||||||
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
|
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -72,12 +72,12 @@ func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.Ba
|
|||||||
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
|
return errors.New("mockclient needs to be initialized with statediff payloads and errors")
|
||||||
}
|
}
|
||||||
for _, batchElem := range batch {
|
for _, batchElem := range batch {
|
||||||
if len(batchElem.Args) != 1 {
|
if len(batchElem.Args) < 1 {
|
||||||
return errors.New("expected batch elem to contain single argument")
|
return errors.New("expected batch elem to contain an argument(s)")
|
||||||
}
|
}
|
||||||
blockHeight, ok := batchElem.Args[0].(uint64)
|
blockHeight, ok := batchElem.Args[0].(uint64)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("expected batch elem argument to be a uint64")
|
return errors.New("expected batch elem first argument to be a uint64")
|
||||||
}
|
}
|
||||||
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
|
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -120,16 +120,16 @@ func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.S
|
|||||||
errChan <- fetchErr
|
errChan <- fetchErr
|
||||||
}
|
}
|
||||||
for _, payload := range payloads {
|
for _, payload := range payloads {
|
||||||
stateDiff := new(statediff.StateDiff)
|
stateDiff := new(statediff.StateObject)
|
||||||
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
|
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateObjectRlp, stateDiff)
|
||||||
if stateDiffDecodeErr != nil {
|
if stateDiffDecodeErr != nil {
|
||||||
errChan <- stateDiffDecodeErr
|
errChan <- stateDiffDecodeErr
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
accounts := utils.GetAccountsFromDiff(*stateDiff)
|
accounts := utils.GetAccountsFromDiff(*stateDiff)
|
||||||
for _, account := range accounts {
|
for _, account := range accounts {
|
||||||
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex()))
|
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex()))
|
||||||
for _, storage := range account.Storage {
|
for _, storage := range account.StorageNodes {
|
||||||
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
|
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
|
||||||
if formatErr != nil {
|
if formatErr != nil {
|
||||||
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))
|
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))
|
||||||
|
@ -57,7 +57,7 @@ func FromParityCsvRow(csvRow []string) (StorageDiffInput, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiffInput, error) {
|
func FromGethStateDiff(account statediff.StateNode, stateDiff *statediff.StateObject, storage statediff.StorageNode) (StorageDiffInput, error) {
|
||||||
var decodedValue []byte
|
var decodedValue []byte
|
||||||
err := rlp.DecodeBytes(storage.NodeValue, &decodedValue)
|
err := rlp.DecodeBytes(storage.NodeValue, &decodedValue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -84,7 +84,6 @@ func HexToKeccak256Hash(hex string) common.Hash {
|
|||||||
return crypto.Keccak256Hash(common.FromHex(hex))
|
return crypto.Keccak256Hash(common.FromHex(hex))
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff {
|
func GetAccountsFromDiff(stateDiff statediff.StateObject) []statediff.StateNode {
|
||||||
accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
|
return stateDiff.Nodes
|
||||||
return append(accounts, stateDiff.DeletedAccounts...)
|
|
||||||
}
|
}
|
||||||
|
@ -67,8 +67,8 @@ var _ = Describe("Storage row parsing", func() {
|
|||||||
|
|
||||||
Describe("FromGethStateDiff", func() {
|
Describe("FromGethStateDiff", func() {
|
||||||
var (
|
var (
|
||||||
accountDiff = statediff.AccountDiff{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}}
|
accountDiff = statediff.StateNode{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}}
|
||||||
stateDiff = &statediff.StateDiff{
|
stateDiff = &statediff.StateObject{
|
||||||
BlockNumber: big.NewInt(rand.Int63()),
|
BlockNumber: big.NewInt(rand.Int63()),
|
||||||
BlockHash: fakes.FakeHash,
|
BlockHash: fakes.FakeHash,
|
||||||
}
|
}
|
||||||
@ -79,7 +79,7 @@ var _ = Describe("Storage row parsing", func() {
|
|||||||
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
|
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
|
||||||
Expect(encodeErr).NotTo(HaveOccurred())
|
Expect(encodeErr).NotTo(HaveOccurred())
|
||||||
|
|
||||||
storageDiff := statediff.StorageDiff{
|
storageDiff := statediff.StorageNode{
|
||||||
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
|
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
|
||||||
NodeValue: storageValueRlp,
|
NodeValue: storageValueRlp,
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
@ -104,7 +104,7 @@ var _ = Describe("Storage row parsing", func() {
|
|||||||
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
|
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
|
||||||
Expect(encodeErr).NotTo(HaveOccurred())
|
Expect(encodeErr).NotTo(HaveOccurred())
|
||||||
|
|
||||||
storageDiff := statediff.StorageDiff{
|
storageDiff := statediff.StorageNode{
|
||||||
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
|
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
|
||||||
NodeValue: storageValueRlp,
|
NodeValue: storageValueRlp,
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
|
|
||||||
// Streamer is the interface for streaming a statediff subscription
|
// Streamer is the interface for streaming a statediff subscription
|
||||||
type Streamer interface {
|
type Streamer interface {
|
||||||
Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error)
|
Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface
|
// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface
|
||||||
@ -42,7 +42,7 @@ func NewStateDiffStreamer(client core.RPCClient) Streamer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stream is the main loop for subscribing to data from the Geth state diff process
|
// Stream is the main loop for subscribing to data from the Geth state diff process
|
||||||
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
|
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) {
|
||||||
logrus.Info("streaming diffs from geth")
|
logrus.Info("streaming diffs from geth")
|
||||||
return sds.Client.Subscribe("statediff", payloadChan, "stream")
|
return sds.Client.Subscribe("statediff", payloadChan, "stream", params)
|
||||||
}
|
}
|
||||||
|
@ -28,9 +28,14 @@ var _ = Describe("StateDiff Streamer", func() {
|
|||||||
client := &fakes.MockRPCClient{}
|
client := &fakes.MockRPCClient{}
|
||||||
streamer := streamer.NewStateDiffStreamer(client)
|
streamer := streamer.NewStateDiffStreamer(client)
|
||||||
payloadChan := make(chan statediff.Payload)
|
payloadChan := make(chan statediff.Payload)
|
||||||
_, err := streamer.Stream(payloadChan)
|
params := statediff.Params{
|
||||||
|
IncludeBlock: true,
|
||||||
|
IncludeTD: true,
|
||||||
|
IncludeReceipts: true,
|
||||||
|
}
|
||||||
|
_, err := streamer.Stream(payloadChan, params)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"})
|
client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream", params})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -40,7 +40,7 @@ var (
|
|||||||
StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()
|
StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()
|
||||||
SmallStorageValue = common.Hex2Bytes("03")
|
SmallStorageValue = common.Hex2Bytes("03")
|
||||||
SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue)
|
SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue)
|
||||||
storageWithSmallValue = []statediff.StorageDiff{{
|
storageWithSmallValue = []statediff.StorageNode{{
|
||||||
LeafKey: StorageKey,
|
LeafKey: StorageKey,
|
||||||
NodeValue: SmallStorageValueRlp,
|
NodeValue: SmallStorageValueRlp,
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
@ -48,13 +48,13 @@ var (
|
|||||||
}}
|
}}
|
||||||
LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
|
LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
|
||||||
LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue)
|
LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue)
|
||||||
storageWithLargeValue = []statediff.StorageDiff{{
|
storageWithLargeValue = []statediff.StorageNode{{
|
||||||
LeafKey: StorageKey,
|
LeafKey: StorageKey,
|
||||||
NodeValue: LargeStorageValueRlp,
|
NodeValue: LargeStorageValueRlp,
|
||||||
Path: StoragePath,
|
Path: StoragePath,
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
}}
|
}}
|
||||||
StorageWithBadValue = statediff.StorageDiff{
|
StorageWithBadValue = statediff.StorageNode{
|
||||||
LeafKey: StorageKey,
|
LeafKey: StorageKey,
|
||||||
NodeValue: []byte{0, 1, 2},
|
NodeValue: []byte{0, 1, 2},
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
@ -74,44 +74,40 @@ var (
|
|||||||
CodeHash: CodeHash,
|
CodeHash: CodeHash,
|
||||||
}
|
}
|
||||||
valueBytes, _ = rlp.EncodeToBytes(testAccount)
|
valueBytes, _ = rlp.EncodeToBytes(testAccount)
|
||||||
CreatedAccountDiffs = []statediff.AccountDiff{
|
CreatedAccountDiffs = []statediff.StateNode{
|
||||||
{
|
{
|
||||||
LeafKey: ContractLeafKey.Bytes(),
|
LeafKey: ContractLeafKey.Bytes(),
|
||||||
NodeValue: valueBytes,
|
NodeValue: valueBytes,
|
||||||
Storage: storageWithSmallValue,
|
StorageNodes: storageWithSmallValue,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdatedAccountDiffs = []statediff.AccountDiff{{
|
UpdatedAccountDiffs = []statediff.StateNode{{
|
||||||
LeafKey: AnotherContractLeafKey.Bytes(),
|
LeafKey: AnotherContractLeafKey.Bytes(),
|
||||||
NodeValue: valueBytes,
|
NodeValue: valueBytes,
|
||||||
Storage: storageWithLargeValue,
|
StorageNodes: storageWithLargeValue,
|
||||||
}}
|
}}
|
||||||
UpdatedAccountDiffs2 = []statediff.AccountDiff{{
|
UpdatedAccountDiffs2 = []statediff.StateNode{{
|
||||||
LeafKey: AnotherContractLeafKey.Bytes(),
|
LeafKey: AnotherContractLeafKey.Bytes(),
|
||||||
NodeValue: valueBytes,
|
NodeValue: valueBytes,
|
||||||
Storage: storageWithSmallValue,
|
StorageNodes: storageWithSmallValue,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
DeletedAccountDiffs = []statediff.AccountDiff{{
|
DeletedAccountDiffs = []statediff.StateNode{{
|
||||||
LeafKey: AnotherContractLeafKey.Bytes(),
|
LeafKey: AnotherContractLeafKey.Bytes(),
|
||||||
NodeValue: valueBytes,
|
NodeValue: valueBytes,
|
||||||
Storage: storageWithSmallValue,
|
StorageNodes: storageWithSmallValue,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
MockStateDiff = statediff.StateDiff{
|
MockStateDiff = statediff.StateObject{
|
||||||
BlockNumber: BlockNumber,
|
BlockNumber: BlockNumber,
|
||||||
BlockHash: common.HexToHash(BlockHash),
|
BlockHash: common.HexToHash(BlockHash),
|
||||||
CreatedAccounts: CreatedAccountDiffs,
|
Nodes: append(append(CreatedAccountDiffs, UpdatedAccountDiffs...), DeletedAccountDiffs...),
|
||||||
DeletedAccounts: DeletedAccountDiffs,
|
|
||||||
UpdatedAccounts: UpdatedAccountDiffs,
|
|
||||||
}
|
}
|
||||||
MockStateDiff2 = statediff.StateDiff{
|
MockStateDiff2 = statediff.StateObject{
|
||||||
BlockNumber: BlockNumber2,
|
BlockNumber: BlockNumber2,
|
||||||
BlockHash: common.HexToHash(BlockHash2),
|
BlockHash: common.HexToHash(BlockHash2),
|
||||||
CreatedAccounts: nil,
|
Nodes: UpdatedAccountDiffs2,
|
||||||
DeletedAccounts: nil,
|
|
||||||
UpdatedAccounts: UpdatedAccountDiffs2,
|
|
||||||
}
|
}
|
||||||
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
|
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
|
||||||
MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2)
|
MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2)
|
||||||
@ -144,12 +140,12 @@ var (
|
|||||||
MockBlockRlp2, _ = rlp.EncodeToBytes(MockBlock2)
|
MockBlockRlp2, _ = rlp.EncodeToBytes(MockBlock2)
|
||||||
|
|
||||||
MockStatediffPayload = statediff.Payload{
|
MockStatediffPayload = statediff.Payload{
|
||||||
BlockRlp: MockBlockRlp,
|
BlockRlp: MockBlockRlp,
|
||||||
StateDiffRlp: MockStateDiffBytes,
|
StateObjectRlp: MockStateDiffBytes,
|
||||||
}
|
}
|
||||||
MockStatediffPayload2 = statediff.Payload{
|
MockStatediffPayload2 = statediff.Payload{
|
||||||
BlockRlp: MockBlockRlp2,
|
BlockRlp: MockBlockRlp2,
|
||||||
StateDiffRlp: MockStateDiff2Bytes,
|
StateObjectRlp: MockStateDiff2Bytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
CreatedExpectedStorageDiff = utils.StorageDiffInput{
|
CreatedExpectedStorageDiff = utils.StorageDiffInput{
|
||||||
|
@ -17,9 +17,7 @@
|
|||||||
package super_node
|
package super_node
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -37,6 +35,7 @@ const (
|
|||||||
type BackFillInterface interface {
|
type BackFillInterface interface {
|
||||||
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
|
// Method for the super node to periodically check for and fill in gaps in its data using an archival node
|
||||||
BackFill(wg *sync.WaitGroup)
|
BackFill(wg *sync.WaitGroup)
|
||||||
|
Stop() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// BackFillService for filling in gaps in the super node
|
// BackFillService for filling in gaps in the super node
|
||||||
@ -62,7 +61,7 @@ type BackFillService struct {
|
|||||||
// Channel for receiving quit signal
|
// Channel for receiving quit signal
|
||||||
QuitChan chan bool
|
QuitChan chan bool
|
||||||
// Chain type
|
// Chain type
|
||||||
Chain shared.ChainType
|
chain shared.ChainType
|
||||||
// Headers with times_validated lower than this will be resynced
|
// Headers with times_validated lower than this will be resynced
|
||||||
validationLevel int
|
validationLevel int
|
||||||
}
|
}
|
||||||
@ -107,8 +106,8 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
|
|||||||
BatchSize: batchSize,
|
BatchSize: batchSize,
|
||||||
BatchNumber: int64(batchNumber),
|
BatchNumber: int64(batchNumber),
|
||||||
ScreenAndServeChan: screenAndServeChan,
|
ScreenAndServeChan: screenAndServeChan,
|
||||||
QuitChan: settings.Quit,
|
QuitChan: make(chan bool),
|
||||||
Chain: settings.Chain,
|
chain: settings.Chain,
|
||||||
validationLevel: settings.ValidationLevel,
|
validationLevel: settings.ValidationLevel,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -116,119 +115,97 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert
|
|||||||
// BackFill periodically checks for and fills in gaps in the super node db
|
// BackFill periodically checks for and fills in gaps in the super node db
|
||||||
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
|
func (bfs *BackFillService) BackFill(wg *sync.WaitGroup) {
|
||||||
ticker := time.NewTicker(bfs.GapCheckFrequency)
|
ticker := time.NewTicker(bfs.GapCheckFrequency)
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-bfs.QuitChan:
|
case <-bfs.QuitChan:
|
||||||
log.Infof("quiting %s FillGapsInSuperNode process", bfs.Chain.String())
|
log.Infof("quiting %s FillGapsInSuperNode process", bfs.chain.String())
|
||||||
wg.Done()
|
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
log.Infof("searching for gaps in the %s super node database", bfs.Chain.String())
|
|
||||||
startingBlock, err := bfs.Retriever.RetrieveFirstBlockNumber()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("super node db backfill RetrieveFirstBlockNumber error for chain %s: %v", bfs.Chain.String(), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if startingBlock != 0 {
|
|
||||||
log.Infof("found gap at the beginning of the %s sync", bfs.Chain.String())
|
|
||||||
if err := bfs.backFill(0, uint64(startingBlock-1)); err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
|
gaps, err := bfs.Retriever.RetrieveGapsInData(bfs.validationLevel)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("super node db backfill RetrieveGapsInData error for chain %s: %v", bfs.Chain.String(), err)
|
log.Errorf("%s super node db backFill RetrieveGapsInData error: %v", bfs.chain.String(), err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// spin up worker goroutines for this search pass
|
||||||
|
// we start and kill a new batch of workers for each pass
|
||||||
|
// so that we know each of the previous workers is done before we search for new gaps
|
||||||
|
heightsChan := make(chan []uint64)
|
||||||
|
for i := 1; i <= int(bfs.BatchNumber); i++ {
|
||||||
|
go bfs.backFill(wg, i, heightsChan)
|
||||||
|
}
|
||||||
for _, gap := range gaps {
|
for _, gap := range gaps {
|
||||||
if err := bfs.backFill(gap.Start, gap.Stop); err != nil {
|
log.Infof("backFilling %s data from %d to %d", bfs.chain.String(), gap.Start, gap.Stop)
|
||||||
log.Error(err)
|
blockRangeBins, err := utils.GetBlockHeightBins(gap.Start, gap.Stop, bfs.BatchSize)
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
log.Infof("%s BackFill goroutine successfully spun up", bfs.Chain.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// backFill fetches, processes, and returns utils.StorageDiffs over a range of blocks
|
|
||||||
// It splits a large range up into smaller chunks, batch fetching and processing those chunks concurrently
|
|
||||||
func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64) error {
|
|
||||||
log.Infof("filling in %s gap from %d to %d", bfs.Chain.String(), startingBlock, endingBlock)
|
|
||||||
if endingBlock < startingBlock {
|
|
||||||
return fmt.Errorf("super node %s db backfill: ending block number needs to be greater than starting block number", bfs.Chain.String())
|
|
||||||
}
|
|
||||||
// break the range up into bins of smaller ranges
|
|
||||||
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bfs.BatchSize)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
|
|
||||||
var activeCount int64
|
|
||||||
// channel for processing goroutines to signal when they are done
|
|
||||||
processingDone := make(chan bool)
|
|
||||||
forwardDone := make(chan bool)
|
|
||||||
|
|
||||||
// for each block range bin spin up a goroutine to batch fetch and process data for that range
|
|
||||||
go func() {
|
|
||||||
for _, blockHeights := range blockRangeBins {
|
|
||||||
// if we have reached our limit of active goroutines
|
|
||||||
// wait for one to finish before starting the next
|
|
||||||
if atomic.AddInt64(&activeCount, 1) > bfs.BatchNumber {
|
|
||||||
// this blocks until a process signals it has finished
|
|
||||||
<-forwardDone
|
|
||||||
}
|
|
||||||
go func(blockHeights []uint64) {
|
|
||||||
payloads, err := bfs.Fetcher.FetchAt(blockHeights)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%s super node historical data fetcher error: %s", bfs.Chain.String(), err.Error())
|
|
||||||
}
|
|
||||||
for _, payload := range payloads {
|
|
||||||
ipldPayload, err := bfs.Converter.Convert(payload)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("%s super node historical data converter error: %s", bfs.Chain.String(), err.Error())
|
log.Errorf("%s super node db backFill GetBlockHeightBins error: %v", bfs.chain.String(), err)
|
||||||
}
|
|
||||||
// If there is a ScreenAndServe process listening, forward payload to it
|
|
||||||
select {
|
|
||||||
case bfs.ScreenAndServeChan <- ipldPayload:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("%s super node historical data publisher error: %s", bfs.Chain.String(), err.Error())
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := bfs.Indexer.Index(cidPayload); err != nil {
|
for _, heights := range blockRangeBins {
|
||||||
log.Errorf("%s super node historical data indexer error: %s", bfs.Chain.String(), err.Error())
|
select {
|
||||||
|
case <-bfs.QuitChan:
|
||||||
|
log.Infof("quiting %s BackFill process", bfs.chain.String())
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
heightsChan <- heights
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// when this goroutine is done, send out a signal
|
// send a quit signal to each worker
|
||||||
log.Infof("finished filling in %s gap from %d to %d", bfs.Chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
|
// this blocks until each worker has finished its current task and is free to receive from the quit channel
|
||||||
processingDone <- true
|
for i := 1; i <= int(bfs.BatchNumber); i++ {
|
||||||
}(blockHeights)
|
bfs.QuitChan <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
log.Infof("%s BackFill goroutine successfully spun up", bfs.chain.String())
|
||||||
|
}
|
||||||
|
|
||||||
// listen on the processingDone chan
|
func (bfs *BackFillService) backFill(wg *sync.WaitGroup, id int, heightChan chan []uint64) {
|
||||||
// keeps track of the number of processing goroutines that have finished
|
wg.Add(1)
|
||||||
// when they have all finished, return
|
defer wg.Done()
|
||||||
goroutinesFinished := 0
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-processingDone:
|
case heights := <-heightChan:
|
||||||
atomic.AddInt64(&activeCount, -1)
|
log.Debugf("%s backFill worker %d processing section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1])
|
||||||
select {
|
payloads, err := bfs.Fetcher.FetchAt(heights)
|
||||||
// if we are waiting for a process to finish, signal that one has
|
if err != nil {
|
||||||
case forwardDone <- true:
|
log.Errorf("%s backFill worker %d fetcher error: %s", bfs.chain.String(), id, err.Error())
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
goroutinesFinished++
|
for _, payload := range payloads {
|
||||||
if goroutinesFinished >= len(blockRangeBins) {
|
ipldPayload, err := bfs.Converter.Convert(payload)
|
||||||
return nil
|
if err != nil {
|
||||||
|
log.Errorf("%s backFill worker %d converter error: %s", bfs.chain.String(), id, err.Error())
|
||||||
|
}
|
||||||
|
// If there is a ScreenAndServe process listening, forward converted payload to it
|
||||||
|
select {
|
||||||
|
case bfs.ScreenAndServeChan <- ipldPayload:
|
||||||
|
log.Debugf("%s backFill worker %d forwarded converted payload to server", bfs.chain.String(), id)
|
||||||
|
default:
|
||||||
|
log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id)
|
||||||
|
}
|
||||||
|
cidPayload, err := bfs.Publisher.Publish(ipldPayload)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := bfs.Indexer.Index(cidPayload); err != nil {
|
||||||
|
log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1])
|
||||||
|
case <-bfs.QuitChan:
|
||||||
|
log.Infof("%s backFill worker %d shutting down", bfs.chain.String(), id)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bfs *BackFillService) Stop() error {
|
||||||
|
log.Infof("Stopping %s backFill service", bfs.chain.String())
|
||||||
|
close(bfs.QuitChan)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -69,7 +69,6 @@ var _ = Describe("BackFiller", func() {
|
|||||||
BatchSize: super_node.DefaultMaxBatchSize,
|
BatchSize: super_node.DefaultMaxBatchSize,
|
||||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||||
QuitChan: quitChan,
|
QuitChan: quitChan,
|
||||||
Chain: shared.Ethereum,
|
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.BackFill(wg)
|
backfiller.BackFill(wg)
|
||||||
@ -125,7 +124,6 @@ var _ = Describe("BackFiller", func() {
|
|||||||
BatchSize: super_node.DefaultMaxBatchSize,
|
BatchSize: super_node.DefaultMaxBatchSize,
|
||||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||||
QuitChan: quitChan,
|
QuitChan: quitChan,
|
||||||
Chain: shared.Ethereum,
|
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.BackFill(wg)
|
backfiller.BackFill(wg)
|
||||||
@ -156,7 +154,12 @@ var _ = Describe("BackFiller", func() {
|
|||||||
}
|
}
|
||||||
mockRetriever := &mocks2.CIDRetriever{
|
mockRetriever := &mocks2.CIDRetriever{
|
||||||
FirstBlockNumberToReturn: 3,
|
FirstBlockNumberToReturn: 3,
|
||||||
GapsToRetrieve: []shared.Gap{},
|
GapsToRetrieve: []shared.Gap{
|
||||||
|
{
|
||||||
|
Start: 0,
|
||||||
|
Stop: 2,
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
mockFetcher := &mocks2.PayloadFetcher{
|
mockFetcher := &mocks2.PayloadFetcher{
|
||||||
PayloadsToReturn: map[uint64]shared.RawChainData{
|
PayloadsToReturn: map[uint64]shared.RawChainData{
|
||||||
@ -175,7 +178,6 @@ var _ = Describe("BackFiller", func() {
|
|||||||
BatchSize: super_node.DefaultMaxBatchSize,
|
BatchSize: super_node.DefaultMaxBatchSize,
|
||||||
BatchNumber: super_node.DefaultMaxBatchNumber,
|
BatchNumber: super_node.DefaultMaxBatchNumber,
|
||||||
QuitChan: quitChan,
|
QuitChan: quitChan,
|
||||||
Chain: shared.Ethereum,
|
|
||||||
}
|
}
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
backfiller.BackFill(wg)
|
backfiller.BackFill(wg)
|
||||||
|
@ -44,21 +44,21 @@ func NewCIDRetriever(db *postgres.DB) *CIDRetriever {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveFirstBlockNumber is used to retrieve the first block number in the db
|
// RetrieveFirstBlockNumber is used to retrieve the first block number in the db
|
||||||
func (ecr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
|
func (bcr *CIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
|
||||||
var blockNumber int64
|
var blockNumber int64
|
||||||
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
|
err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number ASC LIMIT 1")
|
||||||
return blockNumber, err
|
return blockNumber, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveLastBlockNumber is used to retrieve the latest block number in the db
|
// RetrieveLastBlockNumber is used to retrieve the latest block number in the db
|
||||||
func (ecr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
|
func (bcr *CIDRetriever) RetrieveLastBlockNumber() (int64, error) {
|
||||||
var blockNumber int64
|
var blockNumber int64
|
||||||
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
|
err := bcr.db.Get(&blockNumber, "SELECT block_number FROM btc.header_cids ORDER BY block_number DESC LIMIT 1 ")
|
||||||
return blockNumber, err
|
return blockNumber, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
|
// Retrieve is used to retrieve all of the CIDs which conform to the passed StreamFilters
|
||||||
func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
|
func (bcr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumber int64) ([]shared.CIDsForFetching, bool, error) {
|
||||||
streamFilter, ok := filter.(*SubscriptionSettings)
|
streamFilter, ok := filter.(*SubscriptionSettings)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
|
return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
|
||||||
@ -66,7 +66,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
|
|||||||
log.Debug("retrieving cids")
|
log.Debug("retrieving cids")
|
||||||
|
|
||||||
// Begin new db tx
|
// Begin new db tx
|
||||||
tx, err := ecr.db.Beginx()
|
tx, err := bcr.db.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
@ -82,7 +82,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// Retrieve cached header CIDs
|
// Retrieve cached header CIDs
|
||||||
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
|
headers, err := bcr.RetrieveHeaderCIDs(tx, blockNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("header cid retrieval error")
|
log.Error("header cid retrieval error")
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
@ -98,7 +98,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
|
|||||||
}
|
}
|
||||||
// Retrieve cached trx CIDs
|
// Retrieve cached trx CIDs
|
||||||
if !streamFilter.TxFilter.Off {
|
if !streamFilter.TxFilter.Off {
|
||||||
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
|
cw.Transactions, err = bcr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("transaction cid retrieval error")
|
log.Error("transaction cid retrieval error")
|
||||||
return nil, true, err
|
return nil, true, err
|
||||||
@ -114,7 +114,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
|
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
|
||||||
func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
|
func (bcr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
|
||||||
log.Debug("retrieving header cids for block ", blockNumber)
|
log.Debug("retrieving header cids for block ", blockNumber)
|
||||||
headers := make([]HeaderModel, 0)
|
headers := make([]HeaderModel, 0)
|
||||||
pgStr := `SELECT * FROM btc.header_cids
|
pgStr := `SELECT * FROM btc.header_cids
|
||||||
@ -124,7 +124,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H
|
|||||||
|
|
||||||
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
|
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
|
||||||
// also returns the ids for the returned transaction cids
|
// also returns the ids for the returned transaction cids
|
||||||
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
|
func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]TxModel, error) {
|
||||||
log.Debug("retrieving transaction cids for header id ", headerID)
|
log.Debug("retrieving transaction cids for header id ", headerID)
|
||||||
args := make([]interface{}, 0, 3)
|
args := make([]interface{}, 0, 3)
|
||||||
results := make([]TxModel, 0)
|
results := make([]TxModel, 0)
|
||||||
@ -168,7 +168,22 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
|
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
|
||||||
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
|
func (bcr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
|
||||||
|
log.Info("searching for gaps in the btc super node database")
|
||||||
|
startingBlock, err := bcr.RetrieveFirstBlockNumber()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("btc CIDRetriever RetrieveFirstBlockNumber error: %v", err)
|
||||||
|
}
|
||||||
|
var initialGap []shared.Gap
|
||||||
|
if startingBlock != 0 {
|
||||||
|
stop := uint64(startingBlock - 1)
|
||||||
|
log.Infof("found gap at the beginning of the btc sync from 0 to %d", stop)
|
||||||
|
initialGap = []shared.Gap{{
|
||||||
|
Start: 0,
|
||||||
|
Stop: stop,
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids
|
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM btc.header_cids
|
||||||
LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1
|
LEFT JOIN btc.header_cids r on btc.header_cids.block_number = r.block_number - 1
|
||||||
LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number
|
LEFT JOIN btc.header_cids fr on btc.header_cids.block_number < fr.block_number
|
||||||
@ -178,7 +193,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
|
|||||||
Start uint64 `db:"start"`
|
Start uint64 `db:"start"`
|
||||||
Stop uint64 `db:"stop"`
|
Stop uint64 `db:"stop"`
|
||||||
}, 0)
|
}, 0)
|
||||||
if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
|
if err := bcr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
emptyGaps := make([]shared.Gap, len(results))
|
emptyGaps := make([]shared.Gap, len(results))
|
||||||
@ -195,18 +210,18 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
|
|||||||
WHERE times_validated < $1
|
WHERE times_validated < $1
|
||||||
ORDER BY block_number`
|
ORDER BY block_number`
|
||||||
var heights []uint64
|
var heights []uint64
|
||||||
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
|
if err := bcr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
|
return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
||||||
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) {
|
func (bcr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) {
|
||||||
log.Debug("retrieving block cids for block hash ", blockHash.String())
|
log.Debug("retrieving block cids for block hash ", blockHash.String())
|
||||||
|
|
||||||
// Begin new db tx
|
// Begin new db tx
|
||||||
tx, err := ecr.db.Beginx()
|
tx, err := bcr.db.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return HeaderModel{}, nil, err
|
return HeaderModel{}, nil, err
|
||||||
}
|
}
|
||||||
@ -221,12 +236,12 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
|
headerCID, err := bcr.RetrieveHeaderCIDByHash(tx, blockHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("header cid retrieval error")
|
log.Error("header cid retrieval error")
|
||||||
return HeaderModel{}, nil, err
|
return HeaderModel{}, nil, err
|
||||||
}
|
}
|
||||||
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
|
txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("tx cid retrieval error")
|
log.Error("tx cid retrieval error")
|
||||||
}
|
}
|
||||||
@ -234,11 +249,11 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
|
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
|
||||||
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) {
|
func (bcr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) {
|
||||||
log.Debug("retrieving block cids for block number ", blockNumber)
|
log.Debug("retrieving block cids for block number ", blockNumber)
|
||||||
|
|
||||||
// Begin new db tx
|
// Begin new db tx
|
||||||
tx, err := ecr.db.Beginx()
|
tx, err := bcr.db.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return HeaderModel{}, nil, err
|
return HeaderModel{}, nil, err
|
||||||
}
|
}
|
||||||
@ -253,7 +268,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
|
headerCID, err := bcr.RetrieveHeaderCIDs(tx, blockNumber)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("header cid retrieval error")
|
log.Error("header cid retrieval error")
|
||||||
return HeaderModel{}, nil, err
|
return HeaderModel{}, nil, err
|
||||||
@ -261,7 +276,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
|
|||||||
if len(headerCID) < 1 {
|
if len(headerCID) < 1 {
|
||||||
return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
|
return HeaderModel{}, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
|
||||||
}
|
}
|
||||||
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
|
txCIDs, err := bcr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("tx cid retrieval error")
|
log.Error("tx cid retrieval error")
|
||||||
}
|
}
|
||||||
@ -269,7 +284,7 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveHeaderCIDByHash returns the header for the given block hash
|
// RetrieveHeaderCIDByHash returns the header for the given block hash
|
||||||
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) {
|
func (bcr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) {
|
||||||
log.Debug("retrieving header cids for block hash ", blockHash.String())
|
log.Debug("retrieving header cids for block hash ", blockHash.String())
|
||||||
pgStr := `SELECT * FROM btc.header_cids
|
pgStr := `SELECT * FROM btc.header_cids
|
||||||
WHERE block_hash = $1`
|
WHERE block_hash = $1`
|
||||||
@ -278,7 +293,7 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.H
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
|
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
|
||||||
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
|
func (bcr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
|
||||||
log.Debug("retrieving tx cids for block id ", headerID)
|
log.Debug("retrieving tx cids for block id ", headerID)
|
||||||
pgStr := `SELECT * FROM btc.transaction_cids
|
pgStr := `SELECT * FROM btc.transaction_cids
|
||||||
WHERE header_id = $1`
|
WHERE header_id = $1`
|
||||||
|
@ -66,7 +66,6 @@ type Config struct {
|
|||||||
IPFSPath string
|
IPFSPath string
|
||||||
IPFSMode shared.IPFSMode
|
IPFSMode shared.IPFSMode
|
||||||
DBConfig config.Database
|
DBConfig config.Database
|
||||||
Quit chan bool
|
|
||||||
// Server fields
|
// Server fields
|
||||||
Serve bool
|
Serve bool
|
||||||
ServeDBConn *postgres.DB
|
ServeDBConn *postgres.DB
|
||||||
@ -182,8 +181,6 @@ func NewSuperNodeConfig() (*Config, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Quit = make(chan bool)
|
|
||||||
|
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,6 +443,21 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
|
|||||||
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
|
// RetrieveGapsInData is used to find the the block numbers at which we are missing data in the db
|
||||||
// it finds the union of heights where no data exists and where the times_validated is lower than the validation level
|
// it finds the union of heights where no data exists and where the times_validated is lower than the validation level
|
||||||
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
|
func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap, error) {
|
||||||
|
log.Info("searching for gaps in the eth super node database")
|
||||||
|
startingBlock, err := ecr.RetrieveFirstBlockNumber()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("eth CIDRetriever RetrieveFirstBlockNumber error: %v", err)
|
||||||
|
}
|
||||||
|
var initialGap []shared.Gap
|
||||||
|
if startingBlock != 0 {
|
||||||
|
stop := uint64(startingBlock - 1)
|
||||||
|
log.Infof("found gap at the beginning of the eth sync from 0 to %d", stop)
|
||||||
|
initialGap = []shared.Gap{{
|
||||||
|
Start: 0,
|
||||||
|
Stop: stop,
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids
|
pgStr := `SELECT header_cids.block_number + 1 AS start, min(fr.block_number) - 1 AS stop FROM eth.header_cids
|
||||||
LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1
|
LEFT JOIN eth.header_cids r on eth.header_cids.block_number = r.block_number - 1
|
||||||
LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number
|
LEFT JOIN eth.header_cids fr on eth.header_cids.block_number < fr.block_number
|
||||||
@ -472,7 +487,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
|
|||||||
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
|
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
|
return append(append(initialGap, emptyGaps...), utils.MissingHeightsToGaps(heights)...), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
|
||||||
|
@ -474,54 +474,64 @@ var _ = Describe("Retriever", func() {
|
|||||||
|
|
||||||
Describe("RetrieveGapsInData", func() {
|
Describe("RetrieveGapsInData", func() {
|
||||||
It("Doesn't return gaps if there are none", func() {
|
It("Doesn't return gaps if there are none", func() {
|
||||||
|
payload0 := *mocks.MockCIDPayload
|
||||||
|
payload0.HeaderCID.BlockNumber = "0"
|
||||||
payload1 := *mocks.MockCIDPayload
|
payload1 := *mocks.MockCIDPayload
|
||||||
payload1.HeaderCID.BlockNumber = "2"
|
payload1.HeaderCID.BlockNumber = "1"
|
||||||
payload2 := payload1
|
payload2 := payload1
|
||||||
payload2.HeaderCID.BlockNumber = "3"
|
payload2.HeaderCID.BlockNumber = "2"
|
||||||
err := repo.Index(mocks.MockCIDPayload)
|
payload3 := payload2
|
||||||
|
payload3.HeaderCID.BlockNumber = "3"
|
||||||
|
err := repo.Index(&payload0)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload1)
|
err = repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload2)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload3)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(0))
|
Expect(len(gaps)).To(Equal(0))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("Doesn't return the gap from 0 to the earliest block", func() {
|
It("Returns the gap from 0 to the earliest block", func() {
|
||||||
payload := *mocks.MockCIDPayload
|
payload := *mocks.MockCIDPayload
|
||||||
payload.HeaderCID.BlockNumber = "5"
|
payload.HeaderCID.BlockNumber = "5"
|
||||||
err := repo.Index(&payload)
|
err := repo.Index(&payload)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(0))
|
Expect(len(gaps)).To(Equal(1))
|
||||||
|
Expect(gaps[0].Start).To(Equal(uint64(0)))
|
||||||
|
Expect(gaps[0].Stop).To(Equal(uint64(4)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("Can handle single block gaps", func() {
|
It("Can handle single block gaps", func() {
|
||||||
|
payload0 := *mocks.MockCIDPayload
|
||||||
|
payload0.HeaderCID.BlockNumber = "0"
|
||||||
payload1 := *mocks.MockCIDPayload
|
payload1 := *mocks.MockCIDPayload
|
||||||
payload1.HeaderCID.BlockNumber = "2"
|
payload1.HeaderCID.BlockNumber = "1"
|
||||||
payload2 := payload1
|
payload3 := payload1
|
||||||
payload2.HeaderCID.BlockNumber = "4"
|
payload3.HeaderCID.BlockNumber = "3"
|
||||||
err := repo.Index(mocks.MockCIDPayload)
|
err := repo.Index(&payload0)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload1)
|
err = repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload3)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(1))
|
Expect(len(gaps)).To(Equal(1))
|
||||||
Expect(gaps[0].Start).To(Equal(uint64(3)))
|
Expect(gaps[0].Start).To(Equal(uint64(2)))
|
||||||
Expect(gaps[0].Stop).To(Equal(uint64(3)))
|
Expect(gaps[0].Stop).To(Equal(uint64(2)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("Finds gap between two entries", func() {
|
It("Finds gap between two entries", func() {
|
||||||
payload1 := *mocks.MockCIDPayload
|
payload1 := *mocks.MockCIDPayload
|
||||||
payload1.HeaderCID.BlockNumber = "1010101"
|
payload1.HeaderCID.BlockNumber = "1010101"
|
||||||
payload2 := payload1
|
payload2 := payload1
|
||||||
payload2.HeaderCID.BlockNumber = "5"
|
payload2.HeaderCID.BlockNumber = "0"
|
||||||
err := repo.Index(&payload1)
|
err := repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload2)
|
||||||
@ -529,13 +539,15 @@ var _ = Describe("Retriever", func() {
|
|||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(1))
|
Expect(len(gaps)).To(Equal(1))
|
||||||
Expect(gaps[0].Start).To(Equal(uint64(6)))
|
Expect(gaps[0].Start).To(Equal(uint64(1)))
|
||||||
Expect(gaps[0].Stop).To(Equal(uint64(1010100)))
|
Expect(gaps[0].Stop).To(Equal(uint64(1010100)))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("Finds gaps between multiple entries", func() {
|
It("Finds gaps between multiple entries", func() {
|
||||||
payload1 := *mocks.MockCIDPayload
|
payload := *mocks.MockCIDPayload
|
||||||
payload1.HeaderCID.BlockNumber = "1010101"
|
payload.HeaderCID.BlockNumber = "1010101"
|
||||||
|
payload1 := payload
|
||||||
|
payload1.HeaderCID.BlockNumber = "1"
|
||||||
payload2 := payload1
|
payload2 := payload1
|
||||||
payload2.HeaderCID.BlockNumber = "5"
|
payload2.HeaderCID.BlockNumber = "5"
|
||||||
payload3 := payload2
|
payload3 := payload2
|
||||||
@ -554,7 +566,9 @@ var _ = Describe("Retriever", func() {
|
|||||||
payload9.HeaderCID.BlockNumber = "106"
|
payload9.HeaderCID.BlockNumber = "106"
|
||||||
payload10 := payload5
|
payload10 := payload5
|
||||||
payload10.HeaderCID.BlockNumber = "1000"
|
payload10.HeaderCID.BlockNumber = "1000"
|
||||||
err := repo.Index(&payload1)
|
err := repo.Index(&payload)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload2)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
@ -577,15 +591,19 @@ var _ = Describe("Retriever", func() {
|
|||||||
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(3))
|
Expect(len(gaps)).To(Equal(5))
|
||||||
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue())
|
||||||
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 107, Stop: 999})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 1001, Stop: 1010100})).To(BeTrue())
|
||||||
})
|
})
|
||||||
|
|
||||||
It("Finds validation level gaps", func() {
|
It("Finds validation level gaps", func() {
|
||||||
payload1 := *mocks.MockCIDPayload
|
payload := *mocks.MockCIDPayload
|
||||||
payload1.HeaderCID.BlockNumber = "1010101"
|
payload.HeaderCID.BlockNumber = "1010101"
|
||||||
|
payload1 := payload
|
||||||
|
payload1.HeaderCID.BlockNumber = "1"
|
||||||
payload2 := payload1
|
payload2 := payload1
|
||||||
payload2.HeaderCID.BlockNumber = "5"
|
payload2.HeaderCID.BlockNumber = "5"
|
||||||
payload3 := payload2
|
payload3 := payload2
|
||||||
@ -610,7 +628,9 @@ var _ = Describe("Retriever", func() {
|
|||||||
payload12.HeaderCID.BlockNumber = "109"
|
payload12.HeaderCID.BlockNumber = "109"
|
||||||
payload13 := payload5
|
payload13 := payload5
|
||||||
payload13.HeaderCID.BlockNumber = "1000"
|
payload13.HeaderCID.BlockNumber = "1000"
|
||||||
err := repo.Index(&payload1)
|
err := repo.Index(&payload)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = repo.Index(&payload1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
err = repo.Index(&payload2)
|
err = repo.Index(&payload2)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
@ -643,7 +663,9 @@ var _ = Describe("Retriever", func() {
|
|||||||
|
|
||||||
gaps, err := retriever.RetrieveGapsInData(1)
|
gaps, err := retriever.RetrieveGapsInData(1)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(gaps)).To(Equal(6))
|
Expect(len(gaps)).To(Equal(8))
|
||||||
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 0, Stop: 0})).To(BeTrue())
|
||||||
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 2, Stop: 4})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 6, Stop: 99})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 101, Stop: 102})).To(BeTrue())
|
||||||
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
|
Expect(shared.ListContainsGap(gaps, shared.Gap{Start: 104, Stop: 104})).To(BeTrue())
|
||||||
|
@ -124,8 +124,8 @@ var (
|
|||||||
storageCID = "mockStorageCID1"
|
storageCID = "mockStorageCID1"
|
||||||
storagePath = []byte{'\x01'}
|
storagePath = []byte{'\x01'}
|
||||||
storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000"))
|
storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000"))
|
||||||
storageModels1 = map[common.Hash][]eth2.StorageNodeModel{
|
storageModels1 = map[string][]eth2.StorageNodeModel{
|
||||||
crypto.Keccak256Hash(state1Path): {
|
common.Bytes2Hex(state1Path): {
|
||||||
{
|
{
|
||||||
CID: storageCID,
|
CID: storageCID,
|
||||||
StorageKey: storageKey.String(),
|
StorageKey: storageKey.String(),
|
||||||
|
@ -61,7 +61,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
|
|||||||
Receipts: make(types.Receipts, 0, trxLen),
|
Receipts: make(types.Receipts, 0, trxLen),
|
||||||
ReceiptMetaData: make([]ReceiptModel, 0, trxLen),
|
ReceiptMetaData: make([]ReceiptModel, 0, trxLen),
|
||||||
StateNodes: make([]TrieNode, 0),
|
StateNodes: make([]TrieNode, 0),
|
||||||
StorageNodes: make(map[common.Hash][]TrieNode),
|
StorageNodes: make(map[string][]TrieNode),
|
||||||
}
|
}
|
||||||
signer := types.MakeSigner(pc.chainConfig, block.Number())
|
signer := types.MakeSigner(pc.chainConfig, block.Number())
|
||||||
transactions := block.Transactions()
|
transactions := block.Transactions()
|
||||||
@ -100,10 +100,12 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
|
|||||||
}
|
}
|
||||||
mappedContracts[log.Address.String()] = true
|
mappedContracts[log.Address.String()] = true
|
||||||
}
|
}
|
||||||
|
// These are the contracts seen in the logs
|
||||||
logContracts := make([]string, 0, len(mappedContracts))
|
logContracts := make([]string, 0, len(mappedContracts))
|
||||||
for addr := range mappedContracts {
|
for addr := range mappedContracts {
|
||||||
logContracts = append(logContracts, addr)
|
logContracts = append(logContracts, addr)
|
||||||
}
|
}
|
||||||
|
// This is the contract address if this receipt is for a contract creation tx
|
||||||
contract := shared.HandleNullAddr(receipt.ContractAddress)
|
contract := shared.HandleNullAddr(receipt.ContractAddress)
|
||||||
var contractHash string
|
var contractHash string
|
||||||
if contract != "" {
|
if contract != "" {
|
||||||
@ -124,60 +126,27 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Unpack state diff rlp to access fields
|
// Unpack state diff rlp to access fields
|
||||||
stateDiff := new(statediff.StateDiff)
|
stateDiff := new(statediff.StateObject)
|
||||||
if err := rlp.DecodeBytes(stateDiffPayload.StateDiffRlp, stateDiff); err != nil {
|
if err := rlp.DecodeBytes(stateDiffPayload.StateObjectRlp, stateDiff); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, createdAccount := range stateDiff.CreatedAccounts {
|
for _, stateNode := range stateDiff.Nodes {
|
||||||
statePathHash := crypto.Keccak256Hash(createdAccount.Path)
|
statePath := common.Bytes2Hex(stateNode.Path)
|
||||||
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
|
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
|
||||||
Path: createdAccount.Path,
|
Path: stateNode.Path,
|
||||||
Value: createdAccount.NodeValue,
|
Value: stateNode.NodeValue,
|
||||||
Type: createdAccount.NodeType,
|
Type: stateNode.NodeType,
|
||||||
LeafKey: common.BytesToHash(createdAccount.LeafKey),
|
LeafKey: common.BytesToHash(stateNode.LeafKey),
|
||||||
})
|
})
|
||||||
for _, storageDiff := range createdAccount.Storage {
|
for _, storageNode := range stateNode.StorageNodes {
|
||||||
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{
|
convertedPayload.StorageNodes[statePath] = append(convertedPayload.StorageNodes[statePath], TrieNode{
|
||||||
Path: storageDiff.Path,
|
Path: storageNode.Path,
|
||||||
Value: storageDiff.NodeValue,
|
Value: storageNode.NodeValue,
|
||||||
Type: storageDiff.NodeType,
|
Type: storageNode.NodeType,
|
||||||
LeafKey: common.BytesToHash(storageDiff.LeafKey),
|
LeafKey: common.BytesToHash(storageNode.LeafKey),
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, deletedAccount := range stateDiff.DeletedAccounts {
|
|
||||||
statePathHash := crypto.Keccak256Hash(deletedAccount.Path)
|
|
||||||
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
|
|
||||||
Path: deletedAccount.Path,
|
|
||||||
Value: deletedAccount.NodeValue,
|
|
||||||
Type: deletedAccount.NodeType,
|
|
||||||
LeafKey: common.BytesToHash(deletedAccount.LeafKey),
|
|
||||||
})
|
|
||||||
for _, storageDiff := range deletedAccount.Storage {
|
|
||||||
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{
|
|
||||||
Path: storageDiff.Path,
|
|
||||||
Value: storageDiff.NodeValue,
|
|
||||||
Type: storageDiff.NodeType,
|
|
||||||
LeafKey: common.BytesToHash(storageDiff.LeafKey),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, updatedAccount := range stateDiff.UpdatedAccounts {
|
|
||||||
statePathHash := crypto.Keccak256Hash(updatedAccount.Path)
|
|
||||||
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
|
|
||||||
Path: updatedAccount.Path,
|
|
||||||
Value: updatedAccount.NodeValue,
|
|
||||||
Type: updatedAccount.NodeType,
|
|
||||||
LeafKey: common.BytesToHash(updatedAccount.LeafKey),
|
|
||||||
})
|
|
||||||
for _, storageDiff := range updatedAccount.Storage {
|
|
||||||
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{
|
|
||||||
Path: storageDiff.Path,
|
|
||||||
Value: storageDiff.NodeValue,
|
|
||||||
Type: storageDiff.NodeType,
|
|
||||||
LeafKey: common.BytesToHash(storageDiff.LeafKey),
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return convertedPayload, nil
|
return convertedPayload, nil
|
||||||
}
|
}
|
||||||
|
@ -291,7 +291,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) {
|
if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) {
|
||||||
for _, storageNode := range payload.StorageNodes[crypto.Keccak256Hash(stateNode.Path)] {
|
for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
|
||||||
if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) {
|
if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) {
|
||||||
cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256)
|
cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -26,6 +26,8 @@ func ResolveFromNodeType(nodeType statediff.NodeType) int {
|
|||||||
return 1
|
return 1
|
||||||
case statediff.Leaf:
|
case statediff.Leaf:
|
||||||
return 2
|
return 2
|
||||||
|
case statediff.Removed:
|
||||||
|
return 3
|
||||||
default:
|
default:
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
@ -39,6 +41,8 @@ func ResolveToNodeType(nodeType int) statediff.NodeType {
|
|||||||
return statediff.Extension
|
return statediff.Extension
|
||||||
case 2:
|
case 2:
|
||||||
return statediff.Leaf
|
return statediff.Leaf
|
||||||
|
case 3:
|
||||||
|
return statediff.Removed
|
||||||
default:
|
default:
|
||||||
return statediff.Unknown
|
return statediff.Unknown
|
||||||
}
|
}
|
||||||
|
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
@ -159,13 +158,13 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload,
|
|||||||
}
|
}
|
||||||
// If we have a state leaf node, index the associated account and storage nodes
|
// If we have a state leaf node, index the associated account and storage nodes
|
||||||
if stateCID.NodeType == 2 {
|
if stateCID.NodeType == 2 {
|
||||||
pathKey := crypto.Keccak256Hash(stateCID.Path)
|
statePath := common.Bytes2Hex(stateCID.Path)
|
||||||
for _, storageCID := range payload.StorageNodeCIDs[pathKey] {
|
for _, storageCID := range payload.StorageNodeCIDs[statePath] {
|
||||||
if err := in.indexStorageCID(tx, storageCID, stateID); err != nil {
|
if err := in.indexStorageCID(tx, storageCID, stateID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if stateAccount, ok := payload.StateAccounts[pathKey]; ok {
|
if stateAccount, ok := payload.StateAccounts[statePath]; ok {
|
||||||
if err := in.indexStateAccount(tx, stateAccount, stateID); err != nil {
|
if err := in.indexStateAccount(tx, stateAccount, stateID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ var (
|
|||||||
nonce1 = uint64(1)
|
nonce1 = uint64(1)
|
||||||
ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0"
|
ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0"
|
||||||
ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea")
|
ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea")
|
||||||
contractPathHash = crypto.Keccak256Hash([]byte{'\x06'})
|
contractPath = common.Bytes2Hex([]byte{'\x06'})
|
||||||
ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress)
|
ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress)
|
||||||
ContractAccount, _ = rlp.EncodeToBytes(state.Account{
|
ContractAccount, _ = rlp.EncodeToBytes(state.Account{
|
||||||
Nonce: nonce1,
|
Nonce: nonce1,
|
||||||
@ -235,7 +235,7 @@ var (
|
|||||||
nonce0 = uint64(0)
|
nonce0 = uint64(0)
|
||||||
AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
|
AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
|
||||||
AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
|
AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
|
||||||
accountPathHash = crypto.Keccak256Hash([]byte{'\x0c'})
|
accountPath = common.Bytes2Hex([]byte{'\x0c'})
|
||||||
AccountAddresss = common.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e")
|
AccountAddresss = common.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e")
|
||||||
AccountLeafKey = testhelpers.Account2LeafKey
|
AccountLeafKey = testhelpers.Account2LeafKey
|
||||||
Account, _ = rlp.EncodeToBytes(state.Account{
|
Account, _ = rlp.EncodeToBytes(state.Account{
|
||||||
@ -250,13 +250,13 @@ var (
|
|||||||
Account,
|
Account,
|
||||||
})
|
})
|
||||||
|
|
||||||
CreatedAccountDiffs = []statediff.AccountDiff{
|
StateDiffs = []statediff.StateNode{
|
||||||
{
|
{
|
||||||
Path: []byte{'\x06'},
|
Path: []byte{'\x06'},
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
LeafKey: ContractLeafKey,
|
LeafKey: ContractLeafKey,
|
||||||
NodeValue: ContractLeafNode,
|
NodeValue: ContractLeafNode,
|
||||||
Storage: []statediff.StorageDiff{
|
StorageNodes: []statediff.StorageNode{
|
||||||
{
|
{
|
||||||
Path: []byte{},
|
Path: []byte{},
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
@ -266,18 +266,18 @@ var (
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Path: []byte{'\x0c'},
|
Path: []byte{'\x0c'},
|
||||||
NodeType: statediff.Leaf,
|
NodeType: statediff.Leaf,
|
||||||
LeafKey: AccountLeafKey,
|
LeafKey: AccountLeafKey,
|
||||||
NodeValue: AccountLeafNode,
|
NodeValue: AccountLeafNode,
|
||||||
Storage: []statediff.StorageDiff{},
|
StorageNodes: []statediff.StorageNode{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
MockStateDiff = statediff.StateDiff{
|
MockStateDiff = statediff.StateObject{
|
||||||
BlockNumber: BlockNumber,
|
BlockNumber: BlockNumber,
|
||||||
BlockHash: MockBlock.Hash(),
|
BlockHash: MockBlock.Hash(),
|
||||||
CreatedAccounts: CreatedAccountDiffs,
|
Nodes: StateDiffs,
|
||||||
}
|
}
|
||||||
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
|
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
|
||||||
MockStateNodes = []eth.TrieNode{
|
MockStateNodes = []eth.TrieNode{
|
||||||
@ -308,8 +308,8 @@ var (
|
|||||||
StateKey: common.BytesToHash(AccountLeafKey).Hex(),
|
StateKey: common.BytesToHash(AccountLeafKey).Hex(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
MockStorageNodes = map[common.Hash][]eth.TrieNode{
|
MockStorageNodes = map[string][]eth.TrieNode{
|
||||||
contractPathHash: {
|
contractPath: {
|
||||||
{
|
{
|
||||||
LeafKey: common.BytesToHash(StorageLeafKey),
|
LeafKey: common.BytesToHash(StorageLeafKey),
|
||||||
Value: StorageLeafNode,
|
Value: StorageLeafNode,
|
||||||
@ -322,7 +322,7 @@ var (
|
|||||||
// aggregate payloads
|
// aggregate payloads
|
||||||
MockStateDiffPayload = statediff.Payload{
|
MockStateDiffPayload = statediff.Payload{
|
||||||
BlockRlp: MockBlockRlp,
|
BlockRlp: MockBlockRlp,
|
||||||
StateDiffRlp: MockStateDiffBytes,
|
StateObjectRlp: MockStateDiffBytes,
|
||||||
ReceiptsRlp: ReceiptsRlp,
|
ReceiptsRlp: ReceiptsRlp,
|
||||||
TotalDifficulty: MockBlock.Difficulty(),
|
TotalDifficulty: MockBlock.Difficulty(),
|
||||||
}
|
}
|
||||||
@ -360,8 +360,8 @@ var (
|
|||||||
MockTransactions[2].Hash(): MockRctMetaPostPublish[2],
|
MockTransactions[2].Hash(): MockRctMetaPostPublish[2],
|
||||||
},
|
},
|
||||||
StateNodeCIDs: MockStateMetaPostPublish,
|
StateNodeCIDs: MockStateMetaPostPublish,
|
||||||
StorageNodeCIDs: map[common.Hash][]eth.StorageNodeModel{
|
StorageNodeCIDs: map[string][]eth.StorageNodeModel{
|
||||||
contractPathHash: {
|
contractPath: {
|
||||||
{
|
{
|
||||||
CID: StorageCID.String(),
|
CID: StorageCID.String(),
|
||||||
Path: []byte{},
|
Path: []byte{},
|
||||||
@ -370,14 +370,14 @@ var (
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
StateAccounts: map[common.Hash]eth.StateAccountModel{
|
StateAccounts: map[string]eth.StateAccountModel{
|
||||||
contractPathHash: {
|
contractPath: {
|
||||||
Balance: big.NewInt(0).String(),
|
Balance: big.NewInt(0).String(),
|
||||||
Nonce: nonce1,
|
Nonce: nonce1,
|
||||||
CodeHash: ContractCodeHash.Bytes(),
|
CodeHash: ContractCodeHash.Bytes(),
|
||||||
StorageRoot: common.HexToHash(ContractRoot).String(),
|
StorageRoot: common.HexToHash(ContractRoot).String(),
|
||||||
},
|
},
|
||||||
accountPathHash: {
|
accountPath: {
|
||||||
Balance: big.NewInt(1000).String(),
|
Balance: big.NewInt(1000).String(),
|
||||||
Nonce: nonce0,
|
Nonce: nonce0,
|
||||||
CodeHash: AccountCodeHash.Bytes(),
|
CodeHash: AccountCodeHash.Bytes(),
|
||||||
|
@ -38,34 +38,41 @@ type PayloadFetcher struct {
|
|||||||
// http.Client is thread-safe
|
// http.Client is thread-safe
|
||||||
client BatchClient
|
client BatchClient
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
params statediff.Params
|
||||||
}
|
}
|
||||||
|
|
||||||
const method = "statediff_stateDiffAt"
|
const method = "statediff_stateDiffAt"
|
||||||
|
|
||||||
// NewStateDiffFetcher returns a PayloadFetcher
|
// NewPayloadFetcher returns a PayloadFetcher
|
||||||
func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
|
func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
|
||||||
return &PayloadFetcher{
|
return &PayloadFetcher{
|
||||||
client: bc,
|
client: bc,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
|
params: statediff.Params{
|
||||||
|
IncludeReceipts: true,
|
||||||
|
IncludeTD: true,
|
||||||
|
IncludeBlock: true,
|
||||||
|
IntermediateStateNodes: true,
|
||||||
|
IntermediateStorageNodes: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchAt fetches the statediff payloads at the given block heights
|
// FetchAt fetches the statediff payloads at the given block heights
|
||||||
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error)
|
// Calls StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error)
|
||||||
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
|
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
|
||||||
batch := make([]rpc.BatchElem, 0)
|
batch := make([]rpc.BatchElem, 0)
|
||||||
for _, height := range blockHeights {
|
for _, height := range blockHeights {
|
||||||
batch = append(batch, rpc.BatchElem{
|
batch = append(batch, rpc.BatchElem{
|
||||||
Method: method,
|
Method: method,
|
||||||
Args: []interface{}{height},
|
Args: []interface{}{height, fetcher.params},
|
||||||
Result: new(statediff.Payload),
|
Result: new(statediff.Payload),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
batchErr := fetcher.client.BatchCallContext(ctx, batch)
|
if err := fetcher.client.BatchCallContext(ctx, batch); err != nil {
|
||||||
if batchErr != nil {
|
return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], err.Error())
|
||||||
return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error())
|
|
||||||
}
|
}
|
||||||
results := make([]shared.RawChainData, 0, len(blockHeights))
|
results := make([]shared.RawChainData, 0, len(blockHeights))
|
||||||
for _, batchElem := range batch {
|
for _, batchElem := range batch {
|
||||||
|
@ -36,10 +36,10 @@ var _ = Describe("StateDiffFetcher", func() {
|
|||||||
)
|
)
|
||||||
BeforeEach(func() {
|
BeforeEach(func() {
|
||||||
mc = new(mocks.BackFillerClient)
|
mc = new(mocks.BackFillerClient)
|
||||||
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
|
err := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
|
||||||
Expect(setDiffAtErr1).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
|
err = mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
|
||||||
Expect(setDiffAtErr2).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60)
|
stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60)
|
||||||
})
|
})
|
||||||
It("Batch calls statediff_stateDiffAt", func() {
|
It("Batch calls statediff_stateDiffAt", func() {
|
||||||
@ -47,8 +47,8 @@ var _ = Describe("StateDiffFetcher", func() {
|
|||||||
test_data.BlockNumber.Uint64(),
|
test_data.BlockNumber.Uint64(),
|
||||||
test_data.BlockNumber2.Uint64(),
|
test_data.BlockNumber2.Uint64(),
|
||||||
}
|
}
|
||||||
stateDiffPayloads, fetchErr := stateDiffFetcher.FetchAt(blockHeights)
|
stateDiffPayloads, err := stateDiffFetcher.FetchAt(blockHeights)
|
||||||
Expect(fetchErr).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(len(stateDiffPayloads)).To(Equal(2))
|
Expect(len(stateDiffPayloads)).To(Equal(2))
|
||||||
payload1, ok := stateDiffPayloads[0].(statediff.Payload)
|
payload1, ok := stateDiffPayloads[0].(statediff.Payload)
|
||||||
Expect(ok).To(BeTrue())
|
Expect(ok).To(BeTrue())
|
||||||
|
@ -19,8 +19,8 @@ package eth
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
@ -195,8 +195,7 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
|
|||||||
if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil {
|
if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
statePathHash := crypto.Keccak256Hash(stateNode.Path)
|
for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
|
||||||
for _, storageNode := range ipldPayload.StorageNodes[statePathHash] {
|
|
||||||
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
|
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
@ -22,7 +22,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/statediff"
|
"github.com/ethereum/go-ethereum/statediff"
|
||||||
|
|
||||||
@ -206,9 +205,9 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr
|
|||||||
return rctCids, nil
|
return rctCids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[common.Hash]StateAccountModel, error) {
|
func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[string]StateAccountModel, error) {
|
||||||
stateNodeCids := make([]StateNodeModel, 0, len(stateNodes))
|
stateNodeCids := make([]StateNodeModel, 0, len(stateNodes))
|
||||||
stateAccounts := make(map[common.Hash]StateAccountModel)
|
stateAccounts := make(map[string]StateAccountModel)
|
||||||
for _, stateNode := range stateNodes {
|
for _, stateNode := range stateNodes {
|
||||||
node, err := ipld.FromStateTrieRLP(stateNode.Value)
|
node, err := ipld.FromStateTrieRLP(stateNode.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -238,8 +237,8 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
// Map state account to the state path hash
|
// Map state account to the state path hash
|
||||||
statePathHash := crypto.Keccak256Hash(stateNode.Path)
|
statePath := common.Bytes2Hex(stateNode.Path)
|
||||||
stateAccounts[statePathHash] = StateAccountModel{
|
stateAccounts[statePath] = StateAccountModel{
|
||||||
Balance: account.Balance.String(),
|
Balance: account.Balance.String(),
|
||||||
Nonce: account.Nonce,
|
Nonce: account.Nonce,
|
||||||
CodeHash: account.CodeHash,
|
CodeHash: account.CodeHash,
|
||||||
@ -250,10 +249,10 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM
|
|||||||
return stateNodeCids, stateAccounts, nil
|
return stateNodeCids, stateAccounts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]TrieNode) (map[common.Hash][]StorageNodeModel, error) {
|
func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode) (map[string][]StorageNodeModel, error) {
|
||||||
storageLeafCids := make(map[common.Hash][]StorageNodeModel)
|
storageLeafCids := make(map[string][]StorageNodeModel)
|
||||||
for pathHash, storageTrie := range storageNodes {
|
for path, storageTrie := range storageNodes {
|
||||||
storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie))
|
storageLeafCids[path] = make([]StorageNodeModel, 0, len(storageTrie))
|
||||||
for _, storageNode := range storageTrie {
|
for _, storageNode := range storageTrie {
|
||||||
node, err := ipld.FromStorageTrieRLP(storageNode.Value)
|
node, err := ipld.FromStorageTrieRLP(storageNode.Value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -264,7 +263,7 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]Tri
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// Map storage node cids to the state path hash
|
// Map storage node cids to the state path hash
|
||||||
storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{
|
storageLeafCids[path] = append(storageLeafCids[path], StorageNodeModel{
|
||||||
Path: storageNode.Path,
|
Path: storageNode.Path,
|
||||||
StorageKey: storageNode.LeafKey.Hex(),
|
StorageKey: storageNode.LeafKey.Hex(),
|
||||||
CID: cid,
|
CID: cid,
|
||||||
|
@ -38,12 +38,20 @@ type StreamClient interface {
|
|||||||
// PayloadStreamer satisfies the PayloadStreamer interface for ethereum
|
// PayloadStreamer satisfies the PayloadStreamer interface for ethereum
|
||||||
type PayloadStreamer struct {
|
type PayloadStreamer struct {
|
||||||
Client StreamClient
|
Client StreamClient
|
||||||
|
params statediff.Params
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
|
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
|
||||||
func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
|
func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
|
||||||
return &PayloadStreamer{
|
return &PayloadStreamer{
|
||||||
Client: client,
|
Client: client,
|
||||||
|
params: statediff.Params{
|
||||||
|
IncludeBlock: true,
|
||||||
|
IncludeTD: true,
|
||||||
|
IncludeReceipts: true,
|
||||||
|
IntermediateStorageNodes: true,
|
||||||
|
IntermediateStateNodes: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,5 +68,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream")
|
return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream", ps.params)
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ type ConvertedPayload struct {
|
|||||||
Receipts types.Receipts
|
Receipts types.Receipts
|
||||||
ReceiptMetaData []ReceiptModel
|
ReceiptMetaData []ReceiptModel
|
||||||
StateNodes []TrieNode
|
StateNodes []TrieNode
|
||||||
StorageNodes map[common.Hash][]TrieNode
|
StorageNodes map[string][]TrieNode
|
||||||
}
|
}
|
||||||
|
|
||||||
// Height satisfies the StreamedIPLDs interface
|
// Height satisfies the StreamedIPLDs interface
|
||||||
@ -62,8 +62,8 @@ type CIDPayload struct {
|
|||||||
TransactionCIDs []TxModel
|
TransactionCIDs []TxModel
|
||||||
ReceiptCIDs map[common.Hash]ReceiptModel
|
ReceiptCIDs map[common.Hash]ReceiptModel
|
||||||
StateNodeCIDs []StateNodeModel
|
StateNodeCIDs []StateNodeModel
|
||||||
StateAccounts map[common.Hash]StateAccountModel
|
StateAccounts map[string]StateAccountModel
|
||||||
StorageNodeCIDs map[common.Hash][]StorageNodeModel
|
StorageNodeCIDs map[string][]StorageNodeModel
|
||||||
}
|
}
|
||||||
|
|
||||||
// CIDWrapper is used to direct fetching of IPLDs from IPFS
|
// CIDWrapper is used to direct fetching of IPLDs from IPFS
|
||||||
|
@ -60,8 +60,6 @@ type Config struct {
|
|||||||
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
|
BatchSize uint64 // BatchSize for the resync http calls (client has to support batch sizing)
|
||||||
Timeout time.Duration // HTTP connection timeout in seconds
|
Timeout time.Duration // HTTP connection timeout in seconds
|
||||||
BatchNumber uint64
|
BatchNumber uint64
|
||||||
|
|
||||||
Quit chan bool // Channel for shutting down
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReSyncConfig fills and returns a resync config from toml parameters
|
// NewReSyncConfig fills and returns a resync config from toml parameters
|
||||||
@ -136,7 +134,6 @@ func NewReSyncConfig() (*Config, error) {
|
|||||||
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
|
db := utils.LoadPostgres(c.DBConfig, c.NodeInfo)
|
||||||
c.DB = &db
|
c.DB = &db
|
||||||
|
|
||||||
c.Quit = make(chan bool)
|
|
||||||
c.BatchSize = uint64(viper.GetInt64("resync.batchSize"))
|
c.BatchSize = uint64(viper.GetInt64("resync.batchSize"))
|
||||||
c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber"))
|
c.BatchNumber = uint64(viper.GetInt64("resync.batchNumber"))
|
||||||
return c, nil
|
return c, nil
|
||||||
|
@ -18,8 +18,6 @@ package resync
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
|
utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
|
||||||
@ -49,7 +47,7 @@ type Service struct {
|
|||||||
// Number of goroutines
|
// Number of goroutines
|
||||||
BatchNumber int64
|
BatchNumber int64
|
||||||
// Channel for receiving quit signal
|
// Channel for receiving quit signal
|
||||||
QuitChan chan bool
|
quitChan chan bool
|
||||||
// Chain type
|
// Chain type
|
||||||
chain shared.ChainType
|
chain shared.ChainType
|
||||||
// Resync data type
|
// Resync data type
|
||||||
@ -105,7 +103,7 @@ func NewResyncService(settings *Config) (Resync, error) {
|
|||||||
Cleaner: cleaner,
|
Cleaner: cleaner,
|
||||||
BatchSize: batchSize,
|
BatchSize: batchSize,
|
||||||
BatchNumber: int64(batchNumber),
|
BatchNumber: int64(batchNumber),
|
||||||
QuitChan: settings.Quit,
|
quitChan: make(chan bool),
|
||||||
chain: settings.Chain,
|
chain: settings.Chain,
|
||||||
ranges: settings.Ranges,
|
ranges: settings.Ranges,
|
||||||
data: settings.ResyncType,
|
data: settings.ResyncType,
|
||||||
@ -127,81 +125,60 @@ func (rs *Service) Resync() error {
|
|||||||
return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err)
|
return fmt.Errorf("%s %s data resync cleaning error: %v", rs.chain.String(), rs.data.String(), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// spin up worker goroutines
|
||||||
|
heightsChan := make(chan []uint64)
|
||||||
|
for i := 1; i <= int(rs.BatchNumber); i++ {
|
||||||
|
go rs.resync(i, heightsChan)
|
||||||
|
}
|
||||||
for _, rng := range rs.ranges {
|
for _, rng := range rs.ranges {
|
||||||
if err := rs.resync(rng[0], rng[1]); err != nil {
|
if rng[1] < rng[0] {
|
||||||
return fmt.Errorf("%s %s data resync initialization error: %v", rs.chain.String(), rs.data.String(), err)
|
logrus.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String())
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), rng[0], rng[1])
|
||||||
|
// break the range up into bins of smaller ranges
|
||||||
|
blockRangeBins, err := utils.GetBlockHeightBins(rng[0], rng[1], rs.BatchSize)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, heights := range blockRangeBins {
|
||||||
|
heightsChan <- heights
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// send a quit signal to each worker
|
||||||
|
// this blocks until each worker has finished its current task and can receive from the quit channel
|
||||||
|
for i := 1; i <= int(rs.BatchNumber); i++ {
|
||||||
|
rs.quitChan <- true
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *Service) resync(startingBlock, endingBlock uint64) error {
|
func (rs *Service) resync(id int, heightChan chan []uint64) {
|
||||||
logrus.Infof("resyncing %s data from %d to %d", rs.chain.String(), startingBlock, endingBlock)
|
|
||||||
if endingBlock < startingBlock {
|
|
||||||
return fmt.Errorf("%s resync range ending block number needs to be greater than the starting block number", rs.chain.String())
|
|
||||||
}
|
|
||||||
// break the range up into bins of smaller ranges
|
|
||||||
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, rs.BatchSize)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// int64 for atomic incrementing and decrementing to track the number of active processing goroutines we have
|
|
||||||
var activeCount int64
|
|
||||||
// channel for processing goroutines to signal when they are done
|
|
||||||
processingDone := make(chan bool)
|
|
||||||
forwardDone := make(chan bool)
|
|
||||||
|
|
||||||
// for each block range bin spin up a goroutine to batch fetch and process state diffs for that range
|
|
||||||
go func() {
|
|
||||||
for _, blockHeights := range blockRangeBins {
|
|
||||||
// if we have reached our limit of active goroutines
|
|
||||||
// wait for one to finish before starting the next
|
|
||||||
if atomic.AddInt64(&activeCount, 1) > rs.BatchNumber {
|
|
||||||
// this blocks until a process signals it has finished
|
|
||||||
<-forwardDone
|
|
||||||
}
|
|
||||||
go func(blockHeights []uint64) {
|
|
||||||
payloads, err := rs.Fetcher.FetchAt(blockHeights)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("%s resync fetcher error: %s", rs.chain.String(), err.Error())
|
|
||||||
}
|
|
||||||
for _, payload := range payloads {
|
|
||||||
ipldPayload, err := rs.Converter.Convert(payload)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("%s resync converter error: %s", rs.chain.String(), err.Error())
|
|
||||||
}
|
|
||||||
cidPayload, err := rs.Publisher.Publish(ipldPayload)
|
|
||||||
if err != nil {
|
|
||||||
logrus.Errorf("%s resync publisher error: %s", rs.chain.String(), err.Error())
|
|
||||||
}
|
|
||||||
if err := rs.Indexer.Index(cidPayload); err != nil {
|
|
||||||
logrus.Errorf("%s resync indexer error: %s", rs.chain.String(), err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// when this goroutine is done, send out a signal
|
|
||||||
logrus.Infof("finished %s resync section from %d to %d", rs.chain.String(), blockHeights[0], blockHeights[len(blockHeights)-1])
|
|
||||||
processingDone <- true
|
|
||||||
}(blockHeights)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// listen on the processingDone chan and
|
|
||||||
// keep track of the number of processing goroutines that have finished
|
|
||||||
// when they have all finished, sends the final signal out
|
|
||||||
goroutinesFinished := 0
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-processingDone:
|
case heights := <-heightChan:
|
||||||
atomic.AddInt64(&activeCount, -1)
|
logrus.Debugf("%s resync worker %d processing section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1])
|
||||||
select {
|
payloads, err := rs.Fetcher.FetchAt(heights)
|
||||||
// if we are waiting for a process to finish, signal that one has
|
if err != nil {
|
||||||
case forwardDone <- true:
|
logrus.Errorf("%s resync worker %d fetcher error: %s", rs.chain.String(), id, err.Error())
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
goroutinesFinished++
|
for _, payload := range payloads {
|
||||||
if goroutinesFinished >= len(blockRangeBins) {
|
ipldPayload, err := rs.Converter.Convert(payload)
|
||||||
return nil
|
if err != nil {
|
||||||
|
logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error())
|
||||||
|
}
|
||||||
|
cidPayload, err := rs.Publisher.Publish(ipldPayload)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error())
|
||||||
|
}
|
||||||
|
if err := rs.Indexer.Index(cidPayload); err != nil {
|
||||||
|
logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1])
|
||||||
|
case <-rs.quitChan:
|
||||||
|
logrus.Infof("%s resync worker %d goroutine shutting down", rs.chain.String(), id)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -93,6 +93,8 @@ type Service struct {
|
|||||||
ipfsPath string
|
ipfsPath string
|
||||||
// Underlying db
|
// Underlying db
|
||||||
db *postgres.DB
|
db *postgres.DB
|
||||||
|
// wg for syncing serve processes
|
||||||
|
serveWg *sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
|
// NewSuperNode creates a new super_node.Interface using an underlying super_node.Service struct
|
||||||
@ -134,7 +136,7 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
|
|||||||
}
|
}
|
||||||
sn.db = settings.ServeDBConn
|
sn.db = settings.ServeDBConn
|
||||||
}
|
}
|
||||||
sn.QuitChan = settings.Quit
|
sn.QuitChan = make(chan bool)
|
||||||
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
|
sn.Subscriptions = make(map[common.Hash]map[rpc.ID]Subscription)
|
||||||
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
|
sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings)
|
||||||
sn.WorkerPoolSize = settings.Workers
|
sn.WorkerPoolSize = settings.Workers
|
||||||
@ -195,16 +197,15 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
// spin up publishAndIndex worker goroutines
|
||||||
|
|
||||||
// Channels for forwarding data to the publishAndIndex workers
|
|
||||||
publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
|
publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize)
|
||||||
// publishAndIndex worker pool to handle publishing and indexing concurrently, while
|
for i := 1; i <= sap.WorkerPoolSize; i++ {
|
||||||
// limiting the number of Postgres connections we can possibly open so as to prevent error
|
go sap.publishAndIndex(wg, i, publishAndIndexPayload)
|
||||||
for i := 0; i < sap.WorkerPoolSize; i++ {
|
log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i)
|
||||||
sap.publishAndIndex(i, publishAndIndexPayload)
|
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case payload := <-sap.PayloadChan:
|
case payload := <-sap.PayloadChan:
|
||||||
@ -230,8 +231,7 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
|
|||||||
case err := <-sub.Err():
|
case err := <-sub.Err():
|
||||||
log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
|
log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
|
||||||
case <-sap.QuitChan:
|
case <-sap.QuitChan:
|
||||||
log.Infof("quiting %s SyncAndPublish process", sap.chain.String())
|
log.Infof("quiting %s Sync process", sap.chain.String())
|
||||||
wg.Done()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -242,41 +242,44 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
|
|||||||
|
|
||||||
// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
|
// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process
|
||||||
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
|
// it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres
|
||||||
func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared.ConvertedData) {
|
func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) {
|
||||||
go func() {
|
wg.Add(1)
|
||||||
for {
|
defer wg.Done()
|
||||||
select {
|
for {
|
||||||
case payload := <-publishAndIndexPayload:
|
select {
|
||||||
log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height())
|
case payload := <-publishAndIndexPayload:
|
||||||
cidPayload, err := sap.Publisher.Publish(payload)
|
log.Debugf("%s super node publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height())
|
||||||
if err != nil {
|
cidPayload, err := sap.Publisher.Publish(payload)
|
||||||
log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err)
|
if err != nil {
|
||||||
continue
|
log.Errorf("%s super node publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err)
|
||||||
}
|
continue
|
||||||
log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height())
|
|
||||||
if err := sap.Indexer.Index(cidPayload); err != nil {
|
|
||||||
log.Errorf("super node publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
log.Debugf("%s super node publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height())
|
||||||
|
if err := sap.Indexer.Index(cidPayload); err != nil {
|
||||||
|
log.Errorf("%s super node publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err)
|
||||||
|
}
|
||||||
|
case <-sap.QuitChan:
|
||||||
|
log.Infof("%s super node publishAndIndex worker %d shutting down", sap.chain.String(), id)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}()
|
}
|
||||||
log.Debugf("%s publishAndIndex goroutine successfully spun up", sap.chain.String())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serve listens for incoming converter data off the screenAndServePayload from the SyncAndConvert process
|
// Serve listens for incoming converter data off the screenAndServePayload from the Sync process
|
||||||
// It filters and sends this data to any subscribers to the service
|
// It filters and sends this data to any subscribers to the service
|
||||||
// This process can be stood up alone, without an screenAndServePayload attached to a SyncAndConvert process
|
// This process can also be stood up alone, without an screenAndServePayload attached to a Sync process
|
||||||
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
|
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
|
||||||
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
|
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan shared.ConvertedData) {
|
||||||
wg.Add(1)
|
sap.serveWg = wg
|
||||||
go func() {
|
go func() {
|
||||||
|
wg.Add(1)
|
||||||
|
defer wg.Done()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case payload := <-screenAndServePayload:
|
case payload := <-screenAndServePayload:
|
||||||
sap.filterAndServe(payload)
|
sap.filterAndServe(payload)
|
||||||
case <-sap.QuitChan:
|
case <-sap.QuitChan:
|
||||||
log.Infof("quiting %s ScreenAndServe process", sap.chain.String())
|
log.Infof("quiting %s Serve process", sap.chain.String())
|
||||||
wg.Done()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -286,8 +289,11 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan share
|
|||||||
|
|
||||||
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
|
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions
|
||||||
func (sap *Service) filterAndServe(payload shared.ConvertedData) {
|
func (sap *Service) filterAndServe(payload shared.ConvertedData) {
|
||||||
log.Debugf("Sending %s payload to subscriptions", sap.chain.String())
|
log.Debugf("sending %s payload to subscriptions", sap.chain.String())
|
||||||
sap.Lock()
|
sap.Lock()
|
||||||
|
sap.serveWg.Add(1)
|
||||||
|
defer sap.Unlock()
|
||||||
|
defer sap.serveWg.Done()
|
||||||
for ty, subs := range sap.Subscriptions {
|
for ty, subs := range sap.Subscriptions {
|
||||||
// Retrieve the subscription parameters for this subscription type
|
// Retrieve the subscription parameters for this subscription type
|
||||||
subConfig, ok := sap.SubscriptionTypes[ty]
|
subConfig, ok := sap.SubscriptionTypes[ty]
|
||||||
@ -322,12 +328,13 @@ func (sap *Service) filterAndServe(payload shared.ConvertedData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sap.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe is used by the API to remotely subscribe to the service loop
|
// Subscribe is used by the API to remotely subscribe to the service loop
|
||||||
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface
|
// The params must be rlp serializable and satisfy the SubscriptionSettings() interface
|
||||||
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
|
func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params shared.SubscriptionSettings) {
|
||||||
|
sap.serveWg.Add(1)
|
||||||
|
defer sap.serveWg.Done()
|
||||||
log.Infof("New %s subscription %s", sap.chain.String(), id)
|
log.Infof("New %s subscription %s", sap.chain.String(), id)
|
||||||
subscription := Subscription{
|
subscription := Subscription{
|
||||||
ID: id,
|
ID: id,
|
||||||
@ -361,7 +368,7 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitCha
|
|||||||
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
|
// Otherwise we only filter new data as it is streamed in from the state diffing geth node
|
||||||
if params.HistoricalData() || params.HistoricalDataOnly() {
|
if params.HistoricalData() || params.HistoricalDataOnly() {
|
||||||
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
|
if err := sap.sendHistoricalData(subscription, id, params); err != nil {
|
||||||
sendNonBlockingErr(subscription, fmt.Errorf("super node subscriber backfill error for chain %s: %v", sap.chain.String(), err))
|
sendNonBlockingErr(subscription, fmt.Errorf("%s super node subscriber backfill error: %v", sap.chain.String(), err))
|
||||||
sendNonBlockingQuit(subscription)
|
sendNonBlockingQuit(subscription)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -392,10 +399,18 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
|
|||||||
log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64())
|
log.Debugf("%s historical data starting block: %d", sap.chain.String(), params.StartingBlock().Int64())
|
||||||
log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock)
|
log.Debugf("%s historical data ending block: %d", sap.chain.String(), endingBlock)
|
||||||
go func() {
|
go func() {
|
||||||
|
sap.serveWg.Add(1)
|
||||||
|
defer sap.serveWg.Done()
|
||||||
for i := startingBlock; i <= endingBlock; i++ {
|
for i := startingBlock; i <= endingBlock; i++ {
|
||||||
|
select {
|
||||||
|
case <-sap.QuitChan:
|
||||||
|
log.Infof("%s super node historical data feed to subscription %s closed", sap.chain.String(), id)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
|
cidWrappers, empty, err := sap.Retriever.Retrieve(params, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sendNonBlockingErr(sub, fmt.Errorf("super node %s CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
|
sendNonBlockingErr(sub, fmt.Errorf(" %s super node CID Retrieval error at block %d\r%s", sap.chain.String(), i, err.Error()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if empty {
|
if empty {
|
||||||
@ -404,7 +419,7 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
|
|||||||
for _, cids := range cidWrappers {
|
for _, cids := range cidWrappers {
|
||||||
response, err := sap.IPLDFetcher.Fetch(cids)
|
response, err := sap.IPLDFetcher.Fetch(cids)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sendNonBlockingErr(sub, fmt.Errorf("super node %s IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
|
sendNonBlockingErr(sub, fmt.Errorf("%s super node IPLD Fetching error at block %d\r%s", sap.chain.String(), i, err.Error()))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
responseRLP, err := rlp.EncodeToBytes(response)
|
responseRLP, err := rlp.EncodeToBytes(response)
|
||||||
@ -416,16 +431,16 @@ func (sap *Service) sendHistoricalData(sub Subscription, id rpc.ID, params share
|
|||||||
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
|
case sub.PayloadChan <- SubscriptionPayload{Data: responseRLP, Err: "", Flag: EmptyFlag, Height: response.Height()}:
|
||||||
log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
|
log.Debugf("sending super node historical data payload to %s subscription %s", sap.chain.String(), id)
|
||||||
default:
|
default:
|
||||||
log.Infof("unable to send back-fill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
|
log.Infof("unable to send backFill payload to %s subscription %s; channel has no receiver", sap.chain.String(), id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// when we are done backfilling send an empty payload signifying so in the msg
|
// when we are done backfilling send an empty payload signifying so in the msg
|
||||||
select {
|
select {
|
||||||
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
|
case sub.PayloadChan <- SubscriptionPayload{Data: nil, Err: "", Flag: BackFillCompleteFlag}:
|
||||||
log.Debugf("sending backfill completion notice to %s subscription %s", sap.chain.String(), id)
|
log.Debugf("sending backFill completion notice to %s subscription %s", sap.chain.String(), id)
|
||||||
default:
|
default:
|
||||||
log.Infof("unable to send backfill completion notice to %s subscription %s", sap.chain.String(), id)
|
log.Infof("unable to send backFill completion notice to %s subscription %s", sap.chain.String(), id)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
|
@ -66,7 +66,7 @@ var _ = Describe("Service", func() {
|
|||||||
err := processor.Sync(wg, nil)
|
err := processor.Sync(wg, nil)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
quitChan <- true
|
close(quitChan)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
|
Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload))
|
||||||
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
|
Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1))
|
||||||
|
@ -56,7 +56,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
|
|||||||
cids.TransactionCIDs = make([]eth.TxModel, numTxs)
|
cids.TransactionCIDs = make([]eth.TxModel, numTxs)
|
||||||
cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs)
|
cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs)
|
||||||
cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes))
|
cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes))
|
||||||
cids.StorageNodeCIDs = make(map[common.Hash][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
|
cids.StorageNodeCIDs = make(map[string][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
|
||||||
|
|
||||||
// Unpack header
|
// Unpack header
|
||||||
var header types.Header
|
var header types.Header
|
||||||
@ -164,7 +164,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
|
|||||||
}
|
}
|
||||||
// Storage data
|
// Storage data
|
||||||
for _, storageIPLD := range ethIPLDs.StorageNodes {
|
for _, storageIPLD := range ethIPLDs.StorageNodes {
|
||||||
cids.StorageNodeCIDs[storageIPLD.StateLeafKey] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey], eth.StorageNodeModel{
|
cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()], eth.StorageNodeModel{
|
||||||
CID: storageIPLD.IPLD.CID,
|
CID: storageIPLD.IPLD.CID,
|
||||||
NodeType: eth.ResolveFromNodeType(storageIPLD.Type),
|
NodeType: eth.ResolveFromNodeType(storageIPLD.Type),
|
||||||
StorageKey: storageIPLD.StorageLeafKey.String(),
|
StorageKey: storageIPLD.StorageLeafKey.String(),
|
||||||
|
Loading…
Reference in New Issue
Block a user