diff --git a/api/struct.go b/api/struct.go index 118a7bff8..9d8af338f 100644 --- a/api/struct.go +++ b/api/struct.go @@ -106,7 +106,7 @@ type FullNodeStruct struct { StateMarketDeals func(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error) `perm:"read"` StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"` StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"` - StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"` + StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"` MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"` diff --git a/cmd/lotus-chainwatch/site/index.html b/cmd/lotus-chainwatch/site/index.html new file mode 100644 index 000000000..b4784acff --- /dev/null +++ b/cmd/lotus-chainwatch/site/index.html @@ -0,0 +1,27 @@ + + + + Lotus ChainWatch + + + +
+
+
+ Lotus ChainWatch +
+
+
+
+ X Actors; Y Miners; Z Power (A Total; B Slashed); +
+
+ U Messages; V Gas Used; Total D FIL transferred; E state changes +
+
+ N Wallets; E% FIL in wallets; F% FIL in miners; M% in market; %G Other actors; %H FIL it treasury +
+
+
+ + diff --git a/cmd/lotus-chainwatch/site/main.css b/cmd/lotus-chainwatch/site/main.css new file mode 100644 index 000000000..38f93fd24 --- /dev/null +++ b/cmd/lotus-chainwatch/site/main.css @@ -0,0 +1,61 @@ +body { + font-family: 'monospace'; + background: #1f1f1f; + color: #f0f0f0; + padding: 0; + margin: 0; +} + +.Index { + width: 100vw; + height: 100vh; + background: #1a1a1a; + color: #f0f0f0; + font-family: monospace; + + display: grid; + grid-template-columns: auto 80vw auto; + grid-template-rows: 3em auto auto auto; + grid-template-areas: + "header header header header" + ". . . ." + ". main main ." + ". main main ." + ". main main ." + ". main main ." + ". main main ." + ". . . ."; +} + +.Index-header { + background: #2a2a2a; + grid-area: header; +} + +.Index-Index-header > div { + padding-left: 0.7em; + padding-top: 0.7em; +} + +.Index-nodes { + grid-area: main; + background: #2a2a2a; +} + +.Index-node { + margin: 5px; + padding: 15px; + background: #1f1f1f; +} + +a:link { + color: #50f020; +} + +a:visited { + color: #50f020; +} + +a:hover { + color: #30a00a; +} diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 2df5821de..5af8e0521 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -38,8 +38,11 @@ create table actors head text not null, nonce int not null, balance text, + stateroot text + constraint actors_blocks_stateroot_fk + references blocks (parentStateRoot), constraint actors_pk - unique (id, code, head, nonce, balance) + primary key (id, nonce, balance, stateroot) ); create table id_address_map @@ -52,7 +55,6 @@ create table id_address_map primary key (id, address) ); - create table messages ( cid text not null @@ -81,6 +83,7 @@ create table blocks constraint blocks_pk primary key, parentWeight numeric not null, + parentStateRoot text not null, height int not null, miner text not null constraint blocks_id_address_map_miner_fk @@ -100,8 +103,36 @@ create table block_messages constraint block_messages_msg_fk references messages, constraint block_messages_pk - unique (block, message) + primary key (block, message) ); + +create table miner_heads +( + head text not null + constraint miner_heads_actors_head_fk + references actors (head), + addr text not null + constraint miner_heads_actors_id_fk + references actors (id), + stateroot text not null + constraint miner_heads_blocks_stateroot_fk + references blocks (parentStateRoot), + sectorset text not null, + provingset text not null, + owner text not null, + worker text not null, + peerid text not null, + sectorsize int not null, + power text not null, + active int, + ppe int not null, + slashed_at int not null, + constraint miner_heads_id_address_map_address_address_fk + foreign key (owner, worker) references id_address_map (address, address), + constraint miner_heads_pk + primary key (head, addr) +); + `) if err != nil { return err @@ -119,20 +150,20 @@ func (st *storage) hasBlock(bh *types.BlockHeader) bool { return exitsts } -func (st *storage) storeActors(actors map[address.Address]map[types.Actor]struct{}) error { +func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { tx, err := st.db.Begin() if err != nil { return err } - stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance) values (?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values (?, ?, ?, ?, ?, ?) on conflict do nothing`) if err != nil { return err } defer stmt.Close() for addr, acts := range actors { - for act, _ := range acts { - if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String()); err != nil { + for act, st := range acts { + if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { return err } } @@ -141,19 +172,53 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]struct return tx.Commit() } +func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + for k, i := range miners { + if _, err := stmt.Exec( + k.act.Head.String(), + k.addr.String(), + k.stateroot.String(), + i.state.Sectors.String(), + i.state.ProvingSet.String(), + i.info.Owner.String(), + i.info.Worker.String(), + i.info.PeerID.String(), + i.info.SectorSize, + i.state.Power.String(), + i.state.Active, + i.state.ProvingPeriodEnd, + i.state.SlashedAt, + ); err != nil { + return err + } + } + + return tx.Commit() +} + func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader) error { tx, err := st.db.Begin() if err != nil { return err } - stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, height, miner, "timestamp") values (?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values (?, ?, ?, ?, ?, ?) on conflict do nothing`) if err != nil { return err } defer stmt.Close() for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { + if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { return err } } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 58dc6943c..34d992185 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -1,8 +1,10 @@ package main import ( + "bytes" "container/list" "context" + actors2 "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/address" "sync" @@ -34,9 +36,20 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) { }() } +type minerKey struct { + addr address.Address + act types.Actor + stateroot cid.Cid +} + +type minerInfo struct { + state actors2.StorageMinerActorState + info actors2.MinerInfo +} + func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) { addresses := map[address.Address]address.Address{} - actors := map[address.Address]map[types.Actor]struct{}{} + actors := map[address.Address]map[types.Actor]cid.Cid{} var alk sync.Mutex log.Infof("Getting headers / actors") @@ -82,32 +95,67 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Persisting actors") paDone := 0 - par(40, maparr(toSync), func(bh *types.BlockHeader) { + par(50, maparr(toSync), func(bh *types.BlockHeader) { paDone++ if paDone%100 == 0 { log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync)) } - ts, err := types.NewTipSet([]*types.BlockHeader{bh}) - aadrs, err := api.StateListActors(ctx, ts) - if err != nil { - return - } - par(50, aadrs, func(addr address.Address) { - act, err := api.StateGetActor(ctx, addr, ts) + if len(bh.Parents) == 0 { // genesis case + ts, err := types.NewTipSet([]*types.BlockHeader{bh}) + aadrs, err := api.StateListActors(ctx, ts) if err != nil { log.Error(err) return } + + par(50, aadrs, func(addr address.Address) { + act, err := api.StateGetActor(ctx, addr, ts) + if err != nil { + log.Error(err) + return + } + alk.Lock() + _, ok := actors[addr] + if !ok { + actors[addr] = map[types.Actor]cid.Cid{} + } + actors[addr][*act] = bh.ParentStateRoot + addresses[addr] = address.Undef + alk.Unlock() + }) + + return + } + + pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) + if err != nil { + log.Error(err) + return + } + + changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) + if err != nil { + log.Error(err) + return + } + + for a, act := range changes { + addr, err := address.NewFromString(a) + if err != nil { + log.Error(err) + return + } + alk.Lock() _, ok := actors[addr] if !ok { - actors[addr] = map[types.Actor]struct{}{} + actors[addr] = map[types.Actor]cid.Cid{} } - actors[addr][*act] = struct{}{} + actors[addr][act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() - }) + } }) if err := st.storeActors(actors); err != nil { @@ -115,6 +163,55 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS return } + log.Infof("Persisting miners") + + miners := map[minerKey]*minerInfo{} + + for addr, m := range actors { + for actor, c := range m { + if actor.Code != actors2.StorageMinerCodeCid { + continue + } + + miners[minerKey{ + addr: addr, + act: actor, + stateroot: c, + }] = &minerInfo{} + } + } + + par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) { + k, info := it() + + astb, err := api.ChainReadObj(ctx, k.act.Head) + if err != nil { + log.Error(err) + return + } + + if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + log.Error(err) + return + } + + ib, err := api.ChainReadObj(ctx, info.state.Info) + if err != nil { + log.Error(err) + return + } + + if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil { + log.Error(err) + return + } + }) + + if err := st.storeMiners(miners); err != nil { + log.Error(err) + return + } + log.Infof("Persisting headers") if err := st.storeHeaders(toSync); err != nil { log.Error(err) diff --git a/cmd/lotus-chainwatch/utils.go b/cmd/lotus-chainwatch/utils.go index 82a2a0110..a5c821683 100644 --- a/cmd/lotus-chainwatch/utils.go +++ b/cmd/lotus-chainwatch/utils.go @@ -21,7 +21,7 @@ func maparr(in interface{}) interface{} { func kmaparr(in interface{}) interface{} { rin := reflect.ValueOf(in) - rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Elem()), rin.Len(), rin.Len()) + rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Key()), rin.Len(), rin.Len()) var i int it := rin.MapRange() @@ -33,6 +33,32 @@ func kmaparr(in interface{}) interface{} { return rout.Interface() } +// map[k]v => []func() (k, v) +func kvmaparr(in interface{}) interface{} { + rin := reflect.ValueOf(in) + + t := reflect.FuncOf([]reflect.Type{}, []reflect.Type{ + rin.Type().Key(), + rin.Type().Elem(), + }, false) + + rout := reflect.MakeSlice(reflect.SliceOf(t), rin.Len(), rin.Len()) + var i int + + it := rin.MapRange() + for it.Next() { + k := it.Key() + v := it.Value() + + rout.Index(i).Set(reflect.MakeFunc(t, func(args []reflect.Value) (results []reflect.Value) { + return []reflect.Value{k, v} + })) + i++ + } + + return rout.Interface() +} + func par(concurrency int, arr interface{}, f interface{}) { throttle := make(chan struct{}, concurrency) var wg sync.WaitGroup