ipld-eth-state-snapshot/pkg/snapshot/file/publisher_test.go

134 lines
3.4 KiB
Go
Raw Normal View History

package publisher
import (
"context"
"encoding/csv"
"fmt"
"io"
"os"
"path/filepath"
"testing"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/jackc/pgx/v4"
fixt "github.com/vulcanize/ipld-eth-state-snapshot/fixture"
snapt "github.com/vulcanize/ipld-eth-state-snapshot/pkg/types"
"github.com/vulcanize/ipld-eth-state-snapshot/test"
)
var (
	// Database configuration and node metadata used by the tests in this file.
	pgConfig = test.DefaultPgConfig
	nodeInfo = test.DefaultNodeInfo

	// allTables lists the tables, ordered according to fkey dependencies.
	allTables = []*snapt.Table{
		&snapt.TableIPLDBlock,
		&snapt.TableNodeInfo,
		&snapt.TableHeader,
		&snapt.TableStateNode,
		&snapt.TableStorageNode,
	}
)
func writeFiles(t *testing.T, dir string) *publisher {
pub, err := NewPublisher(dir, nodeInfo)
test.NoError(t, err)
2022-02-22 09:03:06 +00:00
test.NoError(t, pub.PublishHeader(&fixt.Block1_Header))
tx, err := pub.BeginTx()
test.NoError(t, err)
2022-02-22 09:03:06 +00:00
headerID := fixt.Block1_Header.Hash().String()
test.NoError(t, pub.PublishStateNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
test.NoError(t, tx.Commit())
return pub
}
// verifyFileData checks that the CSV file at path can be parsed and that
// every record has exactly as many fields as tbl has columns.
// TODO check actual data
func verifyFileData(t *testing.T, path string, tbl *snapt.Table) {
	file, err := os.Open(path)
	test.NoError(t, err)
	// Release the descriptor once parsing is done; a close error on a
	// read-only handle is not actionable for this check.
	defer file.Close()

	r := csv.NewReader(file)
	// A positive FieldsPerRecord makes Read report an error for any record
	// whose field count differs from the table's column count.
	r.FieldsPerRecord = len(tbl.Columns)
	for {
		_, err := r.Read()
		if err == io.EOF {
			break
		}
		test.NoError(t, err)
	}
}
// TestWriting publishes the fixture data into a temporary directory and runs
// verifyFileData over every per-block and per-transaction table file.
func TestWriting(t *testing.T) {
	tmp := t.TempDir()
	// tempdir like /tmp/TempFoo/001/, TempFoo defaults to 0700
	parent := filepath.Dir(tmp)
	test.NoError(t, os.Chmod(parent, 0755))

	pub := writeFiles(t, tmp)

	// Files written once for the block live directly in the publisher's dir.
	for _, table := range perBlockTables {
		path := TableFile(pub.dir, table.Name)
		verifyFileData(t, path, table)
	}

	// Per-node files live in one directory per transaction counter value.
	for txID := uint32(0); txID < pub.txCounter; txID++ {
		for _, table := range perNodeTables {
			path := TableFile(pub.txDir(txID), table.Name)
			verifyFileData(t, path, table)
		}
	}
}
// Note: DB user requires role membership "pg_read_server_files"
func TestPgCopy(t *testing.T) {
test.NeedsDB(t)
dir := t.TempDir()
test.NoError(t, os.Chmod(filepath.Dir(dir), 0755))
pub := writeFiles(t, dir)
ctx := context.Background()
conn, err := pgx.Connect(ctx, pgConfig.DbConnectionString())
test.NoError(t, err)
// clear existing test data
pgDeleteTable := `DELETE FROM %s`
for _, tbl := range allTables {
_, err = conn.Exec(ctx, fmt.Sprintf(pgDeleteTable, tbl.Name))
test.NoError(t, err)
}
// copy from files
pgCopyStatement := `COPY %s FROM '%s' CSV`
for _, tbl := range perBlockTables {
stm := fmt.Sprintf(pgCopyStatement, tbl.Name, TableFile(pub.dir, tbl.Name))
_, err = conn.Exec(ctx, stm)
test.NoError(t, err)
}
for i := uint32(0); i < pub.txCounter; i++ {
for _, tbl := range perNodeTables {
stm := fmt.Sprintf(pgCopyStatement, tbl.Name, TableFile(pub.txDir(i), tbl.Name))
_, err = conn.Exec(ctx, stm)
test.NoError(t, err)
}
}
// check header was successfully committed
pgQueryHeader := `SELECT cid, block_hash
FROM eth.header_cids
WHERE block_number = $1`
type res struct {
CID string
BlockHash string
}
var header res
2022-02-22 09:03:06 +00:00
err = conn.QueryRow(ctx, pgQueryHeader, fixt.Block1_Header.Number.Uint64()).Scan(
&header.CID, &header.BlockHash)
test.NoError(t, err)
2022-02-22 09:03:06 +00:00
headerNode, err := ipld.NewEthHeader(&fixt.Block1_Header)
2022-04-19 10:19:49 +00:00
test.NoError(t, err)
test.ExpectEqual(t, headerNode.Cid().String(), header.CID)
2022-02-22 09:03:06 +00:00
test.ExpectEqual(t, fixt.Block1_Header.Hash().String(), header.BlockHash)
}