update version; minor refactoring

This commit is contained in:
Ian Norden 2020-05-02 00:02:05 -05:00
parent 9b0ded692b
commit dfb66eb67b
16 changed files with 127 additions and 159 deletions

View File

@ -1,43 +0,0 @@
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
// watchCmd represents the watch command
var watchCmd = &cobra.Command{
Use: "watch",
Short: "Watch and transform data from a chain source",
Long: `This command allows one to configure a set of wasm functions and SQL trigger functions
that call them to watch and transform data from the specified chain source.
A watcher is composed of four parts:
1) Go execution engine- this command- which fetches raw chain data and adds it to the Postres queued ready data tables
2) TOML config file which specifies what subset of chain data to fetch and from where and contains references to the below
3) Set of WASM binaries which are loaded into Postgres and used by
4) Set of PostgreSQL trigger functions which automatically act on data as it is inserted into the queued ready data tables`,
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("watch called")
},
}
func init() {
rootCmd.AddCommand(watchCmd)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
) )
const ( const (
@ -63,7 +64,7 @@ func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan
logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock) logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)
// break the range up into bins of smaller ranges // break the range up into bins of smaller ranges
blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize) blockRangeBins, err := utilities.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,72 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"errors"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
}
if batchSize == 0 {
return nil, errors.New("backfill: batchsize needs to be greater than zero")
}
length := endingBlock - startingBlock + 1
numberOfBins := length / batchSize
remainder := length % batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := startingBlock + batchSize
blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ {
blockRange = append(blockRange, j)
}
startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
return blockRangeBins, nil
}
func MissingHeightsToGaps(heights []uint64) []shared.Gap {
validationGaps := make([]shared.Gap, 0)
start := heights[0]
lastHeight := start
for i, height := range heights[1:] {
if height != lastHeight+1 {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: lastHeight,
})
start = height
}
if i+2 == len(heights) {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: height,
})
}
lastHeight = height
}
return validationGaps
}

View File

@ -0,0 +1,36 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package utilities_test
import (
"io/ioutil"
"testing"
"github.com/sirupsen/logrus"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestShared(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Shared Utilities Suite")
}
var _ = BeforeSuite(func() {
logrus.SetOutput(ioutil.Discard)
})

View File

@ -16,9 +16,62 @@
package utilities package utilities
func NullToZero(str string) string { import (
if str == "" { "errors"
return "0"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
)
// GetBlockHeightBins splits a block range up into bins of block heights of the given batch size
func GetBlockHeightBins(startingBlock, endingBlock, batchSize uint64) ([][]uint64, error) {
if endingBlock < startingBlock {
return nil, errors.New("backfill: ending block number needs to be greater than starting block number")
} }
return str if batchSize == 0 {
return nil, errors.New("backfill: batchsize needs to be greater than zero")
}
length := endingBlock - startingBlock + 1
numberOfBins := length / batchSize
remainder := length % batchSize
if remainder != 0 {
numberOfBins++
}
blockRangeBins := make([][]uint64, numberOfBins)
for i := range blockRangeBins {
nextBinStart := startingBlock + batchSize
blockRange := make([]uint64, 0, nextBinStart-startingBlock+1)
for j := startingBlock; j < nextBinStart && j <= endingBlock; j++ {
blockRange = append(blockRange, j)
}
startingBlock = nextBinStart
blockRangeBins[i] = blockRange
}
return blockRangeBins, nil
}
// MissingHeightsToGaps returns a slice of gaps from a slice of missing block heights
func MissingHeightsToGaps(heights []uint64) []shared.Gap {
if len(heights) == 0 {
return nil
}
validationGaps := make([]shared.Gap, 0)
start := heights[0]
lastHeight := start
for i, height := range heights[1:] {
if height != lastHeight+1 {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: lastHeight,
})
start = height
}
if i+2 == len(heights) {
validationGaps = append(validationGaps, shared.Gap{
Start: start,
Stop: height,
})
}
lastHeight = height
}
return validationGaps
} }

View File

@ -14,13 +14,13 @@
// You should have received a copy of the GNU Affero General Public License // You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>. // along with this program. If not, see <http://www.gnu.org/licenses/>.
package utils_test package utilities_test
import ( import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
) )
var _ = Describe("GetBlockHeightBins", func() { var _ = Describe("GetBlockHeightBins", func() {

View File

@ -24,7 +24,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )

View File

@ -17,17 +17,16 @@
package btc package btc
import ( import (
"database/sql"
"fmt" "fmt"
"math/big" "math/big"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/lib/pq"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/lib/pq"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
@ -179,7 +178,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
Start uint64 `db:"start"` Start uint64 `db:"start"`
Stop uint64 `db:"stop"` Stop uint64 `db:"stop"`
}, 0) }, 0)
if err := ecr.db.Select(&results, pgStr); err != nil { if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
emptyGaps := make([]shared.Gap, len(results)) emptyGaps := make([]shared.Gap, len(results))
@ -196,12 +195,9 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
WHERE times_validated < $1 WHERE times_validated < $1
ORDER BY block_number` ORDER BY block_number`
var heights []uint64 var heights []uint64
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil { if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
if len(heights) == 0 {
return emptyGaps, nil
}
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
} }

View File

@ -17,17 +17,17 @@
package eth package eth
import ( import (
"database/sql"
"fmt" "fmt"
"math/big" "math/big"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/lib/pq" "github.com/lib/pq"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/pkg/postgres" "github.com/vulcanize/vulcanizedb/pkg/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
@ -452,7 +452,7 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
Start uint64 `db:"start"` Start uint64 `db:"start"`
Stop uint64 `db:"stop"` Stop uint64 `db:"stop"`
}, 0) }, 0)
if err := ecr.db.Select(&results, pgStr); err != nil { if err := ecr.db.Select(&results, pgStr); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
emptyGaps := make([]shared.Gap, len(results)) emptyGaps := make([]shared.Gap, len(results))
@ -469,12 +469,9 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
WHERE times_validated < $1 WHERE times_validated < $1
ORDER BY block_number` ORDER BY block_number`
var heights []uint64 var heights []uint64
if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil { if err := ecr.db.Select(&heights, pgStr, validationLevel); err != nil && err != sql.ErrNoRows {
return nil, err return nil, err
} }
if len(heights) == 0 {
return emptyGaps, nil
}
return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil return append(emptyGaps, utils.MissingHeightsToGaps(heights)...), nil
} }

View File

@ -152,7 +152,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID) err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID)
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, err // return err variable explicity so that we return the err = tx.Commit() assignment in the defer return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer
} }
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {

View File

@ -82,8 +82,8 @@ func NewReSyncConfig() (*Config, error) {
viper.BindEnv("resync.timeout", shared.HTTP_TIMEOUT) viper.BindEnv("resync.timeout", shared.HTTP_TIMEOUT)
timeout := viper.GetInt("resync.timeout") timeout := viper.GetInt("resync.timeout")
if timeout < 15 { if timeout < 5 {
timeout = 15 timeout = 5
} }
c.Timeout = time.Second * time.Duration(timeout) c.Timeout = time.Second * time.Duration(timeout)
@ -104,7 +104,7 @@ func NewReSyncConfig() (*Config, error) {
} }
} }
resyncType := viper.GetString("resync.type") resyncType := viper.GetString("resync.type")
c.ResyncType, err = shared.GenerateResyncTypeFromString(resyncType) c.ResyncType, err = shared.GenerateDataTypeFromString(resyncType)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,7 +113,7 @@ func NewReSyncConfig() (*Config, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if ok, err := shared.SupportedResyncType(c.ResyncType, c.Chain); !ok { if ok, err := shared.SupportedDataType(c.ResyncType, c.Chain); !ok {
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -22,7 +22,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" utils "github.com/vulcanize/vulcanizedb/libraries/shared/utilities"
"github.com/vulcanize/vulcanizedb/pkg/super_node" "github.com/vulcanize/vulcanizedb/pkg/super_node"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )

View File

@ -57,8 +57,8 @@ func (r DataType) String() string {
} }
} }
// GenerateResyncTypeFromString // GenerateDataTypeFromString
func GenerateResyncTypeFromString(str string) (DataType, error) { func GenerateDataTypeFromString(str string) (DataType, error) {
switch strings.ToLower(str) { switch strings.ToLower(str) {
case "full", "f": case "full", "f":
return Full, nil return Full, nil
@ -79,7 +79,7 @@ func GenerateResyncTypeFromString(str string) (DataType, error) {
} }
} }
func SupportedResyncType(d DataType, c ChainType) (bool, error) { func SupportedDataType(d DataType, c ChainType) (bool, error) {
switch c { switch c {
case Ethereum: case Ethereum:
switch d { switch d {

View File

@ -90,7 +90,7 @@ func GetIPFSMode() (IPFSMode, error) {
if ipfsMode == "" { if ipfsMode == "" {
return DirectPostgres, nil return DirectPostgres, nil
} }
return NewIPLFMode(ipfsMode) return NewIPFSMode(ipfsMode)
} }
// GetBtcNodeAndClient returns btc node info from path url // GetBtcNodeAndClient returns btc node info from path url

View File

@ -44,15 +44,15 @@ func (c IPFSMode) String() string {
} }
} }
func NewIPLFMode(name string) (IPFSMode, error) { func NewIPFSMode(name string) (IPFSMode, error) {
switch strings.ToLower(name) { switch strings.ToLower(name) {
case "local", "interface", "minimal": case "local", "interface", "minimal":
return LocalInterface, nil return LocalInterface, nil
case "remote", "client": case "remote", "client":
return RemoteClient, nil return RemoteClient, errors.New("remote IPFS client mode is not currently supported")
case "postgres", "direct": case "postgres", "direct":
return DirectPostgres, nil return DirectPostgres, nil
default: default:
return Unknown, errors.New("invalid name for ipfs mode") return Unknown, errors.New("unrecognized name for ipfs mode")
} }
} }

View File

@ -19,22 +19,22 @@ package version
import "fmt" import "fmt"
const ( const (
VersionMajor = 0 // Major version component of the current release Major = 0 // Major version component of the current release
VersionMinor = 1 // Minor version component of the current release Minor = 1 // Minor version component of the current release
VersionPatch = 1 // Patch version component of the current release Patch = 2 // Patch version component of the current release
VersionMeta = "alpha" // Version metadata to append to the version string Meta = "alpha" // Version metadata to append to the version string
) )
// Version holds the textual version string. // Version holds the textual version string.
var Version = func() string { var Version = func() string {
return fmt.Sprintf("%d.%d.%d", VersionMajor, VersionMinor, VersionPatch) return fmt.Sprintf("%d.%d.%d", Major, Minor, Patch)
}() }()
// VersionWithMeta holds the textual version string including the metadata. // VersionWithMeta holds the textual version string including the metadata.
var VersionWithMeta = func() string { var VersionWithMeta = func() string {
v := Version v := Version
if VersionMeta != "" { if Meta != "" {
v += "-" + VersionMeta v += "-" + Meta
} }
return v return v
}() }()