Implement writer interface for file indexer

nabarun 2022-06-22 18:44:23 +05:30
parent 201770b74d
commit fc2d7452fa
3 changed files with 129 additions and 27 deletions

View File

@@ -49,9 +49,20 @@ var (
}
)
+ type tableRow struct {
+ table types.Table
+ values []interface{}
+ }
type CSVWriter struct {
dir string // dir containing output files
writers fileWriters
+ rows chan tableRow
+ flushChan chan struct{}
+ flushFinished chan struct{}
+ quitChan chan struct{}
+ doneChan chan struct{}
}
type fileWriter struct {
@@ -109,32 +120,66 @@ func NewCSVWriter(path string) (*CSVWriter, error) {
return nil, err
}
csvWriter := &CSVWriter{
+ writers: writers,
+ dir: path,
+ rows: make(chan tableRow),
+ flushChan: make(chan struct{}),
+ flushFinished: make(chan struct{}),
+ quitChan: make(chan struct{}),
+ doneChan: make(chan struct{}),
}
return csvWriter, nil
}
+ func (csw *CSVWriter) Loop() {
+ go func() {
+ defer close(csw.doneChan)
+ for {
+ select {
+ case row := <-csw.rows:
+ // TODO: Check available buffer size and flush
+ csw.writers.flush()
+ csw.writers.write(&row.table, row.values...)
+ case <-csw.quitChan:
+ if err := csw.writers.flush(); err != nil {
+ panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
+ }
+ return
+ case <-csw.flushChan:
+ if err := csw.writers.flush(); err != nil {
+ panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
+ }
+ csw.flushFinished <- struct{}{}
+ }
+ }
+ }()
+ }
+ // Flush sends a flush signal to the looping process
func (csw *CSVWriter) Flush() {
- csw.writers.flush()
+ csw.flushChan <- struct{}{}
+ <-csw.flushFinished
}
func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv") }
// Close satisfies io.Closer
func (csw *CSVWriter) Close() error {
- return csw.writers.flush()
+ close(csw.quitChan)
+ <-csw.doneChan
+ return nil
}
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
- csw.writers.write(&types.TableNodeInfo, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
- csw.writers.flush()
+ var values []interface{}
+ values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
+ csw.rows <- tableRow{types.TableNodeInfo, values}
}
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
- csw.writers.write(&types.TableIPLDBlock, ipld.BlockNumber, ipld.Key, ipld.Data)
- csw.writers.flush()
+ var values []interface{}
+ values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
+ csw.rows <- tableRow{types.TableIPLDBlock, values}
}
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
@@ -168,46 +213,52 @@ func (csw *CSVWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []
}
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
- csw.writers.write(&types.TableHeader, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
+ var values []interface{}
+ values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableHeader, values}
indexerMetrics.blocks.Inc(1)
}
func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
- csw.writers.write(&types.TableUncle, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
+ var values []interface{}
+ values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableUncle, values}
}
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
- csw.writers.write(&types.TableTransaction, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
+ var values []interface{}
+ values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableTransaction, values}
indexerMetrics.transactions.Inc(1)
}
func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
- csw.writers.write(&types.TableAccessListElement, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
- csw.writers.flush()
+ var values []interface{}
+ values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
+ csw.rows <- tableRow{types.TableAccessListElement, values}
indexerMetrics.accessListEntries.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
- csw.writers.write(&types.TableReceipt, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
+ var values []interface{}
+ values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableReceipt, values}
indexerMetrics.receipts.Inc(1)
}
func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
- csw.writers.write(&types.TableLog, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
+ var values []interface{}
+ values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data)
+ csw.rows <- tableRow{types.TableLog, values}
indexerMetrics.logs.Inc(1)
}
- csw.writers.flush()
}
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
@@ -215,15 +266,18 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
- csw.writers.write(&types.TableStateNode, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
+ var values []interface{}
+ values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path,
stateNode.NodeType, true, stateNode.MhKey)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableStateNode, values}
}
func (csw *CSVWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
- csw.writers.write(&types.TableStateAccount, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
+ var values []interface{}
+ values = append(values, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
strconv.FormatUint(stateAccount.Nonce, 10), stateAccount.CodeHash, stateAccount.StorageRoot)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableStateAccount, values}
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
@@ -231,7 +285,9 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
- csw.writers.write(&types.TableStorageNode, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
+ var values []interface{}
+ values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
- csw.writers.flush()
+ csw.rows <- tableRow{types.TableStorageNode, values}
}
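For context, the pattern this hunk introduces can be summarized in isolation: every upsert pushes a row onto a channel, and a single goroutine owns the underlying buffered writers, with Flush and Close coordinating through signal channels rather than touching the writers directly. The following is a minimal, self-contained sketch of that pattern, assuming an illustrative loopWriter type backed by a bufio.Writer; the names and the stdout backend are stand-ins, not code from this commit.

package main

import (
	"bufio"
	"fmt"
	"os"
)

// row is an illustrative stand-in for the tableRow type in the diff above.
type row struct {
	table  string
	values []interface{}
}

type loopWriter struct {
	out           *bufio.Writer
	rows          chan row
	flushChan     chan struct{}
	flushFinished chan struct{}
	quitChan      chan struct{}
	doneChan      chan struct{}
}

func newLoopWriter(out *bufio.Writer) *loopWriter {
	return &loopWriter{
		out:           out,
		rows:          make(chan row),
		flushChan:     make(chan struct{}),
		flushFinished: make(chan struct{}),
		quitChan:      make(chan struct{}),
		doneChan:      make(chan struct{}),
	}
}

// Loop starts the one goroutine that is allowed to touch the buffered writer.
func (w *loopWriter) Loop() {
	go func() {
		defer close(w.doneChan)
		for {
			select {
			case r := <-w.rows:
				fmt.Fprintf(w.out, "%s,%v\n", r.table, r.values)
			case <-w.flushChan:
				w.out.Flush()
				w.flushFinished <- struct{}{}
			case <-w.quitChan:
				w.out.Flush()
				return
			}
		}
	}()
}

// Flush asks the loop to flush and waits until it reports back.
func (w *loopWriter) Flush() {
	w.flushChan <- struct{}{}
	<-w.flushFinished
}

// Close signals the loop to flush and exit, then waits for it to finish.
func (w *loopWriter) Close() error {
	close(w.quitChan)
	<-w.doneChan
	return nil
}

func main() {
	w := newLoopWriter(bufio.NewWriter(os.Stdout))
	w.Loop()
	w.rows <- row{table: "headers", values: []interface{}{1, "0xabc"}}
	w.Flush()
	w.Close()
}

The same shape appears in CSVWriter above: Flush and Close never call writers.flush directly once the loop is running; they only signal it and wait, which is what flushFinished and doneChan are for.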

View File

@@ -60,7 +60,7 @@ var (
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
- fileWriter *CSVWriter
+ fileWriter FileWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
@@ -90,6 +90,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
}
wg := new(sync.WaitGroup)
+ w.Loop()
w.upsertNode(config.NodeInfo)
return &StateDiffIndexer{
fileWriter: w,
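The ordering in this hunk matters: because upserts now enqueue rows on the writer's channel, the write loop has to be running before the first upsert, which is why w.Loop() precedes w.upsertNode(config.NodeInfo). A hedged sketch of the implied lifecycle follows; the exampleLifecycle helper is illustrative only and, since the interface methods are unexported, would have to live inside the file package.

package file

import (
	nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
)

// exampleLifecycle is an illustrative helper, not part of this commit:
// start the loop, enqueue work, flush synchronously, then shut down.
func exampleLifecycle(w FileWriter, info nodeinfo.Info) error {
	w.Loop()           // start the background write loop before any upsert
	w.upsertNode(info) // upserts now enqueue rows instead of writing directly
	w.Flush()          // blocks until the loop has flushed the underlying writers
	return w.Close()   // signals the loop to flush and exit, then waits for it
}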

View File

@@ -0,0 +1,45 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package file
import (
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
)
// Writer interface required by the file indexer
type FileWriter interface {
Loop()
Close() error
Flush()
upsertNode(node nodeinfo.Info)
upsertIPLD(ipld models.IPLDModel)
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
upsertHeaderCID(header models.HeaderModel)
upsertUncleCID(uncle models.UncleModel)
upsertTransactionCID(transaction models.TxModel)
upsertAccessListElement(accessListElement models.AccessListElementModel)
upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel)
upsertStateCID(stateNode models.StateNodeModel)
upsertStateAccount(stateAccount models.StateAccountModel)
upsertStorageCID(storageCID models.StorageNodeModel)
}
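Because the upsert methods are unexported, any alternative implementation has to live alongside the interface in the file package. A no-op stand-in, useful as a test double and as a demonstration of why StateDiffIndexer now holds a FileWriter rather than a *CSVWriter, might look like the sketch below; the nopWriter name and the compile-time assertion are assumptions for illustration, not part of this commit.

package file

import (
	node "github.com/ipfs/go-ipld-format"

	"github.com/ethereum/go-ethereum/statediff/indexer/models"
	nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
)

// nopWriter discards everything; it exists only to show the interface being satisfied.
type nopWriter struct{}

// Compile-time check that nopWriter implements FileWriter.
var _ FileWriter = (*nopWriter)(nil)

func (*nopWriter) Loop()        {}
func (*nopWriter) Close() error { return nil }
func (*nopWriter) Flush()       {}

func (*nopWriter) upsertNode(nodeinfo.Info)                               {}
func (*nopWriter) upsertIPLD(models.IPLDModel)                            {}
func (*nopWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {}
func (*nopWriter) upsertIPLDNode(blockNumber string, i node.Node)         {}
func (*nopWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) {
	return "", "", nil
}
func (*nopWriter) upsertHeaderCID(models.HeaderModel)                    {}
func (*nopWriter) upsertUncleCID(models.UncleModel)                      {}
func (*nopWriter) upsertTransactionCID(models.TxModel)                   {}
func (*nopWriter) upsertAccessListElement(models.AccessListElementModel) {}
func (*nopWriter) upsertReceiptCID(*models.ReceiptModel)                 {}
func (*nopWriter) upsertLogCID([]*models.LogsModel)                      {}
func (*nopWriter) upsertStateCID(models.StateNodeModel)                  {}
func (*nopWriter) upsertStateAccount(models.StateAccountModel)           {}
func (*nopWriter) upsertStorageCID(models.StorageNodeModel)              {}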