Merge pull request #10119 from filecoin-project/gstuart/tx-hash-on-simport-napshot

feat: eth: populate tx hash database on startup
This commit is contained in:
Łukasz Magiera 2023-02-09 19:06:20 +01:00 committed by GitHub
commit 75f9b7d040
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 13 deletions

View File

@ -256,6 +256,20 @@ func (cs *ChainStore) MessagesForBlock(ctx context.Context, b *types.BlockHeader
return blsmsgs, secpkmsgs, nil
}
func (cs *ChainStore) SecpkMessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.SignedMessage, error) {
_, secpkcids, err := cs.ReadMsgMetaCids(ctx, b.Messages)
if err != nil {
return nil, err
}
secpkmsgs, err := cs.LoadSignedMessagesFromCids(ctx, secpkcids)
if err != nil {
return nil, xerrors.Errorf("loading secpk messages for block: %w", err)
}
return secpkmsgs, nil
}
func (cs *ChainStore) GetParentReceipt(ctx context.Context, b *types.BlockHeader, i int) (*types.MessageReceipt, error) {
// block headers use adt0, for now.
a, err := blockadt.AsArray(cs.ActorStore(ctx), b.ParentMessageReceipts)

View File

@ -1973,6 +1973,52 @@ func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) e
return nil
}
func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
if minHeight < build.UpgradeHyggeHeight {
minHeight = build.UpgradeHyggeHeight
}
ts := m.StateAPI.Chain.GetHeaviestTipSet()
for ts.Height() > minHeight {
for _, block := range ts.Blocks() {
msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block)
if err != nil {
// If we can't find the messages, we've either imported from snapshot or pruned the store
log.Debug("exiting message mapping population at epoch ", ts.Height())
return nil
}
for _, msg := range msgs {
m.ProcessSignedMessage(ctx, msg)
}
}
var err error
ts, err = m.StateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return err
}
}
return nil
}
func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {
if msg.Signature.Type != crypto.SigTypeDelegated {
return
}
ethTx, err := newEthTxFromSignedMessage(ctx, msg, m.StateAPI)
if err != nil {
log.Errorf("error converting filecoin message to eth tx: %s", err)
}
err = m.TransactionHashLookup.UpsertHash(ethTx.Hash, msg.Cid())
if err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
}
}
func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager *EthTxHashManager) {
for {
select {
@ -1982,19 +2028,8 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
if u.Type != api.MpoolAdd {
continue
}
if u.Message.Signature.Type != crypto.SigTypeDelegated {
continue
}
ethTx, err := newEthTxFromSignedMessage(ctx, u.Message, manager.StateAPI)
if err != nil {
log.Errorf("error converting filecoin message to eth tx: %s", err)
}
err = manager.TransactionHashLookup.UpsertHash(ethTx.Hash, u.Message.Cid())
if err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
}
manager.ProcessSignedMessage(ctx, u.Message)
}
}
}

View File

@ -2,6 +2,7 @@ package modules
import (
"context"
"os"
"path/filepath"
"go.uber.org/fx"
@ -24,7 +25,13 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
return nil, err
}
transactionHashLookup, err := ethhashlookup.NewTransactionHashLookup(filepath.Join(sqlitePath, "txhash.db"))
dbPath := filepath.Join(sqlitePath, "txhash.db")
// Check if the db exists, if not, we'll back-fill some entries
_, err = os.Stat(dbPath)
dbAlreadyExists := err == nil
transactionHashLookup, err := ethhashlookup.NewTransactionHashLookup(dbPath)
if err != nil {
return nil, err
}
@ -40,6 +47,13 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
TransactionHashLookup: transactionHashLookup,
}
if !dbAlreadyExists {
err = ethTxHashManager.PopulateExistingMappings(mctx, 0)
if err != nil {
return nil, err
}
}
const ChainHeadConfidence = 1
ctx := helpers.LifecycleCtx(mctx, lc)