From 5074ce5a38986ac517c00fc2c022aa2dcf01dd4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Mon, 10 Aug 2020 15:02:15 +0100 Subject: [PATCH] move in-mem journal to Project Oni. --- journal/{filesystem.go => fs.go} | 4 +- journal/memory.go | 248 ------------------------------- journal/memory_test.go | 183 ----------------------- journal/sugar.go | 17 --- journal/types.go | 38 ++++- 5 files changed, 33 insertions(+), 457 deletions(-) rename journal/{filesystem.go => fs.go} (97%) delete mode 100644 journal/memory.go delete mode 100644 journal/memory_test.go delete mode 100644 journal/sugar.go diff --git a/journal/filesystem.go b/journal/fs.go similarity index 97% rename from journal/filesystem.go rename to journal/fs.go index 0f953b4e2..096f5f1fd 100644 --- a/journal/filesystem.go +++ b/journal/fs.go @@ -21,7 +21,7 @@ var log = logging.Logger("journal") // fsJournal is a basic journal backed by files on a filesystem. type fsJournal struct { - *eventTypeFactory + EventTypeFactory dir string sizeLimit int64 @@ -44,7 +44,7 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents) } f := &fsJournal{ - eventTypeFactory: newEventTypeFactory(disabled), + EventTypeFactory: NewEventTypeFactory(disabled), dir: dir, sizeLimit: 1 << 30, incoming: make(chan *Event, 32), diff --git a/journal/memory.go b/journal/memory.go deleted file mode 100644 index 6676b9696..000000000 --- a/journal/memory.go +++ /dev/null @@ -1,248 +0,0 @@ -package journal - -import ( - "context" - "sync/atomic" - - "go.uber.org/fx" - - "github.com/filecoin-project/lotus/build" -) - -// Control messages. -type ( - clearCtrl struct{} - addObserverCtrl struct { - observer *observer - replay bool - } - rmObserverCtrl *observer - getEntriesCtrl chan []*Event -) - -type MemJournal struct { - *eventTypeFactory - - entries []*Event - index map[string]map[string][]*Event - observers []observer - - incomingCh chan *Event - controlCh chan interface{} - - state int32 // guarded by atomic; 0=closed, 1=running. - closed chan struct{} -} - -var _ Journal = (*MemJournal)(nil) - -type observer struct { - accept map[EventType]struct{} - ch chan *Event -} - -func (o *observer) dispatch(entry *Event) { - if o.accept == nil { - o.ch <- entry - } - if _, ok := o.accept[entry.EventType]; ok { - o.ch <- entry - } -} - -func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal { - m := &MemJournal{ - eventTypeFactory: newEventTypeFactory(disabled), - - index: make(map[string]map[string][]*Event, 16), - observers: make([]observer, 0, 16), - incomingCh: make(chan *Event, 256), - controlCh: make(chan interface{}, 16), - state: 1, - closed: make(chan struct{}), - } - - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { return m.Close() }, - }) - - go m.process() - - return m -} - -func (m *MemJournal) RecordEvent(evtType EventType, obj interface{}) { - if !evtType.enabled || !evtType.safe { - // tried to record a disabled event type, or used an invalid EventType. - return - } - - entry := &Event{ - EventType: evtType, - Timestamp: build.Clock.Now(), - Data: obj, - } - - select { - case m.incomingCh <- entry: - case <-m.closed: - } -} - -func (m *MemJournal) Close() error { - if !atomic.CompareAndSwapInt32(&m.state, 1, 0) { - // already closed. - return nil - } - close(m.closed) - return nil -} - -func (m *MemJournal) Clear() { - select { - case m.controlCh <- clearCtrl{}: - case <-m.closed: - } -} - -// Observe starts observing events that are recorded in the MemJournal, and -// returns a channel where new events will be sent. When replay is true, all -// entries that have been recorded prior to the observer being registered will -// be replayed. To restrict the event types this observer will sent, use the -// include argument. If no include set is passed, the observer will receive all -// events types. -func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Event { - var acc map[EventType]struct{} - if include != nil { - acc = make(map[EventType]struct{}, len(include)) - for _, et := range include { - if !et.enabled { - // skip over disabled event type. - continue - } - acc[et] = struct{}{} - } - } - - ch := make(chan *Event, 256) - o := &observer{ - accept: acc, - ch: ch, - } - - // watch the context, and fire the "remove observer" control message upon - // cancellation. - go func() { - <-ctx.Done() - select { - case m.controlCh <- rmObserverCtrl(o): - case <-m.closed: - } - }() - - select { - case m.controlCh <- addObserverCtrl{o, replay}: - case <-m.closed: - // we are already stopped. - close(ch) - } - - return ch -} - -// Entries gets a snapshot of stored entries. -func (m *MemJournal) Entries() []*Event { - ch := make(chan []*Event) - m.controlCh <- getEntriesCtrl(ch) - return <-ch -} - -func (m *MemJournal) process() { - processCtrlMsg := func(message interface{}) { - switch msg := message.(type) { - case addObserverCtrl: - // adding an observer. - m.observers = append(m.observers, *msg.observer) - - if msg.replay { - // replay all existing entries. - for _, e := range m.entries { - msg.observer.dispatch(e) - } - } - case rmObserverCtrl: - // removing an observer; find the observer, close its channel. - // then discard it from our list by replacing it with the last - // observer and reslicing. - for i, o := range m.observers { - if o.ch == msg.ch { - close(o.ch) - m.observers[i] = m.observers[len(m.observers)-1] - m.observers = m.observers[:len(m.observers)-1] - } - } - case clearCtrl: - m.entries = m.entries[0:0] - // carry over system and event names; there are unlikely to change; - // just reslice the entry slices, so we are not thrashing memory. - for _, events := range m.index { - for ev := range events { - events[ev] = events[ev][0:0] - } - } - case getEntriesCtrl: - cpy := make([]*Event, len(m.entries)) - copy(cpy, m.entries) - msg <- cpy - close(msg) - } - } - - processClose := func() { - m.entries = nil - m.index = make(map[string]map[string][]*Event, 16) - for _, o := range m.observers { - close(o.ch) - } - m.observers = nil - } - - for { - // Drain all control messages first! - select { - case msg := <-m.controlCh: - processCtrlMsg(msg) - continue - case <-m.closed: - processClose() - return - default: - } - - // Now consume and pipe messages. - select { - case entry := <-m.incomingCh: - m.entries = append(m.entries, entry) - events := m.index[entry.System] - if events == nil { - events = make(map[string][]*Event, 16) - m.index[entry.System] = events - } - - entries := events[entry.Event] - events[entry.Event] = append(entries, entry) - - for _, o := range m.observers { - o.dispatch(entry) - } - - case msg := <-m.controlCh: - processCtrlMsg(msg) - continue - - case <-m.closed: - processClose() - return - } - } -} diff --git a/journal/memory_test.go b/journal/memory_test.go deleted file mode 100644 index db38085e3..000000000 --- a/journal/memory_test.go +++ /dev/null @@ -1,183 +0,0 @@ -package journal - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/specs-actors/actors/abi" - - "github.com/raulk/clock" - "github.com/stretchr/testify/require" - "go.uber.org/fx/fxtest" -) - -func TestMemJournal_AddEntry(t *testing.T) { - lc := fxtest.NewLifecycle(t) - defer lc.RequireStop() - - clk := clock.NewMock() - build.Clock = clk - - journal := NewMemoryJournal(lc, nil) - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) - - entries := journal.Entries() - cnt := make(map[string]int, 10) - for i, e := range entries { - require.EqualValues(t, "spaceship", e.System) - require.Equal(t, HeadChangeEvt{ - From: types.TipSetKey{}, - FromHeight: abi.ChainEpoch(i), - To: types.TipSetKey{}, - ToHeight: abi.ChainEpoch(i), - RevertCount: i, - ApplyCount: i, - }, e.Data) - require.Equal(t, build.Clock.Now(), e.Timestamp) - cnt[e.Event]++ - } - - // we received 10 entries of each event type. - for _, c := range cnt { - require.Equal(t, 10, c) - } -} - -func TestMemJournal_Close(t *testing.T) { - lc := fxtest.NewLifecycle(t) - defer lc.RequireStop() - - journal := NewMemoryJournal(lc, nil) - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) - - o1 := journal.Observe(context.TODO(), false) - o2 := journal.Observe(context.TODO(), false) - o3 := journal.Observe(context.TODO(), false) - - time.Sleep(500 * time.Millisecond) - - // Close the journal. - require.NoError(t, journal.Close()) - - time.Sleep(500 * time.Millisecond) - -NextChannel: - for _, ch := range []<-chan *Event{o1, o2, o3} { - for { - select { - case _, more := <-ch: - if more { - // keep consuming - } else { - continue NextChannel - } - default: - t.Fatal("nothing more to consume, and channel is not closed") - } - } - } -} - -func TestMemJournal_Clear(t *testing.T) { - lc := fxtest.NewLifecycle(t) - defer lc.RequireStop() - - journal := NewMemoryJournal(lc, nil) - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) - - journal.Clear() - require.Empty(t, journal.Entries()) - require.Empty(t, journal.Entries()) - require.Empty(t, journal.Entries()) -} - -func TestMemJournal_Observe(t *testing.T) { - lc := fxtest.NewLifecycle(t) - defer lc.RequireStop() - - journal := NewMemoryJournal(lc, nil) - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) - - et1 := journal.RegisterEventType("spaceship", "wheezing-1") - et2 := journal.RegisterEventType("spaceship", "wheezing-2") - - o1 := journal.Observe(context.TODO(), false, et1) - o2 := journal.Observe(context.TODO(), true, et1, et2) - o3 := journal.Observe(context.TODO(), true) - - time.Sleep(1 * time.Second) - - require.Len(t, o1, 0) // no replay - require.Len(t, o2, 20) // replay with include set - require.Len(t, o3, 100) // replay with no include set (all entries) - - // add another 100 entries and assert what the observers have seen. - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 200 }, 1*time.Second, 100*time.Millisecond) - - // note: we're able to queue items because the observer channel buffer size is 256. - require.Len(t, o1, 10) // should have 0 old entries + 10 new entries - require.Len(t, o2, 40) // should have 20 old entries + 20 new entries - require.Len(t, o3, 200) // should have 100 old entries + 100 new entries -} - -func TestMemJournal_ObserverCancellation(t *testing.T) { - lc := fxtest.NewLifecycle(t) - defer lc.RequireStop() - - journal := NewMemoryJournal(lc, nil) - - ctx, cancel := context.WithCancel(context.TODO()) - o1 := journal.Observe(ctx, false) - o2 := journal.Observe(context.TODO(), false) - addEntries(journal, 100) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) - - // all observers have received the 100 entries. - require.Len(t, o1, 100) - require.Len(t, o2, 100) - - // cancel o1's context. - cancel() - time.Sleep(500 * time.Millisecond) - - // add 50 new entries - addEntries(journal, 50) - - require.Eventually(t, func() bool { return len(journal.Entries()) == 150 }, 1*time.Second, 100*time.Millisecond) - - require.Len(t, o1, 100) // has not moved. - require.Len(t, o2, 150) // should have 100 old entries + 50 new entries -} - -func addEntries(journal *MemJournal, count int) { - for i := 0; i < count; i++ { - eventIdx := i % 10 - - // RegisterEventType is not _really_ intended to be used this way (on every write). - et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx)) - journal.RecordEvent(et, HeadChangeEvt{ - From: types.TipSetKey{}, - FromHeight: abi.ChainEpoch(i), - To: types.TipSetKey{}, - ToHeight: abi.ChainEpoch(i), - RevertCount: i, - ApplyCount: i, - }) - } -} diff --git a/journal/sugar.go b/journal/sugar.go deleted file mode 100644 index 069434916..000000000 --- a/journal/sugar.go +++ /dev/null @@ -1,17 +0,0 @@ -package journal - -// MaybeRecordEvent is a convenience function that evaluates if the EventType is -// enabled, and if so, it calls the supplier to create the event and -// subsequently journal.RecordEvent on the provided journal to record it. -// -// This is safe to call with a nil Journal, either because the value is nil, -// or because a journal obtained through NilJournal() is in use. -func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) { - if journal == nil || journal == nilj { - return - } - if !evtType.Enabled() { - return - } - journal.RecordEvent(evtType, supplier()) -} diff --git a/journal/types.go b/journal/types.go index b1c36b515..29954d9cf 100644 --- a/journal/types.go +++ b/journal/types.go @@ -29,7 +29,17 @@ type EventType struct { // All event types are enabled by default, and specific event types can only // be disabled at Journal construction time. func (et EventType) Enabled() bool { - return et.enabled + return et.safe && et.enabled +} + +// EventTypeFactory is a component that constructs tracked EventType tokens, +// for usage with a Journal. +type EventTypeFactory interface { + // RegisterEventType introduces a new event type to a journal, and + // returns an EventType token that components can later use to check whether + // journalling for that type is enabled/suppressed, and to tag journal + // entries appropriately. + RegisterEventType(system, event string) EventType } // Journal represents an audit trail of system actions. @@ -41,11 +51,7 @@ func (et EventType) Enabled() bool { // For cleanliness and type safety, we recommend to use typed events. See the // *Evt struct types in this package for more info. type Journal interface { - // RegisterEventType introduces a new event type to this journal, and - // returns an EventType token that components can later use to check whether - // journalling for that type is enabled/suppressed, and to tag journal - // entries appropriately. - RegisterEventType(system, event string) EventType + EventTypeFactory // RecordEvent records this event to the journal. See godocs on the Journal type // for more info. @@ -65,6 +71,22 @@ type Event struct { Data interface{} } +// MaybeRecordEvent is a convenience function that evaluates if the EventType is +// enabled, and if so, it calls the supplier to create the event and +// subsequently journal.RecordEvent on the provided journal to record it. +// +// This is safe to call with a nil Journal, either because the value is nil, +// or because a journal obtained through NilJournal() is in use. +func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) { + if journal == nil || journal == nilj { + return + } + if !evtType.Enabled() { + return + } + journal.RecordEvent(evtType, supplier()) +} + // eventTypeFactory is an embeddable mixin that takes care of tracking disabled // event types, and returning initialized/safe EventTypes when requested. type eventTypeFactory struct { @@ -73,7 +95,9 @@ type eventTypeFactory struct { m map[string]EventType } -func newEventTypeFactory(disabled DisabledEvents) *eventTypeFactory { +var _ EventTypeFactory = (*eventTypeFactory)(nil) + +func NewEventTypeFactory(disabled DisabledEvents) EventTypeFactory { ret := &eventTypeFactory{ m: make(map[string]EventType, len(disabled)+32), // + extra capacity. }