2018-11-07 21:50:43 +00:00
// VulcanizeDB
2019-03-12 15:46:42 +00:00
// Copyright © 2019 Vulcanize
2018-11-07 21:50:43 +00:00
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package repository
import (
2018-11-20 16:38:23 +00:00
"errors"
2018-11-07 21:50:43 +00:00
"fmt"
2018-11-20 16:38:23 +00:00
"strings"
2018-11-24 04:26:07 +00:00
"github.com/hashicorp/golang-lru"
2019-03-11 23:18:54 +00:00
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
2018-11-07 21:50:43 +00:00
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
2018-11-24 04:26:07 +00:00
// methodCacheSize is the size handed to lru.New when NewMethodRepository
// builds the cache of recently used method table names.
const methodCacheSize = 1000
2018-11-23 18:12:24 +00:00
type MethodRepository interface {
2018-12-07 15:38:46 +00:00
PersistResults ( results [ ] types . Result , methodInfo types . Method , contractAddr , contractName string ) error
CreateMethodTable ( contractAddr string , method types . Method ) ( bool , error )
2018-11-20 16:38:23 +00:00
CreateContractSchema ( contractAddr string ) ( bool , error )
2018-11-24 04:26:07 +00:00
CheckSchemaCache ( key string ) ( interface { } , bool )
CheckTableCache ( key string ) ( interface { } , bool )
2018-11-07 21:50:43 +00:00
}
2018-11-23 18:12:24 +00:00
type methodRepository struct {
2018-11-07 21:50:43 +00:00
* postgres . DB
2018-11-24 04:26:07 +00:00
mode types . Mode
schemas * lru . Cache // Cache names of recently used schemas to minimize db connections
tables * lru . Cache // Cache names of recently used tables to minimize db connections
2018-11-07 21:50:43 +00:00
}
2018-11-24 04:26:07 +00:00
func NewMethodRepository ( db * postgres . DB , mode types . Mode ) * methodRepository {
ccs , _ := lru . New ( contractCacheSize )
mcs , _ := lru . New ( methodCacheSize )
2018-11-23 18:12:24 +00:00
return & methodRepository {
2018-11-24 04:26:07 +00:00
DB : db ,
mode : mode ,
schemas : ccs ,
tables : mcs ,
2018-11-07 21:50:43 +00:00
}
}
2018-12-07 15:38:46 +00:00
// Creates a schema for the contract if needed
// Creates table for the contract method if needed
// Persists method polling data into this custom table
func ( r * methodRepository ) PersistResults ( results [ ] types . Result , methodInfo types . Method , contractAddr , contractName string ) error {
if len ( results ) == 0 {
return errors . New ( "method repository error: passed empty results slice" )
2018-11-20 16:38:23 +00:00
}
2018-11-24 04:26:07 +00:00
_ , err := r . CreateContractSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
return err
}
2018-12-07 15:38:46 +00:00
_ , err = r . CreateMethodTable ( contractAddr , methodInfo )
2018-11-20 16:38:23 +00:00
if err != nil {
return err
2018-11-07 21:50:43 +00:00
}
2018-12-07 15:38:46 +00:00
return r . persistResults ( results , methodInfo , contractAddr , contractName )
2018-11-07 21:50:43 +00:00
}
// Creates a custom postgres command to persist logs for the given event
2018-12-07 15:38:46 +00:00
func ( r * methodRepository ) persistResults ( results [ ] types . Result , methodInfo types . Method , contractAddr , contractName string ) error {
2019-03-11 16:19:18 +00:00
tx , err := r . DB . Begin ( )
2018-12-07 15:38:46 +00:00
if err != nil {
return err
2018-11-20 16:38:23 +00:00
}
2018-12-07 15:38:46 +00:00
for _ , result := range results {
// Begin postgres string
pgStr := fmt . Sprintf ( "INSERT INTO %s_%s.%s_method " , r . mode . String ( ) , strings . ToLower ( contractAddr ) , strings . ToLower ( result . Name ) )
pgStr = pgStr + "(token_name, block"
ml := len ( result . Args )
// Preallocate slice of needed capacity and proceed to pack variables into it in same order they appear in string
data := make ( [ ] interface { } , 0 , 3 + ml )
data = append ( data ,
contractName ,
result . Block )
// Iterate over method args and return value, adding names
// to the string and pushing values to the slice
for i , arg := range result . Args {
pgStr = pgStr + fmt . Sprintf ( ", %s_" , strings . ToLower ( arg . Name ) ) // Add underscore after to avoid any collisions with reserved pg words
data = append ( data , result . Inputs [ i ] )
}
pgStr = pgStr + ", returned) VALUES ($1, $2"
data = append ( data , result . Output )
2018-11-20 16:38:23 +00:00
2018-12-07 15:38:46 +00:00
// For each input entry we created we add its postgres command variable to the string
for i := 0 ; i <= ml ; i ++ {
pgStr = pgStr + fmt . Sprintf ( ", $%d" , i + 3 )
}
pgStr = pgStr + ")"
// Add this query to the transaction
_ , err = tx . Exec ( pgStr , data ... )
if err != nil {
tx . Rollback ( )
return err
}
2018-11-07 21:50:43 +00:00
}
2018-12-07 15:38:46 +00:00
return tx . Commit ( )
2018-11-07 21:50:43 +00:00
}
// Checks for event table and creates it if it does not already exist
2018-12-07 15:38:46 +00:00
func ( r * methodRepository ) CreateMethodTable ( contractAddr string , method types . Method ) ( bool , error ) {
2018-11-24 04:26:07 +00:00
tableID := fmt . Sprintf ( "%s_%s.%s_method" , r . mode . String ( ) , strings . ToLower ( contractAddr ) , strings . ToLower ( method . Name ) )
// Check cache before querying pq to see if table exists
_ , ok := r . tables . Get ( tableID )
if ok {
return false , nil
}
tableExists , err := r . checkForTable ( contractAddr , method . Name )
2018-11-07 21:50:43 +00:00
if err != nil {
2018-11-20 16:38:23 +00:00
return false , err
2018-11-07 21:50:43 +00:00
}
if ! tableExists {
2018-11-24 04:26:07 +00:00
err = r . newMethodTable ( tableID , method )
2018-11-07 21:50:43 +00:00
if err != nil {
2018-11-20 16:38:23 +00:00
return false , err
2018-11-07 21:50:43 +00:00
}
}
2018-11-24 04:26:07 +00:00
// Add schema name to cache
r . tables . Add ( tableID , true )
2018-11-20 16:38:23 +00:00
return ! tableExists , nil
2018-11-07 21:50:43 +00:00
}
// Creates a table for the given contract and event
2018-12-07 15:38:46 +00:00
func ( r * methodRepository ) newMethodTable ( tableID string , method types . Method ) error {
2018-11-07 21:50:43 +00:00
// Begin pg string
2018-11-24 04:26:07 +00:00
pgStr := fmt . Sprintf ( "CREATE TABLE IF NOT EXISTS %s " , tableID )
2018-11-07 21:50:43 +00:00
pgStr = pgStr + "(id SERIAL, token_name CHARACTER VARYING(66) NOT NULL, block INTEGER NOT NULL,"
// Iterate over method inputs and outputs, using their name and pgType to grow the string
2018-11-20 16:38:23 +00:00
for _ , arg := range method . Args {
pgStr = pgStr + fmt . Sprintf ( " %s_ %s NOT NULL," , strings . ToLower ( arg . Name ) , arg . PgType )
2018-11-07 21:50:43 +00:00
}
2018-11-20 16:38:23 +00:00
pgStr = pgStr + fmt . Sprintf ( " returned %s NOT NULL)" , method . Return [ 0 ] . PgType )
2018-11-07 21:50:43 +00:00
2018-11-24 04:26:07 +00:00
_ , err := r . DB . Exec ( pgStr )
2018-11-07 21:50:43 +00:00
return err
}
// Checks if a table already exists for the given contract and event
2018-11-24 04:26:07 +00:00
func ( r * methodRepository ) checkForTable ( contractAddr string , methodName string ) ( bool , error ) {
pgStr := fmt . Sprintf ( "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_schema = '%s_%s' AND table_name = '%s_method')" , r . mode . String ( ) , strings . ToLower ( contractAddr ) , strings . ToLower ( methodName ) )
2018-11-07 21:50:43 +00:00
var exists bool
2018-11-24 04:26:07 +00:00
err := r . DB . Get ( & exists , pgStr )
2018-11-07 21:50:43 +00:00
return exists , err
}
// Checks for contract schema and creates it if it does not already exist
2018-11-24 04:26:07 +00:00
func ( r * methodRepository ) CreateContractSchema ( contractAddr string ) ( bool , error ) {
2018-11-20 16:38:23 +00:00
if contractAddr == "" {
return false , errors . New ( "error: no contract address specified" )
}
2018-11-24 04:26:07 +00:00
// Check cache before querying pq to see if schema exists
_ , ok := r . schemas . Get ( contractAddr )
if ok {
return false , nil
}
schemaExists , err := r . checkForSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
2018-11-20 16:38:23 +00:00
return false , err
2018-11-07 21:50:43 +00:00
}
if ! schemaExists {
2018-11-24 04:26:07 +00:00
err = r . newContractSchema ( contractAddr )
2018-11-07 21:50:43 +00:00
if err != nil {
2018-11-20 16:38:23 +00:00
return false , err
2018-11-07 21:50:43 +00:00
}
}
2018-11-24 04:26:07 +00:00
// Add schema name to cache
r . schemas . Add ( contractAddr , true )
2018-11-20 16:38:23 +00:00
return ! schemaExists , nil
2018-11-07 21:50:43 +00:00
}
// Creates a schema for the given contract
2018-11-24 04:26:07 +00:00
func ( r * methodRepository ) newContractSchema ( contractAddr string ) error {
_ , err := r . DB . Exec ( "CREATE SCHEMA IF NOT EXISTS " + r . mode . String ( ) + "_" + strings . ToLower ( contractAddr ) )
2018-11-07 21:50:43 +00:00
return err
}
// Checks if a schema already exists for the given contract
2018-11-24 04:26:07 +00:00
func ( r * methodRepository ) checkForSchema ( contractAddr string ) ( bool , error ) {
pgStr := fmt . Sprintf ( "SELECT EXISTS (SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s_%s')" , r . mode . String ( ) , strings . ToLower ( contractAddr ) )
2018-11-07 21:50:43 +00:00
var exists bool
2018-11-24 04:26:07 +00:00
err := r . DB . Get ( & exists , pgStr )
2018-11-07 21:50:43 +00:00
return exists , err
}
2018-11-24 04:26:07 +00:00
// CheckSchemaCache looks key up in the schema cache and returns the stored
// value together with a flag telling whether the key was present.
func (r *methodRepository) CheckSchemaCache(key string) (interface{}, bool) {
	value, found := r.schemas.Get(key)
	return value, found
}
// CheckTableCache looks key up in the table cache and returns the stored
// value together with a flag telling whether the key was present.
func (r *methodRepository) CheckTableCache(key string) (interface{}, bool) {
	value, found := r.tables.Get(key)
	return value, found
}