feat: chain node: Move consensus slasher to internal service

This commit is contained in:
Łukasz Magiera 2023-08-01 17:28:47 +02:00
parent 9b4992b834
commit fe7cf0c39a
9 changed files with 305 additions and 188 deletions

View File

@ -0,0 +1,179 @@
package slashsvc
import (
"context"
"time"
"github.com/ipfs/go-cid"
levelds "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("slashsvc")
type ConsensusSlasherApi interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *lapi.MessageSendSpec) (*types.SignedMessage, error)
SyncIncomingBlocks(context.Context) (<-chan *types.BlockHeader, error)
WalletDefaultAddress(context.Context) (address.Address, error)
}
func SlashConsensus(ctx context.Context, a ConsensusSlasherApi, p string, from string) error {
var fromAddr address.Address
ds, err := levelds.NewDatastore(p, &levelds.Options{
Compression: ldbopts.NoCompression,
NoSync: false,
Strict: ldbopts.StrictAll,
ReadOnly: false,
})
if err != nil {
return xerrors.Errorf("open leveldb: %w", err)
}
sf := slashfilter.New(ds)
if from == "" {
defaddr, err := a.WalletDefaultAddress(ctx)
if err != nil {
return err
}
fromAddr = defaddr
} else {
addr, err := address.NewFromString(from)
if err != nil {
return err
}
fromAddr = addr
}
blocks, err := a.SyncIncomingBlocks(ctx)
if err != nil {
return xerrors.Errorf("sync incoming blocks failed: %w", err)
}
log.Infow("consensus fault reporter", "from", fromAddr)
go func() {
for block := range blocks {
otherBlock, extraBlock, fault, err := slashFilterMinedBlock(ctx, sf, a, block)
if err != nil {
log.Errorf("slash detector errored: %s", err)
continue
}
if fault {
log.Errorf("<!!> SLASH FILTER DETECTED FAULT DUE TO BLOCKS %s and %s", otherBlock.Cid(), block.Cid())
bh1, err := cborutil.Dump(otherBlock)
if err != nil {
log.Errorf("could not dump otherblock:%s, err:%s", otherBlock.Cid(), err)
continue
}
bh2, err := cborutil.Dump(block)
if err != nil {
log.Errorf("could not dump block:%s, err:%s", block.Cid(), err)
continue
}
params := miner.ReportConsensusFaultParams{
BlockHeader1: bh1,
BlockHeader2: bh2,
}
if extraBlock != nil {
be, err := cborutil.Dump(extraBlock)
if err != nil {
log.Errorf("could not dump block:%s, err:%s", block.Cid(), err)
continue
}
params.BlockHeaderExtra = be
}
enc, err := actors.SerializeParams(&params)
if err != nil {
log.Errorf("could not serialize declare faults parameters: %s", err)
continue
}
for {
head, err := a.ChainHead(ctx)
if err != nil || head.Height() > block.Height {
break
}
time.Sleep(time.Second * 10)
}
message, err := a.MpoolPushMessage(ctx, &types.Message{
To: block.Miner,
From: fromAddr,
Value: types.NewInt(0),
Method: builtin.MethodsMiner.ReportConsensusFault,
Params: enc,
}, nil)
if err != nil {
log.Errorf("ReportConsensusFault to messagepool error:%s", err)
continue
}
log.Infof("ReportConsensusFault message CID:%s", message.Cid())
}
}
}()
return nil
}
func slashFilterMinedBlock(ctx context.Context, sf *slashfilter.SlashFilter, a ConsensusSlasherApi, blockB *types.BlockHeader) (*types.BlockHeader, *types.BlockHeader, bool, error) {
blockC, err := a.ChainGetBlock(ctx, blockB.Parents[0])
if err != nil {
return nil, nil, false, xerrors.Errorf("chain get block error:%s", err)
}
blockACid, fault, err := sf.MinedBlock(ctx, blockB, blockC.Height)
if err != nil {
return nil, nil, false, xerrors.Errorf("slash filter check block error:%s", err)
}
if !fault {
return nil, nil, false, nil
}
blockA, err := a.ChainGetBlock(ctx, blockACid)
if err != nil {
return nil, nil, false, xerrors.Errorf("failed to get blockA: %w", err)
}
// (a) double-fork mining (2 blocks at one epoch)
if blockA.Height == blockB.Height {
return blockA, nil, true, nil
}
// (b) time-offset mining faults (2 blocks with the same parents)
if types.CidArrsEqual(blockB.Parents, blockA.Parents) {
return blockA, nil, true, nil
}
// (c) parent-grinding fault
// Here extra is the "witness", a third block that shows the connection between A and B as
// A's sibling and B's parent.
// Specifically, since A is of lower height, it must be that B was mined omitting A from its tipset
//
// B
// |
// [A, C]
if types.CidArrsEqual(blockA.Parents, blockC.Parents) && blockA.Height == blockC.Height &&
types.CidArrsContains(blockB.Parents, blockC.Cid()) && !types.CidArrsContains(blockB.Parents, blockA.Cid()) {
return blockA, blockC, true, nil
}
log.Error("unexpectedly reached end of slashFilterMinedBlock despite fault being reported!")
return nil, nil, false, nil
}

View File

@ -15,14 +15,11 @@ import (
"path/filepath" "path/filepath"
"runtime/pprof" "runtime/pprof"
"strings" "strings"
"time"
"github.com/DataDog/zstd" "github.com/DataDog/zstd"
levelds "github.com/ipfs/go-ds-leveldb"
metricsprom "github.com/ipfs/go-metrics-prometheus" metricsprom "github.com/ipfs/go-metrics-prometheus"
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go.opencensus.io/plugin/runmetrics" "go.opencensus.io/plugin/runmetrics"
"go.opencensus.io/stats" "go.opencensus.io/stats"
@ -31,20 +28,14 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"gopkg.in/cheggaaa/pb.v1" "gopkg.in/cheggaaa/pb.v1"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
lapi "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/beacon/drand" "github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/consensus" "github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns" "github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/index" "github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
@ -175,19 +166,6 @@ var DaemonCmd = &cli.Command{
Name: "restore-config", Name: "restore-config",
Usage: "config file to use when restoring from backup", Usage: "config file to use when restoring from backup",
}, },
&cli.BoolFlag{
Name: "slash-consensus",
Usage: "Report consensus fault",
Value: false,
},
&cli.StringFlag{
Name: "slasher-sender",
Usage: "optionally specify the account to report consensus from",
},
&cli.StringFlag{
Name: "slashdb-dir",
Value: "slash watch db dir path",
},
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
isLite := cctx.Bool("lite") isLite := cctx.Bool("lite")
@ -428,19 +406,6 @@ var DaemonCmd = &cli.Command{
if err != nil { if err != nil {
return fmt.Errorf("failed to start json-rpc endpoint: %s", err) return fmt.Errorf("failed to start json-rpc endpoint: %s", err)
} }
if cctx.Bool("slash-consensus") {
if !cctx.IsSet("slashdb-dir") {
return fmt.Errorf("must supply path for slasher database with --slashdb-dir")
}
go func() {
err := slashConsensus(api, cctx.String("slashdb-dir"), cctx.String("slasher-sender"))
if err != nil {
panic("slashConsensus error: " + err.Error())
}
}()
}
// Monitor for shutdown. // Monitor for shutdown.
finishCh := node.MonitorShutdown(shutdownChan, finishCh := node.MonitorShutdown(shutdownChan,
node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper},
@ -639,149 +604,6 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
return nil return nil
} }
func slashConsensus(a lapi.FullNode, p string, from string) error {
ctx := context.Background()
var fromAddr address.Address
ds, err := levelds.NewDatastore(p, &levelds.Options{
Compression: ldbopts.NoCompression,
NoSync: false,
Strict: ldbopts.StrictAll,
ReadOnly: false,
})
if err != nil {
return xerrors.Errorf("open leveldb: %w", err)
}
sf := slashfilter.New(ds)
if from == "" {
defaddr, err := a.WalletDefaultAddress(ctx)
if err != nil {
return err
}
fromAddr = defaddr
} else {
addr, err := address.NewFromString(from)
if err != nil {
return err
}
fromAddr = addr
}
blocks, err := a.SyncIncomingBlocks(ctx)
if err != nil {
return xerrors.Errorf("sync incoming blocks failed: %w", err)
}
for block := range blocks {
otherBlock, extraBlock, fault, err := slashFilterMinedBlock(ctx, sf, a, block)
if err != nil {
log.Errorf("slash detector errored: %s", err)
continue
}
if fault {
log.Errorf("<!!> SLASH FILTER DETECTED FAULT DUE TO BLOCKS %s and %s", otherBlock.Cid(), block.Cid())
bh1, err := cborutil.Dump(otherBlock)
if err != nil {
log.Errorf("could not dump otherblock:%s, err:%s", otherBlock.Cid(), err)
continue
}
bh2, err := cborutil.Dump(block)
if err != nil {
log.Errorf("could not dump block:%s, err:%s", block.Cid(), err)
continue
}
params := miner.ReportConsensusFaultParams{
BlockHeader1: bh1,
BlockHeader2: bh2,
}
if extraBlock != nil {
be, err := cborutil.Dump(extraBlock)
if err != nil {
log.Errorf("could not dump block:%s, err:%s", block.Cid(), err)
continue
}
params.BlockHeaderExtra = be
}
enc, err := actors.SerializeParams(&params)
if err != nil {
log.Errorf("could not serialize declare faults parameters: %s", err)
continue
}
for {
head, err := a.ChainHead(ctx)
if err != nil || head.Height() > block.Height {
break
}
time.Sleep(time.Second * 10)
}
message, err := a.MpoolPushMessage(ctx, &types.Message{
To: block.Miner,
From: fromAddr,
Value: types.NewInt(0),
Method: builtin.MethodsMiner.ReportConsensusFault,
Params: enc,
}, nil)
if err != nil {
log.Errorf("ReportConsensusFault to messagepool error:%s", err)
continue
}
log.Infof("ReportConsensusFault message CID:%s", message.Cid())
}
}
return err
}
func slashFilterMinedBlock(ctx context.Context, sf *slashfilter.SlashFilter, a lapi.FullNode, blockB *types.BlockHeader) (*types.BlockHeader, *types.BlockHeader, bool, error) {
blockC, err := a.ChainGetBlock(ctx, blockB.Parents[0])
if err != nil {
return nil, nil, false, xerrors.Errorf("chain get block error:%s", err)
}
blockACid, fault, err := sf.MinedBlock(ctx, blockB, blockC.Height)
if err != nil {
return nil, nil, false, xerrors.Errorf("slash filter check block error:%s", err)
}
if !fault {
return nil, nil, false, nil
}
blockA, err := a.ChainGetBlock(ctx, blockACid)
if err != nil {
return nil, nil, false, xerrors.Errorf("failed to get blockA: %w", err)
}
// (a) double-fork mining (2 blocks at one epoch)
if blockA.Height == blockB.Height {
return blockA, nil, true, nil
}
// (b) time-offset mining faults (2 blocks with the same parents)
if types.CidArrsEqual(blockB.Parents, blockA.Parents) {
return blockA, nil, true, nil
}
// (c) parent-grinding fault
// Here extra is the "witness", a third block that shows the connection between A and B as
// A's sibling and B's parent.
// Specifically, since A is of lower height, it must be that B was mined omitting A from its tipset
//
// B
// |
// [A, C]
if types.CidArrsEqual(blockA.Parents, blockC.Parents) && blockA.Height == blockC.Height &&
types.CidArrsContains(blockB.Parents, blockC.Cid()) && !types.CidArrsContains(blockB.Parents, blockA.Cid()) {
return blockA, blockC, true, nil
}
log.Error("unexpectedly reached end of slashFilterMinedBlock despite fault being reported!")
return nil, nil, false, nil
}
func removeExistingChain(cctx *cli.Context, lr repo.Repo) error { func removeExistingChain(cctx *cli.Context, lr repo.Repo) error {
lockedRepo, err := lr.Lock(repo.FullNode) lockedRepo, err := lr.Lock(repo.FullNode)
if err != nil { if err != nil {

View File

@ -75,9 +75,6 @@ OPTIONS:
--api-max-req-size value maximum API request size accepted by the JSON RPC server (default: 0) --api-max-req-size value maximum API request size accepted by the JSON RPC server (default: 0)
--restore value restore from backup file --restore value restore from backup file
--restore-config value config file to use when restoring from backup --restore-config value config file to use when restoring from backup
--slash-consensus Report consensus fault (default: false)
--slasher-sender value optionally specify the account to report consensus from
--slashdb-dir value (default: "slash watch db dir path")
--help, -h show help --help, -h show help
``` ```

View File

@ -399,3 +399,32 @@
#EnableMsgIndex = false #EnableMsgIndex = false
[FaultReporter]
# EnableConsensusFaultReporter controls whether the node will monitor and
# report consensus faults. When enabled, the node will watch for malicious
# behaviors like double-mining and parent grinding, and submit reports to the
# network. This can earn reporter rewards, but is not guaranteed. Nodes should
# enable fault reporting with care, as it may increase resource usage, and may
# generate gas fees without earning rewards.
#
# type: bool
# env var: LOTUS_FAULTREPORTER_ENABLECONSENSUSFAULTREPORTER
#EnableConsensusFaultReporter = false
# ConsensusFaultReporterDataDir is the path where fault reporter state will be
# persisted. This directory should have adequate space and permissions for the
# node process.
#
# type: string
# env var: LOTUS_FAULTREPORTER_CONSENSUSFAULTREPORTERDATADIR
#ConsensusFaultReporterDataDir = ""
# ConsensusFaultReporterAddress is the wallet address used for submitting
# ReportConsensusFault messages. It will pay for gas fees, and receive any
# rewards. This address should have adequate funds to cover gas fees.
#
# type: string
# env var: LOTUS_FAULTREPORTER_CONSENSUSFAULTREPORTERADDRESS
#ConsensusFaultReporterAddress = ""

View File

@ -128,6 +128,8 @@ const (
SetupFallbackBlockstoresKey SetupFallbackBlockstoresKey
GoRPCServer GoRPCServer
ConsensusReporterKey
SetApiEndpointKey SetApiEndpointKey
StoreEventsKey StoreEventsKey

View File

@ -280,6 +280,11 @@ func ConfigFullNode(c interface{}) Option {
// enable message index for full node when configured by the user, otherwise use dummy. // enable message index for full node when configured by the user, otherwise use dummy.
If(cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.MsgIndex)), If(cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.MsgIndex)),
If(!cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.DummyMsgIndex)), If(!cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.DummyMsgIndex)),
// enable fault reporter when configured by the user
If(cfg.FaultReporter.EnableConsensusFaultReporter,
Override(ConsensusReporterKey, modules.RunConsensusFaultReporter(cfg.FaultReporter)),
),
) )
} }

View File

@ -394,6 +394,35 @@ the database must already exist and be writeable. If a relative path is provided
relative to the CWD (current working directory).`, relative to the CWD (current working directory).`,
}, },
}, },
"FaultReporterConfig": []DocField{
{
Name: "EnableConsensusFaultReporter",
Type: "bool",
Comment: `EnableConsensusFaultReporter controls whether the node will monitor and
report consensus faults. When enabled, the node will watch for malicious
behaviors like double-mining and parent grinding, and submit reports to the
network. This can earn reporter rewards, but is not guaranteed. Nodes should
enable fault reporting with care, as it may increase resource usage, and may
generate gas fees without earning rewards.`,
},
{
Name: "ConsensusFaultReporterDataDir",
Type: "string",
Comment: `ConsensusFaultReporterDataDir is the path where fault reporter state will be
persisted. This directory should have adequate space and permissions for the
node process.`,
},
{
Name: "ConsensusFaultReporterAddress",
Type: "string",
Comment: `ConsensusFaultReporterAddress is the wallet address used for submitting
ReportConsensusFault messages. It will pay for gas fees, and receive any
rewards. This address should have adequate funds to cover gas fees.`,
},
},
"FeeConfig": []DocField{ "FeeConfig": []DocField{
{ {
Name: "DefaultMaxFee", Name: "DefaultMaxFee",
@ -465,6 +494,12 @@ Set to 0 to keep all mappings`,
Name: "Index", Name: "Index",
Type: "IndexConfig", Type: "IndexConfig",
Comment: ``,
},
{
Name: "FaultReporter",
Type: "FaultReporterConfig",
Comment: ``, Comment: ``,
}, },
}, },

View File

@ -29,6 +29,7 @@ type FullNode struct {
Cluster UserRaftConfig Cluster UserRaftConfig
Fevm FevmConfig Fevm FevmConfig
Index IndexConfig Index IndexConfig
FaultReporter FaultReporterConfig
} }
// // Common // // Common
@ -732,3 +733,23 @@ type IndexConfig struct {
// EnableMsgIndex enables indexing of messages on chain. // EnableMsgIndex enables indexing of messages on chain.
EnableMsgIndex bool EnableMsgIndex bool
} }
type FaultReporterConfig struct {
// EnableConsensusFaultReporter controls whether the node will monitor and
// report consensus faults. When enabled, the node will watch for malicious
// behaviors like double-mining and parent grinding, and submit reports to the
// network. This can earn reporter rewards, but is not guaranteed. Nodes should
// enable fault reporting with care, as it may increase resource usage, and may
// generate gas fees without earning rewards.
EnableConsensusFaultReporter bool
// ConsensusFaultReporterDataDir is the path where fault reporter state will be
// persisted. This directory should have adequate space and permissions for the
// node process.
ConsensusFaultReporterDataDir string
// ConsensusFaultReporterAddress is the wallet address used for submitting
// ReportConsensusFault messages. It will pay for gas fees, and receive any
// rewards. This address should have adequate funds to cover gas fees.
ConsensusFaultReporterAddress string
}

View File

@ -0,0 +1,27 @@
package modules
import (
"go.uber.org/fx"
"github.com/filecoin-project/lotus/chain/gen/slashfilter/slashsvc"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl/full"
"github.com/filecoin-project/lotus/node/modules/helpers"
)
type consensusReporterModules struct {
fx.In
full.WalletAPI
full.ChainAPI
full.MpoolAPI
full.SyncAPI
}
func RunConsensusFaultReporter(config config.FaultReporterConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, mod consensusReporterModules) error {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, mod consensusReporterModules) error {
ctx := helpers.LifecycleCtx(mctx, lc)
return slashsvc.SlashConsensus(ctx, &mod, config.ConsensusFaultReporterDataDir, config.ConsensusFaultReporterAddress)
}
}