Merge pull request #86 from vulcanize/vdb-371-recheck-queued-storage
(VDB-371) recheck queued storage
This commit is contained in:
commit
782e3fd095
@ -16,16 +16,20 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
"plugin"
|
||||||
|
syn "sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
||||||
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
|
p2 "github.com/vulcanize/vulcanizedb/pkg/plugin"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
|
"github.com/vulcanize/vulcanizedb/pkg/plugin/helpers"
|
||||||
"github.com/vulcanize/vulcanizedb/utils"
|
"github.com/vulcanize/vulcanizedb/utils"
|
||||||
"os"
|
|
||||||
"plugin"
|
|
||||||
syn "sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// composeAndExecuteCmd represents the composeAndExecute command
|
// composeAndExecuteCmd represents the composeAndExecute command
|
||||||
@ -170,7 +174,8 @@ func composeAndExecute() {
|
|||||||
|
|
||||||
if len(ethStorageInitializers) > 0 {
|
if len(ethStorageInitializers) > 0 {
|
||||||
tailer := fs.FileTailer{Path: storageDiffsPath}
|
tailer := fs.FileTailer{Path: storageDiffsPath}
|
||||||
sw := watcher.NewStorageWatcher(tailer, &db)
|
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
|
||||||
|
sw := watcher.NewStorageWatcher(storageFetcher, &db)
|
||||||
sw.AddTransformers(ethStorageInitializers)
|
sw.AddTransformers(ethStorageInitializers)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go watchEthStorage(&sw, &wg)
|
go watchEthStorage(&sw, &wg)
|
||||||
@ -187,5 +192,6 @@ func composeAndExecute() {
|
|||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(composeAndExecuteCmd)
|
rootCmd.AddCommand(composeAndExecuteCmd)
|
||||||
composeAndExecuteCmd.Flags().BoolVar(&recheckHeadersArg, "recheckHeaders", false, "checks headers that are already checked for each transformer.")
|
composeAndExecuteCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
||||||
|
composeAndExecuteCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/constants"
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
||||||
|
storageUtils "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
||||||
@ -118,7 +120,8 @@ func execute() {
|
|||||||
|
|
||||||
if len(ethStorageInitializers) > 0 {
|
if len(ethStorageInitializers) > 0 {
|
||||||
tailer := fs.FileTailer{Path: storageDiffsPath}
|
tailer := fs.FileTailer{Path: storageDiffsPath}
|
||||||
sw := watcher.NewStorageWatcher(tailer, &db)
|
storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer)
|
||||||
|
sw := watcher.NewStorageWatcher(storageFetcher, &db)
|
||||||
sw.AddTransformers(ethStorageInitializers)
|
sw.AddTransformers(ethStorageInitializers)
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go watchEthStorage(&sw, &wg)
|
go watchEthStorage(&sw, &wg)
|
||||||
@ -135,7 +138,8 @@ func execute() {
|
|||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(executeCmd)
|
rootCmd.AddCommand(executeCmd)
|
||||||
executeCmd.Flags().BoolVar(&recheckHeadersArg, "recheckHeaders", false, "checks headers that are already checked for each transformer.")
|
executeCmd.Flags().BoolVarP(&recheckHeadersArg, "recheck-headers", "r", false, "whether to re-check headers for watched events")
|
||||||
|
executeCmd.Flags().DurationVarP(&queueRecheckInterval, "queue-recheck-interval", "q", 5 * time.Minute, "how often to recheck queued storage diffs")
|
||||||
}
|
}
|
||||||
|
|
||||||
type Exporter interface {
|
type Exporter interface {
|
||||||
@ -166,7 +170,9 @@ func watchEthStorage(w *watcher.StorageWatcher, wg *syn.WaitGroup) {
|
|||||||
ticker := time.NewTicker(pollingInterval)
|
ticker := time.NewTicker(pollingInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
w.Execute()
|
errs := make(chan error)
|
||||||
|
rows := make(chan storageUtils.StorageDiffRow)
|
||||||
|
w.Execute(rows, errs, queueRecheckInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,7 @@ var (
|
|||||||
genConfig config.Plugin
|
genConfig config.Plugin
|
||||||
ipc string
|
ipc string
|
||||||
levelDbPath string
|
levelDbPath string
|
||||||
|
queueRecheckInterval time.Duration
|
||||||
startingBlockNumber int64
|
startingBlockNumber int64
|
||||||
storageDiffsPath string
|
storageDiffsPath string
|
||||||
syncAll bool
|
syncAll bool
|
||||||
@ -49,8 +50,8 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
pollingInterval = 7 * time.Second
|
pollingInterval = 7 * time.Second
|
||||||
validationWindow = 15
|
validationWindow = 15
|
||||||
)
|
)
|
||||||
|
|
||||||
var rootCmd = &cobra.Command{
|
var rootCmd = &cobra.Command{
|
||||||
|
@ -36,6 +36,20 @@ composeAndExecute:
|
|||||||
|
|
||||||
`./vulcanizedb composeAndExecute --config=./environments/config_name.toml`
|
`./vulcanizedb composeAndExecute --config=./environments/config_name.toml`
|
||||||
|
|
||||||
|
## Flags
|
||||||
|
|
||||||
|
The `compose` and `composeAndExecute` commands can be passed optional flags to specify the operation of the watchers:
|
||||||
|
|
||||||
|
- `--recheck-headers`/`-r` - specifies whether to re-check headers for events after the header has already been queried for watched logs.
|
||||||
|
Can be useful for redundancy if you suspect that your node is not always returning all desired logs on every query.
|
||||||
|
Argument is expected to be a boolean: e.g. `-r=true`.
|
||||||
|
Defaults to `false`.
|
||||||
|
|
||||||
|
- `query-recheck-interval`/`-q` - specifies interval for re-checking storage diffs that haven been queued for later processing
|
||||||
|
(by default, the storage watched queues storage diffs if transformer execution fails, on the assumption that subsequent data derived from the event transformers may enable us to decode storage keys that we don't recognize right now).
|
||||||
|
Argument is expected to be a duration (integer measured in nanoseconds): e.g. `-q=10m30s` (for 10 minute, 30 second intervals).
|
||||||
|
Defaults to `5m` (5 minutes).
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
A .toml config file is specified when executing the commands.
|
A .toml config file is specified when executing the commands.
|
||||||
The config provides information for composing a set of transformers from external repositories:
|
The config provides information for composing a set of transformers from external repositories:
|
||||||
|
@ -24,22 +24,22 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LogFetcher interface {
|
type ILogFetcher interface {
|
||||||
FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error)
|
FetchLogs(contractAddresses []common.Address, topics []common.Hash, missingHeader core.Header) ([]types.Log, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Fetcher struct {
|
type LogFetcher struct {
|
||||||
blockChain core.BlockChain
|
blockChain core.BlockChain
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFetcher(blockchain core.BlockChain) *Fetcher {
|
func NewLogFetcher(blockchain core.BlockChain) *LogFetcher {
|
||||||
return &Fetcher{
|
return &LogFetcher{
|
||||||
blockChain: blockchain,
|
blockChain: blockchain,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks all topic0s, on all addresses, fetching matching logs for the given header
|
// Checks all topic0s, on all addresses, fetching matching logs for the given header
|
||||||
func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) {
|
func (logFetcher LogFetcher) FetchLogs(addresses []common.Address, topic0s []common.Hash, header core.Header) ([]types.Log, error) {
|
||||||
blockHash := common.HexToHash(header.Hash)
|
blockHash := common.HexToHash(header.Hash)
|
||||||
query := ethereum.FilterQuery{
|
query := ethereum.FilterQuery{
|
||||||
BlockHash: &blockHash,
|
BlockHash: &blockHash,
|
||||||
@ -48,7 +48,7 @@ func (fetcher Fetcher) FetchLogs(addresses []common.Address, topic0s []common.Ha
|
|||||||
Topics: [][]common.Hash{topic0s},
|
Topics: [][]common.Hash{topic0s},
|
||||||
}
|
}
|
||||||
|
|
||||||
logs, err := fetcher.blockChain.GetEthLogsWithCustomQuery(query)
|
logs, err := logFetcher.blockChain.GetEthLogsWithCustomQuery(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO review aggregate fetching error handling
|
// TODO review aggregate fetching error handling
|
||||||
return []types.Log{}, err
|
return []types.Log{}, err
|
||||||
|
@ -22,16 +22,16 @@ import (
|
|||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
fetch "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
"github.com/vulcanize/vulcanizedb/pkg/core"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Fetcher", func() {
|
var _ = Describe("LogFetcher", func() {
|
||||||
Describe("FetchLogs", func() {
|
Describe("FetchLogs", func() {
|
||||||
It("fetches logs based on the given query", func() {
|
It("fetches logs based on the given query", func() {
|
||||||
blockChain := fakes.NewMockBlockChain()
|
blockChain := fakes.NewMockBlockChain()
|
||||||
fetcher := fetch.NewFetcher(blockChain)
|
logFetcher := fetcher.NewLogFetcher(blockChain)
|
||||||
header := fakes.FakeHeader
|
header := fakes.FakeHeader
|
||||||
|
|
||||||
addresses := []common.Address{
|
addresses := []common.Address{
|
||||||
@ -41,7 +41,7 @@ var _ = Describe("Fetcher", func() {
|
|||||||
|
|
||||||
topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})}
|
topicZeros := []common.Hash{common.BytesToHash([]byte{1, 2, 3, 4, 5})}
|
||||||
|
|
||||||
_, err := fetcher.FetchLogs(addresses, topicZeros, header)
|
_, err := logFetcher.FetchLogs(addresses, topicZeros, header)
|
||||||
|
|
||||||
address1 := common.HexToAddress("0xfakeAddress")
|
address1 := common.HexToAddress("0xfakeAddress")
|
||||||
address2 := common.HexToAddress("0xanotherFakeAddress")
|
address2 := common.HexToAddress("0xanotherFakeAddress")
|
||||||
@ -59,9 +59,9 @@ var _ = Describe("Fetcher", func() {
|
|||||||
It("returns an error if fetching the logs fails", func() {
|
It("returns an error if fetching the logs fails", func() {
|
||||||
blockChain := fakes.NewMockBlockChain()
|
blockChain := fakes.NewMockBlockChain()
|
||||||
blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError)
|
blockChain.SetGetEthLogsWithCustomQueryErr(fakes.FakeError)
|
||||||
fetcher := fetch.NewFetcher(blockChain)
|
logFetcher := fetcher.NewLogFetcher(blockChain)
|
||||||
|
|
||||||
_, err := fetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{})
|
_, err := logFetcher.FetchLogs([]common.Address{}, []common.Hash{}, core.Header{})
|
||||||
|
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err).To(MatchError(fakes.FakeError))
|
Expect(err).To(MatchError(fakes.FakeError))
|
||||||
|
50
libraries/shared/fetcher/storage_fetcher.go
Normal file
50
libraries/shared/fetcher/storage_fetcher.go
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package fetcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IStorageFetcher interface {
|
||||||
|
FetchStorageDiffs(chan<- utils.StorageDiffRow, chan<- error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type CsvTailStorageFetcher struct {
|
||||||
|
tailer fs.Tailer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCsvTailStorageFetcher(tailer fs.Tailer) CsvTailStorageFetcher {
|
||||||
|
return CsvTailStorageFetcher{tailer: tailer}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
|
||||||
|
t, tailErr := storageFetcher.tailer.Tail()
|
||||||
|
if tailErr != nil {
|
||||||
|
errs <- tailErr
|
||||||
|
}
|
||||||
|
for line := range t.Lines {
|
||||||
|
row, parseErr := utils.FromStrings(strings.Split(line.Text, ","))
|
||||||
|
if parseErr != nil {
|
||||||
|
errs <- parseErr
|
||||||
|
} else {
|
||||||
|
out <- row
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
99
libraries/shared/fetcher/storage_fetcher_test.go
Normal file
99
libraries/shared/fetcher/storage_fetcher_test.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package fetcher_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/hpcloud/tail"
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Csv Tail Storage Fetcher", func() {
|
||||||
|
var (
|
||||||
|
errorsChannel chan error
|
||||||
|
mockTailer *fakes.MockTailer
|
||||||
|
rowsChannel chan utils.StorageDiffRow
|
||||||
|
storageFetcher fetcher.CsvTailStorageFetcher
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
errorsChannel = make(chan error)
|
||||||
|
rowsChannel = make(chan utils.StorageDiffRow)
|
||||||
|
mockTailer = fakes.NewMockTailer()
|
||||||
|
storageFetcher = fetcher.NewCsvTailStorageFetcher(mockTailer)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("adds error to errors channel if tailing file fails", func(done Done) {
|
||||||
|
mockTailer.TailErr = fakes.FakeError
|
||||||
|
|
||||||
|
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
|
||||||
|
|
||||||
|
Expect(<-errorsChannel).To(MatchError(fakes.FakeError))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("adds parsed csv row to rows channel for storage diff", func(done Done) {
|
||||||
|
line := getFakeLine()
|
||||||
|
|
||||||
|
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
|
||||||
|
mockTailer.Lines <- line
|
||||||
|
|
||||||
|
expectedRow, err := utils.FromStrings(strings.Split(line.Text, ","))
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(<-rowsChannel).To(Equal(expectedRow))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("adds error to errors channel if parsing csv fails", func(done Done) {
|
||||||
|
line := &tail.Line{Text: "invalid"}
|
||||||
|
|
||||||
|
go storageFetcher.FetchStorageDiffs(rowsChannel, errorsChannel)
|
||||||
|
mockTailer.Lines <- line
|
||||||
|
|
||||||
|
Expect(<-errorsChannel).To(HaveOccurred())
|
||||||
|
select {
|
||||||
|
case <-rowsChannel:
|
||||||
|
Fail("value passed to rows channel on error")
|
||||||
|
default:
|
||||||
|
Succeed()
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
func getFakeLine() *tail.Line {
|
||||||
|
address := common.HexToAddress("0x1234567890abcdef")
|
||||||
|
blockHash := []byte{4, 5, 6}
|
||||||
|
blockHeight := int64(789)
|
||||||
|
storageKey := []byte{9, 8, 7}
|
||||||
|
storageValue := []byte{6, 5, 4}
|
||||||
|
return &tail.Line{
|
||||||
|
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address.Bytes()), common.Bytes2Hex(blockHash),
|
||||||
|
blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
|
||||||
|
Time: time.Time{},
|
||||||
|
Err: nil,
|
||||||
|
}
|
||||||
|
}
|
@ -53,6 +53,6 @@ func (converter *MockConverter) SetToEntityConverterError(err error) {
|
|||||||
converter.entityConverterError = err
|
converter.entityConverterError = err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *MockConverter) SetToModelConverterError(err error) {
|
func (converter *MockConverter) SetToModelConverterError(err error) {
|
||||||
c.modelConverterError = err
|
converter.modelConverterError = err
|
||||||
}
|
}
|
||||||
|
39
libraries/shared/mocks/storage_fetcher.go
Normal file
39
libraries/shared/mocks/storage_fetcher.go
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package mocks
|
||||||
|
|
||||||
|
import "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
|
|
||||||
|
type MockStorageFetcher struct {
|
||||||
|
RowsToReturn []utils.StorageDiffRow
|
||||||
|
ErrsToReturn []error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMockStorageFetcher() *MockStorageFetcher {
|
||||||
|
return &MockStorageFetcher{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fetcher *MockStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
|
||||||
|
defer close(out)
|
||||||
|
defer close(errs)
|
||||||
|
for _, err := range fetcher.ErrsToReturn {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
for _, row := range fetcher.RowsToReturn {
|
||||||
|
out <- row
|
||||||
|
}
|
||||||
|
}
|
@ -21,11 +21,26 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MockStorageQueue struct {
|
type MockStorageQueue struct {
|
||||||
AddCalled bool
|
AddCalled bool
|
||||||
AddError error
|
AddError error
|
||||||
|
AddPassedRow utils.StorageDiffRow
|
||||||
|
DeleteErr error
|
||||||
|
DeletePassedId int
|
||||||
|
GetAllErr error
|
||||||
|
RowsToReturn []utils.StorageDiffRow
|
||||||
}
|
}
|
||||||
|
|
||||||
func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error {
|
func (queue *MockStorageQueue) Add(row utils.StorageDiffRow) error {
|
||||||
queue.AddCalled = true
|
queue.AddCalled = true
|
||||||
|
queue.AddPassedRow = row
|
||||||
return queue.AddError
|
return queue.AddError
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (queue *MockStorageQueue) Delete(id int) error {
|
||||||
|
queue.DeletePassedId = id
|
||||||
|
return queue.DeleteErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (queue *MockStorageQueue) GetAll() ([]utils.StorageDiffRow, error) {
|
||||||
|
return queue.RowsToReturn, queue.GetAllErr
|
||||||
|
}
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
|
|
||||||
type IStorageQueue interface {
|
type IStorageQueue interface {
|
||||||
Add(row utils.StorageDiffRow) error
|
Add(row utils.StorageDiffRow) error
|
||||||
|
Delete(id int) error
|
||||||
|
GetAll() ([]utils.StorageDiffRow, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StorageQueue struct {
|
type StorageQueue struct {
|
||||||
@ -40,3 +42,14 @@ func (queue StorageQueue) Add(row utils.StorageDiffRow) error {
|
|||||||
row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes())
|
row.BlockHeight, row.StorageKey.Bytes(), row.StorageValue.Bytes())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (queue StorageQueue) Delete(id int) error {
|
||||||
|
_, err := queue.db.Exec(`DELETE FROM public.queued_storage WHERE id = $1`, id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (queue StorageQueue) GetAll() ([]utils.StorageDiffRow, error) {
|
||||||
|
var result []utils.StorageDiffRow
|
||||||
|
err := queue.db.Select(&result, `SELECT * FROM public.queued_storage`)
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package storage_test
|
package storage_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -7,27 +23,79 @@ import (
|
|||||||
|
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/test_config"
|
"github.com/vulcanize/vulcanizedb/test_config"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Describe("Storage queue", func() {
|
var _ = Describe("Storage queue", func() {
|
||||||
It("adds a storage row to the db", func() {
|
var (
|
||||||
row := utils.StorageDiffRow{
|
db *postgres.DB
|
||||||
|
row utils.StorageDiffRow
|
||||||
|
queue storage.IStorageQueue
|
||||||
|
)
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
row = utils.StorageDiffRow{
|
||||||
Contract: common.HexToAddress("0x123456"),
|
Contract: common.HexToAddress("0x123456"),
|
||||||
BlockHash: common.HexToHash("0x678901"),
|
BlockHash: common.HexToHash("0x678901"),
|
||||||
BlockHeight: 987,
|
BlockHeight: 987,
|
||||||
StorageKey: common.HexToHash("0x654321"),
|
StorageKey: common.HexToHash("0x654321"),
|
||||||
StorageValue: common.HexToHash("0x198765"),
|
StorageValue: common.HexToHash("0x198765"),
|
||||||
}
|
}
|
||||||
db := test_config.NewTestDB(test_config.NewTestNode())
|
db = test_config.NewTestDB(test_config.NewTestNode())
|
||||||
queue := storage.NewStorageQueue(db)
|
test_config.CleanTestDB(db)
|
||||||
|
queue = storage.NewStorageQueue(db)
|
||||||
addErr := queue.Add(row)
|
addErr := queue.Add(row)
|
||||||
|
|
||||||
Expect(addErr).NotTo(HaveOccurred())
|
Expect(addErr).NotTo(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("adds a storage row to the db", func() {
|
||||||
var result utils.StorageDiffRow
|
var result utils.StorageDiffRow
|
||||||
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
|
getErr := db.Get(&result, `SELECT contract, block_hash, block_height, storage_key, storage_value FROM public.queued_storage`)
|
||||||
Expect(getErr).NotTo(HaveOccurred())
|
Expect(getErr).NotTo(HaveOccurred())
|
||||||
Expect(result).To(Equal(row))
|
Expect(result).To(Equal(row))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("deletes storage row from db", func() {
|
||||||
|
rows, getErr := queue.GetAll()
|
||||||
|
Expect(getErr).NotTo(HaveOccurred())
|
||||||
|
Expect(len(rows)).To(Equal(1))
|
||||||
|
|
||||||
|
err := queue.Delete(rows[0].Id)
|
||||||
|
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
remainingRows, secondGetErr := queue.GetAll()
|
||||||
|
Expect(secondGetErr).NotTo(HaveOccurred())
|
||||||
|
Expect(len(remainingRows)).To(BeZero())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("gets all storage rows from db", func() {
|
||||||
|
rowTwo := utils.StorageDiffRow{
|
||||||
|
Contract: common.HexToAddress("0x123456"),
|
||||||
|
BlockHash: common.HexToHash("0x678902"),
|
||||||
|
BlockHeight: 988,
|
||||||
|
StorageKey: common.HexToHash("0x654322"),
|
||||||
|
StorageValue: common.HexToHash("0x198766"),
|
||||||
|
}
|
||||||
|
addErr := queue.Add(rowTwo)
|
||||||
|
Expect(addErr).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
rows, err := queue.GetAll()
|
||||||
|
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(len(rows)).To(Equal(2))
|
||||||
|
Expect(rows[0]).NotTo(Equal(rows[1]))
|
||||||
|
Expect(rows[0].Id).NotTo(BeZero())
|
||||||
|
Expect(rows[0].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract)))
|
||||||
|
Expect(rows[0].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash)))
|
||||||
|
Expect(rows[0].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight)))
|
||||||
|
Expect(rows[0].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey)))
|
||||||
|
Expect(rows[0].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue)))
|
||||||
|
Expect(rows[1].Id).NotTo(BeZero())
|
||||||
|
Expect(rows[1].Contract).To(Or(Equal(row.Contract), Equal(rowTwo.Contract)))
|
||||||
|
Expect(rows[1].BlockHash).To(Or(Equal(row.BlockHash), Equal(rowTwo.BlockHash)))
|
||||||
|
Expect(rows[1].BlockHeight).To(Or(Equal(row.BlockHeight), Equal(rowTwo.BlockHeight)))
|
||||||
|
Expect(rows[1].StorageKey).To(Or(Equal(row.StorageKey), Equal(rowTwo.StorageKey)))
|
||||||
|
Expect(rows[1].StorageValue).To(Or(Equal(row.StorageValue), Equal(rowTwo.StorageValue)))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
const ExpectedRowLength = 5
|
const ExpectedRowLength = 5
|
||||||
|
|
||||||
type StorageDiffRow struct {
|
type StorageDiffRow struct {
|
||||||
|
Id int
|
||||||
Contract common.Address
|
Contract common.Address
|
||||||
BlockHash common.Hash `db:"block_hash"`
|
BlockHash common.Hash `db:"block_hash"`
|
||||||
BlockHeight int `db:"block_height"`
|
BlockHeight int `db:"block_height"`
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package transactions
|
package transactions
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package transactions_test
|
package transactions_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package transactions_test
|
package transactions_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -54,10 +54,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
|
|||||||
watcher.Transformers = append(watcher.Transformers, t)
|
watcher.Transformers = append(watcher.Transformers, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, transformer := range watcher.Transformers {
|
for _, contractTransformer := range watcher.Transformers {
|
||||||
err := transformer.Init()
|
err := contractTransformer.Init()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("Unable to initialize transformer:", transformer.GetConfig().Name, err)
|
log.Print("Unable to initialize transformer:", contractTransformer.GetConfig().Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -65,10 +65,10 @@ func (watcher *ContractWatcher) AddTransformers(inits interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (watcher *ContractWatcher) Execute() error {
|
func (watcher *ContractWatcher) Execute() error {
|
||||||
for _, transformer := range watcher.Transformers {
|
for _, contractTransformer := range watcher.Transformers {
|
||||||
err := transformer.Execute()
|
err := contractTransformer.Execute()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Unable to execute transformer:", transformer.GetConfig().Name, err)
|
log.Error("Unable to execute transformer:", contractTransformer.GetConfig().Name, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ type EventWatcher struct {
|
|||||||
Transformers []transformer.EventTransformer
|
Transformers []transformer.EventTransformer
|
||||||
BlockChain core.BlockChain
|
BlockChain core.BlockChain
|
||||||
DB *postgres.DB
|
DB *postgres.DB
|
||||||
Fetcher fetcher.LogFetcher
|
Fetcher fetcher.ILogFetcher
|
||||||
Chunker chunker.Chunker
|
Chunker chunker.Chunker
|
||||||
Addresses []common.Address
|
Addresses []common.Address
|
||||||
Topics []common.Hash
|
Topics []common.Hash
|
||||||
@ -47,7 +47,7 @@ type EventWatcher struct {
|
|||||||
|
|
||||||
func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
|
func NewEventWatcher(db *postgres.DB, bc core.BlockChain) EventWatcher {
|
||||||
logChunker := chunker.NewLogChunker()
|
logChunker := chunker.NewLogChunker()
|
||||||
logFetcher := fetcher.NewFetcher(bc)
|
logFetcher := fetcher.NewLogFetcher(bc)
|
||||||
transactionSyncer := transactions.NewTransactionsSyncer(db, bc)
|
transactionSyncer := transactions.NewTransactionsSyncer(db, bc)
|
||||||
return EventWatcher{
|
return EventWatcher{
|
||||||
BlockChain: bc,
|
BlockChain: bc,
|
||||||
|
@ -17,73 +17,100 @@
|
|||||||
package watcher
|
package watcher
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type StorageWatcher struct {
|
type StorageWatcher struct {
|
||||||
db *postgres.DB
|
db *postgres.DB
|
||||||
tailer fs.Tailer
|
StorageFetcher fetcher.IStorageFetcher
|
||||||
Queue storage.IStorageQueue
|
Queue storage.IStorageQueue
|
||||||
Transformers map[common.Address]transformer.StorageTransformer
|
Transformers map[common.Address]transformer.StorageTransformer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStorageWatcher(tailer fs.Tailer, db *postgres.DB) StorageWatcher {
|
func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher {
|
||||||
transformers := make(map[common.Address]transformer.StorageTransformer)
|
transformers := make(map[common.Address]transformer.StorageTransformer)
|
||||||
queue := storage.NewStorageQueue(db)
|
queue := storage.NewStorageQueue(db)
|
||||||
return StorageWatcher{
|
return StorageWatcher{
|
||||||
db: db,
|
db: db,
|
||||||
tailer: tailer,
|
StorageFetcher: fetcher,
|
||||||
Queue: queue,
|
Queue: queue,
|
||||||
Transformers: transformers,
|
Transformers: transformers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (watcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
|
func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) {
|
||||||
for _, initializer := range initializers {
|
for _, initializer := range initializers {
|
||||||
storageTransformer := initializer(watcher.db)
|
storageTransformer := initializer(storageWatcher.db)
|
||||||
watcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer
|
storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (watcher StorageWatcher) Execute() error {
|
func (storageWatcher StorageWatcher) Execute(rows chan utils.StorageDiffRow, errs chan error, queueRecheckInterval time.Duration) {
|
||||||
t, tailErr := watcher.tailer.Tail()
|
ticker := time.NewTicker(queueRecheckInterval)
|
||||||
if tailErr != nil {
|
go storageWatcher.StorageFetcher.FetchStorageDiffs(rows, errs)
|
||||||
return tailErr
|
for {
|
||||||
}
|
select {
|
||||||
for line := range t.Lines {
|
case fetchErr := <-errs:
|
||||||
row, parseErr := utils.FromStrings(strings.Split(line.Text, ","))
|
logrus.Warn(fmt.Sprintf("error fetching storage diffs: %s", fetchErr))
|
||||||
if parseErr != nil {
|
case row := <-rows:
|
||||||
return parseErr
|
storageWatcher.processRow(row)
|
||||||
|
case <-ticker.C:
|
||||||
|
storageWatcher.processQueue()
|
||||||
}
|
}
|
||||||
storageTransformer, ok := watcher.Transformers[row.Contract]
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storageWatcher StorageWatcher) processRow(row utils.StorageDiffRow) {
|
||||||
|
storageTransformer, ok := storageWatcher.Transformers[row.Contract]
|
||||||
|
if !ok {
|
||||||
|
// ignore rows from unwatched contracts
|
||||||
|
return
|
||||||
|
}
|
||||||
|
executeErr := storageTransformer.Execute(row)
|
||||||
|
if executeErr != nil {
|
||||||
|
logrus.Warn(fmt.Sprintf("error executing storage transformer: %s", executeErr))
|
||||||
|
queueErr := storageWatcher.Queue.Add(row)
|
||||||
|
if queueErr != nil {
|
||||||
|
logrus.Warn(fmt.Sprintf("error queueing storage diff: %s", queueErr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (storageWatcher StorageWatcher) processQueue() {
|
||||||
|
rows, fetchErr := storageWatcher.Queue.GetAll()
|
||||||
|
if fetchErr != nil {
|
||||||
|
logrus.Warn(fmt.Sprintf("error getting queued storage: %s", fetchErr))
|
||||||
|
}
|
||||||
|
for _, row := range rows {
|
||||||
|
storageTransformer, ok := storageWatcher.Transformers[row.Contract]
|
||||||
if !ok {
|
if !ok {
|
||||||
logrus.Warn(utils.ErrContractNotFound{Contract: row.Contract.Hex()}.Error())
|
// delete row from queue if address no longer watched
|
||||||
|
storageWatcher.deleteRow(row.Id)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
executeErr := storageTransformer.Execute(row)
|
executeErr := storageTransformer.Execute(row)
|
||||||
if executeErr != nil {
|
if executeErr == nil {
|
||||||
if isKeyNotFound(executeErr) {
|
storageWatcher.deleteRow(row.Id)
|
||||||
queueErr := watcher.Queue.Add(row)
|
|
||||||
if queueErr != nil {
|
|
||||||
logrus.Warn(queueErr.Error())
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logrus.Warn(executeErr.Error())
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
|
||||||
|
func (storageWatcher StorageWatcher) deleteRow(id int) {
|
||||||
|
deleteErr := storageWatcher.Queue.Delete(id)
|
||||||
|
if deleteErr != nil {
|
||||||
|
logrus.Warn(fmt.Sprintf("error deleting persisted row from queue: %s", deleteErr))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func isKeyNotFound(executeErr error) bool {
|
func isKeyNotFound(executeErr error) bool {
|
||||||
|
@ -17,15 +17,11 @@
|
|||||||
package watcher_test
|
package watcher_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/hpcloud/tail"
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
@ -34,7 +30,6 @@ import (
|
|||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/transformer"
|
||||||
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
"github.com/vulcanize/vulcanizedb/libraries/shared/watcher"
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/core"
|
|
||||||
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
"github.com/vulcanize/vulcanizedb/pkg/fakes"
|
||||||
"github.com/vulcanize/vulcanizedb/test_config"
|
"github.com/vulcanize/vulcanizedb/test_config"
|
||||||
)
|
)
|
||||||
@ -43,170 +38,207 @@ var _ = Describe("Storage Watcher", func() {
|
|||||||
It("adds transformers", func() {
|
It("adds transformers", func() {
|
||||||
fakeAddress := common.HexToAddress("0x12345")
|
fakeAddress := common.HexToAddress("0x12345")
|
||||||
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
|
fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress}
|
||||||
w := watcher.NewStorageWatcher(&fakes.MockTailer{}, test_config.NewTestDB(core.Node{}))
|
w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode()))
|
||||||
|
|
||||||
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
||||||
|
|
||||||
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
|
Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer))
|
||||||
})
|
})
|
||||||
|
|
||||||
It("reads the tail of the storage diffs file", func() {
|
Describe("executing watcher", func() {
|
||||||
mockTailer := fakes.NewMockTailer()
|
var (
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
errs chan error
|
||||||
|
mockFetcher *mocks.MockStorageFetcher
|
||||||
|
mockQueue *mocks.MockStorageQueue
|
||||||
|
mockTransformer *mocks.MockStorageTransformer
|
||||||
|
row utils.StorageDiffRow
|
||||||
|
rows chan utils.StorageDiffRow
|
||||||
|
storageWatcher watcher.StorageWatcher
|
||||||
|
)
|
||||||
|
|
||||||
assert(func(err error) {
|
BeforeEach(func() {
|
||||||
Expect(err).To(BeNil())
|
errs = make(chan error)
|
||||||
Expect(mockTailer.TailCalled).To(BeTrue())
|
rows = make(chan utils.StorageDiffRow)
|
||||||
}, w, mockTailer, []*tail.Line{})
|
address := common.HexToAddress("0x0123456789abcdef")
|
||||||
})
|
mockFetcher = mocks.NewMockStorageFetcher()
|
||||||
|
mockQueue = &mocks.MockStorageQueue{}
|
||||||
It("returns error if row parsing fails", func() {
|
mockTransformer = &mocks.MockStorageTransformer{Address: address}
|
||||||
mockTailer := fakes.NewMockTailer()
|
row = utils.StorageDiffRow{
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
Id: 1337,
|
||||||
line := &tail.Line{Text: "oops"}
|
Contract: address,
|
||||||
|
BlockHash: common.HexToHash("0xfedcba9876543210"),
|
||||||
assert(func(err error) {
|
BlockHeight: 0,
|
||||||
Expect(err).To(HaveOccurred())
|
StorageKey: common.HexToHash("0xabcdef1234567890"),
|
||||||
Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1}))
|
StorageValue: common.HexToHash("0x9876543210abcdef"),
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
}
|
||||||
})
|
|
||||||
|
|
||||||
It("logs error if no transformer can parse storage row", func() {
|
|
||||||
mockTailer := fakes.NewMockTailer()
|
|
||||||
address := common.HexToAddress("0x12345")
|
|
||||||
line := getFakeLine(address.Bytes())
|
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
|
||||||
tempFile, err := ioutil.TempFile("", "log")
|
|
||||||
defer os.Remove(tempFile.Name())
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
logrus.SetOutput(tempFile)
|
|
||||||
|
|
||||||
assert(func(err error) {
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
logContent, readErr := ioutil.ReadFile(tempFile.Name())
|
|
||||||
Expect(readErr).NotTo(HaveOccurred())
|
|
||||||
Expect(string(logContent)).To(ContainSubstring(utils.ErrContractNotFound{Contract: address.Hex()}.Error()))
|
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
|
||||||
})
|
|
||||||
|
|
||||||
It("executes transformer with storage row", func() {
|
|
||||||
address := []byte{1, 2, 3}
|
|
||||||
line := getFakeLine(address)
|
|
||||||
mockTailer := fakes.NewMockTailer()
|
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
|
||||||
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address)}
|
|
||||||
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
|
||||||
|
|
||||||
assert(func(err error) {
|
|
||||||
Expect(err).To(BeNil())
|
|
||||||
expectedRow, err := utils.FromStrings(strings.Split(line.Text, ","))
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(fakeTransformer.PassedRow).To(Equal(expectedRow))
|
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
|
||||||
})
|
|
||||||
|
|
||||||
Describe("when executing transformer fails", func() {
|
|
||||||
It("queues row when error is storage key not found", func() {
|
|
||||||
address := []byte{1, 2, 3}
|
|
||||||
line := getFakeLine(address)
|
|
||||||
mockTailer := fakes.NewMockTailer()
|
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
|
||||||
mockQueue := &mocks.MockStorageQueue{}
|
|
||||||
w.Queue = mockQueue
|
|
||||||
keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
|
|
||||||
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
|
|
||||||
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
|
||||||
|
|
||||||
assert(func(err error) {
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(mockQueue.AddCalled).To(BeTrue())
|
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
It("logs error if queuing row fails", func() {
|
It("logs error if fetching storage diffs fails", func(done Done) {
|
||||||
address := []byte{1, 2, 3}
|
mockFetcher.ErrsToReturn = []error{fakes.FakeError}
|
||||||
line := getFakeLine(address)
|
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
|
||||||
mockTailer := fakes.NewMockTailer()
|
storageWatcher.Queue = mockQueue
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
|
||||||
mockQueue := &mocks.MockStorageQueue{}
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
mockQueue.AddError = fakes.FakeError
|
Expect(fileErr).NotTo(HaveOccurred())
|
||||||
w.Queue = mockQueue
|
|
||||||
keyNotFoundError := utils.ErrStorageKeyNotFound{Key: "unknown_storage_key"}
|
|
||||||
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: keyNotFoundError}
|
|
||||||
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
|
||||||
tempFile, err := ioutil.TempFile("", "log")
|
|
||||||
defer os.Remove(tempFile.Name())
|
defer os.Remove(tempFile.Name())
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
logrus.SetOutput(tempFile)
|
logrus.SetOutput(tempFile)
|
||||||
|
|
||||||
assert(func(err error) {
|
go storageWatcher.Execute(rows, errs, time.Hour)
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
Expect(mockQueue.AddCalled).To(BeTrue())
|
Eventually(func() (string, error) {
|
||||||
logContent, readErr := ioutil.ReadFile(tempFile.Name())
|
logContent, err := ioutil.ReadFile(tempFile.Name())
|
||||||
Expect(readErr).NotTo(HaveOccurred())
|
return string(logContent), err
|
||||||
Expect(string(logContent)).To(ContainSubstring(fakes.FakeError.Error()))
|
}).Should(ContainSubstring(fakes.FakeError.Error()))
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
close(done)
|
||||||
})
|
})
|
||||||
|
|
||||||
It("logs any other error", func() {
|
Describe("transforming new storage diffs", func() {
|
||||||
address := []byte{1, 2, 3}
|
BeforeEach(func() {
|
||||||
line := getFakeLine(address)
|
mockFetcher.RowsToReturn = []utils.StorageDiffRow{row}
|
||||||
mockTailer := fakes.NewMockTailer()
|
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
|
||||||
w := watcher.NewStorageWatcher(mockTailer, test_config.NewTestDB(core.Node{}))
|
storageWatcher.Queue = mockQueue
|
||||||
executionError := errors.New("storage watcher failed attempting to execute transformer")
|
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
|
||||||
fakeTransformer := &mocks.MockStorageTransformer{Address: common.BytesToAddress(address), ExecuteErr: executionError}
|
})
|
||||||
w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer})
|
|
||||||
tempFile, err := ioutil.TempFile("", "log")
|
|
||||||
defer os.Remove(tempFile.Name())
|
|
||||||
Expect(err).NotTo(HaveOccurred())
|
|
||||||
logrus.SetOutput(tempFile)
|
|
||||||
|
|
||||||
assert(func(err error) {
|
It("executes transformer for recognized storage row", func(done Done) {
|
||||||
Expect(err).NotTo(HaveOccurred())
|
go storageWatcher.Execute(rows, errs, time.Hour)
|
||||||
logContent, readErr := ioutil.ReadFile(tempFile.Name())
|
|
||||||
Expect(readErr).NotTo(HaveOccurred())
|
Eventually(func() utils.StorageDiffRow {
|
||||||
Expect(string(logContent)).To(ContainSubstring(executionError.Error()))
|
return mockTransformer.PassedRow
|
||||||
}, w, mockTailer, []*tail.Line{line})
|
}).Should(Equal(row))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("queues row for later processing if transformer execution fails", func(done Done) {
|
||||||
|
mockTransformer.ExecuteErr = fakes.FakeError
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Hour)
|
||||||
|
|
||||||
|
Expect(<-errs).To(BeNil())
|
||||||
|
Eventually(func() bool {
|
||||||
|
return mockQueue.AddCalled
|
||||||
|
}).Should(BeTrue())
|
||||||
|
Eventually(func() utils.StorageDiffRow {
|
||||||
|
return mockQueue.AddPassedRow
|
||||||
|
}).Should(Equal(row))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("logs error if queueing row fails", func(done Done) {
|
||||||
|
mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{}
|
||||||
|
mockQueue.AddError = fakes.FakeError
|
||||||
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
|
Expect(fileErr).NotTo(HaveOccurred())
|
||||||
|
defer os.Remove(tempFile.Name())
|
||||||
|
logrus.SetOutput(tempFile)
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Hour)
|
||||||
|
|
||||||
|
Eventually(func() bool {
|
||||||
|
return mockQueue.AddCalled
|
||||||
|
}).Should(BeTrue())
|
||||||
|
Eventually(func() (string, error) {
|
||||||
|
logContent, err := ioutil.ReadFile(tempFile.Name())
|
||||||
|
return string(logContent), err
|
||||||
|
}).Should(ContainSubstring(fakes.FakeError.Error()))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("transforming queued storage diffs", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
mockQueue.RowsToReturn = []utils.StorageDiffRow{row}
|
||||||
|
storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode()))
|
||||||
|
storageWatcher.Queue = mockQueue
|
||||||
|
storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer})
|
||||||
|
})
|
||||||
|
|
||||||
|
It("logs error if getting queued storage fails", func(done Done) {
|
||||||
|
mockQueue.GetAllErr = fakes.FakeError
|
||||||
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
|
Expect(fileErr).NotTo(HaveOccurred())
|
||||||
|
defer os.Remove(tempFile.Name())
|
||||||
|
logrus.SetOutput(tempFile)
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() (string, error) {
|
||||||
|
logContent, err := ioutil.ReadFile(tempFile.Name())
|
||||||
|
return string(logContent), err
|
||||||
|
}).Should(ContainSubstring(fakes.FakeError.Error()))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("executes transformer for storage row", func(done Done) {
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() utils.StorageDiffRow {
|
||||||
|
return mockTransformer.PassedRow
|
||||||
|
}).Should(Equal(row))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("deletes row from queue if transformer execution successful", func(done Done) {
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
return mockQueue.DeletePassedId
|
||||||
|
}).Should(Equal(row.Id))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("logs error if deleting persisted row fails", func(done Done) {
|
||||||
|
mockQueue.DeleteErr = fakes.FakeError
|
||||||
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
|
Expect(fileErr).NotTo(HaveOccurred())
|
||||||
|
defer os.Remove(tempFile.Name())
|
||||||
|
logrus.SetOutput(tempFile)
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() (string, error) {
|
||||||
|
logContent, err := ioutil.ReadFile(tempFile.Name())
|
||||||
|
return string(logContent), err
|
||||||
|
}).Should(ContainSubstring(fakes.FakeError.Error()))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("deletes obsolete row from queue if contract not recognized", func(done Done) {
|
||||||
|
obsoleteRow := utils.StorageDiffRow{
|
||||||
|
Id: row.Id + 1,
|
||||||
|
Contract: common.HexToAddress("0xfedcba9876543210"),
|
||||||
|
}
|
||||||
|
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() int {
|
||||||
|
return mockQueue.DeletePassedId
|
||||||
|
}).Should(Equal(obsoleteRow.Id))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
|
||||||
|
It("logs error if deleting obsolete row fails", func(done Done) {
|
||||||
|
obsoleteRow := utils.StorageDiffRow{
|
||||||
|
Id: row.Id + 1,
|
||||||
|
Contract: common.HexToAddress("0xfedcba9876543210"),
|
||||||
|
}
|
||||||
|
mockQueue.RowsToReturn = []utils.StorageDiffRow{obsoleteRow}
|
||||||
|
mockQueue.DeleteErr = fakes.FakeError
|
||||||
|
tempFile, fileErr := ioutil.TempFile("", "log")
|
||||||
|
Expect(fileErr).NotTo(HaveOccurred())
|
||||||
|
defer os.Remove(tempFile.Name())
|
||||||
|
logrus.SetOutput(tempFile)
|
||||||
|
|
||||||
|
go storageWatcher.Execute(rows, errs, time.Nanosecond)
|
||||||
|
|
||||||
|
Eventually(func() (string, error) {
|
||||||
|
logContent, err := ioutil.ReadFile(tempFile.Name())
|
||||||
|
return string(logContent), err
|
||||||
|
}).Should(ContainSubstring(fakes.FakeError.Error()))
|
||||||
|
close(done)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
func assert(assertion func(err error), watcher watcher.StorageWatcher, mockTailer *fakes.MockTailer, lines []*tail.Line) {
|
|
||||||
errs := make(chan error, 1)
|
|
||||||
done := make(chan bool, 1)
|
|
||||||
go execute(watcher, errs, done)
|
|
||||||
for _, line := range lines {
|
|
||||||
mockTailer.Lines <- line
|
|
||||||
}
|
|
||||||
close(mockTailer.Lines)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-errs:
|
|
||||||
assertion(err)
|
|
||||||
break
|
|
||||||
case <-done:
|
|
||||||
assertion(nil)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func execute(w watcher.StorageWatcher, errs chan error, done chan bool) {
|
|
||||||
err := w.Execute()
|
|
||||||
if err != nil {
|
|
||||||
errs <- err
|
|
||||||
} else {
|
|
||||||
done <- true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getFakeLine(address []byte) *tail.Line {
|
|
||||||
blockHash := []byte{4, 5, 6}
|
|
||||||
blockHeight := int64(789)
|
|
||||||
storageKey := []byte{9, 8, 7}
|
|
||||||
storageValue := []byte{6, 5, 4}
|
|
||||||
return &tail.Line{
|
|
||||||
Text: fmt.Sprintf("%s,%s,%d,%s,%s", common.Bytes2Hex(address), common.Bytes2Hex(blockHash), blockHeight, common.Bytes2Hex(storageKey), common.Bytes2Hex(storageValue)),
|
|
||||||
Time: time.Time{},
|
|
||||||
Err: nil,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package fakes
|
package fakes
|
||||||
|
|
||||||
import "github.com/vulcanize/vulcanizedb/pkg/filters"
|
import "github.com/vulcanize/vulcanizedb/pkg/filters"
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package fakes
|
package fakes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package fakes
|
package fakes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package fakes
|
package fakes
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -6,24 +22,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MockTailer struct {
|
type MockTailer struct {
|
||||||
Lines chan *tail.Line
|
Lines chan *tail.Line
|
||||||
TailCalled bool
|
TailErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMockTailer() *MockTailer {
|
func NewMockTailer() *MockTailer {
|
||||||
return &MockTailer{
|
return &MockTailer{
|
||||||
Lines: make(chan *tail.Line, 1),
|
Lines: make(chan *tail.Line, 1),
|
||||||
TailCalled: false,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mock *MockTailer) Tail() (*tail.Tail, error) {
|
func (mock *MockTailer) Tail() (*tail.Tail, error) {
|
||||||
mock.TailCalled = true
|
|
||||||
fakeTail := &tail.Tail{
|
fakeTail := &tail.Tail{
|
||||||
Filename: "",
|
Filename: "",
|
||||||
Lines: mock.Lines,
|
Lines: mock.Lines,
|
||||||
Config: tail.Config{},
|
Config: tail.Config{},
|
||||||
Tomb: tomb.Tomb{},
|
Tomb: tomb.Tomb{},
|
||||||
}
|
}
|
||||||
return fakeTail, nil
|
return fakeTail, mock.TailErr
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,19 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
package fakes
|
package fakes
|
||||||
|
|
||||||
import "github.com/ethereum/go-ethereum/core/types"
|
import "github.com/ethereum/go-ethereum/core/types"
|
||||||
|
Loading…
Reference in New Issue
Block a user