Do nothing on duplicate log insert

- Prevents duplicate key constraint violation from blocking the process
  from moving forward on restart.
- If header_id, log_idx, and tx_idx are the same, we can safely do
  nothing since it's definitely the same log - a reorg would cause the
  original header to be replaced with a new ID.
This commit is contained in:
Rob Mulholand 2019-09-21 14:13:06 -05:00
parent dc30099a7c
commit 37e2673a8d
2 changed files with 25 additions and 9 deletions

View File

@ -128,7 +128,7 @@ func (r *eventRepository) persistHeaderSyncLogs(logs []types.Log, eventInfo type
for i := 0; i < el; i++ {
pgStr = pgStr + fmt.Sprintf(", $%d", i+6)
}
pgStr = pgStr + ")"
pgStr = pgStr + ") ON CONFLICT DO NOTHING"
// Add this query to the transaction
_, err = tx.Exec(pgStr, data...)

View File

@ -320,17 +320,13 @@ var _ = Describe("Repository", func() {
err = hr.AddCheckColumn(event.Name + "_" + con.Address)
Expect(err).ToNot(HaveOccurred())
// Try and fail to persist the same log twice in a single call
err = dataStore.PersistLogs([]types.Log{logs[0], logs[0]}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
// Successfuly persist the two unique logs
// Successfully persist the two unique logs
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Try and fail to persist the same logs again in separate call
err = dataStore.PersistLogs([]types.Log{*log}, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())
// Try to insert the same logs again
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Show that no new logs were entered
var count int
@ -339,6 +335,26 @@ var _ = Describe("Repository", func() {
Expect(count).To(Equal(2))
})
It("inserts additional log if only some are duplicate", func() {
hr := lr.NewHeaderRepository(db)
err = hr.AddCheckColumn(event.Name + "_" + con.Address)
Expect(err).ToNot(HaveOccurred())
// Successfully persist first log
err = dataStore.PersistLogs([]types.Log{logs[0]}, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Successfully persist second log even though first already persisted
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).ToNot(HaveOccurred())
// Show that both logs were entered
var count int
err = db.Get(&count, fmt.Sprintf("SELECT COUNT(*) FROM header_%s.transfer_event", constants.TusdContractAddress))
Expect(err).ToNot(HaveOccurred())
Expect(count).To(Equal(2))
})
It("Fails if the persisted event does not have a corresponding eventID column in the checked_headers table", func() {
err = dataStore.PersistLogs(logs, event, con.Address, con.Name)
Expect(err).To(HaveOccurred())