fix: events: sqlite db improvements (#12090)

* fix: events: sqlite db improvements

* fix unclosed multi-row query
* tune options to limit wal growth

Ref: https://github.com/filecoin-project/lotus/issues/12089

* fix: events: use correct context for CollectEvents transaction

* fix: events: close prepared read statement

* fix: events: close initial query; handle lint failures
This commit is contained in:
Rod Vagg 2024-06-15 01:09:35 +10:00 committed by Phi
parent 26235d753e
commit 2e781e6f64
No known key found for this signature in database
GPG Key ID: AFAE959F938CC79E

View File

@ -26,10 +26,11 @@ var pragmas = []string{
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE", // not useful until we implement GC
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA read_uncommitted = ON",
"PRAGMA wal_autocheckpoint = 256", // checkpoint @ 256 pages
"PRAGMA journal_size_limit = 0", // always reset journal and wal files
}
// Any changes to this schema should be matched for the `lotus-shed indexes backfill-events` command
@ -438,6 +439,9 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
eventIndex := EventIndex{db: db}
q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if q != nil {
defer func() { _ = q.Close() }()
}
if errors.Is(err, sql.ErrNoRows) || !q.Next() {
// empty database, create the schema
for _, ddl := range ddls {
@ -521,7 +525,7 @@ func (ei *EventIndex) Close() error {
}
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.Begin()
tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
@ -743,6 +747,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
if err != nil {
return xerrors.Errorf("prepare prefill query: %w", err)
}
defer func() { _ = stmt.Close() }()
q, err := stmt.QueryContext(ctx, values...)
if err != nil {
@ -751,6 +756,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
}
return xerrors.Errorf("exec prefill query: %w", err)
}
defer func() { _ = q.Close() }()
var ces []*CollectedEvent
var currentID int64 = -1
@ -839,7 +845,6 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
Codec: row.codec,
Value: row.value,
})
}
if ce != nil {