diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go index 11f02271e..426005ac3 100644 --- a/cmd/lotus-chainwatch/processor/market.go +++ b/cmd/lotus-chainwatch/processor/market.go @@ -61,6 +61,20 @@ create table if not exists market_deal_states ); +create table if not exists minerid_dealid_sectorid +( + deal_id bigint not null + constraint sectors_sector_ids_id_fk + references market_deal_proposals(deal_id), + + sector_id bigint not null, + miner_id text not null, + foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id), + + constraint miner_sector_deal_ids_pk + primary key (miner_id, sector_id, deal_id) +); + `); err != nil { return err } @@ -82,6 +96,12 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip log.Fatalw("Failed to persist market actors", "error", err) } + // we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first + if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil { + close(p.sectorDealEvents) + return err + } + if err := p.updateMarket(ctx, marketChanges); err != nil { log.Fatalw("Failed to update market actors", "error", err) } @@ -252,6 +272,48 @@ func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTip } +func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { + tx, err := p.db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { + return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) + } + + stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) + if err != nil { + return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) + } + + for sde := range dealEvents { + for _, did := range sde.DealIDs { + if _, err := stmt.Exec( + uint64(did), + sde.MinerID.String(), + sde.SectorID, + ); err != nil { + return err + } + } + } + + if err := stmt.Close(); err != nil { + return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) + } + + if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { + return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) + } + return nil + +} + func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { start := time.Now() defer func() { diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index be8f0449b..882d7eed7 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -67,6 +67,8 @@ create table if not exists sector_precommit_info replace_sector_partition bigint, replace_sector_number bigint, + unique (miner_id, sector_id), + constraint sector_precommit_info_pk primary key (miner_id, sector_id, sealed_cid) @@ -157,6 +159,12 @@ type MinerSectorsEvent struct { Event SectorLifecycleEvent } +type SectorDealEvent struct { + MinerID address.Address + SectorID uint64 + DealIDs []abi.DealID +} + type PartitionStatus struct { Terminated *abi.BitField Expired *abi.BitField @@ -261,14 +269,18 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) preCommitEvents := make(chan *MinerSectorsEvent, 8) sectorEvents := make(chan *MinerSectorsEvent, 8) partitionEvents := make(chan *MinerSectorsEvent, 8) + p.sectorDealEvents = make(chan *SectorDealEvent, 8) grp.Go(func() error { return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) }) grp.Go(func() error { - defer close(preCommitEvents) - return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents) + defer func() { + close(preCommitEvents) + close(p.sectorDealEvents) + }() + return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents) }) grp.Go(func() error { @@ -284,7 +296,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) return grp.Wait() } -func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { +func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error { tx, err := p.db.Begin() if err != nil { return err @@ -319,6 +331,13 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA preCommitAdded := make([]uint64, len(changes.Added)) for i, added := range changes.Added { + if len(added.Info.DealIDs) > 0 { + sectorDeals <- &SectorDealEvent{ + MinerID: m.common.addr, + SectorID: uint64(added.Info.SectorNumber), + DealIDs: added.Info.DealIDs, + } + } if added.Info.ReplaceCapacity { if _, err := stmt.Exec( m.common.addr.String(), @@ -362,7 +381,7 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA preCommitAdded[i] = uint64(added.Info.SectorNumber) } if len(preCommitAdded) > 0 { - events <- &MinerSectorsEvent{ + sectorEvents <- &MinerSectorsEvent{ MinerID: m.common.addr, StateRoot: m.common.stateroot, SectorIDs: preCommitAdded, @@ -379,7 +398,7 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA } } if len(preCommitExpired) > 0 { - events <- &MinerSectorsEvent{ + sectorEvents <- &MinerSectorsEvent{ MinerID: m.common.addr, StateRoot: m.common.stateroot, SectorIDs: preCommitExpired, diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index 8c578db37..cab73f070 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -36,6 +36,9 @@ type Processor struct { // number of blocks processed at a time batch int + + // process communication channels + sectorDealEvents chan *SectorDealEvent } type ActorTips map[types.TipSetKey][]actorInfo @@ -64,11 +67,12 @@ func NewProcessor(ctx context.Context, db *sql.DB, node api.FullNode, batch int) } func (p *Processor) setupSchemas() error { - if err := p.setupMarket(); err != nil { + // maintain order, subsequent calls create tables with foreign keys. + if err := p.setupMiners(); err != nil { return err } - if err := p.setupMiners(); err != nil { + if err := p.setupMarket(); err != nil { return err }