364 lines
9.3 KiB
Go
364 lines
9.3 KiB
Go
package processor
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/chain/events/state"
|
|
)
|
|
|
|
func (p *Processor) setupMarket() error {
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`
|
|
create table if not exists market_deal_proposals
|
|
(
|
|
deal_id bigint not null,
|
|
|
|
state_root text not null,
|
|
|
|
piece_cid text not null,
|
|
padded_piece_size bigint not null,
|
|
unpadded_piece_size bigint not null,
|
|
is_verified bool not null,
|
|
|
|
client_id text not null,
|
|
provider_id text not null,
|
|
|
|
start_epoch bigint not null,
|
|
end_epoch bigint not null,
|
|
slashed_epoch bigint,
|
|
storage_price_per_epoch text not null,
|
|
|
|
provider_collateral text not null,
|
|
client_collateral text not null,
|
|
|
|
constraint market_deal_proposal_pk
|
|
primary key (deal_id)
|
|
);
|
|
|
|
create table if not exists market_deal_states
|
|
(
|
|
deal_id bigint not null,
|
|
|
|
sector_start_epoch bigint not null,
|
|
last_update_epoch bigint not null,
|
|
slash_epoch bigint not null,
|
|
|
|
state_root text not null,
|
|
|
|
unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch),
|
|
|
|
constraint market_deal_states_pk
|
|
primary key (deal_id, state_root)
|
|
|
|
);
|
|
|
|
create table if not exists minerid_dealid_sectorid
|
|
(
|
|
deal_id bigint not null
|
|
constraint sectors_sector_ids_id_fk
|
|
references market_deal_proposals(deal_id),
|
|
|
|
sector_id bigint not null,
|
|
miner_id text not null,
|
|
foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id),
|
|
|
|
constraint miner_sector_deal_ids_pk
|
|
primary key (miner_id, sector_id, deal_id)
|
|
);
|
|
|
|
`); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
type marketActorInfo struct {
|
|
common actorInfo
|
|
}
|
|
|
|
func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTips) error {
|
|
marketChanges, err := p.processMarket(ctx, marketTips)
|
|
if err != nil {
|
|
log.Fatalw("Failed to process market actors", "error", err)
|
|
}
|
|
|
|
if err := p.persistMarket(ctx, marketChanges); err != nil {
|
|
log.Fatalw("Failed to persist market actors", "error", err)
|
|
}
|
|
|
|
// we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first
|
|
if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil {
|
|
close(p.sectorDealEvents)
|
|
return err
|
|
}
|
|
|
|
if err := p.updateMarket(ctx, marketChanges); err != nil {
|
|
log.Fatalw("Failed to update market actors", "error", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Processed Market", "duration", time.Since(start).String())
|
|
}()
|
|
|
|
var out []marketActorInfo
|
|
for _, markets := range marketTips {
|
|
for _, mt := range markets {
|
|
// NB: here is where we can extract the market state when we need it.
|
|
out = append(out, marketActorInfo{common: mt})
|
|
}
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Persisted Market", "duration", time.Since(start).String())
|
|
}()
|
|
|
|
grp, ctx := errgroup.WithContext(ctx)
|
|
|
|
grp.Go(func() error {
|
|
if err := p.storeMarketActorDealProposals(ctx, info); err != nil {
|
|
return xerrors.Errorf("Failed to store marker deal proposals: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
grp.Go(func() error {
|
|
if err := p.storeMarketActorDealStates(info); err != nil {
|
|
return xerrors.Errorf("Failed to store marker deal states: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
return grp.Wait()
|
|
|
|
}
|
|
|
|
func (p *Processor) updateMarket(ctx context.Context, info []marketActorInfo) error {
|
|
if err := p.updateMarketActorDealProposals(ctx, info); err != nil {
|
|
return xerrors.Errorf("Failed to update market info: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Stored Market Deal States", "duration", time.Since(start).String())
|
|
}()
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
|
|
return err
|
|
}
|
|
stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, mt := range marketTips {
|
|
dealStates, err := p.node.StateMarketDeals(context.TODO(), mt.common.tsKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for dealID, ds := range dealStates {
|
|
id, err := strconv.ParseUint(dealID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := stmt.Exec(
|
|
id,
|
|
ds.State.SectorStartEpoch,
|
|
ds.State.LastUpdatedEpoch,
|
|
ds.State.SlashEpoch,
|
|
mt.common.stateroot.String(),
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
}
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Stored Market Deal Proposals", "duration", time.Since(start).String())
|
|
}()
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil {
|
|
return xerrors.Errorf("prep temp: %w", err)
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// insert in sorted order (lowest height -> highest height) since dealid is pk of table.
|
|
for _, mt := range marketTips {
|
|
dealStates, err := p.node.StateMarketDeals(ctx, mt.common.tsKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for dealID, ds := range dealStates {
|
|
id, err := strconv.ParseUint(dealID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := stmt.Exec(
|
|
id,
|
|
mt.common.stateroot.String(),
|
|
ds.Proposal.PieceCID.String(),
|
|
ds.Proposal.PieceSize,
|
|
ds.Proposal.PieceSize.Unpadded(),
|
|
ds.Proposal.VerifiedDeal,
|
|
ds.Proposal.Client.String(),
|
|
ds.Proposal.Provider.String(),
|
|
ds.Proposal.StartEpoch,
|
|
ds.Proposal.EndEpoch,
|
|
nil, // slashed_epoch
|
|
ds.Proposal.StoragePricePerEpoch.String(),
|
|
ds.Proposal.ProviderCollateral.String(),
|
|
ds.Proposal.ClientCollateral.String(),
|
|
); err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
}
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
|
|
}
|
|
|
|
func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error {
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil {
|
|
return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err)
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`)
|
|
if err != nil {
|
|
return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err)
|
|
}
|
|
|
|
for sde := range dealEvents {
|
|
for _, did := range sde.DealIDs {
|
|
if _, err := stmt.Exec(
|
|
uint64(did),
|
|
sde.MinerID.String(),
|
|
sde.SectorID,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := stmt.Close(); err != nil {
|
|
return xerrors.Errorf("Failed to close miner sector deals statement: %w", err)
|
|
}
|
|
|
|
if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil {
|
|
return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return xerrors.Errorf("Failed to commit miner deal sector table: %w", err)
|
|
}
|
|
return nil
|
|
|
|
}
|
|
|
|
func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
|
|
start := time.Now()
|
|
defer func() {
|
|
log.Debugw("Updated Market Deal Proposals", "duration", time.Since(start).String())
|
|
}()
|
|
pred := state.NewStatePredicates(p.node)
|
|
|
|
tx, err := p.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, mt := range marketTip {
|
|
stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))
|
|
|
|
changed, val, err := stateDiff(ctx, mt.common.parentTsKey, mt.common.tsKey)
|
|
if err != nil {
|
|
log.Warnw("error getting market deal state diff", "error", err)
|
|
}
|
|
if !changed {
|
|
continue
|
|
}
|
|
changes, ok := val.(*state.MarketDealStateChanges)
|
|
if !ok {
|
|
return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val)
|
|
}
|
|
|
|
for _, modified := range changes.Modified {
|
|
if modified.From.SlashEpoch != modified.To.SlashEpoch {
|
|
if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if err := stmt.Close(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|