PR#72 from public repo- https://github.com/vulcanize/vulcanizedb/pull/72- also needed to finish pluggin in ENS record transformer

This commit is contained in:
Ian Norden 2019-03-11 19:02:04 -05:00
parent 24879a85aa
commit 37e581c7ec
9 changed files with 165 additions and 117 deletions

View File

@ -20,7 +20,7 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
)
var _ = Describe("Omni full transformer", func() {
var _ = Describe("contractWatcher full transformer", func() {
var db *postgres.DB
var err error
var blockChain core.BlockChain

View File

@ -18,12 +18,12 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
)
var _ = Describe("Omnit light transformer", func() {
var _ = Describe("contractWatcher light transformer", func() {
var db *postgres.DB
var err error
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
var headerID, headerID2 int64
var headerID int64
var ensAddr = strings.ToLower(constants.EnsContractAddress)
var tusdAddr = strings.ToLower(constants.TusdContractAddress)
@ -292,26 +292,12 @@ var _ = Describe("Omnit light transformer", func() {
Describe("Execute- against both ENS and TrueUSD", func() {
BeforeEach(func() {
header1, err := blockChain.GetHeaderByNumber(6791668)
for i := 6885692; i < 6885702; i++ {
header, err := blockChain.GetHeaderByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
header2, err := blockChain.GetHeaderByNumber(6791669)
_, err = headerRepository.CreateOrUpdateHeader(header)
Expect(err).ToNot(HaveOccurred())
header3, err := blockChain.GetHeaderByNumber(6791670)
Expect(err).ToNot(HaveOccurred())
header4, err := blockChain.GetHeaderByNumber(6885695)
Expect(err).ToNot(HaveOccurred())
header5, err := blockChain.GetHeaderByNumber(6885696)
Expect(err).ToNot(HaveOccurred())
header6, err := blockChain.GetHeaderByNumber(6885697)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header1)
headerID, err = headerRepository.CreateOrUpdateHeader(header2)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header3)
headerRepository.CreateOrUpdateHeader(header4)
headerID2, err = headerRepository.CreateOrUpdateHeader(header5)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header6)
}
})
It("Transforms watched contract data into custom repositories", func() {
@ -325,7 +311,6 @@ var _ = Describe("Omnit light transformer", func() {
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(newOwnerLog.HeaderID).To(Equal(headerID2))
Expect(newOwnerLog.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"))
Expect(newOwnerLog.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047"))
Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef"))
@ -334,10 +319,9 @@ var _ = Describe("Omnit light transformer", func() {
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&transferLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(transferLog.HeaderID).To(Equal(headerID))
Expect(transferLog.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
Expect(transferLog.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"))
Expect(transferLog.Value).To(Equal("9998940000000000000000"))
Expect(transferLog.From).To(Equal("0x8cA465764873E71CEa525F5EB6AE973d650c22C2"))
Expect(transferLog.To).To(Equal("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e"))
Expect(transferLog.Value).To(Equal("2800000000000000000000"))
})
It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() {
@ -358,7 +342,7 @@ var _ = Describe("Omnit light transformer", func() {
Expect(err).ToNot(HaveOccurred())
Expect(len(ens.EmittedHashes)).To(Equal(2))
Expect(len(ens.EmittedAddrs)).To(Equal(0))
Expect(len(tusd.EmittedAddrs)).To(Equal(4))
Expect(len(tusd.EmittedAddrs)).To(Equal(2))
Expect(len(tusd.EmittedHashes)).To(Equal(0))
b, ok := ens.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")]
@ -369,21 +353,16 @@ var _ = Describe("Omnit light transformer", func() {
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")]
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")]
b, ok = tusd.EmittedAddrs[common.HexToAddress("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = tusd.EmittedAddrs[common.HexToAddress("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")]
Expect(ok).To(Equal(false))
})
It("Polls given methods for each contract, using list of collected values", func() {
@ -414,12 +393,12 @@ var _ = Describe("Omnit light transformer", func() {
Expect(err).To(HaveOccurred())
bal := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&bal)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x8cA465764873E71CEa525F5EB6AE973d650c22C2' AND block = '6885701'", tusdAddr)).StructScan(&bal)
Expect(err).ToNot(HaveOccurred())
Expect(bal.Balance).To(Equal("55849938025000000000000"))
Expect(bal.Balance).To(Equal("1954436000000000000000"))
Expect(bal.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&bal)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6885701'", tusdAddr)).StructScan(&bal)
Expect(err).To(HaveOccurred())
})
})

View File

@ -53,6 +53,7 @@ func NewHeaderRepository(db *postgres.DB) *headerRepository {
}
}
// Adds a checked_header column for the provided column id
func (r *headerRepository) AddCheckColumn(id string) error {
// Check cache to see if column already exists before querying pg
_, ok := r.columns.Get(id)
@ -73,6 +74,7 @@ func (r *headerRepository) AddCheckColumn(id string) error {
return nil
}
// Adds a checked_header column for all of the provided column ids
func (r *headerRepository) AddCheckColumns(ids []string) error {
var err error
baseQuery := "ALTER TABLE public.checked_headers"
@ -96,6 +98,7 @@ func (r *headerRepository) AddCheckColumns(ids []string) error {
return err
}
// Marks the header checked for the provided column id
func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`)
VALUES ($1, $2)
@ -105,6 +108,7 @@ func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
return err
}
// Marks the header checked for all of the provided column ids
func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error {
pgStr := "INSERT INTO public.checked_headers (header_id, "
for _, id := range ids {
@ -124,6 +128,7 @@ func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string)
return err
}
// Marks all of the provided headers checked for each of the provided column ids
func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error {
tx, err := r.db.Begin()
if err != nil {
@ -154,6 +159,7 @@ func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids [
return tx.Commit()
}
// Returns missing headers for the provided checked_headers column id
func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, id string) ([]core.Header, error) {
var result []core.Header
var query string
@ -165,7 +171,7 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber
WHERE (header_id ISNULL OR checked_headers.` + id + `=0)
AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
@ -174,13 +180,14 @@ func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return result, err
return contiguousHeaders(result, startingBlockNumber), err
}
// Returns missing headers for all of the provided checked_headers column ids
func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) {
var result []core.Header
var query string
@ -196,21 +203,42 @@ func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlock
if endingBlockNumber == -1 {
endStr := `) AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `) AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return result, err
return contiguousHeaders(result, startingBlockNumber), err
}
// Takes in an ordered sequence of headers and returns only the first contiguous segment
// Enforce continuity with previous segment with the appropriate startingBlockNumber
func contiguousHeaders(headers []core.Header, startingBlockNumber int64) []core.Header {
if len(headers) < 1 {
return headers
}
previousHeader := headers[0].BlockNumber
if previousHeader != startingBlockNumber {
return []core.Header{}
}
for i := 1; i < len(headers); i++ {
previousHeader++
if headers[i].BlockNumber != previousHeader {
return headers[:i]
}
}
return headers
}
// Returns headers that have been checked for all of the provided event ids but not for the provided method ids
func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) {
var result []core.Header
var query string
@ -231,14 +259,14 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
if endingBlockNumber == -1 {
endStr := `AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
ORDER BY headers.block_number LIMIT 100`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
@ -246,10 +274,12 @@ func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlock
return result, err
}
// Check the repositories column id cache for a value
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}
// Used to mark a header checked as part of some external transaction so as to group into one commit
func MarkHeaderCheckedInTransaction(headerID int64, tx *sql.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)

View File

@ -32,6 +32,7 @@ import (
var _ = Describe("Repository", func() {
var db *postgres.DB
var bc core.BlockChain
var omniHeaderRepo repository.HeaderRepository // omni/light header repository
var coreHeaderRepo repositories.HeaderRepository // pkg/datastore header repository
var eventIDs = []string{
@ -46,7 +47,7 @@ var _ = Describe("Repository", func() {
}
BeforeEach(func() {
db, _ = test_helpers.SetupDBandBC()
db, bc = test_helpers.SetupDBandBC()
omniHeaderRepo = repository.NewHeaderRepository(db)
coreHeaderRepo = repositories.NewHeaderRepository(db)
})
@ -120,7 +121,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
})
@ -130,7 +131,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -142,12 +143,24 @@ var _ = Describe("Repository", func() {
Expect(h3.BlockNumber).To(Equal(int64(6194634)))
})
It("Returns only contiguous chunks of headers", func() {
addDiscontinuousHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633)))
})
It("Fails if eventID does not yet exist in check_headers table", func() {
addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred())
_, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, "notEventId")
_, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, "notEventId")
Expect(err).To(HaveOccurred())
})
})
@ -158,14 +171,14 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -174,18 +187,42 @@ var _ = Describe("Repository", func() {
err = omniHeaderRepo.MarkHeaderChecked(missingHeaders[0].Id, eventIDs[2])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
missingHeaders, err = omniHeaderRepo.MissingHeadersForAll(6194633, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
It("Returns only contiguous chunks of headers", func() {
addDiscontinuousHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633)))
})
It("Returns at most 100 headers", func() {
add102Headers(coreHeaderRepo, bc)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194733, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(100))
Expect(missingHeaders[0].BlockNumber).To(Equal(int64(6194632)))
Expect(missingHeaders[1].BlockNumber).To(Equal(int64(6194633)))
})
It("Fails if one of the eventIDs does not yet exist in check_headers table", func() {
addHeaders(coreHeaderRepo)
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
badEventIDs := append(eventIDs, "notEventId")
_, err = omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, badEventIDs)
_, err = omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, badEventIDs)
Expect(err).To(HaveOccurred())
})
})
@ -196,7 +233,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -204,7 +241,7 @@ var _ = Describe("Repository", func() {
err = omniHeaderRepo.MarkHeaderChecked(headerID, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194633, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
@ -214,7 +251,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumn(eventIDs[0])
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -222,7 +259,7 @@ var _ = Describe("Repository", func() {
err = omniHeaderRepo.MarkHeaderChecked(headerID, "notEventId")
Expect(err).To(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
})
@ -234,7 +271,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.AddCheckColumns(eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194630, 6194635, eventIDs)
missingHeaders, err := omniHeaderRepo.MissingHeadersForAll(6194632, 6194635, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -242,7 +279,7 @@ var _ = Describe("Repository", func() {
err = omniHeaderRepo.MarkHeaderCheckedForAll(headerID, eventIDs)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194633, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(2))
})
@ -261,7 +298,7 @@ var _ = Describe("Repository", func() {
for _, id := range methodIDs {
err := omniHeaderRepo.AddCheckColumn(id)
Expect(err).ToNot(HaveOccurred())
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id)
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, id)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
}
@ -269,7 +306,7 @@ var _ = Describe("Repository", func() {
err := omniHeaderRepo.MarkHeadersCheckedForAll(missingHeaders, methodIDs)
Expect(err).ToNot(HaveOccurred())
for _, id := range methodIDs {
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194630, 6194635, id)
missingHeaders, err = omniHeaderRepo.MissingHeaders(6194632, 6194635, id)
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(0))
}
@ -286,7 +323,7 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred())
}
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194630, 6194635, eventIDs[0])
missingHeaders, err := omniHeaderRepo.MissingHeaders(6194632, 6194635, eventIDs[0])
Expect(err).ToNot(HaveOccurred())
Expect(len(missingHeaders)).To(Equal(3))
@ -301,7 +338,7 @@ var _ = Describe("Repository", func() {
Expect(err).ToNot(HaveOccurred())
}
intersectionHeaders, err := omniHeaderRepo.MissingMethodsCheckedEventsIntersection(6194630, 6194635, methodIDs, eventIDs)
intersectionHeaders, err := omniHeaderRepo.MissingMethodsCheckedEventsIntersection(6194632, 6194635, methodIDs, eventIDs)
Expect(err).ToNot(HaveOccurred())
Expect(len(intersectionHeaders)).To(Equal(1))
Expect(intersectionHeaders[0].Id).To(Equal(headerID2))
@ -315,3 +352,18 @@ func addHeaders(coreHeaderRepo repositories.HeaderRepository) {
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader2)
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader3)
}
func addDiscontinuousHeaders(coreHeaderRepo repositories.HeaderRepository) {
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader1)
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader2)
coreHeaderRepo.CreateOrUpdateHeader(mocks.MockHeader4)
}
func add102Headers(coreHeaderRepo repositories.HeaderRepository, blockChain core.BlockChain) {
for i := 6194632; i < 6194733; i++ {
header, err := blockChain.GetHeaderByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
_, err = coreHeaderRepo.CreateOrUpdateHeader(header)
Expect(err).ToNot(HaveOccurred())
}
}

View File

@ -285,6 +285,8 @@ func (tr *transformer) Execute() error {
if err != nil {
return err
}
tr.start = header.BlockNumber + 1
}
return nil

View File

@ -39,7 +39,7 @@ var _ = Describe("Transformer", func() {
var err error
var blockChain core.BlockChain
var headerRepository repositories.HeaderRepository
var headerID, headerID2 int64
var headerID int64
var ensAddr = strings.ToLower(constants.EnsContractAddress)
var tusdAddr = strings.ToLower(constants.TusdContractAddress)
@ -328,26 +328,12 @@ var _ = Describe("Transformer", func() {
Describe("Execute- against both ENS and TrueUSD", func() {
BeforeEach(func() {
header1, err := blockChain.GetHeaderByNumber(6791668)
for i := 6885692; i < 6885702; i++ {
header, err := blockChain.GetHeaderByNumber(int64(i))
Expect(err).ToNot(HaveOccurred())
header2, err := blockChain.GetHeaderByNumber(6791669)
_, err = headerRepository.CreateOrUpdateHeader(header)
Expect(err).ToNot(HaveOccurred())
header3, err := blockChain.GetHeaderByNumber(6791670)
Expect(err).ToNot(HaveOccurred())
header4, err := blockChain.GetHeaderByNumber(6885695)
Expect(err).ToNot(HaveOccurred())
header5, err := blockChain.GetHeaderByNumber(6885696)
Expect(err).ToNot(HaveOccurred())
header6, err := blockChain.GetHeaderByNumber(6885697)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header1)
headerID, err = headerRepository.CreateOrUpdateHeader(header2)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header3)
headerRepository.CreateOrUpdateHeader(header4)
headerID2, err = headerRepository.CreateOrUpdateHeader(header5)
Expect(err).ToNot(HaveOccurred())
headerRepository.CreateOrUpdateHeader(header6)
}
})
It("Transforms watched contract data into custom repositories", func() {
@ -361,7 +347,6 @@ var _ = Describe("Transformer", func() {
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.newowner_event", ensAddr)).StructScan(&newOwnerLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(newOwnerLog.HeaderID).To(Equal(headerID2))
Expect(newOwnerLog.Node).To(Equal("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae"))
Expect(newOwnerLog.Label).To(Equal("0x95832c7a47ff8a7840e28b78ce695797aaf402b1c186bad9eca28842625b5047"))
Expect(newOwnerLog.Owner).To(Equal("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef"))
@ -370,10 +355,9 @@ var _ = Describe("Transformer", func() {
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.transfer_event", tusdAddr)).StructScan(&transferLog)
Expect(err).ToNot(HaveOccurred())
// We don't know vulcID, so compare individual fields instead of complete structures
Expect(transferLog.HeaderID).To(Equal(headerID))
Expect(transferLog.From).To(Equal("0x1062a747393198f70F71ec65A582423Dba7E5Ab3"))
Expect(transferLog.To).To(Equal("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0"))
Expect(transferLog.Value).To(Equal("9998940000000000000000"))
Expect(transferLog.From).To(Equal("0x8cA465764873E71CEa525F5EB6AE973d650c22C2"))
Expect(transferLog.To).To(Equal("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e"))
Expect(transferLog.Value).To(Equal("2800000000000000000000"))
})
It("Keeps track of contract-related hashes and addresses while transforming event data if they need to be used for later method polling", func() {
@ -394,7 +378,7 @@ var _ = Describe("Transformer", func() {
Expect(err).ToNot(HaveOccurred())
Expect(len(ens.EmittedHashes)).To(Equal(2))
Expect(len(ens.EmittedAddrs)).To(Equal(0))
Expect(len(tusd.EmittedAddrs)).To(Equal(4))
Expect(len(tusd.EmittedAddrs)).To(Equal(2))
Expect(len(tusd.EmittedHashes)).To(Equal(0))
b, ok := ens.EmittedHashes[common.HexToHash("0x93cdeb708b7545dc668eb9280176169d1c33cfd8ed6f04690a0bcc88a93fc4ae")]
@ -405,21 +389,16 @@ var _ = Describe("Transformer", func() {
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x1062a747393198f70F71ec65A582423Dba7E5Ab3")]
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x8cA465764873E71CEa525F5EB6AE973d650c22C2")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x2930096dB16b4A44Ecd4084EA4bd26F7EeF1AEf0")]
b, ok = tusd.EmittedAddrs[common.HexToAddress("0xc338482360651E5D30BEd77b7c85358cbBFB2E0e")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0x571A326f5B15E16917dC17761c340c1ec5d06f6d")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
b, ok = tusd.EmittedAddrs[common.HexToAddress("0xFBb1b73C4f0BDa4f67dcA266ce6Ef42f520fBB98")]
Expect(ok).To(Equal(true))
Expect(b).To(Equal(true))
_, ok = tusd.EmittedAddrs[common.HexToAddress("0x6090A6e47849629b7245Dfa1Ca21D94cd15878Ef")]
Expect(ok).To(Equal(false))
})
It("Polls given methods for each contract, using list of collected values", func() {
@ -450,12 +429,12 @@ var _ = Describe("Transformer", func() {
Expect(err).To(HaveOccurred())
bal := test_helpers.BalanceOf{}
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x1062a747393198f70F71ec65A582423Dba7E5Ab3' AND block = '6791669'", tusdAddr)).StructScan(&bal)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x8cA465764873E71CEa525F5EB6AE973d650c22C2' AND block = '6885701'", tusdAddr)).StructScan(&bal)
Expect(err).ToNot(HaveOccurred())
Expect(bal.Balance).To(Equal("55849938025000000000000"))
Expect(bal.Balance).To(Equal("1954436000000000000000"))
Expect(bal.TokenName).To(Equal("TrueUSD"))
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6791669'", tusdAddr)).StructScan(&bal)
err = db.QueryRowx(fmt.Sprintf("SELECT * FROM light_%s.balanceof_method WHERE who_ = '0x09BbBBE21a5975cAc061D82f7b843b1234567890' AND block = '6885701'", tusdAddr)).StructScan(&bal)
Expect(err).To(HaveOccurred())
})
})

View File

@ -183,6 +183,13 @@ var MockHeader3 = core.Header{
Timestamp: "50000030",
}
var MockHeader4 = core.Header{
Hash: "0x135391a0962a63944e5908e6fedfff90fb4be3e3290a21017861099bad234hfs",
BlockNumber: 6194635,
Raw: rawFakeHeader,
Timestamp: "50000030",
}
var MockTransferLog1 = types.Log{
Index: 1,
Address: common.HexToAddress(constants.TusdContractAddress),

View File

@ -54,7 +54,9 @@ func NewPoller(blockChain core.BlockChain, db *postgres.DB, mode types.Mode) *po
func (p *poller) PollContract(con contract.Contract) error {
for i := con.StartingBlock; i <= con.LastBlock; i++ {
p.PollContractAt(con, i)
if err := p.PollContractAt(con, i); err != nil {
return err
}
}
return nil
@ -98,7 +100,6 @@ func (p *poller) pollNoArgAt(m types.Method, bn int64) error {
if err != nil {
return errors.New(fmt.Sprintf("poller error calling 0 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", bn, m.Name, p.contract.Address, err))
}
strOut, err := stringify(out)
if err != nil {
return err
@ -138,8 +139,8 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error {
if len(args) == 0 { // If we haven't collected any args by now we can't call the method
return nil
}
results := make([]types.Result, 0, len(args))
results := make([]types.Result, 0, len(args))
for arg := range args {
in := []interface{}{arg}
strIn := []interface{}{contract.StringifyArg(arg)}
@ -160,7 +161,6 @@ func (p *poller) pollSingleArgAt(m types.Method, bn int64) error {
result.Output = strOut
results = append(results, result)
}
// Persist result set as batch
err := p.PersistResults(results, m, p.contract.Address, p.contract.Name)
if err != nil {
@ -204,7 +204,6 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error {
}
results := make([]types.Result, 0, len(firstArgs)*len(secondArgs))
for arg1 := range firstArgs {
for arg2 := range secondArgs {
in := []interface{}{arg1, arg2}
@ -215,18 +214,15 @@ func (p *poller) pollDoubleArgAt(m types.Method, bn int64) error {
if err != nil {
return errors.New(fmt.Sprintf("poller error calling 2 argument method\r\nblock: %d, method: %s, contract: %s\r\nerr: %v", bn, m.Name, p.contract.Address, err))
}
strOut, err := stringify(out)
if err != nil {
return err
}
p.cache(out)
result.Output = strOut
result.Inputs = strIn
results = append(results, result)
}
}
@ -243,9 +239,8 @@ func (p *poller) FetchContractData(contractAbi, contractAddress, method string,
return p.bc.FetchContractData(contractAbi, contractAddress, method, methodArgs, result, blockNumber)
}
// This is used to cache an method return value if method piping is turned on
// This is used to cache a method return value if method piping is turned on
func (p *poller) cache(out interface{}) {
// Cache returned value if piping is turned on
if p.contract.Piping {
switch out.(type) {
case common.Hash:

View File

@ -43,7 +43,11 @@ func (blockChain *BlockChain) FetchContractData(abiJSON string, address string,
if err != nil {
return err
}
output, err := blockChain.callContract(address, input, big.NewInt(blockNumber))
var bn *big.Int
if blockNumber > 0 {
bn = big.NewInt(blockNumber)
}
output, err := blockChain.callContract(address, input, bn)
if err != nil {
return err
}