header_cids.final => header_cids.uncle

This commit is contained in:
Ian Norden 2019-10-08 15:31:07 -05:00
parent 6880611436
commit 67df8dea77
9 changed files with 20 additions and 21 deletions

View File

@ -175,8 +175,8 @@ func configureSubscription() {
// Below default to false, which means we get all headers by default // Below default to false, which means we get all headers by default
HeaderFilter: config.HeaderFilter{ HeaderFilter: config.HeaderFilter{
Off: viper.GetBool("subscription.headerFilter.off"), Off: viper.GetBool("subscription.headerFilter.off"),
FinalOnly: viper.GetBool("subscription.headerFilter.finalOnly"), Uncles: viper.GetBool("subscription.headerFilter.uncles"),
}, },
// Below defaults to false and two slices of length 0 // Below defaults to false and two slices of length 0

View File

@ -4,7 +4,7 @@ CREATE TABLE public.header_cids (
block_number BIGINT NOT NULL, block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL, block_hash VARCHAR(66) NOT NULL,
cid TEXT NOT NULL, cid TEXT NOT NULL,
final BOOLEAN NOT NULL, uncle BOOLEAN NOT NULL,
UNIQUE (block_number, block_hash) UNIQUE (block_number, block_hash)
); );

View File

@ -273,7 +273,7 @@ The config for `streamSubscribe` has the `subscribe` set of parameters, for exam
endingBlock = 0 endingBlock = 0
[subscription.headerFilter] [subscription.headerFilter]
off = false off = false
finalOnly = true uncles = false
[subscription.trxFilter] [subscription.trxFilter]
off = false off = false
src = [ src = [
@ -320,8 +320,8 @@ send that to the subscriber, if this is set to `false` then the super-node only
`subscription.endingBlock` is the ending block number for the range we want to receive data in; `subscription.endingBlock` is the ending block number for the range we want to receive data in;
setting to 0 means there is no end/we will continue indefinitely. setting to 0 means there is no end/we will continue indefinitely.
`subscription.headerFilter` has two sub-options: `off` and `finalOnly`. Setting `off` to true tells the super-node to `subscription.headerFilter` has two sub-options: `off` and `uncles`. Setting `off` to true tells the super-node to
not send any headers to the subscriber; setting `finalOnly` to true tells the super-node to send only canonical headers. not send any headers to the subscriber; setting `uncles` to true tells the super-node to send uncles in addition to normal headers.
`subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to `subscription.trxFilter` has three sub-options: `off`, `src`, and `dst`. Setting `off` to true tells the super-node to
not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for, not send any transactions to the subscriber; `src` and `dst` are string arrays which can be filled with ETH addresses we want to filter transactions for,

View File

@ -32,8 +32,8 @@ type Subscription struct {
} }
type HeaderFilter struct { type HeaderFilter struct {
Off bool Off bool
FinalOnly bool Uncles bool
} }
type TrxFilter struct { type TrxFilter struct {

View File

@ -35,8 +35,7 @@ var _ = Describe("Resolver", func() {
resolver = ipfs.NewIPLDResolver() resolver = ipfs.NewIPLDResolver()
}) })
It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() { It("Resolves IPLD data to their correct geth data types and packages them to send to requesting transformers", func() {
superNodePayload, err := resolver.ResolveIPLDs(mocks.MockIPLDWrapper) superNodePayload := resolver.ResolveIPLDs(mocks.MockIPLDWrapper)
Expect(err).ToNot(HaveOccurred())
Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64())) Expect(superNodePayload.BlockNumber.Int64()).To(Equal(mocks.MockSeeNodePayload.BlockNumber.Int64()))
Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp)) Expect(superNodePayload.HeadersRlp).To(Equal(mocks.MockSeeNodePayload.HeadersRlp))
Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp)) Expect(superNodePayload.UnclesRlp).To(Equal(mocks.MockSeeNodePayload.UnclesRlp))

View File

@ -71,7 +71,7 @@ func (s *Filterer) FilterResponse(streamFilters config.Subscription, payload ipf
func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error { func (s *Filterer) filterHeaders(streamFilters config.Subscription, response *streamer.SuperNodePayload, payload ipfs.IPLDPayload) error {
if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) { if !streamFilters.HeaderFilter.Off && checkRange(streamFilters.StartingBlock.Int64(), streamFilters.EndingBlock.Int64(), payload.BlockNumber.Int64()) {
response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP) response.HeadersRlp = append(response.HeadersRlp, payload.HeaderRLP)
if !streamFilters.HeaderFilter.FinalOnly { if streamFilters.HeaderFilter.Uncles {
for _, uncle := range payload.BlockBody.Uncles { for _, uncle := range payload.BlockBody.Uncles {
uncleRlp, err := rlp.EncodeToBytes(uncle) uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil { if err != nil {

View File

@ -87,17 +87,17 @@ func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error {
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) { func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
var headerID int64 var headerID int64
err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4) ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)
RETURNING id`, RETURNING id`,
blockNumber, hash, cid, true).Scan(&headerID) blockNumber, hash, cid, false).Scan(&headerID)
return headerID, err return headerID, err
} }
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error { func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error {
_, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, final) VALUES ($1, $2, $3, $4) _, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, final) = ($3, $4)`, ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)`,
blockNumber, hash, cid, false) blockNumber, hash, cid, true)
return err return err
} }

View File

@ -47,7 +47,7 @@ var _ = Describe("Repository", func() {
err = repo.Index(mocks.MockCIDPayload) err = repo.Index(mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT cid FROM header_cids pgStr := `SELECT cid FROM header_cids
WHERE block_number = $1 AND final IS TRUE` WHERE block_number = $1 AND uncle IS FALSE`
// check header was properly indexed // check header was properly indexed
headers := make([]string, 0) headers := make([]string, 0)
err = db.Select(&headers, pgStr, 1) err = db.Select(&headers, pgStr, 1)

View File

@ -86,7 +86,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, bloc
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return nil, headersErr return nil, headersErr
} }
if !streamFilters.HeaderFilter.FinalOnly { if streamFilters.HeaderFilter.Uncles {
var unclesErr error var unclesErr error
cw.Uncles, unclesErr = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber) cw.Uncles, unclesErr = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber)
if unclesErr != nil { if unclesErr != nil {
@ -164,7 +164,7 @@ func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]string, 0) headers := make([]string, 0)
pgStr := `SELECT cid FROM header_cids pgStr := `SELECT cid FROM header_cids
WHERE block_number = $1 AND final IS TRUE` WHERE block_number = $1 AND uncle IS FALSE`
err := tx.Select(&headers, pgStr, blockNumber) err := tx.Select(&headers, pgStr, blockNumber)
return headers, err return headers, err
} }
@ -173,7 +173,7 @@ func (ecr *EthCIDRetriever) retrieveUncleCIDs(tx *sqlx.Tx, streamFilters config.
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]string, 0) headers := make([]string, 0)
pgStr := `SELECT cid FROM header_cids pgStr := `SELECT cid FROM header_cids
WHERE block_number = $1 AND final IS FALSE` WHERE block_number = $1 AND uncle IS TRUE`
err := tx.Select(&headers, pgStr, blockNumber) err := tx.Select(&headers, pgStr, blockNumber)
return headers, err return headers, err
} }