rename journal entries to journal events.

This commit is contained in:
Raúl Kripalani 2020-07-21 17:32:01 +01:00
parent d547c2588c
commit 4e82cf369c
14 changed files with 60 additions and 58 deletions

View File

@ -315,7 +315,7 @@ func (mp *MessagePool) repubLocal() {
log.Errorf("errors while republishing: %+v", errout)
}
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
msgs := make([]MessagePoolEvt_Message, 0, len(outputMsgs))
for _, m := range outputMsgs {
msgs = append(msgs, MessagePoolEvt_Message{Message: m.Message, CID: m.Cid()})
@ -492,7 +492,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
Message: m,
}, localUpdates)
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
return MessagePoolEvt{
Action: "add",
Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}},
@ -632,7 +632,7 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
Message: m,
}, localUpdates)
journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
return MessagePoolEvt{
Action: "remove",
Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}}}

View File

@ -348,7 +348,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}
journal.MaybeAddEntry(cs.journal, cs.evtTypes[evtTypeHeadChange], func() interface{} {
journal.MaybeRecordEvent(cs.journal, cs.evtTypes[evtTypeHeadChange], func() interface{} {
return HeadChangeEvt{
From: r.old.Key(),
FromHeight: r.old.Height(),

2
go.sum
View File

@ -82,6 +82,7 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY
github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A=
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.32.11 h1:1nYF+Tfccn/hnAZsuwPPMSCVUVnx3j6LKOpx/WhgH0A=
github.com/aws/aws-sdk-go v1.32.11/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg=
@ -656,6 +657,7 @@ github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4=
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak=

View File

@ -30,7 +30,7 @@ type fsJournal struct {
fi *os.File
fSize int64
incoming chan *Entry
incoming chan *Event
closing chan struct{}
}
@ -47,7 +47,7 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents)
eventTypeFactory: newEventTypeFactory(disabled),
dir: dir,
sizeLimit: 1 << 30,
incoming: make(chan *Entry, 32),
incoming: make(chan *Event, 32),
closing: make(chan struct{}),
}
@ -64,8 +64,8 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents)
return f, nil
}
func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) {
je := &Entry{
func (f *fsJournal) RecordEvent(evtType EventType, obj interface{}) {
je := &Event{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: obj,
@ -73,7 +73,7 @@ func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) {
select {
case f.incoming <- je:
case <-f.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
log.Warnw("journal closed but tried to log event", "event", je)
}
}
@ -82,8 +82,8 @@ func (f *fsJournal) Close() error {
return nil
}
func (f *fsJournal) putEntry(je *Entry) error {
b, err := json.Marshal(je)
func (f *fsJournal) putEvent(evt *Event) error {
b, err := json.Marshal(evt)
if err != nil {
return err
}
@ -120,8 +120,8 @@ func (f *fsJournal) runLoop() {
for {
select {
case je := <-f.incoming:
if err := f.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
if err := f.putEvent(je); err != nil {
log.Errorw("failed to write out journal event", "event", je, "err", err)
}
case <-f.closing:
_ = f.fi.Close()

View File

@ -17,17 +17,17 @@ type (
replay bool
}
rmObserverCtrl *observer
getEntriesCtrl chan []*Entry
getEntriesCtrl chan []*Event
)
type MemJournal struct {
*eventTypeFactory
entries []*Entry
index map[string]map[string][]*Entry
entries []*Event
index map[string]map[string][]*Event
observers []observer
incomingCh chan *Entry
incomingCh chan *Event
controlCh chan interface{}
state int32 // guarded by atomic; 0=closed, 1=running.
@ -38,10 +38,10 @@ var _ Journal = (*MemJournal)(nil)
type observer struct {
accept map[EventType]struct{}
ch chan *Entry
ch chan *Event
}
func (o *observer) dispatch(entry *Entry) {
func (o *observer) dispatch(entry *Event) {
if o.accept == nil {
o.ch <- entry
}
@ -54,9 +54,9 @@ func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal {
m := &MemJournal{
eventTypeFactory: newEventTypeFactory(disabled),
index: make(map[string]map[string][]*Entry, 16),
index: make(map[string]map[string][]*Event, 16),
observers: make([]observer, 0, 16),
incomingCh: make(chan *Entry, 256),
incomingCh: make(chan *Event, 256),
controlCh: make(chan interface{}, 16),
state: 1,
closed: make(chan struct{}),
@ -71,13 +71,13 @@ func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal {
return m
}
func (m *MemJournal) AddEntry(evtType EventType, obj interface{}) {
func (m *MemJournal) RecordEvent(evtType EventType, obj interface{}) {
if !evtType.enabled || !evtType.safe {
// tried to record a disabled event type, or used an invalid EventType.
return
}
entry := &Entry{
entry := &Event{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: obj,
@ -111,7 +111,7 @@ func (m *MemJournal) Clear() {
// be replayed. To restrict the event types this observer will sent, use the
// include argument. If no include set is passed, the observer will receive all
// events types.
func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Entry {
func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Event {
var acc map[EventType]struct{}
if include != nil {
acc = make(map[EventType]struct{}, len(include))
@ -124,7 +124,7 @@ func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventT
}
}
ch := make(chan *Entry, 256)
ch := make(chan *Event, 256)
o := &observer{
accept: acc,
ch: ch,
@ -151,8 +151,8 @@ func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventT
}
// Entries gets a snapshot of stored entries.
func (m *MemJournal) Entries() []*Entry {
ch := make(chan []*Entry)
func (m *MemJournal) Entries() []*Event {
ch := make(chan []*Event)
m.controlCh <- getEntriesCtrl(ch)
return <-ch
}
@ -191,7 +191,7 @@ func (m *MemJournal) process() {
}
}
case getEntriesCtrl:
cpy := make([]*Entry, len(m.entries))
cpy := make([]*Event, len(m.entries))
copy(cpy, m.entries)
msg <- cpy
close(msg)
@ -200,7 +200,7 @@ func (m *MemJournal) process() {
processClose := func() {
m.entries = nil
m.index = make(map[string]map[string][]*Entry, 16)
m.index = make(map[string]map[string][]*Event, 16)
for _, o := range m.observers {
close(o.ch)
}
@ -225,7 +225,7 @@ func (m *MemJournal) process() {
m.entries = append(m.entries, entry)
events := m.index[entry.System]
if events == nil {
events = make(map[string][]*Entry, 16)
events = make(map[string][]*Event, 16)
m.index[entry.System] = events
}

View File

@ -71,7 +71,7 @@ func TestMemJournal_Close(t *testing.T) {
time.Sleep(500 * time.Millisecond)
NextChannel:
for _, ch := range []<-chan *Entry{o1, o2, o3} {
for _, ch := range []<-chan *Event{o1, o2, o3} {
for {
select {
case _, more := <-ch:
@ -171,7 +171,7 @@ func addEntries(journal *MemJournal, count int) {
// RegisterEventType is not _really_ intended to be used this way (on every write).
et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx))
journal.AddEntry(et, HeadChangeEvt{
journal.RecordEvent(et, HeadChangeEvt{
From: types.TipSetKey{},
FromHeight: abi.ChainEpoch(i),
To: types.TipSetKey{},

View File

@ -11,6 +11,6 @@ func NilJournal() Journal {
func (n *nilJournal) RegisterEventType(_, _ string) EventType { return EventType{} }
func (n *nilJournal) AddEntry(_ EventType, _ interface{}) {}
func (n *nilJournal) RecordEvent(_ EventType, _ interface{}) {}
func (n *nilJournal) Close() error { return nil }

View File

@ -1,17 +1,17 @@
package journal
// MaybeAddEntry is a convenience function that evaluates if the EventType is
// enabled, and if so, it calls the supplier to create the entry and
// subsequently journal.AddEntry on the provided journal to record it.
// MaybeRecordEvent is a convenience function that evaluates if the EventType is
// enabled, and if so, it calls the supplier to create the event and
// subsequently journal.RecordEvent on the provided journal to record it.
//
// This is safe to call with a nil Journal, either because the value is nil,
// or because a journal obtained through NilJournal() is in use.
func MaybeAddEntry(journal Journal, evtType EventType, supplier func() interface{}) {
func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) {
if journal == nil || journal == nilj {
return
}
if !evtType.Enabled() {
return
}
journal.AddEntry(evtType, supplier())
journal.RecordEvent(evtType, supplier())
}

View File

@ -47,18 +47,18 @@ type Journal interface {
// entries appropriately.
RegisterEventType(system, event string) EventType
// AddEntry adds an entry to this journal. See godocs on the Journal type
// RecordEvent records this event to the journal. See godocs on the Journal type
// for more info.
AddEntry(evtType EventType, data interface{})
RecordEvent(evtType EventType, data interface{})
// Close closes this journal for further writing.
Close() error
}
// Entry represents a journal entry.
// Event represents a journal entry.
//
// See godocs on Journal for more information.
type Entry struct {
type Event struct {
EventType
Timestamp time.Time

View File

@ -247,7 +247,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor
}
dealID := res.IDs[dealIdx]
journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} {
journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()}
@ -296,7 +296,7 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider
log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch)
journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH}
})

View File

@ -115,7 +115,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
log.Warnf("New Deal: deal %d", deal.DealID)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} {
deal := deal // copy and strip fields we don't want to log to the journal
deal.ClientSignature = crypto.Signature{}
return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal}
@ -287,7 +287,7 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide
log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} {
return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH}
})
@ -397,7 +397,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= height {
onDealExpired(nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil
@ -406,7 +406,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// If there is no deal assume it's already been slashed
if sd.State.SectorStartEpoch < 0 {
onDealSlashed(height, nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return true, false, nil
@ -425,7 +425,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Check if the deal has already expired
if sd.Proposal.EndEpoch <= height {
onDealExpired(nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} {
return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil
@ -451,7 +451,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID
// Deal was slashed
if deal.To == nil {
onDealSlashed(height, nil)
journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} {
return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height}
})
return false, nil

View File

@ -134,7 +134,7 @@ func (m *Miner) Run(ctx context.Context) error {
}
func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) {
journal.MaybeAddEntry(m.jrnl, m.sealingEvtType, func() interface{} {
journal.MaybeRecordEvent(m.jrnl, m.sealingEvtType, func() interface{} {
return SealingStateEvt{
SectorNumber: before.SectorNumber,
SectorType: before.SectorType,

View File

@ -27,7 +27,7 @@ import (
var errNoPartitions = errors.New("no partitions")
func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) {
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "failed",
Deadline: s.activeDeadline,
@ -51,7 +51,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
s.abort = abort
s.activeDeadline = deadline
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "started",
Deadline: s.activeDeadline,
@ -82,7 +82,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli
return
}
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "succeeded",
Deadline: s.activeDeadline,
@ -148,7 +148,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
Recoveries: []miner.RecoveryDeclaration{},
}
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
@ -258,7 +258,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
Faults: []miner.FaultDeclaration{},
}
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()
@ -519,7 +519,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
var sm *types.SignedMessage
defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
var mcid cid.Cid
if sm != nil {
mcid = sm.Cid()

View File

@ -247,7 +247,7 @@ func (s *WindowPoStScheduler) abortActivePoSt() {
if s.abort != nil {
s.abort()
journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} {
journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} {
return WindowPoStEvt{
State: "abort",
Deadline: s.activeDeadline,