2018-11-07 21:50:43 +00:00
// VulcanizeDB
// Copyright © 2018 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
"errors"
"fmt"
"strings"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
2018-11-23 18:12:24 +00:00
"github.com/vulcanize/vulcanizedb/pkg/omni/shared/types"
2018-11-07 21:50:43 +00:00
)
2018-11-23 18:12:24 +00:00
type EventRepository interface {
2018-11-20 16:38:23 +00:00
PersistLog ( event types . Log , contractAddr , contractName string ) error
CreateEventTable ( contractName string , event types . Log ) ( bool , error )
2018-11-07 21:50:43 +00:00
CreateContractSchema ( contractName string ) ( bool , error )
}
2018-11-23 18:12:24 +00:00
type eventRepository struct {
db * postgres . DB
2018-11-07 21:50:43 +00:00
}
2018-11-23 18:12:24 +00:00
func NewEventRepository ( db * postgres . DB ) * eventRepository {
return & eventRepository {
db : db ,
2018-11-07 21:50:43 +00:00
}
}
2018-11-20 16:38:23 +00:00
// Creates a schema for the contract if needed
// Creates table for the watched contract event if needed
// Persists converted event log data into this custom table
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) PersistLog ( event types . Log , contractAddr , contractName string ) error {
_ , err := r . CreateContractSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
return err
}
2018-11-23 18:12:24 +00:00
_ , err = r . CreateEventTable ( contractAddr , event )
2018-11-20 16:38:23 +00:00
if err != nil {
return err
2018-11-07 21:50:43 +00:00
}
2018-11-23 18:12:24 +00:00
return r . persistLog ( event , contractAddr , contractName )
2018-11-07 21:50:43 +00:00
}
// Creates a custom postgres command to persist logs for the given event
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) persistLog ( event types . Log , contractAddr , contractName string ) error {
2018-11-20 16:38:23 +00:00
// Begin postgres string
2018-11-23 18:12:24 +00:00
pgStr := fmt . Sprintf ( "INSERT INTO l%s.%s_event " , strings . ToLower ( contractAddr ) , strings . ToLower ( event . Name ) )
pgStr = pgStr + "(header_id, token_name, raw_log, log_idx, tx_idx"
2018-11-20 16:38:23 +00:00
// Pack the corresponding variables in a slice
var data [ ] interface { }
data = append ( data ,
event . Id ,
contractName ,
2018-11-23 18:12:24 +00:00
event . Raw ,
event . LogIndex ,
event . TransactionIndex )
2018-11-20 16:38:23 +00:00
// Iterate over name-value pairs in the log adding
// names to the string and pushing values to the slice
counter := 0 // Keep track of number of inputs
for inputName , input := range event . Values {
counter += 1
pgStr = pgStr + fmt . Sprintf ( ", %s_" , strings . ToLower ( inputName ) ) // Add underscore after to avoid any collisions with reserved pg words
data = append ( data , input )
}
2018-11-07 21:50:43 +00:00
2018-11-20 16:38:23 +00:00
// Finish off the string and execute the command using the packed data
// For each input entry we created we add its postgres command variable to the string
2018-11-23 18:12:24 +00:00
pgStr = pgStr + ") VALUES ($1, $2, $3, $4, $5"
2018-11-20 16:38:23 +00:00
for i := 0 ; i < counter ; i ++ {
2018-11-23 18:12:24 +00:00
pgStr = pgStr + fmt . Sprintf ( ", $%d" , i + 6 )
2018-11-20 16:38:23 +00:00
}
2018-11-23 18:12:24 +00:00
pgStr = pgStr + ")"
2018-11-07 21:50:43 +00:00
2018-11-23 18:12:24 +00:00
_ , err := r . db . Exec ( pgStr , data ... )
2018-11-20 16:38:23 +00:00
if err != nil {
return err
2018-11-07 21:50:43 +00:00
}
return nil
}
// Checks for event table and creates it if it does not already exist
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) CreateEventTable ( contractAddr string , event types . Log ) ( bool , error ) {
tableExists , err := r . checkForTable ( contractAddr , event . Name )
2018-11-07 21:50:43 +00:00
if err != nil {
return false , err
}
if ! tableExists {
2018-11-23 18:12:24 +00:00
err = r . newEventTable ( contractAddr , event )
2018-11-07 21:50:43 +00:00
if err != nil {
return false , err
}
}
return ! tableExists , nil
}
// Creates a table for the given contract and event
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) newEventTable ( contractAddr string , event types . Log ) error {
2018-11-07 21:50:43 +00:00
// Begin pg string
2018-11-23 18:12:24 +00:00
pgStr := fmt . Sprintf ( "CREATE TABLE IF NOT EXISTS l%s.%s_event " , strings . ToLower ( contractAddr ) , strings . ToLower ( event . Name ) )
pgStr = pgStr + "(id SERIAL, header_id INTEGER NOT NULL REFERENCES headers (id) ON DELETE CASCADE, token_name CHARACTER VARYING(66) NOT NULL, raw_log JSONB, log_idx INTEGER NOT NULL, tx_idx INTEGER NOT NULL,"
2018-11-07 21:50:43 +00:00
// Iterate over event fields, using their name and pgType to grow the string
for _ , field := range event . Fields {
pgStr = pgStr + fmt . Sprintf ( " %s_ %s NOT NULL," , strings . ToLower ( field . Name ) , field . PgType )
}
2018-11-23 18:12:24 +00:00
pgStr = pgStr + " UNIQUE (header_id, tx_idx, log_idx))"
_ , err := r . db . Exec ( pgStr )
2018-11-07 21:50:43 +00:00
return err
}
// Checks if a table already exists for the given contract and event
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) checkForTable ( contractAddr string , eventName string ) ( bool , error ) {
pgStr := fmt . Sprintf ( "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = 'l%s' AND table_name = '%s_event')" , strings . ToLower ( contractAddr ) , strings . ToLower ( eventName ) )
2018-11-07 21:50:43 +00:00
var exists bool
2018-11-23 18:12:24 +00:00
err := r . db . Get ( & exists , pgStr )
2018-11-07 21:50:43 +00:00
return exists , err
}
// Checks for contract schema and creates it if it does not already exist
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) CreateContractSchema ( contractAddr string ) ( bool , error ) {
2018-11-07 21:50:43 +00:00
if contractAddr == "" {
return false , errors . New ( "error: no contract address specified" )
}
2018-11-20 16:38:23 +00:00
2018-11-23 18:12:24 +00:00
schemaExists , err := r . checkForSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
return false , err
}
if ! schemaExists {
2018-11-23 18:12:24 +00:00
err = r . newContractSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
return false , err
}
}
return ! schemaExists , nil
}
// Creates a schema for the given contract
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) newContractSchema ( contractAddr string ) error {
_ , err := r . db . Exec ( "CREATE SCHEMA IF NOT EXISTS l" + strings . ToLower ( contractAddr ) )
2018-11-07 21:50:43 +00:00
return err
}
// Checks if a schema already exists for the given contract
2018-11-23 18:12:24 +00:00
func ( r * eventRepository ) checkForSchema ( contractAddr string ) ( bool , error ) {
pgStr := fmt . Sprintf ( "SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'l%s')" , strings . ToLower ( contractAddr ) )
2018-11-07 21:50:43 +00:00
var exists bool
2018-11-23 18:12:24 +00:00
err := r . db . QueryRow ( pgStr ) . Scan ( & exists )
2018-11-07 21:50:43 +00:00
return exists , err
}