diff --git a/chain/index/interface.go b/chain/index/interface.go index a6c07d4be..f15774196 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -9,6 +9,7 @@ import ( ) var ErrNotFound = errors.New("message not found") +var ErrClosed = errors.New("index closed") // MsgInfo is the Message metadata the index tracks. type MsgInfo struct { diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 704145b89..79f85af33 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -53,7 +53,10 @@ type msgIndex struct { mx sync.Mutex pend []headChange - cancel func() + cancel func() + workers sync.WaitGroup + closeLk sync.RWMutex + closed bool } var _ MsgIndex = (*msgIndex)(nil) @@ -141,6 +144,7 @@ opendb: ) cs.SubscribeHeadChanges(rnf) + msgIndex.workers.Add(1) go msgIndex.background(ctx) return msgIndex, nil @@ -251,6 +255,13 @@ func (x *msgIndex) prepareStatements() error { // head change notifee func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { + x.closeLk.RLock() + defer x.closeLk.RUnlock() + + if x.closed { + return nil + } + // do it in the background to avoid blocking head change processing x.mx.Lock() x.pend = append(x.pend, headChange{rev: rev, app: app}) @@ -266,6 +277,8 @@ func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { } func (x *msgIndex) background(ctx context.Context) { + defer x.workers.Done() + for { select { case <-x.sema: @@ -373,6 +386,13 @@ func (x *msgIndex) doApply(ctx context.Context, ts *types.TipSet) error { // interface func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { + x.closeLk.RLock() + defer x.closeLk.RUnlock() + + if x.closed { + return MsgInfo{}, ErrClosed + } + var ( tipset string epoch int64 @@ -404,6 +424,17 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { } func (x *msgIndex) Close() error { - // TODO - return errors.New("TODO: msgIndex.Close") + x.closeLk.Lock() + defer x.closeLk.Unlock() + + if x.closed { + return nil + } + + x.closed = true + + x.cancel() + x.workers.Wait() + + return x.db.Close() }