lotus/node/impl/full/txhashmanager.go

130 lines
3.1 KiB
Go
Raw Normal View History

2023-09-21 15:37:02 +00:00
package full
import (
"context"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/ethhashlookup"
"github.com/filecoin-project/lotus/chain/types"
)
type EthTxHashManager struct {
StateAPI StateAPI
TransactionHashLookup *ethhashlookup.EthTxHashLookup
}
func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
return nil
}
func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error {
if minHeight < build.UpgradeHyggeHeight {
minHeight = build.UpgradeHyggeHeight
}
ts := m.StateAPI.Chain.GetHeaviestTipSet()
for ts.Height() > minHeight {
for _, block := range ts.Blocks() {
msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block)
if err != nil {
// If we can't find the messages, we've either imported from snapshot or pruned the store
log.Debug("exiting message mapping population at epoch ", ts.Height())
return nil
}
for _, msg := range msgs {
m.ProcessSignedMessage(ctx, msg)
}
}
var err error
ts, err = m.StateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents())
if err != nil {
return err
}
}
return nil
}
func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error {
for _, blk := range to.Blocks() {
_, smsgs, err := m.StateAPI.Chain.MessagesForBlock(ctx, blk)
if err != nil {
return err
}
for _, smsg := range smsgs {
if smsg.Signature.Type != crypto.SigTypeDelegated {
continue
}
hash, err := ethTxHashFromSignedMessage(ctx, smsg, m.StateAPI)
if err != nil {
return err
}
err = m.TransactionHashLookup.UpsertHash(hash, smsg.Cid())
if err != nil {
return err
}
}
}
return nil
}
func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {
if msg.Signature.Type != crypto.SigTypeDelegated {
return
}
ethTx, err := newEthTxFromSignedMessage(ctx, msg, m.StateAPI)
if err != nil {
log.Errorf("error converting filecoin message to eth tx: %s", err)
return
}
err = m.TransactionHashLookup.UpsertHash(ethTx.Hash, msg.Cid())
if err != nil {
log.Errorf("error inserting tx mapping to db: %s", err)
return
}
}
func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager *EthTxHashManager) {
for {
select {
case <-ctx.Done():
return
case u := <-ch:
if u.Type != api.MpoolAdd {
continue
}
manager.ProcessSignedMessage(ctx, u.Message)
}
}
}
func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManager) {
if retentionDays == 0 {
return
}
gcPeriod := 1 * time.Hour
for {
entriesDeleted, err := manager.TransactionHashLookup.DeleteEntriesOlderThan(retentionDays)
if err != nil {
log.Errorf("error garbage collecting eth transaction hash database: %s", err)
}
log.Info("garbage collection run on eth transaction hash lookup database. %d entries deleted", entriesDeleted)
time.Sleep(gcPeriod)
}
}