implement Close

This commit is contained in:
vyzo 2023-03-11 22:09:31 +02:00 committed by Maciej Witowski
parent c123fab1da
commit e6a0c5a406
2 changed files with 35 additions and 3 deletions

View File

@ -9,6 +9,7 @@ import (
) )
var ErrNotFound = errors.New("message not found") var ErrNotFound = errors.New("message not found")
var ErrClosed = errors.New("index closed")
// MsgInfo is the Message metadata the index tracks. // MsgInfo is the Message metadata the index tracks.
type MsgInfo struct { type MsgInfo struct {

View File

@ -53,7 +53,10 @@ type msgIndex struct {
mx sync.Mutex mx sync.Mutex
pend []headChange pend []headChange
cancel func() cancel func()
workers sync.WaitGroup
closeLk sync.RWMutex
closed bool
} }
var _ MsgIndex = (*msgIndex)(nil) var _ MsgIndex = (*msgIndex)(nil)
@ -141,6 +144,7 @@ opendb:
) )
cs.SubscribeHeadChanges(rnf) cs.SubscribeHeadChanges(rnf)
msgIndex.workers.Add(1)
go msgIndex.background(ctx) go msgIndex.background(ctx)
return msgIndex, nil return msgIndex, nil
@ -251,6 +255,13 @@ func (x *msgIndex) prepareStatements() error {
// head change notifee // head change notifee
func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
x.closeLk.RLock()
defer x.closeLk.RUnlock()
if x.closed {
return nil
}
// do it in the background to avoid blocking head change processing // do it in the background to avoid blocking head change processing
x.mx.Lock() x.mx.Lock()
x.pend = append(x.pend, headChange{rev: rev, app: app}) x.pend = append(x.pend, headChange{rev: rev, app: app})
@ -266,6 +277,8 @@ func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
} }
func (x *msgIndex) background(ctx context.Context) { func (x *msgIndex) background(ctx context.Context) {
defer x.workers.Done()
for { for {
select { select {
case <-x.sema: case <-x.sema:
@ -373,6 +386,13 @@ func (x *msgIndex) doApply(ctx context.Context, ts *types.TipSet) error {
// interface // interface
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()
if x.closed {
return MsgInfo{}, ErrClosed
}
var ( var (
tipset string tipset string
epoch int64 epoch int64
@ -404,6 +424,17 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
} }
func (x *msgIndex) Close() error { func (x *msgIndex) Close() error {
// TODO x.closeLk.Lock()
return errors.New("TODO: msgIndex.Close") defer x.closeLk.Unlock()
if x.closed {
return nil
}
x.closed = true
x.cancel()
x.workers.Wait()
return x.db.Close()
} }