backfill concurrently, one block at a time, while normal streaming continues

This commit is contained in:
Ian Norden 2019-07-02 12:38:12 -05:00
parent f2efbb5d01
commit 905585b212
5 changed files with 126 additions and 89 deletions

View File

@ -49,6 +49,7 @@ func NewIPLDFetcher(ipfsPath string) (*EthIPLDFetcher, error) {
// FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper // FetchCIDs is the exported method for fetching and returning all the cids passed in a CidWrapper
func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) { func (f *EthIPLDFetcher) FetchCIDs(cids CidWrapper) (*IpldWrapper, error) {
log.Debug("fetching iplds") log.Debug("fetching iplds")
blocks := &IpldWrapper{ blocks := &IpldWrapper{
BlockNumber: cids.BlockNumber, BlockNumber: cids.BlockNumber,

View File

@ -54,3 +54,10 @@ func HexToKey(hex string) common.Hash {
addr := common.FromHex(hex) addr := common.FromHex(hex)
return crypto.Keccak256Hash(addr[:]) return crypto.Keccak256Hash(addr[:])
} }
func emptyCidWrapper(cids CidWrapper) bool {
if len(cids.Transactions) > 0 || len(cids.Headers) > 0 || len(cids.Uncles) > 0 || len(cids.Receipts) > 0 || len(cids.StateNodes) > 0 || len(cids.StorageNodes) > 0 || cids.BlockNumber == nil {
return false
}
return true
}

View File

@ -37,6 +37,7 @@ func NewIPLDResolver() *EthIPLDResolver {
// ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper // ResolveIPLDs is the exported method for resolving all of the ETH IPLDs packaged in an IpfsBlockWrapper
func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) { func (eir *EthIPLDResolver) ResolveIPLDs(ipfsBlocks IpldWrapper) (*ResponsePayload, error) {
response := new(ResponsePayload) response := new(ResponsePayload)
response.BlockNumber = ipfsBlocks.BlockNumber
eir.resolveHeaders(ipfsBlocks.Headers, response) eir.resolveHeaders(ipfsBlocks.Headers, response)
eir.resolveUncles(ipfsBlocks.Uncles, response) eir.resolveUncles(ipfsBlocks.Uncles, response)
eir.resolveTransactions(ipfsBlocks.Transactions, response) eir.resolveTransactions(ipfsBlocks.Transactions, response)

View File

@ -29,7 +29,9 @@ import (
// CIDRetriever is the interface for retrieving CIDs from the Postgres cache // CIDRetriever is the interface for retrieving CIDs from the Postgres cache
type CIDRetriever interface { type CIDRetriever interface {
RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error)
RetrieveLastBlockNumber() (int64, error)
RetrieveFirstBlockNumber() (int64, error)
} }
// EthCIDRetriever is the underlying struct supporting the CIDRetriever interface // EthCIDRetriever is the underlying struct supporting the CIDRetriever interface
@ -44,47 +46,42 @@ func NewCIDRetriever(db *postgres.DB) *EthCIDRetriever {
} }
} }
func (ecr *EthCIDRetriever) RetrieveFirstBlockNumber() (int64, error) {
var blockNumber int64
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number ASC LIMIT 1")
return blockNumber, err
}
// GetLastBlockNumber is used to retrieve the latest block number in the cache // GetLastBlockNumber is used to retrieve the latest block number in the cache
func (ecr *EthCIDRetriever) GetLastBlockNumber() (int64, error) { func (ecr *EthCIDRetriever) RetrieveLastBlockNumber() (int64, error) {
var blockNumber int64 var blockNumber int64
err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ") err := ecr.db.Get(&blockNumber, "SELECT block_number FROM header_cids ORDER BY block_number DESC LIMIT 1 ")
return blockNumber, err return blockNumber, err
} }
// RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters // RetrieveCIDs is used to retrieve all of the CIDs which conform to the passed StreamFilters
func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]CidWrapper, error) { func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription, blockNumber int64) (*CidWrapper, error) {
log.Debug("retrieving cids") log.Debug("retrieving cids")
var endingBlock int64
var err error var err error
if streamFilters.EndingBlock.Int64() <= 0 || streamFilters.EndingBlock.Int64() <= streamFilters.StartingBlock.Int64() {
endingBlock, err = ecr.GetLastBlockNumber()
if err != nil {
return nil, err
}
}
cids := make([]CidWrapper, 0, endingBlock+1-streamFilters.StartingBlock.Int64())
tx, err := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if err != nil { if err != nil {
return nil, err return nil, err
} }
log.Debug("backfill starting block:", streamFilters.StartingBlock)
log.Debug("backfill ending block:", endingBlock)
// THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS // THIS IS SUPER EXPENSIVE HAVING TO CYCLE THROUGH EACH BLOCK, NEED BETTER WAY TO FETCH CIDS
// WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO // WHILE STILL MAINTAINING RELATION INFO ABOUT WHAT BLOCK THE CIDS BELONG TO
for i := streamFilters.StartingBlock.Int64(); i <= endingBlock; i++ { cw := new(CidWrapper)
cw := CidWrapper{} cw.BlockNumber = big.NewInt(blockNumber)
cw.BlockNumber = big.NewInt(i)
// Retrieve cached header CIDs // Retrieve cached header CIDs
if !streamFilters.HeaderFilter.Off { if !streamFilters.HeaderFilter.Off {
cw.Headers, err = ecr.retrieveHeaderCIDs(tx, streamFilters, i) cw.Headers, err = ecr.retrieveHeaderCIDs(tx, streamFilters, blockNumber)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return nil, err return nil, err
} }
if !streamFilters.HeaderFilter.FinalOnly { if !streamFilters.HeaderFilter.FinalOnly {
cw.Uncles, err = ecr.retrieveUncleCIDs(tx, streamFilters, i) cw.Uncles, err = ecr.retrieveUncleCIDs(tx, streamFilters, blockNumber)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
@ -96,7 +93,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
// Retrieve cached trx CIDs // Retrieve cached trx CIDs
var trxIds []int64 var trxIds []int64
if !streamFilters.TrxFilter.Off { if !streamFilters.TrxFilter.Off {
cw.Transactions, trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, i) cw.Transactions, trxIds, err = ecr.retrieveTrxCIDs(tx, streamFilters, blockNumber)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("transaction cid retrieval error") log.Error("transaction cid retrieval error")
@ -106,7 +103,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
// Retrieve cached receipt CIDs // Retrieve cached receipt CIDs
if !streamFilters.ReceiptFilter.Off { if !streamFilters.ReceiptFilter.Off {
cw.Receipts, err = ecr.retrieveRctCIDs(tx, streamFilters, i, trxIds) cw.Receipts, err = ecr.retrieveRctCIDs(tx, streamFilters, blockNumber, trxIds)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("receipt cid retrieval error") log.Error("receipt cid retrieval error")
@ -116,7 +113,7 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
// Retrieve cached state CIDs // Retrieve cached state CIDs
if !streamFilters.StateFilter.Off { if !streamFilters.StateFilter.Off {
cw.StateNodes, err = ecr.retrieveStateCIDs(tx, streamFilters, i) cw.StateNodes, err = ecr.retrieveStateCIDs(tx, streamFilters, blockNumber)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("state cid retrieval error") log.Error("state cid retrieval error")
@ -126,17 +123,15 @@ func (ecr *EthCIDRetriever) RetrieveCIDs(streamFilters config.Subscription) ([]C
// Retrieve cached storage CIDs // Retrieve cached storage CIDs
if !streamFilters.StorageFilter.Off { if !streamFilters.StorageFilter.Off {
cw.StorageNodes, err = ecr.retrieveStorageCIDs(tx, streamFilters, i) cw.StorageNodes, err = ecr.retrieveStorageCIDs(tx, streamFilters, blockNumber)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("storage cid retrieval error") log.Error("storage cid retrieval error")
return nil, err return nil, err
} }
} }
cids = append(cids, cw)
}
return cids, tx.Commit() return cw, tx.Commit()
} }
func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) { func (ecr *EthCIDRetriever) retrieveHeaderCIDs(tx *sqlx.Tx, streamFilters config.Subscription, blockNumber int64) ([]string, error) {

View File

@ -294,19 +294,50 @@ func (sap *Service) Subscribe(id rpc.ID, sub chan<- ResponsePayload, quitChan ch
func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscription) { func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscription) {
log.Debug("back-filling data for id", id) log.Debug("back-filling data for id", id)
// Retrieve cached CIDs relevant to this subscriber // Retrieve cached CIDs relevant to this subscriber
cidWrappers, err := sap.Retriever.RetrieveCIDs(con) var endingBlock int64
var startingBlock int64
var err error
startingBlock, err = sap.Retriever.RetrieveFirstBlockNumber()
if err != nil {
sub.PayloadChan <- ResponsePayload{
ErrMsg: "unable to set block range; error: " + err.Error(),
}
}
if startingBlock < con.StartingBlock.Int64() {
startingBlock = con.StartingBlock.Int64()
}
if con.EndingBlock.Int64() <= 0 || con.EndingBlock.Int64() <= startingBlock {
endingBlock, err = sap.Retriever.RetrieveLastBlockNumber()
if err != nil {
sub.PayloadChan <- ResponsePayload{
ErrMsg: "unable to set block range; error: " + err.Error(),
}
}
}
log.Debug("backfill starting block:", con.StartingBlock)
log.Debug("backfill ending block:", endingBlock)
// Backfilled payloads are sent concurrently to the streamed payloads, so the receiver needs to pay attention to
// the blocknumbers in the payloads they receive to keep things in order
// TODO: separate backfill into a different rpc subscription method altogether?
go func() {
for i := con.StartingBlock.Int64(); i <= endingBlock; i++ {
cidWrapper, err := sap.Retriever.RetrieveCIDs(con, i)
if err != nil { if err != nil {
sub.PayloadChan <- ResponsePayload{ sub.PayloadChan <- ResponsePayload{
ErrMsg: "CID retrieval error: " + err.Error(), ErrMsg: "CID retrieval error: " + err.Error(),
} }
continue
} }
for _, cidWrapper := range cidWrappers { if emptyCidWrapper(*cidWrapper) {
blocksWrapper, err := sap.Fetcher.FetchCIDs(cidWrapper) continue
}
blocksWrapper, err := sap.Fetcher.FetchCIDs(*cidWrapper)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
sub.PayloadChan <- ResponsePayload{ sub.PayloadChan <- ResponsePayload{
ErrMsg: "IPLD fetching error: " + err.Error(), ErrMsg: "IPLD fetching error: " + err.Error(),
} }
continue
} }
backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper) backFillIplds, err := sap.Resolver.ResolveIPLDs(*blocksWrapper)
if err != nil { if err != nil {
@ -314,14 +345,16 @@ func (sap *Service) backFill(sub Subscription, id rpc.ID, con config.Subscriptio
sub.PayloadChan <- ResponsePayload{ sub.PayloadChan <- ResponsePayload{
ErrMsg: "IPLD resolving error: " + err.Error(), ErrMsg: "IPLD resolving error: " + err.Error(),
} }
continue
} }
select { select {
case sub.PayloadChan <- *backFillIplds: case sub.PayloadChan <- *backFillIplds:
log.Infof("sending seed node back-fill payload to subscription %s", id) log.Infof("sending seed node back-fill payload to subscription %s", id)
default: default:
log.Infof("unable to send back-fill ppayload to subscription %s; channel has no receiver", id) log.Infof("unable to send back-fill payload to subscription %s; channel has no receiver", id)
} }
} }
}()
} }
// Unsubscribe is used to unsubscribe to the StateDiffingService loop // Unsubscribe is used to unsubscribe to the StateDiffingService loop