Add historic event indexing

This commit is contained in:
Ian Davis 2022-11-14 16:59:29 +00:00
parent 907c201912
commit 0e8dd9efc5
7 changed files with 692 additions and 10 deletions

View File

@ -18,10 +18,6 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
type RobustAddresser interface {
LookupRobustAddress(ctx context.Context, idAddr address.Address, ts *types.TipSet) (address.Address, error)
}
const indexed uint8 = 0x01
type EventFilter struct {
@ -42,7 +38,7 @@ type EventFilter struct {
var _ Filter = (*EventFilter)(nil)
type CollectedEvent struct {
Event *types.Event
Entries []types.EventEntry
EmitterAddr address.Address // f4 address of emitter
EventIdx int // index of the event within the list of emitted events
Reverted bool
@ -104,7 +100,7 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
// event matches filter, so record it
cev := &CollectedEvent{
Event: ev,
Entries: ev.Entries,
EmitterAddr: addr,
EventIdx: evIdx,
Reverted: revert,
@ -134,6 +130,12 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}
func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) {
f.mu.Lock()
f.collected = ces
f.mu.Unlock()
}
func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
f.mu.Lock()
collected := f.collected
@ -282,14 +284,18 @@ type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
EventIndex *EventIndex
mu sync.Mutex // guards mutations to filters
filters map[string]*EventFilter
mu sync.Mutex // guards mutations to filters
filters map[string]*EventFilter
currentHeight abi.ChainEpoch
}
func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
m.currentHeight = to.Height()
if len(m.filters) == 0 {
return nil
}
@ -313,6 +319,8 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error {
m.mu.Lock()
defer m.mu.Unlock()
m.currentHeight = to.Height()
if len(m.filters) == 0 {
return nil
}
@ -334,6 +342,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
}
func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) {
m.mu.Lock()
currentHeight := m.currentHeight
m.mu.Unlock()
if m.EventIndex == nil && (minHeight < currentHeight || maxHeight < currentHeight) {
return nil, xerrors.Errorf("historic event index disabled")
}
id, err := uuid.NewRandom()
if err != nil {
return nil, xerrors.Errorf("new uuid: %w", err)
@ -349,6 +365,12 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
maxResults: m.MaxFilterResults,
}
if m.EventIndex != nil && (minHeight < currentHeight || maxHeight < currentHeight) {
if err := m.EventIndex.PrefillFilter(ctx, f); err != nil {
return nil, err
}
}
m.mu.Lock()
m.filters[id.String()] = f
m.mu.Unlock()

View File

@ -60,7 +60,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
{
Event: ev1,
Entries: ev1.Entries,
EmitterAddr: a1,
EventIdx: 0,
Reverted: false,

View File

@ -0,0 +1,376 @@
package filter
import (
"context"
"database/sql"
"errors"
"fmt"
"sort"
"strings"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/chain/types"
)
var pragmas = []string{
"PRAGMA synchronous = normal",
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA read_uncommitted = ON",
}
var ddls = []string{
`CREATE TABLE IF NOT EXISTS event (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key BLOB NOT NULL,
tipset_key_cid BLOB NOT NULL,
emitter_addr BLOB NOT NULL,
event_index INTEGER NOT NULL,
message_cid BLOB NOT NULL,
message_index INTEGER NOT NULL,
reverted INTEGER NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS event_entry (
event_id INTEGER,
indexed INTEGER NOT NULL,
flags BLOB NOT NULL,
key BLOB NOT NULL,
value BLOB NOT NULL
)`,
// placeholder version to enable migrations.
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
// version 1.
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
}
const (
insertEvent = `INSERT OR IGNORE INTO event
(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted)
VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry
(event_id, indexed, flags, key, value)
VALUES(?, ?, ?, ?, ?)`
)
type EventIndex struct {
db *sql.DB
}
func NewEventIndex(path string) (*EventIndex, error) {
db, err := sql.Open("sqlite3", path+"?mode=rwc")
if err != nil {
return nil, xerrors.Errorf("open sqlite3 database: %w", err)
}
for _, pragma := range pragmas {
if _, err := db.Exec(pragma); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err)
}
}
for _, ddl := range ddls {
if _, err := db.Exec(ddl); err != nil {
_ = db.Close()
return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err)
}
}
return &EventIndex{
db: db,
}, nil
}
func (ei *EventIndex) Close() error {
if ei.db == nil {
return nil
}
return ei.db.Close()
}
func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
// cache of lookups between actor id and f4 address
addressLookups := make(map[abi.ActorID]address.Address)
ems, err := te.messages(ctx)
if err != nil {
return xerrors.Errorf("load executed messages: %w", err)
}
tx, err := ei.db.Begin()
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
stmtEvent, err := tx.Prepare(insertEvent)
if err != nil {
return xerrors.Errorf("prepare insert event: %w", err)
}
stmtEntry, err := tx.Prepare(insertEntry)
if err != nil {
return xerrors.Errorf("prepare insert entry: %w", err)
}
for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter]
if !found {
var ok bool
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
if !ok {
// not an address we will be able to match against
continue
}
addressLookups[ev.Emitter] = addr
}
tsKeyCid, err := te.msgTs.Key().Cid()
if err != nil {
return xerrors.Errorf("tipset key cid: %w", err)
}
res, err := stmtEvent.Exec(
te.msgTs.Height(), // height
te.msgTs.Key().Bytes(), // tipset_key
tsKeyCid.Bytes(), // tipset_key_cid
addr.Bytes(), // emitter_addr
evIdx, // event_index
em.Message().Cid().Bytes(), // message_cid
msgIdx, // message_index
revert, // reverted
)
if err != nil {
return xerrors.Errorf("exec insert event: %w", err)
}
lastID, err := res.LastInsertId()
if err != nil {
return xerrors.Errorf("get last row id: %w", err)
}
for _, entry := range ev.Entries {
_, err := stmtEntry.Exec(
lastID, // event_id
entry.Flags&indexed == indexed, // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Value, // value
)
if err != nil {
return xerrors.Errorf("exec insert entry: %w", err)
}
}
}
}
if err := tx.Commit(); err != nil {
return xerrors.Errorf("commit transaction: %w", err)
}
return nil
}
// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{}
values := []any{}
joins := []string{}
if f.tipsetCid != cid.Undef {
clauses = append(clauses, "event.tipset_key_cid=?")
values = append(values, f.tipsetCid.Bytes())
} else {
if f.minHeight >= 0 {
clauses = append(clauses, "event.height>=?")
values = append(values, f.minHeight)
}
if f.maxHeight >= 0 {
clauses = append(clauses, "event.height<=?")
values = append(values, f.maxHeight)
}
}
if len(f.addresses) > 0 {
subclauses := []string{}
for _, addr := range f.addresses {
subclauses = append(subclauses, "emitter_addr=?")
values = append(values, addr.Bytes())
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
if len(f.keys) > 0 {
join := 0
for key, vals := range f.keys {
join++
joinAlias := fmt.Sprintf("ee%d", join)
// JOIN ee1 event_entry ON event.id=ee1.event_id
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
values = append(values, []byte(key))
subclauses := []string{}
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias))
values = append(values, val)
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
}
s := `SELECT
event.id,
event.height,
event.tipset_key,
event.tipset_key_cid,
event.emitter_addr,
event.event_index,
event.message_cid,
event.message_index,
event.reverted,
event_entry.flags,
event_entry.key,
event_entry.value
FROM event JOIN event_entry ON event.id=event_entry.event_id`
if len(joins) > 0 {
s = s + ", " + strings.Join(joins, ", ")
}
if len(clauses) > 0 {
s = s + " WHERE " + strings.Join(clauses, " AND ")
}
s += " ORDER BY event.height DESC"
stmt, err := ei.db.Prepare(s)
if err != nil {
return xerrors.Errorf("prepare prefill query: %w", err)
}
q, err := stmt.QueryContext(ctx, values...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return xerrors.Errorf("exec prefill query: %w", err)
}
var ces []*CollectedEvent
var currentID int64 = -1
var ce *CollectedEvent
for q.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
var row struct {
id int64
height uint64
tipsetKey []byte
tipsetKeyCid []byte
emitterAddr []byte
eventIndex int
messageCid []byte
messageIndex int
reverted bool
flags []byte
key []byte
value []byte
}
if err := q.Scan(
&row.id,
&row.height,
&row.tipsetKey,
&row.tipsetKeyCid,
&row.emitterAddr,
&row.eventIndex,
&row.messageCid,
&row.messageIndex,
&row.reverted,
&row.flags,
&row.key,
&row.value,
); err != nil {
return xerrors.Errorf("read prefill row: %w", err)
}
if row.id != currentID {
if ce != nil {
ces = append(ces, ce)
ce = nil
// Unfortunately we can't easily incorporate the max results limit into the query due to the
// unpredictable number of rows caused by joins
// Break here to stop collecting rows
if f.maxResults > 0 && len(ces) >= f.maxResults {
break
}
}
currentID = row.id
ce = &CollectedEvent{
EventIdx: row.eventIndex,
Reverted: row.reverted,
Height: abi.ChainEpoch(row.height),
MsgIdx: row.messageIndex,
}
ce.EmitterAddr, err = address.NewFromBytes(row.emitterAddr)
if err != nil {
return xerrors.Errorf("parse emitter addr: %w", err)
}
ce.TipSetKey, err = types.TipSetKeyFromBytes(row.tipsetKey)
if err != nil {
return xerrors.Errorf("parse tipsetkey: %w", err)
}
ce.MsgCid, err = cid.Cast(row.messageCid)
if err != nil {
return xerrors.Errorf("parse message cid: %w", err)
}
}
ce.Entries = append(ce.Entries, types.EventEntry{
Flags: row.flags[0],
Key: row.key,
Value: row.value,
})
}
if ce != nil {
ces = append(ces, ce)
}
if len(ces) == 0 {
return nil
}
// collected event list is in inverted order since we selected only the most recent events
// sort it into height order
sort.Slice(ces, func(i, j int) bool { return ces[i].Height < ces[j].Height })
f.setCollectedEvents(ces)
return nil
}

View File

@ -0,0 +1,281 @@
package filter
import (
"context"
pseudo "math/rand"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
)
func TestEventIndexPrefillFilter(t *testing.T) {
rng := pseudo.New(pseudo.NewSource(299792458))
a1 := randomF4Addr(t, rng)
a2 := randomF4Addr(t, rng)
a1ID := abi.ActorID(1)
a2ID := abi.ActorID(2)
addrMap := addressMap{}
addrMap.add(a1ID, a1)
addrMap.add(a2ID, a2)
ev1 := fakeEvent(
a1ID,
[]kv{
{k: "type", v: []byte("approval")},
{k: "signer", v: []byte("addr1")},
},
[]kv{
{k: "amount", v: []byte("2988181")},
},
)
st := newStore()
events := []*types.Event{ev1}
em := executedMessage{
msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)),
rct: fakeReceipt(t, rng, st, events),
evs: events,
}
events14000 := buildTipSetEvents(t, rng, 14000, em)
cid14000, err := events14000.msgTs.Key().Cid()
require.NoError(t, err, "tipset cid")
noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
{
Entries: ev1.Entries,
EmitterAddr: a1,
EventIdx: 0,
Reverted: false,
Height: 14000,
TipSetKey: events14000.msgTs.Key(),
MsgIdx: 0,
MsgCid: em.msg.Cid(),
},
}
workDir, err := os.MkdirTemp("", "lotusevents")
require.NoError(t, err, "create temporary work directory")
defer os.RemoveAll(workDir)
t.Logf("using work dir %q", workDir)
dbPath := filepath.Join(workDir, "actorevents.db")
ei, err := NewEventIndex(dbPath)
require.NoError(t, err, "create event index")
if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil {
require.NoError(t, err, "collect events")
}
testCases := []struct {
name string
filter *EventFilter
te *TipSetEvents
want []*CollectedEvent
}{
{
name: "nomatch tipset min height",
filter: &EventFilter{
minHeight: 14001,
maxHeight: -1,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch tipset max height",
filter: &EventFilter{
minHeight: -1,
maxHeight: 13999,
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match tipset min height",
filter: &EventFilter{
minHeight: 14000,
maxHeight: -1,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match tipset cid",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
tipsetCid: cid14000,
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a2},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match address",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
addresses: []address.Address{a1},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "match one entry with alternate values",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
[]byte("approval"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry by missing value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("cancel"),
[]byte("propose"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry by missing key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"method": {
[]byte("approval"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "match one entry with multiple keys",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr1"),
},
},
},
te: events14000,
want: oneCollectedEvent,
},
{
name: "nomatch one entry with one mismatching key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"approver": {
[]byte("addr1"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one mismatching value",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"type": {
[]byte("approval"),
},
"signer": {
[]byte("addr2"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
{
name: "nomatch one entry with one unindexed key",
filter: &EventFilter{
minHeight: -1,
maxHeight: -1,
keys: map[string][][]byte{
"amount": {
[]byte("2988181"),
},
},
},
te: events14000,
want: noCollectedEvents,
},
}
for _, tc := range testCases {
tc := tc // appease lint
t.Run(tc.name, func(t *testing.T) {
if err := ei.PrefillFilter(context.Background(), tc.filter); err != nil {
require.NoError(t, err, "prefill filter events")
}
coll := tc.filter.TakeCollectedEvents(context.Background())
require.ElementsMatch(t, coll, tc.want)
})
}
}

1
go.mod
View File

@ -273,6 +273,7 @@ require (
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/mattn/go-sqlite3 v1.14.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect

2
go.sum
View File

@ -1317,6 +1317,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/mattn/go-xmlrpc v0.0.3/go.mod h1:mqc2dz7tP5x5BKlCahN/n+hs7OSZKJkS9JsHNBRlrxA=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=

View File

@ -1247,7 +1247,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
var err error
for _, entry := range ev.Event.Entries {
for _, entry := range ev.Entries {
hash := api.EthHashData(entry.Value)
if entry.Key == ethTopic1 || entry.Key == ethTopic2 || entry.Key == ethTopic3 || entry.Key == ethTopic4 {
log.Topics = append(log.Topics, hash)