polish: update deal proposal table if slashed

This commit is contained in:
frrist 2020-07-10 16:55:39 -07:00
parent f9d8b051f4
commit c9cb39d5a0
2 changed files with 72 additions and 5 deletions

View File

@ -361,13 +361,14 @@ create table if not exists market_deal_proposals
piece_cid text not null,
padded_piece_size bigint not null,
unpadded_piece_size bigint not null,
verified_deal bool not null,
is_verified bool not null,
client_id text not null,
provider_id text not null,
start_epoch bigint not null,
end_epoch bigint not null,
slashed_epoch bigint,
storage_price_per_epoch text not null,
provider_collateral text not null,
@ -387,8 +388,10 @@ create table if not exists market_deal_states
state_root text not null,
unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch),
constraint market_deal_states_pk
primary key (deal_id,sector_start_epoch,last_update_epoch,slash_epoch)
primary key (deal_id, state_root)
);
@ -949,6 +952,10 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
}
func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Stored Market Deal States", "duration", time.Since(start).String())
}()
tx, err := st.db.Begin()
if err != nil {
return err
@ -962,7 +969,6 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma
}
for _, th := range tipHeights {
mt := marketTips[th.tsKey]
log.Infow("store deal state", "height", th.height, "tipset", th.tsKey, "minerTS", mt.tsKey)
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
if err != nil {
return err
@ -998,6 +1004,10 @@ func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*ma
}
func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String())
}()
tx, err := st.db.Begin()
if err != nil {
return err
@ -1007,7 +1017,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
if err != nil {
return err
}
@ -1037,6 +1047,7 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]
ds.Proposal.Provider.String(),
ds.Proposal.StartEpoch,
ds.Proposal.EndEpoch,
nil, // slashed_epoch
ds.Proposal.StoragePricePerEpoch.String(),
ds.Proposal.ProviderCollateral.String(),
ds.Proposal.ClientCollateral.String(),
@ -1057,6 +1068,55 @@ func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]
}
func (st *storage) updateMarketActorDealProposals(marketTip map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String())
}()
pred := state.NewStatePredicates(api)
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`)
if err != nil {
return err
}
for _, th := range tipHeights {
mt := marketTip[th.tsKey]
stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))
changed, val, err := stateDiff(context.TODO(), mt.parentTsKey, mt.tsKey)
if err != nil {
log.Warnw("error getting market deal state diff", "error", err)
}
if !changed {
continue
}
changes, ok := val.(*state.MarketDealStateChanges)
if !ok {
return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val)
}
for _, modified := range changes.Modified {
if modified.From.SlashEpoch != modified.To.SlashEpoch {
if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil {
return err
}
}
}
}
if err := stmt.Close(); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock()
defer st.headerLk.Unlock()

View File

@ -555,17 +555,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return
}
log.Info("Storing market actor info")
log.Info("Storing market actor deal proposal info")
if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Info("Storing market actor deal state info")
if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Info("Updating market actor deal proposal info")
if err := st.updateMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {