Add gc for eth tx database
This commit is contained in:
parent
f8dee0983a
commit
f8121c8f1c
@ -2,17 +2,18 @@ package ethhashlookup
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"math"
|
"errors"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/types/ethtypes"
|
"github.com/filecoin-project/lotus/chain/types/ethtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrNotFound = errors.New("not found")
|
||||||
|
|
||||||
var pragmas = []string{
|
var pragmas = []string{
|
||||||
"PRAGMA synchronous = normal",
|
"PRAGMA synchronous = normal",
|
||||||
"PRAGMA temp_store = memory",
|
"PRAGMA temp_store = memory",
|
||||||
@ -28,10 +29,10 @@ var ddls = []string{
|
|||||||
`CREATE TABLE IF NOT EXISTS eth_tx_hashes (
|
`CREATE TABLE IF NOT EXISTS eth_tx_hashes (
|
||||||
hash TEXT PRIMARY KEY NOT NULL,
|
hash TEXT PRIMARY KEY NOT NULL,
|
||||||
cid TEXT NOT NULL UNIQUE,
|
cid TEXT NOT NULL UNIQUE,
|
||||||
epoch INT NOT NULL
|
insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
|
||||||
)`,
|
)`,
|
||||||
|
|
||||||
`CREATE INDEX IF NOT EXISTS eth_tx_hashes_epoch_index ON eth_tx_hashes (epoch)`,
|
`CREATE INDEX IF NOT EXISTS insertion_time_index ON eth_tx_hashes (insertion_time)`,
|
||||||
|
|
||||||
// metadata containing version of schema
|
// metadata containing version of schema
|
||||||
`CREATE TABLE IF NOT EXISTS _meta (
|
`CREATE TABLE IF NOT EXISTS _meta (
|
||||||
@ -43,31 +44,29 @@ var ddls = []string{
|
|||||||
}
|
}
|
||||||
|
|
||||||
const schemaVersion = 1
|
const schemaVersion = 1
|
||||||
const MemPoolEpoch = math.MaxInt64
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
insertTxHash = `INSERT INTO eth_tx_hashes
|
insertTxHash = `INSERT INTO eth_tx_hashes
|
||||||
(hash, cid, epoch)
|
(hash, cid)
|
||||||
VALUES(?, ?, ?)
|
VALUES(?, ?)
|
||||||
ON CONFLICT (hash) DO UPDATE SET epoch = EXCLUDED.epoch
|
ON CONFLICT (hash) DO UPDATE SET insertion_time = CURRENT_TIMESTAMP`
|
||||||
WHERE epoch > EXCLUDED.epoch`
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type TransactionHashLookup struct {
|
type EthTxHashLookup struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ei *TransactionHashLookup) InsertTxHash(txHash ethtypes.EthHash, c cid.Cid, epoch int64) error {
|
func (ei *EthTxHashLookup) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error {
|
||||||
hashEntry, err := ei.db.Prepare(insertTxHash)
|
hashEntry, err := ei.db.Prepare(insertTxHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare insert event: %w", err)
|
return xerrors.Errorf("prepare insert event: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = hashEntry.Exec(txHash.String(), c.String(), epoch)
|
_, err = hashEntry.Exec(txHash.String(), c.String())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ei *TransactionHashLookup) LookupCidFromTxHash(txHash ethtypes.EthHash) (cid.Cid, error) {
|
func (ei *EthTxHashLookup) GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) {
|
||||||
q, err := ei.db.Query("SELECT cid FROM eth_tx_hashes WHERE hash = :hash;", sql.Named("hash", txHash.String()))
|
q, err := ei.db.Query("SELECT cid FROM eth_tx_hashes WHERE hash = :hash;", sql.Named("hash", txHash.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
@ -75,7 +74,7 @@ func (ei *TransactionHashLookup) LookupCidFromTxHash(txHash ethtypes.EthHash) (c
|
|||||||
|
|
||||||
var c string
|
var c string
|
||||||
if !q.Next() {
|
if !q.Next() {
|
||||||
return cid.Undef, xerrors.Errorf("transaction hash %s not found", txHash.String())
|
return cid.Undef, ErrNotFound
|
||||||
}
|
}
|
||||||
err = q.Scan(&c)
|
err = q.Scan(&c)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -84,7 +83,7 @@ func (ei *TransactionHashLookup) LookupCidFromTxHash(txHash ethtypes.EthHash) (c
|
|||||||
return cid.Decode(c)
|
return cid.Decode(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ei *TransactionHashLookup) LookupTxHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
|
func (ei *EthTxHashLookup) GetHashFromCid(c cid.Cid) (ethtypes.EthHash, error) {
|
||||||
q, err := ei.db.Query("SELECT hash FROM eth_tx_hashes WHERE cid = :cid;", sql.Named("cid", c.String()))
|
q, err := ei.db.Query("SELECT hash FROM eth_tx_hashes WHERE cid = :cid;", sql.Named("cid", c.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ethtypes.EmptyEthHash, err
|
return ethtypes.EmptyEthHash, err
|
||||||
@ -92,7 +91,7 @@ func (ei *TransactionHashLookup) LookupTxHashFromCid(c cid.Cid) (ethtypes.EthHas
|
|||||||
|
|
||||||
var hashString string
|
var hashString string
|
||||||
if !q.Next() {
|
if !q.Next() {
|
||||||
return ethtypes.EmptyEthHash, xerrors.Errorf("transaction hash %s not found", c.String())
|
return ethtypes.EmptyEthHash, ErrNotFound
|
||||||
}
|
}
|
||||||
err = q.Scan(&hashString)
|
err = q.Scan(&hashString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -101,8 +100,8 @@ func (ei *TransactionHashLookup) LookupTxHashFromCid(c cid.Cid) (ethtypes.EthHas
|
|||||||
return ethtypes.ParseEthHash(hashString)
|
return ethtypes.ParseEthHash(hashString)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ei *TransactionHashLookup) RemoveEntriesOlderThan(epoch abi.ChainEpoch) (int64, error) {
|
func (ei *EthTxHashLookup) DeleteEntriesOlderThan(days int) (int64, error) {
|
||||||
res, err := ei.db.Exec("DELETE FROM eth_tx_hashes WHERE epoch < :epoch;", sql.Named("epoch", epoch))
|
res, err := ei.db.Exec("DELETE FROM eth_tx_hashes WHERE insertion_time < datetime('now', ?);", "-"+strconv.Itoa(days)+" day")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -110,7 +109,7 @@ func (ei *TransactionHashLookup) RemoveEntriesOlderThan(epoch abi.ChainEpoch) (i
|
|||||||
return res.RowsAffected()
|
return res.RowsAffected()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTransactionHashLookup(path string) (*TransactionHashLookup, error) {
|
func NewTransactionHashLookup(path string) (*EthTxHashLookup, error) {
|
||||||
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
db, err := sql.Open("sqlite3", path+"?mode=rwc")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
|
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
|
||||||
@ -151,12 +150,12 @@ func NewTransactionHashLookup(path string) (*TransactionHashLookup, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &TransactionHashLookup{
|
return &EthTxHashLookup{
|
||||||
db: db,
|
db: db,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ei *TransactionHashLookup) Close() error {
|
func (ei *EthTxHashLookup) Close() error {
|
||||||
if ei.db == nil {
|
if ei.db == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -350,4 +350,11 @@
|
|||||||
# env var: LOTUS_FEVM_ENABLEETHHASHTOFILECOINCIDMAPPING
|
# env var: LOTUS_FEVM_ENABLEETHHASHTOFILECOINCIDMAPPING
|
||||||
#EnableEthHashToFilecoinCidMapping = false
|
#EnableEthHashToFilecoinCidMapping = false
|
||||||
|
|
||||||
|
# EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
||||||
|
# Set to 0 to keep all mappings
|
||||||
|
#
|
||||||
|
# type: int
|
||||||
|
# env var: LOTUS_FEVM_ETHTXHASHMAPPINGLIFETIMEDAYS
|
||||||
|
#EthTxHashMappingLifetimeDays = 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ func TestLegacyTransaction(t *testing.T) {
|
|||||||
func TestContractDeploymentValidSignature(t *testing.T) {
|
func TestContractDeploymentValidSignature(t *testing.T) {
|
||||||
|
|
||||||
blockTime := 100 * time.Millisecond
|
blockTime := 100 * time.Millisecond
|
||||||
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
|
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.EthTxHashLookup())
|
||||||
|
|
||||||
ens.InterconnectAll().BeginMining(blockTime)
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
|
||||||
@ -167,7 +167,7 @@ func TestContractDeploymentValidSignature(t *testing.T) {
|
|||||||
|
|
||||||
func TestContractInvocation(t *testing.T) {
|
func TestContractInvocation(t *testing.T) {
|
||||||
blockTime := 100 * time.Millisecond
|
blockTime := 100 * time.Millisecond
|
||||||
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC())
|
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.EthTxHashLookup())
|
||||||
|
|
||||||
ens.InterconnectAll().BeginMining(blockTime)
|
ens.InterconnectAll().BeginMining(blockTime)
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/itests/kit"
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestFEVMBasic does a basic ethhash contract installation and invocation
|
// TestFEVMBasic does a basic fevm contract installation and invocation
|
||||||
func TestFEVMBasic(t *testing.T) {
|
func TestFEVMBasic(t *testing.T) {
|
||||||
// TODO the contract installation and invocation can be lifted into utility methods
|
// TODO the contract installation and invocation can be lifted into utility methods
|
||||||
// He who writes the second test, shall do that.
|
// He who writes the second test, shall do that.
|
||||||
|
@ -406,6 +406,13 @@ see https://lotus.filecoin.io/storage-providers/advanced-configurations/market/#
|
|||||||
|
|
||||||
Comment: `EnableEthHashToFilecoinCidMapping enables storing a mapping of eth transaction hashes to filecoin message Cids`,
|
Comment: `EnableEthHashToFilecoinCidMapping enables storing a mapping of eth transaction hashes to filecoin message Cids`,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Name: "EthTxHashMappingLifetimeDays",
|
||||||
|
Type: "int",
|
||||||
|
|
||||||
|
Comment: `EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
||||||
|
Set to 0 to keep all mappings`,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"FullNode": []DocField{
|
"FullNode": []DocField{
|
||||||
{
|
{
|
||||||
|
@ -697,4 +697,7 @@ type ActorEventConfig struct {
|
|||||||
type FevmConfig struct {
|
type FevmConfig struct {
|
||||||
// EnableEthHashToFilecoinCidMapping enables storing a mapping of eth transaction hashes to filecoin message Cids
|
// EnableEthHashToFilecoinCidMapping enables storing a mapping of eth transaction hashes to filecoin message Cids
|
||||||
EnableEthHashToFilecoinCidMapping bool
|
EnableEthHashToFilecoinCidMapping bool
|
||||||
|
// EthTxHashMappingLifetimeDays the transaction hash lookup database will delete mappings that have been stored for more than x days
|
||||||
|
// Set to 0 to keep all mappings
|
||||||
|
EthTxHashMappingLifetimeDays int
|
||||||
}
|
}
|
||||||
|
@ -259,7 +259,7 @@ func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtype
|
|||||||
c := cid.Undef
|
c := cid.Undef
|
||||||
if a.EthTxHashManager != nil {
|
if a.EthTxHashManager != nil {
|
||||||
var err error
|
var err error
|
||||||
c, err = a.EthTxHashManager.TransactionHashLookup.LookupCidFromTxHash(*txHash)
|
c, err = a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(*txHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
|
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
|
||||||
}
|
}
|
||||||
@ -326,7 +326,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash ethtype
|
|||||||
c := cid.Undef
|
c := cid.Undef
|
||||||
if a.EthTxHashManager != nil {
|
if a.EthTxHashManager != nil {
|
||||||
var err error
|
var err error
|
||||||
c, err = a.EthTxHashManager.TransactionHashLookup.LookupCidFromTxHash(txHash)
|
c, err = a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(txHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
|
log.Debug("could not find transaction hash %s in lookup table", txHash.String())
|
||||||
}
|
}
|
||||||
@ -1777,7 +1777,7 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = m.TransactionHashLookup.InsertTxHash(hash, smsg.Cid(), int64(to.Height()))
|
err = m.TransactionHashLookup.UpsertHash(hash, smsg.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1789,7 +1789,7 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er
|
|||||||
|
|
||||||
type EthTxHashManager struct {
|
type EthTxHashManager struct {
|
||||||
StateAPI StateAPI
|
StateAPI StateAPI
|
||||||
TransactionHashLookup *ethhashlookup.TransactionHashLookup
|
TransactionHashLookup *ethhashlookup.EthTxHashLookup
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
|
func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error {
|
||||||
@ -1814,7 +1814,7 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
|
|||||||
log.Errorf("error converting filecoin message to eth tx: %s", err)
|
log.Errorf("error converting filecoin message to eth tx: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = manager.TransactionHashLookup.InsertTxHash(ethTx.Hash, u.Message.Cid(), ethhashlookup.MemPoolEpoch)
|
err = manager.TransactionHashLookup.UpsertHash(ethTx.Hash, u.Message.Cid())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error inserting tx mapping to db: %s", err)
|
log.Errorf("error inserting tx mapping to db: %s", err)
|
||||||
}
|
}
|
||||||
@ -1822,6 +1822,22 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManager) {
|
||||||
|
if retentionDays == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
gcPeriod := 1 * time.Hour
|
||||||
|
for {
|
||||||
|
entriesDeleted, err := manager.TransactionHashLookup.DeleteEntriesOlderThan(retentionDays)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error garbage collecting eth transaction hash database: %s", err)
|
||||||
|
}
|
||||||
|
log.Info("garbage collection run on eth transaction hash lookup database. %d entries deleted", entriesDeleted)
|
||||||
|
time.Sleep(gcPeriod)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// decodeLogBytes decodes a CBOR-serialized array into its original form.
|
// decodeLogBytes decodes a CBOR-serialized array into its original form.
|
||||||
//
|
//
|
||||||
// This function swallows errors and returns the original array if it failed
|
// This function swallows errors and returns the original array if it failed
|
||||||
|
@ -74,6 +74,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go full.WaitForMpoolUpdates(ctx, ch, ðTxHashManager)
|
go full.WaitForMpoolUpdates(ctx, ch, ðTxHashManager)
|
||||||
|
go full.EthTxHashGC(ctx, cfg.EthTxHashMappingLifetimeDays, ðTxHashManager)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user