laconicd/server/indexer_service.go
yihuang 77ed4aa754
feat!: Store eth tx index separately (#1121)
* Store eth tx index separately

Closes: #1075
Solution:
- run a optional indexer service
- adapt the json-rpc to the more efficient query

changelog

changelog

fix lint

fix backward compatibility

fix lint

timeout

better strconv

fix linter

fix package name

add cli command to index old tx

fix for loop

indexer cmd don't have access to local rpc

workaround exceed block gas limit situation

add unit tests for indexer

refactor

polish the indexer module

Update server/config/toml.go

Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>

improve comments

share code between GetTxByEthHash and GetTxByIndex

fix unit test

Update server/indexer.go

Co-authored-by: Freddy Caceres <facs95@gmail.com>

* Apply suggestions from code review

* test enable-indexer in integration test

* fix go lint

* address review suggestions

* fix linter

* address review suggestions

- test indexer in backend unit test
- add comments

* fix build

* fix test

* service name

Co-authored-by: Freddy Caceres <facs95@gmail.com>
Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com>
2022-08-11 22:49:05 +02:00

110 lines
2.7 KiB
Go

package server
import (
"context"
"time"
"github.com/tendermint/tendermint/libs/service"
rpcclient "github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/types"
ethermint "github.com/evmos/ethermint/types"
)
const (
ServiceName = "EVMIndexerService"
NewBlockWaitTimeout = 60 * time.Second
)
// EVMIndexerService indexes transactions for json-rpc service.
type EVMIndexerService struct {
service.BaseService
txIdxr ethermint.EVMTxIndexer
client rpcclient.Client
}
// NewEVMIndexerService returns a new service instance.
func NewEVMIndexerService(
txIdxr ethermint.EVMTxIndexer,
client rpcclient.Client,
) *EVMIndexerService {
is := &EVMIndexerService{txIdxr: txIdxr, client: client}
is.BaseService = *service.NewBaseService(nil, ServiceName, is)
return is
}
// OnStart implements service.Service by subscribing for new blocks
// and indexing them by events.
func (eis *EVMIndexerService) OnStart() error {
ctx := context.Background()
status, err := eis.client.Status(ctx)
if err != nil {
return err
}
latestBlock := status.SyncInfo.LatestBlockHeight
newBlockSignal := make(chan struct{}, 1)
// Use SubscribeUnbuffered here to ensure both subscriptions does not get
// canceled due to not pulling messages fast enough. Cause this might
// sometimes happen when there are no other subscribers.
blockHeadersChan, err := eis.client.Subscribe(
ctx,
ServiceName,
types.QueryForEvent(types.EventNewBlockHeader).String(),
0)
if err != nil {
return err
}
go func() {
for {
msg := <-blockHeadersChan
eventDataHeader := msg.Data.(types.EventDataNewBlockHeader)
if eventDataHeader.Header.Height > latestBlock {
latestBlock = eventDataHeader.Header.Height
// notify
select {
case newBlockSignal <- struct{}{}:
default:
}
}
}
}()
lastBlock, err := eis.txIdxr.LastIndexedBlock()
if err != nil {
return err
}
if lastBlock == -1 {
lastBlock = latestBlock
}
for {
if latestBlock <= lastBlock {
// nothing to index. wait for signal of new block
select {
case <-newBlockSignal:
case <-time.After(NewBlockWaitTimeout):
}
continue
}
for i := lastBlock + 1; i <= latestBlock; i++ {
block, err := eis.client.Block(ctx, &i)
if err != nil {
eis.Logger.Error("failed to fetch block", "height", i, "err", err)
break
}
blockResult, err := eis.client.BlockResults(ctx, &i)
if err != nil {
eis.Logger.Error("failed to fetch block result", "height", i, "err", err)
break
}
if err := eis.txIdxr.IndexBlock(block.Block, blockResult.TxsResults); err != nil {
eis.Logger.Error("failed to index block", "height", i, "err", err)
}
lastBlock = blockResult.Height
}
}
}