Event index should be unique for tipsets (#11952)
* event index should be unique for tipsets * fix formatting * migrate to version 5
This commit is contained in:
parent
10f7b6ed5f
commit
6bbe090411
@ -57,7 +57,7 @@ var _ Filter = (*eventFilter)(nil)
|
|||||||
type CollectedEvent struct {
|
type CollectedEvent struct {
|
||||||
Entries []types.EventEntry
|
Entries []types.EventEntry
|
||||||
EmitterAddr address.Address // address of emitter
|
EmitterAddr address.Address // address of emitter
|
||||||
EventIdx int // index of the event within the list of emitted events
|
EventIdx int // index of the event within the list of emitted events in a given tipset
|
||||||
Reverted bool
|
Reverted bool
|
||||||
Height abi.ChainEpoch
|
Height abi.ChainEpoch
|
||||||
TipSetKey types.TipSetKey // tipset that contained the message
|
TipSetKey types.TipSetKey // tipset that contained the message
|
||||||
@ -94,8 +94,11 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("load executed messages: %w", err)
|
return xerrors.Errorf("load executed messages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventCount := 0
|
||||||
|
|
||||||
for msgIdx, em := range ems {
|
for msgIdx, em := range ems {
|
||||||
for evIdx, ev := range em.Events() {
|
for _, ev := range em.Events() {
|
||||||
// lookup address corresponding to the actor id
|
// lookup address corresponding to the actor id
|
||||||
addr, found := addressLookups[ev.Emitter]
|
addr, found := addressLookups[ev.Emitter]
|
||||||
if !found {
|
if !found {
|
||||||
@ -119,7 +122,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
cev := &CollectedEvent{
|
cev := &CollectedEvent{
|
||||||
Entries: ev.Entries,
|
Entries: ev.Entries,
|
||||||
EmitterAddr: addr,
|
EmitterAddr: addr,
|
||||||
EventIdx: evIdx,
|
EventIdx: eventCount,
|
||||||
Reverted: revert,
|
Reverted: revert,
|
||||||
Height: te.msgTs.Height(),
|
Height: te.msgTs.Height(),
|
||||||
TipSetKey: te.msgTs.Key(),
|
TipSetKey: te.msgTs.Key(),
|
||||||
@ -141,6 +144,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
}
|
}
|
||||||
f.collected = append(f.collected, cev)
|
f.collected = append(f.collected, cev)
|
||||||
f.mu.Unlock()
|
f.mu.Unlock()
|
||||||
|
eventCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,6 +72,7 @@ var ddls = []string{
|
|||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (2)`,
|
||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (3)`,
|
||||||
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
|
`INSERT OR IGNORE INTO _meta (version) VALUES (4)`,
|
||||||
|
`INSERT OR IGNORE INTO _meta (version) VALUES (5)`,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -79,7 +80,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
schemaVersion = 4
|
schemaVersion = 5
|
||||||
|
|
||||||
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
|
||||||
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
|
insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
|
||||||
@ -365,9 +366,42 @@ func (ei *EventIndex) migrateToVersion4(ctx context.Context) error {
|
|||||||
return xerrors.Errorf("commit transaction: %w", err)
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ei *EventIndex) migrateToVersion5(ctx context.Context) error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
tx, err := ei.db.BeginTx(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("begin transaction: %w", err)
|
||||||
|
}
|
||||||
|
defer func() { _ = tx.Rollback() }()
|
||||||
|
|
||||||
|
stmtEventIndexUpdate, err := tx.PrepareContext(ctx, "UPDATE event SET event_index = (SELECT COUNT(*) FROM event e2 WHERE e2.tipset_key_cid = event.tipset_key_cid AND e2.id <= event.id) - 1")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("prepare stmtEventIndexUpdate: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = stmtEventIndexUpdate.ExecContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("update event index: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (5)")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("increment _meta version: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("commit transaction: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
ei.vacuumDBAndCheckpointWAL(ctx)
|
ei.vacuumDBAndCheckpointWAL(ctx)
|
||||||
|
|
||||||
log.Infof("Successfully migrated event index from version 3 to version 4 in %s", time.Since(now))
|
log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -452,6 +486,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
|
|||||||
version = 4
|
version = 4
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if version == 4 {
|
||||||
|
log.Infof("Upgrading event index from version 4 to version 5")
|
||||||
|
err = eventIndex.migrateToVersion5(ctx)
|
||||||
|
if err != nil {
|
||||||
|
_ = db.Close()
|
||||||
|
return nil, xerrors.Errorf("could not migrate event index schema from version 4 to version 5: %w", err)
|
||||||
|
}
|
||||||
|
version = 5
|
||||||
|
}
|
||||||
|
|
||||||
if version != schemaVersion {
|
if version != schemaVersion {
|
||||||
_ = db.Close()
|
_ = db.Close()
|
||||||
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
|
return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion)
|
||||||
@ -505,10 +549,11 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
return xerrors.Errorf("load executed messages: %w", err)
|
return xerrors.Errorf("load executed messages: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
eventCount := 0
|
||||||
// iterate over all executed messages in this tipset and insert them into the database if they
|
// iterate over all executed messages in this tipset and insert them into the database if they
|
||||||
// don't exist, otherwise mark them as not reverted
|
// don't exist, otherwise mark them as not reverted
|
||||||
for msgIdx, em := range ems {
|
for msgIdx, em := range ems {
|
||||||
for evIdx, ev := range em.Events() {
|
for _, ev := range em.Events() {
|
||||||
addr, found := addressLookups[ev.Emitter]
|
addr, found := addressLookups[ev.Emitter]
|
||||||
if !found {
|
if !found {
|
||||||
var ok bool
|
var ok bool
|
||||||
@ -532,7 +577,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
te.msgTs.Key().Bytes(), // tipset_key
|
te.msgTs.Key().Bytes(), // tipset_key
|
||||||
tsKeyCid.Bytes(), // tipset_key_cid
|
tsKeyCid.Bytes(), // tipset_key_cid
|
||||||
addr.Bytes(), // emitter_addr
|
addr.Bytes(), // emitter_addr
|
||||||
evIdx, // event_index
|
eventCount, // event_index
|
||||||
em.Message().Cid().Bytes(), // message_cid
|
em.Message().Cid().Bytes(), // message_cid
|
||||||
msgIdx, // message_index
|
msgIdx, // message_index
|
||||||
).Scan(&entryID)
|
).Scan(&entryID)
|
||||||
@ -547,7 +592,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
te.msgTs.Key().Bytes(), // tipset_key
|
te.msgTs.Key().Bytes(), // tipset_key
|
||||||
tsKeyCid.Bytes(), // tipset_key_cid
|
tsKeyCid.Bytes(), // tipset_key_cid
|
||||||
addr.Bytes(), // emitter_addr
|
addr.Bytes(), // emitter_addr
|
||||||
evIdx, // event_index
|
eventCount, // event_index
|
||||||
em.Message().Cid().Bytes(), // message_cid
|
em.Message().Cid().Bytes(), // message_cid
|
||||||
msgIdx, // message_index
|
msgIdx, // message_index
|
||||||
false, // reverted
|
false, // reverted
|
||||||
@ -582,7 +627,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
te.msgTs.Key().Bytes(), // tipset_key
|
te.msgTs.Key().Bytes(), // tipset_key
|
||||||
tsKeyCid.Bytes(), // tipset_key_cid
|
tsKeyCid.Bytes(), // tipset_key_cid
|
||||||
addr.Bytes(), // emitter_addr
|
addr.Bytes(), // emitter_addr
|
||||||
evIdx, // event_index
|
eventCount, // event_index
|
||||||
em.Message().Cid().Bytes(), // message_cid
|
em.Message().Cid().Bytes(), // message_cid
|
||||||
msgIdx, // message_index
|
msgIdx, // message_index
|
||||||
)
|
)
|
||||||
@ -600,6 +645,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
|
|||||||
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
|
log.Warnf("restored %d events but expected only one to exist", rowsAffected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
eventCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user