refactor EventType construction.
This commit is contained in:
parent
695f6cfe45
commit
bef7d64e66
@ -21,7 +21,7 @@ var log = logging.Logger("journal")
|
|||||||
|
|
||||||
// fsJournal is a basic journal backed by files on a filesystem.
|
// fsJournal is a basic journal backed by files on a filesystem.
|
||||||
type fsJournal struct {
|
type fsJournal struct {
|
||||||
disabledTracker
|
*eventTypeFactory
|
||||||
|
|
||||||
dir string
|
dir string
|
||||||
sizeLimit int64
|
sizeLimit int64
|
||||||
@ -44,11 +44,11 @@ func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled []EventType) (J
|
|||||||
}
|
}
|
||||||
|
|
||||||
f := &fsJournal{
|
f := &fsJournal{
|
||||||
disabledTracker: newDisabledTracker(disabled),
|
eventTypeFactory: newEventTypeFactory(disabled),
|
||||||
dir: dir,
|
dir: dir,
|
||||||
sizeLimit: 1 << 30,
|
sizeLimit: 1 << 30,
|
||||||
incoming: make(chan *Entry, 32),
|
incoming: make(chan *Entry, 32),
|
||||||
closing: make(chan struct{}),
|
closing: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.rollJournalFile(); err != nil {
|
if err := f.rollJournalFile(); err != nil {
|
||||||
|
@ -21,7 +21,7 @@ type (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MemJournal struct {
|
type MemJournal struct {
|
||||||
disabledTracker
|
*eventTypeFactory
|
||||||
|
|
||||||
entries []*Entry
|
entries []*Entry
|
||||||
index map[string]map[string][]*Entry
|
index map[string]map[string][]*Entry
|
||||||
@ -52,7 +52,7 @@ func (o *observer) dispatch(entry *Entry) {
|
|||||||
|
|
||||||
func NewMemoryJournal(lc fx.Lifecycle, disabled []EventType) *MemJournal {
|
func NewMemoryJournal(lc fx.Lifecycle, disabled []EventType) *MemJournal {
|
||||||
m := &MemJournal{
|
m := &MemJournal{
|
||||||
disabledTracker: newDisabledTracker(disabled),
|
eventTypeFactory: newEventTypeFactory(disabled),
|
||||||
|
|
||||||
index: make(map[string]map[string][]*Entry, 16),
|
index: make(map[string]map[string][]*Entry, 16),
|
||||||
observers: make([]observer, 0, 16),
|
observers: make([]observer, 0, 16),
|
||||||
@ -72,6 +72,11 @@ func NewMemoryJournal(lc fx.Lifecycle, disabled []EventType) *MemJournal {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *MemJournal) AddEntry(evtType EventType, obj interface{}) {
|
func (m *MemJournal) AddEntry(evtType EventType, obj interface{}) {
|
||||||
|
if !evtType.enabled || !evtType.safe {
|
||||||
|
// tried to record a disabled event type, or used an invalid EventType.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
entry := &Entry{
|
entry := &Entry{
|
||||||
EventType: evtType,
|
EventType: evtType,
|
||||||
Timestamp: build.Clock.Now(),
|
Timestamp: build.Clock.Now(),
|
||||||
@ -109,8 +114,12 @@ func (m *MemJournal) Clear() {
|
|||||||
func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Entry {
|
func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Entry {
|
||||||
var acc map[EventType]struct{}
|
var acc map[EventType]struct{}
|
||||||
if include != nil {
|
if include != nil {
|
||||||
acc = make(map[EventType]struct{}, 16)
|
acc = make(map[EventType]struct{}, len(include))
|
||||||
for _, et := range include {
|
for _, et := range include {
|
||||||
|
if !et.enabled {
|
||||||
|
// skip over disabled event type.
|
||||||
|
continue
|
||||||
|
}
|
||||||
acc[et] = struct{}{}
|
acc[et] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -111,8 +111,11 @@ func TestMemJournal_Observe(t *testing.T) {
|
|||||||
|
|
||||||
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
|
require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond)
|
||||||
|
|
||||||
o1 := journal.Observe(context.TODO(), false, EventType{"spaceship", "wheezing-1"})
|
et1 := journal.RegisterEventType("spaceship", "wheezing-1")
|
||||||
o2 := journal.Observe(context.TODO(), true, EventType{"spaceship", "wheezing-1"}, EventType{"spaceship", "wheezing-2"})
|
et2 := journal.RegisterEventType("spaceship", "wheezing-2")
|
||||||
|
|
||||||
|
o1 := journal.Observe(context.TODO(), false, et1)
|
||||||
|
o2 := journal.Observe(context.TODO(), true, et1, et2)
|
||||||
o3 := journal.Observe(context.TODO(), true)
|
o3 := journal.Observe(context.TODO(), true)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
@ -160,13 +163,15 @@ func TestMemJournal_ObserverCancellation(t *testing.T) {
|
|||||||
|
|
||||||
require.Len(t, o1, 100) // has not moved.
|
require.Len(t, o1, 100) // has not moved.
|
||||||
require.Len(t, o2, 150) // should have 100 old entries + 50 new entries
|
require.Len(t, o2, 150) // should have 100 old entries + 50 new entries
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func addEntries(journal *MemJournal, count int) {
|
func addEntries(journal *MemJournal, count int) {
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
eventIdx := i % 10
|
eventIdx := i % 10
|
||||||
journal.AddEntry(EventType{"spaceship", fmt.Sprintf("wheezing-%d", eventIdx)}, HeadChangeEvt{
|
|
||||||
|
// RegisterEventType is not _really_ intended to be used this way (on every write).
|
||||||
|
et := journal.RegisterEventType("spaceship", fmt.Sprintf("wheezing-%d", eventIdx))
|
||||||
|
journal.AddEntry(et, HeadChangeEvt{
|
||||||
From: types.TipSetKey{},
|
From: types.TipSetKey{},
|
||||||
FromHeight: abi.ChainEpoch(i),
|
FromHeight: abi.ChainEpoch(i),
|
||||||
To: types.TipSetKey{},
|
To: types.TipSetKey{},
|
||||||
|
@ -1,11 +1,32 @@
|
|||||||
package journal
|
package journal
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
// EventType represents the signature of an event.
|
// EventType represents the signature of an event.
|
||||||
type EventType struct {
|
type EventType struct {
|
||||||
System string
|
System string
|
||||||
Event string
|
Event string
|
||||||
|
|
||||||
|
// enabled stores whether this event type is enabled.
|
||||||
|
enabled bool
|
||||||
|
|
||||||
|
// safe is a sentinel marker that's set to true if this EventType was
|
||||||
|
// constructed correctly (via Journal#RegisterEventType).
|
||||||
|
safe bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enabled returns whether this event type is enabled in the journaling
|
||||||
|
// subsystem. Users are advised to check this before actually attempting to
|
||||||
|
// add a journal entry, as it helps bypass object construction for events that
|
||||||
|
// would be discarded anyway.
|
||||||
|
//
|
||||||
|
// All event types are enabled by default, and specific event types can only
|
||||||
|
// be disabled at Journal construction time.
|
||||||
|
func (et EventType) Enabled() bool {
|
||||||
|
return et.enabled
|
||||||
}
|
}
|
||||||
|
|
||||||
// Journal represents an audit trail of system actions.
|
// Journal represents an audit trail of system actions.
|
||||||
@ -17,12 +38,11 @@ type EventType struct {
|
|||||||
// For cleanliness and type safety, we recommend to use typed events. See the
|
// For cleanliness and type safety, we recommend to use typed events. See the
|
||||||
// *Evt struct types in this package for more info.
|
// *Evt struct types in this package for more info.
|
||||||
type Journal interface {
|
type Journal interface {
|
||||||
// IsEnabled allows components to check if a given event type is enabled.
|
// RegisterEventType introduces a new event type to this journal, and
|
||||||
// All event types are enabled by default, and specific event types can only
|
// returns an EventType token that components can later use to check whether
|
||||||
// be disabled at construction type. Components are advised to check if the
|
// journalling for that type is enabled/suppressed, and to tag journal
|
||||||
// journal event types they record are enabled as soon as possible, and hold
|
// entries appropriately.
|
||||||
// on to the answer all throughout the lifetime of the process.
|
RegisterEventType(system, event string) EventType
|
||||||
IsEnabled(evtType EventType) bool
|
|
||||||
|
|
||||||
// AddEntry adds an entry to this journal. See godocs on the Journal type
|
// AddEntry adds an entry to this journal. See godocs on the Journal type
|
||||||
// for more info.
|
// for more info.
|
||||||
@ -42,19 +62,43 @@ type Entry struct {
|
|||||||
Data interface{}
|
Data interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// disabledTracker is an embeddable mixin that takes care of tracking disabled
|
// eventTypeFactory is an embeddable mixin that takes care of tracking disabled
|
||||||
// event types.
|
// event types, and returning initialized/safe EventTypes when requested.
|
||||||
type disabledTracker map[EventType]struct{}
|
type eventTypeFactory struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
func newDisabledTracker(disabled []EventType) disabledTracker {
|
m map[string]EventType
|
||||||
dis := make(map[EventType]struct{}, len(disabled))
|
}
|
||||||
for _, et := range disabled {
|
|
||||||
dis[et] = struct{}{}
|
func newEventTypeFactory(disabled []EventType) *eventTypeFactory {
|
||||||
|
ret := &eventTypeFactory{
|
||||||
|
m: make(map[string]EventType, len(disabled)+32), // + extra capacity.
|
||||||
}
|
}
|
||||||
return dis
|
|
||||||
|
for _, et := range disabled {
|
||||||
|
et.enabled, et.safe = false, true
|
||||||
|
ret.m[et.System+":"+et.Event] = et
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d disabledTracker) IsEnabled(evtType EventType) bool {
|
func (d *eventTypeFactory) RegisterEventType(system, event string) EventType {
|
||||||
_, ok := d[evtType]
|
d.Lock()
|
||||||
return !ok
|
defer d.Unlock()
|
||||||
|
|
||||||
|
key := system + ":" + event
|
||||||
|
if et, ok := d.m[key]; ok {
|
||||||
|
return et
|
||||||
|
}
|
||||||
|
|
||||||
|
et := EventType{
|
||||||
|
System: system,
|
||||||
|
Event: event,
|
||||||
|
enabled: true,
|
||||||
|
safe: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
d.m[key] = et
|
||||||
|
return et
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user