Merge pull request #794 from filecoin-project/feat/parallel-tss-caching
dont waste work when calling ComputeTipSetState in parallel
This commit is contained in:
commit
ed0324bcc8
@ -28,6 +28,7 @@ type StateManager struct {
|
||||
cs *store.ChainStore
|
||||
|
||||
stCache map[string][]cid.Cid
|
||||
compWait map[string]chan struct{}
|
||||
stlk sync.Mutex
|
||||
}
|
||||
|
||||
@ -35,6 +36,7 @@ func NewStateManager(cs *store.ChainStore) *StateManager {
|
||||
return &StateManager{
|
||||
cs: cs,
|
||||
stCache: make(map[string][]cid.Cid),
|
||||
compWait: make(map[string]chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,7 +48,7 @@ func cidsToKey(cids []cid.Cid) string {
|
||||
return out
|
||||
}
|
||||
|
||||
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.Cid, cid.Cid, error) {
|
||||
func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (st cid.Cid, rec cid.Cid, err error) {
|
||||
ctx, span := trace.StartSpan(ctx, "tipSetState")
|
||||
defer span.End()
|
||||
if span.IsRecordingEvents() {
|
||||
@ -55,12 +57,37 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.
|
||||
|
||||
ck := cidsToKey(ts.Cids())
|
||||
sm.stlk.Lock()
|
||||
cached, ok := sm.stCache[ck]
|
||||
cw, cwok := sm.compWait[ck]
|
||||
if cwok {
|
||||
sm.stlk.Unlock()
|
||||
span.AddAttributes(trace.BoolAttribute("waited", true))
|
||||
select {
|
||||
case <-cw:
|
||||
sm.stlk.Lock()
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, cid.Undef, ctx.Err()
|
||||
}
|
||||
}
|
||||
cached, ok := sm.stCache[ck]
|
||||
if ok {
|
||||
sm.stlk.Unlock()
|
||||
span.AddAttributes(trace.BoolAttribute("cache", true))
|
||||
return cached[0], cached[1], nil
|
||||
}
|
||||
ch := make(chan struct{})
|
||||
sm.compWait[ck] = ch
|
||||
|
||||
defer func() {
|
||||
sm.stlk.Lock()
|
||||
delete(sm.compWait, ck)
|
||||
if st != cid.Undef {
|
||||
sm.stCache[ck] = []cid.Cid{st, rec}
|
||||
}
|
||||
sm.stlk.Unlock()
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
sm.stlk.Unlock()
|
||||
|
||||
if ts.Height() == 0 {
|
||||
// NB: This is here because the process that executes blocks requires that the
|
||||
@ -70,14 +97,11 @@ func (sm *StateManager) TipSetState(ctx context.Context, ts *types.TipSet) (cid.
|
||||
return ts.Blocks()[0].ParentStateRoot, ts.Blocks()[0].ParentMessageReceipts, nil
|
||||
}
|
||||
|
||||
st, rec, err := sm.computeTipSetState(ctx, ts.Blocks(), nil)
|
||||
st, rec, err = sm.computeTipSetState(ctx, ts.Blocks(), nil)
|
||||
if err != nil {
|
||||
return cid.Undef, cid.Undef, err
|
||||
}
|
||||
|
||||
sm.stlk.Lock()
|
||||
sm.stCache[ck] = []cid.Cid{st, rec}
|
||||
sm.stlk.Unlock()
|
||||
return st, rec, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user