Merge pull request #61 from 8thlight/vat-flux-migration

vat flux transformer
This commit is contained in:
Takayuki Goto 2018-10-17 13:04:18 -05:00 committed by GitHub
commit 75246e9ce9
21 changed files with 957 additions and 2 deletions

View File

@ -111,6 +111,7 @@ func buildTransformerInitializerMap() map[string]shared2.TransformerInitializer
transformerInitializerMap["vatFold"] = transformers.VatFoldTransformerInitializer transformerInitializerMap["vatFold"] = transformers.VatFoldTransformerInitializer
transformerInitializerMap["vatToll"] = transformers.VatTollTransformerInitializer transformerInitializerMap["vatToll"] = transformers.VatTollTransformerInitializer
transformerInitializerMap["vatTune"] = transformers.VatTuneTransformerInitializer transformerInitializerMap["vatTune"] = transformers.VatTuneTransformerInitializer
transformerInitializerMap["vatFlux"] = transformers.VatFluxTransformerInitializer
return transformerInitializerMap return transformerInitializerMap
} }

View File

@ -0,0 +1,3 @@
DROP TABLE maker.vat_flux;
ALTER TABLE public.checked_headers
DROP COLUMN vat_flux_checked;

View File

@ -0,0 +1,14 @@
CREATE TABLE maker.vat_flux (
id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
ilk TEXT,
src TEXT,
dst TEXT,
rad numeric,
tx_idx INTEGER NOT NULL,
raw_log JSONB,
UNIQUE (header_id, tx_idx)
);
ALTER TABLE public.checked_headers
ADD COLUMN vat_flux_checked BOOLEAN NOT NULL DEFAULT FALSE;

View File

@ -697,6 +697,42 @@ CREATE SEQUENCE maker.tend_id_seq
ALTER SEQUENCE maker.tend_id_seq OWNED BY maker.tend.id; ALTER SEQUENCE maker.tend_id_seq OWNED BY maker.tend.id;
--
-- Name: vat_flux; Type: TABLE; Schema: maker; Owner: -
--
CREATE TABLE maker.vat_flux (
id integer NOT NULL,
header_id integer NOT NULL,
ilk text,
src text,
dst text,
rad numeric,
tx_idx integer NOT NULL,
raw_log jsonb
);
--
-- Name: vat_flux_id_seq; Type: SEQUENCE; Schema: maker; Owner: -
--
CREATE SEQUENCE maker.vat_flux_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: vat_flux_id_seq; Type: SEQUENCE OWNED BY; Schema: maker; Owner: -
--
ALTER SEQUENCE maker.vat_flux_id_seq OWNED BY maker.vat_flux.id;
-- --
-- Name: vat_fold; Type: TABLE; Schema: maker; Owner: - -- Name: vat_fold; Type: TABLE; Schema: maker; Owner: -
-- --
@ -1052,7 +1088,8 @@ CREATE TABLE public.checked_headers (
vat_heal_checked boolean DEFAULT false NOT NULL, vat_heal_checked boolean DEFAULT false NOT NULL,
vat_toll_checked boolean DEFAULT false NOT NULL, vat_toll_checked boolean DEFAULT false NOT NULL,
vat_tune_checked boolean DEFAULT false NOT NULL, vat_tune_checked boolean DEFAULT false NOT NULL,
vat_grab_checked boolean DEFAULT false NOT NULL vat_grab_checked boolean DEFAULT false NOT NULL,
vat_flux_checked boolean DEFAULT false NOT NULL
); );
@ -1500,6 +1537,13 @@ ALTER TABLE ONLY maker.price_feeds ALTER COLUMN id SET DEFAULT nextval('maker.pr
ALTER TABLE ONLY maker.tend ALTER COLUMN id SET DEFAULT nextval('maker.tend_id_seq'::regclass); ALTER TABLE ONLY maker.tend ALTER COLUMN id SET DEFAULT nextval('maker.tend_id_seq'::regclass);
--
-- Name: vat_flux id; Type: DEFAULT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.vat_flux ALTER COLUMN id SET DEFAULT nextval('maker.vat_flux_id_seq'::regclass);
-- --
-- Name: vat_fold id; Type: DEFAULT; Schema: maker; Owner: - -- Name: vat_fold id; Type: DEFAULT; Schema: maker; Owner: -
-- --
@ -1907,6 +1951,22 @@ ALTER TABLE ONLY maker.tend
ADD CONSTRAINT tend_pkey PRIMARY KEY (id); ADD CONSTRAINT tend_pkey PRIMARY KEY (id);
--
-- Name: vat_flux vat_flux_header_id_tx_idx_key; Type: CONSTRAINT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.vat_flux
ADD CONSTRAINT vat_flux_header_id_tx_idx_key UNIQUE (header_id, tx_idx);
--
-- Name: vat_flux vat_flux_pkey; Type: CONSTRAINT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.vat_flux
ADD CONSTRAINT vat_flux_pkey PRIMARY KEY (id);
-- --
-- Name: vat_fold vat_fold_header_id_tx_idx_key; Type: CONSTRAINT; Schema: maker; Owner: - -- Name: vat_fold vat_fold_header_id_tx_idx_key; Type: CONSTRAINT; Schema: maker; Owner: -
-- --
@ -2317,6 +2377,14 @@ ALTER TABLE ONLY maker.tend
ADD CONSTRAINT tend_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE; ADD CONSTRAINT tend_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
--
-- Name: vat_flux vat_flux_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: -
--
ALTER TABLE ONLY maker.vat_flux
ADD CONSTRAINT vat_flux_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.headers(id) ON DELETE CASCADE;
-- --
-- Name: vat_fold vat_fold_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: - -- Name: vat_fold vat_fold_header_id_fkey; Type: FK CONSTRAINT; Schema: maker; Owner: -
-- --

View File

@ -62,6 +62,7 @@ var (
vatFoldMethod = GetSolidityMethodSignature(VatABI, "fold") vatFoldMethod = GetSolidityMethodSignature(VatABI, "fold")
vatTollMethod = GetSolidityMethodSignature(VatABI, "toll") vatTollMethod = GetSolidityMethodSignature(VatABI, "toll")
vatTuneMethod = GetSolidityMethodSignature(VatABI, "tune") vatTuneMethod = GetSolidityMethodSignature(VatABI, "tune")
vatFluxMethod = GetSolidityMethodSignature(VatABI, "flux")
BiteSignature = GetEventSignature(biteMethod) BiteSignature = GetEventSignature(biteMethod)
DealSignature = GetLogNoteSignature(dealMethod) DealSignature = GetLogNoteSignature(dealMethod)
@ -88,4 +89,5 @@ var (
VatFoldSignature = GetLogNoteSignature(vatFoldMethod) VatFoldSignature = GetLogNoteSignature(vatFoldMethod)
VatTollSignature = GetLogNoteSignature(vatTollMethod) VatTollSignature = GetLogNoteSignature(vatTollMethod)
VatTuneSignature = GetLogNoteSignature(vatTuneMethod) VatTuneSignature = GetLogNoteSignature(vatTuneMethod)
VatFluxSignature = GetLogNoteSignature(vatFluxMethod)
) )

View File

@ -133,6 +133,13 @@ var _ = Describe("Event signature generator", func() {
Expect(expected).To(Equal(actual)) Expect(expected).To(Equal(actual))
}) })
It("gets the vat flux method signature", func() {
expected := "flux(bytes32,bytes32,bytes32,int256)"
actual := shared.GetSolidityMethodSignature(shared.VatABI, "flux")
Expect(expected).To(Equal(actual))
})
It("gets the flip deal method signature", func() { It("gets the flip deal method signature", func() {
expected := "deal(uint256)" expected := "deal(uint256)"
actual := shared.GetSolidityMethodSignature(shared.FlipperABI, "deal") actual := shared.GetSolidityMethodSignature(shared.FlipperABI, "deal")

View File

@ -0,0 +1,22 @@
package vat_flux
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
)
type MockVatFlux struct {
err error
PassedLogs []types.Log
}
func (converter *MockVatFlux) ToModels(ethLogs []types.Log) ([]vat_flux.VatFluxModel, error) {
converter.PassedLogs = ethLogs
return []vat_flux.VatFluxModel{test_data.VatFluxModel}, converter.err
}
func (converter *MockVatFlux) SetConverterError(e error) {
converter.err = e
}

View File

@ -0,0 +1,65 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
)
type MockVatFluxRepository struct {
createErr error
markHeaderCheckedErr error
MarkHeaderCheckedPassedHeaderID int64
missingHeaders []core.Header
missingHeadersErr error
PassedStartingBlockNumber int64
PassedEndingBlockNumber int64
PassedHeaderID int64
PassedModels []vat_flux.VatFluxModel
}
func (repository *MockVatFluxRepository) MarkCheckedHeader(headerId int64) error {
repository.MarkHeaderCheckedPassedHeaderID = headerId
return repository.markHeaderCheckedErr
}
func (repository *MockVatFluxRepository) Create(headerID int64, models []vat_flux.VatFluxModel) error {
repository.PassedHeaderID = headerID
repository.PassedModels = models
return repository.createErr
}
func (repository *MockVatFluxRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64) ([]core.Header, error) {
repository.PassedStartingBlockNumber = startingBlockNumber
repository.PassedEndingBlockNumber = endingBlockNumber
return repository.missingHeaders, repository.missingHeadersErr
}
func (repository *MockVatFluxRepository) SetMarkHeaderCheckedErr(e error) {
repository.markHeaderCheckedErr = e
}
func (repository *MockVatFluxRepository) SetMissingHeadersErr(e error) {
repository.missingHeadersErr = e
}
func (repository *MockVatFluxRepository) SetMissingHeaders(headers []core.Header) {
repository.missingHeaders = headers
}
func (repository *MockVatFluxRepository) SetCreateError(e error) {
repository.createErr = e
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2018 Vulcanize
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test_data
import (
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
)
var VatFluxLog = types.Log{
Address: common.HexToAddress("0x239e6f0ab02713f1f8aa90ebeded9fc66dc96cd6"),
Topics: []common.Hash{
common.HexToHash("0xa6e4182100000000000000000000000000000000000000000000000000000000"),
common.HexToHash("0x0000000000000000000000007340e006f4135BA6970D43bf43d88DCAD4e7a8CA"),
common.HexToHash("0x0000000000000000000000007FA9EF6609Ca7921112231f8f195138ebba29770"),
common.HexToHash("0x00000000000000000000000093086347c52a8878af71bb818509d484c6a2e1bf"),
},
Data: hexutil.MustDecode("0x00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000084a6e418217340e006f4135ba6970d43bf43d88dcad4e7a8ca00000000000000000000000007fa9ef6609ca7921112231f8f195138ebba297700000000000000000000000093086347c52a8878af71bb818509d484c6a2e1bf000000000000000000000000000000000000000000000000000000000000000000000000000000000000007b"),
BlockNumber: 23,
TxHash: common.HexToHash("0xf98681bab9b8c75bd8aa4a7d0a8142ff527c5ea8fa54f3c2835d4533838b2e6f"),
TxIndex: 0,
BlockHash: common.HexToHash("0xc3fe212ad4f81ade1265af6de2b4bb50d962b1a4db06aabc982e7f9cb0972c2d"),
Index: 0,
Removed: false,
}
var rawFluxLog, _ = json.Marshal(VatFluxLog)
var VatFluxModel = vat_flux.VatFluxModel{
Ilk: "0x7340e006f4135BA6970D43bf43d88DCAD4e7a8CA",
Src: "0x7FA9EF6609Ca7921112231f8f195138ebba29770",
Dst: "0x93086347c52a8878af71bB818509d484c6a2e1bF",
Rad: "123",
TransactionIndex: 0,
Raw: rawFluxLog,
}

View File

@ -37,6 +37,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds" "github.com/vulcanize/vulcanizedb/pkg/transformers/price_feeds"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared" "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/tend" "github.com/vulcanize/vulcanizedb/pkg/transformers/tend"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_fold" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_fold"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_grab" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_grab"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_heal" "github.com/vulcanize/vulcanizedb/pkg/transformers/vat_heal"
@ -75,6 +76,7 @@ var (
VatFoldTransformerInitializer = vat_fold.VatFoldTransformerInitializer{Config: vat_fold.VatFoldConfig}.NewVatFoldTransformer VatFoldTransformerInitializer = vat_fold.VatFoldTransformerInitializer{Config: vat_fold.VatFoldConfig}.NewVatFoldTransformer
VatTollTransformerInitializer = vat_toll.VatTollTransformerInitializer{Config: vat_toll.VatTollConfig}.NewVatTollTransformer VatTollTransformerInitializer = vat_toll.VatTollTransformerInitializer{Config: vat_toll.VatTollConfig}.NewVatTollTransformer
VatTuneTransformerInitializer = vat_tune.VatTuneTransformerInitializer{Config: vat_tune.VatTuneConfig}.NewVatTuneTransformer VatTuneTransformerInitializer = vat_tune.VatTuneTransformerInitializer{Config: vat_tune.VatTuneConfig}.NewVatTuneTransformer
VatFluxTransformerInitializer = vat_flux.VatFluxTransformerInitializer{Config: vat_flux.VatFluxConfig}.NewVatFluxTransformer
) )
func TransformerInitializers() []shared.TransformerInitializer { func TransformerInitializers() []shared.TransformerInitializer {
@ -104,5 +106,6 @@ func TransformerInitializers() []shared.TransformerInitializer {
VatFoldTransformerInitializer, VatFoldTransformerInitializer,
VatTollTransformerInitializer, VatTollTransformerInitializer,
VatTuneTransformerInitializer, VatTuneTransformerInitializer,
VatFluxTransformerInitializer,
} }
} }

View File

@ -0,0 +1,11 @@
package vat_flux
import "github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
var VatFluxConfig = shared.TransformerConfig{
ContractAddresses: []string{shared.VatContractAddress},
ContractAbi: shared.VatABI,
Topics: []string{shared.VatFluxSignature},
StartingBlockNumber: 0,
EndingBlockNumber: 10000000,
}

View File

@ -0,0 +1,75 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux
import (
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"math/big"
)
type Converter interface {
ToModels(ethLogs []types.Log) ([]VatFluxModel, error)
}
type VatFluxConverter struct{}
func (VatFluxConverter) ToModels(ethLogs []types.Log) ([]VatFluxModel, error) {
var models []VatFluxModel
for _, ethLog := range ethLogs {
err := verifyLog(ethLog)
if err != nil {
return nil, err
}
ilk := common.BytesToAddress(ethLog.Topics[1].Bytes())
src := common.BytesToAddress(ethLog.Topics[2].Bytes())
dst := common.BytesToAddress(ethLog.Topics[3].Bytes())
radBytes := shared.GetDataBytesAtIndex(-1, ethLog.Data)
rad := big.NewInt(0).SetBytes(radBytes).String()
if err != nil {
return nil, err
}
rawLogJson, err := json.Marshal(ethLog)
if err != nil {
return nil, err
}
model := VatFluxModel{
Ilk: ilk.String(),
Src: src.String(),
Dst: dst.String(),
Rad: rad,
TransactionIndex: ethLog.TxIndex,
Raw: rawLogJson,
}
models = append(models, model)
}
return models, nil
}
func verifyLog(log types.Log) error {
if len(log.Topics) < 4 {
return errors.New("log missing topics")
}
return nil
}

View File

@ -0,0 +1,55 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
)
var _ = Describe("VatFlux converter", func() {
It("Converts logs to models", func() {
converter := vat_flux.VatFluxConverter{}
models, err := converter.ToModels([]types.Log{test_data.VatFluxLog})
Expect(err).NotTo(HaveOccurred())
Expect(len(models)).To(Equal(1))
Expect(models[0].Ilk).To(Equal(test_data.VatFluxModel.Ilk))
Expect(models[0].Src).To(Equal(test_data.VatFluxModel.Src))
Expect(models[0].Dst).To(Equal(test_data.VatFluxModel.Dst))
Expect(models[0].Rad).To(Equal(test_data.VatFluxModel.Rad))
Expect(models[0].TransactionIndex).To(Equal(test_data.VatFluxModel.TransactionIndex))
Expect(models[0].Raw).To(Equal(test_data.VatFluxModel.Raw))
})
It("Returns an error there are missing topics", func() {
converter := vat_flux.VatFluxConverter{}
badLog := types.Log{
Topics: []common.Hash{
common.HexToHash("0x"),
common.HexToHash("0x"),
common.HexToHash("0x"),
},
}
_, err := converter.ToModels([]types.Log{badLog})
Expect(err).To(HaveOccurred())
})
})

View File

@ -0,0 +1,24 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux
type VatFluxModel struct {
Ilk string
Src string
Dst string
Rad string
TransactionIndex uint `db:"tx_idx"`
Raw []byte `db:"raw_log"`
}

View File

@ -0,0 +1,84 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux
import (
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
type Repository interface {
Create(headerId int64, models []VatFluxModel) error
MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error)
MarkCheckedHeader(headerId int64) error
}
type VatFluxRepository struct {
DB *postgres.DB
}
func NewVatFluxRepository(db *postgres.DB) VatFluxRepository {
return VatFluxRepository{DB: db}
}
func (repository VatFluxRepository) Create(headerId int64, models []VatFluxModel) error {
tx, err := repository.DB.Begin()
if err != nil {
return err
}
for _, model := range models {
_, err := tx.Exec(`INSERT INTO maker.vat_flux (header_id, ilk, dst, src, rad, tx_idx, raw_log)
VALUES($1, $2, $3, $4, $5::numeric, $6, $7)`,
headerId, model.Ilk, model.Dst, model.Src, model.Rad, model.TransactionIndex, model.Raw)
if err != nil {
tx.Rollback()
return err
}
}
_, err = tx.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked)
VALUES($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET vat_flux_checked = $2`, headerId, true)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
func (repository VatFluxRepository) MissingHeaders(startingBlock, endingBlock int64) ([]core.Header, error) {
var headers []core.Header
err := repository.DB.Select(&headers,
`SELECT headers.id, block_number from headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR vat_flux_checked IS FALSE)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3`,
startingBlock, endingBlock, repository.DB.Node.ID)
return headers, err
}
func (repository VatFluxRepository) MarkCheckedHeader(headerId int64) error {
_, err := repository.DB.Exec(`INSERT INTO public.checked_headers (header_id, vat_flux_checked)
VALUES($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET vat_flux_checked = $2`, headerId, true)
return err
}

View File

@ -0,0 +1,227 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package vat_flux_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
"github.com/vulcanize/vulcanizedb/test_config"
)
var _ = Describe("VatFlux Repository", func() {
var db *postgres.DB
var repository vat_flux.VatFluxRepository
var headerRepository repositories.HeaderRepository
var headerId int64
var err error
BeforeEach(func() {
node := test_config.NewTestNode()
db = test_config.NewTestDB(node)
test_config.CleanTestDB(db)
repository = vat_flux.VatFluxRepository{DB: db}
headerRepository = repositories.NewHeaderRepository(db)
headerId, err = headerRepository.CreateOrUpdateHeader(fakes.FakeHeader)
Expect(err).NotTo(HaveOccurred())
})
type VatFluxDBResult struct {
vat_flux.VatFluxModel
Id int
HeaderId int64 `db:"header_id"`
}
type CheckedHeaderResult struct {
VatFluxChecked bool `db:"vat_flux_checked"`
}
Describe("Create", func() {
It("persists vat flux records", func() {
anotherVatFlux := test_data.VatFluxModel
anotherVatFlux.TransactionIndex = test_data.VatFluxModel.TransactionIndex + 1
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, anotherVatFlux})
var dbResult []VatFluxDBResult
err = db.Select(&dbResult, `SELECT * from maker.vat_flux where header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(len(dbResult)).To(Equal(2))
Expect(dbResult[0].Ilk).To(Equal(test_data.VatFluxModel.Ilk))
Expect(dbResult[0].Dst).To(Equal(test_data.VatFluxModel.Dst))
Expect(dbResult[0].Src).To(Equal(test_data.VatFluxModel.Src))
Expect(dbResult[0].Rad).To(Equal(test_data.VatFluxModel.Rad))
Expect(dbResult[0].TransactionIndex).To(Equal(test_data.VatFluxModel.TransactionIndex))
Expect(dbResult[1].TransactionIndex).To(Equal(test_data.VatFluxModel.TransactionIndex + 1))
Expect(dbResult[0].Raw).To(MatchJSON(test_data.VatFluxModel.Raw))
Expect(dbResult[0].HeaderId).To(Equal(headerId))
})
It("returns an error if the insertion fails", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("pq: duplicate key value violates unique constraint"))
})
It("marks the header as checked for vat flux logs", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
var headerChecked bool
err = db.Get(&headerChecked, `SELECT vat_flux_checked FROM public.checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(headerChecked).To(BeTrue())
})
It("removes vat flux if corresponding header is deleted", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel})
Expect(err).NotTo(HaveOccurred())
_, err = db.Exec(`DELETE FROM headers WHERE id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
var count int
err = db.QueryRow(`SELECT count(*) from maker.vat_flux`).Scan(&count)
Expect(err).NotTo(HaveOccurred())
Expect(count).To(Equal(0))
})
It("wraps create in a transaction", func() {
err = repository.Create(headerId, []vat_flux.VatFluxModel{test_data.VatFluxModel, test_data.VatFluxModel})
Expect(err).To(HaveOccurred())
var count int
err = repository.DB.QueryRowx(`SELECT count(*) FROM maker.vat_flux`).Scan(&count)
Expect(count).To(Equal(0))
})
})
Describe("MissingHeaders", func() {
It("returns headers that haven't been checked", func() {
startingBlock := GinkgoRandomSeed()
vatFluxBlock := startingBlock + 1
endingBlock := startingBlock + 2
outsideRangeBlock := startingBlock + 3
var headerIds []int64
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId)
}
err = repository.MarkCheckedHeader(headerIds[0])
Expect(err).NotTo(HaveOccurred())
headers, err := repository.MissingHeaders(startingBlock, endingBlock)
Expect(err).NotTo(HaveOccurred())
Expect(headers[0].Id).To(Or(Equal(headerIds[1]), Equal(headerIds[2])))
Expect(headers[1].Id).To(Or(Equal(headerIds[1]), Equal(headerIds[2])))
Expect(len(headers)).To(Equal(2))
})
It("returns header ids when checked_headers.vat_flux is false", func() {
startingBlock := GinkgoRandomSeed()
vatFluxBlock := startingBlock + 1
endingBlock := startingBlock + 2
outsideRangeBlock := startingBlock + 3
var headerIds []int64
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId)
}
err = repository.MarkCheckedHeader(headerIds[0])
_, err = repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES ($1)`, headerIds[1])
Expect(err).NotTo(HaveOccurred())
headers, err := repository.MissingHeaders(startingBlock, endingBlock)
Expect(err).NotTo(HaveOccurred())
Expect(headers[0].Id).To(Or(Equal(headerIds[1]), Equal(headerIds[2])))
Expect(headers[1].Id).To(Or(Equal(headerIds[1]), Equal(headerIds[2])))
Expect(len(headers)).To(Equal(2))
})
It("only returns header ids for the current node", func() {
startingBlock := GinkgoRandomSeed()
vatFluxBlock := startingBlock + 1
endingBlock := startingBlock + 2
outsideRangeBlock := startingBlock + 3
db2 := test_config.NewTestDB(core.Node{ID: "second node"})
headerRepository2 := repositories.NewHeaderRepository(db2)
repository2 := vat_flux.NewVatFluxRepository(db2)
var headerIds []int64
blockNumbers := []int64{startingBlock, vatFluxBlock, endingBlock, outsideRangeBlock}
for _, n := range blockNumbers {
headerId, err := headerRepository.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
headerIds = append(headerIds, headerId)
_, err = headerRepository2.CreateOrUpdateHeader(fakes.GetFakeHeader(n))
Expect(err).NotTo(HaveOccurred())
}
err = repository.MarkCheckedHeader(headerIds[0])
Expect(err).NotTo(HaveOccurred())
nodeOneMissingHeaders, err := repository.MissingHeaders(startingBlock, endingBlock)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeOneMissingHeaders)).To(Equal(2))
nodeTwoMissingHeaders, err := repository2.MissingHeaders(startingBlock, endingBlock)
Expect(err).NotTo(HaveOccurred())
Expect(len(nodeTwoMissingHeaders)).To(Equal(3))
})
})
Describe("MarkCheckedHeader", func() {
It("creates a new checked_header record", func() {
err := repository.MarkCheckedHeader(headerId)
Expect(err).NotTo(HaveOccurred())
var checkedHeaderResult = CheckedHeaderResult{}
err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(checkedHeaderResult.VatFluxChecked).To(BeTrue())
})
It("updates an existing checked header", func() {
_, err := repository.DB.Exec(`INSERT INTO checked_headers (header_id) VALUES($1)`, headerId)
Expect(err).NotTo(HaveOccurred())
var checkedHeaderResult CheckedHeaderResult
err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(checkedHeaderResult.VatFluxChecked).To(BeFalse())
err = repository.MarkCheckedHeader(headerId)
Expect(err).NotTo(HaveOccurred())
err = db.Get(&checkedHeaderResult, `SELECT vat_flux_checked FROM checked_headers WHERE header_id = $1`, headerId)
Expect(err).NotTo(HaveOccurred())
Expect(checkedHeaderResult.VatFluxChecked).To(BeTrue())
})
})
})

View File

@ -0,0 +1,63 @@
package vat_flux
import (
"github.com/ethereum/go-ethereum/common"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"log"
)
type VatFluxTransformerInitializer struct {
Config shared.TransformerConfig
}
func (initializer VatFluxTransformerInitializer) NewVatFluxTransformer(db *postgres.DB, blockChain core.BlockChain) shared.Transformer {
converter := VatFluxConverter{}
fetcher := shared.NewFetcher(blockChain)
repository := NewVatFluxRepository(db)
return VatFluxTransformer{
Config: initializer.Config,
Converter: converter,
Fetcher: fetcher,
Repository: repository,
}
}
type VatFluxTransformer struct {
Config shared.TransformerConfig
Converter Converter
Fetcher shared.LogFetcher
Repository Repository
}
func (transformer VatFluxTransformer) Execute() error {
missingHeaders, err := transformer.Repository.MissingHeaders(transformer.Config.StartingBlockNumber, transformer.Config.EndingBlockNumber)
if err != nil {
return err
}
log.Printf("Fetching vat flux event logs for %d headers \n", len(missingHeaders))
for _, header := range missingHeaders {
topics := [][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}}
matchingLogs, err := transformer.Fetcher.FetchLogs(VatFluxConfig.ContractAddresses, topics, header.BlockNumber)
if err != nil {
return err
}
if len(matchingLogs) < 1 {
err = transformer.Repository.MarkCheckedHeader(header.Id)
if err != nil {
return err
}
continue
}
models, err := transformer.Converter.ToModels(matchingLogs)
if err != nil {
return err
}
err = transformer.Repository.Create(header.Id, models)
if err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,159 @@
package vat_flux_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/pkg/transformers/shared"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data"
"github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks"
vat_flux_mocks "github.com/vulcanize/vulcanizedb/pkg/transformers/test_data/mocks/vat_flux"
"github.com/vulcanize/vulcanizedb/pkg/transformers/vat_flux"
)
type setupOptions struct {
setMissingHeadersError bool
setFetcherError bool
setConverterError bool
setCreateError bool
fetchedLogs []types.Log
missingHeaders []core.Header
}
var _ = Describe("Vat flux transformer", func() {
var (
config shared.TransformerConfig
converter *vat_flux_mocks.MockVatFlux
fetcher *mocks.MockLogFetcher
repository *vat_flux_mocks.MockVatFluxRepository
transformer vat_flux.VatFluxTransformer
)
BeforeEach(func() {
config = vat_flux.VatFluxConfig
converter = &vat_flux_mocks.MockVatFlux{}
fetcher = &mocks.MockLogFetcher{}
repository = &vat_flux_mocks.MockVatFluxRepository{}
transformer = vat_flux.VatFluxTransformer{
Config: config,
Converter: converter,
Fetcher: fetcher,
Repository: repository,
}
})
It("gets missing headers for block numbers specified in config", func() {
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedStartingBlockNumber).To(Equal(vat_flux.VatFluxConfig.StartingBlockNumber))
Expect(repository.PassedEndingBlockNumber).To(Equal(vat_flux.VatFluxConfig.EndingBlockNumber))
})
It("returns error if repository returns error for missing headers", func() {
repository.SetMissingHeadersErr(fakes.FakeError)
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("marks the header as checked when there are no logs", func() {
header := core.Header{Id: GinkgoRandomSeed()}
repository.SetMissingHeaders([]core.Header{header})
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.MarkHeaderCheckedPassedHeaderID).To(Equal(header.Id))
})
It("fetches logs for missing headers", func() {
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}, {BlockNumber: 2}})
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(fetcher.FetchedBlocks).To(Equal([]int64{1, 2}))
Expect(fetcher.FetchedContractAddresses).To(Equal([][]string{vat_flux.VatFluxConfig.ContractAddresses, vat_flux.VatFluxConfig.ContractAddresses}))
Expect(fetcher.FetchedTopics).To(Equal([][]common.Hash{{common.HexToHash(shared.VatFluxSignature)}}))
})
It("returns error if fetcher returns error", func() {
fetcher.SetFetcherError(fakes.FakeError)
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("returns error if marking header checked returns err", func() {
repository.SetMissingHeaders([]core.Header{{Id: int64(123)}})
repository.SetMarkHeaderCheckedErr(fakes.FakeError)
mockFetcher := &mocks.MockLogFetcher{}
transformer := vat_flux.VatFluxTransformer{
Converter: converter,
Fetcher: mockFetcher,
Repository: repository,
}
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("converts matching logs", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
transformer := vat_flux.VatFluxTransformer{
Fetcher: fetcher,
Converter: converter,
Repository: repository,
}
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(converter.PassedLogs).To(Equal([]types.Log{test_data.VatFluxLog}))
})
It("returns error if converter returns error", func() {
converter.SetConverterError(fakes.FakeError)
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1}})
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
It("persists vat flux model", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
fakeHeader := core.Header{BlockNumber: 1, Id: 2}
repository.SetMissingHeaders([]core.Header{fakeHeader})
err := transformer.Execute()
Expect(err).NotTo(HaveOccurred())
Expect(repository.PassedHeaderID).To(Equal(fakeHeader.Id))
Expect(repository.PassedModels).To(Equal([]vat_flux.VatFluxModel{test_data.VatFluxModel}))
})
It("returns error if repository returns error for create", func() {
fetcher.SetFetchedLogs([]types.Log{test_data.VatFluxLog})
repository.SetMissingHeaders([]core.Header{{BlockNumber: 1, Id: 2}})
repository.SetCreateError(fakes.FakeError)
err := transformer.Execute()
Expect(err).To(HaveOccurred())
Expect(err).To(MatchError(fakes.FakeError))
})
})

View File

@ -0,0 +1,19 @@
package vat_flux_test
import (
"io/ioutil"
"log"
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestVatFlux(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "VatFlux Suite")
}
var _ = BeforeSuite(func() {
log.SetOutput(ioutil.Discard)
})

View File

@ -36,7 +36,7 @@ func (transformer VatGrabTransformer) Execute() error {
if err != nil { if err != nil {
return err return err
} }
log.Printf("Fetching vat init event logs for %d headers \n", len(missingHeaders)) log.Printf("Fetching vat grab event logs for %d headers \n", len(missingHeaders))
for _, header := range missingHeaders { for _, header := range missingHeaders {
topics := [][]common.Hash{{common.HexToHash(shared.VatGrabSignature)}} topics := [][]common.Hash{{common.HexToHash(shared.VatGrabSignature)}}
matchingLogs, err := transformer.Fetcher.FetchLogs(VatGrabConfig.ContractAddresses, topics, header.BlockNumber) matchingLogs, err := transformer.Fetcher.FetchLogs(VatGrabConfig.ContractAddresses, topics, header.BlockNumber)

View File

@ -103,6 +103,7 @@ func CleanTestDB(db *postgres.DB) {
db.MustExec("DELETE FROM maker.vat_fold") db.MustExec("DELETE FROM maker.vat_fold")
db.MustExec("DELETE FROM maker.vat_toll") db.MustExec("DELETE FROM maker.vat_toll")
db.MustExec("DELETE FROM maker.vat_tune") db.MustExec("DELETE FROM maker.vat_tune")
db.MustExec("DELETE FROM maker.vat_flux")
db.MustExec("DELETE FROM receipts") db.MustExec("DELETE FROM receipts")
db.MustExec("DELETE FROM transactions") db.MustExec("DELETE FROM transactions")
db.MustExec("DELETE FROM watched_contracts") db.MustExec("DELETE FROM watched_contracts")