2019-11-01 05:29:57 +00:00
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package storage
import (
"fmt"
"sync/atomic"
2019-11-04 20:50:10 +00:00
"github.com/ethereum/go-ethereum/common"
2019-11-01 05:29:57 +00:00
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
)
const (
	// DefaultMaxBatchSize is the number of block heights placed in each bin
	// when NewStorageBackFiller is given a batch size of 0.
	DefaultMaxBatchSize = uint64(100)
	// defaultMaxBatchNumber is the upper bound on how many bins BackFill
	// fetches and processes at the same time.
	defaultMaxBatchNumber = int64(10)
)
// BackFiller is the backfilling interface
type BackFiller interface {
2019-11-04 20:50:10 +00:00
BackFill ( startingBlock , endingBlock uint64 , backFill chan utils . StorageDiff , errChan chan error , done chan bool ) error
2019-11-01 05:29:57 +00:00
}
// backFiller is the BackFiller implementation returned by NewStorageBackFiller.
type backFiller struct {
	// fetcher supplies the RLP-encoded state diff payloads for a set of heights.
	fetcher fetcher.StateDiffFetcher
	// batchSize is the maximum number of block heights fetched per bin.
	batchSize uint64
	// NOTE(review): startingBlock is neither set nor read in this file —
	// confirm whether anything else in the package still uses it.
	startingBlock uint64
}
// NewStorageBackFiller returns a BackFiller
2019-11-04 20:50:10 +00:00
func NewStorageBackFiller ( fetcher fetcher . StateDiffFetcher , batchSize uint64 ) BackFiller {
2019-11-01 05:29:57 +00:00
if batchSize == 0 {
batchSize = DefaultMaxBatchSize
}
return & backFiller {
2019-11-04 20:50:10 +00:00
fetcher : fetcher ,
batchSize : batchSize ,
2019-11-01 05:29:57 +00:00
}
}
// BackFill fetches, processes, and returns utils.StorageDiffs over a range of blocks.
// It splits the range into bins of at most bf.batchSize heights (via
// utils.GetBlockHeightBins) and batch fetches and processes those bins concurrently,
// with at most defaultMaxBatchNumber bins in flight at any time.
//
// BackFill returns as soon as its goroutines are started; results arrive asynchronously:
//   - every formatted storage diff is sent on backFill,
//   - per-item errors are sent on errChan (processing continues),
//   - a single true is sent on done once every bin has finished (immediately if
//     there are no bins).
//
// The only error returned synchronously is one from utils.GetBlockHeightBins.
// Callers must keep draining backFill and errChan until done fires, otherwise the
// worker goroutines block on their sends.
func (bf *backFiller) BackFill(startingBlock, endingBlock uint64, backFill chan utils.StorageDiff, errChan chan error, done chan bool) error {
	logrus.Infof("going to fill in gap from %d to %d", startingBlock, endingBlock)

	// break the range up into bins of smaller ranges
	blockRangeBins, err := utils.GetBlockHeightBins(startingBlock, endingBlock, bf.batchSize)
	if err != nil {
		return err
	}

	// sem bounds the number of concurrently running backFillRange goroutines: a slot
	// is taken before a goroutine starts and given back once it reports completion.
	// A buffered-channel semaphore cannot lose a wake-up the way a counter paired with
	// a non-blocking signal send can, so the dispatcher can never block forever.
	sem := make(chan struct{}, defaultMaxBatchNumber)
	// in-flight goroutine gauge used only for logging; it is touched by both helper
	// goroutines below, hence the atomic operations
	var activeCount int64
	// channel for processing goroutines to signal which bin they have finished
	processingDone := make(chan [2]uint64)

	// for each block range bin spin up a goroutine to batch fetch and process state diffs in that range
	go func() {
		for _, blockHeights := range blockRangeBins {
			sem <- struct{}{} // blocks while defaultMaxBatchNumber bins are in flight
			atomic.AddInt64(&activeCount, 1)
			go bf.backFillRange(blockHeights, backFill, errChan, processingDone)
		}
	}()

	// collect exactly one completion signal per bin, freeing a semaphore slot for
	// each, then send the final signal out
	go func() {
		for finished := 0; finished < len(blockRangeBins); finished++ {
			doneWithHeights := <-processingDone
			<-sem
			remaining := atomic.AddInt64(&activeCount, -1)
			logrus.Infof("finished fetching gap sub-bin from %d to %d", doneWithHeights[0], doneWithHeights[1])
			logrus.Debugf("%d gap sub-bins still being fetched", remaining)
		}
		done <- true
	}()
	return nil
}
2019-11-04 20:50:10 +00:00
func ( bf * backFiller ) backFillRange ( blockHeights [ ] uint64 , diffChan chan utils . StorageDiff , errChan chan error , doneChan chan [ 2 ] uint64 ) {
payloads , fetchErr := bf . fetcher . FetchStateDiffsAt ( blockHeights )
if fetchErr != nil {
errChan <- fetchErr
}
for _ , payload := range payloads {
stateDiff := new ( statediff . StateDiff )
stateDiffDecodeErr := rlp . DecodeBytes ( payload . StateDiffRlp , stateDiff )
if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr
continue
}
accounts := utils . GetAccountsFromDiff ( * stateDiff )
for _ , account := range accounts {
logrus . Trace ( fmt . Sprintf ( "iterating through %d Storage values on account with key %s" , len ( account . Storage ) , common . BytesToHash ( account . Key ) . Hex ( ) ) )
for _ , storage := range account . Storage {
diff , formatErr := utils . FromGethStateDiff ( account , stateDiff , storage )
if formatErr != nil {
logrus . Error ( "failed to format utils.StorageDiff from storage with key: " , common . BytesToHash ( storage . Key ) , "from account with key: " , common . BytesToHash ( account . Key ) )
errChan <- formatErr
continue
}
logrus . Trace ( "adding storage diff to results" ,
"keccak of address: " , diff . HashedAddress . Hex ( ) ,
"block height: " , diff . BlockHeight ,
"storage key: " , diff . StorageKey . Hex ( ) ,
"storage value: " , diff . StorageValue . Hex ( ) )
diffChan <- diff
}
}
}
// when this is done, send out a signal
doneChan <- [ 2 ] uint64 { blockHeights [ 0 ] , blockHeights [ len ( blockHeights ) - 1 ] }
}