move in-mem journal to Project Oni.

This commit is contained in:
Raúl Kripalani 2020-08-10 15:02:15 +01:00
parent b8475114ba
commit 5074ce5a38
5 changed files with 33 additions and 457 deletions

View File

@ -21,7 +21,7 @@ var log = logging.Logger("journal")
// fsJournal is a basic journal backed by files on a filesystem.
type fsJournal struct {
*eventTypeFactory
EventTypeFactory
dir string
sizeLimit int64
@ -44,7 +44,7 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled DisabledEvents)
}
f := &fsJournal{
eventTypeFactory: newEventTypeFactory(disabled),
EventTypeFactory: NewEventTypeFactory(disabled),
dir: dir,
sizeLimit: 1 << 30,
incoming: make(chan *Event, 32),

View File

@ -1,248 +0,0 @@
package journal
import (
"context"
"sync/atomic"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/build"
)
// Control messages.
type (
clearCtrl struct{}
addObserverCtrl struct {
observer *observer
replay bool
}
rmObserverCtrl *observer
getEntriesCtrl chan []*Event
)
type MemJournal struct {
*eventTypeFactory
entries []*Event
index map[string]map[string][]*Event
observers []observer
incomingCh chan *Event
controlCh chan interface{}
state int32 // guarded by atomic; 0=closed, 1=running.
closed chan struct{}
}
var _ Journal = (*MemJournal)(nil)
type observer struct {
accept map[EventType]struct{}
ch chan *Event
}
func (o *observer) dispatch(entry *Event) {
if o.accept == nil {
o.ch <- entry
}
if _, ok := o.accept[entry.EventType]; ok {
o.ch <- entry
}
}
func NewMemoryJournal(lc fx.Lifecycle, disabled DisabledEvents) *MemJournal {
m := &MemJournal{
eventTypeFactory: newEventTypeFactory(disabled),
index: make(map[string]map[string][]*Event, 16),
observers: make([]observer, 0, 16),
incomingCh: make(chan *Event, 256),
controlCh: make(chan interface{}, 16),
state: 1,
closed: make(chan struct{}),
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return m.Close() },
})
go m.process()
return m
}
func (m *MemJournal) RecordEvent(evtType EventType, obj interface{}) {
if !evtType.enabled || !evtType.safe {
// tried to record a disabled event type, or used an invalid EventType.
return
}
entry := &Event{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: obj,
}
select {
case m.incomingCh <- entry:
case <-m.closed:
}
}
func (m *MemJournal) Close() error {
if !atomic.CompareAndSwapInt32(&m.state, 1, 0) {
// already closed.
return nil
}
close(m.closed)
return nil
}
func (m *MemJournal) Clear() {
select {
case m.controlCh <- clearCtrl{}:
case <-m.closed:
}
}
// Observe starts observing events that are recorded in the MemJournal, and
// returns a channel where new events will be sent. When replay is true, all
// entries that have been recorded prior to the observer being registered will
// be replayed. To restrict the event types this observer will sent, use the
// include argument. If no include set is passed, the observer will receive all
// events types.
func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Event {
var acc map[EventType]struct{}
if include != nil {
acc = make(map[EventType]struct{}, len(include))
for _, et := range include {
if !et.enabled {
// skip over disabled event type.
continue
}
acc[et] = struct{}{}
}
}
ch := make(chan *Event, 256)
o := &observer{
accept: acc,
ch: ch,
}
// watch the context, and fire the "remove observer" control message upon
// cancellation.
go func() {
<-ctx.Done()
select {
case m.controlCh <- rmObserverCtrl(o):
case <-m.closed:
}
}()
select {
case m.controlCh <- addObserverCtrl{o, replay}:
case <-m.closed:
// we are already stopped.
close(ch)
}
return ch
}
// Entries gets a snapshot of stored entries.
func (m *MemJournal) Entries() []*Event {
ch := make(chan []*Event)
m.controlCh <- getEntriesCtrl(ch)
return <-ch
}
func (m *MemJournal) process() {
processCtrlMsg := func(message interface{}) {
switch msg := message.(type) {
case addObserverCtrl:
// adding an observer.
m.observers = append(m.observers, *msg.observer)
if msg.replay {
// replay all existing entries.
for _, e := range m.entries {
msg.observer.dispatch(e)
}
}
case rmObserverCtrl:
// removing an observer; find the observer, close its channel.
// then discard it from our list by replacing it with the last
// observer and reslicing.
for i, o := range m.observers {
if o.ch == msg.ch {
close(o.ch)
m.observers[i] = m.observers[len(m.observers)-1]
m.observers = m.observers[:len(m.observers)-1]
}
}
case clearCtrl:
m.entries = m.entries[0:0]
// carry over system and event names; there are unlikely to change;
// just reslice the entry slices, so we are not thrashing memory.
for _, events := range m.index {
for ev := range events {
events[ev] = events[ev][0:0]
}
}
case getEntriesCtrl:
cpy := make([]*Event, len(m.entries))
copy(cpy, m.entries)
msg <- cpy
close(msg)
}
}
processClose := func() {
m.entries = nil
m.index = make(map[string]map[string][]*Event, 16)
for _, o := range m.observers {
close(o.ch)
}
m.observers = nil
}
for {
// Drain all control messages first!
select {
case msg := <-m.controlCh:
processCtrlMsg(msg)
continue
case <-m.closed:
processClose()
return
default:
}
// Now consume and pipe messages.
select {
case entry := <-m.incomingCh:
m.entries = append(m.entries, entry)
events := m.index[entry.System]
if events == nil {
events = make(map[string][]*Event, 16)
m.index[entry.System] = events
}
entries := events[entry.Event]
events[entry.Event] = append(entries, entry)
for _, o := range m.observers {
o.dispatch(entry)
}
case msg := <-m.controlCh:
processCtrlMsg(msg)
continue
case <-m.closed:
processClose()
return
}
}
}

View File

@ -1,183 +0,0 @@
package journal
import (
"context"
"fmt"
"testing"
"time"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/raulk/clock"
"github.com/stretchr/testify/require"
"go.uber.org/fx/fxtest"
)
func TestMemJournal_AddEntry(t *testing.T) {
lc := fxtest.NewLifecycle(t)
defer lc.RequireStop()
clk := clock.NewMock()
build.Clock = clk
journal := NewMemoryJournal(lc, nil)
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
entries := journal.Entries()
cnt := make(map[string]int, 10)
for i, e := range entries {
require.EqualValues(t, "spaceship", e.System)
require.Equal(t, HeadChangeEvt{
From: types.TipSetKey{},
FromHeight: abi.ChainEpoch(i),
To: types.TipSetKey{},
ToHeight: abi.ChainEpoch(i),
RevertCount: i,
ApplyCount: i,
}, e.Data)
require.Equal(t, build.Clock.Now(), e.Timestamp)
cnt[e.Event]++
}
// we received 10 entries of each event type.
for _, c := range cnt {
require.Equal(t, 10, c)
}
}
func TestMemJournal_Close(t *testing.T) {
lc := fxtest.NewLifecycle(t)
defer lc.RequireStop()
journal := NewMemoryJournal(lc, nil)
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
o1 := journal.Observe(context.TODO(), false)
o2 := journal.Observe(context.TODO(), false)
o3 := journal.Observe(context.TODO(), false)
time.Sleep(500 * time.Millisecond)
// Close the journal.
require.NoError(t, journal.Close())
time.Sleep(500 * time.Millisecond)
NextChannel:
for _, ch := range []<-chan *Event{o1, o2, o3} {
for {
select {
case _, more := <-ch:
if more {
// keep consuming
} else {
continue NextChannel
}
default:
t.Fatal("nothing more to consume, and channel is not closed")
}
}
}
}
func TestMemJournal_Clear(t *testing.T) {
lc := fxtest.NewLifecycle(t)
defer lc.RequireStop()
journal := NewMemoryJournal(lc, nil)
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
journal.Clear()
require.Empty(t, journal.Entries())
require.Empty(t, journal.Entries())
require.Empty(t, journal.Entries())
}
func TestMemJournal_Observe(t *testing.T) {
lc := fxtest.NewLifecycle(t)
defer lc.RequireStop()
journal := NewMemoryJournal(lc, nil)
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
et1 := journal.RegisterEventType("spaceship", "wheezing-1")
et2 := journal.RegisterEventType("spaceship", "wheezing-2")
o1 := journal.Observe(context.TODO(), false, et1)
o2 := journal.Observe(context.TODO(), true, et1, et2)
o3 := journal.Observe(context.TODO(), true)
time.Sleep(1 * time.Second)
require.Len(t, o1, 0) // no replay
require.Len(t, o2, 20) // replay with include set
require.Len(t, o3, 100) // replay with no include set (all entries)
// add another 100 entries and assert what the observers have seen.
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 200 }, 1*time.Second, 100*time.Millisecond)
// note: we're able to queue items because the observer channel buffer size is 256.
require.Len(t, o1, 10) // should have 0 old entries + 10 new entries
require.Len(t, o2, 40) // should have 20 old entries + 20 new entries
require.Len(t, o3, 200) // should have 100 old entries + 100 new entries
}
func TestMemJournal_ObserverCancellation(t *testing.T) {
lc := fxtest.NewLifecycle(t)
defer lc.RequireStop()
journal := NewMemoryJournal(lc, nil)
ctx, cancel := context.WithCancel(context.TODO())
o1 := journal.Observe(ctx, false)
o2 := journal.Observe(context.TODO(), false)
addEntries(journal, 100)
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
// all observers have received the 100 entries.
require.Len(t, o1, 100)
require.Len(t, o2, 100)
// cancel o1's context.
cancel()
time.Sleep(500 * time.Millisecond)
// add 50 new entries
addEntries(journal, 50)
require.Eventually(t, func() bool { return len(journal.Entries()) == 150 }, 1*time.Second, 100*time.Millisecond)
require.Len(t, o1, 100) // has not moved.
require.Len(t, o2, 150) // should have 100 old entries + 50 new entries
}
func addEntries(journal *MemJournal, count int) {
for i := 0; i < count; i++ {
eventIdx := i % 10
// RegisterEventType is not _really_ intended to be used this way (on every write).
et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx))
journal.RecordEvent(et, HeadChangeEvt{
From: types.TipSetKey{},
FromHeight: abi.ChainEpoch(i),
To: types.TipSetKey{},
ToHeight: abi.ChainEpoch(i),
RevertCount: i,
ApplyCount: i,
})
}
}

View File

@ -1,17 +0,0 @@
package journal
// MaybeRecordEvent is a convenience function that evaluates if the EventType is
// enabled, and if so, it calls the supplier to create the event and
// subsequently journal.RecordEvent on the provided journal to record it.
//
// This is safe to call with a nil Journal, either because the value is nil,
// or because a journal obtained through NilJournal() is in use.
func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) {
if journal == nil || journal == nilj {
return
}
if !evtType.Enabled() {
return
}
journal.RecordEvent(evtType, supplier())
}

View File

@ -29,7 +29,17 @@ type EventType struct {
// All event types are enabled by default, and specific event types can only
// be disabled at Journal construction time.
func (et EventType) Enabled() bool {
return et.enabled
return et.safe && et.enabled
}
// EventTypeFactory is a component that constructs tracked EventType tokens,
// for usage with a Journal.
type EventTypeFactory interface {
// RegisterEventType introduces a new event type to a journal, and
// returns an EventType token that components can later use to check whether
// journalling for that type is enabled/suppressed, and to tag journal
// entries appropriately.
RegisterEventType(system, event string) EventType
}
// Journal represents an audit trail of system actions.
@ -41,11 +51,7 @@ func (et EventType) Enabled() bool {
// For cleanliness and type safety, we recommend to use typed events. See the
// *Evt struct types in this package for more info.
type Journal interface {
// RegisterEventType introduces a new event type to this journal, and
// returns an EventType token that components can later use to check whether
// journalling for that type is enabled/suppressed, and to tag journal
// entries appropriately.
RegisterEventType(system, event string) EventType
EventTypeFactory
// RecordEvent records this event to the journal. See godocs on the Journal type
// for more info.
@ -65,6 +71,22 @@ type Event struct {
Data interface{}
}
// MaybeRecordEvent is a convenience function that evaluates if the EventType is
// enabled, and if so, it calls the supplier to create the event and
// subsequently journal.RecordEvent on the provided journal to record it.
//
// This is safe to call with a nil Journal, either because the value is nil,
// or because a journal obtained through NilJournal() is in use.
func MaybeRecordEvent(journal Journal, evtType EventType, supplier func() interface{}) {
if journal == nil || journal == nilj {
return
}
if !evtType.Enabled() {
return
}
journal.RecordEvent(evtType, supplier())
}
// eventTypeFactory is an embeddable mixin that takes care of tracking disabled
// event types, and returning initialized/safe EventTypes when requested.
type eventTypeFactory struct {
@ -73,7 +95,9 @@ type eventTypeFactory struct {
m map[string]EventType
}
func newEventTypeFactory(disabled DisabledEvents) *eventTypeFactory {
var _ EventTypeFactory = (*eventTypeFactory)(nil)
func NewEventTypeFactory(disabled DisabledEvents) EventTypeFactory {
ret := &eventTypeFactory{
m: make(map[string]EventType, len(disabled)+32), // + extra capacity.
}