feat: add deal state and proposal tracking to cw
This commit is contained in:
parent
39d609a661
commit
e13a251cc8
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -330,8 +331,15 @@ create table if not exists miner_sectors_heads
|
||||
primary key (miner_id,miner_sectors_cid)
|
||||
|
||||
);
|
||||
|
||||
create type miner_sector_event_type as enum ('ADDED', 'EXTENDED', 'EXPIRED', 'TERMINATED');
|
||||
DO $$
|
||||
BEGIN
|
||||
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN
|
||||
CREATE TYPE miner_sector_event_type AS ENUM
|
||||
(
|
||||
'ADDED','EXTENDED', 'EXPIRED', 'TERMINATED'
|
||||
);
|
||||
END IF;
|
||||
END$$;
|
||||
|
||||
create table if not exists miner_sector_events
|
||||
(
|
||||
@ -342,7 +350,46 @@ create table if not exists miner_sector_events
|
||||
|
||||
constraint miner_sector_events_pk
|
||||
primary key (sector_id, event, miner_id, state_root)
|
||||
)
|
||||
);
|
||||
|
||||
create table if not exists market_deal_proposals
|
||||
(
|
||||
deal_id bigint not null,
|
||||
|
||||
state_root text not null,
|
||||
|
||||
piece_cid text not null,
|
||||
piece_size bigint not null,
|
||||
verified_deal bool not null,
|
||||
|
||||
client_id text not null,
|
||||
provider_id text not null,
|
||||
|
||||
start_epoch bigint not null,
|
||||
end_epoch bigint not null,
|
||||
storage_price_per_epoch text not null,
|
||||
|
||||
provider_collateral text not null,
|
||||
client_collateral text not null,
|
||||
|
||||
constraint market_deal_proposal_pk
|
||||
primary key (deal_id)
|
||||
);
|
||||
|
||||
create table if not exists market_deal_states
|
||||
(
|
||||
deal_id bigint not null,
|
||||
|
||||
state_root text not null,
|
||||
|
||||
sector_start_epoch bigint not null,
|
||||
last_update_epoch bigint not null,
|
||||
slash_epoch bigint not null,
|
||||
|
||||
constraint market_deal_states_pk
|
||||
primary key (deal_id)
|
||||
|
||||
);
|
||||
|
||||
/*
|
||||
create or replace function miner_tips(epoch bigint)
|
||||
@ -852,7 +899,7 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
|
||||
}
|
||||
|
||||
for _, added := range changes.Added {
|
||||
if _, err := eventStmt.Exec(miner.addr.String(), added.Info.SectorNumber, miner.stateroot.String(), "ADDED"); err != nil {
|
||||
if _, err := eventStmt.Exec(added.Info.SectorNumber, "ADDED", miner.addr.String(), miner.stateroot.String()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -900,6 +947,111 @@ func (st *storage) updateMinerSectors(minerTips map[types.TipSetKey][]*minerStat
|
||||
return updateTx.Commit()
|
||||
}
|
||||
|
||||
func (st *storage) storeMarketActorDealStates(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error {
|
||||
tx, err := st.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
|
||||
return err
|
||||
}
|
||||
stmt, err := tx.Prepare(`copy mds (deal_id, state_root, sector_start_epoch, last_update_epoch, slash_epoch) from STDIN`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for tskey, mt := range marketTips {
|
||||
dealStates, err := api.StateMarketDeals(context.TODO(), tskey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for dealID, ds := range dealStates {
|
||||
id, err := strconv.ParseUint(dealID, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := stmt.Exec(
|
||||
id,
|
||||
mt.stateroot.String(),
|
||||
ds.State.SectorStartEpoch,
|
||||
ds.State.LastUpdatedEpoch,
|
||||
ds.State.SlashEpoch,
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if err := stmt.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
|
||||
}
|
||||
|
||||
func (st *storage) storeMarketActorDealProposals(marketTips map[types.TipSetKey]*marketStateInfo, api api.FullNode) error {
|
||||
tx, err := st.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil {
|
||||
return xerrors.Errorf("prep temp: %w", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, piece_size, verified_deal, client_id, provider_id, start_epoch, end_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for tskey, mt := range marketTips {
|
||||
dealStates, err := api.StateMarketDeals(context.TODO(), tskey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for dealID, ds := range dealStates {
|
||||
id, err := strconv.ParseUint(dealID, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := stmt.Exec(
|
||||
id,
|
||||
mt.stateroot.String(),
|
||||
ds.Proposal.PieceCID.String(),
|
||||
ds.Proposal.PieceSize,
|
||||
ds.Proposal.VerifiedDeal,
|
||||
ds.Proposal.Client.String(),
|
||||
ds.Proposal.Provider.String(),
|
||||
ds.Proposal.StartEpoch,
|
||||
ds.Proposal.EndEpoch,
|
||||
ds.Proposal.StoragePricePerEpoch.String(),
|
||||
ds.Proposal.ProviderCollateral.String(),
|
||||
ds.Proposal.ClientCollateral.String(),
|
||||
); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if err := stmt.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
|
||||
}
|
||||
|
||||
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
|
||||
st.headerLk.Lock()
|
||||
defer st.headerLk.Unlock()
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
@ -81,6 +82,20 @@ type minerStateInfo struct {
|
||||
psize uint64
|
||||
}
|
||||
|
||||
type marketStateInfo struct {
|
||||
// common
|
||||
act types.Actor
|
||||
stateroot cid.Cid
|
||||
|
||||
// calculating changes
|
||||
// calculating changes
|
||||
tsKey types.TipSetKey
|
||||
parentTsKey types.TipSetKey
|
||||
|
||||
// market actor specific
|
||||
state market.State
|
||||
}
|
||||
|
||||
type actorInfo struct {
|
||||
stateroot cid.Cid
|
||||
tsKey types.TipSetKey
|
||||
@ -313,23 +328,28 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
|
||||
log.Infof("Getting actor change info")
|
||||
|
||||
// highly likely that the market actor will change at every epoch
|
||||
marketActorChanges := make(map[types.TipSetKey]*marketStateInfo, len(changes))
|
||||
|
||||
minerChanges := 0
|
||||
for addr, m := range actors {
|
||||
for actor, c := range m {
|
||||
// reward actor
|
||||
if actor.Code == builtin.RewardActorCodeID {
|
||||
rewardTips[c.tsKey] = &rewardStateInfo{
|
||||
stateroot: c.stateroot,
|
||||
baselinePower: big.Zero(),
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// miner actors with head change events
|
||||
if actor.Code == builtin.StorageMinerActorCodeID {
|
||||
// only want actors with head change events
|
||||
if _, found := headsSeen[actor.Head]; found {
|
||||
continue
|
||||
}
|
||||
headsSeen[actor.Head] = struct{}{}
|
||||
|
||||
switch actor.Code {
|
||||
case builtin.StorageMarketActorCodeID:
|
||||
marketActorChanges[c.tsKey] = &marketStateInfo{
|
||||
act: actor,
|
||||
stateroot: c.stateroot,
|
||||
tsKey: c.tsKey,
|
||||
parentTsKey: c.parentTsKey,
|
||||
state: market.State{},
|
||||
}
|
||||
case builtin.StorageMinerActorCodeID:
|
||||
minerChanges++
|
||||
|
||||
minerTips[c.tsKey] = append(minerTips[c.tsKey], &minerStateInfo{
|
||||
@ -346,10 +366,14 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
rawPower: big.Zero(),
|
||||
qalPower: big.Zero(),
|
||||
})
|
||||
|
||||
headsSeen[actor.Head] = struct{}{}
|
||||
// reward actor
|
||||
case builtin.RewardActorCodeID:
|
||||
rewardTips[c.tsKey] = &rewardStateInfo{
|
||||
stateroot: c.stateroot,
|
||||
baselinePower: big.Zero(),
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -435,6 +459,21 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
})
|
||||
log.Infow("Completed Miner Processing", "duration", time.Since(minerProcessingStartedAt).String(), "processed", minerChanges)
|
||||
|
||||
log.Info("Getting market actor info")
|
||||
// TODO: consider taking the min of the array length and using that for concurrency param, e.g:
|
||||
// concurrency := math.Min(len(marketActorChanges), 50)
|
||||
parmap.Par(50, parmap.MapArr(marketActorChanges), func(mrktInfo *marketStateInfo) {
|
||||
astb, err := api.ChainReadObj(ctx, mrktInfo.act.Head)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if err := mrktInfo.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
log.Info("Getting receipts")
|
||||
|
||||
receipts := fetchParentReceipts(ctx, api, toSync)
|
||||
@ -496,6 +535,17 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types.
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("Storing market actor info")
|
||||
if err := st.storeMarketActorDealProposals(marketActorChanges, api); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := st.storeMarketActorDealStates(marketActorChanges, api); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("Storing messages")
|
||||
|
||||
if err := st.storeMessages(msgs); err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user