db cleaners to eth and btc

This commit is contained in:
Ian Norden 2020-03-12 13:51:58 -05:00
parent e5c5422edc
commit b4005eff39
8 changed files with 478 additions and 13 deletions

View File

@ -30,7 +30,7 @@ import (
const (
DefaultMaxBatchSize uint64 = 100
defaultMaxBatchNumber int64 = 10
DefaultMaxBatchNumber int64 = 10
)
// BackFillInterface for filling in gaps in the super node
@ -146,8 +146,7 @@ func (bfs *BackFillService) fillGaps(startingBlock, endingBlock uint64) error {
log.Infof("going to fill in %s gap from %d to %d", bfs.chain.String(), startingBlock, endingBlock)
errChan := make(chan error)
done := make(chan bool)
err := bfs.backFill(startingBlock, endingBlock, errChan, done)
if err != nil {
if err := bfs.backFill(startingBlock, endingBlock, errChan, done); err != nil {
return err
}
for {
@ -184,7 +183,7 @@ func (bfs *BackFillService) backFill(startingBlock, endingBlock uint64, errChan
for _, blockHeights := range blockRangeBins {
// if we have reached our limit of active goroutines
// wait for one to finish before starting the next
if atomic.AddInt64(&activeCount, 1) > defaultMaxBatchNumber {
if atomic.AddInt64(&activeCount, 1) > DefaultMaxBatchNumber {
// this blocks until a process signals it has finished
<-forwardDone
}

View File

@ -0,0 +1,132 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package btc
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// Cleaner satisfies the shared.Cleaner interface fo bitcoin
type Cleaner struct {
db *postgres.DB
}
// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface
func NewCleaner(db *postgres.DB) *Cleaner {
return &Cleaner{
db: db,
}
}
// Clean removes the specified data from the db within the provided block range
func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
tx, err := c.db.Beginx()
if err != nil {
return err
}
for _, rng := range rngs {
if err := c.clean(tx, rng, t); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
return tx.Commit()
}
func (c *Cleaner) clean(tx *sqlx.Tx, rng [2]uint64, t shared.DataType) error {
switch t {
case shared.Full:
return c.cleanFull(tx, rng)
case shared.Headers:
if err := c.cleanTransactionIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanHeaderIPLDs(tx, rng); err != nil {
return err
}
return c.cleanHeaderMetaData(tx, rng)
case shared.Transactions:
if err := c.cleanTransactionIPLDs(tx, rng); err != nil {
return err
}
return c.cleanTransactionMetaData(tx, rng)
default:
return fmt.Errorf("btc cleaner unrecognized type: %s", t.String())
}
return nil
}
func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error {
// Clear all of the indexed iplds
pgStr := `DELETE FROM public.blocks A
USING btc.transaction_cids B, btc.header_cids C
WHERE (A.key = B.cid OR A.key = C.cid)
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
if err != nil {
return err
}
// Clear all the header_cids, this will cascade delete the rest of the index metadata
pgStr = `DELETE FROM eth.header_cids
WHERE block_number BETWEEN $1 AND $2`
_, err = tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING btc.transaction_cids B, btc.header_cids C
WHERE A.key = B.cid
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM btc.transaction_cids A
USING btc.header_cids B
WHERE A.header_id = B.id
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING btc.header_cids B
WHERE A.key = B.cid
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanHeaderMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM btc.header_cids
WHERE block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}

View File

@ -166,3 +166,14 @@ func NewPublicAPI(chain shared.ChainType, db *postgres.DB, ipfsPath string) (rpc
return rpc.API{}, fmt.Errorf("invalid chain %s for public api constructor", chain.String())
}
}
// NewCleaner constructs a Cleaner for the provided chain type
func NewCleaner(chain shared.ChainType, db *postgres.DB) (shared.Cleaner, error) {
switch chain {
case shared.Ethereum:
return eth.NewCleaner(db), nil
// TODO: support BTC
default:
return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String())
}
}

View File

@ -0,0 +1,226 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"fmt"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// Cleaner satisfies the shared.Cleaner interface fo ethereum
type Cleaner struct {
db *postgres.DB
}
// NewCleaner returns a new Cleaner struct that satisfies the shared.Cleaner interface
func NewCleaner(db *postgres.DB) *Cleaner {
return &Cleaner{
db: db,
}
}
// Clean removes the specified data from the db within the provided block range
func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
tx, err := c.db.Beginx()
if err != nil {
return err
}
for _, rng := range rngs {
if err := c.clean(tx, rng, t); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return err
}
}
return tx.Commit()
}
func (c *Cleaner) clean(tx *sqlx.Tx, rng [2]uint64, t shared.DataType) error {
switch t {
case shared.Full:
return c.cleanFull(tx, rng)
case shared.Headers:
if err := c.cleanStorageIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanStateIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanReceiptIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanTransactionIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanHeaderIPLDs(tx, rng); err != nil {
return err
}
return c.cleanHeaderMetaData(tx, rng)
case shared.Transactions:
if err := c.cleanReceiptIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanTransactionIPLDs(tx, rng); err != nil {
return err
}
return c.cleanTransactionMetaData(tx, rng)
case shared.Receipts:
if err := c.cleanReceiptIPLDs(tx, rng); err != nil {
return err
}
return c.cleanReceiptMetaData(tx, rng)
case shared.State:
if err := c.cleanStorageIPLDs(tx, rng); err != nil {
return err
}
if err := c.cleanStateIPLDs(tx, rng); err != nil {
return err
}
return c.cleanStateMetaData(tx, rng)
case shared.Storage:
if err := c.cleanStorageIPLDs(tx, rng); err != nil {
return err
}
return c.cleanStorageMetaData(tx, rng)
default:
return fmt.Errorf("eth cleaner unrecognized type: %s", t.String())
}
return nil
}
func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error {
// Clear all of the indexed iplds
pgStr := `DELETE FROM public.blocks A
USING eth.storage_cids B, eth.state_cids C, eth.receipt_cids D, eth.transaction_cids E, eth.header_cids F
WHERE (A.key = B.cid OR A.key = C.cid OR A.key = D.cid OR A.key = E.cid OR A.key = F.cid)
AND B.state_id = C.id
AND C.header_id = F.id
AND D.tx_id = E.id
AND E.header_id = F.id
AND F.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
if err != nil {
return err
}
// Clear all the header_cids, this will cascade delete the rest of the index metadata
pgStr = `DELETE FROM eth.header_cids
WHERE block_number BETWEEN $1 AND $2`
_, err = tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanStorageIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.storage_cids B, eth.state_cids C, eth.header_cids D
WHERE A.key = B.cid
AND B.state_id = C.id
AND C.header_id = D.id
AND D.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanStorageMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM eth.storage_cids A
USING eth.state_cids B, eth.header_cids C
WHERE A.state_id = B.id
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanStateIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.state_cids B, eth.header_cids C
WHERE A.key = B.cid
AND B.header_cid = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanStateMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM eth.state_cids A
USING eth.header_cids B
WHERE A.header_id = B.id
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanReceiptIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.receipt_cids B, eth.transaction_cids C, eth.header_cids D
WHERE A.key = B.cid
AND B.tx_id = C.id
AND C.header_id = D.id
AND D.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanReceiptMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM eth.receipt_cids A
USING eth.transaction_cids B, eth.header_cids C
WHERE A.tx_id = B.id
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.transaction_cids B, eth.header_cids C
WHERE A.key = B.cid
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM eth.transaction_cids A
USING eth.header_cids B
WHERE A.header_id = B.id
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.header_cids B
WHERE A.key = B.cid
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}
func (c *Cleaner) cleanHeaderMetaData(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM eth.header_cids
WHERE block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err
}

View File

@ -25,7 +25,7 @@ import (
type ChainType int
const (
Unknown ChainType = iota
UnknownChain ChainType = iota
Ethereum
Bitcoin
Omni
@ -66,6 +66,6 @@ func NewChainType(name string) (ChainType, error) {
case "omni":
return Omni, nil
default:
return Unknown, errors.New("invalid name for chain")
return UnknownChain, errors.New("invalid name for chain")
}
}

View File

@ -0,0 +1,94 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import (
"fmt"
"strings"
)
// DataType is an enum to loosely represent type of chain data
type DataType int
const (
UnknownDataType DataType = iota - 1
Full
Headers
Transactions
Receipts
State
Storage
)
// String() method to resolve ReSyncType enum
func (r DataType) String() string {
switch r {
case Full:
return "full"
case Headers:
return "headers"
case Transactions:
return "transactions"
case Receipts:
return "receipts"
case State:
return "state"
case Storage:
return "storage"
default:
return "unknown"
}
}
// GenerateResyncTypeFromString
func GenerateResyncTypeFromString(str string) (DataType, error) {
switch strings.ToLower(str) {
case "full", "f":
return Full, nil
case "headers", "header", "h":
return Headers, nil
case "transactions", "transaction", "trxs", "txs", "trx", "tx", "t":
return Transactions, nil
case "receipts", "receipt", "rcts", "rct", "r":
return Receipts, nil
case "state":
return State, nil
case "storage":
return Storage, nil
default:
return UnknownDataType, fmt.Errorf("unrecognized resync type: %s", str)
}
}
func SupportedResyncType(d DataType) bool {
switch d {
case Full:
return true
case Headers:
return false
case Transactions:
return false
case Receipts:
return false
case State:
return false
case Storage:
return false
default:
return false
}
}

View File

@ -74,6 +74,11 @@ type DagPutter interface {
DagPut(raw interface{}) ([]string, error)
}
// Cleaner is for cleaning out data from the cache within the given ranges
type Cleaner interface {
Clean(rngs [][2]uint64, t DataType) error
}
// SubscriptionSettings is the interface every subscription filter type needs to satisfy, no matter the chain
// Further specifics of the underlying filter type depend on the internal needs of the types
// which satisfy the ResponseFilterer and CIDRetriever interfaces for a specific chain

View File

@ -82,13 +82,11 @@ func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error {
return err
}
// GetQueueData grabs payload data from the queue table so that it can
// be forwarded to the ready tables
// this is used to make sure we enter data into the tables that triggers act on in sequential order
// even if we receive data out-of-order
// it returns the new index
// delete the data it retrieves so as to clear the queue
// periodically vacuum's the table to free up space from the deleted rows
// GetQueueData grabs payload data from the queue table so that it can be readied
// Used ensure we enter data into the tables that triggers act on in sequential order, even if we receive data out-of-order
// Returns the queued data, the new index, and err
// Deletes from the queue the data it retrieves
// Periodically vacuum's the table to free up space from the deleted rows
func (r *Repository) GetQueueData(height int64) (super_node.SubscriptionPayload, int64, error) {
pgStr := `DELETE FROM eth.queued_data
WHERE height = $1