perf(chainwatch): parallelize miner processing

This commit is contained in:
frrist 2020-08-28 10:31:39 -07:00
parent a7ef61279c
commit 0185090cbb

View File

@ -318,100 +318,107 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err) return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
} }
grp, _ := errgroup.WithContext(ctx)
for _, m := range miners { for _, m := range miners {
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors) m := m
if err != nil { grp.Go(func() error {
return err minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
} if err != nil {
changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
continue
} else {
return err return err
} }
}
if changes == nil {
continue
}
preCommitAdded := make([]uint64, len(changes.Added)) changes, err := p.getMinerPreCommitChanges(ctx, m)
for i, added := range changes.Added { if err != nil {
if len(added.Info.DealIDs) > 0 { if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
sectorDeals <- &SectorDealEvent{ return nil
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
} }
}
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
added.Info.ReplaceSectorDeadline,
added.Info.ReplaceSectorPartition,
added.Info.ReplaceSectorNumber,
); err != nil {
return err
}
} else {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
nil, // replace deadline
nil, // replace partition
nil, // replace sector
); err != nil {
return err
}
}
preCommitAdded[i] = uint64(added.Info.SectorNumber)
}
if len(preCommitAdded) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitAdded,
Event: PreCommitAdded,
}
}
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), &sector); err != nil {
return err return err
} else if !found {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
} }
} if changes == nil {
if len(preCommitExpired) > 0 { return nil
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitExpired,
Event: PreCommitExpired,
} }
}
preCommitAdded := make([]uint64, len(changes.Added))
for i, added := range changes.Added {
if len(added.Info.DealIDs) > 0 {
sectorDeals <- &SectorDealEvent{
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
}
}
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
added.Info.ReplaceSectorDeadline,
added.Info.ReplaceSectorPartition,
added.Info.ReplaceSectorNumber,
); err != nil {
return err
}
} else {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
nil, // replace deadline
nil, // replace partition
nil, // replace sector
); err != nil {
return err
}
}
preCommitAdded[i] = uint64(added.Info.SectorNumber)
}
if len(preCommitAdded) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitAdded,
Event: PreCommitAdded,
}
}
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), &sector); err != nil {
return err
} else if !found {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
}
}
if len(preCommitExpired) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitExpired,
Event: PreCommitExpired,
}
}
return nil
})
}
if err := grp.Wait(); err != nil {
return err
} }
if err := stmt.Close(); err != nil { if err := stmt.Close(); err != nil {
@ -443,67 +450,75 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
} }
grp, _ := errgroup.WithContext(ctx)
for _, m := range miners { for _, m := range miners {
changes, err := p.getMinerSectorChanges(ctx, m) m := m
if err != nil { grp.Go(func() error {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { changes, err := p.getMinerSectorChanges(ctx, m)
continue if err != nil {
} else { if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
return nil
}
return err return err
} }
} if changes == nil {
if changes == nil { return nil
continue
}
var sectorsAdded []uint64
var ccAdded []uint64
var extended []uint64
for _, added := range changes.Added {
// add the sector to the table
if _, err := stmt.Exec(
m.common.addr.String(),
added.SectorNumber,
added.SealedCID.String(),
m.common.stateroot.String(),
added.Activation.String(),
added.Expiration.String(),
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.InitialPledge.String(),
added.ExpectedDayReward.String(),
added.ExpectedStoragePledge.String(),
); err != nil {
return err
} }
if len(added.DealIDs) == 0 { var sectorsAdded []uint64
ccAdded = append(ccAdded, uint64(added.SectorNumber)) var ccAdded []uint64
} else { var extended []uint64
sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber)) for _, added := range changes.Added {
// add the sector to the table
if _, err := stmt.Exec(
m.common.addr.String(),
added.SectorNumber,
added.SealedCID.String(),
m.common.stateroot.String(),
added.Activation.String(),
added.Expiration.String(),
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.InitialPledge.String(),
added.ExpectedDayReward.String(),
added.ExpectedStoragePledge.String(),
); err != nil {
log.Errorw("writing miner sector changes statement", "error", err.Error())
}
if len(added.DealIDs) == 0 {
ccAdded = append(ccAdded, uint64(added.SectorNumber))
} else {
sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber))
}
} }
}
for _, mod := range changes.Extended { for _, mod := range changes.Extended {
extended = append(extended, uint64(mod.To.SectorNumber)) extended = append(extended, uint64(mod.To.SectorNumber))
} }
events <- &MinerSectorsEvent{ events <- &MinerSectorsEvent{
MinerID: m.common.addr, MinerID: m.common.addr,
StateRoot: m.common.stateroot, StateRoot: m.common.stateroot,
SectorIDs: ccAdded, SectorIDs: ccAdded,
Event: CommitCapacityAdded, Event: CommitCapacityAdded,
} }
events <- &MinerSectorsEvent{ events <- &MinerSectorsEvent{
MinerID: m.common.addr, MinerID: m.common.addr,
StateRoot: m.common.stateroot, StateRoot: m.common.stateroot,
SectorIDs: sectorsAdded, SectorIDs: sectorsAdded,
Event: SectorAdded, Event: SectorAdded,
} }
events <- &MinerSectorsEvent{ events <- &MinerSectorsEvent{
MinerID: m.common.addr, MinerID: m.common.addr,
StateRoot: m.common.stateroot, StateRoot: m.common.stateroot,
SectorIDs: extended, SectorIDs: extended,
Event: SectorExtended, Event: SectorExtended,
} }
return nil
})
}
if err := grp.Wait(); err != nil {
return err
} }
if err := stmt.Close(); err != nil { if err := stmt.Close(); err != nil {