fix(chainwatch): fix deadlock in sector deals
This commit is contained in:
parent
fb8340acb2
commit
5e40dda3bc
@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip
|
|||||||
log.Fatalw("Failed to persist market actors", "error", err)
|
log.Fatalw("Failed to persist market actors", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
|
|
||||||
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
|
|
||||||
close(p.sectorDealEvents)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.updateMarket(ctx, marketChanges); err != nil {
|
if err := p.updateMarket(ctx, marketChanges); err != nil {
|
||||||
log.Fatalw("Failed to update market actors", "error", err)
|
log.Fatalw("Failed to update market actors", "error", err)
|
||||||
}
|
}
|
||||||
@ -272,48 +266,6 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
|
|
||||||
tx, err := p.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
|
|
||||||
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
|
|
||||||
if err != nil {
|
|
||||||
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for sde := range dealEvents {
|
|
||||||
for _, did := range sde.DealIDs {
|
|
||||||
if _, err := stmt.Exec(
|
|
||||||
uint64(did),
|
|
||||||
sde.MinerID.String(),
|
|
||||||
sde.SectorID,
|
|
||||||
); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := stmt.Close(); err != nil {
|
|
||||||
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
|
|
||||||
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
|
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -271,7 +271,11 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
|
|||||||
preCommitEvents := make(chan *MinerSectorsEvent, 8)
|
preCommitEvents := make(chan *MinerSectorsEvent, 8)
|
||||||
sectorEvents := make(chan *MinerSectorsEvent, 8)
|
sectorEvents := make(chan *MinerSectorsEvent, 8)
|
||||||
partitionEvents := make(chan *MinerSectorsEvent, 8)
|
partitionEvents := make(chan *MinerSectorsEvent, 8)
|
||||||
p.sectorDealEvents = make(chan *SectorDealEvent, 8)
|
dealEvents := make(chan *SectorDealEvent, 8)
|
||||||
|
|
||||||
|
grp.Go(func() error {
|
||||||
|
return p.storePreCommitDealInfo(dealEvents)
|
||||||
|
})
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
|
return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents)
|
||||||
@ -280,9 +284,9 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo)
|
|||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
defer func() {
|
defer func() {
|
||||||
close(preCommitEvents)
|
close(preCommitEvents)
|
||||||
close(p.sectorDealEvents)
|
close(dealEvents)
|
||||||
}()
|
}()
|
||||||
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents)
|
return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents)
|
||||||
})
|
})
|
||||||
|
|
||||||
grp.Go(func() error {
|
grp.Go(func() error {
|
||||||
@ -911,6 +915,48 @@ func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []mine
|
|||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
|
||||||
|
tx, err := p.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
|
||||||
|
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for sde := range dealEvents {
|
||||||
|
for _, did := range sde.DealIDs {
|
||||||
|
if _, err := stmt.Exec(
|
||||||
|
uint64(did),
|
||||||
|
sde.MinerID.String(),
|
||||||
|
sde.SectorID,
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stmt.Close(); err != nil {
|
||||||
|
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
|
||||||
|
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
|
func (p *Processor) storeMinersPower(miners []minerActorInfo) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -35,9 +35,6 @@ type Processor struct {
|
|||||||
|
|
||||||
// number of blocks processed at a time
|
// number of blocks processed at a time
|
||||||
batch int
|
batch int
|
||||||
|
|
||||||
// process communication channels
|
|
||||||
sectorDealEvents chan *SectorDealEvent
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ActorTips map[types.TipSetKey][]actorInfo
|
type ActorTips map[types.TipSetKey][]actorInfo
|
||||||
|
Loading…
Reference in New Issue
Block a user