Merge branch 'master' into badger-viewable

This commit is contained in:
Raúl Kripalani 2020-11-06 19:34:25 +00:00
commit 3577300aee
37 changed files with 1729 additions and 218 deletions

7
.gitmodules vendored
View File

@ -1,16 +1,15 @@
[submodule "extern/filecoin-ffi"]
path = extern/filecoin-ffi
url = https://github.com/filecoin-project/filecoin-ffi.git
branch = master
[submodule "extern/serialization-vectors"]
path = extern/serialization-vectors
url = https://github.com/filecoin-project/serialization-vectors
url = https://github.com/filecoin-project/serialization-vectors.git
[submodule "extern/test-vectors"]
path = extern/test-vectors
url = https://github.com/filecoin-project/test-vectors.git
[submodule "extern/oni"]
path = extern/oni
url = https://github.com/filecoin-project/oni
url = https://github.com/filecoin-project/oni.git
[submodule "extern/blst"]
path = extern/blst
url = git@github.com:supranational/blst.git
url = https://github.com/supranational/blst.git

View File

@ -179,7 +179,7 @@ BINS+=lotus-bench
lotus-stats:
rm -f lotus-stats
go build -o lotus-stats ./cmd/lotus-stats
go build $(GOFLAGS) -o lotus-stats ./cmd/lotus-stats
go run github.com/GeertJohan/go.rice/rice append --exec lotus-stats -i ./build
.PHONY: lotus-stats
BINS+=lotus-stats

View File

@ -22,7 +22,7 @@ const UpgradeTapeHeight = -4
var UpgradeActorsV2Height = abi.ChainEpoch(10)
var UpgradeLiftoffHeight = abi.ChainEpoch(-5)
const UpgradeKumquatHeight = -6
const UpgradeKumquatHeight = 15
var DrandSchedule = map[abi.ChainEpoch]DrandEnum{
0: DrandMainnet,

View File

@ -0,0 +1,134 @@
package multisig
import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/filecoin-project/lotus/chain/actors/adt"
)
type PendingTransactionChanges struct {
Added []TransactionChange
Modified []TransactionModification
Removed []TransactionChange
}
type TransactionChange struct {
TxID int64
Tx Transaction
}
type TransactionModification struct {
TxID int64
From Transaction
To Transaction
}
func DiffPendingTransactions(pre, cur State) (*PendingTransactionChanges, error) {
results := new(PendingTransactionChanges)
if changed, err := pre.PendingTxnChanged(cur); err != nil {
return nil, err
} else if !changed { // if nothing has changed then return an empty result and bail.
return results, nil
}
pret, err := pre.transactions()
if err != nil {
return nil, err
}
curt, err := cur.transactions()
if err != nil {
return nil, err
}
if err := adt.DiffAdtMap(pret, curt, &transactionDiffer{results, pre, cur}); err != nil {
return nil, err
}
return results, nil
}
type transactionDiffer struct {
Results *PendingTransactionChanges
pre, after State
}
func (t *transactionDiffer) AsKey(key string) (abi.Keyer, error) {
txID, err := abi.ParseIntKey(key)
if err != nil {
return nil, err
}
return abi.IntKey(txID), nil
}
func (t *transactionDiffer) Add(key string, val *cbg.Deferred) error {
txID, err := abi.ParseIntKey(key)
if err != nil {
return err
}
tx, err := t.after.decodeTransaction(val)
if err != nil {
return err
}
t.Results.Added = append(t.Results.Added, TransactionChange{
TxID: txID,
Tx: tx,
})
return nil
}
func (t *transactionDiffer) Modify(key string, from, to *cbg.Deferred) error {
txID, err := abi.ParseIntKey(key)
if err != nil {
return err
}
txFrom, err := t.pre.decodeTransaction(from)
if err != nil {
return err
}
txTo, err := t.after.decodeTransaction(to)
if err != nil {
return err
}
if approvalsChanged(txFrom.Approved, txTo.Approved) {
t.Results.Modified = append(t.Results.Modified, TransactionModification{
TxID: txID,
From: txFrom,
To: txTo,
})
}
return nil
}
func approvalsChanged(from, to []address.Address) bool {
if len(from) != len(to) {
return true
}
for idx := range from {
if from[idx] != to[idx] {
return true
}
}
return false
}
func (t *transactionDiffer) Remove(key string, val *cbg.Deferred) error {
txID, err := abi.ParseIntKey(key)
if err != nil {
return err
}
tx, err := t.pre.decodeTransaction(val)
if err != nil {
return err
}
t.Results.Removed = append(t.Results.Removed, TransactionChange{
TxID: txID,
Tx: tx,
})
return nil
}

View File

@ -1,6 +1,7 @@
package multisig
import (
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
@ -47,6 +48,10 @@ type State interface {
Signers() ([]address.Address, error)
ForEachPendingTxn(func(id int64, txn Transaction) error) error
PendingTxnChanged(State) (bool, error)
transactions() (adt.Map, error)
decodeTransaction(val *cbg.Deferred) (Transaction, error)
}
type Transaction = msig0.Transaction

View File

@ -1,17 +1,20 @@
package multisig
import (
"bytes"
"encoding/binary"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors/adt"
msig0 "github.com/filecoin-project/specs-actors/actors/builtin/multisig"
adt0 "github.com/filecoin-project/specs-actors/actors/util/adt"
multisig0 "github.com/filecoin-project/specs-actors/v2/actors/builtin/multisig"
)
var _ State = (*state0)(nil)
@ -68,3 +71,24 @@ func (s *state0) ForEachPendingTxn(cb func(id int64, txn Transaction) error) err
return cb(txid, (Transaction)(out))
})
}
func (s *state0) PendingTxnChanged(other State) (bool, error) {
other0, ok := other.(*state0)
if !ok {
// treat an upgrade as a change, always
return true, nil
}
return !s.State.PendingTxns.Equals(other0.PendingTxns), nil
}
func (s *state0) transactions() (adt.Map, error) {
return adt0.AsMap(s.store, s.PendingTxns)
}
func (s *state0) decodeTransaction(val *cbg.Deferred) (Transaction, error) {
var tx multisig0.Transaction
if err := tx.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
return Transaction{}, err
}
return tx, nil
}

View File

@ -1,11 +1,13 @@
package multisig
import (
"bytes"
"encoding/binary"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/actors/adt"
@ -68,3 +70,24 @@ func (s *state2) ForEachPendingTxn(cb func(id int64, txn Transaction) error) err
return cb(txid, (Transaction)(out))
})
}
func (s *state2) PendingTxnChanged(other State) (bool, error) {
other2, ok := other.(*state2)
if !ok {
// treat an upgrade as a change, always
return true, nil
}
return !s.State.PendingTxns.Equals(other2.PendingTxns), nil
}
func (s *state2) transactions() (adt.Map, error) {
return adt2.AsMap(s.store, s.PendingTxns)
}
func (s *state2) decodeTransaction(val *cbg.Deferred) (Transaction, error) {
var tx msig2.Transaction
if err := tx.UnmarshalCBOR(bytes.NewReader(val.Raw)); err != nil {
return Transaction{}, err
}
return tx, nil
}

View File

@ -28,7 +28,7 @@ type state2 struct {
store adt.Store
}
func (s *state2) ThisEpochReward() (abi.StoragePower, error) {
func (s *state2) ThisEpochReward() (abi.TokenAmount, error) {
return s.State.ThisEpochReward, nil
}
@ -55,11 +55,11 @@ func (s *state2) EffectiveNetworkTime() (abi.ChainEpoch, error) {
return s.State.EffectiveNetworkTime, nil
}
func (s *state2) CumsumBaseline() (abi.StoragePower, error) {
func (s *state2) CumsumBaseline() (reward2.Spacetime, error) {
return s.State.CumsumBaseline, nil
}
func (s *state2) CumsumRealized() (abi.StoragePower, error) {
func (s *state2) CumsumRealized() (reward2.Spacetime, error) {
return s.State.CumsumRealized, nil
}

View File

@ -245,7 +245,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) {
return nil, xerrors.Errorf("make genesis block failed: %w", err)
}
cs := store.NewChainStore(bs, ds, sys, j)
cs := store.NewChainStore(bs, bs, ds, sys, j)
genfb := &types.FullBlock{Header: genb.Genesis}
gents := store.NewFullTipSet([]*types.FullBlock{genfb})

View File

@ -482,7 +482,7 @@ func MakeGenesisBlock(ctx context.Context, j journal.Journal, bs bstore.Blocksto
}
// temp chainstore
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), sys, j)
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), sys, j)
// Verify PreSealed Data
stateroot, err = VerifyPreSealedData(ctx, cs, stateroot, template, keyIDs)

View File

@ -31,7 +31,7 @@ func TestIndexSeeks(t *testing.T) {
ctx := context.TODO()
nbs := blockstore.NewTemporarySync()
cs := store.NewChainStore(nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
cs := store.NewChainStore(nbs, nbs, syncds.MutexWrap(datastore.NewMapDatastore()), nil, nil)
_, err = cs.Import(bytes.NewReader(gencar))
if err != nil {

View File

@ -105,6 +105,7 @@ type HeadChangeEvt struct {
// 2. a block => messages references cache.
type ChainStore struct {
bs bstore.Blockstore
localbs bstore.Blockstore
ds dstore.Batching
heaviestLk sync.Mutex
@ -130,7 +131,8 @@ type ChainStore struct {
journal journal.Journal
}
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
// localbs is guaranteed to fail Get* if requested block isn't stored locally
func NewChainStore(bs bstore.Blockstore, localbs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder, j journal.Journal) *ChainStore {
c, _ := lru.NewARC(DefaultMsgMetaCacheSize)
tsc, _ := lru.NewARC(DefaultTipSetCacheSize)
if j == nil {
@ -138,6 +140,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
}
cs := &ChainStore{
bs: bs,
localbs: localbs,
ds: ds,
bestTips: pubsub.New(64),
tipsets: make(map[abi.ChainEpoch][]cid.Cid),
@ -542,7 +545,7 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
// GetBlock fetches a BlockHeader with the supplied CID. It returns
// blockstore.ErrNotFound if the block was not found in the BlockStore.
func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
return nil, err
}
@ -813,7 +816,7 @@ func (cs *ChainStore) GetCMessage(c cid.Cid) (types.ChainMsg, error) {
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
@ -823,7 +826,7 @@ func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
}
func (cs *ChainStore) GetSignedMessage(c cid.Cid) (*types.SignedMessage, error) {
sb, err := cs.bs.Get(c)
sb, err := cs.localbs.Get(c)
if err != nil {
log.Errorf("get message get failed: %s: %s", c, err)
return nil, err
@ -959,7 +962,7 @@ func (cs *ChainStore) ReadMsgMetaCids(mmc cid.Cid) ([]cid.Cid, []cid.Cid, error)
return mmcids.bls, mmcids.secpk, nil
}
cst := cbor.NewCborStore(cs.bs)
cst := cbor.NewCborStore(cs.localbs)
var msgmeta types.MsgMeta
if err := cst.Get(context.TODO(), mmc, &msgmeta); err != nil {
return nil, nil, xerrors.Errorf("failed to load msgmeta (%s): %w", mmc, err)

View File

@ -70,7 +70,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}
cs := store.NewChainStore(bs, mds, nil, nil)
cs := store.NewChainStore(bs, bs, mds, nil, nil)
b.ResetTimer()
@ -104,7 +104,7 @@ func TestChainExportImport(t *testing.T) {
}
nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
root, err := cs.Import(buf)
if err != nil {
@ -138,7 +138,7 @@ func TestChainExportImportFull(t *testing.T) {
}
nbs := blockstore.NewTemporary()
cs := store.NewChainStore(nbs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(nbs, nbs, datastore.NewMapDatastore(), nil, nil)
root, err := cs.Import(buf)
if err != nil {
t.Fatal(err)

View File

@ -30,7 +30,7 @@ type GasCharge struct {
}
func (g GasCharge) Total() int64 {
return g.ComputeGas*GasComputeMulti + g.StorageGas*GasStorageMulti
return g.ComputeGas + g.StorageGas
}
func (g GasCharge) WithVirtual(compute, storage int64) GasCharge {
out := g
@ -85,6 +85,9 @@ type Pricelist interface {
var prices = map[abi.ChainEpoch]Pricelist{
abi.ChainEpoch(0): &pricelistV0{
computeGasMulti: 1,
storageGasMulti: 1000,
onChainMessageComputeBase: 38863,
onChainMessageStorageBase: 36,
onChainMessageStoragePerByte: 1,

View File

@ -18,6 +18,8 @@ type scalingCost struct {
}
type pricelistV0 struct {
computeGasMulti int64
storageGasMulti int64
///////////////////////////////////////////////////////////////////////////
// System operations
///////////////////////////////////////////////////////////////////////////
@ -99,12 +101,12 @@ var _ Pricelist = (*pricelistV0)(nil)
// OnChainMessage returns the gas used for storing a message of a given size in the chain.
func (pl *pricelistV0) OnChainMessage(msgSize int) GasCharge {
return newGasCharge("OnChainMessage", pl.onChainMessageComputeBase,
pl.onChainMessageStorageBase+pl.onChainMessageStoragePerByte*int64(msgSize))
(pl.onChainMessageStorageBase+pl.onChainMessageStoragePerByte*int64(msgSize))*pl.storageGasMulti)
}
// OnChainReturnValue returns the gas used for storing the response of a message in the chain.
func (pl *pricelistV0) OnChainReturnValue(dataSize int) GasCharge {
return newGasCharge("OnChainReturnValue", 0, int64(dataSize)*pl.onChainReturnValuePerByte)
return newGasCharge("OnChainReturnValue", 0, int64(dataSize)*pl.onChainReturnValuePerByte*pl.storageGasMulti)
}
// OnMethodInvocation returns the gas used when invoking a method.
@ -136,18 +138,18 @@ func (pl *pricelistV0) OnIpldGet() GasCharge {
// OnIpldPut returns the gas used for storing an object
func (pl *pricelistV0) OnIpldPut(dataSize int) GasCharge {
return newGasCharge("OnIpldPut", pl.ipldPutBase, int64(dataSize)*pl.ipldPutPerByte).
return newGasCharge("OnIpldPut", pl.ipldPutBase, int64(dataSize)*pl.ipldPutPerByte*pl.storageGasMulti).
WithExtra(dataSize)
}
// OnCreateActor returns the gas used for creating an actor
func (pl *pricelistV0) OnCreateActor() GasCharge {
return newGasCharge("OnCreateActor", pl.createActorCompute, pl.createActorStorage)
return newGasCharge("OnCreateActor", pl.createActorCompute, pl.createActorStorage*pl.storageGasMulti)
}
// OnDeleteActor returns the gas used for deleting an actor
func (pl *pricelistV0) OnDeleteActor() GasCharge {
return newGasCharge("OnDeleteActor", 0, pl.deleteActor)
return newGasCharge("OnDeleteActor", 0, pl.deleteActor*pl.storageGasMulti)
}
// OnVerifySignature

View File

@ -262,7 +262,7 @@ var importBenchCmd = &cli.Command{
}
metadataDs := datastore.NewMapDatastore()
cs := store.NewChainStore(bs, metadataDs, vm.Syscalls(verifier), nil)
cs := store.NewChainStore(bs, bs, metadataDs, vm.Syscalls(verifier), nil)
stm := stmgr.NewStateManager(cs)
startTime := time.Now()

View File

@ -188,7 +188,7 @@ var chainBalanceStateCmd = &cli.Command{
return err
}
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
@ -408,7 +408,7 @@ var chainPledgeCmd = &cli.Command{
return err
}
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)

48
cmd/lotus-shed/cid.go Normal file
View File

@ -0,0 +1,48 @@
package main
import (
"encoding/base64"
"encoding/hex"
"fmt"
"github.com/urfave/cli/v2"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
var cidCmd = &cli.Command{
Name: "cid",
Subcommands: cli.Commands{
cidIdCmd,
},
}
var cidIdCmd = &cli.Command{
Name: "id",
Usage: "create identity CID from hex or base64 data",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must specify data")
}
dec, err := hex.DecodeString(cctx.Args().First())
if err != nil {
dec, err = base64.StdEncoding.DecodeString(cctx.Args().First())
if err != nil {
return err
}
}
builder := cid.V1Builder{Codec: cid.Raw, MhType: mh.IDENTITY}
c, err := builder.Sum(dec)
if err != nil {
return err
}
fmt.Println(c)
return nil
},
}

View File

@ -90,7 +90,7 @@ var exportChainCmd = &cli.Command{
return err
}
cs := store.NewChainStore(bs, mds, nil, nil)
cs := store.NewChainStore(bs, bs, mds, nil, nil)
if err := cs.Load(); err != nil {
return err
}

View File

@ -52,7 +52,7 @@ var genesisVerifyCmd = &cli.Command{
}
bs := blockstore.NewBlockstore(datastore.NewMapDatastore())
cs := store.NewChainStore(bs, datastore.NewMapDatastore(), nil, nil)
cs := store.NewChainStore(bs, bs, datastore.NewMapDatastore(), nil, nil)
cf := cctx.Args().Get(0)
f, err := os.Open(cf)

View File

@ -48,6 +48,7 @@ func main() {
msgCmd,
electionCmd,
rpcCmd,
cidCmd,
}
app := &cli.App{

View File

@ -169,7 +169,7 @@ var stateTreePruneCmd = &cli.Command{
return nil
}
cs := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
cs := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), nil)
if err := cs.Load(); err != nil {
return fmt.Errorf("loading chainstore: %w", err)
}

View File

@ -25,6 +25,7 @@ var sectorsCmd = &cli.Command{
Flags: []cli.Flag{},
Subcommands: []*cli.Command{
terminateSectorCmd,
terminateSectorPenaltyEstimationCmd,
},
}
@ -131,3 +132,101 @@ var terminateSectorCmd = &cli.Command{
return nil
},
}
func findPenaltyInInternalExecutions(prefix string, trace []types.ExecutionTrace) {
for _, im := range trace {
if im.Msg.To.String() == "f099" /*Burn actor*/ {
fmt.Printf("Estimated termination penalty: %s attoFIL\n", im.Msg.Value)
return
}
findPenaltyInInternalExecutions(prefix+"\t", im.Subcalls)
}
}
var terminateSectorPenaltyEstimationCmd = &cli.Command{
Name: "termination-estimate",
Usage: "Estimate the termination penalty",
ArgsUsage: "[sectorNum1 sectorNum2 ...]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 1 {
return fmt.Errorf("at least one sector must be specified")
}
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
maddr, err := api.ActorAddress(ctx)
if err != nil {
return err
}
mi, err := nodeApi.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
terminationDeclarationParams := []miner2.TerminationDeclaration{}
for _, sn := range cctx.Args().Slice() {
sectorNum, err := strconv.ParseUint(sn, 10, 64)
if err != nil {
return fmt.Errorf("could not parse sector number: %w", err)
}
sectorbit := bitfield.New()
sectorbit.Set(sectorNum)
loca, err := nodeApi.StateSectorPartition(ctx, maddr, abi.SectorNumber(sectorNum), types.EmptyTSK)
if err != nil {
return fmt.Errorf("get state sector partition %s", err)
}
para := miner2.TerminationDeclaration{
Deadline: loca.Deadline,
Partition: loca.Partition,
Sectors: sectorbit,
}
terminationDeclarationParams = append(terminationDeclarationParams, para)
}
terminateSectorParams := &miner2.TerminateSectorsParams{
Terminations: terminationDeclarationParams,
}
sp, err := actors.SerializeParams(terminateSectorParams)
if err != nil {
return xerrors.Errorf("serializing params: %w", err)
}
msg := &types.Message{
From: mi.Owner,
To: maddr,
Method: miner.Methods.TerminateSectors,
Value: big.Zero(),
Params: sp,
}
//TODO: 4667 add an option to give a more precise estimation with pending termination penalty excluded
invocResult, err := nodeApi.StateCall(ctx, msg, types.TipSetKey{})
if err != nil {
return xerrors.Errorf("fail to state call: %w", err)
}
findPenaltyInInternalExecutions("\t", invocResult.ExecutionTrace.Subcalls)
return nil
},
}

File diff suppressed because it is too large Load Diff

View File

@ -4,10 +4,10 @@ services:
influxdb:
image: influxdb:latest
container_name: influxdb
ports:
- "18086:8086"
environment:
- INFLUXDB_DB=lotus
ports:
- "8086:8086"
volumes:
- influxdb:/var/lib/influxdb
@ -15,7 +15,7 @@ services:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
- "13000:3000"
links:
- influxdb
volumes:

View File

@ -1,3 +1,3 @@
export INFLUX_ADDR="http://localhost:8086"
export INFLUX_ADDR="http://localhost:18086"
export INFLUX_USER=""
export INFLUX_PASS=""

View File

@ -2,53 +2,137 @@ package main
import (
"context"
"flag"
"os"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/tools/stats"
logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
)
var log = logging.Logger("stats")
const (
influxAddrEnvVar = "INFLUX_ADDR"
influxUserEnvVar = "INFLUX_USER"
influxPassEnvVar = "INFLUX_PASS"
)
func main() {
var repo string = "~/.lotus"
var database string = "lotus"
var reset bool = false
var nosync bool = false
var height int64 = 0
var headlag int = 3
local := []*cli.Command{
runCmd,
versionCmd,
}
flag.StringVar(&repo, "repo", repo, "lotus repo path")
flag.StringVar(&database, "database", database, "influx database")
flag.Int64Var(&height, "height", height, "block height to start syncing from (0 will resume)")
flag.IntVar(&headlag, "head-lag", headlag, "number of head events to hold to protect against small reorgs")
flag.BoolVar(&reset, "reset", reset, "truncate database before starting stats gathering")
flag.BoolVar(&nosync, "nosync", nosync, "skip waiting for sync")
app := &cli.App{
Name: "lotus-stats",
Usage: "Collect basic information about a filecoin network using lotus",
Version: build.UserVersion(),
Flags: []cli.Flag{
&cli.StringFlag{
Name: "lotus-path",
EnvVars: []string{"LOTUS_PATH"},
Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME
},
&cli.StringFlag{
Name: "log-level",
EnvVars: []string{"LOTUS_STATS_LOG_LEVEL"},
Value: "info",
},
},
Before: func(cctx *cli.Context) error {
return logging.SetLogLevel("stats", cctx.String("log-level"))
},
Commands: local,
}
flag.Parse()
if err := app.Run(os.Args); err != nil {
log.Errorw("exit in error", "err", err)
os.Exit(1)
return
}
}
var versionCmd = &cli.Command{
Name: "version",
Usage: "Print version",
Action: func(cctx *cli.Context) error {
cli.VersionPrinter(cctx)
return nil
},
}
var runCmd = &cli.Command{
Name: "run",
Usage: "",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "influx-database",
EnvVars: []string{"LOTUS_STATS_INFLUX_DATABASE"},
Usage: "influx database",
Value: "",
},
&cli.StringFlag{
Name: "influx-hostname",
EnvVars: []string{"LOTUS_STATS_INFLUX_HOSTNAME"},
Value: "http://localhost:8086",
Usage: "influx hostname",
},
&cli.StringFlag{
Name: "influx-username",
EnvVars: []string{"LOTUS_STATS_INFLUX_USERNAME"},
Usage: "influx username",
Value: "",
},
&cli.StringFlag{
Name: "influx-password",
EnvVars: []string{"LOTUS_STATS_INFLUX_PASSWORD"},
Usage: "influx password",
Value: "",
},
&cli.IntFlag{
Name: "height",
EnvVars: []string{"LOTUS_STATS_HEIGHT"},
Usage: "tipset height to start processing from",
Value: 0,
},
&cli.IntFlag{
Name: "head-lag",
EnvVars: []string{"LOTUS_STATS_HEAD_LAG"},
Usage: "the number of tipsets to delay processing on to smooth chain reorgs",
Value: int(build.MessageConfidence),
},
&cli.BoolFlag{
Name: "no-sync",
EnvVars: []string{"LOTUS_STATS_NO_SYNC"},
Usage: "do not wait for chain sync to complete",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
influx, err := stats.InfluxClient(os.Getenv(influxAddrEnvVar), os.Getenv(influxUserEnvVar), os.Getenv(influxPassEnvVar))
resetFlag := cctx.Bool("reset")
noSyncFlag := cctx.Bool("no-sync")
heightFlag := cctx.Int("height")
headLagFlag := cctx.Int("head-lag")
influxAddrFlag := cctx.String("influx-addr")
influxUserFlag := cctx.String("influx-user")
influxPassFlag := cctx.String("influx-pass")
influxDatabaseFlag := cctx.String("influx-database")
influx, err := stats.InfluxClient(influxAddrFlag, influxUserFlag, influxPassFlag)
if err != nil {
log.Fatal(err)
}
if reset {
if err := stats.ResetDatabase(influx, database); err != nil {
if resetFlag {
if err := stats.ResetDatabase(influx, influxDatabaseFlag); err != nil {
log.Fatal(err)
}
}
if !reset && height == 0 {
h, err := stats.GetLastRecordedHeight(influx, database)
height := int64(heightFlag)
if !resetFlag && height == 0 {
h, err := stats.GetLastRecordedHeight(influx, influxDatabaseFlag)
if err != nil {
log.Info(err)
}
@ -56,17 +140,20 @@ func main() {
height = h
}
api, closer, err := stats.GetFullNodeAPI(ctx, repo)
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
log.Fatal(err)
return err
}
defer closer()
if !nosync {
if !noSyncFlag {
if err := stats.WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
}
stats.Collect(ctx, api, influx, database, height, headlag)
stats.Collect(ctx, api, influx, influxDatabaseFlag, height, headLagFlag)
return nil
},
}

View File

@ -1,10 +1,10 @@
#!/usr/bin/env bash
GRAFANA_HOST="localhost:3000"
GRAFANA_HOST="http://localhost:13000"
curl -s -XPOST http://admin:admin@$GRAFANA_HOST/api/datasources -H 'Content-Type: text/json' --data-binary @- > /dev/null << EOF
{
"name":"InfluxDB",
"name":"filecoin-ntwk-localstats",
"type":"influxdb",
"database":"lotus",
"url": "http://influxdb:8086",

View File

@ -427,7 +427,7 @@ func ImportChain(r repo.Repo, fname string, snapshot bool) (err error) {
if err != nil {
return xerrors.Errorf("failed to open journal: %w", err)
}
cst := store.NewChainStore(bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
cst := store.NewChainStore(bs, bs, mds, vm.Syscalls(ffiwrapper.ProofVerifier), j)
log.Infof("importing chain from %s...", fname)

View File

@ -87,7 +87,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, preroot
syscalls = vm.Syscalls(ffiwrapper.ProofVerifier)
vmRand = NewFixedRand()
cs = store.NewChainStore(bs, ds, syscalls, nil)
cs = store.NewChainStore(bs, bs, ds, syscalls, nil)
sm = stmgr.NewStateManager(cs)
)

View File

@ -0,0 +1,95 @@
package blockstore
import (
"context"
"sync"
"time"
"golang.org/x/xerrors"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("blockstore")
type FallbackStore struct {
blockstore.Blockstore
fallbackGetBlock func(context.Context, cid.Cid) (blocks.Block, error)
lk sync.RWMutex
}
func (fbs *FallbackStore) SetFallback(fg func(context.Context, cid.Cid) (blocks.Block, error)) {
fbs.lk.Lock()
defer fbs.lk.Unlock()
fbs.fallbackGetBlock = fg
}
func (fbs *FallbackStore) getFallback(c cid.Cid) (blocks.Block, error) {
log.Errorw("fallbackstore: Block not found locally, fetching from the network", "cid", c)
fbs.lk.RLock()
defer fbs.lk.RUnlock()
if fbs.fallbackGetBlock == nil {
// FallbackStore wasn't configured yet (chainstore/bitswap aren't up yet)
// Wait for a bit and retry
fbs.lk.RUnlock()
time.Sleep(5 * time.Second)
fbs.lk.RLock()
if fbs.fallbackGetBlock == nil {
log.Errorw("fallbackstore: fallbackGetBlock not configured yet")
return nil, blockstore.ErrNotFound
}
}
ctx, cancel := context.WithTimeout(context.TODO(), 120*time.Second)
defer cancel()
b, err := fbs.fallbackGetBlock(ctx, c)
if err != nil {
return nil, err
}
// chain bitswap puts blocks in temp blockstore which is cleaned up
// every few min (to drop any messages we fetched but don't want)
// in this case we want to keep this block around
if err := fbs.Put(b); err != nil {
return nil, xerrors.Errorf("persisting fallback-fetched block: %w", err)
}
return b, nil
}
func (fbs *FallbackStore) Get(c cid.Cid) (blocks.Block, error) {
b, err := fbs.Blockstore.Get(c)
switch err {
case nil:
return b, nil
case blockstore.ErrNotFound:
return fbs.getFallback(c)
default:
return b, err
}
}
func (fbs *FallbackStore) GetSize(c cid.Cid) (int, error) {
sz, err := fbs.Blockstore.GetSize(c)
switch err {
case nil:
return sz, nil
case blockstore.ErrNotFound:
b, err := fbs.getFallback(c)
if err != nil {
return 0, err
}
return len(b.RawData()), nil
default:
return sz, err
}
}
var _ blockstore.Blockstore = &FallbackStore{}

View File

@ -94,7 +94,7 @@ func (bs *BufferedBS) DeleteBlock(c cid.Cid) error {
}
func (bs *BufferedBS) Get(c cid.Cid) (block.Block, error) {
if out, err := bs.read.Get(c); err != nil {
if out, err := bs.write.Get(c); err != nil {
if err != bstore.ErrNotFound {
return nil, err
}
@ -102,7 +102,7 @@ func (bs *BufferedBS) Get(c cid.Cid) (block.Block, error) {
return out, nil
}
return bs.write.Get(c)
return bs.read.Get(c)
}
func (bs *BufferedBS) GetSize(c cid.Cid) (int, error) {
@ -115,7 +115,7 @@ func (bs *BufferedBS) GetSize(c cid.Cid) (int, error) {
}
func (bs *BufferedBS) Put(blk block.Block) error {
has, err := bs.read.Has(blk.Cid())
has, err := bs.read.Has(blk.Cid()) // TODO: consider dropping this check
if err != nil {
return err
}
@ -128,7 +128,7 @@ func (bs *BufferedBS) Put(blk block.Block) error {
}
func (bs *BufferedBS) Has(c cid.Cid) (bool, error) {
has, err := bs.read.Has(c)
has, err := bs.write.Has(c)
if err != nil {
return false, err
}
@ -136,7 +136,7 @@ func (bs *BufferedBS) Has(c cid.Cid) (bool, error) {
return true, nil
}
return bs.write.Has(c)
return bs.read.Has(c)
}
func (bs *BufferedBS) HashOnRead(hor bool) {

View File

@ -3,6 +3,7 @@ package node
import (
"context"
"errors"
"os"
"time"
metricsi "github.com/ipfs/go-metrics-interface"
@ -138,6 +139,7 @@ const (
HeadMetricsKey
SettlePaymentChannelsKey
RunPeerTaggerKey
SetupFallbackBlockstoreKey
SetApiEndpointKey
@ -521,7 +523,13 @@ func Repo(r repo.Repo) Option {
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
Override(new(dtypes.ChainRawBlockstore), modules.ChainRawBlockstore),
Override(new(dtypes.ChainBlockstore), From(new(dtypes.ChainRawBlockstore))),
If(os.Getenv("LOTUS_ENABLE_CHAINSTORE_FALLBACK") == "1",
Override(new(dtypes.ChainBlockstore), modules.FallbackChainBlockstore),
Override(SetupFallbackBlockstoreKey, modules.SetupFallbackBlockstore),
),
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),

View File

@ -192,11 +192,19 @@ func gasEstimateGasPremium(cstore *store.ChainStore, nblocksincl uint64) (types.
return premium, nil
}
func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) {
return gasEstimateGasLimit(ctx, a.Chain, a.Stmgr, a.Mpool, msgIn)
func (a *GasAPI) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, tsk types.TipSetKey) (int64, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return -1, xerrors.Errorf("getting tipset: %w", err)
}
func (m *GasModule) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, _ types.TipSetKey) (int64, error) {
return gasEstimateGasLimit(ctx, m.Chain, m.Stmgr, m.Mpool, msgIn)
return gasEstimateGasLimit(ctx, a.Chain, a.Stmgr, a.Mpool, msgIn, ts)
}
func (m *GasModule) GasEstimateGasLimit(ctx context.Context, msgIn *types.Message, tsk types.TipSetKey) (int64, error) {
ts, err := m.Chain.GetTipSetFromKey(tsk)
if err != nil {
return -1, xerrors.Errorf("getting tipset: %w", err)
}
return gasEstimateGasLimit(ctx, m.Chain, m.Stmgr, m.Mpool, msgIn, ts)
}
func gasEstimateGasLimit(
ctx context.Context,
@ -204,13 +212,13 @@ func gasEstimateGasLimit(
smgr *stmgr.StateManager,
mpool *messagepool.MessagePool,
msgIn *types.Message,
currTs *types.TipSet,
) (int64, error) {
msg := *msgIn
msg.GasLimit = build.BlockGasLimit
msg.GasFeeCap = types.NewInt(uint64(build.MinimumBaseFee) + 1)
msg.GasPremium = types.NewInt(1)
currTs := cstore.GetHeaviestTipSet()
fromA, err := smgr.ResolveToKeyAddress(ctx, msgIn.From, currTs)
if err != nil {
return -1, xerrors.Errorf("getting key address: %w", err)

View File

@ -76,7 +76,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds
return mp, nil
}
func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) {
func ChainRawBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainRawBlockstore, error) {
bs, err := r.Blockstore(repo.BlockstoreChain)
if err != nil {
return nil, err
@ -91,16 +91,32 @@ func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo
return cbs, nil
}
func ChainGCBlockstore(bs dtypes.ChainBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
func ChainGCBlockstore(bs dtypes.ChainRawBlockstore, gcl dtypes.ChainGCLocker) dtypes.ChainGCBlockstore {
return blockstore.NewGCBlockstore(bs, gcl)
}
func ChainBlockService(bs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
func ChainBlockService(bs dtypes.ChainRawBlockstore, rem dtypes.ChainBitswap) dtypes.ChainBlockService {
return blockservice.New(bs, rem)
}
func ChainStore(lc fx.Lifecycle, bs dtypes.ChainBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, ds, syscalls, j)
func FallbackChainBlockstore(rbs dtypes.ChainRawBlockstore) dtypes.ChainBlockstore {
return &blockstore.FallbackStore{
Blockstore: rbs,
}
}
func SetupFallbackBlockstore(cbs dtypes.ChainBlockstore, rem dtypes.ChainBitswap) error {
fbs, ok := cbs.(*blockstore.FallbackStore)
if !ok {
return xerrors.Errorf("expected a FallbackStore")
}
fbs.SetFallback(rem.GetBlock)
return nil
}
func ChainStore(bs dtypes.ChainBlockstore, lbs dtypes.ChainRawBlockstore, ds dtypes.MetadataDS, syscalls vm.SyscallBuilder, j journal.Journal) *store.ChainStore {
chain := store.NewChainStore(bs, lbs, ds, syscalls, j)
if err := chain.Load(); err != nil {
log.Warnf("loading chain state from disk: %s", err)

View File

@ -23,7 +23,8 @@ import (
// dy default it's namespaced under /metadata in main repo datastore
type MetadataDS datastore.Batching
type ChainBlockstore blockstore.Blockstore
type ChainRawBlockstore blockstore.Blockstore
type ChainBlockstore blockstore.Blockstore // optionally bitswap backed
type ChainGCLocker blockstore.GCLocker
type ChainGCBlockstore blockstore.GCBlockstore

View File

@ -160,7 +160,7 @@ func NewMemory(opts *MemRepoOptions) *MemRepo {
return &MemRepo{
repoLock: make(chan struct{}, 1),
blockstore: llblockstore.WrapIDStore(llblockstore.NewTemporarySync()),
blockstore: blockstore.WrapIDStore(blockstore.NewTemporarySync()),
datastore: opts.Ds,
configF: opts.ConfigF,
keystore: opts.KeyStore,