Merge pull request #632 from filecoin-project/feat/chainwatch

ChainWatch
This commit is contained in:
Łukasz Magiera 2019-11-20 10:57:53 -06:00 committed by GitHub
commit 4e2d7f0b1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1758 additions and 23 deletions

4
.gitignore vendored
View File

@ -13,6 +13,10 @@
build/.*
build/paramfetch.sh
/vendor
/blocks.dot
/blocks.svg
/chainwatch
/chainwatch.db
*-fuzz.zip
/chain/types/work_msg/

View File

@ -109,6 +109,12 @@ fountain:
go run github.com/GeertJohan/go.rice/rice append --exec fountain -i ./cmd/lotus-fountain
.PHONY: fountain
chainwatch:
rm -f chainwatch
go build -o chainwatch ./cmd/lotus-chainwatch
go run github.com/GeertJohan/go.rice/rice append --exec chainwatch -i ./cmd/lotus-chainwatch
.PHONY: chainwatch
stats:
rm -f stats
go build -o stats ./tools/stats

View File

@ -38,12 +38,14 @@ type FullNode interface {
// syncer
SyncState(context.Context) (*SyncState, error)
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error)
// messages
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
// FullNodeStruct
@ -110,6 +112,8 @@ type FullNode interface {
StateMarketParticipants(context.Context, *types.TipSet) (map[string]actors.StorageParticipantBalance, error)
StateMarketDeals(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error)
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
StateLookupID(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateChangedActors(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error)
MarketEnsureAvailable(context.Context, address.Address, types.BigInt) error
// MarketFreeBalance
@ -274,3 +278,15 @@ const (
StageMessages
StageSyncComplete
)
type MpoolChange int
const (
MpoolAdd MpoolChange = iota
MpoolRemove
)
type MpoolUpdate struct {
Type MpoolChange
Message *types.SignedMessage
}

View File

@ -51,12 +51,15 @@ type FullNodeStruct struct {
ChainGetGenesis func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainTipSetWeight func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
MpoolPending func(context.Context, *types.TipSet) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPush func(context.Context, *types.SignedMessage) error `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
MpoolSub func(context.Context) (<-chan MpoolUpdate, error) `perm:"read"`
MinerRegister func(context.Context, address.Address) error `perm:"admin"`
MinerUnregister func(context.Context, address.Address) error `perm:"admin"`
@ -74,8 +77,6 @@ type FullNodeStruct struct {
WalletExport func(context.Context, address.Address) (*types.KeyInfo, error) `perm:"admin"`
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"write"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
@ -105,6 +106,8 @@ type FullNodeStruct struct {
StateMarketParticipants func(context.Context, *types.TipSet) (map[string]actors.StorageParticipantBalance, error) `perm:"read"`
StateMarketDeals func(context.Context, *types.TipSet) (map[string]actors.OnChainDeal, error) `perm:"read"`
StateMarketStorageDeal func(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) `perm:"read"`
StateLookupID func(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) `perm:"read"`
StateChangedActors func(context.Context, cid.Cid, cid.Cid) (map[string]types.Actor, error) `perm:"read"`
MarketEnsureAvailable func(context.Context, address.Address, types.BigInt) error `perm:"sign"`
@ -225,6 +228,10 @@ func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Messag
return c.Internal.MpoolPushMessage(ctx, msg)
}
func (c *FullNodeStruct) MpoolSub(ctx context.Context) (<-chan MpoolUpdate, error) {
return c.Internal.MpoolSub(ctx)
}
func (c *FullNodeStruct) MinerRegister(ctx context.Context, addr address.Address) error {
return c.Internal.MinerRegister(ctx, addr)
}
@ -345,6 +352,10 @@ func (c *FullNodeStruct) SyncSubmitBlock(ctx context.Context, blk *types.BlockMs
return c.Internal.SyncSubmitBlock(ctx, blk)
}
func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return c.Internal.SyncIncomingBlocks(ctx)
}
func (c *FullNodeStruct) StateMinerSectors(ctx context.Context, addr address.Address, ts *types.TipSet) ([]*ChainSectorInfo, error) {
return c.Internal.StateMinerSectors(ctx, addr, ts)
}
@ -420,6 +431,14 @@ func (c *FullNodeStruct) StateMarketStorageDeal(ctx context.Context, dealid uint
return c.Internal.StateMarketStorageDeal(ctx, dealid, ts)
}
func (c *FullNodeStruct) StateLookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
return c.Internal.StateLookupID(ctx, addr, ts)
}
func (c *FullNodeStruct) StateChangedActors(ctx context.Context, olnstate cid.Cid, newstate cid.Cid) (map[string]types.Actor, error) {
return c.Internal.StateChangedActors(ctx, olnstate, newstate)
}
func (c *FullNodeStruct) MarketEnsureAvailable(ctx context.Context, addr address.Address, amt types.BigInt) error {
return c.Internal.MarketEnsureAvailable(ctx, addr, amt)
}

View File

@ -12,6 +12,7 @@ import (
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/minio/blake2b-simd"
"github.com/polydawn/refmt/obj/atlas"
"golang.org/x/xerrors"
cbg "github.com/whyrusleeping/cbor-gen"
)
@ -147,6 +148,22 @@ func (a Address) Format(f fmt.State, c rune) {
}
}
func (a *Address) Scan(value interface{}) error {
switch value := value.(type) {
case string:
a1, err := decode(value)
if err != nil {
return err
}
*a = a1
return nil
default:
return xerrors.New("non-string types unsupported")
}
}
// NewIDAddress returns an address using the ID protocol.
func NewIDAddress(id uint64) (Address, error) {
return newAddress(ID, leb128.FromUInt64(id))

View File

@ -1,17 +1,19 @@
package chain
import (
"context"
"errors"
"sort"
"sync"
"time"
"errors"
lru "github.com/hashicorp/golang-lru"
pubsub "github.com/libp2p/go-libp2p-pubsub"
lps "github.com/whyrusleeping/pubsub"
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr"
@ -32,6 +34,8 @@ var (
const (
msgTopic = "/fil/messages"
localUpdates = "update"
)
type MessagePool struct {
@ -54,6 +58,8 @@ type MessagePool struct {
maxTxPoolSize int
blsSigCache *lru.TwoQueueCache
changes *lps.PubSub
}
type msgSet struct {
@ -95,6 +101,7 @@ func NewMessagePool(sm *stmgr.StateManager, ps *pubsub.PubSub) *MessagePool {
minGasPrice: types.NewInt(0),
maxTxPoolSize: 100000,
blsSigCache: cache,
changes: lps.New(50),
}
sm.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app)
@ -231,6 +238,11 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
}
mset.add(m)
mp.changes.Pub(api.MpoolUpdate{
Type: api.MpoolAdd,
Message: m,
}, localUpdates)
return nil
}
@ -304,6 +316,13 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
return
}
if m, ok := mset.msgs[nonce]; ok {
mp.changes.Pub(api.MpoolUpdate{
Type: api.MpoolRemove,
Message: m,
}, localUpdates)
}
// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
delete(mset.msgs, nonce)
@ -413,3 +432,27 @@ func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
Signature: sig,
}
}
func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, error) {
out := make(chan api.MpoolUpdate, 20)
sub := mp.changes.Sub(localUpdates)
go func() {
defer mp.changes.Unsub(sub, localIncoming)
for {
select {
case u := <-sub:
select {
case out <- u.(api.MpoolUpdate):
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}

View File

@ -47,13 +47,11 @@ func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) {
}
func (st *StateTree) SetActor(addr address.Address, act *types.Actor) error {
if addr.Protocol() != address.ID {
iaddr, err := st.lookupID(addr)
if err != nil {
return xerrors.Errorf("ID lookup failed: %w", err)
}
addr = iaddr
iaddr, err := st.LookupID(addr)
if err != nil {
return xerrors.Errorf("ID lookup failed: %w", err)
}
addr = iaddr
cact, ok := st.actorcache[addr]
if ok {
@ -67,7 +65,11 @@ func (st *StateTree) SetActor(addr address.Address, act *types.Actor) error {
return st.root.Set(context.TODO(), string(addr.Bytes()), act)
}
func (st *StateTree) lookupID(addr address.Address) (address.Address, error) {
func (st *StateTree) LookupID(addr address.Address) (address.Address, error) {
if addr.Protocol() == address.ID {
return addr, nil
}
act, err := st.GetActor(actors.InitAddress)
if err != nil {
return address.Undef, xerrors.Errorf("getting init actor: %w", err)
@ -86,16 +88,14 @@ func (st *StateTree) GetActor(addr address.Address) (*types.Actor, error) {
return nil, fmt.Errorf("GetActor called on undefined address")
}
if addr.Protocol() != address.ID {
iaddr, err := st.lookupID(addr)
if err != nil {
if xerrors.Is(err, hamt.ErrNotFound) {
return nil, xerrors.Errorf("resolution lookup failed (%s): %w", addr, types.ErrActorNotFound)
}
return nil, xerrors.Errorf("address resolution: %w", err)
iaddr, err := st.LookupID(addr)
if err != nil {
if xerrors.Is(err, hamt.ErrNotFound) {
return nil, xerrors.Errorf("resolution lookup failed (%s): %w", addr, types.ErrActorNotFound)
}
addr = iaddr
return nil, xerrors.Errorf("address resolution: %w", err)
}
addr = iaddr
cact, ok := st.actorcache[addr]
if ok {
@ -103,7 +103,7 @@ func (st *StateTree) GetActor(addr address.Address) (*types.Actor, error) {
}
var act types.Actor
err := st.root.Find(context.TODO(), string(addr.Bytes()), &act)
err = st.root.Find(context.TODO(), string(addr.Bytes()), &act)
if err != nil {
if err == hamt.ErrNotFound {
return nil, types.ErrActorNotFound

View File

@ -19,6 +19,7 @@ import (
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/whyrusleeping/pubsub"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
@ -35,6 +36,8 @@ import (
var log = logging.Logger("chain")
var localIncoming = "incoming"
type Syncer struct {
// The heaviest known tipset in the network.
@ -58,6 +61,8 @@ type Syncer struct {
syncLock sync.Mutex
syncmgr *SyncManager
incoming *pubsub.PubSub
}
func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID) (*Syncer, error) {
@ -78,6 +83,8 @@ func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, self peer.ID)
store: sm.ChainStore(),
sm: sm,
self: self,
incoming: pubsub.New(50),
}
s.syncmgr = NewSyncManager(s.Sync)
@ -109,6 +116,8 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
}
}
syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming)
if from == syncer.self {
// TODO: this is kindof a hack...
log.Debug("got block from ourselves")
@ -139,6 +148,33 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
}
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
sub := syncer.incoming.Sub(localIncoming)
out := make(chan *types.BlockHeader, 10)
go func() {
defer syncer.incoming.Unsub(sub, localIncoming)
for {
select {
case r := <-sub:
hs := r.([]*types.BlockHeader)
for _, h := range hs {
select {
case out <- h:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
var bcids, scids []cbg.CBORMarshaler
for _, m := range fblk.BlsMessages {

View File

@ -124,6 +124,28 @@ func (bi *BigInt) UnmarshalJSON(b []byte) error {
return nil
}
func (bi *BigInt) Scan(value interface{}) error {
switch value := value.(type) {
case string:
i, ok := big.NewInt(0).SetString(value, 10)
if !ok {
if value == "<nil>" {
return nil
}
return xerrors.Errorf("failed to parse bigint string: '%s'", value)
}
bi.Int = i
return nil
case int64:
bi.Int = big.NewInt(value)
return nil
default:
return xerrors.Errorf("non-string types unsupported: %T", value)
}
}
func (bi *BigInt) cborBytes() []byte {
if bi.Int == nil {
return []byte{}

View File

@ -12,6 +12,7 @@ var mpoolCmd = &cli.Command{
Usage: "Manage message pool",
Subcommands: []*cli.Command{
mpoolPending,
mpoolSub,
},
}
@ -43,3 +44,35 @@ var mpoolPending = &cli.Command{
return nil
},
}
var mpoolSub = &cli.Command{
Name: "sub",
Usage: "Subscibe to mpool changes",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
sub, err := api.MpoolSub(ctx)
if err != nil {
return err
}
for {
select {
case update := <-sub:
out, err := json.MarshalIndent(update, "", " ")
if err != nil {
return err
}
fmt.Println(string(out))
case <-ctx.Done():
return nil
}
}
},
}

View File

@ -21,6 +21,7 @@ var stateCmd = &cli.Command{
stateListActorsCmd,
stateListMinersCmd,
stateGetActorCmd,
stateLookupIDCmd,
},
}
@ -288,3 +289,35 @@ var stateGetActorCmd = &cli.Command{
return nil
},
}
var stateLookupIDCmd = &cli.Command{
Name: "lookup",
Usage: "Find corresponding ID address",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must pass address of actor to get")
}
addr, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
a, err := api.StateLookupID(ctx, addr, nil)
if err != nil {
return err
}
fmt.Printf("%s\n", a)
return nil
},
}

View File

@ -0,0 +1,27 @@
package main
import (
"context"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
aapi "github.com/filecoin-project/lotus/api"
)
func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) {
sub, err := api.SyncIncomingBlocks(ctx)
if err != nil {
log.Error(err)
return
}
for bh := range sub {
err := st.storeHeaders(map[cid.Cid]*types.BlockHeader{
bh.Cid(): bh,
}, false)
if err != nil {
log.Error(err)
}
}
}

View File

@ -0,0 +1,63 @@
package main
import (
"fmt"
"hash/crc32"
"strconv"
"github.com/ipfs/go-cid"
"gopkg.in/urfave/cli.v2"
)
var dotCmd = &cli.Command{
Name: "dot",
Usage: "generate dot graphs",
Action: func(cctx *cli.Context) error {
st, err := openStorage()
if err != nil {
return err
}
minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32)
tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32)
maxH := minH + tosee
res, err := st.db.Query("select block, parent, b.miner, b.height from block_parents inner join blocks b on block_parents.block = b.cid where b.height > ? and b.height < ?", minH, maxH)
if err != nil {
return err
}
fmt.Println("digraph D {")
for res.Next() {
var block, parent, miner string
var height uint64
if err := res.Scan(&block, &parent, &miner, &height); err != nil {
return err
}
bc, err := cid.Parse(block)
if err != nil {
return err
}
has := st.hasBlock(bc)
col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030
hasstr := ""
if !has {
hasstr = " UNSYNCED"
}
fmt.Printf("%s [label = \"%s:%d%s\", fillcolor = \"#%06x\", style=filled, forcelabels=true]\n%s -> %s\n", block, miner, height, hasstr, col, block, parent)
}
if res.Err() != nil {
return res.Err()
}
fmt.Println("}")
return nil
},
}

View File

@ -0,0 +1,99 @@
package main
import (
"fmt"
"net/http"
"os"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
)
var log = logging.Logger("chainwatch")
func main() {
logging.SetLogLevel("*", "INFO")
log.Info("Starting chainwatch")
local := []*cli.Command{
runCmd,
dotCmd,
}
app := &cli.App{
Name: "lotus-chainwatch",
Usage: "Devnet token distribution utility",
Version: build.Version,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
EnvVars: []string{"LOTUS_PATH"},
Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME
},
},
Commands: local,
}
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
return
}
}
var runCmd = &cli.Command{
Name: "run",
Usage: "Start lotus chainwatch",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "front",
Value: "127.0.0.1:8418",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
v, err := api.Version(ctx)
if err != nil {
return err
}
log.Info("Remote version: %s", v.Version)
st, err := openStorage()
if err != nil {
return err
}
defer st.close()
runSyncer(ctx, api, st)
go subMpool(ctx, api, st)
go subBlocks(ctx, api, st)
h, err := newHandler(api, st)
if err != nil {
return xerrors.Errorf("handler setup: %w", err)
}
http.Handle("/", h)
fmt.Printf("Open http://%s\n", cctx.String("front"))
go func() {
<-ctx.Done()
os.Exit(0)
}()
return http.ListenAndServe(cctx.String("front"), nil)
},
}

View File

@ -0,0 +1,38 @@
package main
import (
"context"
"github.com/ipfs/go-cid"
aapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)
func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
sub, err := api.MpoolSub(ctx)
if err != nil {
return
}
for change := range sub {
if change.Type != aapi.MpoolAdd {
continue
}
log.Info("mpool message")
err := st.storeMessages(map[cid.Cid]*types.Message{
change.Message.Message.Cid(): &change.Message.Message,
})
if err != nil {
log.Error(err)
continue
}
if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil {
log.Error(err)
continue
}
}
}

View File

@ -0,0 +1,32 @@
<!DOCTYPE html>
<html>
<head>
<title>Lotus ChainWatch</title>
<link rel="stylesheet" type="text/css" href="main.css">
</head>
<body>
<div class="Index">
<div class="Index-header">
<div>
<span>Lotus ChainWatch</span>
</div>
</div>
<div class="Index-nodes">
<div class="Index-node">
<b>{{countCol "actors" "id"}}</b> Actors;
<b>{{countCol "miner_heads" "addr"}}</b> Miners;
<b>{{netPower "slashed_at = 0" | sizeStr}}</b> Power
(<b>{{netPower "" | sizeStr}}</b> Total;
<b>{{netPower "slashed_at > 0" | sizeStr}}</b> Slashed)
</div>
<div class="Index-node">
{{count "messages"}} Messages; {{count "actors"}} state changes
</div>
<div class="Index-node">
{{count "id_address_map" "id != address"}} <a href="keys.html">Keys</a>;
E% FIL in wallets; F% FIL in miners; M% in market; %G Other actors; %H FIL it treasury
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,40 @@
<!DOCTYPE html>
<html>
<head>
<title>Lotus ChainWatch</title>
<link rel="stylesheet" type="text/css" href="main.css">
</head>
<body>
{{$wallet := param "w"}}
<div class="Index">
<div class="Index-header">
<div>
<span>Lotus ChainWatch - Wallet {{$wallet}}</span>
</div>
</div>
<div class="Index-nodes">
<div class="Index-node">
Balance: {{queryNum "select balance from actors inner join main.id_address_map m on m.address = ? where actors.id = m.id order by nonce desc limit 1" $wallet }}
</div>
<div class="Index-node">
Messages:
<table>
<tr><td>Dir</td><td>Peer</td><td>Nonce</td><td>Value</td><td>Block</td><td>Mpool Wait</td></tr>
{{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}}
<tr>
{{ if eq .From.String $wallet }}
<td>To</td><td>{{.To.String}}</td>
{{else}}
<td>From</td><td>{{.From.String}}</td>
{{end}}
<td>{{.Nonce}}</td>
<td>{{.Value}}</td>
</tr>
{{end}}
</table>
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,23 @@
<!DOCTYPE html>
<html>
<head>
<title>Lotus ChainWatch</title>
<link rel="stylesheet" type="text/css" href="main.css">
</head>
<body>
<div class="Index">
<div class="Index-header">
<div>
<span>Lotus ChainWatch - Wallets</span>
</div>
</div>
<div class="Index-nodes">
<div class="Index-node">
{{range strings "id_address_map" "address" "address != id"}}
<div><a href="key.html?w={{.}}">{{.}}</a></div>
{{end}}
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,62 @@
body {
font-family: 'monospace';
background: #1f1f1f;
color: #f0f0f0;
padding: 0;
margin: 0;
}
.Index {
width: 100vw;
height: 100vh;
background: #1a1a1a;
color: #f0f0f0;
font-family: monospace;
overflow: auto;
display: grid;
grid-template-columns: auto 80vw auto;
grid-template-rows: 3em auto auto auto;
grid-template-areas:
"header header header header"
". . . ."
". main main ."
". main main ."
". main main ."
". main main ."
". main main ."
". . . .";
}
.Index-header {
background: #2a2a2a;
grid-area: header;
}
.Index-Index-header > div {
padding-left: 0.7em;
padding-top: 0.7em;
}
.Index-nodes {
grid-area: main;
background: #2a2a2a;
}
.Index-node {
margin: 5px;
padding: 15px;
background: #1f1f1f;
}
a:link {
color: #50f020;
}
a:visited {
color: #50f020;
}
a:hover {
color: #30a00a;
}

View File

@ -0,0 +1,421 @@
package main
import (
"database/sql"
"sync"
"time"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
)
type storage struct {
db *sql.DB
headerLk sync.Mutex
}
func openStorage() (*storage, error) {
db, err := sql.Open("sqlite3", "./chainwatch.db")
if err != nil {
return nil, err
}
st := &storage{db: db}
return st, st.setup()
}
func (st *storage) setup() error {
tx, err := st.db.Begin()
if err != nil {
return err
}
_, err = tx.Exec(`
create table if not exists actors
(
id text not null,
code text not null,
head text not null,
nonce int not null,
balance text not null,
stateroot text
constraint actors_blocks_stateroot_fk
references blocks (parentStateRoot),
constraint actors_pk
primary key (id, nonce, balance, stateroot)
);
create index if not exists actors_id_index
on actors (id);
create table if not exists id_address_map
(
id text not null
constraint id_address_map_actors_id_fk
references actors (id),
address text not null,
constraint id_address_map_pk
primary key (id, address)
);
create index if not exists id_address_map_address_index
on id_address_map (address);
create index if not exists id_address_map_id_index
on id_address_map (id);
create table if not exists messages
(
cid text not null
constraint messages_pk
primary key,
"from" text not null
constraint messages_id_address_map_from_fk
references id_address_map (address),
"to" text not null
constraint messages_id_address_map_to_fk
references id_address_map (address),
nonce int not null,
value text not null,
gasprice int not null,
gaslimit int not null,
method int,
params blob
);
create unique index if not exists messages_cid_uindex
on messages (cid);
create table if not exists blocks
(
cid text not null
constraint blocks_pk
primary key,
parentWeight numeric not null,
parentStateRoot text not null,
height int not null,
miner text not null
constraint blocks_id_address_map_miner_fk
references id_address_map (address),
timestamp int not null
);
create unique index if not exists block_cid_uindex
on blocks (cid);
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
primary key
constraint blocks_synced_blocks_cid_fk
references blocks
);
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced (cid);
create table if not exists block_parents
(
block text not null
constraint block_parents_blocks_cid_fk
references blocks,
parent text not null
constraint block_parents_blocks_cid_fk_2
references blocks
);
create unique index if not exists block_parents_block_parent_uindex
on block_parents (block, parent);
create unique index if not exists blocks_cid_uindex
on blocks (cid);
create table if not exists block_messages
(
block text not null
constraint block_messages_blk_fk
references blocks (cid),
message text not null
constraint block_messages_msg_fk
references messages,
constraint block_messages_pk
primary key (block, message)
);
create table if not exists mpool_messages
(
msg text not null
constraint mpool_messages_pk
primary key
constraint mpool_messages_messages_cid_fk
references messages,
add_ts int not null
);
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
create table if not exists miner_heads
(
head text not null
constraint miner_heads_actors_head_fk
references actors (head),
addr text not null
constraint miner_heads_actors_id_fk
references actors (id),
stateroot text not null
constraint miner_heads_blocks_stateroot_fk
references blocks (parentStateRoot),
sectorset text not null,
provingset text not null,
owner text not null,
worker text not null,
peerid text not null,
sectorsize int not null,
power text not null,
active int,
ppe int not null,
slashed_at int not null,
constraint miner_heads_id_address_map_owner_fk
foreign key (owner) references id_address_map (address),
constraint miner_heads_id_address_map_worker_fk
foreign key (worker) references id_address_map (address),
constraint miner_heads_pk
primary key (head, addr)
);
`)
if err != nil {
return err
}
return tx.Commit()
}
func (st *storage) hasBlock(bh cid.Cid) bool {
var exitsts bool
err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=?)`, bh.String()).Scan(&exitsts)
if err != nil {
log.Error(err)
return false
}
return exitsts
}
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values (?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for addr, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil {
return err
}
}
}
return tx.Commit()
}
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for k, i := range miners {
if _, err := stmt.Exec(
k.act.Head.String(),
k.addr.String(),
k.stateroot.String(),
i.state.Sectors.String(),
i.state.ProvingSet.String(),
i.info.Owner.String(),
i.info.Worker.String(),
i.info.PeerID.String(),
i.info.SectorSize,
i.state.Power.String(),
i.state.Active,
i.state.ProvingPeriodEnd,
i.state.SlashedAt,
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock()
defer st.headerLk.Unlock()
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values (?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
return err
}
}
stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values (?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt2.Close()
for _, bh := range bhs {
for _, parent := range bh.Parents {
if _, err := stmt2.Exec(bh.Cid().String(), parent.String()); err != nil {
return err
}
}
}
if sync {
stmt, err := tx.Prepare(`insert into blocks_synced (cid) values (?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String()); err != nil {
return err
}
}
}
return tx.Commit()
}
func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for c, m := range msgs {
if _, err := stmt.Exec(
c.String(),
m.From.String(),
m.To.String(),
m.Nonce,
m.Value.String(),
m.GasPrice.String(),
m.GasLimit.String(),
m.Method,
m.Params,
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for a, i := range addrs {
if i == address.Undef {
continue
}
if _, err := stmt.Exec(
i.String(),
a.String(),
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for b, msgs := range incls {
for _, msg := range msgs {
if _, err := stmt.Exec(
b.String(),
msg.String(),
); err != nil {
return err
}
}
}
return tx.Commit()
}
func (st *storage) storeMpoolInclusion(msg cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(
msg.String(),
time.Now().Unix(),
); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -0,0 +1,291 @@
package main
import (
"bytes"
"container/list"
"context"
actors2 "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/address"
"sync"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
func runSyncer(ctx context.Context, api api.FullNode, st *storage) {
notifs, err := api.ChainNotify(ctx)
if err != nil {
panic(err)
}
go func() {
for notif := range notifs {
for _, change := range notif {
switch change.Type {
case store.HCCurrent:
fallthrough
case store.HCApply:
syncHead(ctx, api, st, change.Val)
case store.HCRevert:
log.Warnf("revert todo")
}
}
}
}()
}
type minerKey struct {
addr address.Address
act types.Actor
stateroot cid.Cid
}
type minerInfo struct {
state actors2.StorageMinerActorState
info actors2.MinerInfo
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
addresses := map[address.Address]address.Address{}
actors := map[address.Address]map[types.Actor]cid.Cid{}
var alk sync.Mutex
log.Infof("Getting headers / actors")
toSync := map[cid.Cid]*types.BlockHeader{}
toVisit := list.New()
for _, header := range ts.Blocks() {
toVisit.PushBack(header)
}
for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh.Cid()) {
continue
}
toSync[bh.Cid()] = bh
addresses[bh.Miner] = address.Undef
if len(toSync)%500 == 10 {
log.Infof("todo: (%d) %s", len(toSync), bh.Cid())
}
if len(bh.Parents) == 0 {
continue
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Error(err)
continue
}
for _, header := range pts.Blocks() {
toVisit.PushBack(header)
}
}
log.Infof("Syncing %d blocks", len(toSync))
log.Infof("Persisting actors")
paDone := 0
par(50, maparr(toSync), func(bh *types.BlockHeader) {
paDone++
if paDone%100 == 0 {
log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync))
}
if len(bh.Parents) == 0 { // genesis case
ts, err := types.NewTipSet([]*types.BlockHeader{bh})
aadrs, err := api.StateListActors(ctx, ts)
if err != nil {
log.Error(err)
return
}
par(50, aadrs, func(addr address.Address) {
act, err := api.StateGetActor(ctx, addr, ts)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
}
actors[addr][*act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
})
return
}
pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
if err != nil {
log.Error(err)
return
}
changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
if err != nil {
log.Error(err)
return
}
for a, act := range changes {
addr, err := address.NewFromString(a)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
}
actors[addr][act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
}
})
if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}
log.Infof("Persisting miners")
miners := map[minerKey]*minerInfo{}
for addr, m := range actors {
for actor, c := range m {
if actor.Code != actors2.StorageMinerCodeCid {
continue
}
miners[minerKey{
addr: addr,
act: actor,
stateroot: c,
}] = &minerInfo{}
}
}
par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) {
k, info := it()
astb, err := api.ChainReadObj(ctx, k.act.Head)
if err != nil {
log.Error(err)
return
}
if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil {
log.Error(err)
return
}
ib, err := api.ChainReadObj(ctx, info.state.Info)
if err != nil {
log.Error(err)
return
}
if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil {
log.Error(err)
return
}
})
if err := st.storeMiners(miners); err != nil {
log.Error(err)
return
}
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Error(err)
return
}
log.Infof("Getting messages")
msgs, incls := fetchMessages(ctx, api, toSync)
if err := st.storeMessages(msgs); err != nil {
log.Error(err)
return
}
if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err)
return
}
log.Infof("Resolving addresses")
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
}
par(50, kmaparr(addresses), func(addr address.Address) {
raddr, err := api.StateLookupID(ctx, addr, nil)
if err != nil {
log.Warn(err)
return
}
alk.Lock()
addresses[addr] = raddr
alk.Unlock()
})
if err := st.storeAddressMap(addresses); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}
func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
var lk sync.Mutex
messages := map[cid.Cid]*types.Message{}
inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs
par(50, maparr(toSync), func(header *types.BlockHeader) {
msgs, err := api.ChainGetBlockMessages(ctx, header.Cid())
if err != nil {
log.Error(err)
return
}
vmm := make([]*types.Message, 0, len(msgs.Cids))
for _, m := range msgs.BlsMessages {
vmm = append(vmm, m)
}
for _, m := range msgs.SecpkMessages {
vmm = append(vmm, &m.Message)
}
lk.Lock()
for _, message := range vmm {
messages[message.Cid()] = message
inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid())
}
lk.Unlock()
})
return messages, inclusions
}

View File

@ -0,0 +1,253 @@
package main
import (
"fmt"
"html/template"
"net/http"
"os"
"path/filepath"
rice "github.com/GeertJohan/go.rice"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)
type handler struct {
api api.FullNode
st *storage
site *rice.Box
assets http.Handler
templates map[string]*template.Template
}
func newHandler(api api.FullNode, st *storage) (*handler, error) {
h := &handler{
api: api,
st: st,
site: rice.MustFindBox("site"),
templates: map[string]*template.Template{},
}
h.assets = http.FileServer(h.site.HTTPBox())
funcs := template.FuncMap{
"count": h.count,
"countCol": h.countCol,
"sum": h.sum,
"netPower": h.netPower,
"queryNum": h.queryNum,
"sizeStr": sizeStr,
"strings": h.strings,
"messages": h.messages,
"param": func(string) string { return "" }, // replaced in request handler
}
base := template.New("")
base.Funcs(funcs)
return h, h.site.Walk("", func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) != ".html" {
return nil
}
if err != nil {
return err
}
log.Info(path)
h.templates["/"+path], err = base.New(path).Parse(h.site.MustString(path))
return err
})
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h, err := newHandler(h.api, h.st) // for faster dev
if err != nil {
log.Error(err)
return
}
p := r.URL.Path
if p == "/" {
p = "/index.html"
}
t, ok := h.templates[p]
if !ok {
h.assets.ServeHTTP(w, r)
return
}
t, err = t.Clone()
if err != nil {
log.Error(err)
return
}
t.Funcs(map[string]interface{}{
"param": r.FormValue,
})
if err := t.Execute(w, nil); err != nil {
log.Errorf("%+v", err)
return
}
log.Info(r.URL.Path)
}
// Template funcs
func (h *handler) count(table string, filters ...string) (int, error) {
// explicitly not caring about sql injection too much, this doesn't take user input
filts := ""
if len(filters) > 0 {
filts = " where "
for _, filter := range filters {
filts += filter + " and "
}
filts = filts[:len(filts)-5]
}
var c int
err := h.st.db.QueryRow("select count(1) from " + table + filts).Scan(&c)
if err != nil {
return 0, err
}
return c, nil
}
func (h *handler) countCol(table string, col string, filters ...string) (int, error) {
// explicitly not caring about sql injection too much, this doesn't take user input
filts := ""
if len(filters) > 0 {
filts = " where "
for _, filter := range filters {
filts += filter + " and "
}
filts = filts[:len(filts)-5]
}
var c int
err := h.st.db.QueryRow("select count(distinct " + col + ") from " + table + filts).Scan(&c)
if err != nil {
return 0, err
}
return c, nil
}
func (h *handler) sum(table string, col string) (types.BigInt, error) {
return h.queryNum("select sum(cast(" + col + " as bigint)) from " + table)
}
func (h *handler) netPower(slashFilt string) (types.BigInt, error) {
if slashFilt != "" {
slashFilt = " where " + slashFilt
}
return h.queryNum(`select sum(power) from (
select miner_heads.power, miner_heads.slashed_at, max(height) from miner_heads
inner join blocks b on miner_heads.stateroot = b.parentStateRoot
group by miner_heads.addr)` + slashFilt)
}
func (h *handler) queryNum(q string, p ...interface{}) (types.BigInt, error) {
// explicitly not caring about sql injection too much, this doesn't take user input
var c string
err := h.st.db.QueryRow(q, p...).Scan(&c)
if err != nil {
log.Error("qnum ", q, p, err)
return types.NewInt(0), err
}
i := types.NewInt(0)
_, ok := i.SetString(c, 10)
if !ok {
return types.NewInt(0), xerrors.New("num parse error: " + c)
}
return i, nil
}
var units = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB"}
func sizeStr(size types.BigInt) string {
size = types.BigMul(size, types.NewInt(100))
i := 0
for types.BigCmp(size, types.NewInt(102400)) >= 0 && i < len(units)-1 {
size = types.BigDiv(size, types.NewInt(1024))
i++
}
return fmt.Sprintf("%s.%s %s", types.BigDiv(size, types.NewInt(100)), types.BigMod(size, types.NewInt(100)), units[i])
}
func (h *handler) strings(table string, col string, filter string, args ...interface{}) (out []string, err error) {
if len(filter) > 0 {
filter = " where " + filter
}
log.Info("strings qstr ", "select "+col+" from "+table+filter)
rws, err := h.st.db.Query("select "+col+" from "+table+filter, args...)
if err != nil {
return nil, err
}
for rws.Next() {
var r string
if err := rws.Scan(&r); err != nil {
return nil, err
}
out = append(out, r)
}
return
}
func (h *handler) messages(filter string, args ...interface{}) (out []types.Message, err error) {
if len(filter) > 0 {
filter = " where " + filter
}
rws, err := h.st.db.Query("select * from messages "+filter, args...)
if err != nil {
return nil, err
}
for rws.Next() {
var r types.Message
var cs string
if err := rws.Scan(
&cs,
&r.From,
&r.To,
&r.Nonce,
&r.Value,
&r.GasPrice,
&r.GasLimit,
&r.Method,
&r.Params,
); err != nil {
return nil, err
}
c, err := cid.Parse(cs)
if err != nil {
return nil, err
}
if c != r.Cid() {
log.Warn("msg cid doesn't match")
}
out = append(out, r)
}
return
}
var _ http.Handler = &handler{}

View File

@ -0,0 +1,85 @@
package main
import (
"reflect"
"sync"
)
func maparr(in interface{}) interface{} {
rin := reflect.ValueOf(in)
rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Elem()), rin.Len(), rin.Len())
var i int
it := rin.MapRange()
for it.Next() {
rout.Index(i).Set(it.Value())
i++
}
return rout.Interface()
}
func kmaparr(in interface{}) interface{} {
rin := reflect.ValueOf(in)
rout := reflect.MakeSlice(reflect.SliceOf(rin.Type().Key()), rin.Len(), rin.Len())
var i int
it := rin.MapRange()
for it.Next() {
rout.Index(i).Set(it.Key())
i++
}
return rout.Interface()
}
// map[k]v => []func() (k, v)
func kvmaparr(in interface{}) interface{} {
rin := reflect.ValueOf(in)
t := reflect.FuncOf([]reflect.Type{}, []reflect.Type{
rin.Type().Key(),
rin.Type().Elem(),
}, false)
rout := reflect.MakeSlice(reflect.SliceOf(t), rin.Len(), rin.Len())
var i int
it := rin.MapRange()
for it.Next() {
k := it.Key()
v := it.Value()
rout.Index(i).Set(reflect.MakeFunc(t, func(args []reflect.Value) (results []reflect.Value) {
return []reflect.Value{k, v}
}))
i++
}
return rout.Interface()
}
func par(concurrency int, arr interface{}, f interface{}) {
throttle := make(chan struct{}, concurrency)
var wg sync.WaitGroup
varr := reflect.ValueOf(arr)
l := varr.Len()
rf := reflect.ValueOf(f)
wg.Add(l)
for i := 0; i < l; i++ {
throttle <- struct{}{}
go func(i int) {
defer wg.Done()
defer func() {
<-throttle
}()
rf.Call([]reflect.Value{varr.Index(i)})
}(i)
}
wg.Wait()
}

1
go.mod
View File

@ -72,6 +72,7 @@ require (
github.com/libp2p/go-maddr-filter v0.0.5
github.com/mattn/go-isatty v0.0.9 // indirect
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mattn/go-sqlite3 v1.12.0
github.com/miekg/dns v1.1.16 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1
github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771

2
go.sum
View File

@ -425,6 +425,8 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.12.0 h1:u/x3mp++qUxvYfulZ4HKOvVO0JWhk7HtE8lWhbGz/Do=
github.com/mattn/go-sqlite3 v1.12.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=

View File

@ -6,6 +6,7 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
@ -53,3 +54,7 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
func (a *MpoolAPI) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return a.Mpool.GetNonce(addr)
}
func (a *MpoolAPI) MpoolSub(ctx context.Context) (<-chan api.MpoolUpdate, error) {
return a.Mpool.Updates(ctx)
}

View File

@ -154,6 +154,15 @@ func (a *StateAPI) StateGetActor(ctx context.Context, actor address.Address, ts
return state.GetActor(actor)
}
func (a *StateAPI) StateLookupID(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
state, err := a.stateForTs(ctx, ts)
if err != nil {
return address.Undef, err
}
return state.LookupID(addr)
}
func (a *StateAPI) StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) {
state, err := a.stateForTs(ctx, ts)
if err != nil {
@ -294,3 +303,54 @@ func (a *StateAPI) StateMarketDeals(ctx context.Context, ts *types.TipSet) (map[
func (a *StateAPI) StateMarketStorageDeal(ctx context.Context, dealId uint64, ts *types.TipSet) (*actors.OnChainDeal, error) {
return stmgr.GetStorageDeal(ctx, a.StateManager, dealId, ts)
}
func (a *StateAPI) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) {
cst := hamt.CSTFromBstore(a.Chain.Blockstore())
nh, err := hamt.LoadNode(ctx, cst, new)
if err != nil {
return nil, err
}
oh, err := hamt.LoadNode(ctx, cst, old)
if err != nil {
return nil, err
}
out := map[string]types.Actor{}
err = nh.ForEach(ctx, func(k string, nval interface{}) error {
ncval := nval.(*cbg.Deferred)
var act types.Actor
var ocval cbg.Deferred
switch err := oh.Find(ctx, k, &ocval); err {
case nil:
if bytes.Equal(ocval.Raw, ncval.Raw) {
return nil // not changed
}
fallthrough
case hamt.ErrNotFound:
if err := act.UnmarshalCBOR(bytes.NewReader(ncval.Raw)); err != nil {
return err
}
addr, err := address.NewFromBytes([]byte(k))
if err != nil {
return xerrors.Errorf("address in state tree was not valid: %w", err)
}
out[addr.String()] = act
default:
return err
}
return nil
})
if err != nil {
return nil, err
}
return out, nil
}

View File

@ -54,7 +54,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
}
if err := a.Syncer.ValidateMsgMeta(fb); err != nil {
xerrors.Errorf("provided messages did not match block: %w", err)
return xerrors.Errorf("provided messages did not match block: %w", err)
}
ts, err := types.NewTipSet([]*types.BlockHeader{blk.Header})
@ -73,3 +73,7 @@ func (a *SyncAPI) SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) erro
// TODO: anything else to do here?
return a.PubSub.Publish("/fil/blocks", b)
}
func (a *SyncAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return a.Syncer.IncomingBlocks(ctx)
}