From c16ea42fad490a26ddc6344c06c11a1547d68d4d Mon Sep 17 00:00:00 2001 From: Frrist Date: Tue, 30 Jun 2020 10:26:41 -0700 Subject: [PATCH] fix: update miner_head table with new miner fields (#2142) --- cmd/lotus-chainwatch/main.go | 3 + cmd/lotus-chainwatch/storage.go | 117 ++++++++++++++++---------------- cmd/lotus-chainwatch/sync.go | 15 ++-- 3 files changed, 69 insertions(+), 66 deletions(-) diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index 704c4d457..b5ceb7348 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -18,6 +18,9 @@ var log = logging.Logger("chainwatch") func main() { _ = logging.SetLogLevel("*", "INFO") + if err := logging.SetLogLevel("rpc", "error"); err != nil { + panic(err) + } log.Info("Starting chainwatch") diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index e9361e5a4..e56cc9a9b 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/libp2p/go-libp2p-core/peer" "sync" "time" @@ -257,7 +258,6 @@ create index if not exists receipts_msg_state_index create table if not exists miner_sectors ( - stateroot text not null, miner text not null, sectorid bigint not null, activation bigint not null, @@ -267,36 +267,29 @@ create table if not exists miner_sectors sealcid text not null, sealrandepoch bigint not null, constraint miner_sectors_pk - primary key (stateroot, miner, sectorid) + primary key (miner, sectorid) ); -create index if not exists miner_sectors_stateroot_miner_sectorid_index - on miner_sectors (stateroot, miner, sectorid); -/* +create index if not exists miner_sectors_miner_sectorid_index + on miner_sectors (miner, sectorid); + create table if not exists miner_heads ( - head text not null, addr text not null, - stateroot text not null, - sectorset text not null, - setsize decimal not null, - provingset text not null, - provingsize decimal not null, owner text not null, worker text not null, - peerid text not null, - sectorsize bigint not null, - power decimal not null, - active bool, - ppe bigint not null, - slashed_at bigint not null, + peerid text, + sectorsize text not null, + windowpostpartitionsectors bigint not null, + + precommitdeposits text not null, + lockedfunds text not null, + nextdeadlineprocessfaults bigint not null, constraint miner_heads_pk - primary key (head, addr) + primary key (addr) ); -create index if not exists miner_heads_stateroot_index - on miner_heads (stateroot); - +/* create or replace function miner_tips(epoch bigint) returns table (head text, addr text, @@ -490,7 +483,7 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s return xerrors.Errorf("prep temp: %w", err) } - stmt, err := tx.Prepare(`copy ms (stateroot, miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `) + stmt, err := tx.Prepare(`copy ms (miner, sectorid, activation, dealweight, verifieddealweight, expiration, sealcid, sealrandepoch) from STDIN `) if err != nil { return err } @@ -504,7 +497,6 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s for _, sector := range sectors { if _, err := stmt.Exec( - miner.stateroot.String(), miner.addr.String(), uint64(sector.ID), int64(sector.Info.ActivationEpoch), @@ -531,54 +523,65 @@ func (st *storage) storeSectors(minerTips map[types.TipSetKey][]*newMinerInfo, s return tx.Commit() } -func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { - /*tx, err := st.db.Begin() - if err != nil { - return err - } +func (st *storage) storeMiners(minerTips map[types.TipSetKey][]*newMinerInfo) error { + tx, err := st.db.Begin() + if err != nil { + return err + } - if _, err := tx.Exec(` + if _, err := tx.Exec(` create temp table mh (like miner_heads excluding constraints) on commit drop; `); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } + return xerrors.Errorf("prep temp: %w", err) + } - stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, ppe) from STDIN`) - if err != nil { - return err - } - for k, i := range miners { + stmt, err := tx.Prepare(`copy mh (addr, owner, worker, peerid, sectorsize, windowpostpartitionsectors, precommitdeposits, lockedfunds, nextdeadlineprocessfaults) from STDIN`) + if err != nil { + return err + } + for ts, miners := range minerTips { + for _, miner := range miners { + var pid string + if len(miner.info.PeerId) != 0 { + peerid, err := peer.IDFromBytes(miner.info.PeerId) + if err != nil { + // this should "never happen", but if it does we should still store info about the miner. + log.Warnw("failed to decode peerID", "peerID (bytes)", miner.info.PeerId, "miner", miner.addr, "tipset", ts.String()) + } else { + pid = peerid.String() + } + } if _, err := stmt.Exec( - k.act.Head.String(), - k.addr.String(), - k.stateroot.String(), - i.state.Sectors.String(), - fmt.Sprint(i.ssize), - i.state.ProvingSet.String(), - fmt.Sprint(i.psize), - i.info.Owner.String(), - i.info.Worker.String(), - i.info.PeerId.String(), - i.info.SectorSize, - i.power.String(), // TODO: SPA - i.state.PoStState.ProvingPeriodStart, + miner.addr.String(), + miner.info.Owner.String(), + miner.info.Worker.String(), + pid, + miner.info.SectorSize.ShortString(), + miner.info.WindowPoStPartitionSectors, + + miner.state.PreCommitDeposits.String(), + miner.state.LockedFunds.String(), + miner.state.NextDeadlineToProcessFaults, ); err != nil { + log.Errorw("failed to store miner state", miner.state) + log.Errorw("failed to store miner info", miner.info) return err } - } - if err := stmt.Close(); err != nil { - return err - } - if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) } + } + if err := stmt.Close(); err != nil { + return err + } - return tx.Commit()*/ - return nil + if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + + return tx.Commit() } func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index db9786d4c..9483113b6 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -123,7 +123,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. allToSync[bh.Cid()] = bh if len(allToSync)%500 == 10 { - log.Infof("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) + log.Debugf("to visit: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) } if len(bh.Parents) == 0 { @@ -276,7 +276,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. tsKey: pts.Key(), } addressToID[addr] = address.Undef - // alk.Unlock() } }) @@ -423,14 +422,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, headTs *types. } // TODO re-enable when ready to fill miner metadata, the contents of storeMiners is commented out too. - /* - log.Info("Storing miners") + log.Info("Storing miners") - if err := st.storeMiners(miners); err != nil { - log.Error(err) - return - } - */ + if err := st.storeMiners(minerTips); err != nil { + log.Error(err) + return + } log.Info("Storing miner sectors")