chainwatch: Store deals

This commit is contained in:
Łukasz Magiera 2020-01-20 01:49:52 +01:00
parent 62d661f898
commit a46a9bb42e
2 changed files with 103 additions and 8 deletions

View File

@ -2,16 +2,18 @@ package main
import (
"database/sql"
"encoding/hex"
"fmt"
"github.com/filecoin-project/lotus/api"
"golang.org/x/xerrors"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
@ -208,6 +210,33 @@ create table if not exists miner_heads
primary key (head, addr)
);
create table if not exists deals
(
id int not null,
pieceRef text not null,
pieceSize bigint not null,
client text not null,
provider text not null,
expiration decimal not null,
duration decimal not null,
epochPrice decimal not null,
collateral decimal not null,
constraint deals_pk
primary key (id)
);
create index if not exists deals_client_index
on deals (client);
create unique index if not exists deals_id_uindex
on deals (id);
create index if not exists deals_pieceRef_index
on deals (pieceRef);
create index if not exists deals_provider_index
on deals (provider);
`)
if err != nil {
return err
@ -656,11 +685,8 @@ func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
}
if _, err := tx.Exec(`
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
@ -693,6 +719,59 @@ create temp table mi (like mpool_messages excluding constraints) on commit drop;
return tx.Commit()
}
func (st *storage) storeDeals(deals map[string]actors.OnChainDeal) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table d (like deals excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy d (id, pieceref, piecesize, client, "provider", expiration, duration, epochprice, collateral) from stdin `)
if err != nil {
return err
}
var bloat uint64
for id, deal := range deals {
if len(deal.PieceRef) > 40 {
bloat += uint64(len(deal.PieceRef))
continue
}
if _, err := stmt.Exec(
id,
hex.EncodeToString(deal.PieceRef),
deal.PieceSize,
deal.Client.String(),
deal.Provider.String(),
fmt.Sprint(deal.ProposalExpiration),
fmt.Sprint(deal.Duration),
deal.StoragePricePerEpoch.String(),
deal.StorageCollateral.String(),
); err != nil {
return err
}
}
if bloat > 0 {
log.Warnf("deal PieceRefs had %d bytes of garbage", bloat)
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into deals select * from d on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -355,6 +355,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Sync stage done")
}
log.Infof("Get deals")
// TODO: incremental, gather expired
deals, err := api.StateMarketDeals(ctx, ts)
if err != nil {
log.Error(err)
return
}
log.Infof("Store deals")
if err := st.storeDeals(deals); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}