diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index de8a78d1..ace79700 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -30,7 +30,7 @@ import ( const ( DefaultMaxBatchSize uint64 = 100 - defaultMaxBatchNumber int64 = 10 + DefaultMaxBatchNumber int64 = 10 ) // BackFillInterface for filling in gaps in the super node @@ -146,8 +146,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error { log.Infof("going to fill in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock) errChan := make(chan error) done := make(chan bool) - err := bfs.backFill(startingBlock, endingBlock, errChan, done) - if err != nil { + if err := bfs.backFill(startingBlock, endingBlock, errChan, done); err != nil { return err } for { @@ -184,7 +183,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan for _, blockHeights := range blockRangeBins { // if we have reached our limit of active goroutines // wait for one to finish before starting the next - if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber { + if atomic.AddInt64(&activeCount, 1) > DefaultMaxBatchNumber { // this blocks until a process signals it has finished <-forwardDone } diff --git a/pkg/super_node/btc/cleaner.go b/pkg/super_node/btc/cleaner.go new file mode 100644 index 00000000..a2ed3408 --- /dev/null +++ b/pkg/super_node/btc/cleaner.go @@ -0,0 +1,132 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package btc + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// Cleaner satisfies the shared.Cleaner interface fo bitcoin +type Cleaner struct { + db *postgres.DB +} + +// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface +func NewCleaner(db *postgres.DB) *Cleaner { + return &Cleaner{ + db: db, + } +} + +// Clean removes the specified data from the db within the provided block range +func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error { + tx, err := c.db.Beginx() + if err != nil { + return err + } + for _, rng := range rngs { + if err := c.clean(tx, rng, t); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + return tx.Commit() +} + +func (c *Cleaner) clean(tx *sqlx.Tx, rng [2]uint64, t shared.DataType) error { + switch t { + case shared.Full: + return c.cleanFull(tx, rng) + case shared.Headers: + if err := c.cleanTransactionIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanHeaderIPLDs(tx, rng); err != nil { + return err + } + return c.cleanHeaderMetaData(tx, rng) + case shared.Transactions: + if err := c.cleanTransactionIPLDs(tx, rng); err != nil { + return err + } + return c.cleanTransactionMetaData(tx, rng) + default: + return fmt.Errorf("btc cleaner unrecognized type: %s", t.String()) + } + return nil +} + +func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error { + // Clear all of the indexed iplds + pgStr := `DELETE FROM public.blocks A + USING btc.transaction_cids B, btc.header_cids C + WHERE (A.key = B.cid OR A.key = C.cid) + AND B.header_id = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + if err != nil { + return err + } + // Clear all the header_cids, this will cascade delete the rest of the index metadata + pgStr = `DELETE FROM eth.header_cids + WHERE block_number BETWEEN $1 AND $2` + _, err = tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING btc.transaction_cids B, btc.header_cids C + WHERE A.key = B.cid + AND B.header_id = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM btc.transaction_cids A + USING btc.header_cids B + WHERE A.header_id = B.id + AND B.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING btc.header_cids B + WHERE A.key = B.cid + AND B.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanHeaderMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM btc.header_cids + WHERE block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index 5a2888f9..8f0ccf6e 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -166,3 +166,14 @@ func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc return rpc.API{}, fmt.Errorf("invalid chain %s for public api constructor", chain.String()) } } + +// NewCleaner constructs a Cleaner for the provided chain type +func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error) { + switch chain { + case shared.Ethereum: + return eth.NewCleaner(db), nil + // TODO: support BTC + default: + return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String()) + } +} diff --git a/pkg/super_node/eth/cleaner.go b/pkg/super_node/eth/cleaner.go new file mode 100644 index 00000000..d66cbf48 --- /dev/null +++ b/pkg/super_node/eth/cleaner.go @@ -0,0 +1,226 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package eth + +import ( + "fmt" + + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// Cleaner satisfies the shared.Cleaner interface fo ethereum +type Cleaner struct { + db *postgres.DB +} + +// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface +func NewCleaner(db *postgres.DB) *Cleaner { + return &Cleaner{ + db: db, + } +} + +// Clean removes the specified data from the db within the provided block range +func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error { + tx, err := c.db.Beginx() + if err != nil { + return err + } + for _, rng := range rngs { + if err := c.clean(tx, rng, t); err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + return err + } + } + return tx.Commit() +} + +func (c *Cleaner) clean(tx *sqlx.Tx, rng [2]uint64, t shared.DataType) error { + switch t { + case shared.Full: + return c.cleanFull(tx, rng) + case shared.Headers: + if err := c.cleanStorageIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanStateIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanReceiptIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanTransactionIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanHeaderIPLDs(tx, rng); err != nil { + return err + } + return c.cleanHeaderMetaData(tx, rng) + case shared.Transactions: + if err := c.cleanReceiptIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanTransactionIPLDs(tx, rng); err != nil { + return err + } + return c.cleanTransactionMetaData(tx, rng) + case shared.Receipts: + if err := c.cleanReceiptIPLDs(tx, rng); err != nil { + return err + } + return c.cleanReceiptMetaData(tx, rng) + case shared.State: + if err := c.cleanStorageIPLDs(tx, rng); err != nil { + return err + } + if err := c.cleanStateIPLDs(tx, rng); err != nil { + return err + } + return c.cleanStateMetaData(tx, rng) + case shared.Storage: + if err := c.cleanStorageIPLDs(tx, rng); err != nil { + return err + } + return c.cleanStorageMetaData(tx, rng) + default: + return fmt.Errorf("eth cleaner unrecognized type: %s", t.String()) + } + return nil +} + +func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error { + // Clear all of the indexed iplds + pgStr := `DELETE FROM public.blocks A + USING eth.storage_cids B, eth.state_cids C, eth.receipt_cids D, eth.transaction_cids E, eth.header_cids F + WHERE (A.key = B.cid OR A.key = C.cid OR A.key = D.cid OR A.key = E.cid OR A.key = F.cid) + AND B.state_id = C.id + AND C.header_id = F.id + AND D.tx_id = E.id + AND E.header_id = F.id + AND F.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + if err != nil { + return err + } + // Clear all the header_cids, this will cascade delete the rest of the index metadata + pgStr = `DELETE FROM eth.header_cids + WHERE block_number BETWEEN $1 AND $2` + _, err = tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanStorageIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING eth.storage_cids B, eth.state_cids C, eth.header_cids D + WHERE A.key = B.cid + AND B.state_id = C.id + AND C.header_id = D.id + AND D.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanStorageMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM eth.storage_cids A + USING eth.state_cids B, eth.header_cids C + WHERE A.state_id = B.id + AND B.header_id = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanStateIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING eth.state_cids B, eth.header_cids C + WHERE A.key = B.cid + AND B.header_cid = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanStateMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM eth.state_cids A + USING eth.header_cids B + WHERE A.header_id = B.id + AND B.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanReceiptIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING eth.receipt_cids B, eth.transaction_cids C, eth.header_cids D + WHERE A.key = B.cid + AND B.tx_id = C.id + AND C.header_id = D.id + AND D.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanReceiptMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM eth.receipt_cids A + USING eth.transaction_cids B, eth.header_cids C + WHERE A.tx_id = B.id + AND B.header_id = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING eth.transaction_cids B, eth.header_cids C + WHERE A.key = B.cid + AND B.header_id = C.id + AND C.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM eth.transaction_cids A + USING eth.header_cids B + WHERE A.header_id = B.id + AND B.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM public.blocks A + USING eth.header_cids B + WHERE A.key = B.cid + AND B.block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} + +func (c *Cleaner) cleanHeaderMetaData(tx *sqlx.Tx, rng [2]uint64) error { + pgStr := `DELETE FROM eth.header_cids + WHERE block_number BETWEEN $1 AND $2` + _, err := tx.Exec(pgStr, rng[0], rng[1]) + return err +} diff --git a/pkg/super_node/shared/chain_type.go b/pkg/super_node/shared/chain_type.go index 90faf946..83445192 100644 --- a/pkg/super_node/shared/chain_type.go +++ b/pkg/super_node/shared/chain_type.go @@ -25,7 +25,7 @@ import ( type ChainType int const ( - Unknown ChainType = iota + UnknownChain ChainType = iota Ethereum Bitcoin Omni @@ -66,6 +66,6 @@ func NewChainType(name string) (ChainType, error) { case "omni": return Omni, nil default: - return Unknown, errors.New("invalid name for chain") + return UnknownChain, errors.New("invalid name for chain") } } diff --git a/pkg/super_node/shared/data_type.go b/pkg/super_node/shared/data_type.go new file mode 100644 index 00000000..0c219c6a --- /dev/null +++ b/pkg/super_node/shared/data_type.go @@ -0,0 +1,94 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package shared + +import ( + "fmt" + "strings" +) + +// DataType is an enum to loosely represent type of chain data +type DataType int + +const ( + UnknownDataType DataType = iota - 1 + Full + Headers + Transactions + Receipts + State + Storage +) + +// String() method to resolve ReSyncType enum +func (r DataType) String() string { + switch r { + case Full: + return "full" + case Headers: + return "headers" + case Transactions: + return "transactions" + case Receipts: + return "receipts" + case State: + return "state" + case Storage: + return "storage" + default: + return "unknown" + } +} + +// GenerateResyncTypeFromString +func GenerateResyncTypeFromString(str string) (DataType, error) { + switch strings.ToLower(str) { + case "full", "f": + return Full, nil + case "headers", "header", "h": + return Headers, nil + case "transactions", "transaction", "trxs", "txs", "trx", "tx", "t": + return Transactions, nil + case "receipts", "receipt", "rcts", "rct", "r": + return Receipts, nil + case "state": + return State, nil + case "storage": + return Storage, nil + default: + return UnknownDataType, fmt.Errorf("unrecognized resync type: %s", str) + } +} + +func SupportedResyncType(d DataType) bool { + switch d { + case Full: + return true + case Headers: + return false + case Transactions: + return false + case Receipts: + return false + case State: + return false + case Storage: + return false + default: + return false + } +} diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index f62bba76..72940422 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -74,6 +74,11 @@ type DagPutter interface { DagPut(raw interface{}) ([]string, error) } +// Cleaner is for cleaning out data from the cache within the given ranges +type Cleaner interface { + Clean(rngs [][2]uint64, t DataType) error +} + // SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain // Further specifics of the underlying filter type depend on the internal needs of the types // which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain diff --git a/pkg/watcher/eth/repository.go b/pkg/watcher/eth/repository.go index cde4255d..1791697e 100644 --- a/pkg/watcher/eth/repository.go +++ b/pkg/watcher/eth/repository.go @@ -82,13 +82,11 @@ func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error { return err } -// GetQueueData grabs payload data from the queue table so that it can -// be forwarded to the ready tables -// this is used to make sure we enter data into the tables that triggers act on in sequential order -// even if we receive data out-of-order -// it returns the new index -// delete the data it retrieves so as to clear the queue -// periodically vacuum's the table to free up space from the deleted rows +// GetQueueData grabs payload data from the queue table so that it can be readied +// Used ensure we enter data into the tables that triggers act on in sequential order, even if we receive data out-of-order +// Returns the queued data, the new index, and err +// Deletes from the queue the data it retrieves +// Periodically vacuum's the table to free up space from the deleted rows func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) { pgStr := `DELETE FROM eth.queued_data WHERE height = $1