Merge pull request #6912 from filecoin-project/chore/drop-dead-utils

Drop townhall/chainwatch

commit ef94509eac

Makefile — 40 changed lines (2 additions, 38 deletions)

@@ -163,31 +163,12 @@ lotus-pond-front:
 lotus-pond-app: lotus-pond-front lotus-pond
 .PHONY: lotus-pond-app
 
-lotus-townhall:
-	rm -f lotus-townhall
-	$(GOCC) build -o lotus-townhall ./cmd/lotus-townhall
-.PHONY: lotus-townhall
-BINS+=lotus-townhall
-
-lotus-townhall-front:
-	(cd ./cmd/lotus-townhall/townhall && npm i && npm run build)
-.PHONY: lotus-townhall-front
-
-lotus-townhall-app: lotus-touch lotus-townhall-front
-.PHONY: lotus-townhall-app
-
 lotus-fountain:
 	rm -f lotus-fountain
 	$(GOCC) build -o lotus-fountain ./cmd/lotus-fountain
 .PHONY: lotus-fountain
 BINS+=lotus-fountain
 
-lotus-chainwatch:
-	rm -f lotus-chainwatch
-	$(GOCC) build $(GOFLAGS) -o lotus-chainwatch ./cmd/lotus-chainwatch
-.PHONY: lotus-chainwatch
-BINS+=lotus-chainwatch
-
 lotus-bench:
 	rm -f lotus-bench
 	$(GOCC) build -o lotus-bench ./cmd/lotus-bench
@@ -236,9 +217,6 @@ tvx:
 .PHONY: tvx
 BINS+=tvx
 
-install-chainwatch: lotus-chainwatch
-	install -C ./lotus-chainwatch /usr/local/bin/lotus-chainwatch
-
 lotus-sim: $(BUILD_DEPS)
 	rm -f lotus-sim
 	$(GOCC) build $(GOFLAGS) -o lotus-sim ./cmd/lotus-sim
@@ -263,21 +241,13 @@ install-miner-service: install-miner install-daemon-service
 	@echo
 	@echo "lotus-miner service installed. Don't forget to run 'sudo systemctl start lotus-miner' to start it and 'sudo systemctl enable lotus-miner' for it to be enabled on startup."
 
-install-chainwatch-service: install-chainwatch install-daemon-service
-	mkdir -p /etc/systemd/system
-	mkdir -p /var/log/lotus
-	install -C -m 0644 ./scripts/lotus-chainwatch.service /etc/systemd/system/lotus-chainwatch.service
-	systemctl daemon-reload
-	@echo
-	@echo "chainwatch service installed. Don't forget to run 'sudo systemctl start lotus-chainwatch' to start it and 'sudo systemctl enable lotus-chainwatch' for it to be enabled on startup."
-
 install-main-services: install-miner-service
 
-install-all-services: install-main-services install-chainwatch-service
+install-all-services: install-main-services
 
 install-services: install-main-services
 
-clean-daemon-service: clean-miner-service clean-chainwatch-service
+clean-daemon-service: clean-miner-service
 	-systemctl stop lotus-daemon
 	-systemctl disable lotus-daemon
 	rm -f /etc/systemd/system/lotus-daemon.service
@@ -289,12 +259,6 @@ clean-miner-service:
 	rm -f /etc/systemd/system/lotus-miner.service
 	systemctl daemon-reload
 
-clean-chainwatch-service:
-	-systemctl stop lotus-chainwatch
-	-systemctl disable lotus-chainwatch
-	rm -f /etc/systemd/system/lotus-chainwatch.service
-	systemctl daemon-reload
-
 clean-main-services: clean-daemon-service
 
 clean-all-services: clean-main-services

@@ -1,129 +0,0 @@
package metrics

import (
	"context"
	"encoding/json"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"go.uber.org/fx"

	"github.com/filecoin-project/lotus/build"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/node/impl/full"
	"github.com/filecoin-project/lotus/node/modules/helpers"
)

var log = logging.Logger("metrics")

const baseTopic = "/fil/headnotifs/"

type Update struct {
	Type string
}

func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error {
		ctx := helpers.LifecycleCtx(mctx, lc)

		lc.Append(fx.Hook{
			OnStart: func(_ context.Context) error {
				gen, err := chain.Chain.GetGenesis()
				if err != nil {
					return err
				}

				topic := baseTopic + gen.Cid().String()

				go func() {
					if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil {
						log.Error("consensus metrics error", err)
						return
					}
				}()
				go func() {
					sub, err := ps.Subscribe(topic) //nolint
					if err != nil {
						return
					}
					defer sub.Cancel()

					for {
						if _, err := sub.Next(ctx); err != nil {
							return
						}
					}

				}()
				return nil
			},
		})

		return nil
	}
}

type message struct {
	// TipSet
	Cids   []cid.Cid
	Blocks []*types.BlockHeader
	Height abi.ChainEpoch
	Weight types.BigInt
	Time   uint64
	Nonce  uint64

	// Meta

	NodeName string
}

func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	notifs, err := chain.ChainNotify(ctx)
	if err != nil {
		return err
	}

	// using unix nano time makes very sure we pick a nonce higher than previous restart
	nonce := uint64(build.Clock.Now().UnixNano())

	for {
		select {
		case notif := <-notifs:
			n := notif[len(notif)-1]

			w, err := chain.ChainTipSetWeight(ctx, n.Val.Key())
			if err != nil {
				return err
			}

			m := message{
				Cids:     n.Val.Cids(),
				Blocks:   n.Val.Blocks(),
				Height:   n.Val.Height(),
				Weight:   w,
				NodeName: nickname,
				Time:     uint64(build.Clock.Now().UnixNano() / 1000_000),
				Nonce:    nonce,
			}

			b, err := json.Marshal(m)
			if err != nil {
				return err
			}

			//nolint
			if err := ps.Publish(topic, b); err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}

		nonce++
	}
}

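For context on the consumer side of this deleted module: nodes that opted in published their chain head to the pubsub topic above, and a collector (townhall) subscribed to it. A minimal sketch of such a subscriber, assuming a recent go-libp2p where libp2p.New takes no context; the genesis CID is a placeholder, the update struct mirrors only a few fields of the message struct above, and bootstrap/peer discovery is omitted:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/libp2p/go-libp2p"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// update mirrors part of the JSON wire format of the deleted message struct.
type update struct {
	Height   int64
	Nonce    uint64
	NodeName string
}

func main() {
	ctx := context.Background()

	h, err := libp2p.New() // host setup only; no bootstrap peers configured here
	if err != nil {
		panic(err)
	}

	ps, err := pubsub.NewGossipSub(ctx, h)
	if err != nil {
		panic(err)
	}

	// The topic is baseTopic plus the network's genesis block CID.
	sub, err := ps.Subscribe("/fil/headnotifs/<genesis-cid>")
	if err != nil {
		panic(err)
	}

	for {
		m, err := sub.Next(ctx)
		if err != nil {
			return
		}
		var u update
		if err := json.Unmarshal(m.Data, &u); err != nil {
			continue // not a head notification we understand
		}
		fmt.Printf("%s reached height %d (nonce %d)\n", u.NodeName, u.Height, u.Nonce)
	}
}
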
@@ -1,131 +0,0 @@
package main

import (
	"database/sql"
	"fmt"
	"hash/crc32"
	"strconv"

	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"
)

var dotCmd = &cli.Command{
	Name:      "dot",
	Usage:     "generate dot graphs",
	ArgsUsage: "<minHeight> <toseeHeight>",
	Action: func(cctx *cli.Context) error {
		ll := cctx.String("log-level")
		if err := logging.SetLogLevel("*", ll); err != nil {
			return err
		}

		db, err := sql.Open("postgres", cctx.String("db"))
		if err != nil {
			return err
		}
		defer func() {
			if err := db.Close(); err != nil {
				log.Errorw("Failed to close database", "error", err)
			}
		}()

		if err := db.Ping(); err != nil {
			return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
		}

		minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32)
		if err != nil {
			return err
		}
		tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32)
		if err != nil {
			return err
		}
		maxH := minH + tosee

		res, err := db.Query(`select block, parent, b.miner, b.height, p.height from block_parents
			inner join blocks b on block_parents.block = b.cid
			inner join blocks p on block_parents.parent = p.cid
			where b.height > $1 and b.height < $2`, minH, maxH)

		if err != nil {
			return err
		}

		fmt.Println("digraph D {")

		hl, err := syncedBlocks(db)
		if err != nil {
			log.Fatal(err)
		}

		for res.Next() {
			var block, parent, miner string
			var height, ph uint64
			if err := res.Scan(&block, &parent, &miner, &height, &ph); err != nil {
				return err
			}

			bc, err := cid.Parse(block)
			if err != nil {
				return err
			}

			_, has := hl[bc]

			col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030

			hasstr := ""
			if !has {
				//col = 0xffffffff
				hasstr = " UNSYNCED"
			}

			nulls := height - ph - 1
			for i := uint64(0); i < nulls; i++ {
				name := block + "NP" + fmt.Sprint(i)

				fmt.Printf("%s [label = \"NULL:%d\", fillcolor = \"#ffddff\", style=filled, forcelabels=true]\n%s -> %s\n",
					name, height-nulls+i, name, parent)

				parent = name
			}

			fmt.Printf("%s [label = \"%s:%d%s\", fillcolor = \"#%06x\", style=filled, forcelabels=true]\n%s -> %s\n", block, miner, height, hasstr, col, block, parent)
		}
		if res.Err() != nil {
			return res.Err()
		}

		fmt.Println("}")

		return nil
	},
}

func syncedBlocks(db *sql.DB) (map[cid.Cid]struct{}, error) {
	// timestamp is used to return a configurable amount of rows based on when they were last added.
	rws, err := db.Query(`select cid FROM blocks_synced`)
	if err != nil {
		return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
	}
	out := map[cid.Cid]struct{}{}

	for rws.Next() {
		var c string
		if err := rws.Scan(&c); err != nil {
			return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err)
		}

		ci, err := cid.Parse(c)
		if err != nil {
			return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err)
		}

		out[ci] = struct{}{}
	}
	return out, nil
}

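The fillcolor in the dot output above is derived deterministically from the miner address, so the same miner always gets the same color across graphs. A standalone reproduction of that derivation (the example address is arbitrary; note the original passes the full 32-bit value, so %06x can print more than six digits):

package main

import (
	"fmt"
	"hash/crc32"
)

// minerColor mirrors the deleted dot command's scheme: CRC32 (Castagnoli) of
// the miner address, masked to the top bits of each byte, then offset by
// 0x30303030 so no color channel ends up too dark to read.
func minerColor(miner string) uint32 {
	return crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030
}

func main() {
	fmt.Printf("fillcolor = \"#%06x\"\n", minerColor("t01000"))
}
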
@@ -1,54 +0,0 @@
package main

import (
	"os"

	"github.com/filecoin-project/lotus/build"
	logging "github.com/ipfs/go-log/v2"
	"github.com/urfave/cli/v2"
)

var log = logging.Logger("chainwatch")

func main() {
	if err := logging.SetLogLevel("*", "info"); err != nil {
		log.Fatal(err)
	}
	log.Info("Starting chainwatch", " v", build.UserVersion())

	app := &cli.App{
		Name:    "lotus-chainwatch",
		Usage:   "Devnet token distribution utility",
		Version: build.UserVersion(),
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:    "repo",
				EnvVars: []string{"LOTUS_PATH"},
				Value:   "~/.lotus", // TODO: Consider XDG_DATA_HOME
			},
			&cli.StringFlag{
				Name:    "api",
				EnvVars: []string{"FULLNODE_API_INFO"},
				Value:   "",
			},
			&cli.StringFlag{
				Name:    "db",
				EnvVars: []string{"LOTUS_DB"},
				Value:   "",
			},
			&cli.StringFlag{
				Name:    "log-level",
				EnvVars: []string{"GOLOG_LOG_LEVEL"},
				Value:   "info",
			},
		},
		Commands: []*cli.Command{
			dotCmd,
			runCmd,
		},
	}

	if err := app.Run(os.Args); err != nil {
		log.Fatal(err)
	}
}

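The --api flag / FULLNODE_API_INFO env var is how this tool located the full node. A minimal sketch of dialing that endpoint with lotus's v0 JSON-RPC client constructor (the address and bearer token below are placeholders; the real CLI parses them out of the "<token>:<multiaddr>" form of FULLNODE_API_INFO):

package main

import (
	"context"
	"fmt"
	"net/http"

	"github.com/filecoin-project/lotus/api/client"
)

func main() {
	ctx := context.Background()

	addr := "ws://127.0.0.1:1234/rpc/v0"                                // placeholder endpoint
	headers := http.Header{"Authorization": []string{"Bearer <token>"}} // placeholder token

	node, closer, err := client.NewFullNodeRPCV0(ctx, addr, headers)
	if err != nil {
		panic(err)
	}
	defer closer()

	head, err := node.ChainHead(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println("chain head height:", head.Height())
}
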
@@ -1,299 +0,0 @@
package processor

import (
	"context"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/ipfs/go-cid"

	builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"

	"github.com/filecoin-project/lotus/chain/actors/builtin"
	_init "github.com/filecoin-project/lotus/chain/actors/builtin/init"
	"github.com/filecoin-project/lotus/chain/events/state"
	"github.com/filecoin-project/lotus/chain/types"
	cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)

func (p *Processor) setupCommonActors() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create table if not exists id_address_map
(
	id text not null,
	address text not null,
	constraint id_address_map_pk
		primary key (id, address)
);

create unique index if not exists id_address_map_id_uindex
	on id_address_map (id);

create unique index if not exists id_address_map_address_uindex
	on id_address_map (address);

create table if not exists actors
(
	id text not null
		constraint id_address_map_actors_id_fk
			references id_address_map (id),
	code text not null,
	head text not null,
	nonce int not null,
	balance text not null,
	stateroot text
);

create index if not exists actors_id_index
	on actors (id);

create index if not exists id_address_map_address_index
	on id_address_map (address);

create index if not exists id_address_map_id_index
	on id_address_map (id);

create or replace function actor_tips(epoch bigint)
	returns table (id text,
		code text,
		head text,
		nonce int,
		balance text,
		stateroot text,
		height bigint,
		parentstateroot text) as
$body$
	select distinct on (id) * from actors
		inner join state_heights sh on sh.parentstateroot = stateroot
	where height < $1
	order by id, height desc;
$body$ language sql;

create table if not exists actor_states
(
	head text not null,
	code text not null,
	state json not null
);

create unique index if not exists actor_states_head_code_uindex
	on actor_states (head, code);

create index if not exists actor_states_head_index
	on actor_states (head);

create index if not exists actor_states_code_head_index
	on actor_states (head, code);

`); err != nil {
		return err
	}

	return tx.Commit()
}

func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[cid.Cid]ActorTips) error {
	if err := p.storeActorAddresses(ctx, actors); err != nil {
		return err
	}

	grp, _ := errgroup.WithContext(ctx)

	grp.Go(func() error {
		if err := p.storeActorHeads(actors); err != nil {
			return err
		}
		return nil
	})

	grp.Go(func() error {
		if err := p.storeActorStates(actors); err != nil {
			return err
		}
		return nil
	})

	return grp.Wait()
}

type UpdateAddresses struct {
	Old state.AddressPair
	New state.AddressPair
}

func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error {
	start := time.Now()
	defer func() {
		log.Debugw("Stored Actor Addresses", "duration", time.Since(start).String())
	}()

	addressToID := map[address.Address]address.Address{}
	// HACK until genesis storage is figured out:
	addressToID[builtin2.SystemActorAddr] = builtin2.SystemActorAddr
	addressToID[builtin2.InitActorAddr] = builtin2.InitActorAddr
	addressToID[builtin2.RewardActorAddr] = builtin2.RewardActorAddr
	addressToID[builtin2.CronActorAddr] = builtin2.CronActorAddr
	addressToID[builtin2.StoragePowerActorAddr] = builtin2.StoragePowerActorAddr
	addressToID[builtin2.StorageMarketActorAddr] = builtin2.StorageMarketActorAddr
	addressToID[builtin2.VerifiedRegistryActorAddr] = builtin2.VerifiedRegistryActorAddr
	addressToID[builtin2.BurntFundsActorAddr] = builtin2.BurntFundsActorAddr
	initActor, err := p.node.StateGetActor(ctx, builtin2.InitActorAddr, types.EmptyTSK)
	if err != nil {
		return err
	}

	initActorState, err := _init.Load(cw_util.NewAPIIpldStore(ctx, p.node), initActor)
	if err != nil {
		return err
	}
	// gross..
	if err := initActorState.ForEachActor(func(id abi.ActorID, addr address.Address) error {
		idAddr, err := address.NewIDAddress(uint64(id))
		if err != nil {
			return err
		}
		addressToID[addr] = idAddr
		return nil
	}); err != nil {
		return err
	}
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create temp table iam (like id_address_map excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `)
	if err != nil {
		return err
	}

	for a, i := range addressToID {
		if i == address.Undef {
			continue
		}
		if _, err := stmt.Exec(
			i.String(),
			a.String(),
		); err != nil {
			return err
		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	// HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change
	if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil {
		log.Warnw("Failed to update id_address_map table, this is a known issue")
		return nil
	}

	return tx.Commit()
}

func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
	start := time.Now()
	defer func() {
		log.Debugw("Stored Actor Heads", "duration", time.Since(start).String())
	}()
	// Basic
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(`
create temp table a_tmp (like actors excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy a_tmp (id, code, head, nonce, balance, stateroot) from stdin `)
	if err != nil {
		return err
	}

	for code, actTips := range actors {
		actorName := code.String()
		if builtin.IsBuiltinActor(code) {
			actorName = builtin.ActorNameByCode(code)
		}
		for _, actorInfo := range actTips {
			for _, a := range actorInfo {
				if _, err := stmt.Exec(a.addr.String(), actorName, a.act.Head.String(), a.act.Nonce, a.act.Balance.String(), a.stateroot.String()); err != nil {
					return err
				}
			}
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into actors select * from a_tmp on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
	start := time.Now()
	defer func() {
		log.Debugw("Stored Actor States", "duration", time.Since(start).String())
	}()
	// States
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(`
create temp table as_tmp (like actor_states excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy as_tmp (head, code, state) from stdin `)
	if err != nil {
		return err
	}

	for code, actTips := range actors {
		actorName := code.String()
		if builtin.IsBuiltinActor(code) {
			actorName = builtin.ActorNameByCode(code)
		}
		for _, actorInfo := range actTips {
			for _, a := range actorInfo {
				if _, err := stmt.Exec(a.act.Head.String(), actorName, a.state); err != nil {
					return err
				}
			}
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into actor_states select * from as_tmp on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

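Every store* method in this processor follows the same Postgres bulk-insert idiom: stage rows into a session-temp table with COPY, then upsert into the real table in one statement. A self-contained sketch of that idiom, assuming the github.com/lib/pq driver (which recognizes prepared `copy ... from stdin` statements, as the code above relies on); upsertAddresses is a hypothetical helper shaped after storeActorAddresses:

package main

import (
	"database/sql"

	_ "github.com/lib/pq" // driver registration
)

// upsertAddresses stages (id, address) pairs via COPY into a temp table,
// then upserts them into id_address_map, mirroring storeActorAddresses.
func upsertAddresses(db *sql.DB, rows map[string]string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer func() { _ = tx.Rollback() }() // no-op once Commit succeeds

	// The temp table vanishes on commit, so a failed batch leaves nothing behind.
	if _, err := tx.Exec(`create temp table iam (like id_address_map excluding constraints) on commit drop`); err != nil {
		return err
	}

	stmt, err := tx.Prepare(`copy iam (id, address) from stdin`)
	if err != nil {
		return err
	}
	for id, addr := range rows {
		if _, err := stmt.Exec(id, addr); err != nil {
			return err
		}
	}
	// lib/pq's documented pattern: a final argument-less Exec flushes the COPY
	// stream (the original code relied on Close alone).
	if _, err := stmt.Exec(); err != nil {
		return err
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into id_address_map select * from iam
		on conflict (id) do update set address = excluded.address`); err != nil {
		return err
	}
	return tx.Commit()
}
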
@@ -1,316 +0,0 @@
package processor

import (
	"context"
	"strconv"
	"time"

	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/chain/actors/builtin/market"
	"github.com/filecoin-project/lotus/chain/events/state"
)

func (p *Processor) setupMarket() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create table if not exists market_deal_proposals
(
	deal_id bigint not null,

	state_root text not null,

	piece_cid text not null,
	padded_piece_size bigint not null,
	unpadded_piece_size bigint not null,
	is_verified bool not null,

	client_id text not null,
	provider_id text not null,

	start_epoch bigint not null,
	end_epoch bigint not null,
	slashed_epoch bigint,
	storage_price_per_epoch text not null,

	provider_collateral text not null,
	client_collateral text not null,

	constraint market_deal_proposal_pk
		primary key (deal_id)
);

create table if not exists market_deal_states
(
	deal_id bigint not null,

	sector_start_epoch bigint not null,
	last_update_epoch bigint not null,
	slash_epoch bigint not null,

	state_root text not null,

	unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch),

	constraint market_deal_states_pk
		primary key (deal_id, state_root)

);

create table if not exists minerid_dealid_sectorid
(
	deal_id bigint not null
		constraint sectors_sector_ids_id_fk
			references market_deal_proposals(deal_id),

	sector_id bigint not null,
	miner_id text not null,
	foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id),

	constraint miner_sector_deal_ids_pk
		primary key (miner_id, sector_id, deal_id)
);

`); err != nil {
		return err
	}

	return tx.Commit()
}

type marketActorInfo struct {
	common actorInfo
}

func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTips) error {
	marketChanges, err := p.processMarket(ctx, marketTips)
	if err != nil {
		log.Fatalw("Failed to process market actors", "error", err)
	}

	if err := p.persistMarket(ctx, marketChanges); err != nil {
		log.Fatalw("Failed to persist market actors", "error", err)
	}

	if err := p.updateMarket(ctx, marketChanges); err != nil {
		log.Fatalw("Failed to update market actors", "error", err)
	}
	return nil
}

func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Processed Market", "duration", time.Since(start).String())
	}()

	var out []marketActorInfo
	for _, markets := range marketTips {
		for _, mt := range markets {
			// NB: here is where we can extract the market state when we need it.
			out = append(out, marketActorInfo{common: mt})
		}
	}
	return out, nil
}

func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Persisted Market", "duration", time.Since(start).String())
	}()

	grp, ctx := errgroup.WithContext(ctx)

	grp.Go(func() error {
		if err := p.storeMarketActorDealProposals(ctx, info); err != nil {
			return xerrors.Errorf("Failed to store marker deal proposals: %w", err)
		}
		return nil
	})

	grp.Go(func() error {
		if err := p.storeMarketActorDealStates(info); err != nil {
			return xerrors.Errorf("Failed to store marker deal states: %w", err)
		}
		return nil
	})

	return grp.Wait()

}

func (p *Processor) updateMarket(ctx context.Context, info []marketActorInfo) error {
	if err := p.updateMarketActorDealProposals(ctx, info); err != nil {
		return xerrors.Errorf("Failed to update market info: %w", err)
	}
	return nil
}

func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Stored Market Deal States", "duration", time.Since(start).String())
	}()
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil {
		return err
	}
	stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`)
	if err != nil {
		return err
	}
	for _, mt := range marketTips {
		dealStates, err := p.node.StateMarketDeals(context.TODO(), mt.common.tsKey)
		if err != nil {
			return err
		}

		for dealID, ds := range dealStates {
			id, err := strconv.ParseUint(dealID, 10, 64)
			if err != nil {
				return err
			}

			if _, err := stmt.Exec(
				id,
				ds.State.SectorStartEpoch,
				ds.State.LastUpdatedEpoch,
				ds.State.SlashEpoch,
				mt.common.stateroot.String(),
			); err != nil {
				return err
			}

		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil {
		return err
	}

	return tx.Commit()
}

func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Stored Market Deal Proposals", "duration", time.Since(start).String())
	}()
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`)
	if err != nil {
		return err
	}

	// insert in sorted order (lowest height -> highest height) since dealid is pk of table.
	for _, mt := range marketTips {
		dealStates, err := p.node.StateMarketDeals(ctx, mt.common.tsKey)
		if err != nil {
			return err
		}

		for dealID, ds := range dealStates {
			id, err := strconv.ParseUint(dealID, 10, 64)
			if err != nil {
				return err
			}

			if _, err := stmt.Exec(
				id,
				mt.common.stateroot.String(),
				ds.Proposal.PieceCID.String(),
				ds.Proposal.PieceSize,
				ds.Proposal.PieceSize.Unpadded(),
				ds.Proposal.VerifiedDeal,
				ds.Proposal.Client.String(),
				ds.Proposal.Provider.String(),
				ds.Proposal.StartEpoch,
				ds.Proposal.EndEpoch,
				nil, // slashed_epoch
				ds.Proposal.StoragePricePerEpoch.String(),
				ds.Proposal.ProviderCollateral.String(),
				ds.Proposal.ClientCollateral.String(),
			); err != nil {
				return err
			}

		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}
	if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil {
		return err
	}

	return tx.Commit()

}

func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Updated Market Deal Proposals", "duration", time.Since(start).String())
	}()
	pred := state.NewStatePredicates(p.node)

	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`)
	if err != nil {
		return err
	}

	for _, mt := range marketTip {
		stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))

		changed, val, err := stateDiff(ctx, mt.common.parentTsKey, mt.common.tsKey)
		if err != nil {
			log.Warnw("error getting market deal state diff", "error", err)
		}
		if !changed {
			continue
		}
		changes, ok := val.(*market.DealStateChanges)
		if !ok {
			return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val)
		}

		for _, modified := range changes.Modified {
			if modified.From.SlashEpoch != modified.To.SlashEpoch {
				if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil {
					return err
				}
			}
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	return tx.Commit()
}

@@ -1,318 +0,0 @@
package processor

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
	"golang.org/x/xerrors"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/lib/parmap"
)

func (p *Processor) setupMessages() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create table if not exists messages
(
	cid text not null
		constraint messages_pk
			primary key,
	"from" text not null,
	"to" text not null,
	size_bytes bigint not null,
	nonce bigint not null,
	value text not null,
	gas_fee_cap text not null,
	gas_premium text not null,
	gas_limit bigint not null,
	method bigint,
	params bytea
);

create unique index if not exists messages_cid_uindex
	on messages (cid);

create index if not exists messages_from_index
	on messages ("from");

create index if not exists messages_to_index
	on messages ("to");

create table if not exists block_messages
(
	block text not null
		constraint blocks_block_cids_cid_fk
			references block_cids (cid),
	message text not null,
	constraint block_messages_pk
		primary key (block, message)
);

create table if not exists mpool_messages
(
	msg text not null
		constraint mpool_messages_pk
			primary key
		constraint mpool_messages_messages_cid_fk
			references messages,
	add_ts int not null
);

create unique index if not exists mpool_messages_msg_uindex
	on mpool_messages (msg);

create table if not exists receipts
(
	msg text not null,
	state text not null,
	idx int not null,
	exit int not null,
	gas_used bigint not null,
	return bytea,
	constraint receipts_pk
		primary key (msg, state)
);

create index if not exists receipts_msg_state_index
	on receipts (msg, state);
`); err != nil {
		return err
	}

	return tx.Commit()
}

func (p *Processor) HandleMessageChanges(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error {
	if err := p.persistMessagesAndReceipts(ctx, blocks); err != nil {
		return err
	}
	return nil
}

func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error {
	messages, inclusions := p.fetchMessages(ctx, blocks)
	receipts := p.fetchParentReceipts(ctx, blocks)

	grp, _ := errgroup.WithContext(ctx)

	grp.Go(func() error {
		return p.storeMessages(messages)
	})

	grp.Go(func() error {
		return p.storeMsgInclusions(inclusions)
	})

	grp.Go(func() error {
		return p.storeReceipts(receipts)
	})

	return grp.Wait()
}

func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create temp table recs (like receipts excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `)
	if err != nil {
		return err
	}

	for c, m := range recs {
		if _, err := stmt.Exec(
			c.msg.String(),
			c.state.String(),
			c.idx,
			m.ExitCode,
			m.GasUsed,
			m.Return,
		); err != nil {
			return err
		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create temp table mi (like block_messages excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `)
	if err != nil {
		return err
	}

	for b, msgs := range incls {
		for _, msg := range msgs {
			if _, err := stmt.Exec(
				b.String(),
				msg.String(),
			); err != nil {
				return err
			}
		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create temp table msgs (like messages excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", size_bytes, nonce, "value", gas_premium, gas_fee_cap, gas_limit, method, params) from stdin `)
	if err != nil {
		return err
	}

	for c, m := range msgs {
		var msgBytes int
		if b, err := m.Serialize(); err == nil {
			msgBytes = len(b)
		}

		if _, err := stmt.Exec(
			c.String(),
			m.From.String(),
			m.To.String(),
			msgBytes,
			m.Nonce,
			m.Value.String(),
			m.GasPremium.String(),
			m.GasFeeCap.String(),
			m.GasLimit,
			m.Method,
			m.Params,
		); err != nil {
			return err
		}
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
	var lk sync.Mutex
	messages := map[cid.Cid]*types.Message{}
	inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs

	parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) {
		msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid())
		if err != nil {
			log.Error(err)
			log.Debugw("ChainGetBlockMessages", "header_cid", header.Cid())
			return
		}

		vmm := make([]*types.Message, 0, len(msgs.Cids))
		for _, m := range msgs.BlsMessages {
			vmm = append(vmm, m)
		}

		for _, m := range msgs.SecpkMessages {
			vmm = append(vmm, &m.Message)
		}

		lk.Lock()
		for _, message := range vmm {
			messages[message.Cid()] = message
			inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid())
		}
		lk.Unlock()
	})

	return messages, inclusions
}

type mrec struct {
	msg   cid.Cid
	state cid.Cid
	idx   int
}

func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt {
	var lk sync.Mutex
	out := map[mrec]*types.MessageReceipt{}

	parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) {
		recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid())
		if err != nil {
			log.Error(err)
			log.Debugw("ChainGetParentReceipts", "header_cid", header.Cid())
			return
		}
		msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid())
		if err != nil {
			log.Error(err)
			log.Debugw("ChainGetParentMessages", "header_cid", header.Cid())
			return
		}

		lk.Lock()
		for i, r := range recs {
			out[mrec{
				msg:   msgs[i].Cid,
				state: header.ParentStateRoot,
				idx:   i,
			}] = r
		}
		lk.Unlock()
	})

	return out
}

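fetchMessages and fetchParentReceipts fan out over block headers with parmap.Par(50, ...), a small lotus-internal helper. A dependency-free sketch of the same bounded fan-out using a semaphore channel (written with Go 1.18+ generics, which the original predates):

package main

import (
	"fmt"
	"sync"
)

// par runs f over items with at most `concurrency` goroutines in flight,
// approximating what parmap.Par provides.
func par[T any](concurrency int, items []T, f func(T)) {
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup
	for _, it := range items {
		wg.Add(1)
		sem <- struct{}{} // acquire a slot
		go func(it T) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			f(it)
		}(it)
	}
	wg.Wait()
}

func main() {
	par(3, []int{1, 2, 3, 4, 5}, func(n int) { fmt.Println(n * n) })
}
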
File diff suppressed because it is too large
@@ -1,100 +0,0 @@
package processor

import (
	"context"
	"time"

	"golang.org/x/xerrors"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/chain/types"
)

func (p *Processor) subMpool(ctx context.Context) {
	sub, err := p.node.MpoolSub(ctx)
	if err != nil {
		return
	}

	for {
		var updates []api.MpoolUpdate

		select {
		case update := <-sub:
			updates = append(updates, update)
		case <-ctx.Done():
			return
		}

	loop:
		for {
			select {
			case update := <-sub:
				updates = append(updates, update)
			case <-time.After(10 * time.Millisecond):
				break loop
			}
		}

		msgs := map[cid.Cid]*types.Message{}
		for _, v := range updates {
			if v.Type != api.MpoolAdd {
				continue
			}

			msgs[v.Message.Message.Cid()] = &v.Message.Message
		}

		err := p.storeMessages(msgs)
		if err != nil {
			log.Error(err)
		}

		if err := p.storeMpoolInclusions(updates); err != nil {
			log.Error(err)
		}
	}
}

func (p *Processor) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `)
	if err != nil {
		return err
	}

	for _, msg := range msgs {
		if msg.Type != api.MpoolAdd {
			continue
		}

		if _, err := stmt.Exec(
			msg.Message.Message.Cid().String(),
			time.Now().Unix(),
		); err != nil {
			return err
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil {
		return xerrors.Errorf("actor put: %w", err)
	}

	return tx.Commit()
}

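The labeled break in subMpool implements a small debounce: take one update, then keep draining until the channel stays quiet for 10ms, and only then hit the database. The same idiom in isolation:

package main

import (
	"fmt"
	"time"
)

// batchOne blocks for the first item, then collects everything that arrives
// within 10ms of the last item before flushing, as subMpool does.
func batchOne(ch <-chan int) []int {
	out := []int{<-ch}

loop:
	for {
		select {
		case v := <-ch:
			out = append(out, v)
		case <-time.After(10 * time.Millisecond):
			break loop // channel went quiet; flush what we have
		}
	}
	return out
}

func main() {
	ch := make(chan int, 8)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	fmt.Println(batchOne(ch)) // [0 1 2 3 4]
}
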
@@ -1,190 +0,0 @@
package processor

import (
	"context"
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/big"

	"github.com/filecoin-project/lotus/chain/actors/builtin"
)

type powerActorInfo struct {
	common actorInfo

	totalRawBytes                      big.Int
	totalRawBytesCommitted             big.Int
	totalQualityAdjustedBytes          big.Int
	totalQualityAdjustedBytesCommitted big.Int
	totalPledgeCollateral              big.Int

	qaPowerSmoothed builtin.FilterEstimate

	minerCount                  int64
	minerCountAboveMinimumPower int64
}

func (p *Processor) setupPower() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
create table if not exists chain_power
(
	state_root text not null
		constraint power_smoothing_estimates_pk
			primary key,

	total_raw_bytes_power text not null,
	total_raw_bytes_committed text not null,
	total_qa_bytes_power text not null,
	total_qa_bytes_committed text not null,
	total_pledge_collateral text not null,

	qa_smoothed_position_estimate text not null,
	qa_smoothed_velocity_estimate text not null,

	miner_count int not null,
	minimum_consensus_miner_count int not null
);
`); err != nil {
		return err
	}

	return tx.Commit()
}

func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error {
	powerChanges, err := p.processPowerActors(ctx, powerTips)
	if err != nil {
		return xerrors.Errorf("Failed to process power actors: %w", err)
	}

	if err := p.persistPowerActors(ctx, powerChanges); err != nil {
		return err
	}

	return nil
}

func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Processed Power Actors", "duration", time.Since(start).String())
	}()

	var out []powerActorInfo
	for tipset, powerStates := range powerTips {
		for _, act := range powerStates {
			var pw powerActorInfo
			pw.common = act

			powerActorState, err := getPowerActorState(ctx, p.node, tipset)
			if err != nil {
				return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err)
			}

			totalPower, err := powerActorState.TotalPower()
			if err != nil {
				return nil, xerrors.Errorf("failed to compute total power: %w", err)
			}

			totalCommitted, err := powerActorState.TotalCommitted()
			if err != nil {
				return nil, xerrors.Errorf("failed to compute total committed: %w", err)
			}

			totalLocked, err := powerActorState.TotalLocked()
			if err != nil {
				return nil, xerrors.Errorf("failed to compute total locked: %w", err)
			}

			powerSmoothed, err := powerActorState.TotalPowerSmoothed()
			if err != nil {
				return nil, xerrors.Errorf("failed to determine smoothed power: %w", err)
			}

			// NOTE: this doesn't set new* fields. Previously, we
			// filled these using ThisEpoch* fields from the actor
			// state, but these fields are effectively internal
			// state and don't represent "new" power, as was
			// assumed.

			participatingMiners, totalMiners, err := powerActorState.MinerCounts()
			if err != nil {
				return nil, xerrors.Errorf("failed to count miners: %w", err)
			}

			pw.totalRawBytes = totalPower.RawBytePower
			pw.totalQualityAdjustedBytes = totalPower.QualityAdjPower
			pw.totalRawBytesCommitted = totalCommitted.RawBytePower
			pw.totalQualityAdjustedBytesCommitted = totalCommitted.QualityAdjPower
			pw.totalPledgeCollateral = totalLocked
			pw.qaPowerSmoothed = powerSmoothed
			pw.minerCountAboveMinimumPower = int64(participatingMiners)
			pw.minerCount = int64(totalMiners)
		}
	}

	return out, nil
}

func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error {
	// NB: use errgroup when there is more than a single store operation
	return p.storePowerSmoothingEstimates(powerStates)
}

func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error {
	tx, err := p.db.Begin()
	if err != nil {
		return xerrors.Errorf("begin chain_power tx: %w", err)
	}

	if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil {
		return xerrors.Errorf("prep chain_power: %w", err)
	}

	stmt, err := tx.Prepare(`copy cp (state_root, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`)
	if err != nil {
		return xerrors.Errorf("prepare tmp chain_power: %w", err)
	}

	for _, ps := range powerStates {
		if _, err := stmt.Exec(
			ps.common.stateroot.String(),

			ps.totalRawBytes.String(),
			ps.totalRawBytesCommitted.String(),
			ps.totalQualityAdjustedBytes.String(),
			ps.totalQualityAdjustedBytesCommitted.String(),
			ps.totalPledgeCollateral.String(),

			ps.qaPowerSmoothed.PositionEstimate.String(),
			ps.qaPowerSmoothed.VelocityEstimate.String(),

			ps.minerCount,
			ps.minerCountAboveMinimumPower,
		); err != nil {
			return xerrors.Errorf("failed to store smoothing estimate: %w", err)
		}
	}

	if err := stmt.Close(); err != nil {
		return xerrors.Errorf("close prepared chain_power: %w", err)
	}

	if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil {
		return xerrors.Errorf("insert chain_power from tmp: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return xerrors.Errorf("commit chain_power tx: %w", err)
	}

	return nil

}

@ -1,420 +0,0 @@
|
||||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
||||
|
||||
"github.com/filecoin-project/lotus/api/v0api"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
|
||||
"github.com/filecoin-project/lotus/lib/parmap"
|
||||
)
|
||||
|
||||
var log = logging.Logger("processor")
|
||||
|
||||
type Processor struct {
|
||||
db *sql.DB
|
||||
|
||||
node v0api.FullNode
|
||||
ctxStore *cw_util.APIIpldStore
|
||||
|
||||
genesisTs *types.TipSet
|
||||
|
||||
// number of blocks processed at a time
|
||||
batch int
|
||||
}
|
||||
|
||||
type ActorTips map[types.TipSetKey][]actorInfo
|
||||
|
||||
type actorInfo struct {
|
||||
act types.Actor
|
||||
|
||||
stateroot cid.Cid
|
||||
height abi.ChainEpoch // so that we can walk the actor changes in chronological order.
|
||||
|
||||
tsKey types.TipSetKey
|
||||
parentTsKey types.TipSetKey
|
||||
|
||||
addr address.Address
|
||||
state string
|
||||
}
|
||||
|
||||
func NewProcessor(ctx context.Context, db *sql.DB, node v0api.FullNode, batch int) *Processor {
|
||||
ctxStore := cw_util.NewAPIIpldStore(ctx, node)
|
||||
return &Processor{
|
||||
db: db,
|
||||
ctxStore: ctxStore,
|
||||
node: node,
|
||||
batch: batch,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Processor) setupSchemas() error {
|
||||
// maintain order, subsequent calls create tables with foreign keys.
|
||||
if err := p.setupMiners(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.setupMarket(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.setupRewards(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.setupMessages(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.setupCommonActors(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := p.setupPower(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Processor) Start(ctx context.Context) {
|
||||
log.Debug("Starting Processor")
|
||||
|
||||
if err := p.setupSchemas(); err != nil {
|
||||
log.Fatalw("Failed to setup processor", "error", err)
|
||||
}
|
||||
|
||||
var err error
|
||||
p.genesisTs, err = p.node.ChainGetGenesis(ctx)
|
||||
if err != nil {
|
||||
log.Fatalw("Failed to get genesis state from lotus", "error", err.Error())
|
||||
}
|
||||
|
||||
go p.subMpool(ctx)
|
||||
|
||||
// main processor loop
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Stopping Processor...")
|
||||
return
|
||||
default:
|
||||
loopStart := time.Now()
|
||||
toProcess, err := p.unprocessedBlocks(ctx, p.batch)
|
||||
if err != nil {
|
||||
log.Fatalw("Failed to get unprocessed blocks", "error", err)
|
||||
}
|
||||
|
||||
if len(toProcess) == 0 {
|
||||
log.Info("No unprocessed blocks. Wait then try again...")
|
||||
time.Sleep(time.Second * 30)
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where
|
||||
// before doing "normal" processing.
|
||||
|
||||
actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess)
|
||||
if err != nil {
|
||||
log.Fatalw("Failed to collect actor changes", "error", err)
|
||||
}
|
||||
log.Infow("Collected Actor Changes",
|
||||
"MarketChanges", len(actorChanges[builtin2.StorageMarketActorCodeID]),
|
||||
"MinerChanges", len(actorChanges[builtin2.StorageMinerActorCodeID]),
|
||||
"RewardChanges", len(actorChanges[builtin2.RewardActorCodeID]),
|
||||
"AccountChanges", len(actorChanges[builtin2.AccountActorCodeID]),
|
||||
"nullRounds", len(nullRounds))
|
||||
|
||||
grp := sync.WaitGroup{}
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandleMarketChanges(ctx, actorChanges[builtin2.StorageMarketActorCodeID]); err != nil {
|
||||
log.Errorf("Failed to handle market changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandleMinerChanges(ctx, actorChanges[builtin2.StorageMinerActorCodeID]); err != nil {
|
||||
log.Errorf("Failed to handle miner changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandleRewardChanges(ctx, actorChanges[builtin2.RewardActorCodeID], nullRounds); err != nil {
|
||||
log.Errorf("Failed to handle reward changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandlePowerChanges(ctx, actorChanges[builtin2.StoragePowerActorCodeID]); err != nil {
|
||||
log.Errorf("Failed to handle power actor changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandleMessageChanges(ctx, toProcess); err != nil {
|
||||
log.Errorf("Failed to handle message changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Add(1)
|
||||
go func() {
|
||||
defer grp.Done()
|
||||
if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil {
|
||||
log.Errorf("Failed to handle common actor changes: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
grp.Wait()
|
||||
|
||||
if err := p.markBlocksProcessed(ctx, toProcess); err != nil {
|
||||
log.Fatalw("Failed to mark blocks as processed", "error", err)
|
||||
}
|
||||
|
||||
if err := p.refreshViews(); err != nil {
|
||||
log.Errorw("Failed to refresh views", "error", err)
|
||||
}
|
||||
log.Infow("Processed Batch Complete", "duration", time.Since(loopStart).String())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
||||
|
||||
func (p *Processor) refreshViews() error {
|
||||
if _, err := p.db.Exec(`refresh materialized view state_heights`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}

func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Collected Actor Changes", "duration", time.Since(start).String())
	}()
	// ActorCode -> tipset -> []actorInfo
	out := map[cid.Cid]ActorTips{}
	var outMu sync.Mutex

	actorsSeen := map[cid.Cid]struct{}{}

	var nullRounds []types.TipSetKey
	var nullBlkMu sync.Mutex

	// collect all actor state that has changed between block headers
	paDone := 0
	parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) {
		paDone++
		if paDone%100 == 0 {
			log.Debugw("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess))
		}

		pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
		if err != nil {
			log.Error(err)
			return
		}

		// no state transition between the grandparent and parent state roots: record a null round.
		if pts.ParentState().Equals(bh.ParentStateRoot) {
			nullBlkMu.Lock()
			nullRounds = append(nullRounds, pts.Key())
			nullBlkMu.Unlock()
		}

		// collect all actors that had state changes between the block header's parent-state and its grandparent-state.
		// The map of changed actors is declared locally so parallel workers don't share it.
		// TODO: changes will contain deleted actors, which causes needless processing further down the pipeline; consider
		// a separate strategy for deleted actors.
		changes, err := p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot)
		if err != nil {
			log.Error(err)
			log.Debugw("StateChangedActors", "grandparent_state", pts.ParentState(), "parent_state", bh.ParentStateRoot)
			return
		}

		// record the state of all actors that have changed
		for a, act := range changes {
			act := act
			a := a

			// ignore actors that were deleted.
			has, err := p.node.ChainHasObj(ctx, act.Head)
			if err != nil {
				log.Error(err)
				log.Debugw("ChainHasObj", "actor_head", act.Head)
				return
			}
			if !has {
				continue
			}

			addr, err := address.NewFromString(a)
			if err != nil {
				log.Error(err)
				log.Debugw("NewFromString", "address_string", a)
				return
			}

			ast, err := p.node.StateReadState(ctx, addr, pts.Key())
			if err != nil {
				log.Error(err)
				log.Debugw("StateReadState", "address_string", a, "parent_tipset_key", pts.Key())
				return
			}

			// TODO look here for an empty state, maybe that's a sign the actor was deleted?

			state, err := json.Marshal(ast.State)
			if err != nil {
				log.Error(err)
				return
			}

			outMu.Lock()
			if _, ok := actorsSeen[act.Head]; !ok {
				_, ok := out[act.Code]
				if !ok {
					out[act.Code] = map[types.TipSetKey][]actorInfo{}
				}
				out[act.Code][pts.Key()] = append(out[act.Code][pts.Key()], actorInfo{
					act:         act,
					stateroot:   bh.ParentStateRoot,
					height:      bh.Height,
					tsKey:       pts.Key(),
					parentTsKey: pts.Parents(),
					addr:        addr,
					state:       string(state),
				})
			}
			actorsSeen[act.Head] = struct{}{}
			outMu.Unlock()
		}
	})
	return out, nullRounds, nil
}

func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Gathered Blocks to process", "duration", time.Since(start).String())
	}()
	rows, err := p.db.Query(`
with toProcess as (
    select b.cid, b.height, rank() over (order by height) as rnk
    from blocks_synced bs
        left join blocks b on bs.cid = b.cid
    where bs.processed_at is null and b.height > 0
)
select cid
from toProcess
where rnk <= $1
`, batch)
	if err != nil {
		return nil, xerrors.Errorf("Failed to query for unprocessed blocks: %w", err)
	}
	out := map[cid.Cid]*types.BlockHeader{}

	minBlock := abi.ChainEpoch(math.MaxInt64)
	maxBlock := abi.ChainEpoch(0)
	// TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages()
	for rows.Next() {
		if rows.Err() != nil {
			return nil, rows.Err()
		}
		var c string
		if err := rows.Scan(&c); err != nil {
			log.Errorf("Failed to scan unprocessed blocks: %s", err.Error())
			continue
		}
		ci, err := cid.Parse(c)
		if err != nil {
			log.Errorf("Failed to parse unprocessed blocks: %s", err.Error())
			continue
		}
		bh, err := p.node.ChainGetBlock(ctx, ci)
		if err != nil {
			// this is a pretty serious issue.
			log.Errorf("Failed to get block header %s: %s", ci.String(), err.Error())
			continue
		}
		out[ci] = bh
		if bh.Height < minBlock {
			minBlock = bh.Height
		}
		if bh.Height > maxBlock {
			maxBlock = bh.Height
		}
	}
	if minBlock <= maxBlock {
		log.Infow("Gathered Blocks to Process", "start", minBlock, "end", maxBlock)
	}
	return out, rows.Close()
}

func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error {
	start := time.Now()
	processedHeight := abi.ChainEpoch(0)
	defer func() {
		log.Debugw("Marked blocks as Processed", "duration", time.Since(start).String())
		log.Infow("Processed Blocks", "height", processedHeight)
	}()
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	processedAt := time.Now().Unix()
	stmt, err := tx.Prepare(`update blocks_synced set processed_at=$1 where cid=$2`)
	if err != nil {
		return err
	}

	for c, bh := range processed {
		if bh.Height > processedHeight {
			processedHeight = bh.Height
		}
		if _, err := stmt.Exec(processedAt, c.String()); err != nil {
			return err
		}
	}

	if err := stmt.Close(); err != nil {
		return err
	}

	return tx.Commit()
}
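
Note: markBlocksProcessed issues one update per block inside the transaction. For large batches the same effect can be had in a single round-trip with a Postgres array parameter; a minimal sketch, assuming the same blocks_synced schema and the already-imported github.com/lib/pq driver (markProcessed is a hypothetical helper, not part of the code above):

package main

import (
	"database/sql"
	"time"

	"github.com/lib/pq"
)

// markProcessed stamps every given block CID in one statement instead of
// looping over a prepared update.
func markProcessed(db *sql.DB, cids []string) error {
	_, err := db.Exec(
		`update blocks_synced set processed_at = $1 where cid = any($2)`,
		time.Now().Unix(),
		pq.Array(cids), // adapts []string to a Postgres text[] parameter
	)
	return err
}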
@ -1,234 +0,0 @@
package processor

import (
	"context"
	"time"

	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"

	"github.com/filecoin-project/lotus/chain/actors/builtin"
	"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
	"github.com/filecoin-project/lotus/chain/types"

	cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)

type rewardActorInfo struct {
	common actorInfo

	cumSumBaselinePower big.Int
	cumSumRealizedPower big.Int

	effectiveNetworkTime   abi.ChainEpoch
	effectiveBaselinePower big.Int

	// NOTE: These variables are wrong. Talk to @ZX about fixing. These _do
	// not_ represent "new" anything.
	newBaselinePower     big.Int
	newBaseReward        big.Int
	newSmoothingEstimate builtin.FilterEstimate

	totalMinedReward big.Int
}

func (rw *rewardActorInfo) set(s reward.State) (err error) {
	rw.cumSumBaselinePower, err = s.CumsumBaseline()
	if err != nil {
		return xerrors.Errorf("getting cumsum baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.cumSumRealizedPower, err = s.CumsumRealized()
	if err != nil {
		return xerrors.Errorf("getting cumsum realized power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.effectiveNetworkTime, err = s.EffectiveNetworkTime()
	if err != nil {
		return xerrors.Errorf("getting effective network time (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.effectiveBaselinePower, err = s.EffectiveBaselinePower()
	if err != nil {
		return xerrors.Errorf("getting effective baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.totalMinedReward, err = s.TotalStoragePowerReward()
	if err != nil {
		return xerrors.Errorf("getting total mined (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.newBaselinePower, err = s.ThisEpochBaselinePower()
	if err != nil {
		return xerrors.Errorf("getting this epoch baseline power (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.newBaseReward, err = s.ThisEpochReward()
	if err != nil {
		return xerrors.Errorf("getting this epoch reward (@ %s): %w", rw.common.stateroot.String(), err)
	}

	rw.newSmoothingEstimate, err = s.ThisEpochRewardSmoothed()
	if err != nil {
		return xerrors.Errorf("getting this epoch reward smoothed (@ %s): %w", rw.common.stateroot.String(), err)
	}
	return nil
}

func (p *Processor) setupRewards() error {
	tx, err := p.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
/* captures chain-specific reward state for any given stateroot */
create table if not exists chain_reward
(
	state_root text not null
		constraint chain_reward_pk
			primary key,
	cum_sum_baseline text not null,
	cum_sum_realized text not null,
	effective_network_time int not null,
	effective_baseline_power text not null,

	new_baseline_power text not null,
	new_reward numeric not null,
	new_reward_smoothed_position_estimate text not null,
	new_reward_smoothed_velocity_estimate text not null,

	total_mined_reward text not null
);
`); err != nil {
		return err
	}

	return tx.Commit()
}

func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error {
	rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds)
	if err != nil {
		return xerrors.Errorf("Failed to process reward actors: %w", err)
	}

	if err := p.persistRewardActors(ctx, rewardChanges); err != nil {
		return err
	}

	return nil
}

func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) {
	start := time.Now()
	defer func() {
		log.Debugw("Processed Reward Actors", "duration", time.Since(start).String())
	}()

	var out []rewardActorInfo
	for tipset, rewards := range rewardTips {
		for _, act := range rewards {
			var rw rewardActorInfo
			rw.common = act

			// get reward actor states at each tipset once for all updates
			rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tipset)
			if err != nil {
				return nil, xerrors.Errorf("get reward state (@ %s): %w", rw.common.stateroot.String(), err)
			}

			rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor)
			if err != nil {
				return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err)
			}
			if err := rw.set(rewardActorState); err != nil {
				return nil, err
			}

			out = append(out, rw)
		}
	}
	for _, tsKey := range nullRounds {
		var rw rewardActorInfo
		tipset, err := p.node.ChainGetTipSet(ctx, tsKey)
		if err != nil {
			return nil, err
		}
		rw.common.tsKey = tipset.Key()
		rw.common.height = tipset.Height()
		rw.common.stateroot = tipset.ParentState()
		rw.common.parentTsKey = tipset.Parents()
		// get reward actor states at each tipset once for all updates
		rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tsKey)
		if err != nil {
			return nil, err
		}

		rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor)
		if err != nil {
			return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err)
		}

		if err := rw.set(rewardActorState); err != nil {
			return nil, err
		}
		out = append(out, rw)
	}

	return out, nil
}

func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error {
	start := time.Now()
	defer func() {
		log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String())
	}()

	tx, err := p.db.Begin()
	if err != nil {
		return xerrors.Errorf("begin chain_reward tx: %w", err)
	}

	if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil {
		return xerrors.Errorf("prep chain_reward temp: %w", err)
	}

	stmt, err := tx.Prepare(`copy cr (state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`)
	if err != nil {
		return xerrors.Errorf("prepare tmp chain_reward: %w", err)
	}

	for _, rewardState := range rewards {
		if _, err := stmt.Exec(
			rewardState.common.stateroot.String(),
			rewardState.cumSumBaselinePower.String(),
			rewardState.cumSumRealizedPower.String(),
			uint64(rewardState.effectiveNetworkTime),
			rewardState.effectiveBaselinePower.String(),
			rewardState.newBaselinePower.String(),
			rewardState.newBaseReward.String(),
			rewardState.newSmoothingEstimate.PositionEstimate.String(),
			rewardState.newSmoothingEstimate.VelocityEstimate.String(),
			rewardState.totalMinedReward.String(),
		); err != nil {
			log.Errorw("failed to store chain reward", "state_root", rewardState.common.stateroot, "error", err)
		}
	}

	if err := stmt.Close(); err != nil {
		return xerrors.Errorf("close prepared chain_reward: %w", err)
	}

	if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil {
		return xerrors.Errorf("insert chain_reward from tmp: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return xerrors.Errorf("commit chain_reward tx: %w", err)
	}

	return nil
}
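
Note: the persist path above relies on lib/pq recognizing `copy ... from STDIN` inside Prepare and switching to the COPY protocol. A minimal sketch of the same staged bulk-load written against lib/pq's pq.CopyIn helper instead of a hand-built COPY string; the items table here is hypothetical, and the final no-argument Exec that flushes the COPY stream is easy to forget:

package main

import (
	"database/sql"

	"github.com/lib/pq"
)

// bulkLoad stages rows into a temp table via the COPY protocol, then
// merges them into a hypothetical items(id text primary key, blob text)
// table so reruns are idempotent.
func bulkLoad(db *sql.DB, rows [][]interface{}) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() //nolint:errcheck

	if _, err := tx.Exec(`create temp table tmp_items (like items excluding constraints) on commit drop`); err != nil {
		return err
	}

	// pq.CopyIn builds the `COPY tmp_items (id, blob) FROM STDIN` statement.
	stmt, err := tx.Prepare(pq.CopyIn("tmp_items", "id", "blob"))
	if err != nil {
		return err
	}
	for _, r := range rows {
		if _, err := stmt.Exec(r...); err != nil {
			return err
		}
	}
	// An Exec with no arguments terminates and flushes the COPY stream.
	if _, err := stmt.Exec(); err != nil {
		return err
	}
	if err := stmt.Close(); err != nil {
		return err
	}

	if _, err := tx.Exec(`insert into items select * from tmp_items on conflict do nothing`); err != nil {
		return err
	}
	return tx.Commit()
}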
@ -1,107 +0,0 @@
package main

import (
	"database/sql"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"
	"strings"

	"github.com/filecoin-project/lotus/api/v0api"

	_ "github.com/lib/pq"

	"github.com/filecoin-project/go-jsonrpc"
	logging "github.com/ipfs/go-log/v2"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"

	lcli "github.com/filecoin-project/lotus/cli"
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor"
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler"
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer"
	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)

var runCmd = &cli.Command{
	Name:  "run",
	Usage: "Start lotus chainwatch",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:  "max-batch",
			Value: 50,
		},
	},
	Action: func(cctx *cli.Context) error {
		// expose pprof (imported above for side effects) on :6060
		go func() {
			http.ListenAndServe(":6060", nil) //nolint:errcheck
		}()
		ll := cctx.String("log-level")
		if err := logging.SetLogLevel("*", ll); err != nil {
			return err
		}
		if err := logging.SetLogLevel("rpc", "error"); err != nil {
			return err
		}

		var api v0api.FullNode
		var closer jsonrpc.ClientCloser
		var err error
		if tokenMaddr := cctx.String("api"); tokenMaddr != "" {
			toks := strings.Split(tokenMaddr, ":")
			if len(toks) != 2 {
				return fmt.Errorf("invalid api tokens, expected <token>:<maddr>, got: %s", tokenMaddr)
			}

			api, closer, err = util.GetFullNodeAPIUsingCredentials(cctx.Context, toks[1], toks[0])
			if err != nil {
				return err
			}
		} else {
			api, closer, err = lcli.GetFullNodeAPI(cctx)
			if err != nil {
				return err
			}
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		v, err := api.Version(ctx)
		if err != nil {
			return err
		}

		log.Infof("Remote version: %s", v.Version)

		maxBatch := cctx.Int("max-batch")

		db, err := sql.Open("postgres", cctx.String("db"))
		if err != nil {
			return err
		}
		defer func() {
			if err := db.Close(); err != nil {
				log.Errorw("Failed to close database", "error", err)
			}
		}()

		if err := db.Ping(); err != nil {
			return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err)
		}
		db.SetMaxOpenConns(1350)

		sync := syncer.NewSyncer(db, api, 1400)
		sync.Start(ctx)

		proc := processor.NewProcessor(ctx, db, api, maxBatch)
		proc.Start(ctx)

		sched := scheduler.PrepareScheduler(db)
		sched.Start(ctx)

		<-ctx.Done()
		os.Exit(0)
		return nil
	},
}
@ -1,78 +0,0 @@
package scheduler

import (
	"context"
	"database/sql"

	"golang.org/x/xerrors"
)

func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error {
	select {
	case <-ctx.Done():
		return nil
	default:
	}

	tx, err := db.Begin()
	if err != nil {
		return err
	}
	if _, err := tx.Exec(`
create materialized view if not exists top_miners_by_base_reward as
	with total_rewards_by_miner as (
		select
			b.miner,
			sum(cr.new_reward * b.win_count) as total_reward
		from blocks b
		inner join chain_reward cr on b.parentstateroot = cr.state_root
		group by 1
	) select
		rank() over (order by total_reward desc),
		miner,
		total_reward
	from total_rewards_by_miner
	group by 2, 3;

	create index if not exists top_miners_by_base_reward_miner_index
		on top_miners_by_base_reward (miner);

	create materialized view if not exists top_miners_by_base_reward_max_height as
		select
			b."timestamp" as current_timestamp,
			max(b.height) as current_height
		from blocks b
		join chain_reward cr on b.parentstateroot = cr.state_root
		where cr.new_reward is not null
		group by 1
		order by 1 desc
		limit 1;
`); err != nil {
		return xerrors.Errorf("create top_miners_by_base_reward views: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return xerrors.Errorf("committing top_miners_by_base_reward views: %w", err)
	}
	return nil
}

func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error {
	select {
	case <-ctx.Done():
		return nil
	default:
	}

	_, err := db.Exec("refresh materialized view top_miners_by_base_reward;")
	if err != nil {
		return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err)
	}

	_, err = db.Exec("refresh materialized view top_miners_by_base_reward_max_height;")
	if err != nil {
		return xerrors.Errorf("refresh top_miners_by_base_reward_max_height: %w", err)
	}

	return nil
}
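
Note: each plain `refresh materialized view` above takes an exclusive lock on its view for the duration of the rebuild, blocking readers. Postgres can refresh without blocking reads via the `concurrently` variant, but only if the view has a unique index covering every row; the miner index created above is not unique, so this is a sketch of what an alternative schema could allow, not a drop-in change:

// Sketch: non-blocking refresh. Requires, e.g.:
//   create unique index on top_miners_by_base_reward (miner);
// in place of the plain index created in the schema above.
if _, err := db.Exec(`refresh materialized view concurrently top_miners_by_base_reward`); err != nil {
	return xerrors.Errorf("refresh top_miners_by_base_reward concurrently: %w", err)
}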
@ -1,60 +0,0 @@
package scheduler

import (
	"context"
	"database/sql"
	"time"

	logging "github.com/ipfs/go-log/v2"

	"golang.org/x/xerrors"
)

var log = logging.Logger("scheduler")

// Scheduler manages the execution of jobs triggered
// by tickers. Not externally configurable at runtime.
type Scheduler struct {
	db *sql.DB
}

// PrepareScheduler returns a ready-to-run Scheduler
func PrepareScheduler(db *sql.DB) *Scheduler {
	return &Scheduler{db}
}

func (s *Scheduler) setupSchema(ctx context.Context) error {
	if err := setupTopMinerByBaseRewardSchema(ctx, s.db); err != nil {
		return xerrors.Errorf("setup top miners by reward schema: %w", err)
	}
	return nil
}

// Start the scheduler jobs at the defined intervals
func (s *Scheduler) Start(ctx context.Context) {
	log.Debug("Starting Scheduler")

	if err := s.setupSchema(ctx); err != nil {
		log.Fatalw("applying scheduling schema", "error", err)
	}

	go func() {
		// run once on start after schema has initialized
		time.Sleep(1 * time.Minute)
		if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
			log.Errorw("failed to refresh top miner", "error", err)
		}
		refreshTopMinerCh := time.NewTicker(30 * time.Second)
		defer refreshTopMinerCh.Stop()
		for {
			select {
			case <-refreshTopMinerCh.C:
				if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil {
					log.Errorw("failed to refresh top miner", "error", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}
@ -1,27 +0,0 @@
package syncer

import (
	"context"
	"time"

	"github.com/filecoin-project/lotus/chain/types"
	"github.com/ipfs/go-cid"
)

func (s *Syncer) subBlocks(ctx context.Context) {
	sub, err := s.node.SyncIncomingBlocks(ctx)
	if err != nil {
		log.Errorf("opening incoming block channel: %+v", err)
		return
	}

	log.Infow("Capturing incoming blocks")
	for bh := range sub {
		err := s.storeHeaders(map[cid.Cid]*types.BlockHeader{
			bh.Cid(): bh,
		}, false, time.Now())
		if err != nil {
			log.Errorf("storing incoming block header: %+v", err)
		}
	}
}
@ -1,527 +0,0 @@
package syncer

import (
	"container/list"
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"golang.org/x/xerrors"

	"github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log/v2"

	"github.com/filecoin-project/lotus/api/v0api"
	"github.com/filecoin-project/lotus/chain/store"
	"github.com/filecoin-project/lotus/chain/types"
)

var log = logging.Logger("syncer")

type Syncer struct {
	db *sql.DB

	lookbackLimit uint64

	headerLk sync.Mutex
	node     v0api.FullNode
}

func NewSyncer(db *sql.DB, node v0api.FullNode, lookbackLimit uint64) *Syncer {
	return &Syncer{
		db:            db,
		node:          node,
		lookbackLimit: lookbackLimit,
	}
}

func (s *Syncer) setupSchemas() error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}

	if _, err := tx.Exec(`
/* tracks circulating fil available on the network at each tipset */
create table if not exists chain_economics
(
	parent_state_root text not null
		constraint chain_economics_pk primary key,
	circulating_fil text not null,
	vested_fil text not null,
	mined_fil text not null,
	burnt_fil text not null,
	locked_fil text not null
);

create table if not exists block_cids
(
	cid text not null
		constraint block_cids_pk
			primary key
);

create unique index if not exists block_cids_cid_uindex
	on block_cids (cid);

create table if not exists blocks_synced
(
	cid text not null
		constraint blocks_synced_pk
			primary key
		constraint blocks_block_cids_cid_fk
			references block_cids (cid),
	synced_at int not null,
	processed_at int
);

create unique index if not exists blocks_synced_cid_uindex
	on blocks_synced (cid, processed_at);

create table if not exists block_parents
(
	block text not null
		constraint blocks_block_cids_cid_fk
			references block_cids (cid),
	parent text not null
);

create unique index if not exists block_parents_block_parent_uindex
	on block_parents (block, parent);

create table if not exists drand_entries
(
	round bigint not null
		constraint drand_entries_pk
			primary key,
	data bytea not null
);
create unique index if not exists drand_entries_round_uindex
	on drand_entries (round);

create table if not exists block_drand_entries
(
	round bigint not null
		constraint block_drand_entries_drand_entries_round_fk
			references drand_entries (round),
	block text not null
		constraint blocks_block_cids_cid_fk
			references block_cids (cid)
);
create unique index if not exists block_drand_entries_round_uindex
	on block_drand_entries (round, block);

create table if not exists blocks
(
	cid text not null
		constraint blocks_pk
			primary key
		constraint blocks_block_cids_cid_fk
			references block_cids (cid),
	parentWeight numeric not null,
	parentStateRoot text not null,
	height bigint not null,
	miner text not null,
	timestamp bigint not null,
	ticket bytea not null,
	election_proof bytea,
	win_count bigint,
	parent_base_fee text not null,
	forksig bigint not null
);

create unique index if not exists block_cid_uindex
	on blocks (cid, height);

create materialized view if not exists state_heights
	as select min(b.height) height, b.parentstateroot
	from blocks b group by b.parentstateroot;

create index if not exists state_heights_height_index
	on state_heights (height);

create index if not exists state_heights_parentstateroot_index
	on state_heights (parentstateroot);
`); err != nil {
		return err
	}

	return tx.Commit()
}

func (s *Syncer) Start(ctx context.Context) {
	if err := logging.SetLogLevel("syncer", "info"); err != nil {
		log.Fatal(err)
	}
	log.Debug("Starting Syncer")

	if err := s.setupSchemas(); err != nil {
		log.Fatal(err)
	}

	// capture all reported blocks
	go s.subBlocks(ctx)

	// we need to ensure that on a restart we don't reprocess the whole flarping chain
	var sinceEpoch uint64
	blkCID, height, err := s.mostRecentlySyncedBlockHeight()
	if err != nil {
		log.Fatalw("failed to find most recently synced block", "error", err)
	} else {
		if height > 0 {
			log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height)
			sinceEpoch = uint64(height)
		}
	}

	// continue to keep the block headers table up to date.
	notifs, err := s.node.ChainNotify(ctx)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		for notif := range notifs {
			for _, change := range notif {
				switch change.Type {
				case store.HCCurrent:
					// This case is important for capturing the initial state of a node
					// which might be on a dead network with no new blocks being produced.
					// It also allows a fresh Chainwatch instance to start walking the
					// chain without waiting for a new block to come along.
					fallthrough
				case store.HCApply:
					unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch)
					if err != nil {
						log.Errorw("failed to gather unsynced blocks", "error", err)
					}

					if err := s.storeCirculatingSupply(ctx, change.Val); err != nil {
						log.Errorw("failed to store circulating supply", "error", err)
					}

					if len(unsynced) == 0 {
						continue
					}

					if err := s.storeHeaders(unsynced, true, time.Now()); err != nil {
						// so this is pretty bad, need some kind of retry...
						// for now just log an error; the blocks will be attempted again on the next notification
						log.Errorw("failed to store unsynced blocks", "error", err)
					}

					sinceEpoch = uint64(change.Val.Height())
				case store.HCRevert:
					log.Debug("revert todo")
				}
			}
		}
	}()
}

func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) {
	hasList, err := s.syncedBlocks(since, s.lookbackLimit)
	if err != nil {
		return nil, err
	}

	// build a list of blocks that we have not synced.
	toVisit := list.New()
	for _, header := range head.Blocks() {
		toVisit.PushBack(header)
	}

	toSync := map[cid.Cid]*types.BlockHeader{}

	for toVisit.Len() > 0 {
		bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
		_, has := hasList[bh.Cid()]
		if _, seen := toSync[bh.Cid()]; seen || has {
			continue
		}

		toSync[bh.Cid()] = bh
		if len(toSync)%500 == 10 {
			log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height)
		}

		if bh.Height == 0 {
			continue
		}

		pts, err := s.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...))
		if err != nil {
			log.Error(err)
			continue
		}

		for _, header := range pts.Blocks() {
			toVisit.PushBack(header)
		}
	}
	log.Debugw("Gathered unsynced blocks", "count", len(toSync))
	return toSync, nil
}

func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) {
	rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit)
	if err != nil {
		return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err)
	}
	out := map[cid.Cid]struct{}{}

	for rws.Next() {
		var c string
		if err := rws.Scan(&c); err != nil {
			return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err)
		}

		ci, err := cid.Parse(c)
		if err != nil {
			return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err)
		}

		out[ci] = struct{}{}
	}
	return out, nil
}

func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) {
	rw := s.db.QueryRow(`
select blocks_synced.cid, b.height
from blocks_synced
left join blocks b on blocks_synced.cid = b.cid
where processed_at is not null
order by height desc
limit 1
`)

	var c string
	var h int64
	if err := rw.Scan(&c, &h); err != nil {
		if err == sql.ErrNoRows {
			return cid.Undef, 0, nil
		}
		return cid.Undef, -1, err
	}

	ci, err := cid.Parse(c)
	if err != nil {
		return cid.Undef, -1, err
	}

	return ci, h, nil
}

func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error {
	supply, err := s.node.StateVMCirculatingSupplyInternal(ctx, tipset.Key())
	if err != nil {
		return err
	}

	// the interpolated values below are chain-derived big.Int strings, not user input
	ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) ` +
		`values ('%s', '%s', '%s', '%s', '%s', '%s') on conflict on constraint chain_economics_pk do ` +
		`update set (circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) = ('%[2]s', '%[3]s', '%[4]s', '%[5]s', '%[6]s') ` +
		`where chain_economics.parent_state_root = '%[1]s';`

	if _, err := s.db.Exec(fmt.Sprintf(ceInsert,
		tipset.ParentState().String(),
		supply.FilCirculating.String(),
		supply.FilVested.String(),
		supply.FilMined.String(),
		supply.FilBurnt.String(),
		supply.FilLocked.String(),
	)); err != nil {
		return xerrors.Errorf("insert circulating supply for tipset (%s): %w", tipset.Key().String(), err)
	}

	return nil
}

func (s *Syncer) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool, timestamp time.Time) error {
	s.headerLk.Lock()
	defer s.headerLk.Unlock()
	if len(bhs) == 0 {
		return nil
	}
	log.Debugw("Storing Headers", "count", len(bhs))

	tx, err := s.db.Begin()
	if err != nil {
		return xerrors.Errorf("begin: %w", err)
	}

	if _, err := tx.Exec(`
create temp table bc (like block_cids excluding constraints) on commit drop;
create temp table de (like drand_entries excluding constraints) on commit drop;
create temp table bde (like block_drand_entries excluding constraints) on commit drop;
create temp table tbp (like block_parents excluding constraints) on commit drop;
create temp table bs (like blocks_synced excluding constraints) on commit drop;
create temp table b (like blocks excluding constraints) on commit drop;
`); err != nil {
		return xerrors.Errorf("prep temp: %w", err)
	}

	{
		stmt, err := tx.Prepare(`copy bc (cid) from STDIN`)
		if err != nil {
			return err
		}

		for _, bh := range bhs {
			if _, err := stmt.Exec(bh.Cid().String()); err != nil {
				log.Error(err)
			}
		}

		if err := stmt.Close(); err != nil {
			return err
		}

		if _, err := tx.Exec(`insert into block_cids select * from bc on conflict do nothing`); err != nil {
			return xerrors.Errorf("block cids put: %w", err)
		}
	}

	{
		stmt, err := tx.Prepare(`copy de (round, data) from STDIN`)
		if err != nil {
			return err
		}

		for _, bh := range bhs {
			for _, ent := range bh.BeaconEntries {
				if _, err := stmt.Exec(ent.Round, ent.Data); err != nil {
					log.Error(err)
				}
			}
		}

		if err := stmt.Close(); err != nil {
			return err
		}

		if _, err := tx.Exec(`insert into drand_entries select * from de on conflict do nothing`); err != nil {
			return xerrors.Errorf("drand entries put: %w", err)
		}
	}

	{
		stmt, err := tx.Prepare(`copy bde (round, block) from STDIN`)
		if err != nil {
			return err
		}

		for _, bh := range bhs {
			for _, ent := range bh.BeaconEntries {
				if _, err := stmt.Exec(ent.Round, bh.Cid().String()); err != nil {
					log.Error(err)
				}
			}
		}

		if err := stmt.Close(); err != nil {
			return err
		}

		if _, err := tx.Exec(`insert into block_drand_entries select * from bde on conflict do nothing`); err != nil {
			return xerrors.Errorf("block drand entries put: %w", err)
		}
	}

	{
		stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`)
		if err != nil {
			return err
		}

		for _, bh := range bhs {
			for _, parent := range bh.Parents {
				if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil {
					log.Error(err)
				}
			}
		}

		if err := stmt.Close(); err != nil {
			return err
		}

		if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing`); err != nil {
			return xerrors.Errorf("parent put: %w", err)
		}
	}

	if sync {
		stmt, err := tx.Prepare(`copy bs (cid, synced_at) from stdin`)
		if err != nil {
			return err
		}

		for _, bh := range bhs {
			if _, err := stmt.Exec(bh.Cid().String(), timestamp.Unix()); err != nil {
				log.Error(err)
			}
		}

		if err := stmt.Close(); err != nil {
			return err
		}

		if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing`); err != nil {
			return xerrors.Errorf("synced put: %w", err)
		}
	}

	stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, election_proof, win_count, parent_base_fee, forksig) from stdin`)
	if err != nil {
		return err
	}

	for _, bh := range bhs {
		var eproof, winCount interface{}
		if bh.ElectionProof != nil {
			eproof = bh.ElectionProof.VRFProof
			winCount = bh.ElectionProof.WinCount
		}

		if bh.Ticket == nil {
			log.Warnf("got a block with nil ticket")

			bh.Ticket = &types.Ticket{
				VRFProof: []byte{},
			}
		}

		if _, err := stmt2.Exec(
			bh.Cid().String(),
			bh.ParentWeight.String(),
			bh.ParentStateRoot.String(),
			bh.Height,
			bh.Miner.String(),
			bh.Timestamp,
			bh.Ticket.VRFProof,
			eproof,
			winCount,
			bh.ParentBaseFee.String(),
			bh.ForkSignaling); err != nil {
			log.Error(err)
		}
	}

	if err := stmt2.Close(); err != nil {
		return xerrors.Errorf("s2 close: %w", err)
	}

	if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing`); err != nil {
		return xerrors.Errorf("blk put: %w", err)
	}

	return tx.Commit()
}
@ -1,34 +0,0 @@
package util

import (
	"context"
	"net/http"

	"github.com/filecoin-project/go-jsonrpc"
	"github.com/filecoin-project/lotus/api/client"
	"github.com/filecoin-project/lotus/api/v0api"
	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

func GetFullNodeAPIUsingCredentials(ctx context.Context, listenAddr, token string) (v0api.FullNode, jsonrpc.ClientCloser, error) {
	parsedAddr, err := ma.NewMultiaddr(listenAddr)
	if err != nil {
		return nil, nil, err
	}

	_, addr, err := manet.DialArgs(parsedAddr)
	if err != nil {
		return nil, nil, err
	}

	return client.NewFullNodeRPCV0(ctx, apiURI(addr), apiHeaders(token))
}

func apiURI(addr string) string {
	return "ws://" + addr + "/rpc/v0"
}

func apiHeaders(token string) http.Header {
	headers := http.Header{}
	headers.Add("Authorization", "Bearer "+token)
	return headers
}
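
Note: for context, a sketch of how this helper is consumed, mirroring the <token>:<maddr> flag handling in runCmd above; the multiaddr and token values are placeholders:

package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util"
)

func dialExample() error {
	ctx := context.Background()

	// Placeholder values: the API multiaddr from ~/.lotus/api and a JWT
	// from ~/.lotus/token.
	listenAddr := "/ip4/127.0.0.1/tcp/1234"
	token := "<jwt token>"

	api, closer, err := util.GetFullNodeAPIUsingCredentials(ctx, listenAddr, token)
	if err != nil {
		return err
	}
	defer closer()

	v, err := api.Version(ctx)
	if err != nil {
		return err
	}
	fmt.Println("remote version:", v.Version)
	return nil
}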
@ -1,51 +0,0 @@
package util

import (
	"bytes"
	"context"
	"fmt"

	"github.com/ipfs/go-cid"
	cbg "github.com/whyrusleeping/cbor-gen"

	"github.com/filecoin-project/lotus/api/v0api"
)

// TODO extract this to a common location in lotus and reuse the code

// APIIpldStore is required for AMT and HAMT access.
type APIIpldStore struct {
	ctx context.Context
	api v0api.FullNode
}

func NewAPIIpldStore(ctx context.Context, api v0api.FullNode) *APIIpldStore {
	return &APIIpldStore{
		ctx: ctx,
		api: api,
	}
}

func (ht *APIIpldStore) Context() context.Context {
	return ht.ctx
}

func (ht *APIIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
	raw, err := ht.api.ChainReadObj(ctx, c)
	if err != nil {
		return err
	}

	cu, ok := out.(cbg.CBORUnmarshaler)
	if ok {
		if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
			return err
		}
		return nil
	}
	return fmt.Errorf("object does not implement CBORUnmarshaler: %T", out)
}

func (ht *APIIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
	return cid.Undef, fmt.Errorf("Put is not implemented on APIIpldStore")
}
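
Note: APIIpldStore is what lets the processor decode actor state without a local blockstore — it satisfies the store interface the chain/actors state loaders expect, proxying every Get through ChainReadObj. A sketch of the pattern already used in processRewardActors above; node, ctx, and tsKey are assumed to be in scope:

// Assumes: node v0api.FullNode, ctx context.Context, tsKey types.TipSetKey.
rewardActor, err := node.StateGetActor(ctx, reward.Address, tsKey)
if err != nil {
	return err
}

// The API-backed store resolves the actor's state tree over JSON-RPC.
store := util.NewAPIIpldStore(ctx, node)
st, err := reward.Load(store, rewardActor)
if err != nil {
	return err
}

thisEpochReward, err := st.ThisEpochReward()
if err != nil {
	return err
}
_ = thisEpochReward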
@ -1,134 +0,0 @@
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	rice "github.com/GeertJohan/go.rice"
	"github.com/gorilla/websocket"
	"github.com/ipld/go-car"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/peer"
	pubsub "github.com/libp2p/go-libp2p-pubsub"

	"github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/build"
)

var topic = "/fil/headnotifs/"

func init() {
	genBytes := build.MaybeGenesis()
	if len(genBytes) == 0 {
		topic = ""
		return
	}

	bs := blockstore.NewMemory()

	c, err := car.LoadCar(bs, bytes.NewReader(genBytes))
	if err != nil {
		panic(err)
	}
	if len(c.Roots) != 1 {
		panic("expected genesis file to have one root")
	}

	fmt.Printf("Genesis CID: %s\n", c.Roots[0])
	topic = topic + c.Roots[0].String()
}

var upgrader = websocket.Upgrader{
	WriteBufferSize: 1024,
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

func main() {
	if topic == "" {
		fmt.Println("FATAL: No genesis found")
		return
	}

	ctx := context.Background()

	host, err := libp2p.New(
		ctx,
		libp2p.Defaults,
	)
	if err != nil {
		panic(err)
	}
	ps, err := pubsub.NewGossipSub(ctx, host)
	if err != nil {
		panic(err)
	}

	pi, err := build.BuiltinBootstrap()
	if err != nil {
		panic(err)
	}

	if err := host.Connect(ctx, pi[0]); err != nil {
		panic(err)
	}

	http.HandleFunc("/sub", handler(ps))
	http.Handle("/", http.FileServer(rice.MustFindBox("townhall/build").HTTPBox()))

	fmt.Println("listening on http://localhost:2975")

	if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil {
		panic(err)
	}
}

type update struct {
	From   peer.ID
	Update json.RawMessage
	Time   uint64
}

func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Access-Control-Allow-Origin", "*")
		if r.Header.Get("Sec-WebSocket-Protocol") != "" {
			w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol"))
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return
		}

		sub, err := ps.Subscribe(topic) //nolint
		if err != nil {
			return
		}
		defer sub.Cancel() //nolint:errcheck

		fmt.Println("new conn")

		for {
			msg, err := sub.Next(r.Context())
			if err != nil {
				return
			}

			if err := conn.WriteJSON(update{
				From:   peer.ID(msg.From),
				Update: msg.Data,
				Time:   uint64(time.Now().UnixNano() / 1000_000),
			}); err != nil {
				return
			}
		}
	}
}
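
Note: a sketch of a client for the /sub endpoint above, using the same gorilla/websocket dependency; the update struct mirrors the server-side type, with the peer ID read as a plain string:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/gorilla/websocket"
)

type update struct {
	From   string          // peer ID, as sent by the relay
	Update json.RawMessage // raw head notification payload
	Time   uint64          // server receive time, in milliseconds
}

func main() {
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:2975/sub", nil)
	if err != nil {
		panic(err)
	}
	defer conn.Close() //nolint:errcheck

	for {
		var u update
		if err := conn.ReadJSON(&u); err != nil {
			return
		}
		fmt.Printf("%s sent %d bytes at %dms\n", u.From, len(u.Update), u.Time)
	}
}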
23
cmd/lotus-townhall/townhall/.gitignore
vendored
@ -1,23 +0,0 @@
# See https://help.github.com/articles/ignoring-files/ for more about ignoring files.

# dependencies
/node_modules
/.pnp
.pnp.js

# testing
/coverage

# production
/build

# misc
.DS_Store
.env.local
.env.development.local
.env.test.local
.env.production.local

npm-debug.log*
yarn-debug.log*
yarn-error.log*
@ -1,31 +0,0 @@
{
  "name": "townhall",
  "version": "0.1.0",
  "private": true,
  "dependencies": {
    "react": "^16.10.2",
    "react-dom": "^16.10.2",
    "react-scripts": "3.2.0"
  },
  "scripts": {
    "start": "react-scripts start",
    "build": "react-scripts build",
    "test": "react-scripts test",
    "eject": "react-scripts eject"
  },
  "eslintConfig": {
    "extends": "react-app"
  },
  "browserslist": {
    "production": [
      ">0.2%",
      "not dead",
      "not op_mini all"
    ],
    "development": [
      "last 1 chrome version",
      "last 1 firefox version",
      "last 1 safari version"
    ]
  }
}
@ -1,13 +0,0 @@
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <meta name="theme-color" content="#1a1a1a" />
    <title>Lotus TownHall</title>
  </head>
  <body>
    <noscript>You need to enable JavaScript to run this app.</noscript>
    <div id="root"></div>
  </body>
</html>
@ -1,2 +0,0 @@
# https://www.robotstxt.org/robotstxt.html
User-agent: *
@ -1 +0,0 @@

@ -1,87 +0,0 @@
|
||||
import React from 'react';
|
||||
import './App.css';
|
||||
|
||||
function colForH(besth, height) {
|
||||
const diff = besth - height
|
||||
if(diff === 0) return '#6f6'
|
||||
if(diff === 1) return '#df4'
|
||||
if(diff < 4) return '#ff0'
|
||||
if(diff < 10) return '#f60'
|
||||
return '#f00'
|
||||
}
|
||||
|
||||
function colLag(lag) {
|
||||
if(lag < 100) return '#6f6'
|
||||
if(lag < 400) return '#df4'
|
||||
if(lag < 1000) return '#ff0'
|
||||
if(lag < 4000) return '#f60'
|
||||
return '#f00'
|
||||
}
|
||||
|
||||
function lagCol(lag, good) {
|
||||
return <span>
|
||||
<span style={{color: colLag(lag)}}>{lag}</span>
|
||||
<span style={{color: good ? '#f0f0f0' : '#f60'}}>ms</span>
|
||||
</span>
|
||||
}
|
||||
|
||||
class App extends React.Component {
|
||||
constructor(props) {
|
||||
super(props);
|
||||
|
||||
let ws = new WebSocket("ws://" + window.location.host + "/sub")
|
||||
//let ws = new WebSocket("ws://127.0.0.1:2975/sub")
|
||||
|
||||
ws.onmessage = (ev) => {
|
||||
console.log(ev)
|
||||
let update = JSON.parse(ev.data)
|
||||
|
||||
update.Update.Weight = Number(update.Update.Weight)
|
||||
|
||||
let wdiff = update.Update.Weight - (this.state[update.From] || {Weight: update.Update.Weight}).Weight
|
||||
wdiff = <span style={{color: wdiff < 0 ? '#f00' : '#f0f0f0'}}>{wdiff}</span>
|
||||
|
||||
let utDiff = update.Time - (this.state[update.From] || {utime: update.Time}).utime
|
||||
utDiff = <span style={{color: utDiff < 0 ? '#f00' : '#f0f0f0'}}>{utDiff}ms</span>
|
||||
|
||||
this.setState( prev => ({
|
||||
...prev, [update.From]: {...update.Update, utime: update.Time, wdiff: wdiff, utDiff: utDiff},
|
||||
}))
|
||||
}
|
||||
|
||||
ws.onclose = () => {
|
||||
this.setState({disconnected: true})
|
||||
}
|
||||
|
||||
this.state = {}
|
||||
}
|
||||
|
||||
render() {
|
||||
if(this.state.disconnected) {
|
||||
return <span>Error: disconnected</span>
|
||||
}
|
||||
|
||||
let besth = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Height ? p : n.Height, -1)
|
||||
let bestw = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Weight ? p : n.Weight, -1)
|
||||
|
||||
return <table>
|
||||
<tr><td>PeerID</td><td>Nickname</td><td>Lag</td><td>Weight(best, prev)</td><td>Height</td><td>Blocks</td></tr>
|
||||
{Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => {
|
||||
let mnrs = v.Blocks.map(b => <td> m:{b.Miner}({lagCol(v.Time ? v.Time - (b.Timestamp*1000) : v.utime - (b.Timestamp*1000), v.Time)})</td>)
|
||||
let l = [
|
||||
<td>{k}</td>,
|
||||
<td>{v.NodeName}</td>,
|
||||
<td>{v.Time ? lagCol(v.utime - v.Time, true) : ""}(Δ{v.utDiff})</td>,
|
||||
<td style={{color: bestw !== v.Weight ? '#f00' : '#afa'}}>{v.Weight}({bestw - v.Weight}, {v.wdiff})</td>,
|
||||
<td style={{color: colForH(besth, v.Height)}}>{v.Height}({besth - v.Height})</td>,
|
||||
...mnrs,
|
||||
]
|
||||
|
||||
l = <tr>{l}</tr>
|
||||
return l
|
||||
})
|
||||
}
|
||||
</table>
|
||||
}
|
||||
}
|
||||
export default App;
@ -1,9 +0,0 @@
import React from 'react';
import ReactDOM from 'react-dom';
import App from './App';

it('renders without crashing', () => {
  const div = document.createElement('div');
  ReactDOM.render(<App />, div);
  ReactDOM.unmountComponentAtNode(div);
});
@ -1,6 +0,0 @@
body {
  margin: 0;
  font-family: monospace;
  background: #1f1f1f;
  color: #f0f0f0;
}
@ -1,6 +0,0 @@
import React from 'react';
import ReactDOM from 'react-dom';
import './index.css';
import App from './App';

ReactDOM.render(<App />, document.getElementById('root'));
1
go.mod
@ -100,7 +100,6 @@ require (
	github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d
	github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
	github.com/kelseyhightower/envconfig v1.4.0
	github.com/lib/pq v1.7.0
	github.com/libp2p/go-buffer-pool v0.0.2
	github.com/libp2p/go-eventbus v0.2.1
	github.com/libp2p/go-libp2p v0.14.2
2
go.sum
@ -835,8 +835,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY=
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU=
github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=
@ -17,7 +17,6 @@ import (
	"github.com/filecoin-project/lotus/chain/market"
	"github.com/filecoin-project/lotus/chain/messagepool"
	"github.com/filecoin-project/lotus/chain/messagesigner"
	"github.com/filecoin-project/lotus/chain/metrics"
	"github.com/filecoin-project/lotus/chain/stmgr"
	rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc"
	"github.com/filecoin-project/lotus/chain/store"
@ -174,10 +173,6 @@ func ConfigFullNode(c interface{}) Option {
		),
		Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),

		If(cfg.Metrics.HeadNotifs,
			Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
		),

		If(cfg.Wallet.RemoteBackend != "",
			Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)),
		),
@ -245,12 +245,6 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f

			Comment: ``,
		},
		{
			Name: "Metrics",
			Type: "Metrics",

			Comment: ``,
		},
		{
			Name: "Wallet",
			Type: "Wallet",
@ -324,20 +318,6 @@ Format: multiaddress`,
			Comment: ``,
		},
	},
	"Metrics": []DocField{
		{
			Name: "Nickname",
			Type: "string",

			Comment: ``,
		},
		{
			Name: "HeadNotifs",
			Type: "bool",

			Comment: ``,
		},
	},
	"MinerAddressConfig": []DocField{
		{
			Name: "PreCommitControl",
@ -23,7 +23,6 @@ type Common struct {
type FullNode struct {
	Common
	Client     Client
	Metrics    Metrics
	Wallet     Wallet
	Fees       FeeConfig
	Chainstore Chainstore
@ -298,12 +297,6 @@ type Splitstore struct {
}

// // Full Node

type Metrics struct {
	Nickname   string
	HeadNotifs bool
}

type Client struct {
	UseIpfs        bool
	IpfsOnlineMode bool
@ -1,15 +0,0 @@
[Unit]
Description=Chainwatch
After=lotus-daemon.service
Requires=lotus-daemon.service

[Service]
Environment=GOLOG_FILE="/var/log/lotus/chainwatch.log"
Environment=GOLOG_LOG_FMT="json"
Environment=LOTUS_DB=""
Environment=LOTUS_PATH="%h/.lotus"
EnvironmentFile=-/etc/lotus/chainwatch.env
ExecStart=/usr/local/bin/lotus-chainwatch run

[Install]
WantedBy=multi-user.target