diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index dcd7acf50..28cf1b263 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -315,7 +315,7 @@ func (mp *MessagePool) repubLocal() { log.Errorf("errors while republishing: %+v", errout) } - journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} { + journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRepub], func() interface{} { msgs := make([]MessagePoolEvt_Message, 0, len(outputMsgs)) for _, m := range outputMsgs { msgs = append(msgs, MessagePoolEvt_Message{Message: m.Message, CID: m.Cid()}) @@ -492,7 +492,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error { Message: m, }, localUpdates) - journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} { + journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolAdd], func() interface{} { return MessagePoolEvt{ Action: "add", Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}}, @@ -632,7 +632,7 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) { Message: m, }, localUpdates) - journal.MaybeAddEntry(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} { + journal.MaybeRecordEvent(mp.jrnl, mp.evtTypes[evtTypeMpoolRemove], func() interface{} { return MessagePoolEvt{ Action: "remove", Messages: []MessagePoolEvt_Message{{Message: m.Message, CID: m.Cid()}}} diff --git a/chain/store/store.go b/chain/store/store.go index ba7a54ed3..07ac57ac4 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -348,7 +348,7 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } - journal.MaybeAddEntry(cs.journal, cs.evtTypes[evtTypeHeadChange], func() interface{} { + journal.MaybeRecordEvent(cs.journal, cs.evtTypes[evtTypeHeadChange], func() interface{} { return HeadChangeEvt{ From: r.old.Key(), FromHeight: r.old.Height(), diff --git a/go.sum b/go.sum index 0ffae068c..4d99d3d46 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,7 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6lCRdSC2Tm3DSWRPvIPr6xNKyeHdqDQSQT+A= github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.32.11 h1:1nYF+Tfccn/hnAZsuwPPMSCVUVnx3j6LKOpx/WhgH0A= github.com/aws/aws-sdk-go v1.32.11/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/beevik/ntp v0.2.0/go.mod h1:hIHWr+l3+/clUnF44zdK+CWW7fO8dR5cIylAQ76NRpg= @@ -656,6 +657,7 @@ github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 h1:rp+c0RAYOWj8l6qbCUTSiRLG/iKnW3K3/QfPPuSsBt4= github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901/go.mod h1:Z86h9688Y0wesXCyonoVr47MasHilkuLMqGhRZ4Hpak= diff --git a/journal/filesystem.go b/journal/filesystem.go index 7aa470a2e..0f953b4e2 100644 --- a/journal/filesystem.go +++ b/journal/filesystem.go @@ -30,7 +30,7 @@ type fsJournal struct { fi *os.File fSize int64 - incoming chan *Entry + incoming chan *Event closing chan struct{} } @@ -47,7 +47,7 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) eventTypeFactory: newEventTypeFactory(disabled), dir: dir, sizeLimit: 1 << 30, - incoming: make(chan *Entry, 32), + incoming: make(chan *Event, 32), closing: make(chan struct{}), } @@ -64,8 +64,8 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) return f, nil } -func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) { - je := &Entry{ +func (f *fsJournal) RecordEvent(evtType EventType, obj interface{}) { + je := &Event{ EventType: evtType, Timestamp: build.Clock.Now(), Data: obj, @@ -73,7 +73,7 @@ func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) { select { case f.incoming <- je: case <-f.closing: - log.Warnw("journal closed but tried to log event", "entry", je) + log.Warnw("journal closed but tried to log event", "event", je) } } @@ -82,8 +82,8 @@ func (f *fsJournal) Close() error { return nil } -func (f *fsJournal) putEntry(je *Entry) error { - b, err := json.Marshal(je) +func (f *fsJournal) putEvent(evt *Event) error { + b, err := json.Marshal(evt) if err != nil { return err } @@ -120,8 +120,8 @@ func (f *fsJournal) runLoop() { for { select { case je := <-f.incoming: - if err := f.putEntry(je); err != nil { - log.Errorw("failed to write out journal entry", "entry", je, "err", err) + if err := f.putEvent(je); err != nil { + log.Errorw("failed to write out journal event", "event", je, "err", err) } case <-f.closing: _ = f.fi.Close() diff --git a/journal/memory.go b/journal/memory.go index 5471d3e11..6676b9696 100644 --- a/journal/memory.go +++ b/journal/memory.go @@ -17,17 +17,17 @@ type ( replay bool } rmObserverCtrl *observer - getEntriesCtrl chan []*Entry + getEntriesCtrl chan []*Event ) type MemJournal struct { *eventTypeFactory - entries []*Entry - index map[string]map[string][]*Entry + entries []*Event + index map[string]map[string][]*Event observers []observer - incomingCh chan *Entry + incomingCh chan *Event controlCh chan interface{} state int32 // guarded by atomic; 0=closed, 1=running. @@ -38,10 +38,10 @@ var _ Journal = (*MemJournal)(nil) type observer struct { accept map[EventType]struct{} - ch chan *Entry + ch chan *Event } -func (o *observer) dispatch(entry *Entry) { +func (o *observer) dispatch(entry *Event) { if o.accept == nil { o.ch <- entry } @@ -54,9 +54,9 @@ func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal { m := &MemJournal{ eventTypeFactory: newEventTypeFactory(disabled), - index: make(map[string]map[string][]*Entry, 16), + index: make(map[string]map[string][]*Event, 16), observers: make([]observer, 0, 16), - incomingCh: make(chan *Entry, 256), + incomingCh: make(chan *Event, 256), controlCh: make(chan interface{}, 16), state: 1, closed: make(chan struct{}), @@ -71,13 +71,13 @@ func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal { return m } -func (m *MemJournal) AddEntry(evtType EventType, obj interface{}) { +func (m *MemJournal) RecordEvent(evtType EventType, obj interface{}) { if !evtType.enabled || !evtType.safe { // tried to record a disabled event type, or used an invalid EventType. return } - entry := &Entry{ + entry := &Event{ EventType: evtType, Timestamp: build.Clock.Now(), Data: obj, @@ -111,7 +111,7 @@ func (m *MemJournal) Clear() { // be replayed. To restrict the event types this observer will sent, use the // include argument. If no include set is passed, the observer will receive all // events types. -func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Entry { +func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Event { var acc map[EventType]struct{} if include != nil { acc = make(map[EventType]struct{}, len(include)) @@ -124,7 +124,7 @@ func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventT } } - ch := make(chan *Entry, 256) + ch := make(chan *Event, 256) o := &observer{ accept: acc, ch: ch, @@ -151,8 +151,8 @@ func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventT } // Entries gets a snapshot of stored entries. -func (m *MemJournal) Entries() []*Entry { - ch := make(chan []*Entry) +func (m *MemJournal) Entries() []*Event { + ch := make(chan []*Event) m.controlCh <- getEntriesCtrl(ch) return <-ch } @@ -191,7 +191,7 @@ func (m *MemJournal) process() { } } case getEntriesCtrl: - cpy := make([]*Entry, len(m.entries)) + cpy := make([]*Event, len(m.entries)) copy(cpy, m.entries) msg <- cpy close(msg) @@ -200,7 +200,7 @@ func (m *MemJournal) process() { processClose := func() { m.entries = nil - m.index = make(map[string]map[string][]*Entry, 16) + m.index = make(map[string]map[string][]*Event, 16) for _, o := range m.observers { close(o.ch) } @@ -225,7 +225,7 @@ func (m *MemJournal) process() { m.entries = append(m.entries, entry) events := m.index[entry.System] if events == nil { - events = make(map[string][]*Entry, 16) + events = make(map[string][]*Event, 16) m.index[entry.System] = events } diff --git a/journal/memory_test.go b/journal/memory_test.go index 442a7f46d..db38085e3 100644 --- a/journal/memory_test.go +++ b/journal/memory_test.go @@ -71,7 +71,7 @@ func TestMemJournal_Close(t *testing.T) { time.Sleep(500 * time.Millisecond) NextChannel: - for _, ch := range []<-chan *Entry{o1, o2, o3} { + for _, ch := range []<-chan *Event{o1, o2, o3} { for { select { case _, more := <-ch: @@ -171,7 +171,7 @@ func addEntries(journal *MemJournal, count int) { // RegisterEventType is not _really_ intended to be used this way (on every write). et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx)) - journal.AddEntry(et, HeadChangeEvt{ + journal.RecordEvent(et, HeadChangeEvt{ From: types.TipSetKey{}, FromHeight: abi.ChainEpoch(i), To: types.TipSetKey{}, diff --git a/journal/nil.go b/journal/nil.go index 0123fe585..5d0c78b05 100644 --- a/journal/nil.go +++ b/journal/nil.go @@ -11,6 +11,6 @@ func NilJournal() Journal { func (n *nilJournal) RegisterEventType(_, _ string) EventType { return EventType{} } -func (n *nilJournal) AddEntry(_ EventType, _ interface{}) {} +func (n *nilJournal) RecordEvent(_ EventType, _ interface{}) {} func (n *nilJournal) Close() error { return nil } diff --git a/journal/sugar.go b/journal/sugar.go index 9ee59f2c7..069434916 100644 --- a/journal/sugar.go +++ b/journal/sugar.go @@ -1,17 +1,17 @@ package journal -// MaybeAddEntry is a convenience function that evaluates if the EventType is -// enabled, and if so, it calls the supplier to create the entry and -// subsequently journal.AddEntry on the provided journal to record it. +// MaybeRecordEvent is a convenience function that evaluates if the EventType is +// enabled, and if so, it calls the supplier to create the event and +// subsequently journal.RecordEvent on the provided journal to record it. // // This is safe to call with a nil Journal, either because the value is nil, // or because a journal obtained through NilJournal() is in use. -func MaybeAddEntry(journal Journal, evtType EventType, supplier func() interface{}) { +func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) { if journal == nil || journal == nilj { return } if !evtType.Enabled() { return } - journal.AddEntry(evtType, supplier()) + journal.RecordEvent(evtType, supplier()) } diff --git a/journal/types.go b/journal/types.go index 95349b24e..b1c36b515 100644 --- a/journal/types.go +++ b/journal/types.go @@ -47,18 +47,18 @@ type Journal interface { // entries appropriately. RegisterEventType(system, event string) EventType - // AddEntry adds an entry to this journal. See godocs on the Journal type + // RecordEvent records this event to the journal. See godocs on the Journal type // for more info. - AddEntry(evtType EventType, data interface{}) + RecordEvent(evtType EventType, data interface{}) // Close closes this journal for further writing. Close() error } -// Entry represents a journal entry. +// Event represents a journal entry. // // See godocs on Journal for more information. -type Entry struct { +type Event struct { EventType Timestamp time.Time diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 1da85fa69..925a0a271 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -247,7 +247,7 @@ func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal stor } dealID := res.IDs[dealIdx] - journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} { + journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealAccepted], func() interface{} { deal := deal // copy and strip fields we don't want to log to the journal deal.ClientSignature = crypto.Signature{} return ClientDealAcceptedEvt{ID: dealID, Deal: deal, Height: c.cs.GetHeaviestTipSet().Height()} @@ -296,7 +296,7 @@ func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider log.Infof("Storage deal %d activated at epoch %d", dealId, sd.State.SectorStartEpoch) - journal.MaybeAddEntry(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + journal.MaybeRecordEvent(c.jrnl, c.evtTypes[evtTypeDealSectorCommitted], func() interface{} { return ClientDealSectorCommittedEvt{ID: dealId, State: sd.State, Height: curH} }) diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 2aa9c45df..0b877cf6e 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -115,7 +115,7 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema log.Warnf("New Deal: deal %d", deal.DealID) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealAccepted], func() interface{} { deal := deal // copy and strip fields we don't want to log to the journal deal.ClientSignature = crypto.Signature{} return MinerDealAcceptedEvt{ID: deal.DealID, Deal: deal} @@ -287,7 +287,7 @@ func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provide log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSectorCommitted], func() interface{} { return MinerDealSectorCommittedEvt{ID: dealID, State: sd.State, Height: curH} }) @@ -397,7 +397,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Check if the deal has already expired if sd.Proposal.EndEpoch <= height { onDealExpired(nil) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} }) return true, false, nil @@ -406,7 +406,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // If there is no deal assume it's already been slashed if sd.State.SectorStartEpoch < 0 { onDealSlashed(height, nil) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} }) return true, false, nil @@ -425,7 +425,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Check if the deal has already expired if sd.Proposal.EndEpoch <= height { onDealExpired(nil) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealExpired], func() interface{} { return MinerDealExpiredEvt{ID: dealID, State: sd.State, Height: height} }) return false, nil @@ -451,7 +451,7 @@ func (n *ProviderNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID // Deal was slashed if deal.To == nil { onDealSlashed(height, nil) - journal.MaybeAddEntry(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { + journal.MaybeRecordEvent(n.jrnl, n.evtTypes[evtTypeDealSlashed], func() interface{} { return MinerDealSlashedEvt{ID: dealID, State: sd.State, Height: height} }) return false, nil diff --git a/storage/miner.go b/storage/miner.go index 6f16b9cfe..499509b25 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -134,7 +134,7 @@ func (m *Miner) Run(ctx context.Context) error { } func (m *Miner) handleSealingNotifications(before, after sealing.SectorInfo) { - journal.MaybeAddEntry(m.jrnl, m.sealingEvtType, func() interface{} { + journal.MaybeRecordEvent(m.jrnl, m.sealingEvtType, func() interface{} { return SealingStateEvt{ SectorNumber: before.SectorNumber, SectorType: before.SectorType, diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 191ac5319..c6aedc068 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -27,7 +27,7 @@ import ( var errNoPartitions = errors.New("no partitions") func (s *WindowPoStScheduler) failPost(err error, deadline *miner.DeadlineInfo) { - journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { return WindowPoStEvt{ State: "failed", Deadline: s.activeDeadline, @@ -51,7 +51,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli s.abort = abort s.activeDeadline = deadline - journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { return WindowPoStEvt{ State: "started", Deadline: s.activeDeadline, @@ -82,7 +82,7 @@ func (s *WindowPoStScheduler) doPost(ctx context.Context, deadline *miner.Deadli return } - journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { return WindowPoStEvt{ State: "succeeded", Deadline: s.activeDeadline, @@ -148,7 +148,7 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin Recoveries: []miner.RecoveryDeclaration{}, } - defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { var mcid cid.Cid if sm != nil { mcid = sm.Cid() @@ -258,7 +258,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, Faults: []miner.FaultDeclaration{}, } - defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { var mcid cid.Cid if sm != nil { mcid = sm.Cid() @@ -519,7 +519,7 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi var sm *types.SignedMessage - defer journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + defer journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { var mcid cid.Cid if sm != nil { mcid = sm.Cid() diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 3bc591bb7..8344097dc 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -247,7 +247,7 @@ func (s *WindowPoStScheduler) abortActivePoSt() { if s.abort != nil { s.abort() - journal.MaybeAddEntry(s.jrnl, s.wdPoStEvtType, func() interface{} { + journal.MaybeRecordEvent(s.jrnl, s.wdPoStEvtType, func() interface{} { return WindowPoStEvt{ State: "abort", Deadline: s.activeDeadline,