Merge pull request #3380 from filecoin-project/fix/chainwatch/perf

perf(chainwatch): parallelize miner processing
This commit is contained in:
Łukasz Magiera 2020-08-29 02:51:46 +02:00 committed by GitHub
commit e86f90fe8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -318,7 +318,10 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
}
grp, _ := errgroup.WithContext(ctx)
for _, m := range miners {
m := m
grp.Go(func() error {
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
if err != nil {
return err
@ -327,13 +330,12 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
continue
} else {
return nil
}
return err
}
}
if changes == nil {
continue
return nil
}
preCommitAdded := make([]uint64, len(changes.Added))
@ -412,6 +414,11 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
Event: PreCommitExpired,
}
}
return nil
})
}
if err := grp.Wait(); err != nil {
return err
}
if err := stmt.Close(); err != nil {
@ -443,17 +450,19 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
}
grp, _ := errgroup.WithContext(ctx)
for _, m := range miners {
m := m
grp.Go(func() error {
changes, err := p.getMinerSectorChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
continue
} else {
return nil
}
return err
}
}
if changes == nil {
continue
return nil
}
var sectorsAdded []uint64
var ccAdded []uint64
@ -473,7 +482,7 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
added.ExpectedDayReward.String(),
added.ExpectedStoragePledge.String(),
); err != nil {
return err
log.Errorw("writing miner sector changes statement", "error", err.Error())
}
if len(added.DealIDs) == 0 {
ccAdded = append(ccAdded, uint64(added.SectorNumber))
@ -504,6 +513,12 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
SectorIDs: extended,
Event: SectorExtended,
}
return nil
})
}
if err := grp.Wait(); err != nil {
return err
}
if err := stmt.Close(); err != nil {