Merge pull request #2331 from filecoin-project/chainwatch/market-state-deal-tracking

feat: add deal state and proposal tracking to cw
This commit is contained in:
Łukasz Magiera 2020-07-15 16:19:10 +02:00 committed by GitHub
commit ccba495840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 311 additions and 17 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"database/sql"
"strconv"
"sync"
"time"
@ -330,8 +331,15 @@ create table if not exists miner_sectors_heads
primary key (miner_id,miner_sectors_cid)
);
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
CREATE TYPE miner_sector_event_type AS ENUM
(
'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED'
);
END IF;
END$$;
create table if not exists miner_sector_events
(
@ -342,7 +350,50 @@ create table if not exists miner_sector_events
constraint miner_sector_events_pk
primary key (sector_id, event, miner_id, state_root)
)
);
create table if not exists market_deal_proposals
(
deal_id bigint not null,
state_root text not null,
piece_cid text not null,
padded_piece_size bigint not null,
unpadded_piece_size bigint not null,
is_verified bool not null,
client_id text not null,
provider_id text not null,
start_epoch bigint not null,
end_epoch bigint not null,
slashed_epoch bigint,
storage_price_per_epoch text not null,
provider_collateral text not null,
client_collateral text not null,
constraint market_deal_proposal_pk
primary key (deal_id)
);
create table if not exists market_deal_states
(
deal_id bigint not null,
sector_start_epoch bigint not null,
last_update_epoch bigint not null,
slash_epoch bigint not null,
state_root text not null,
unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch),
constraint market_deal_states_pk
primary key (deal_id, state_root)
);
/*
create or replace function miner_tips(epoch bigint)
@ -852,7 +903,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
}
for _, added := range changes.Added {
if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
return err
}
}
@ -900,6 +951,172 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
return updateTx.Commit()
}
func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Stored Market Deal States", "duration", time.Since(start).String())
}()
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
return err
}
stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`)
if err != nil {
return err
}
for _, th := range tipHeights {
mt := marketTips[th.tsKey]
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
if err != nil {
return err
}
for dealID, ds := range dealStates {
id, err := strconv.ParseUint(dealID, 10, 64)
if err != nil {
return err
}
if _, err := stmt.Exec(
id,
ds.State.SectorStartEpoch,
ds.State.LastUpdatedEpoch,
ds.State.SlashEpoch,
mt.stateroot.String(),
); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Stored Market Deal Proposals", "duration", time.Since(start).String())
}()
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
if err != nil {
return err
}
// insert in sorted order (lowest height -> highest height) since dealid is pk of table.
for _, th := range tipHeights {
mt := marketTips[th.tsKey]
dealStates, err := api.StateMarketDeals(context.TODO(), mt.tsKey)
if err != nil {
return err
}
for dealID, ds := range dealStates {
id, err := strconv.ParseUint(dealID, 10, 64)
if err != nil {
return err
}
if _, err := stmt.Exec(
id,
mt.stateroot.String(),
ds.Proposal.PieceCID.String(),
ds.Proposal.PieceSize,
ds.Proposal.PieceSize.Unpadded(),
ds.Proposal.VerifiedDeal,
ds.Proposal.Client.String(),
ds.Proposal.Provider.String(),
ds.Proposal.StartEpoch,
ds.Proposal.EndEpoch,
nil, // slashed_epoch
ds.Proposal.StoragePricePerEpoch.String(),
ds.Proposal.ProviderCollateral.String(),
ds.Proposal.ClientCollateral.String(),
); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) updateMarketActorDealProposals(marketTip map[types.TipSetKey]*marketStateInfo, tipHeights []tipsetKeyHeight, api api.FullNode) error {
start := time.Now()
defer func() {
log.Infow("Updated Market Deal Proposals", "duration", time.Since(start).String())
}()
pred := state.NewStatePredicates(api)
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`)
if err != nil {
return err
}
for _, th := range tipHeights {
mt := marketTip[th.tsKey]
stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))
changed, val, err := stateDiff(context.TODO(), mt.parentTsKey, mt.tsKey)
if err != nil {
log.Warnw("error getting market deal state diff", "error", err)
}
if !changed {
continue
}
changes, ok := val.(*state.MarketDealStateChanges)
if !ok {
return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val)
}
for _, modified := range changes.Modified {
if modified.From.SlashEpoch != modified.To.SlashEpoch {
if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil {
return err
}
}
}
}
if err := stmt.Close(); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock()
defer st.headerLk.Unlock()

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"math"
"sort"
"sync"
"time"
@ -18,6 +19,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
"github.com/filecoin-project/specs-actors/actors/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/builtin/power"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"
@ -81,6 +83,20 @@ type minerStateInfo struct {
psize uint64
}
type marketStateInfo struct {
// common
act types.Actor
stateroot cid.Cid
// calculating changes
// calculating changes
tsKey types.TipSetKey
parentTsKey types.TipSetKey
// market actor specific
state market.State
}
type actorInfo struct {
stateroot cid.Cid
tsKey types.TipSetKey
@ -88,6 +104,11 @@ type actorInfo struct {
state string
}
type tipsetKeyHeight struct {
height abi.ChainEpoch
tsKey types.TipSetKey
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.TipSet, maxBatch int) {
var alk sync.Mutex
@ -169,6 +190,9 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infow("Starting Sync", "height", minH, "numBlocks", len(toSync), "maxBatch", maxBatch)
// relate tipset keys to height so they may be processed in ascending order.
var tipHeights []tipsetKeyHeight
tipsSeen := make(map[types.TipSetKey]struct{})
// map of addresses to changed actors
var changes map[string]types.Actor
// collect all actor state that has changes between block headers
@ -276,9 +300,20 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
parentTsKey: pts.Parents(),
}
addressToID[addr] = address.Undef
if _, ok := tipsSeen[pts.Key()]; !ok {
tipHeights = append(tipHeights, tipsetKeyHeight{
height: pts.Height(),
tsKey: pts.Key(),
})
}
tipsSeen[pts.Key()] = struct{}{}
alk.Unlock()
}
})
// sort tipHeights in ascending order.
sort.Slice(tipHeights, func(i, j int) bool {
return tipHeights[i].height < tipHeights[j].height
})
// map of tipset to reward state
rewardTips := make(map[types.TipSetKey]*rewardStateInfo, len(changes))
@ -313,23 +348,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
log.Infof("Getting actor change info")
// highly likely that the market actor will change at every epoch
marketActorChanges := make(map[types.TipSetKey]*marketStateInfo, len(changes))
minerChanges := 0
for addr, m := range actors {
for actor, c := range m {
// reward actor
if actor.Code == builtin.RewardActorCodeID {
rewardTips[c.tsKey] = &rewardStateInfo{
stateroot: c.stateroot,
baselinePower: big.Zero(),
}
// only want actors with head change events
if _, found := headsSeen[actor.Head]; found {
continue
}
headsSeen[actor.Head] = struct{}{}
// miner actors with head change events
if actor.Code == builtin.StorageMinerActorCodeID {
if _, found := headsSeen[actor.Head]; found {
continue
switch actor.Code {
case builtin.StorageMarketActorCodeID:
marketActorChanges[c.tsKey] = &marketStateInfo{
act: actor,
stateroot: c.stateroot,
tsKey: c.tsKey,
parentTsKey: c.parentTsKey,
state: market.State{},
}
case builtin.StorageMinerActorCodeID:
minerChanges++
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
@ -346,10 +386,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
rawPower: big.Zero(),
qalPower: big.Zero(),
})
headsSeen[actor.Head] = struct{}{}
// reward actor
case builtin.RewardActorCodeID:
rewardTips[c.tsKey] = &rewardStateInfo{
stateroot: c.stateroot,
baselinePower: big.Zero(),
}
}
continue
}
}
@ -435,6 +479,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
})
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges)
log.Info("Getting market actor info")
// TODO: consider taking the min of the array length and using that for concurrency param, e.g:
// concurrency := math.Min(len(marketActorChanges), 50)
parmap.Par(50, parmap.MapArr(marketActorChanges), func(mrktInfo *marketStateInfo) {
astb, err := api.ChainReadObj(ctx, mrktInfo.act.Head)
if err != nil {
log.Error(err)
return
}
if err := mrktInfo.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
log.Error(err)
return
}
})
log.Info("Getting receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
@ -496,6 +555,24 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
return
}
log.Info("Storing market actor deal proposal info")
if err := st.storeMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Info("Storing market actor deal state info")
if err := st.storeMarketActorDealStates(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Info("Updating market actor deal proposal info")
if err := st.updateMarketActorDealProposals(marketActorChanges, tipHeights, api); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil {