ipld-eth-server/pkg/contract_watcher/header/repository/header_repository.go
Rob Mulholand eea0b0bdca Return missing headers after starting block number
- Previously, we required that the first missing header match the
  configured starting block number. This helps guarantee that we have
  the necessary data for method polling in memory, but prevents the
  process from moving forward if restarted after the starting block
  has already been checked.
2019-09-23 16:13:45 -05:00

275 lines
9.1 KiB
Go

// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
"fmt"
"github.com/hashicorp/golang-lru"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
const columnCacheSize = 1000
type HeaderRepository interface {
AddCheckColumn(id string) error
AddCheckColumns(ids []string) error
MarkHeaderChecked(headerID int64, eventID string) error
MarkHeaderCheckedForAll(headerID int64, ids []string) error
MarkHeadersCheckedForAll(headers []core.Header, ids []string) error
MissingHeaders(startingBlockNumber int64, endingBlockNumber int64, eventID string) ([]core.Header, error)
MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error)
MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error)
CheckCache(key string) (interface{}, bool)
}
type headerRepository struct {
db *postgres.DB
columns *lru.Cache // Cache created columns to minimize db connections
}
func NewHeaderRepository(db *postgres.DB) *headerRepository {
ccs, _ := lru.New(columnCacheSize)
return &headerRepository{
db: db,
columns: ccs,
}
}
// Adds a checked_header column for the provided column id
func (r *headerRepository) AddCheckColumn(id string) error {
// Check cache to see if column already exists before querying pg
_, ok := r.columns.Get(id)
if ok {
return nil
}
pgStr := "ALTER TABLE public.checked_headers ADD COLUMN IF NOT EXISTS "
pgStr = pgStr + id + " INTEGER NOT NULL DEFAULT 0"
_, err := r.db.Exec(pgStr)
if err != nil {
return err
}
// Add column name to cache
r.columns.Add(id, true)
return nil
}
// Adds a checked_header column for all of the provided column ids
func (r *headerRepository) AddCheckColumns(ids []string) error {
var err error
baseQuery := "ALTER TABLE public.checked_headers"
input := make([]string, 0, len(ids))
for _, id := range ids {
_, ok := r.columns.Get(id)
if !ok {
baseQuery += " ADD COLUMN IF NOT EXISTS " + id + " INTEGER NOT NULL DEFAULT 0,"
input = append(input, id)
}
}
if len(input) > 0 {
_, err = r.db.Exec(baseQuery[:len(baseQuery)-1])
if err == nil {
for _, id := range input {
r.columns.Add(id, true)
}
}
}
return err
}
// Marks the header checked for the provided column id
func (r *headerRepository) MarkHeaderChecked(headerID int64, id string) error {
_, err := r.db.Exec(`INSERT INTO public.checked_headers (header_id, `+id+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+id+` = checked_headers.`+id+` + 1`, headerID, 1)
return err
}
// Marks the header checked for all of the provided column ids
func (r *headerRepository) MarkHeaderCheckedForAll(headerID int64, ids []string) error {
pgStr := "INSERT INTO public.checked_headers (header_id, "
for _, id := range ids {
pgStr += id + ", "
}
pgStr = pgStr[:len(pgStr)-2] + ") VALUES ($1, "
for i := 0; i < len(ids); i++ {
pgStr += "1, "
}
pgStr = pgStr[:len(pgStr)-2] + ") ON CONFLICT (header_id) DO UPDATE SET "
for _, id := range ids {
pgStr += id + `= checked_headers.` + id + ` + 1, `
}
pgStr = pgStr[:len(pgStr)-2]
_, err := r.db.Exec(pgStr, headerID)
return err
}
// Marks all of the provided headers checked for each of the provided column ids
func (r *headerRepository) MarkHeadersCheckedForAll(headers []core.Header, ids []string) error {
tx, err := r.db.Beginx()
if err != nil {
return err
}
for _, header := range headers {
pgStr := "INSERT INTO public.checked_headers (header_id, "
for _, id := range ids {
pgStr += id + ", "
}
pgStr = pgStr[:len(pgStr)-2] + ") VALUES ($1, "
for i := 0; i < len(ids); i++ {
pgStr += "1, "
}
pgStr = pgStr[:len(pgStr)-2] + ") ON CONFLICT (header_id) DO UPDATE SET "
for _, id := range ids {
pgStr += fmt.Sprintf("%s = checked_headers.%s + 1, ", id, id)
}
pgStr = pgStr[:len(pgStr)-2]
_, err = tx.Exec(pgStr, header.Id)
if err != nil {
tx.Rollback()
return err
}
}
err = tx.Commit()
return err
}
// Returns missing headers for the provided checked_headers column id
func (r *headerRepository) MissingHeaders(startingBlockNumber, endingBlockNumber int64, id string) ([]core.Header, error) {
var result []core.Header
var query string
var err error
if endingBlockNumber == -1 {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR checked_headers.` + id + `=0)
AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
query = `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL OR checked_headers.` + id + `=0)
AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return continuousHeaders(result), err
}
// Returns missing headers for all of the provided checked_headers column ids
func (r *headerRepository) MissingHeadersForAll(startingBlockNumber, endingBlockNumber int64, ids []string) ([]core.Header, error) {
var result []core.Header
var query string
var err error
baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id ISNULL`
for _, id := range ids {
baseQuery += ` OR checked_headers.` + id + `= 0`
}
if endingBlockNumber == -1 {
endStr := `) AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `) AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return continuousHeaders(result), err
}
// Returns headers that have been checked for all of the provided event ids but not for the provided method ids
func (r *headerRepository) MissingMethodsCheckedEventsIntersection(startingBlockNumber, endingBlockNumber int64, methodIds, eventIds []string) ([]core.Header, error) {
var result []core.Header
var query string
var err error
baseQuery := `SELECT headers.id, headers.block_number, headers.hash FROM headers
LEFT JOIN checked_headers on headers.id = header_id
WHERE (header_id IS NOT NULL`
for _, id := range eventIds {
baseQuery += ` AND ` + id + `!=0`
}
baseQuery += `) AND (`
for _, id := range methodIds {
baseQuery += id + ` =0 AND `
}
baseQuery = baseQuery[:len(baseQuery)-5] + `) `
if endingBlockNumber == -1 {
endStr := `AND headers.block_number >= $1
AND headers.eth_node_fingerprint = $2
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, r.db.Node.ID)
} else {
endStr := `AND headers.block_number >= $1
AND headers.block_number <= $2
AND headers.eth_node_fingerprint = $3
ORDER BY headers.block_number`
query = baseQuery + endStr
err = r.db.Select(&result, query, startingBlockNumber, endingBlockNumber, r.db.Node.ID)
}
return continuousHeaders(result), err
}
// Returns a continuous set of headers
func continuousHeaders(headers []core.Header) []core.Header {
if len(headers) < 1 {
return headers
}
previousHeader := headers[0].BlockNumber
for i := 1; i < len(headers); i++ {
previousHeader++
if headers[i].BlockNumber != previousHeader {
return headers[:i]
}
}
return headers
}
// Check the repositories column id cache for a value
func (r *headerRepository) CheckCache(key string) (interface{}, bool) {
return r.columns.Get(key)
}
// Used to mark a header checked as part of some external transaction so as to group into one commit
func (r *headerRepository) MarkHeaderCheckedInTransaction(headerID int64, tx *sqlx.Tx, eventID string) error {
_, err := tx.Exec(`INSERT INTO public.checked_headers (header_id, `+eventID+`)
VALUES ($1, $2)
ON CONFLICT (header_id) DO
UPDATE SET `+eventID+` = checked_headers.`+eventID+` + 1`, headerID, 1)
return err
}