fix for issue #146; mark header checked for contract if it doesnt have

any logs at that header but other contracts do; test
This commit is contained in:
Ian Norden 2019-10-08 11:48:15 -05:00
parent 3fff2896aa
commit 11b5efbfe3
4 changed files with 54 additions and 16 deletions

View File

@ -40,8 +40,8 @@ var _ = Describe("contractWatcher headerSync transformer", func() {
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
var headerID int64
var ensAddr = strings.ToLower(constants.EnsContractAddress)
var tusdAddr = strings.ToLower(constants.TusdContractAddress)
var ensAddr = strings.ToLower(constants.EnsContractAddress) // 0x314159265dd8dbb310642f98f50c066173c1259b
var tusdAddr = strings.ToLower(constants.TusdContractAddress) // 0x8dd5fbce2f6a956c3022ba3663759011dd51e73e
BeforeEach(func() {
db, blockChain = test_helpers.SetupDBandBC()
@ -377,6 +377,42 @@ var _ = Describe("contractWatcher headerSync transformer", func() {
Expect(transferLog.Value).To(Equal("2800000000000000000000"))
})
It("Marks header checked for a contract that has no logs at that header", func() {
t := transformer.NewTransformer(test_helpers.ENSandTusdConfig, blockChain, db)
err = t.Init()
Expect(err).ToNot(HaveOccurred())
err = t.Execute()
Expect(err).ToNot(HaveOccurred())
Expect(t.Start).To(Equal(int64(6885702)))
newOwnerLog := test_helpers.HeaderSyncNewOwnerLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog)
Expect(err).ToNot(HaveOccurred())
transferLog := test_helpers.HeaderSyncTransferLog{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM header_%s.transfer_event", tusdAddr)).StructScan(&transferLog)
Expect(err).ToNot(HaveOccurred())
Expect(transferLog.HeaderID).ToNot(Equal(newOwnerLog.HeaderID))
type checkedHeader struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
NewOwner int64 `db:"newowner_0x314159265dd8dbb310642f98f50c066173c1259b"`
Transfer int64 `db:"transfer_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e"`
}
transferCheckedHeader := new(checkedHeader)
err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", transferLog.HeaderID).StructScan(transferCheckedHeader)
Expect(err).ToNot(HaveOccurred())
Expect(transferCheckedHeader.Transfer).To(Equal(int64(1)))
Expect(transferCheckedHeader.NewOwner).To(Equal(int64(1)))
newOwnerCheckedHeader := new(checkedHeader)
err = db.QueryRowx("SELECT * FROM public.checked_headers WHERE header_id = $1", newOwnerLog.HeaderID).StructScan(newOwnerCheckedHeader)
Expect(err).ToNot(HaveOccurred())
Expect(newOwnerCheckedHeader.NewOwner).To(Equal(int64(1)))
Expect(newOwnerCheckedHeader.Transfer).To(Equal(int64(1)))
})
It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() {
var testConf config.ContractConfig
testConf = test_helpers.ENSandTusdConfig

View File

@ -20,7 +20,6 @@ import (
"fmt"
"github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core"
@ -268,12 +267,3 @@ func continuousHeaders(headers []core.Header) []core.Header {
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}
// Used to mark a header checked as part of some external transaction so as to group into one commit
func (r *headerRepository) MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = checked_headers.`+eventID+` + 1`, headerID, 1)
return err
}

View File

@ -249,6 +249,11 @@ func (tr *Transformer) Execute() error {
}
// Sort logs by the contract they belong to
// Start by adding every contract addr to the map
// So that if we don't have any logs for it, it is caught and the header is still marked checked for its events
for _, addr := range tr.contractAddresses {
sortedLogs[addr] = nil
}
for _, log := range allLogs {
addr := strings.ToLower(log.Address.Hex())
sortedLogs[addr] = append(sortedLogs[addr], log)
@ -256,12 +261,20 @@ func (tr *Transformer) Execute() error {
// Process logs for each contract
for conAddr, logs := range sortedLogs {
if logs == nil {
con := tr.Contracts[conAddr]
if len(logs) < 1 {
eventIds := make([]string, 0)
for eventName := range con.Events {
eventIds = append(eventIds, strings.ToLower(eventName+"_"+con.Address))
}
markCheckedErr := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, eventIds)
if markCheckedErr != nil {
return fmt.Errorf("error marking header checked: %s", markCheckedErr.Error())
}
logrus.Tracef("no logs found for contract %s at block %d, continuing", conAddr, header.BlockNumber)
continue
}
// Configure converter with this contract
con := tr.Contracts[conAddr]
tr.Converter.Update(con)
// Convert logs into batches of log mappings (eventName => []types.Logs

View File

@ -294,8 +294,7 @@ func TearDown(db *postgres.DB) {
_, err = tx.Exec(`CREATE TABLE checked_headers (
id SERIAL PRIMARY KEY,
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE,
check_count INTEGER NOT NULL DEFAULT 1);`)
header_id INTEGER UNIQUE NOT NULL REFERENCES headers (id) ON DELETE CASCADE);`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DROP SCHEMA IF EXISTS full_0x8dd5fbce2f6a956c3022ba3663759011dd51e73e CASCADE`)