diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 0ade53565..d5fbea32f 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -49,9 +49,21 @@ var ( } ) +// tableRow pairs a destination table with the column values of one row queued for writing. +type tableRow struct { + table types.Table + values []interface{} +} + type CSVWriter struct { dir string // dir containing output files writers fileWriters + + rows chan tableRow + flushChan chan struct{} + flushFinished chan struct{} + quitChan chan struct{} + doneChan chan struct{} } type fileWriter struct { @@ -109,32 +121,69 @@ func NewCSVWriter(path string) (*CSVWriter, error) { return nil, err } csvWriter := &CSVWriter{ - writers: writers, - dir: path, + writers: writers, + dir: path, + rows: make(chan tableRow), + flushChan: make(chan struct{}), + flushFinished: make(chan struct{}), + quitChan: make(chan struct{}), + doneChan: make(chan struct{}), } return csvWriter, nil } +// Loop starts a goroutine that writes rows received on the rows channel and services flush requests +// until Close closes quitChan; doneChan is closed when the goroutine exits. +func (csw *CSVWriter) Loop() { + go func() { + defer close(csw.doneChan) + for { + select { + case row := <-csw.rows: + // TODO: Check available buffer size and flush + csw.writers.flush() + + csw.writers.write(&row.table, row.values...)
+ case <-csw.quitChan: + if err := csw.writers.flush(); err != nil { + panic(fmt.Sprintf("error writing csv buffer to file: %v", err)) + } + return + case <-csw.flushChan: + if err := csw.writers.flush(); err != nil { + panic(fmt.Sprintf("error writing csv buffer to file: %v", err)) + } + csw.flushFinished <- struct{}{} + } + } + }() +} + // Flush sends a flush signal to the looping process func (csw *CSVWriter) Flush() { - csw.writers.flush() + csw.flushChan <- struct{}{} + <-csw.flushFinished } func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv") } // Close satisfies io.Closer func (csw *CSVWriter) Close() error { - return csw.writers.flush() + close(csw.quitChan) + <-csw.doneChan + return nil } func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { - csw.writers.write(&types.TableNodeInfo, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) - csw.writers.flush() + var values []interface{} + values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) + csw.rows <- tableRow{types.TableNodeInfo, values} } func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) { - csw.writers.write(&types.TableIPLDBlock, ipld.BlockNumber, ipld.Key, ipld.Data) - csw.writers.flush() + var values []interface{} + values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data) + csw.rows <- tableRow{types.TableIPLDBlock, values} } func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { @@ -168,46 +217,52 @@ func (csw *CSVWriter) upsertIPLDRaw(blockNumber string, codec, mh uint64, raw [] } func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { - csw.writers.write(&types.TableHeader, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, + var values []interface{} + values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.MhKey, 1, header.Coinbase) - csw.writers.flush() + csw.rows <- tableRow{types.TableHeader, values} indexerMetrics.blocks.Inc(1) } func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { - csw.writers.write(&types.TableUncle, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, + var values []interface{} + values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) - csw.writers.flush() + csw.rows <- tableRow{types.TableUncle, values} } func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { - csw.writers.write(&types.TableTransaction, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, + var values []interface{} + values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) - csw.writers.flush() + csw.rows <- tableRow{types.TableTransaction, values} indexerMetrics.transactions.Inc(1) } func (csw *CSVWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) { - csw.writers.write(&types.TableAccessListElement, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) - csw.writers.flush() + var values []interface{} + values = append(values, accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) + csw.rows <- tableRow{types.TableAccessListElement, values} indexerMetrics.accessListEntries.Inc(1) } func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { - csw.writers.write(&types.TableReceipt, rct.BlockNumber, rct.TxID, rct.LeafCID,
rct.Contract, rct.ContractHash, rct.LeafMhKey, + var values []interface{} + values = append(values, rct.BlockNumber, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) - csw.writers.flush() + csw.rows <- tableRow{types.TableReceipt, values} indexerMetrics.receipts.Inc(1) } func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { for _, l := range logs { - csw.writers.write(&types.TableLog, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, + var values []interface{} + values = append(values, l.BlockNumber, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data) + csw.rows <- tableRow{types.TableLog, values} indexerMetrics.logs.Inc(1) } - csw.writers.flush() } func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { @@ -215,15 +270,18 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - csw.writers.write(&types.TableStateNode, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, + + var values []interface{} + values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) - csw.writers.flush() + csw.rows <- tableRow{types.TableStateNode, values} } func (csw *CSVWriter) upsertStateAccount(stateAccount models.StateAccountModel) { - csw.writers.write(&types.TableStateAccount, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, + var values []interface{} + values = append(values, stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, strconv.FormatUint(stateAccount.Nonce, 10), stateAccount.CodeHash, stateAccount.StorageRoot) - csw.writers.flush() + csw.rows <- tableRow{types.TableStateAccount, values} } func (csw
*CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { @@ -231,7 +289,9 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - csw.writers.write(&types.TableStorageNode, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, + + var values []interface{} + values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) - csw.writers.flush() + csw.rows <- tableRow{types.TableStorageNode, values} } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 88df17d7b..baf1713fa 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -60,7 +60,7 @@ var ( // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { - fileWriter *CSVWriter + fileWriter FileWriter chainConfig *params.ChainConfig nodeID string wg *sync.WaitGroup @@ -90,6 +90,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c } wg := new(sync.WaitGroup) + w.Loop() w.upsertNode(config.NodeInfo) return &StateDiffIndexer{ fileWriter: w, diff --git a/statediff/indexer/database/file/interfaces.go b/statediff/indexer/database/file/interfaces.go new file mode 100644 index 000000000..700df4eac --- /dev/null +++ b/statediff/indexer/database/file/interfaces.go @@ -0,0 +1,45 @@ +// VulcanizeDB +// Copyright © 2022 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version.
+ +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. + +package file + +import ( + node "github.com/ipfs/go-ipld-format" + + "github.com/ethereum/go-ethereum/statediff/indexer/models" + nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" +) + +// FileWriter is the writer interface required by the file indexer; it covers lifecycle control (Loop, Flush, Close) and the per-table upsert methods. +type FileWriter interface { + Loop() + Close() error + Flush() + upsertNode(node nodeinfo.Info) + upsertIPLD(ipld models.IPLDModel) + upsertIPLDDirect(blockNumber, key string, value []byte) + upsertIPLDNode(blockNumber string, i node.Node) + upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error) + upsertHeaderCID(header models.HeaderModel) + upsertUncleCID(uncle models.UncleModel) + upsertTransactionCID(transaction models.TxModel) + upsertAccessListElement(accessListElement models.AccessListElementModel) + upsertReceiptCID(rct *models.ReceiptModel) + upsertLogCID(logs []*models.LogsModel) + upsertStateCID(stateNode models.StateNodeModel) + upsertStateAccount(stateAccount models.StateAccountModel) + upsertStorageCID(storageCID models.StorageNodeModel) +}