From f8afb699cddb455075c4f90c0512d92723bfcd32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Jul 2021 17:29:17 +0200 Subject: [PATCH 1/4] make: Allow setting Go compiler with GOCC --- Makefile | 62 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index 77fd38b9e..77040d1b6 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,8 @@ MODULES:= CLEAN:= BINS:= +GOCC?=go + ldflags=-X=github.com/filecoin-project/lotus/build.CurrentCommit=+git.$(subst -,.,$(shell git describe --always --match=NeVeRmAtCh --dirty 2>/dev/null || git rev-parse --short HEAD 2>/dev/null)) ifneq ($(strip $(LDFLAGS)),) ldflags+=-extldflags=$(LDFLAGS) @@ -85,32 +87,32 @@ interopnet: build-devnets lotus: $(BUILD_DEPS) rm -f lotus - go build $(GOFLAGS) -o lotus ./cmd/lotus + $(GOCC) build $(GOFLAGS) -o lotus ./cmd/lotus .PHONY: lotus BINS+=lotus lotus-miner: $(BUILD_DEPS) rm -f lotus-miner - go build $(GOFLAGS) -o lotus-miner ./cmd/lotus-miner + $(GOCC) build $(GOFLAGS) -o lotus-miner ./cmd/lotus-miner .PHONY: lotus-miner BINS+=lotus-miner lotus-worker: $(BUILD_DEPS) rm -f lotus-worker - go build $(GOFLAGS) -o lotus-worker ./cmd/lotus-seal-worker + $(GOCC) build $(GOFLAGS) -o lotus-worker ./cmd/lotus-seal-worker .PHONY: lotus-worker BINS+=lotus-worker lotus-shed: $(BUILD_DEPS) rm -f lotus-shed - go build $(GOFLAGS) -o lotus-shed ./cmd/lotus-shed + $(GOCC) build $(GOFLAGS) -o lotus-shed ./cmd/lotus-shed .PHONY: lotus-shed BINS+=lotus-shed lotus-gateway: $(BUILD_DEPS) rm -f lotus-gateway - go build $(GOFLAGS) -o lotus-gateway ./cmd/lotus-gateway + $(GOCC) build $(GOFLAGS) -o lotus-gateway ./cmd/lotus-gateway .PHONY: lotus-gateway BINS+=lotus-gateway @@ -138,19 +140,19 @@ install-app: lotus-seed: $(BUILD_DEPS) rm -f lotus-seed - go build $(GOFLAGS) -o lotus-seed ./cmd/lotus-seed + $(GOCC) build $(GOFLAGS) -o lotus-seed ./cmd/lotus-seed .PHONY: lotus-seed BINS+=lotus-seed benchmarks: - go run github.com/whyrusleeping/bencher ./... > bench.json + $(GOCC) run github.com/whyrusleeping/bencher ./... > bench.json @echo Submitting results @curl -X POST 'http://benchmark.kittyhawk.wtf/benchmark' -d '@bench.json' -u "${benchmark_http_cred}" .PHONY: benchmarks lotus-pond: 2k - go build -o lotus-pond ./lotuspond + $(GOCC) build -o lotus-pond ./lotuspond .PHONY: lotus-pond BINS+=lotus-pond @@ -163,7 +165,7 @@ lotus-pond-app: lotus-pond-front lotus-pond lotus-townhall: rm -f lotus-townhall - go build -o lotus-townhall ./cmd/lotus-townhall + $(GOCC) build -o lotus-townhall ./cmd/lotus-townhall .PHONY: lotus-townhall BINS+=lotus-townhall @@ -176,61 +178,61 @@ lotus-townhall-app: lotus-touch lotus-townhall-front lotus-fountain: rm -f lotus-fountain - go build -o lotus-fountain ./cmd/lotus-fountain + $(GOCC) build -o lotus-fountain ./cmd/lotus-fountain .PHONY: lotus-fountain BINS+=lotus-fountain lotus-chainwatch: rm -f lotus-chainwatch - go build $(GOFLAGS) -o lotus-chainwatch ./cmd/lotus-chainwatch + $(GOCC) build $(GOFLAGS) -o lotus-chainwatch ./cmd/lotus-chainwatch .PHONY: lotus-chainwatch BINS+=lotus-chainwatch lotus-bench: rm -f lotus-bench - go build -o lotus-bench ./cmd/lotus-bench + $(GOCC) build -o lotus-bench ./cmd/lotus-bench .PHONY: lotus-bench BINS+=lotus-bench lotus-stats: rm -f lotus-stats - go build $(GOFLAGS) -o lotus-stats ./cmd/lotus-stats + $(GOCC) build $(GOFLAGS) -o lotus-stats ./cmd/lotus-stats .PHONY: lotus-stats BINS+=lotus-stats lotus-pcr: rm -f lotus-pcr - go build $(GOFLAGS) -o lotus-pcr ./cmd/lotus-pcr + $(GOCC) build $(GOFLAGS) -o lotus-pcr ./cmd/lotus-pcr .PHONY: lotus-pcr BINS+=lotus-pcr lotus-health: rm -f lotus-health - go build -o lotus-health ./cmd/lotus-health + $(GOCC) build -o lotus-health ./cmd/lotus-health .PHONY: lotus-health BINS+=lotus-health lotus-wallet: rm -f lotus-wallet - go build -o lotus-wallet ./cmd/lotus-wallet + $(GOCC) build -o lotus-wallet ./cmd/lotus-wallet .PHONY: lotus-wallet BINS+=lotus-wallet lotus-keygen: rm -f lotus-keygen - go build -o lotus-keygen ./cmd/lotus-keygen + $(GOCC) build -o lotus-keygen ./cmd/lotus-keygen .PHONY: lotus-keygen BINS+=lotus-keygen testground: - go build -tags testground -o /dev/null ./cmd/lotus + $(GOCC) build -tags testground -o /dev/null ./cmd/lotus .PHONY: testground BINS+=testground tvx: rm -f tvx - go build -o tvx ./cmd/tvx + $(GOCC) build -o tvx ./cmd/tvx .PHONY: tvx BINS+=tvx @@ -239,7 +241,7 @@ install-chainwatch: lotus-chainwatch lotus-sim: $(BUILD_DEPS) rm -f lotus-sim - go build $(GOFLAGS) -o lotus-sim ./cmd/lotus-sim + $(GOCC) build $(GOFLAGS) -o lotus-sim ./cmd/lotus-sim .PHONY: lotus-sim BINS+=lotus-sim @@ -319,25 +321,25 @@ dist-clean: .PHONY: dist-clean type-gen: api-gen - go run ./gen/main.go - go generate -x ./... + $(GOCC) run ./gen/main.go + $(GOCC) generate -x ./... goimports -w api/ method-gen: api-gen - (cd ./lotuspond/front/src/chain && go run ./methodgen.go) + (cd ./lotuspond/front/src/chain && $(GOCC) run ./methodgen.go) actors-gen: - go run ./chain/actors/agen - go fmt ./... + $(GOCC) run ./chain/actors/agen + $(GOCC) fmt ./... api-gen: - go run ./gen/api + $(GOCC) run ./gen/api goimports -w api goimports -w api .PHONY: api-gen cfgdoc-gen: - go run ./node/config/cfgdocgen > ./node/config/doc_gen.go + $(GOCC) run ./node/config/cfgdocgen > ./node/config/doc_gen.go appimage: lotus rm -rf appimage-builder-cache || true @@ -351,9 +353,9 @@ appimage: lotus docsgen: docsgen-md docsgen-openrpc docsgen-md-bin: api-gen actors-gen - go build $(GOFLAGS) -o docgen-md ./api/docgen/cmd + $(GOCC) build $(GOFLAGS) -o docgen-md ./api/docgen/cmd docsgen-openrpc-bin: api-gen actors-gen - go build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd + $(GOCC) build $(GOFLAGS) -o docgen-openrpc ./api/docgen-openrpc/cmd docsgen-md: docsgen-md-full docsgen-md-storage docsgen-md-worker @@ -393,4 +395,4 @@ print-%: @echo $*=$($*) circleci: - go generate -x ./.circleci \ No newline at end of file + $(GOCC) generate -x ./.circleci \ No newline at end of file From 389f71251c770030a59e22400eb97202eaa9c835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Jul 2021 17:50:29 +0200 Subject: [PATCH 2/4] Remove townhall --- Makefile | 13 -- chain/metrics/consensus.go | 129 ----------------- cmd/lotus-townhall/main.go | 134 ------------------ cmd/lotus-townhall/townhall/.gitignore | 23 --- cmd/lotus-townhall/townhall/package.json | 31 ---- cmd/lotus-townhall/townhall/public/index.html | 13 -- cmd/lotus-townhall/townhall/public/robots.txt | 2 - cmd/lotus-townhall/townhall/src/App.css | 1 - cmd/lotus-townhall/townhall/src/App.js | 87 ------------ cmd/lotus-townhall/townhall/src/App.test.js | 9 -- cmd/lotus-townhall/townhall/src/index.css | 6 - cmd/lotus-townhall/townhall/src/index.js | 6 - node/builder_chain.go | 5 - node/config/doc_gen.go | 20 --- node/config/types.go | 7 - 15 files changed, 486 deletions(-) delete mode 100644 chain/metrics/consensus.go delete mode 100644 cmd/lotus-townhall/main.go delete mode 100644 cmd/lotus-townhall/townhall/.gitignore delete mode 100644 cmd/lotus-townhall/townhall/package.json delete mode 100644 cmd/lotus-townhall/townhall/public/index.html delete mode 100644 cmd/lotus-townhall/townhall/public/robots.txt delete mode 100644 cmd/lotus-townhall/townhall/src/App.css delete mode 100644 cmd/lotus-townhall/townhall/src/App.js delete mode 100644 cmd/lotus-townhall/townhall/src/App.test.js delete mode 100644 cmd/lotus-townhall/townhall/src/index.css delete mode 100644 cmd/lotus-townhall/townhall/src/index.js diff --git a/Makefile b/Makefile index 77040d1b6..4aac5f21a 100644 --- a/Makefile +++ b/Makefile @@ -163,19 +163,6 @@ lotus-pond-front: lotus-pond-app: lotus-pond-front lotus-pond .PHONY: lotus-pond-app -lotus-townhall: - rm -f lotus-townhall - $(GOCC) build -o lotus-townhall ./cmd/lotus-townhall -.PHONY: lotus-townhall -BINS+=lotus-townhall - -lotus-townhall-front: - (cd ./cmd/lotus-townhall/townhall && npm i && npm run build) -.PHONY: lotus-townhall-front - -lotus-townhall-app: lotus-touch lotus-townhall-front -.PHONY: lotus-townhall-app - lotus-fountain: rm -f lotus-fountain $(GOCC) build -o lotus-fountain ./cmd/lotus-fountain diff --git a/chain/metrics/consensus.go b/chain/metrics/consensus.go deleted file mode 100644 index c3c4a10d1..000000000 --- a/chain/metrics/consensus.go +++ /dev/null @@ -1,129 +0,0 @@ -package metrics - -import ( - "context" - "encoding/json" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "go.uber.org/fx" - - "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node/impl/full" - "github.com/filecoin-project/lotus/node/modules/helpers" -) - -var log = logging.Logger("metrics") - -const baseTopic = "/fil/headnotifs/" - -type Update struct { - Type string -} - -func SendHeadNotifs(nickname string) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, chain full.ChainAPI) error { - ctx := helpers.LifecycleCtx(mctx, lc) - - lc.Append(fx.Hook{ - OnStart: func(_ context.Context) error { - gen, err := chain.Chain.GetGenesis() - if err != nil { - return err - } - - topic := baseTopic + gen.Cid().String() - - go func() { - if err := sendHeadNotifs(ctx, ps, topic, chain, nickname); err != nil { - log.Error("consensus metrics error", err) - return - } - }() - go func() { - sub, err := ps.Subscribe(topic) //nolint - if err != nil { - return - } - defer sub.Cancel() - - for { - if _, err := sub.Next(ctx); err != nil { - return - } - } - - }() - return nil - }, - }) - - return nil - } -} - -type message struct { - // TipSet - Cids []cid.Cid - Blocks []*types.BlockHeader - Height abi.ChainEpoch - Weight types.BigInt - Time uint64 - Nonce uint64 - - // Meta - - NodeName string -} - -func sendHeadNotifs(ctx context.Context, ps *pubsub.PubSub, topic string, chain full.ChainAPI, nickname string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - notifs, err := chain.ChainNotify(ctx) - if err != nil { - return err - } - - // using unix nano time makes very sure we pick a nonce higher than previous restart - nonce := uint64(build.Clock.Now().UnixNano()) - - for { - select { - case notif := <-notifs: - n := notif[len(notif)-1] - - w, err := chain.ChainTipSetWeight(ctx, n.Val.Key()) - if err != nil { - return err - } - - m := message{ - Cids: n.Val.Cids(), - Blocks: n.Val.Blocks(), - Height: n.Val.Height(), - Weight: w, - NodeName: nickname, - Time: uint64(build.Clock.Now().UnixNano() / 1000_000), - Nonce: nonce, - } - - b, err := json.Marshal(m) - if err != nil { - return err - } - - //nolint - if err := ps.Publish(topic, b); err != nil { - return err - } - case <-ctx.Done(): - return nil - } - - nonce++ - } -} diff --git a/cmd/lotus-townhall/main.go b/cmd/lotus-townhall/main.go deleted file mode 100644 index 1e0460dee..000000000 --- a/cmd/lotus-townhall/main.go +++ /dev/null @@ -1,134 +0,0 @@ -package main - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "time" - - rice "github.com/GeertJohan/go.rice" - "github.com/gorilla/websocket" - "github.com/ipld/go-car" - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/peer" - pubsub "github.com/libp2p/go-libp2p-pubsub" - - "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/build" -) - -var topic = "/fil/headnotifs/" - -func init() { - genBytes := build.MaybeGenesis() - if len(genBytes) == 0 { - topic = "" - return - } - - bs := blockstore.NewMemory() - - c, err := car.LoadCar(bs, bytes.NewReader(genBytes)) - if err != nil { - panic(err) - } - if len(c.Roots) != 1 { - panic("expected genesis file to have one root") - } - - fmt.Printf("Genesis CID: %s\n", c.Roots[0]) - topic = topic + c.Roots[0].String() -} - -var upgrader = websocket.Upgrader{ - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, -} - -func main() { - if topic == "" { - fmt.Println("FATAL: No genesis found") - return - } - - ctx := context.Background() - - host, err := libp2p.New( - ctx, - libp2p.Defaults, - ) - if err != nil { - panic(err) - } - ps, err := pubsub.NewGossipSub(ctx, host) - if err != nil { - panic(err) - } - - pi, err := build.BuiltinBootstrap() - if err != nil { - panic(err) - } - - if err := host.Connect(ctx, pi[0]); err != nil { - panic(err) - } - - http.HandleFunc("/sub", handler(ps)) - http.Handle("/", http.FileServer(rice.MustFindBox("townhall/build").HTTPBox())) - - fmt.Println("listening on http://localhost:2975") - - if err := http.ListenAndServe("0.0.0.0:2975", nil); err != nil { - panic(err) - } -} - -type update struct { - From peer.ID - Update json.RawMessage - Time uint64 -} - -func handler(ps *pubsub.PubSub) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - if r.Header.Get("Sec-WebSocket-Protocol") != "" { - w.Header().Set("Sec-WebSocket-Protocol", r.Header.Get("Sec-WebSocket-Protocol")) - } - - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return - } - - sub, err := ps.Subscribe(topic) //nolint - if err != nil { - return - } - defer sub.Cancel() //nolint:errcheck - - fmt.Println("new conn") - - for { - msg, err := sub.Next(r.Context()) - if err != nil { - return - } - - //fmt.Println(msg) - - if err := conn.WriteJSON(update{ - From: peer.ID(msg.From), - Update: msg.Data, - Time: uint64(time.Now().UnixNano() / 1000_000), - }); err != nil { - return - } - } - } -} diff --git a/cmd/lotus-townhall/townhall/.gitignore b/cmd/lotus-townhall/townhall/.gitignore deleted file mode 100644 index 4d29575de..000000000 --- a/cmd/lotus-townhall/townhall/.gitignore +++ /dev/null @@ -1,23 +0,0 @@ -# See https://help.github.com/articles/ignoring-files/ for more about ignoring files. - -# dependencies -/node_modules -/.pnp -.pnp.js - -# testing -/coverage - -# production -/build - -# misc -.DS_Store -.env.local -.env.development.local -.env.test.local -.env.production.local - -npm-debug.log* -yarn-debug.log* -yarn-error.log* diff --git a/cmd/lotus-townhall/townhall/package.json b/cmd/lotus-townhall/townhall/package.json deleted file mode 100644 index 5a8167622..000000000 --- a/cmd/lotus-townhall/townhall/package.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "name": "townhall", - "version": "0.1.0", - "private": true, - "dependencies": { - "react": "^16.10.2", - "react-dom": "^16.10.2", - "react-scripts": "3.2.0" - }, - "scripts": { - "start": "react-scripts start", - "build": "react-scripts build", - "test": "react-scripts test", - "eject": "react-scripts eject" - }, - "eslintConfig": { - "extends": "react-app" - }, - "browserslist": { - "production": [ - ">0.2%", - "not dead", - "not op_mini all" - ], - "development": [ - "last 1 chrome version", - "last 1 firefox version", - "last 1 safari version" - ] - } -} diff --git a/cmd/lotus-townhall/townhall/public/index.html b/cmd/lotus-townhall/townhall/public/index.html deleted file mode 100644 index 38af10597..000000000 --- a/cmd/lotus-townhall/townhall/public/index.html +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - Lotus TownHall - - - -
- - diff --git a/cmd/lotus-townhall/townhall/public/robots.txt b/cmd/lotus-townhall/townhall/public/robots.txt deleted file mode 100644 index 01b0f9a10..000000000 --- a/cmd/lotus-townhall/townhall/public/robots.txt +++ /dev/null @@ -1,2 +0,0 @@ -# https://www.robotstxt.org/robotstxt.html -User-agent: * diff --git a/cmd/lotus-townhall/townhall/src/App.css b/cmd/lotus-townhall/townhall/src/App.css deleted file mode 100644 index 8b1378917..000000000 --- a/cmd/lotus-townhall/townhall/src/App.css +++ /dev/null @@ -1 +0,0 @@ - diff --git a/cmd/lotus-townhall/townhall/src/App.js b/cmd/lotus-townhall/townhall/src/App.js deleted file mode 100644 index 2f216f5da..000000000 --- a/cmd/lotus-townhall/townhall/src/App.js +++ /dev/null @@ -1,87 +0,0 @@ -import React from 'react'; -import './App.css'; - -function colForH(besth, height) { - const diff = besth - height - if(diff === 0) return '#6f6' - if(diff === 1) return '#df4' - if(diff < 4) return '#ff0' - if(diff < 10) return '#f60' - return '#f00' -} - -function colLag(lag) { - if(lag < 100) return '#6f6' - if(lag < 400) return '#df4' - if(lag < 1000) return '#ff0' - if(lag < 4000) return '#f60' - return '#f00' -} - -function lagCol(lag, good) { - return - {lag} - ms - -} - -class App extends React.Component { - constructor(props) { - super(props); - - let ws = new WebSocket("ws://" + window.location.host + "/sub") - //let ws = new WebSocket("ws://127.0.0.1:2975/sub") - - ws.onmessage = (ev) => { - console.log(ev) - let update = JSON.parse(ev.data) - - update.Update.Weight = Number(update.Update.Weight) - - let wdiff = update.Update.Weight - (this.state[update.From] || {Weight: update.Update.Weight}).Weight - wdiff = {wdiff} - - let utDiff = update.Time - (this.state[update.From] || {utime: update.Time}).utime - utDiff = {utDiff}ms - - this.setState( prev => ({ - ...prev, [update.From]: {...update.Update, utime: update.Time, wdiff: wdiff, utDiff: utDiff}, - })) - } - - ws.onclose = () => { - this.setState({disconnected: true}) - } - - this.state = {} - } - - render() { - if(this.state.disconnected) { - return Error: disconnected - } - - let besth = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Height ? p : n.Height, -1) - let bestw = Object.keys(this.state).map(k => this.state[k]).reduce((p, n) => p > n.Weight ? p : n.Weight, -1) - - return - - {Object.keys(this.state).map(k => [k, this.state[k]]).map(([k, v]) => { - let mnrs = v.Blocks.map(b => ) - let l = [ - , - , - , - , - , - ...mnrs, - ] - - l = {l} - return l - }) - } -
PeerIDNicknameLagWeight(best, prev)HeightBlocks
 m:{b.Miner}({lagCol(v.Time ? v.Time - (b.Timestamp*1000) : v.utime - (b.Timestamp*1000), v.Time)}){k}{v.NodeName}{v.Time ? lagCol(v.utime - v.Time, true) : ""}(Δ{v.utDiff}){v.Weight}({bestw - v.Weight}, {v.wdiff}){v.Height}({besth - v.Height})
- } -} -export default App; diff --git a/cmd/lotus-townhall/townhall/src/App.test.js b/cmd/lotus-townhall/townhall/src/App.test.js deleted file mode 100644 index a754b201b..000000000 --- a/cmd/lotus-townhall/townhall/src/App.test.js +++ /dev/null @@ -1,9 +0,0 @@ -import React from 'react'; -import ReactDOM from 'react-dom'; -import App from './App'; - -it('renders without crashing', () => { - const div = document.createElement('div'); - ReactDOM.render(, div); - ReactDOM.unmountComponentAtNode(div); -}); diff --git a/cmd/lotus-townhall/townhall/src/index.css b/cmd/lotus-townhall/townhall/src/index.css deleted file mode 100644 index fb0d9d10e..000000000 --- a/cmd/lotus-townhall/townhall/src/index.css +++ /dev/null @@ -1,6 +0,0 @@ -body { - margin: 0; - font-family: monospace; - background: #1f1f1f; - color: #f0f0f0; -} diff --git a/cmd/lotus-townhall/townhall/src/index.js b/cmd/lotus-townhall/townhall/src/index.js deleted file mode 100644 index 395b74997..000000000 --- a/cmd/lotus-townhall/townhall/src/index.js +++ /dev/null @@ -1,6 +0,0 @@ -import React from 'react'; -import ReactDOM from 'react-dom'; -import './index.css'; -import App from './App'; - -ReactDOM.render(, document.getElementById('root')); diff --git a/node/builder_chain.go b/node/builder_chain.go index 1447a4df7..4d9294972 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -17,7 +17,6 @@ import ( "github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/messagesigner" - "github.com/filecoin-project/lotus/chain/metrics" "github.com/filecoin-project/lotus/chain/stmgr" rpcstmgr "github.com/filecoin-project/lotus/chain/stmgr/rpc" "github.com/filecoin-project/lotus/chain/store" @@ -174,10 +173,6 @@ func ConfigFullNode(c interface{}) Option { ), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)), - If(cfg.Metrics.HeadNotifs, - Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)), - ), - If(cfg.Wallet.RemoteBackend != "", Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)), ), diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 5d4a91d5f..430abebbb 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -245,12 +245,6 @@ see https://docs.filecoin.io/mine/lotus/miner-configuration/#using-filters-for-f Comment: ``, }, - { - Name: "Metrics", - Type: "Metrics", - - Comment: ``, - }, { Name: "Wallet", Type: "Wallet", @@ -324,20 +318,6 @@ Format: multiaddress`, Comment: ``, }, }, - "Metrics": []DocField{ - { - Name: "Nickname", - Type: "string", - - Comment: ``, - }, - { - Name: "HeadNotifs", - Type: "bool", - - Comment: ``, - }, - }, "MinerAddressConfig": []DocField{ { Name: "PreCommitControl", diff --git a/node/config/types.go b/node/config/types.go index fe42aa27e..12f7653fb 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -23,7 +23,6 @@ type Common struct { type FullNode struct { Common Client Client - Metrics Metrics Wallet Wallet Fees FeeConfig Chainstore Chainstore @@ -298,12 +297,6 @@ type Splitstore struct { } // // Full Node - -type Metrics struct { - Nickname string - HeadNotifs bool -} - type Client struct { UseIpfs bool IpfsOnlineMode bool From 25052fe48f6e155e2cf05b57572e5102f0be8360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Jul 2021 17:53:33 +0200 Subject: [PATCH 3/4] Remove chainwatch (now filecoin-project/sentinel) --- Makefile | 27 +- cmd/lotus-chainwatch/dot.go | 131 --- cmd/lotus-chainwatch/main.go | 54 - .../processor/common_actors.go | 299 ----- cmd/lotus-chainwatch/processor/market.go | 316 ----- cmd/lotus-chainwatch/processor/messages.go | 318 ----- cmd/lotus-chainwatch/processor/miner.go | 1035 ----------------- cmd/lotus-chainwatch/processor/mpool.go | 100 -- cmd/lotus-chainwatch/processor/power.go | 190 --- cmd/lotus-chainwatch/processor/processor.go | 420 ------- cmd/lotus-chainwatch/processor/reward.go | 234 ---- cmd/lotus-chainwatch/run.go | 107 -- .../refresh_top_miners_by_base_reward.go | 78 -- cmd/lotus-chainwatch/scheduler/scheduler.go | 60 - cmd/lotus-chainwatch/syncer/blockssub.go | 27 - cmd/lotus-chainwatch/syncer/sync.go | 527 --------- cmd/lotus-chainwatch/util/api.go | 34 - cmd/lotus-chainwatch/util/contextStore.go | 51 - scripts/lotus-chainwatch.service | 15 - 19 files changed, 2 insertions(+), 4021 deletions(-) delete mode 100644 cmd/lotus-chainwatch/dot.go delete mode 100644 cmd/lotus-chainwatch/main.go delete mode 100644 cmd/lotus-chainwatch/processor/common_actors.go delete mode 100644 cmd/lotus-chainwatch/processor/market.go delete mode 100644 cmd/lotus-chainwatch/processor/messages.go delete mode 100644 cmd/lotus-chainwatch/processor/miner.go delete mode 100644 cmd/lotus-chainwatch/processor/mpool.go delete mode 100644 cmd/lotus-chainwatch/processor/power.go delete mode 100644 cmd/lotus-chainwatch/processor/processor.go delete mode 100644 cmd/lotus-chainwatch/processor/reward.go delete mode 100644 cmd/lotus-chainwatch/run.go delete mode 100644 cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go delete mode 100644 cmd/lotus-chainwatch/scheduler/scheduler.go delete mode 100644 cmd/lotus-chainwatch/syncer/blockssub.go delete mode 100644 cmd/lotus-chainwatch/syncer/sync.go delete mode 100644 cmd/lotus-chainwatch/util/api.go delete mode 100644 cmd/lotus-chainwatch/util/contextStore.go delete mode 100644 scripts/lotus-chainwatch.service diff --git a/Makefile b/Makefile index 4aac5f21a..dfe4a65c7 100644 --- a/Makefile +++ b/Makefile @@ -169,12 +169,6 @@ lotus-fountain: .PHONY: lotus-fountain BINS+=lotus-fountain -lotus-chainwatch: - rm -f lotus-chainwatch - $(GOCC) build $(GOFLAGS) -o lotus-chainwatch ./cmd/lotus-chainwatch -.PHONY: lotus-chainwatch -BINS+=lotus-chainwatch - lotus-bench: rm -f lotus-bench $(GOCC) build -o lotus-bench ./cmd/lotus-bench @@ -223,9 +217,6 @@ tvx: .PHONY: tvx BINS+=tvx -install-chainwatch: lotus-chainwatch - install -C ./lotus-chainwatch /usr/local/bin/lotus-chainwatch - lotus-sim: $(BUILD_DEPS) rm -f lotus-sim $(GOCC) build $(GOFLAGS) -o lotus-sim ./cmd/lotus-sim @@ -250,21 +241,13 @@ install-miner-service: install-miner install-daemon-service @echo @echo "lotus-miner service installed. Don't forget to run 'sudo systemctl start lotus-miner' to start it and 'sudo systemctl enable lotus-miner' for it to be enabled on startup." -install-chainwatch-service: install-chainwatch install-daemon-service - mkdir -p /etc/systemd/system - mkdir -p /var/log/lotus - install -C -m 0644 ./scripts/lotus-chainwatch.service /etc/systemd/system/lotus-chainwatch.service - systemctl daemon-reload - @echo - @echo "chainwatch service installed. Don't forget to run 'sudo systemctl start lotus-chainwatch' to start it and 'sudo systemctl enable lotus-chainwatch' for it to be enabled on startup." - install-main-services: install-miner-service -install-all-services: install-main-services install-chainwatch-service +install-all-services: install-main-services install-services: install-main-services -clean-daemon-service: clean-miner-service clean-chainwatch-service +clean-daemon-service: clean-miner-service -systemctl stop lotus-daemon -systemctl disable lotus-daemon rm -f /etc/systemd/system/lotus-daemon.service @@ -276,12 +259,6 @@ clean-miner-service: rm -f /etc/systemd/system/lotus-miner.service systemctl daemon-reload -clean-chainwatch-service: - -systemctl stop lotus-chainwatch - -systemctl disable lotus-chainwatch - rm -f /etc/systemd/system/lotus-chainwatch.service - systemctl daemon-reload - clean-main-services: clean-daemon-service clean-all-services: clean-main-services diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go deleted file mode 100644 index 3149d65f5..000000000 --- a/cmd/lotus-chainwatch/dot.go +++ /dev/null @@ -1,131 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "hash/crc32" - "strconv" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" -) - -var dotCmd = &cli.Command{ - Name: "dot", - Usage: "generate dot graphs", - ArgsUsage: " ", - Action: func(cctx *cli.Context) error { - ll := cctx.String("log-level") - if err := logging.SetLogLevel("*", ll); err != nil { - return err - } - - db, err := sql.Open("postgres", cctx.String("db")) - if err != nil { - return err - } - defer func() { - if err := db.Close(); err != nil { - log.Errorw("Failed to close database", "error", err) - } - }() - - if err := db.Ping(); err != nil { - return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err) - } - - minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32) - if err != nil { - return err - } - tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32) - if err != nil { - return err - } - maxH := minH + tosee - - res, err := db.Query(`select block, parent, b.miner, b.height, p.height from block_parents - inner join blocks b on block_parents.block = b.cid - inner join blocks p on block_parents.parent = p.cid -where b.height > $1 and b.height < $2`, minH, maxH) - - if err != nil { - return err - } - - fmt.Println("digraph D {") - - hl, err := syncedBlocks(db) - if err != nil { - log.Fatal(err) - } - - for res.Next() { - var block, parent, miner string - var height, ph uint64 - if err := res.Scan(&block, &parent, &miner, &height, &ph); err != nil { - return err - } - - bc, err := cid.Parse(block) - if err != nil { - return err - } - - _, has := hl[bc] - - col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030 - - hasstr := "" - if !has { - //col = 0xffffffff - hasstr = " UNSYNCED" - } - - nulls := height - ph - 1 - for i := uint64(0); i < nulls; i++ { - name := block + "NP" + fmt.Sprint(i) - - fmt.Printf("%s [label = \"NULL:%d\", fillcolor = \"#ffddff\", style=filled, forcelabels=true]\n%s -> %s\n", - name, height-nulls+i, name, parent) - - parent = name - } - - fmt.Printf("%s [label = \"%s:%d%s\", fillcolor = \"#%06x\", style=filled, forcelabels=true]\n%s -> %s\n", block, miner, height, hasstr, col, block, parent) - } - if res.Err() != nil { - return res.Err() - } - - fmt.Println("}") - - return nil - }, -} - -func syncedBlocks(db *sql.DB) (map[cid.Cid]struct{}, error) { - // timestamp is used to return a configurable amount of rows based on when they were last added. - rws, err := db.Query(`select cid FROM blocks_synced`) - if err != nil { - return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) - } - out := map[cid.Cid]struct{}{} - - for rws.Next() { - var c string - if err := rws.Scan(&c); err != nil { - return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) - } - - ci, err := cid.Parse(c) - if err != nil { - return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) - } - - out[ci] = struct{}{} - } - return out, nil -} diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go deleted file mode 100644 index 5cb0f3507..000000000 --- a/cmd/lotus-chainwatch/main.go +++ /dev/null @@ -1,54 +0,0 @@ -package main - -import ( - "os" - - "github.com/filecoin-project/lotus/build" - logging "github.com/ipfs/go-log/v2" - "github.com/urfave/cli/v2" -) - -var log = logging.Logger("chainwatch") - -func main() { - if err := logging.SetLogLevel("*", "info"); err != nil { - log.Fatal(err) - } - log.Info("Starting chainwatch", " v", build.UserVersion()) - - app := &cli.App{ - Name: "lotus-chainwatch", - Usage: "Devnet token distribution utility", - Version: build.UserVersion(), - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "repo", - EnvVars: []string{"LOTUS_PATH"}, - Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME - }, - &cli.StringFlag{ - Name: "api", - EnvVars: []string{"FULLNODE_API_INFO"}, - Value: "", - }, - &cli.StringFlag{ - Name: "db", - EnvVars: []string{"LOTUS_DB"}, - Value: "", - }, - &cli.StringFlag{ - Name: "log-level", - EnvVars: []string{"GOLOG_LOG_LEVEL"}, - Value: "info", - }, - }, - Commands: []*cli.Command{ - dotCmd, - runCmd, - }, - } - - if err := app.Run(os.Args); err != nil { - log.Fatal(err) - } -} diff --git a/cmd/lotus-chainwatch/processor/common_actors.go b/cmd/lotus-chainwatch/processor/common_actors.go deleted file mode 100644 index 0f2c0d2ea..000000000 --- a/cmd/lotus-chainwatch/processor/common_actors.go +++ /dev/null @@ -1,299 +0,0 @@ -package processor - -import ( - "context" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/ipfs/go-cid" - - builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" - - "github.com/filecoin-project/lotus/chain/actors/builtin" - _init "github.com/filecoin-project/lotus/chain/actors/builtin/init" - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/types" - cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" -) - -func (p *Processor) setupCommonActors() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create table if not exists id_address_map -( - id text not null, - address text not null, - constraint id_address_map_pk - primary key (id, address) -); - -create unique index if not exists id_address_map_id_uindex - on id_address_map (id); - -create unique index if not exists id_address_map_address_uindex - on id_address_map (address); - -create table if not exists actors - ( - id text not null - constraint id_address_map_actors_id_fk - references id_address_map (id), - code text not null, - head text not null, - nonce int not null, - balance text not null, - stateroot text - ); - -create index if not exists actors_id_index - on actors (id); - -create index if not exists id_address_map_address_index - on id_address_map (address); - -create index if not exists id_address_map_id_index - on id_address_map (id); - -create or replace function actor_tips(epoch bigint) - returns table (id text, - code text, - head text, - nonce int, - balance text, - stateroot text, - height bigint, - parentstateroot text) as -$body$ - select distinct on (id) * from actors - inner join state_heights sh on sh.parentstateroot = stateroot - where height < $1 - order by id, height desc; -$body$ language sql; - -create table if not exists actor_states -( - head text not null, - code text not null, - state json not null -); - -create unique index if not exists actor_states_head_code_uindex - on actor_states (head, code); - -create index if not exists actor_states_head_index - on actor_states (head); - -create index if not exists actor_states_code_head_index - on actor_states (head, code); - -`); err != nil { - return err - } - - return tx.Commit() -} - -func (p *Processor) HandleCommonActorsChanges(ctx context.Context, actors map[cid.Cid]ActorTips) error { - if err := p.storeActorAddresses(ctx, actors); err != nil { - return err - } - - grp, _ := errgroup.WithContext(ctx) - - grp.Go(func() error { - if err := p.storeActorHeads(actors); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeActorStates(actors); err != nil { - return err - } - return nil - }) - - return grp.Wait() -} - -type UpdateAddresses struct { - Old state.AddressPair - New state.AddressPair -} - -func (p Processor) storeActorAddresses(ctx context.Context, actors map[cid.Cid]ActorTips) error { - start := time.Now() - defer func() { - log.Debugw("Stored Actor Addresses", "duration", time.Since(start).String()) - }() - - addressToID := map[address.Address]address.Address{} - // HACK until genesis storage is figured out: - addressToID[builtin2.SystemActorAddr] = builtin2.SystemActorAddr - addressToID[builtin2.InitActorAddr] = builtin2.InitActorAddr - addressToID[builtin2.RewardActorAddr] = builtin2.RewardActorAddr - addressToID[builtin2.CronActorAddr] = builtin2.CronActorAddr - addressToID[builtin2.StoragePowerActorAddr] = builtin2.StoragePowerActorAddr - addressToID[builtin2.StorageMarketActorAddr] = builtin2.StorageMarketActorAddr - addressToID[builtin2.VerifiedRegistryActorAddr] = builtin2.VerifiedRegistryActorAddr - addressToID[builtin2.BurntFundsActorAddr] = builtin2.BurntFundsActorAddr - initActor, err := p.node.StateGetActor(ctx, builtin2.InitActorAddr, types.EmptyTSK) - if err != nil { - return err - } - - initActorState, err := _init.Load(cw_util.NewAPIIpldStore(ctx, p.node), initActor) - if err != nil { - return err - } - // gross.. - if err := initActorState.ForEachActor(func(id abi.ActorID, addr address.Address) error { - idAddr, err := address.NewIDAddress(uint64(id)) - if err != nil { - return err - } - addressToID[addr] = idAddr - return nil - }); err != nil { - return err - } - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create temp table iam (like id_address_map excluding constraints) on commit drop; -`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `) - if err != nil { - return err - } - - for a, i := range addressToID { - if i == address.Undef { - continue - } - if _, err := stmt.Exec( - i.String(), - a.String(), - ); err != nil { - return err - } - } - if err := stmt.Close(); err != nil { - return err - } - - // HACK until chain watch can handle reorgs we need to update this table when ID -> PubKey mappings change - if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict (id) do update set address = EXCLUDED.address`); err != nil { - log.Warnw("Failed to update id_address_map table, this is a known issue") - return nil - } - - return tx.Commit() -} - -func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error { - start := time.Now() - defer func() { - log.Debugw("Stored Actor Heads", "duration", time.Since(start).String()) - }() - // Basic - tx, err := p.db.Begin() - if err != nil { - return err - } - if _, err := tx.Exec(` - create temp table a_tmp (like actors excluding constraints) on commit drop; - `); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy a_tmp (id, code, head, nonce, balance, stateroot) from stdin `) - if err != nil { - return err - } - - for code, actTips := range actors { - actorName := code.String() - if builtin.IsBuiltinActor(code) { - actorName = builtin.ActorNameByCode(code) - } - for _, actorInfo := range actTips { - for _, a := range actorInfo { - if _, err := stmt.Exec(a.addr.String(), actorName, a.act.Head.String(), a.act.Nonce, a.act.Balance.String(), a.stateroot.String()); err != nil { - return err - } - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into actors select * from a_tmp on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error { - start := time.Now() - defer func() { - log.Debugw("Stored Actor States", "duration", time.Since(start).String()) - }() - // States - tx, err := p.db.Begin() - if err != nil { - return err - } - if _, err := tx.Exec(` - create temp table as_tmp (like actor_states excluding constraints) on commit drop; - `); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy as_tmp (head, code, state) from stdin `) - if err != nil { - return err - } - - for code, actTips := range actors { - actorName := code.String() - if builtin.IsBuiltinActor(code) { - actorName = builtin.ActorNameByCode(code) - } - for _, actorInfo := range actTips { - for _, a := range actorInfo { - if _, err := stmt.Exec(a.act.Head.String(), actorName, a.state); err != nil { - return err - } - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into actor_states select * from as_tmp on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go deleted file mode 100644 index 17aa1c37b..000000000 --- a/cmd/lotus-chainwatch/processor/market.go +++ /dev/null @@ -1,316 +0,0 @@ -package processor - -import ( - "context" - "strconv" - "time" - - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/events/state" -) - -func (p *Processor) setupMarket() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create table if not exists market_deal_proposals -( - deal_id bigint not null, - - state_root text not null, - - piece_cid text not null, - padded_piece_size bigint not null, - unpadded_piece_size bigint not null, - is_verified bool not null, - - client_id text not null, - provider_id text not null, - - start_epoch bigint not null, - end_epoch bigint not null, - slashed_epoch bigint, - storage_price_per_epoch text not null, - - provider_collateral text not null, - client_collateral text not null, - - constraint market_deal_proposal_pk - primary key (deal_id) -); - -create table if not exists market_deal_states -( - deal_id bigint not null, - - sector_start_epoch bigint not null, - last_update_epoch bigint not null, - slash_epoch bigint not null, - - state_root text not null, - - unique (deal_id, sector_start_epoch, last_update_epoch, slash_epoch), - - constraint market_deal_states_pk - primary key (deal_id, state_root) - -); - -create table if not exists minerid_dealid_sectorid -( - deal_id bigint not null - constraint sectors_sector_ids_id_fk - references market_deal_proposals(deal_id), - - sector_id bigint not null, - miner_id text not null, - foreign key (sector_id, miner_id) references sector_precommit_info(sector_id, miner_id), - - constraint miner_sector_deal_ids_pk - primary key (miner_id, sector_id, deal_id) -); - -`); err != nil { - return err - } - - return tx.Commit() -} - -type marketActorInfo struct { - common actorInfo -} - -func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTips) error { - marketChanges, err := p.processMarket(ctx, marketTips) - if err != nil { - log.Fatalw("Failed to process market actors", "error", err) - } - - if err := p.persistMarket(ctx, marketChanges); err != nil { - log.Fatalw("Failed to persist market actors", "error", err) - } - - if err := p.updateMarket(ctx, marketChanges); err != nil { - log.Fatalw("Failed to update market actors", "error", err) - } - return nil -} - -func (p *Processor) processMarket(ctx context.Context, marketTips ActorTips) ([]marketActorInfo, error) { - start := time.Now() - defer func() { - log.Debugw("Processed Market", "duration", time.Since(start).String()) - }() - - var out []marketActorInfo - for _, markets := range marketTips { - for _, mt := range markets { - // NB: here is where we can extract the market state when we need it. - out = append(out, marketActorInfo{common: mt}) - } - } - return out, nil -} - -func (p *Processor) persistMarket(ctx context.Context, info []marketActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Market", "duration", time.Since(start).String()) - }() - - grp, ctx := errgroup.WithContext(ctx) - - grp.Go(func() error { - if err := p.storeMarketActorDealProposals(ctx, info); err != nil { - return xerrors.Errorf("Failed to store marker deal proposals: %w", err) - } - return nil - }) - - grp.Go(func() error { - if err := p.storeMarketActorDealStates(info); err != nil { - return xerrors.Errorf("Failed to store marker deal states: %w", err) - } - return nil - }) - - return grp.Wait() - -} - -func (p *Processor) updateMarket(ctx context.Context, info []marketActorInfo) error { - if err := p.updateMarketActorDealProposals(ctx, info); err != nil { - return xerrors.Errorf("Failed to update market info: %w", err) - } - return nil -} - -func (p *Processor) storeMarketActorDealStates(marketTips []marketActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Market Deal States", "duration", time.Since(start).String()) - }() - tx, err := p.db.Begin() - if err != nil { - return err - } - if _, err := tx.Exec(`create temp table mds (like market_deal_states excluding constraints) on commit drop;`); err != nil { - return err - } - stmt, err := tx.Prepare(`copy mds (deal_id, sector_start_epoch, last_update_epoch, slash_epoch, state_root) from STDIN`) - if err != nil { - return err - } - for _, mt := range marketTips { - dealStates, err := p.node.StateMarketDeals(context.TODO(), mt.common.tsKey) - if err != nil { - return err - } - - for dealID, ds := range dealStates { - id, err := strconv.ParseUint(dealID, 10, 64) - if err != nil { - return err - } - - if _, err := stmt.Exec( - id, - ds.State.SectorStartEpoch, - ds.State.LastUpdatedEpoch, - ds.State.SlashEpoch, - mt.common.stateroot.String(), - ); err != nil { - return err - } - - } - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into market_deal_states select * from mds on conflict do nothing`); err != nil { - return err - } - - return tx.Commit() -} - -func (p *Processor) storeMarketActorDealProposals(ctx context.Context, marketTips []marketActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Market Deal Proposals", "duration", time.Since(start).String()) - }() - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mdp (like market_deal_proposals excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy mdp (deal_id, state_root, piece_cid, padded_piece_size, unpadded_piece_size, is_verified, client_id, provider_id, start_epoch, end_epoch, slashed_epoch, storage_price_per_epoch, provider_collateral, client_collateral) from STDIN`) - if err != nil { - return err - } - - // insert in sorted order (lowest height -> highest height) since dealid is pk of table. - for _, mt := range marketTips { - dealStates, err := p.node.StateMarketDeals(ctx, mt.common.tsKey) - if err != nil { - return err - } - - for dealID, ds := range dealStates { - id, err := strconv.ParseUint(dealID, 10, 64) - if err != nil { - return err - } - - if _, err := stmt.Exec( - id, - mt.common.stateroot.String(), - ds.Proposal.PieceCID.String(), - ds.Proposal.PieceSize, - ds.Proposal.PieceSize.Unpadded(), - ds.Proposal.VerifiedDeal, - ds.Proposal.Client.String(), - ds.Proposal.Provider.String(), - ds.Proposal.StartEpoch, - ds.Proposal.EndEpoch, - nil, // slashed_epoch - ds.Proposal.StoragePricePerEpoch.String(), - ds.Proposal.ProviderCollateral.String(), - ds.Proposal.ClientCollateral.String(), - ); err != nil { - return err - } - - } - } - if err := stmt.Close(); err != nil { - return err - } - if _, err := tx.Exec(`insert into market_deal_proposals select * from mdp on conflict do nothing`); err != nil { - return err - } - - return tx.Commit() - -} - -func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTip []marketActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Updated Market Deal Proposals", "duration", time.Since(start).String()) - }() - pred := state.NewStatePredicates(p.node) - - tx, err := p.db.Begin() - if err != nil { - return err - } - - stmt, err := tx.Prepare(`update market_deal_proposals set slashed_epoch=$1 where deal_id=$2`) - if err != nil { - return err - } - - for _, mt := range marketTip { - stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged())) - - changed, val, err := stateDiff(ctx, mt.common.parentTsKey, mt.common.tsKey) - if err != nil { - log.Warnw("error getting market deal state diff", "error", err) - } - if !changed { - continue - } - changes, ok := val.(*market.DealStateChanges) - if !ok { - return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) - } - - for _, modified := range changes.Modified { - if modified.From.SlashEpoch != modified.To.SlashEpoch { - if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil { - return err - } - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - return tx.Commit() -} diff --git a/cmd/lotus-chainwatch/processor/messages.go b/cmd/lotus-chainwatch/processor/messages.go deleted file mode 100644 index 333477c6a..000000000 --- a/cmd/lotus-chainwatch/processor/messages.go +++ /dev/null @@ -1,318 +0,0 @@ -package processor - -import ( - "context" - "sync" - - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/parmap" -) - -func (p *Processor) setupMessages() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create table if not exists messages -( - cid text not null - constraint messages_pk - primary key, - "from" text not null, - "to" text not null, - size_bytes bigint not null, - nonce bigint not null, - value text not null, - gas_fee_cap text not null, - gas_premium text not null, - gas_limit bigint not null, - method bigint, - params bytea -); - -create unique index if not exists messages_cid_uindex - on messages (cid); - -create index if not exists messages_from_index - on messages ("from"); - -create index if not exists messages_to_index - on messages ("to"); - -create table if not exists block_messages -( - block text not null - constraint blocks_block_cids_cid_fk - references block_cids (cid), - message text not null, - constraint block_messages_pk - primary key (block, message) -); - -create table if not exists mpool_messages -( - msg text not null - constraint mpool_messages_pk - primary key - constraint mpool_messages_messages_cid_fk - references messages, - add_ts int not null -); - -create unique index if not exists mpool_messages_msg_uindex - on mpool_messages (msg); - -create table if not exists receipts -( - msg text not null, - state text not null, - idx int not null, - exit int not null, - gas_used bigint not null, - return bytea, - constraint receipts_pk - primary key (msg, state) -); - -create index if not exists receipts_msg_state_index - on receipts (msg, state); -`); err != nil { - return err - } - - return tx.Commit() -} - -func (p *Processor) HandleMessageChanges(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error { - if err := p.persistMessagesAndReceipts(ctx, blocks); err != nil { - return err - } - return nil -} - -func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error { - messages, inclusions := p.fetchMessages(ctx, blocks) - receipts := p.fetchParentReceipts(ctx, blocks) - - grp, _ := errgroup.WithContext(ctx) - - grp.Go(func() error { - return p.storeMessages(messages) - }) - - grp.Go(func() error { - return p.storeMsgInclusions(inclusions) - }) - - grp.Go(func() error { - return p.storeReceipts(receipts) - }) - - return grp.Wait() -} - -func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create temp table recs (like receipts excluding constraints) on commit drop; -`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `) - if err != nil { - return err - } - - for c, m := range recs { - if _, err := stmt.Exec( - c.msg.String(), - c.state.String(), - c.idx, - m.ExitCode, - m.GasUsed, - m.Return, - ); err != nil { - return err - } - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create temp table mi (like block_messages excluding constraints) on commit drop; -`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `) - if err != nil { - return err - } - - for b, msgs := range incls { - for _, msg := range msgs { - if _, err := stmt.Exec( - b.String(), - msg.String(), - ); err != nil { - return err - } - } - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create temp table msgs (like messages excluding constraints) on commit drop; -`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", size_bytes, nonce, "value", gas_premium, gas_fee_cap, gas_limit, method, params) from stdin `) - if err != nil { - return err - } - - for c, m := range msgs { - var msgBytes int - if b, err := m.Serialize(); err == nil { - msgBytes = len(b) - } - - if _, err := stmt.Exec( - c.String(), - m.From.String(), - m.To.String(), - msgBytes, - m.Nonce, - m.Value.String(), - m.GasPremium.String(), - m.GasFeeCap.String(), - m.GasLimit, - m.Method, - m.Params, - ); err != nil { - return err - } - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) { - var lk sync.Mutex - messages := map[cid.Cid]*types.Message{} - inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs - - parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) { - msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid()) - if err != nil { - log.Error(err) - log.Debugw("ChainGetBlockMessages", "header_cid", header.Cid()) - return - } - - vmm := make([]*types.Message, 0, len(msgs.Cids)) - for _, m := range msgs.BlsMessages { - vmm = append(vmm, m) - } - - for _, m := range msgs.SecpkMessages { - vmm = append(vmm, &m.Message) - } - - lk.Lock() - for _, message := range vmm { - messages[message.Cid()] = message - inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid()) - } - lk.Unlock() - }) - - return messages, inclusions -} - -type mrec struct { - msg cid.Cid - state cid.Cid - idx int -} - -func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt { - var lk sync.Mutex - out := map[mrec]*types.MessageReceipt{} - - parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) { - recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid()) - if err != nil { - log.Error(err) - log.Debugw("ChainGetParentReceipts", "header_cid", header.Cid()) - return - } - msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid()) - if err != nil { - log.Error(err) - log.Debugw("ChainGetParentMessages", "header_cid", header.Cid()) - return - } - - lk.Lock() - for i, r := range recs { - out[mrec{ - msg: msgs[i].Cid, - state: header.ParentStateRoot, - idx: i, - }] = r - } - lk.Unlock() - }) - - return out -} diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go deleted file mode 100644 index f3514df88..000000000 --- a/cmd/lotus-chainwatch/processor/miner.go +++ /dev/null @@ -1,1035 +0,0 @@ -package processor - -import ( - "context" - "strings" - "time" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-bitfield" - "github.com/ipfs/go-cid" - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/lotus/api/v0api" - "github.com/filecoin-project/lotus/blockstore" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/actors/builtin/power" - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" -) - -func (p *Processor) setupMiners() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` - -create table if not exists miner_info -( - miner_id text not null, - owner_addr text not null, - worker_addr text not null, - peer_id text, - sector_size text not null, - - constraint miner_info_pk - primary key (miner_id) -); - -create table if not exists sector_precommit_info -( - miner_id text not null, - sector_id bigint not null, - sealed_cid text not null, - state_root text not null, - - seal_rand_epoch bigint not null, - expiration_epoch bigint not null, - - precommit_deposit text not null, - precommit_epoch bigint not null, - deal_weight text not null, - verified_deal_weight text not null, - - - is_replace_capacity bool not null, - replace_sector_deadline bigint, - replace_sector_partition bigint, - replace_sector_number bigint, - - unique (miner_id, sector_id), - - constraint sector_precommit_info_pk - primary key (miner_id, sector_id, sealed_cid) - -); - -create table if not exists sector_info -( - miner_id text not null, - sector_id bigint not null, - sealed_cid text not null, - state_root text not null, - - activation_epoch bigint not null, - expiration_epoch bigint not null, - - deal_weight text not null, - verified_deal_weight text not null, - - initial_pledge text not null, - expected_day_reward text not null, - expected_storage_pledge text not null, - - constraint sector_info_pk - primary key (miner_id, sector_id, sealed_cid) -); - -/* -* captures miner-specific power state for any given stateroot -*/ -create table if not exists miner_power -( - miner_id text not null, - state_root text not null, - raw_bytes_power text not null, - quality_adjusted_power text not null, - constraint miner_power_pk - primary key (miner_id, state_root) -); - -DO $$ -BEGIN - IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'miner_sector_event_type') THEN - CREATE TYPE miner_sector_event_type AS ENUM - ( - 'PRECOMMIT_ADDED', 'PRECOMMIT_EXPIRED', 'COMMIT_CAPACITY_ADDED', 'SECTOR_ADDED', - 'SECTOR_EXTENDED', 'SECTOR_EXPIRED', 'SECTOR_FAULTED', 'SECTOR_RECOVERING', 'SECTOR_RECOVERED', 'SECTOR_TERMINATED' - ); - END IF; -END$$; - -create table if not exists miner_sector_events -( - miner_id text not null, - sector_id bigint not null, - state_root text not null, - event miner_sector_event_type not null, - - constraint miner_sector_events_pk - primary key (sector_id, event, miner_id, state_root) -); - -`); err != nil { - return err - } - - return tx.Commit() -} - -type SectorLifecycleEvent string - -const ( - PreCommitAdded = "PRECOMMIT_ADDED" - PreCommitExpired = "PRECOMMIT_EXPIRED" - - CommitCapacityAdded = "COMMIT_CAPACITY_ADDED" - - SectorAdded = "SECTOR_ADDED" - SectorExpired = "SECTOR_EXPIRED" - SectorExtended = "SECTOR_EXTENDED" - SectorFaulted = "SECTOR_FAULTED" - SectorRecovering = "SECTOR_RECOVERING" - SectorRecovered = "SECTOR_RECOVERED" - SectorTerminated = "SECTOR_TERMINATED" -) - -type MinerSectorsEvent struct { - MinerID address.Address - SectorIDs []uint64 - StateRoot cid.Cid - Event SectorLifecycleEvent -} - -type SectorDealEvent struct { - MinerID address.Address - SectorID uint64 - DealIDs []abi.DealID -} - -type PartitionStatus struct { - Terminated bitfield.BitField - Expired bitfield.BitField - Faulted bitfield.BitField - InRecovery bitfield.BitField - Recovered bitfield.BitField -} - -type minerActorInfo struct { - common actorInfo - - state miner.State - - // tracked by power actor - rawPower big.Int - qalPower big.Int -} - -func (p *Processor) HandleMinerChanges(ctx context.Context, minerTips ActorTips) error { - minerChanges, err := p.processMiners(ctx, minerTips) - if err != nil { - log.Fatalw("Failed to process miner actors", "error", err) - } - - if err := p.persistMiners(ctx, minerChanges); err != nil { - log.Fatalw("Failed to persist miner actors", "error", err) - } - - return nil -} - -func (p *Processor) processMiners(ctx context.Context, minerTips map[types.TipSetKey][]actorInfo) ([]minerActorInfo, error) { - start := time.Now() - defer func() { - log.Debugw("Processed Miners", "duration", time.Since(start).String()) - }() - - stor := store.ActorStore(ctx, blockstore.NewAPIBlockstore(p.node)) - - var out []minerActorInfo - // TODO add parallel calls if this becomes slow - for tipset, miners := range minerTips { - // get the power actors claims map - powerState, err := getPowerActorState(ctx, p.node, tipset) - if err != nil { - return nil, err - } - - // Get miner raw and quality power - for _, act := range miners { - var mi minerActorInfo - mi.common = act - - // get miner claim from power actors claim map and store if found, else the miner had no claim at - // this tipset - claim, found, err := powerState.MinerPower(act.addr) - if err != nil { - return nil, err - } - if found { - mi.qalPower = claim.QualityAdjPower - mi.rawPower = claim.RawBytePower - } - - // Get the miner state - mas, err := miner.Load(stor, &act.act) - if err != nil { - log.Warnw("failed to find miner actor state", "address", act.addr, "error", err) - continue - } - mi.state = mas - out = append(out, mi) - } - } - return out, nil -} - -func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Miners", "duration", time.Since(start).String()) - }() - - grp, _ := errgroup.WithContext(ctx) - - grp.Go(func() error { - if err := p.storeMinersPower(miners); err != nil { - return err - } - return nil - }) - - grp.Go(func() error { - if err := p.storeMinersActorInfoState(ctx, miners); err != nil { - return err - } - return nil - }) - - // 8 is arbitrary, idk what a good value here is. - preCommitEvents := make(chan *MinerSectorsEvent, 8) - sectorEvents := make(chan *MinerSectorsEvent, 8) - partitionEvents := make(chan *MinerSectorsEvent, 8) - dealEvents := make(chan *SectorDealEvent, 8) - - grp.Go(func() error { - return p.storePreCommitDealInfo(dealEvents) - }) - - grp.Go(func() error { - return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) - }) - - grp.Go(func() error { - defer func() { - close(preCommitEvents) - close(dealEvents) - }() - return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, dealEvents) - }) - - grp.Go(func() error { - defer close(sectorEvents) - return p.storeMinerSectorInfo(ctx, miners, sectorEvents) - }) - - grp.Go(func() error { - defer close(partitionEvents) - return p.getMinerPartitionsDifferences(ctx, miners, partitionEvents) - }) - - return grp.Wait() -} - -func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table spi (like sector_precommit_info excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for sector_precommit_info: %w", err) - } - - stmt, err := tx.Prepare(`copy spi (miner_id, sector_id, sealed_cid, state_root, seal_rand_epoch, expiration_epoch, precommit_deposit, precommit_epoch, deal_weight, verified_deal_weight, is_replace_capacity, replace_sector_deadline, replace_sector_partition, replace_sector_number) from STDIN`) - - if err != nil { - return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err) - } - - grp, _ := errgroup.WithContext(ctx) - for _, m := range miners { - m := m - grp.Go(func() error { - changes, err := p.getMinerPreCommitChanges(ctx, m) - if err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - return nil - } - return err - } - if changes == nil { - return nil - } - - preCommitAdded := make([]uint64, len(changes.Added)) - for i, added := range changes.Added { - if len(added.Info.DealIDs) > 0 { - sectorDeals <- &SectorDealEvent{ - MinerID: m.common.addr, - SectorID: uint64(added.Info.SectorNumber), - DealIDs: added.Info.DealIDs, - } - } - if added.Info.ReplaceCapacity { - if _, err := stmt.Exec( - m.common.addr.String(), - added.Info.SectorNumber, - added.Info.SealedCID.String(), - m.common.stateroot.String(), - added.Info.SealRandEpoch, - added.Info.Expiration, - added.PreCommitDeposit.String(), - added.PreCommitEpoch, - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.Info.ReplaceCapacity, - added.Info.ReplaceSectorDeadline, - added.Info.ReplaceSectorPartition, - added.Info.ReplaceSectorNumber, - ); err != nil { - return err - } - } else { - if _, err := stmt.Exec( - m.common.addr.String(), - added.Info.SectorNumber, - added.Info.SealedCID.String(), - m.common.stateroot.String(), - added.Info.SealRandEpoch, - added.Info.Expiration, - added.PreCommitDeposit.String(), - added.PreCommitEpoch, - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.Info.ReplaceCapacity, - nil, // replace deadline - nil, // replace partition - nil, // replace sector - ); err != nil { - return err - } - - } - preCommitAdded[i] = uint64(added.Info.SectorNumber) - } - if len(preCommitAdded) > 0 { - sectorEvents <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: preCommitAdded, - Event: PreCommitAdded, - } - } - var preCommitExpired []uint64 - for _, removed := range changes.Removed { - // TODO: we can optimize this to not load the AMT every time, if necessary. - si, err := m.state.GetSector(removed.Info.SectorNumber) - if err != nil { - return err - } - if si == nil { - preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber)) - } - } - if len(preCommitExpired) > 0 { - sectorEvents <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: preCommitExpired, - Event: PreCommitExpired, - } - } - return nil - }) - } - if err := grp.Wait(); err != nil { - return err - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close sector precommit info statement: %w", err) - } - - if _, err := tx.Exec(`insert into sector_precommit_info select * from spi on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into sector precommit info table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit sector precommit info: %w", err) - } - return nil -} - -func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table si (like sector_info excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for sector_: %w", err) - } - - stmt, err := tx.Prepare(`copy si (miner_id, sector_id, sealed_cid, state_root, activation_epoch, expiration_epoch, deal_weight, verified_deal_weight, initial_pledge, expected_day_reward, expected_storage_pledge) from STDIN`) - if err != nil { - return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) - } - - grp, _ := errgroup.WithContext(ctx) - for _, m := range miners { - m := m - grp.Go(func() error { - changes, err := p.getMinerSectorChanges(ctx, m) - if err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - return nil - } - return err - } - if changes == nil { - return nil - } - var sectorsAdded []uint64 - var ccAdded []uint64 - var extended []uint64 - for _, added := range changes.Added { - // add the sector to the table - if _, err := stmt.Exec( - m.common.addr.String(), - added.SectorNumber, - added.SealedCID.String(), - m.common.stateroot.String(), - added.Activation.String(), - added.Expiration.String(), - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.InitialPledge.String(), - added.ExpectedDayReward.String(), - added.ExpectedStoragePledge.String(), - ); err != nil { - log.Errorw("writing miner sector changes statement", "error", err.Error()) - } - if len(added.DealIDs) == 0 { - ccAdded = append(ccAdded, uint64(added.SectorNumber)) - } else { - sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber)) - } - } - - for _, mod := range changes.Extended { - extended = append(extended, uint64(mod.To.SectorNumber)) - } - - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: ccAdded, - Event: CommitCapacityAdded, - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: sectorsAdded, - Event: SectorAdded, - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: extended, - Event: SectorExtended, - } - return nil - }) - } - - if err := grp.Wait(); err != nil { - return err - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close sector info statement: %w", err) - } - - if _, err := tx.Exec(`insert into sector_info select * from si on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into sector info table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit sector info: %w", err) - } - return nil - -} - -func (p *Processor) getMinerPartitionsDifferences(ctx context.Context, miners []minerActorInfo, events chan<- *MinerSectorsEvent) error { - grp, ctx := errgroup.WithContext(ctx) - for _, m := range miners { - m := m - grp.Go(func() error { - if err := p.diffMinerPartitions(ctx, m, events); err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - return nil - } - return err - } - return nil - }) - } - return grp.Wait() -} - -func (p *Processor) storeMinerSectorEvents(ctx context.Context, sectorEvents, preCommitEvents, partitionEvents <-chan *MinerSectorsEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mse (like miner_sector_events excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for sector_: %w", err) - } - - stmt, err := tx.Prepare(`copy mse (miner_id, sector_id, event, state_root) from STDIN`) - if err != nil { - return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) - } - - grp, ctx := errgroup.WithContext(ctx) - grp.Go(func() error { - innerGrp, _ := errgroup.WithContext(ctx) - for mse := range sectorEvents { - mse := mse - innerGrp.Go(func() error { - for _, sid := range mse.SectorIDs { - if _, err := stmt.Exec( - mse.MinerID.String(), - sid, - mse.Event, - mse.StateRoot.String(), - ); err != nil { - return err - } - } - return nil - }) - } - return innerGrp.Wait() - }) - - grp.Go(func() error { - innerGrp, _ := errgroup.WithContext(ctx) - for mse := range preCommitEvents { - mse := mse - innerGrp.Go(func() error { - for _, sid := range mse.SectorIDs { - if _, err := stmt.Exec( - mse.MinerID.String(), - sid, - mse.Event, - mse.StateRoot.String(), - ); err != nil { - return err - } - } - return nil - }) - } - return innerGrp.Wait() - }) - - grp.Go(func() error { - innerGrp, _ := errgroup.WithContext(ctx) - for mse := range partitionEvents { - mse := mse - grp.Go(func() error { - for _, sid := range mse.SectorIDs { - if _, err := stmt.Exec( - mse.MinerID.String(), - sid, - mse.Event, - mse.StateRoot.String(), - ); err != nil { - return err - } - } - return nil - }) - } - return innerGrp.Wait() - }) - - if err := grp.Wait(); err != nil { - return err - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close sector event statement: %w", err) - } - - if _, err := tx.Exec(`insert into miner_sector_events select * from mse on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into sector event table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit sector events: %w", err) - } - return nil -} - -func (p *Processor) getMinerStateAt(ctx context.Context, maddr address.Address, tskey types.TipSetKey) (miner.State, error) { - prevActor, err := p.node.StateGetActor(ctx, maddr, tskey) - if err != nil { - return nil, err - } - return miner.Load(store.ActorStore(ctx, blockstore.NewAPIBlockstore(p.node)), prevActor) -} - -func (p *Processor) getMinerPreCommitChanges(ctx context.Context, m minerActorInfo) (*miner.PreCommitChanges, error) { - pred := state.NewStatePredicates(p.node) - changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerPreCommitChange())(ctx, m.common.parentTsKey, m.common.tsKey) - if err != nil { - return nil, xerrors.Errorf("Failed to diff miner precommit amt: %w", err) - } - if !changed { - return nil, nil - } - out := val.(*miner.PreCommitChanges) - return out, nil -} - -func (p *Processor) getMinerSectorChanges(ctx context.Context, m minerActorInfo) (*miner.SectorChanges, error) { - pred := state.NewStatePredicates(p.node) - changed, val, err := pred.OnMinerActorChange(m.common.addr, pred.OnMinerSectorChange())(ctx, m.common.parentTsKey, m.common.tsKey) - if err != nil { - return nil, xerrors.Errorf("Failed to diff miner sectors amt: %w", err) - } - if !changed { - return nil, nil - } - out := val.(*miner.SectorChanges) - return out, nil -} - -func (p *Processor) diffMinerPartitions(ctx context.Context, m minerActorInfo, events chan<- *MinerSectorsEvent) error { - prevMiner, err := p.getMinerStateAt(ctx, m.common.addr, m.common.parentTsKey) - if err != nil { - return err - } - curMiner := m.state - dc, err := prevMiner.DeadlinesChanged(curMiner) - if err != nil { - return err - } - if !dc { - return nil - } - panic("TODO") - - // FIXME: This code doesn't work. - // 1. We need to diff all deadlines, not just the "current" deadline. - // 2. We need to handle the case where we _add_ a partition. (i.e., - // where len(newPartitions) != len(oldPartitions). - /* - - // NOTE: If we change the number of deadlines in an upgrade, this will - // break. - - // load the old deadline - prevDls, err := prevMiner.LoadDeadlines(p.ctxStore) - if err != nil { - return err - } - var prevDl miner.Deadline - if err := p.ctxStore.Get(ctx, prevDls.Due[dlIdx], &prevDl); err != nil { - return err - } - - prevPartitions, err := prevDl.PartitionsArray(p.ctxStore) - if err != nil { - return err - } - - // load the new deadline - curDls, err := curMiner.LoadDeadlines(p.ctxStore) - if err != nil { - return err - } - - var curDl miner.Deadline - if err := p.ctxStore.Get(ctx, curDls.Due[dlIdx], &curDl); err != nil { - return err - } - - curPartitions, err := curDl.PartitionsArray(p.ctxStore) - if err != nil { - return err - } - - // TODO this can be optimized by inspecting the miner state for partitions that have changed and only inspecting those. - var prevPart miner.Partition - if err := prevPartitions.ForEach(&prevPart, func(i int64) error { - var curPart miner.Partition - if found, err := curPartitions.Get(uint64(i), &curPart); err != nil { - return err - } else if !found { - log.Fatal("I don't know what this means, are partitions ever removed?") - } - partitionDiff, err := p.diffPartition(prevPart, curPart) - if err != nil { - return err - } - - recovered, err := partitionDiff.Recovered.All(miner.SectorsMax) - if err != nil { - return err - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: recovered, - Event: SectorRecovered, - } - inRecovery, err := partitionDiff.InRecovery.All(miner.SectorsMax) - if err != nil { - return err - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: inRecovery, - Event: SectorRecovering, - } - faulted, err := partitionDiff.Faulted.All(miner.SectorsMax) - if err != nil { - return err - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: faulted, - Event: SectorFaulted, - } - terminated, err := partitionDiff.Terminated.All(miner.SectorsMax) - if err != nil { - return err - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: terminated, - Event: SectorTerminated, - } - expired, err := partitionDiff.Expired.All(miner.SectorsMax) - if err != nil { - return err - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: expired, - Event: SectorExpired, - } - - return nil - }); err != nil { - return err - } - - return nil - */ -} - -func (p *Processor) diffPartition(prevPart, curPart miner.Partition) (*PartitionStatus, error) { - prevLiveSectors, err := prevPart.LiveSectors() - if err != nil { - return nil, err - } - curLiveSectors, err := curPart.LiveSectors() - if err != nil { - return nil, err - } - - removedSectors, err := bitfield.SubtractBitField(prevLiveSectors, curLiveSectors) - if err != nil { - return nil, err - } - - prevRecoveries, err := prevPart.RecoveringSectors() - if err != nil { - return nil, err - } - - curRecoveries, err := curPart.RecoveringSectors() - if err != nil { - return nil, err - } - - newRecoveries, err := bitfield.SubtractBitField(curRecoveries, prevRecoveries) - if err != nil { - return nil, err - } - - prevFaults, err := prevPart.FaultySectors() - if err != nil { - return nil, err - } - - curFaults, err := curPart.FaultySectors() - if err != nil { - return nil, err - } - - newFaults, err := bitfield.SubtractBitField(curFaults, prevFaults) - if err != nil { - return nil, err - } - - // all current good sectors - curActiveSectors, err := curPart.ActiveSectors() - if err != nil { - return nil, err - } - - // sectors that were previously fault and are now currently active are considered recovered. - recovered, err := bitfield.IntersectBitField(prevFaults, curActiveSectors) - if err != nil { - return nil, err - } - - // TODO: distinguish between "terminated" and "expired" sectors. The - // previous code here never had a chance of working in the first place, - // so I'm not going to try to replicate it right now. - // - // How? If the sector expires before it should (according to sector - // info) and it wasn't replaced by a pre-commit deleted in this change - // set, it was "early terminated". - - return &PartitionStatus{ - Terminated: bitfield.New(), - Expired: removedSectors, - Faulted: newFaults, - InRecovery: newRecoveries, - Recovered: recovered, - }, nil -} - -func (p *Processor) storeMinersActorInfoState(ctx context.Context, miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Miners Actor State", "duration", time.Since(start).String()) - }() - - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mi (like miner_info excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy mi (miner_id, owner_addr, worker_addr, peer_id, sector_size) from STDIN`) - if err != nil { - return err - } - for _, m := range miners { - mi, err := p.node.StateMinerInfo(ctx, m.common.addr, m.common.tsKey) - if err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - continue - } else { - return err - } - } - var pid string - if mi.PeerId != nil { - pid = mi.PeerId.String() - } - if _, err := stmt.Exec( - m.common.addr.String(), - mi.Owner.String(), - mi.Worker.String(), - pid, - mi.SectorSize.ShortString(), - ); err != nil { - log.Errorw("failed to store miner state", "state", m.state, "info", m.state.Info, "error", err) - return xerrors.Errorf("failed to store miner state: %w", err) - } - - } - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into miner_info select * from mi on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} - -func (p *Processor) storePreCommitDealInfo(dealEvents <-chan *SectorDealEvent) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(`create temp table mds (like minerid_dealid_sectorid excluding constraints) on commit drop;`); err != nil { - return xerrors.Errorf("Failed to create temp table for minerid_dealid_sectorid: %w", err) - } - - stmt, err := tx.Prepare(`copy mds (deal_id, miner_id, sector_id) from STDIN`) - if err != nil { - return xerrors.Errorf("Failed to prepare minerid_dealid_sectorid statement: %w", err) - } - - for sde := range dealEvents { - for _, did := range sde.DealIDs { - if _, err := stmt.Exec( - uint64(did), - sde.MinerID.String(), - sde.SectorID, - ); err != nil { - return err - } - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("Failed to close miner sector deals statement: %w", err) - } - - if _, err := tx.Exec(`insert into minerid_dealid_sectorid select * from mds on conflict do nothing`); err != nil { - return xerrors.Errorf("Failed to insert into miner deal sector table: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("Failed to commit miner deal sector table: %w", err) - } - return nil - -} - -func (p *Processor) storeMinersPower(miners []minerActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Stored Miners Power", "duration", time.Since(start).String()) - }() - - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin miner_power tx: %w", err) - } - - if _, err := tx.Exec(`create temp table mp (like miner_power excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep miner_power temp: %w", err) - } - - stmt, err := tx.Prepare(`copy mp (miner_id, state_root, raw_bytes_power, quality_adjusted_power) from STDIN`) - if err != nil { - return xerrors.Errorf("prepare tmp miner_power: %w", err) - } - - for _, m := range miners { - if _, err := stmt.Exec( - m.common.addr.String(), - m.common.stateroot.String(), - m.rawPower.String(), - m.qalPower.String(), - ); err != nil { - log.Errorw("failed to store miner power", "miner", m.common.addr, "stateroot", m.common.stateroot, "error", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared miner_power: %w", err) - } - - if _, err := tx.Exec(`insert into miner_power select * from mp on conflict do nothing`); err != nil { - return xerrors.Errorf("insert miner_power from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit miner_power tx: %w", err) - } - - return nil - -} - -// load the power actor state clam as an adt.Map at the tipset `ts`. -func getPowerActorState(ctx context.Context, api v0api.FullNode, ts types.TipSetKey) (power.State, error) { - powerActor, err := api.StateGetActor(ctx, power.Address, ts) - if err != nil { - return nil, err - } - return power.Load(cw_util.NewAPIIpldStore(ctx, api), powerActor) -} diff --git a/cmd/lotus-chainwatch/processor/mpool.go b/cmd/lotus-chainwatch/processor/mpool.go deleted file mode 100644 index 0a6445d78..000000000 --- a/cmd/lotus-chainwatch/processor/mpool.go +++ /dev/null @@ -1,100 +0,0 @@ -package processor - -import ( - "context" - "time" - - "golang.org/x/xerrors" - - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" -) - -func (p *Processor) subMpool(ctx context.Context) { - sub, err := p.node.MpoolSub(ctx) - if err != nil { - return - } - - for { - var updates []api.MpoolUpdate - - select { - case update := <-sub: - updates = append(updates, update) - case <-ctx.Done(): - return - } - - loop: - for { - select { - case update := <-sub: - updates = append(updates, update) - case <-time.After(10 * time.Millisecond): - break loop - } - } - - msgs := map[cid.Cid]*types.Message{} - for _, v := range updates { - if v.Type != api.MpoolAdd { - continue - } - - msgs[v.Message.Message.Cid()] = &v.Message.Message - } - - err := p.storeMessages(msgs) - if err != nil { - log.Error(err) - } - - if err := p.storeMpoolInclusions(updates); err != nil { - log.Error(err) - } - } -} - -func (p *Processor) storeMpoolInclusions(msgs []api.MpoolUpdate) error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` - create temp table mi (like mpool_messages excluding constraints) on commit drop; - `); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `) - if err != nil { - return err - } - - for _, msg := range msgs { - if msg.Type != api.MpoolAdd { - continue - } - - if _, err := stmt.Exec( - msg.Message.Message.Cid().String(), - time.Now().Unix(), - ); err != nil { - return err - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil { - return xerrors.Errorf("actor put: %w", err) - } - - return tx.Commit() -} diff --git a/cmd/lotus-chainwatch/processor/power.go b/cmd/lotus-chainwatch/processor/power.go deleted file mode 100644 index 726a46706..000000000 --- a/cmd/lotus-chainwatch/processor/power.go +++ /dev/null @@ -1,190 +0,0 @@ -package processor - -import ( - "context" - "time" - - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/lotus/chain/actors/builtin" -) - -type powerActorInfo struct { - common actorInfo - - totalRawBytes big.Int - totalRawBytesCommitted big.Int - totalQualityAdjustedBytes big.Int - totalQualityAdjustedBytesCommitted big.Int - totalPledgeCollateral big.Int - - qaPowerSmoothed builtin.FilterEstimate - - minerCount int64 - minerCountAboveMinimumPower int64 -} - -func (p *Processor) setupPower() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -create table if not exists chain_power -( - state_root text not null - constraint power_smoothing_estimates_pk - primary key, - - total_raw_bytes_power text not null, - total_raw_bytes_committed text not null, - total_qa_bytes_power text not null, - total_qa_bytes_committed text not null, - total_pledge_collateral text not null, - - qa_smoothed_position_estimate text not null, - qa_smoothed_velocity_estimate text not null, - - miner_count int not null, - minimum_consensus_miner_count int not null -); -`); err != nil { - return err - } - - return tx.Commit() -} - -func (p *Processor) HandlePowerChanges(ctx context.Context, powerTips ActorTips) error { - powerChanges, err := p.processPowerActors(ctx, powerTips) - if err != nil { - return xerrors.Errorf("Failed to process power actors: %w", err) - } - - if err := p.persistPowerActors(ctx, powerChanges); err != nil { - return err - } - - return nil -} - -func (p *Processor) processPowerActors(ctx context.Context, powerTips ActorTips) ([]powerActorInfo, error) { - start := time.Now() - defer func() { - log.Debugw("Processed Power Actors", "duration", time.Since(start).String()) - }() - - var out []powerActorInfo - for tipset, powerStates := range powerTips { - for _, act := range powerStates { - var pw powerActorInfo - pw.common = act - - powerActorState, err := getPowerActorState(ctx, p.node, tipset) - if err != nil { - return nil, xerrors.Errorf("get power state (@ %s): %w", pw.common.stateroot.String(), err) - } - - totalPower, err := powerActorState.TotalPower() - if err != nil { - return nil, xerrors.Errorf("failed to compute total power: %w", err) - } - - totalCommitted, err := powerActorState.TotalCommitted() - if err != nil { - return nil, xerrors.Errorf("failed to compute total committed: %w", err) - } - - totalLocked, err := powerActorState.TotalLocked() - if err != nil { - return nil, xerrors.Errorf("failed to compute total locked: %w", err) - } - - powerSmoothed, err := powerActorState.TotalPowerSmoothed() - if err != nil { - return nil, xerrors.Errorf("failed to determine smoothed power: %w", err) - } - - // NOTE: this doesn't set new* fields. Previously, we - // filled these using ThisEpoch* fields from the actor - // state, but these fields are effectively internal - // state and don't represent "new" power, as was - // assumed. - - participatingMiners, totalMiners, err := powerActorState.MinerCounts() - if err != nil { - return nil, xerrors.Errorf("failed to count miners: %w", err) - } - - pw.totalRawBytes = totalPower.RawBytePower - pw.totalQualityAdjustedBytes = totalPower.QualityAdjPower - pw.totalRawBytesCommitted = totalCommitted.RawBytePower - pw.totalQualityAdjustedBytesCommitted = totalCommitted.QualityAdjPower - pw.totalPledgeCollateral = totalLocked - pw.qaPowerSmoothed = powerSmoothed - pw.minerCountAboveMinimumPower = int64(participatingMiners) - pw.minerCount = int64(totalMiners) - } - } - - return out, nil -} - -func (p *Processor) persistPowerActors(ctx context.Context, powerStates []powerActorInfo) error { - // NB: use errgroup when there is more than a single store operation - return p.storePowerSmoothingEstimates(powerStates) -} - -func (p *Processor) storePowerSmoothingEstimates(powerStates []powerActorInfo) error { - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin chain_power tx: %w", err) - } - - if _, err := tx.Exec(`create temp table cp (like chain_power) on commit drop`); err != nil { - return xerrors.Errorf("prep chain_power: %w", err) - } - - stmt, err := tx.Prepare(`copy cp (state_root, total_raw_bytes_power, total_raw_bytes_committed, total_qa_bytes_power, total_qa_bytes_committed, total_pledge_collateral, qa_smoothed_position_estimate, qa_smoothed_velocity_estimate, miner_count, minimum_consensus_miner_count) from stdin;`) - if err != nil { - return xerrors.Errorf("prepare tmp chain_power: %w", err) - } - - for _, ps := range powerStates { - if _, err := stmt.Exec( - ps.common.stateroot.String(), - - ps.totalRawBytes.String(), - ps.totalRawBytesCommitted.String(), - ps.totalQualityAdjustedBytes.String(), - ps.totalQualityAdjustedBytesCommitted.String(), - ps.totalPledgeCollateral.String(), - - ps.qaPowerSmoothed.PositionEstimate.String(), - ps.qaPowerSmoothed.VelocityEstimate.String(), - - ps.minerCount, - ps.minerCountAboveMinimumPower, - ); err != nil { - return xerrors.Errorf("failed to store smoothing estimate: %w", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared chain_power: %w", err) - } - - if _, err := tx.Exec(`insert into chain_power select * from cp on conflict do nothing`); err != nil { - return xerrors.Errorf("insert chain_power from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit chain_power tx: %w", err) - } - - return nil - -} diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go deleted file mode 100644 index af5935d47..000000000 --- a/cmd/lotus-chainwatch/processor/processor.go +++ /dev/null @@ -1,420 +0,0 @@ -package processor - -import ( - "context" - "database/sql" - "encoding/json" - "math" - "sync" - "time" - - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - - "github.com/filecoin-project/go-state-types/abi" - builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" - - "github.com/filecoin-project/lotus/api/v0api" - "github.com/filecoin-project/lotus/chain/types" - cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" - "github.com/filecoin-project/lotus/lib/parmap" -) - -var log = logging.Logger("processor") - -type Processor struct { - db *sql.DB - - node v0api.FullNode - ctxStore *cw_util.APIIpldStore - - genesisTs *types.TipSet - - // number of blocks processed at a time - batch int -} - -type ActorTips map[types.TipSetKey][]actorInfo - -type actorInfo struct { - act types.Actor - - stateroot cid.Cid - height abi.ChainEpoch // so that we can walk the actor changes in chronological order. - - tsKey types.TipSetKey - parentTsKey types.TipSetKey - - addr address.Address - state string -} - -func NewProcessor(ctx context.Context, db *sql.DB, node v0api.FullNode, batch int) *Processor { - ctxStore := cw_util.NewAPIIpldStore(ctx, node) - return &Processor{ - db: db, - ctxStore: ctxStore, - node: node, - batch: batch, - } -} - -func (p *Processor) setupSchemas() error { - // maintain order, subsequent calls create tables with foreign keys. - if err := p.setupMiners(); err != nil { - return err - } - - if err := p.setupMarket(); err != nil { - return err - } - - if err := p.setupRewards(); err != nil { - return err - } - - if err := p.setupMessages(); err != nil { - return err - } - - if err := p.setupCommonActors(); err != nil { - return err - } - - if err := p.setupPower(); err != nil { - return err - } - - return nil -} - -func (p *Processor) Start(ctx context.Context) { - log.Debug("Starting Processor") - - if err := p.setupSchemas(); err != nil { - log.Fatalw("Failed to setup processor", "error", err) - } - - var err error - p.genesisTs, err = p.node.ChainGetGenesis(ctx) - if err != nil { - log.Fatalw("Failed to get genesis state from lotus", "error", err.Error()) - } - - go p.subMpool(ctx) - - // main processor loop - go func() { - for { - select { - case <-ctx.Done(): - log.Info("Stopping Processor...") - return - default: - loopStart := time.Now() - toProcess, err := p.unprocessedBlocks(ctx, p.batch) - if err != nil { - log.Fatalw("Failed to get unprocessed blocks", "error", err) - } - - if len(toProcess) == 0 { - log.Info("No unprocessed blocks. Wait then try again...") - time.Sleep(time.Second * 30) - continue - } - - // TODO special case genesis state handling here to avoid all the special cases that will be needed for it else where - // before doing "normal" processing. - - actorChanges, nullRounds, err := p.collectActorChanges(ctx, toProcess) - if err != nil { - log.Fatalw("Failed to collect actor changes", "error", err) - } - log.Infow("Collected Actor Changes", - "MarketChanges", len(actorChanges[builtin2.StorageMarketActorCodeID]), - "MinerChanges", len(actorChanges[builtin2.StorageMinerActorCodeID]), - "RewardChanges", len(actorChanges[builtin2.RewardActorCodeID]), - "AccountChanges", len(actorChanges[builtin2.AccountActorCodeID]), - "nullRounds", len(nullRounds)) - - grp := sync.WaitGroup{} - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandleMarketChanges(ctx, actorChanges[builtin2.StorageMarketActorCodeID]); err != nil { - log.Errorf("Failed to handle market changes: %v", err) - return - } - }() - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandleMinerChanges(ctx, actorChanges[builtin2.StorageMinerActorCodeID]); err != nil { - log.Errorf("Failed to handle miner changes: %v", err) - return - } - }() - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandleRewardChanges(ctx, actorChanges[builtin2.RewardActorCodeID], nullRounds); err != nil { - log.Errorf("Failed to handle reward changes: %v", err) - return - } - }() - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandlePowerChanges(ctx, actorChanges[builtin2.StoragePowerActorCodeID]); err != nil { - log.Errorf("Failed to handle power actor changes: %v", err) - return - } - }() - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandleMessageChanges(ctx, toProcess); err != nil { - log.Errorf("Failed to handle message changes: %v", err) - return - } - }() - - grp.Add(1) - go func() { - defer grp.Done() - if err := p.HandleCommonActorsChanges(ctx, actorChanges); err != nil { - log.Errorf("Failed to handle common actor changes: %v", err) - return - } - }() - - grp.Wait() - - if err := p.markBlocksProcessed(ctx, toProcess); err != nil { - log.Fatalw("Failed to mark blocks as processed", "error", err) - } - - if err := p.refreshViews(); err != nil { - log.Errorw("Failed to refresh views", "error", err) - } - log.Infow("Processed Batch Complete", "duration", time.Since(loopStart).String()) - } - } - }() - -} - -func (p *Processor) refreshViews() error { - if _, err := p.db.Exec(`refresh materialized view state_heights`); err != nil { - return err - } - - return nil -} - -func (p *Processor) collectActorChanges(ctx context.Context, toProcess map[cid.Cid]*types.BlockHeader) (map[cid.Cid]ActorTips, []types.TipSetKey, error) { - start := time.Now() - defer func() { - log.Debugw("Collected Actor Changes", "duration", time.Since(start).String()) - }() - // ActorCode - > tipset->[]actorInfo - out := map[cid.Cid]ActorTips{} - var outMu sync.Mutex - - // map of addresses to changed actors - var changes map[string]types.Actor - actorsSeen := map[cid.Cid]struct{}{} - - var nullRounds []types.TipSetKey - var nullBlkMu sync.Mutex - - // collect all actor state that has changes between block headers - paDone := 0 - parmap.Par(50, parmap.MapArr(toProcess), func(bh *types.BlockHeader) { - paDone++ - if paDone%100 == 0 { - log.Debugw("Collecting actor changes", "done", paDone, "percent", (paDone*100)/len(toProcess)) - } - - pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) - if err != nil { - log.Error(err) - return - } - - if pts.ParentState().Equals(bh.ParentStateRoot) { - nullBlkMu.Lock() - nullRounds = append(nullRounds, pts.Key()) - nullBlkMu.Unlock() - } - - // collect all actors that had state changes between the blockheader parent-state and its grandparent-state. - // TODO: changes will contain deleted actors, this causes needless processing further down the pipeline, consider - // a separate strategy for deleted actors - changes, err = p.node.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) - if err != nil { - log.Error(err) - log.Debugw("StateChangedActors", "grandparent_state", pts.ParentState(), "parent_state", bh.ParentStateRoot) - return - } - - // record the state of all actors that have changed - for a, act := range changes { - act := act - a := a - - // ignore actors that were deleted. - has, err := p.node.ChainHasObj(ctx, act.Head) - if err != nil { - log.Error(err) - log.Debugw("ChanHasObj", "actor_head", act.Head) - return - } - if !has { - continue - } - - addr, err := address.NewFromString(a) - if err != nil { - log.Error(err) - log.Debugw("NewFromString", "address_string", a) - return - } - - ast, err := p.node.StateReadState(ctx, addr, pts.Key()) - if err != nil { - log.Error(err) - log.Debugw("StateReadState", "address_string", a, "parent_tipset_key", pts.Key()) - return - } - - // TODO look here for an empty state, maybe thats a sign the actor was deleted? - - state, err := json.Marshal(ast.State) - if err != nil { - log.Error(err) - return - } - - outMu.Lock() - if _, ok := actorsSeen[act.Head]; !ok { - _, ok := out[act.Code] - if !ok { - out[act.Code] = map[types.TipSetKey][]actorInfo{} - } - out[act.Code][pts.Key()] = append(out[act.Code][pts.Key()], actorInfo{ - act: act, - stateroot: bh.ParentStateRoot, - height: bh.Height, - tsKey: pts.Key(), - parentTsKey: pts.Parents(), - addr: addr, - state: string(state), - }) - } - actorsSeen[act.Head] = struct{}{} - outMu.Unlock() - } - }) - return out, nullRounds, nil -} - -func (p *Processor) unprocessedBlocks(ctx context.Context, batch int) (map[cid.Cid]*types.BlockHeader, error) { - start := time.Now() - defer func() { - log.Debugw("Gathered Blocks to process", "duration", time.Since(start).String()) - }() - rows, err := p.db.Query(` -with toProcess as ( - select b.cid, b.height, rank() over (order by height) as rnk - from blocks_synced bs - left join blocks b on bs.cid = b.cid - where bs.processed_at is null and b.height > 0 -) -select cid -from toProcess -where rnk <= $1 -`, batch) - if err != nil { - return nil, xerrors.Errorf("Failed to query for unprocessed blocks: %w", err) - } - out := map[cid.Cid]*types.BlockHeader{} - - minBlock := abi.ChainEpoch(math.MaxInt64) - maxBlock := abi.ChainEpoch(0) - // TODO consider parallel execution here for getting the blocks from the api as is done in fetchMessages() - for rows.Next() { - if rows.Err() != nil { - return nil, err - } - var c string - if err := rows.Scan(&c); err != nil { - log.Errorf("Failed to scan unprocessed blocks: %s", err.Error()) - continue - } - ci, err := cid.Parse(c) - if err != nil { - log.Errorf("Failed to parse unprocessed blocks: %s", err.Error()) - continue - } - bh, err := p.node.ChainGetBlock(ctx, ci) - if err != nil { - // this is a pretty serious issue. - log.Errorf("Failed to get block header %s: %s", ci.String(), err.Error()) - continue - } - out[ci] = bh - if bh.Height < minBlock { - minBlock = bh.Height - } - if bh.Height > maxBlock { - maxBlock = bh.Height - } - } - if minBlock <= maxBlock { - log.Infow("Gathered Blocks to Process", "start", minBlock, "end", maxBlock) - } - return out, rows.Close() -} - -func (p *Processor) markBlocksProcessed(ctx context.Context, processed map[cid.Cid]*types.BlockHeader) error { - start := time.Now() - processedHeight := abi.ChainEpoch(0) - defer func() { - log.Debugw("Marked blocks as Processed", "duration", time.Since(start).String()) - log.Infow("Processed Blocks", "height", processedHeight) - }() - tx, err := p.db.Begin() - if err != nil { - return err - } - - processedAt := time.Now().Unix() - stmt, err := tx.Prepare(`update blocks_synced set processed_at=$1 where cid=$2`) - if err != nil { - return err - } - - for c, bh := range processed { - if bh.Height > processedHeight { - processedHeight = bh.Height - } - if _, err := stmt.Exec(processedAt, c.String()); err != nil { - return err - } - } - - if err := stmt.Close(); err != nil { - return err - } - - return tx.Commit() -} diff --git a/cmd/lotus-chainwatch/processor/reward.go b/cmd/lotus-chainwatch/processor/reward.go deleted file mode 100644 index 72a329c87..000000000 --- a/cmd/lotus-chainwatch/processor/reward.go +++ /dev/null @@ -1,234 +0,0 @@ -package processor - -import ( - "context" - "time" - - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/lotus/chain/actors/builtin" - "github.com/filecoin-project/lotus/chain/actors/builtin/reward" - "github.com/filecoin-project/lotus/chain/types" - - cw_util "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" -) - -type rewardActorInfo struct { - common actorInfo - - cumSumBaselinePower big.Int - cumSumRealizedPower big.Int - - effectiveNetworkTime abi.ChainEpoch - effectiveBaselinePower big.Int - - // NOTE: These variables are wrong. Talk to @ZX about fixing. These _do - // not_ represent "new" anything. - newBaselinePower big.Int - newBaseReward big.Int - newSmoothingEstimate builtin.FilterEstimate - - totalMinedReward big.Int -} - -func (rw *rewardActorInfo) set(s reward.State) (err error) { - rw.cumSumBaselinePower, err = s.CumsumBaseline() - if err != nil { - return xerrors.Errorf("getting cumsum baseline power (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.cumSumRealizedPower, err = s.CumsumRealized() - if err != nil { - return xerrors.Errorf("getting cumsum realized power (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.effectiveNetworkTime, err = s.EffectiveNetworkTime() - if err != nil { - return xerrors.Errorf("getting effective network time (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.effectiveBaselinePower, err = s.EffectiveBaselinePower() - if err != nil { - return xerrors.Errorf("getting effective baseline power (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.totalMinedReward, err = s.TotalStoragePowerReward() - if err != nil { - return xerrors.Errorf("getting total mined (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.newBaselinePower, err = s.ThisEpochBaselinePower() - if err != nil { - return xerrors.Errorf("getting this epoch baseline power (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.newBaseReward, err = s.ThisEpochReward() - if err != nil { - return xerrors.Errorf("getting this epoch baseline power (@ %s): %w", rw.common.stateroot.String(), err) - } - - rw.newSmoothingEstimate, err = s.ThisEpochRewardSmoothed() - if err != nil { - return xerrors.Errorf("getting this epoch baseline power (@ %s): %w", rw.common.stateroot.String(), err) - } - return nil -} - -func (p *Processor) setupRewards() error { - tx, err := p.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -/* captures chain-specific power state for any given stateroot */ -create table if not exists chain_reward -( - state_root text not null - constraint chain_reward_pk - primary key, - cum_sum_baseline text not null, - cum_sum_realized text not null, - effective_network_time int not null, - effective_baseline_power text not null, - - new_baseline_power text not null, - new_reward numeric not null, - new_reward_smoothed_position_estimate text not null, - new_reward_smoothed_velocity_estimate text not null, - - total_mined_reward text not null -); -`); err != nil { - return err - } - - return tx.Commit() -} - -func (p *Processor) HandleRewardChanges(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) error { - rewardChanges, err := p.processRewardActors(ctx, rewardTips, nullRounds) - if err != nil { - return xerrors.Errorf("Failed to process reward actors: %w", err) - } - - if err := p.persistRewardActors(ctx, rewardChanges); err != nil { - return err - } - - return nil -} - -func (p *Processor) processRewardActors(ctx context.Context, rewardTips ActorTips, nullRounds []types.TipSetKey) ([]rewardActorInfo, error) { - start := time.Now() - defer func() { - log.Debugw("Processed Reward Actors", "duration", time.Since(start).String()) - }() - - var out []rewardActorInfo - for tipset, rewards := range rewardTips { - for _, act := range rewards { - var rw rewardActorInfo - rw.common = act - - // get reward actor states at each tipset once for all updates - rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tipset) - if err != nil { - return nil, xerrors.Errorf("get reward state (@ %s): %w", rw.common.stateroot.String(), err) - } - - rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor) - if err != nil { - return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err) - } - if err := rw.set(rewardActorState); err != nil { - return nil, err - } - - out = append(out, rw) - } - } - for _, tsKey := range nullRounds { - var rw rewardActorInfo - tipset, err := p.node.ChainGetTipSet(ctx, tsKey) - if err != nil { - return nil, err - } - rw.common.tsKey = tipset.Key() - rw.common.height = tipset.Height() - rw.common.stateroot = tipset.ParentState() - rw.common.parentTsKey = tipset.Parents() - // get reward actor states at each tipset once for all updates - rewardActor, err := p.node.StateGetActor(ctx, reward.Address, tsKey) - if err != nil { - return nil, err - } - - rewardActorState, err := reward.Load(cw_util.NewAPIIpldStore(ctx, p.node), rewardActor) - if err != nil { - return nil, xerrors.Errorf("read state obj (@ %s): %w", rw.common.stateroot.String(), err) - } - - if err := rw.set(rewardActorState); err != nil { - return nil, err - } - out = append(out, rw) - } - - return out, nil -} - -func (p *Processor) persistRewardActors(ctx context.Context, rewards []rewardActorInfo) error { - start := time.Now() - defer func() { - log.Debugw("Persisted Reward Actors", "duration", time.Since(start).String()) - }() - - tx, err := p.db.Begin() - if err != nil { - return xerrors.Errorf("begin chain_reward tx: %w", err) - } - - if _, err := tx.Exec(`create temp table cr (like chain_reward excluding constraints) on commit drop`); err != nil { - return xerrors.Errorf("prep chain_reward temp: %w", err) - } - - stmt, err := tx.Prepare(`copy cr ( state_root, cum_sum_baseline, cum_sum_realized, effective_network_time, effective_baseline_power, new_baseline_power, new_reward, new_reward_smoothed_position_estimate, new_reward_smoothed_velocity_estimate, total_mined_reward) from STDIN`) - if err != nil { - return xerrors.Errorf("prepare tmp chain_reward: %w", err) - } - - for _, rewardState := range rewards { - if _, err := stmt.Exec( - rewardState.common.stateroot.String(), - rewardState.cumSumBaselinePower.String(), - rewardState.cumSumRealizedPower.String(), - uint64(rewardState.effectiveNetworkTime), - rewardState.effectiveBaselinePower.String(), - rewardState.newBaselinePower.String(), - rewardState.newBaseReward.String(), - rewardState.newSmoothingEstimate.PositionEstimate.String(), - rewardState.newSmoothingEstimate.VelocityEstimate.String(), - rewardState.totalMinedReward.String(), - ); err != nil { - log.Errorw("failed to store chain power", "state_root", rewardState.common.stateroot, "error", err) - } - } - - if err := stmt.Close(); err != nil { - return xerrors.Errorf("close prepared chain_reward: %w", err) - } - - if _, err := tx.Exec(`insert into chain_reward select * from cr on conflict do nothing`); err != nil { - return xerrors.Errorf("insert chain_reward from tmp: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("commit chain_reward tx: %w", err) - } - - return nil -} diff --git a/cmd/lotus-chainwatch/run.go b/cmd/lotus-chainwatch/run.go deleted file mode 100644 index 6e47a100d..000000000 --- a/cmd/lotus-chainwatch/run.go +++ /dev/null @@ -1,107 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "net/http" - _ "net/http/pprof" - "os" - "strings" - - "github.com/filecoin-project/lotus/api/v0api" - - _ "github.com/lib/pq" - - "github.com/filecoin-project/go-jsonrpc" - logging "github.com/ipfs/go-log/v2" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - lcli "github.com/filecoin-project/lotus/cli" - "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/processor" - "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/scheduler" - "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/syncer" - "github.com/filecoin-project/lotus/cmd/lotus-chainwatch/util" -) - -var runCmd = &cli.Command{ - Name: "run", - Usage: "Start lotus chainwatch", - Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "max-batch", - Value: 50, - }, - }, - Action: func(cctx *cli.Context) error { - go func() { - http.ListenAndServe(":6060", nil) //nolint:errcheck - }() - ll := cctx.String("log-level") - if err := logging.SetLogLevel("*", ll); err != nil { - return err - } - if err := logging.SetLogLevel("rpc", "error"); err != nil { - return err - } - - var api v0api.FullNode - var closer jsonrpc.ClientCloser - var err error - if tokenMaddr := cctx.String("api"); tokenMaddr != "" { - toks := strings.Split(tokenMaddr, ":") - if len(toks) != 2 { - return fmt.Errorf("invalid api tokens, expected :, got: %s", tokenMaddr) - } - - api, closer, err = util.GetFullNodeAPIUsingCredentials(cctx.Context, toks[1], toks[0]) - if err != nil { - return err - } - } else { - api, closer, err = lcli.GetFullNodeAPI(cctx) - if err != nil { - return err - } - } - defer closer() - ctx := lcli.ReqContext(cctx) - - v, err := api.Version(ctx) - if err != nil { - return err - } - - log.Infof("Remote version: %s", v.Version) - - maxBatch := cctx.Int("max-batch") - - db, err := sql.Open("postgres", cctx.String("db")) - if err != nil { - return err - } - defer func() { - if err := db.Close(); err != nil { - log.Errorw("Failed to close database", "error", err) - } - }() - - if err := db.Ping(); err != nil { - return xerrors.Errorf("Database failed to respond to ping (is it online?): %w", err) - } - db.SetMaxOpenConns(1350) - - sync := syncer.NewSyncer(db, api, 1400) - sync.Start(ctx) - - proc := processor.NewProcessor(ctx, db, api, maxBatch) - proc.Start(ctx) - - sched := scheduler.PrepareScheduler(db) - sched.Start(ctx) - - <-ctx.Done() - os.Exit(0) - return nil - }, -} diff --git a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go b/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go deleted file mode 100644 index 145e84229..000000000 --- a/cmd/lotus-chainwatch/scheduler/refresh_top_miners_by_base_reward.go +++ /dev/null @@ -1,78 +0,0 @@ -package scheduler - -import ( - "context" - "database/sql" - - "golang.org/x/xerrors" -) - -func setupTopMinerByBaseRewardSchema(ctx context.Context, db *sql.DB) error { - select { - case <-ctx.Done(): - return nil - default: - } - - tx, err := db.Begin() - if err != nil { - return err - } - if _, err := tx.Exec(` - create materialized view if not exists top_miners_by_base_reward as - with total_rewards_by_miner as ( - select - b.miner, - sum(cr.new_reward * b.win_count) as total_reward - from blocks b - inner join chain_reward cr on b.parentstateroot = cr.state_root - group by 1 - ) select - rank() over (order by total_reward desc), - miner, - total_reward - from total_rewards_by_miner - group by 2, 3; - - create index if not exists top_miners_by_base_reward_miner_index - on top_miners_by_base_reward (miner); - - create materialized view if not exists top_miners_by_base_reward_max_height as - select - b."timestamp"as current_timestamp, - max(b.height) as current_height - from blocks b - join chain_reward cr on b.parentstateroot = cr.state_root - where cr.new_reward is not null - group by 1 - order by 1 desc - limit 1; - `); err != nil { - return xerrors.Errorf("create top_miners_by_base_reward views: %w", err) - } - - if err := tx.Commit(); err != nil { - return xerrors.Errorf("committing top_miners_by_base_reward views; %w", err) - } - return nil -} - -func refreshTopMinerByBaseReward(ctx context.Context, db *sql.DB) error { - select { - case <-ctx.Done(): - return nil - default: - } - - _, err := db.Exec("refresh materialized view top_miners_by_base_reward;") - if err != nil { - return xerrors.Errorf("refresh top_miners_by_base_reward: %w", err) - } - - _, err = db.Exec("refresh materialized view top_miners_by_base_reward_max_height;") - if err != nil { - return xerrors.Errorf("refresh top_miners_by_base_reward_max_height: %w", err) - } - - return nil -} diff --git a/cmd/lotus-chainwatch/scheduler/scheduler.go b/cmd/lotus-chainwatch/scheduler/scheduler.go deleted file mode 100644 index 6782bc16d..000000000 --- a/cmd/lotus-chainwatch/scheduler/scheduler.go +++ /dev/null @@ -1,60 +0,0 @@ -package scheduler - -import ( - "context" - "database/sql" - "time" - - logging "github.com/ipfs/go-log/v2" - - "golang.org/x/xerrors" -) - -var log = logging.Logger("scheduler") - -// Scheduler manages the execution of jobs triggered -// by tickers. Not externally configurable at runtime. -type Scheduler struct { - db *sql.DB -} - -// PrepareScheduler returns a ready-to-run Scheduler -func PrepareScheduler(db *sql.DB) *Scheduler { - return &Scheduler{db} -} - -func (s *Scheduler) setupSchema(ctx context.Context) error { - if err := setupTopMinerByBaseRewardSchema(ctx, s.db); err != nil { - return xerrors.Errorf("setup top miners by reward schema: %w", err) - } - return nil -} - -// Start the scheduler jobs at the defined intervals -func (s *Scheduler) Start(ctx context.Context) { - log.Debug("Starting Scheduler") - - if err := s.setupSchema(ctx); err != nil { - log.Fatalw("applying scheduling schema", "error", err) - } - - go func() { - // run once on start after schema has initialized - time.Sleep(1 * time.Minute) - if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { - log.Errorw("failed to refresh top miner", "error", err) - } - refreshTopMinerCh := time.NewTicker(30 * time.Second) - defer refreshTopMinerCh.Stop() - for { - select { - case <-refreshTopMinerCh.C: - if err := refreshTopMinerByBaseReward(ctx, s.db); err != nil { - log.Errorw("failed to refresh top miner", "error", err) - } - case <-ctx.Done(): - return - } - } - }() -} diff --git a/cmd/lotus-chainwatch/syncer/blockssub.go b/cmd/lotus-chainwatch/syncer/blockssub.go deleted file mode 100644 index ea9c079e8..000000000 --- a/cmd/lotus-chainwatch/syncer/blockssub.go +++ /dev/null @@ -1,27 +0,0 @@ -package syncer - -import ( - "context" - "time" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" -) - -func (s *Syncer) subBlocks(ctx context.Context) { - sub, err := s.node.SyncIncomingBlocks(ctx) - if err != nil { - log.Errorf("opening incoming block channel: %+v", err) - return - } - - log.Infow("Capturing incoming blocks") - for bh := range sub { - err := s.storeHeaders(map[cid.Cid]*types.BlockHeader{ - bh.Cid(): bh, - }, false, time.Now()) - if err != nil { - log.Errorf("storing incoming block header: %+v", err) - } - } -} diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go deleted file mode 100644 index b5e9c73d6..000000000 --- a/cmd/lotus-chainwatch/syncer/sync.go +++ /dev/null @@ -1,527 +0,0 @@ -package syncer - -import ( - "container/list" - "context" - "database/sql" - "fmt" - "sync" - "time" - - "golang.org/x/xerrors" - - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - - "github.com/filecoin-project/lotus/api/v0api" - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" -) - -var log = logging.Logger("syncer") - -type Syncer struct { - db *sql.DB - - lookbackLimit uint64 - - headerLk sync.Mutex - node v0api.FullNode -} - -func NewSyncer(db *sql.DB, node v0api.FullNode, lookbackLimit uint64) *Syncer { - return &Syncer{ - db: db, - node: node, - lookbackLimit: lookbackLimit, - } -} - -func (s *Syncer) setupSchemas() error { - tx, err := s.db.Begin() - if err != nil { - return err - } - - if _, err := tx.Exec(` -/* tracks circulating fil available on the network at each tipset */ -create table if not exists chain_economics -( - parent_state_root text not null - constraint chain_economics_pk primary key, - circulating_fil text not null, - vested_fil text not null, - mined_fil text not null, - burnt_fil text not null, - locked_fil text not null -); - -create table if not exists block_cids -( - cid text not null - constraint block_cids_pk - primary key -); - -create unique index if not exists block_cids_cid_uindex - on block_cids (cid); - -create table if not exists blocks_synced -( - cid text not null - constraint blocks_synced_pk - primary key - constraint blocks_block_cids_cid_fk - references block_cids (cid), - synced_at int not null, - processed_at int -); - -create unique index if not exists blocks_synced_cid_uindex - on blocks_synced (cid,processed_at); - -create table if not exists block_parents -( - block text not null - constraint blocks_block_cids_cid_fk - references block_cids (cid), - parent text not null -); - -create unique index if not exists block_parents_block_parent_uindex - on block_parents (block, parent); - -create table if not exists drand_entries -( - round bigint not null - constraint drand_entries_pk - primary key, - data bytea not null -); -create unique index if not exists drand_entries_round_uindex - on drand_entries (round); - -create table if not exists block_drand_entries -( - round bigint not null - constraint block_drand_entries_drand_entries_round_fk - references drand_entries (round), - block text not null - constraint blocks_block_cids_cid_fk - references block_cids (cid) -); -create unique index if not exists block_drand_entries_round_uindex - on block_drand_entries (round, block); - -create table if not exists blocks -( - cid text not null - constraint blocks_pk - primary key - constraint blocks_block_cids_cid_fk - references block_cids (cid), - parentWeight numeric not null, - parentStateRoot text not null, - height bigint not null, - miner text not null, - timestamp bigint not null, - ticket bytea not null, - election_proof bytea, - win_count bigint, - parent_base_fee text not null, - forksig bigint not null -); - -create unique index if not exists block_cid_uindex - on blocks (cid,height); - -create materialized view if not exists state_heights - as select min(b.height) height, b.parentstateroot - from blocks b group by b.parentstateroot; - -create index if not exists state_heights_height_index - on state_heights (height); - -create index if not exists state_heights_parentstateroot_index - on state_heights (parentstateroot); -`); err != nil { - return err - } - - return tx.Commit() -} - -func (s *Syncer) Start(ctx context.Context) { - if err := logging.SetLogLevel("syncer", "info"); err != nil { - log.Fatal(err) - } - log.Debug("Starting Syncer") - - if err := s.setupSchemas(); err != nil { - log.Fatal(err) - } - - // capture all reported blocks - go s.subBlocks(ctx) - - // we need to ensure that on a restart we don't reprocess the whole flarping chain - var sinceEpoch uint64 - blkCID, height, err := s.mostRecentlySyncedBlockHeight() - if err != nil { - log.Fatalw("failed to find most recently synced block", "error", err) - } else { - if height > 0 { - log.Infow("Found starting point for syncing", "blockCID", blkCID.String(), "height", height) - sinceEpoch = uint64(height) - } - } - - // continue to keep the block headers table up to date. - notifs, err := s.node.ChainNotify(ctx) - if err != nil { - log.Fatal(err) - } - - go func() { - for notif := range notifs { - for _, change := range notif { - switch change.Type { - case store.HCCurrent: - // This case is important for capturing the initial state of a node - // which might be on a dead network with no new blocks being produced. - // It also allows a fresh Chainwatch instance to start walking the - // chain without waiting for a new block to come along. - fallthrough - case store.HCApply: - unsynced, err := s.unsyncedBlocks(ctx, change.Val, sinceEpoch) - if err != nil { - log.Errorw("failed to gather unsynced blocks", "error", err) - } - - if err := s.storeCirculatingSupply(ctx, change.Val); err != nil { - log.Errorw("failed to store circulating supply", "error", err) - } - - if len(unsynced) == 0 { - continue - } - - if err := s.storeHeaders(unsynced, true, time.Now()); err != nil { - // so this is pretty bad, need some kind of retry.. - // for now just log an error and the blocks will be attempted again on next notifi - log.Errorw("failed to store unsynced blocks", "error", err) - } - - sinceEpoch = uint64(change.Val.Height()) - case store.HCRevert: - log.Debug("revert todo") - } - } - } - }() -} - -func (s *Syncer) unsyncedBlocks(ctx context.Context, head *types.TipSet, since uint64) (map[cid.Cid]*types.BlockHeader, error) { - hasList, err := s.syncedBlocks(since, s.lookbackLimit) - if err != nil { - return nil, err - } - - // build a list of blocks that we have not synced. - toVisit := list.New() - for _, header := range head.Blocks() { - toVisit.PushBack(header) - } - - toSync := map[cid.Cid]*types.BlockHeader{} - - for toVisit.Len() > 0 { - bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) - _, has := hasList[bh.Cid()] - if _, seen := toSync[bh.Cid()]; seen || has { - continue - } - - toSync[bh.Cid()] = bh - if len(toSync)%500 == 10 { - log.Debugw("To visit", "toVisit", toVisit.Len(), "toSync", len(toSync), "current_height", bh.Height) - } - - if bh.Height == 0 { - continue - } - - pts, err := s.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) - if err != nil { - log.Error(err) - continue - } - - for _, header := range pts.Blocks() { - toVisit.PushBack(header) - } - } - log.Debugw("Gathered unsynced blocks", "count", len(toSync)) - return toSync, nil -} - -func (s *Syncer) syncedBlocks(since, limit uint64) (map[cid.Cid]struct{}, error) { - rws, err := s.db.Query(`select bs.cid FROM blocks_synced bs left join blocks b on b.cid = bs.cid where b.height <= $1 and bs.processed_at is not null limit $2`, since, limit) - if err != nil { - return nil, xerrors.Errorf("Failed to query blocks_synced: %w", err) - } - out := map[cid.Cid]struct{}{} - - for rws.Next() { - var c string - if err := rws.Scan(&c); err != nil { - return nil, xerrors.Errorf("Failed to scan blocks_synced: %w", err) - } - - ci, err := cid.Parse(c) - if err != nil { - return nil, xerrors.Errorf("Failed to parse blocks_synced: %w", err) - } - - out[ci] = struct{}{} - } - return out, nil -} - -func (s *Syncer) mostRecentlySyncedBlockHeight() (cid.Cid, int64, error) { - rw := s.db.QueryRow(` -select blocks_synced.cid, b.height -from blocks_synced -left join blocks b on blocks_synced.cid = b.cid -where processed_at is not null -order by height desc -limit 1 -`) - - var c string - var h int64 - if err := rw.Scan(&c, &h); err != nil { - if err == sql.ErrNoRows { - return cid.Undef, 0, nil - } - return cid.Undef, -1, err - } - - ci, err := cid.Parse(c) - if err != nil { - return cid.Undef, -1, err - } - - return ci, h, nil -} - -func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSet) error { - supply, err := s.node.StateVMCirculatingSupplyInternal(ctx, tipset.Key()) - if err != nil { - return err - } - - ceInsert := `insert into chain_economics (parent_state_root, circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) ` + - `values ('%s', '%s', '%s', '%s', '%s', '%s') on conflict on constraint chain_economics_pk do ` + - `update set (circulating_fil, vested_fil, mined_fil, burnt_fil, locked_fil) = ('%[2]s', '%[3]s', '%[4]s', '%[5]s', '%[6]s') ` + - `where chain_economics.parent_state_root = '%[1]s';` - - if _, err := s.db.Exec(fmt.Sprintf(ceInsert, - tipset.ParentState().String(), - supply.FilCirculating.String(), - supply.FilVested.String(), - supply.FilMined.String(), - supply.FilBurnt.String(), - supply.FilLocked.String(), - )); err != nil { - return xerrors.Errorf("insert circulating supply for tipset (%s): %w", tipset.Key().String(), err) - } - - return nil -} - -func (s *Syncer) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool, timestamp time.Time) error { - s.headerLk.Lock() - defer s.headerLk.Unlock() - if len(bhs) == 0 { - return nil - } - log.Debugw("Storing Headers", "count", len(bhs)) - - tx, err := s.db.Begin() - if err != nil { - return xerrors.Errorf("begin: %w", err) - } - - if _, err := tx.Exec(` - -create temp table bc (like block_cids excluding constraints) on commit drop; -create temp table de (like drand_entries excluding constraints) on commit drop; -create temp table bde (like block_drand_entries excluding constraints) on commit drop; -create temp table tbp (like block_parents excluding constraints) on commit drop; -create temp table bs (like blocks_synced excluding constraints) on commit drop; -create temp table b (like blocks excluding constraints) on commit drop; - - -`); err != nil { - return xerrors.Errorf("prep temp: %w", err) - } - - { - stmt, err := tx.Prepare(`copy bc (cid) from STDIN`) - if err != nil { - return err - } - - for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String()); err != nil { - log.Error(err) - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into block_cids select * from bc on conflict do nothing `); err != nil { - return xerrors.Errorf("drand entries put: %w", err) - } - } - - { - stmt, err := tx.Prepare(`copy de (round, data) from STDIN`) - if err != nil { - return err - } - - for _, bh := range bhs { - for _, ent := range bh.BeaconEntries { - if _, err := stmt.Exec(ent.Round, ent.Data); err != nil { - log.Error(err) - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into drand_entries select * from de on conflict do nothing `); err != nil { - return xerrors.Errorf("drand entries put: %w", err) - } - } - - { - stmt, err := tx.Prepare(`copy bde (round, block) from STDIN`) - if err != nil { - return err - } - - for _, bh := range bhs { - for _, ent := range bh.BeaconEntries { - if _, err := stmt.Exec(ent.Round, bh.Cid().String()); err != nil { - log.Error(err) - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into block_drand_entries select * from bde on conflict do nothing `); err != nil { - return xerrors.Errorf("block drand entries put: %w", err) - } - } - - { - stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) - if err != nil { - return err - } - - for _, bh := range bhs { - for _, parent := range bh.Parents { - if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil { - log.Error(err) - } - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { - return xerrors.Errorf("parent put: %w", err) - } - } - - if sync { - - stmt, err := tx.Prepare(`copy bs (cid, synced_at) from stdin `) - if err != nil { - return err - } - - for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), timestamp.Unix()); err != nil { - log.Error(err) - } - } - - if err := stmt.Close(); err != nil { - return err - } - - if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { - return xerrors.Errorf("syncd put: %w", err) - } - } - - stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", ticket, election_proof, win_count, parent_base_fee, forksig) from stdin`) - if err != nil { - return err - } - - for _, bh := range bhs { - var eproof, winCount interface{} - if bh.ElectionProof != nil { - eproof = bh.ElectionProof.VRFProof - winCount = bh.ElectionProof.WinCount - } - - if bh.Ticket == nil { - log.Warnf("got a block with nil ticket") - - bh.Ticket = &types.Ticket{ - VRFProof: []byte{}, - } - } - - if _, err := stmt2.Exec( - bh.Cid().String(), - bh.ParentWeight.String(), - bh.ParentStateRoot.String(), - bh.Height, - bh.Miner.String(), - bh.Timestamp, - bh.Ticket.VRFProof, - eproof, - winCount, - bh.ParentBaseFee.String(), - bh.ForkSignaling); err != nil { - log.Error(err) - } - } - - if err := stmt2.Close(); err != nil { - return xerrors.Errorf("s2 close: %w", err) - } - - if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { - return xerrors.Errorf("blk put: %w", err) - } - - return tx.Commit() -} diff --git a/cmd/lotus-chainwatch/util/api.go b/cmd/lotus-chainwatch/util/api.go deleted file mode 100644 index f8f22cbbf..000000000 --- a/cmd/lotus-chainwatch/util/api.go +++ /dev/null @@ -1,34 +0,0 @@ -package util - -import ( - "context" - "net/http" - - "github.com/filecoin-project/go-jsonrpc" - "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/api/v0api" - ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -func GetFullNodeAPIUsingCredentials(ctx context.Context, listenAddr, token string) (v0api.FullNode, jsonrpc.ClientCloser, error) { - parsedAddr, err := ma.NewMultiaddr(listenAddr) - if err != nil { - return nil, nil, err - } - - _, addr, err := manet.DialArgs(parsedAddr) - if err != nil { - return nil, nil, err - } - - return client.NewFullNodeRPCV0(ctx, apiURI(addr), apiHeaders(token)) -} -func apiURI(addr string) string { - return "ws://" + addr + "/rpc/v0" -} -func apiHeaders(token string) http.Header { - headers := http.Header{} - headers.Add("Authorization", "Bearer "+token) - return headers -} diff --git a/cmd/lotus-chainwatch/util/contextStore.go b/cmd/lotus-chainwatch/util/contextStore.go deleted file mode 100644 index c93f87f9b..000000000 --- a/cmd/lotus-chainwatch/util/contextStore.go +++ /dev/null @@ -1,51 +0,0 @@ -package util - -import ( - "bytes" - "context" - "fmt" - - "github.com/ipfs/go-cid" - cbg "github.com/whyrusleeping/cbor-gen" - - "github.com/filecoin-project/lotus/api/v0api" -) - -// TODO extract this to a common location in lotus and reuse the code - -// APIIpldStore is required for AMT and HAMT access. -type APIIpldStore struct { - ctx context.Context - api v0api.FullNode -} - -func NewAPIIpldStore(ctx context.Context, api v0api.FullNode) *APIIpldStore { - return &APIIpldStore{ - ctx: ctx, - api: api, - } -} - -func (ht *APIIpldStore) Context() context.Context { - return ht.ctx -} - -func (ht *APIIpldStore) Get(ctx context.Context, c cid.Cid, out interface{}) error { - raw, err := ht.api.ChainReadObj(ctx, c) - if err != nil { - return err - } - - cu, ok := out.(cbg.CBORUnmarshaler) - if ok { - if err := cu.UnmarshalCBOR(bytes.NewReader(raw)); err != nil { - return err - } - return nil - } - return fmt.Errorf("Object does not implement CBORUnmarshaler: %T", out) -} - -func (ht *APIIpldStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) { - return cid.Undef, fmt.Errorf("Put is not implemented on APIIpldStore") -} diff --git a/scripts/lotus-chainwatch.service b/scripts/lotus-chainwatch.service deleted file mode 100644 index e121cb1d1..000000000 --- a/scripts/lotus-chainwatch.service +++ /dev/null @@ -1,15 +0,0 @@ -[Unit] -Description=Chainwatch -After=lotus-daemon.service -Requires=lotus-daemon.service - -[Service] -Environment=GOLOG_FILE="/var/log/lotus/chainwatch.log" -Environment=GOLOG_LOG_FMT="json" -Environment=LOTUS_DB="" -Environment=LOTUS_PATH="%h/.lotus" -EnvironmentFile=-/etc/lotus/chainwatch.env -ExecStart=/usr/local/bin/lotus-chainwatch run - -[Install] -WantedBy=multi-user.target From 3b617f4528eaa8fe221cd0bc59ddea718d270144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 28 Jul 2021 17:59:37 +0200 Subject: [PATCH 4/4] mod tidy --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 5f968f6e0..a094cd6c3 100644 --- a/go.mod +++ b/go.mod @@ -100,7 +100,6 @@ require ( github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018 github.com/kelseyhightower/envconfig v1.4.0 - github.com/lib/pq v1.7.0 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p v0.14.2 diff --git a/go.sum b/go.sum index b22f3dc15..9757c15f3 100644 --- a/go.sum +++ b/go.sum @@ -835,8 +835,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY= -github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-addr-util v0.0.2 h1:7cWK5cdA5x72jX0g8iLrQWm5TRJZ6CzGdPEhWj7plWU= github.com/libp2p/go-addr-util v0.0.2/go.mod h1:Ecd6Fb3yIuLzq4bD7VcywcVSBtefcAwnUISBM3WG15E=