dont waste work when calling ComputeTipSetState in parallel
This commit is contained in:
parent
d934f03d70
commit
e8c43d00cc
@ -27,14 +27,16 @@ var log = logging.Logger("statemgr")
|
|||||||
type StateManager struct {
|
type StateManager struct {
|
||||||
cs *store.ChainStore
|
cs *store.ChainStore
|
||||||
|
|
||||||
stCache map[string][]cid.Cid
|
stCache map[string][]cid.Cid
|
||||||
stlk sync.Mutex
|
compWait map[string]chan struct{}
|
||||||
|
stlk sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateManager(cs *store.ChainStore) *StateManager {
|
func NewStateManager(cs *store.ChainStore) *StateManager {
|
||||||
return &StateManager{
|
return &StateManager{
|
||||||
cs: cs,
|
cs: cs,
|
||||||
stCache: make(map[string][]cid.Cid),
|
stCache: make(map[string][]cid.Cid),
|
||||||
|
compWait: make(map[string]chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,7 +48,7 @@ func cidsToKey(cids []cid.Cid) string {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st cid.Cid, rec cid.Cid, err error) {
|
||||||
ctx, span := trace.StartSpan(ctx, "tipSetState")
|
ctx, span := trace.StartSpan(ctx, "tipSetState")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
if span.IsRecordingEvents() {
|
if span.IsRecordingEvents() {
|
||||||
@ -55,12 +57,37 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.
|
|||||||
|
|
||||||
ck := cidsToKey(ts.Cids())
|
ck := cidsToKey(ts.Cids())
|
||||||
sm.stlk.Lock()
|
sm.stlk.Lock()
|
||||||
|
cw, cwok := sm.compWait[ck]
|
||||||
|
if cwok {
|
||||||
|
sm.stlk.Unlock()
|
||||||
|
span.AddAttributes(trace.BoolAttribute("waited", true))
|
||||||
|
select {
|
||||||
|
case <-cw:
|
||||||
|
sm.stlk.Lock()
|
||||||
|
case <-ctx.Done():
|
||||||
|
return cid.Undef, cid.Undef, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
cached, ok := sm.stCache[ck]
|
cached, ok := sm.stCache[ck]
|
||||||
sm.stlk.Unlock()
|
|
||||||
if ok {
|
if ok {
|
||||||
|
sm.stlk.Unlock()
|
||||||
span.AddAttributes(trace.BoolAttribute("cache", true))
|
span.AddAttributes(trace.BoolAttribute("cache", true))
|
||||||
return cached[0], cached[1], nil
|
return cached[0], cached[1], nil
|
||||||
}
|
}
|
||||||
|
ch := make(chan struct{})
|
||||||
|
sm.compWait[ck] = ch
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
sm.stlk.Lock()
|
||||||
|
delete(sm.compWait, ck)
|
||||||
|
if st != cid.Undef {
|
||||||
|
sm.stCache[ck] = []cid.Cid{st, rec}
|
||||||
|
}
|
||||||
|
sm.stlk.Unlock()
|
||||||
|
close(ch)
|
||||||
|
}()
|
||||||
|
|
||||||
|
sm.stlk.Unlock()
|
||||||
|
|
||||||
if ts.Height() == 0 {
|
if ts.Height() == 0 {
|
||||||
// NB: This is here because the process that executes blocks requires that the
|
// NB: This is here because the process that executes blocks requires that the
|
||||||
@ -70,14 +97,11 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.
|
|||||||
return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
|
return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
st, rec, err := sm.computeTipSetState(ctx, ts.Blocks(), nil)
|
st, rec, err = sm.computeTipSetState(ctx, ts.Blocks(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, cid.Undef, err
|
return cid.Undef, cid.Undef, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.stlk.Lock()
|
|
||||||
sm.stCache[ck] = []cid.Cid{st, rec}
|
|
||||||
sm.stlk.Unlock()
|
|
||||||
return st, rec, nil
|
return st, rec, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user