Merge pull request #5379 from filecoin-project/asr/disputer

Build a WindowPoSt disputer
This commit is contained in:
Łukasz Magiera 2021-01-26 00:46:26 +01:00 committed by GitHub
commit 422f0991e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 459 additions and 2 deletions

View File

@ -949,7 +949,8 @@ const (
)
type Deadline struct {
PostSubmissions bitfield.BitField
PostSubmissions bitfield.BitField
DisputableProofCount uint64
}
type Partition struct {

View File

@ -107,6 +107,7 @@ type Deadline interface {
PartitionsPoSted() (bitfield.BitField, error)
PartitionsChanged(Deadline) (bool, error)
DisputableProofCount() (uint64, error)
}
type Partition interface {

View File

@ -391,6 +391,11 @@ func (d *deadline0) PartitionsPoSted() (bitfield.BitField, error) {
return d.Deadline.PostSubmissions, nil
}
func (d *deadline0) DisputableProofCount() (uint64, error) {
// field doesn't exist until v3
return 0, nil
}
func (p *partition0) AllSectors() (bitfield.BitField, error) {
return p.Partition.Sectors, nil
}

View File

@ -390,6 +390,11 @@ func (d *deadline2) PartitionsPoSted() (bitfield.BitField, error) {
return d.Deadline.PostSubmissions, nil
}
func (d *deadline2) DisputableProofCount() (uint64, error) {
// field doesn't exist until v3
return 0, nil
}
func (p *partition2) AllSectors() (bitfield.BitField, error) {
return p.Partition.Sectors, nil
}

View File

@ -386,6 +386,15 @@ func (d *deadline3) PartitionsPoSted() (bitfield.BitField, error) {
return d.Deadline.PartitionsPoSted, nil
}
func (d *deadline3) DisputableProofCount() (uint64, error) {
ops, err := d.OptimisticProofsSnapshotArray(d.store)
if err != nil {
return 0, err
}
return ops.Length(), nil
}
func (p *partition3) AllSectors() (bitfield.BitField, error) {
return p.Partition.Sectors, nil
}

View File

@ -58,6 +58,7 @@ var chainCmd = &cli.Command{
chainInspectUsage,
chainDecodeCmd,
chainEncodeCmd,
chainDisputeSetCmd,
},
}

429
cli/disputer.go Normal file
View File

@ -0,0 +1,429 @@
package cli
import (
"context"
"fmt"
"strconv"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/actors"
miner3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/miner"
"github.com/filecoin-project/go-state-types/big"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
builtin3 "github.com/filecoin-project/specs-actors/v3/actors/builtin"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/urfave/cli/v2"
)
const Confidence = 10
type minerDeadline struct {
miner address.Address
index uint64
}
var chainDisputeSetCmd = &cli.Command{
Name: "disputer",
Usage: "interact with the window post disputer",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "max-fee",
Usage: "Spend up to X FIL per DisputeWindowedPoSt message",
},
&cli.StringFlag{
Name: "from",
Usage: "optionally specify the account to send messages from",
},
},
Subcommands: []*cli.Command{
disputerStartCmd,
disputerMsgCmd,
},
}
var disputerMsgCmd = &cli.Command{
Name: "dispute",
Usage: "Send a specific DisputeWindowedPoSt message",
ArgsUsage: "[minerAddress index postIndex]",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 3 {
fmt.Println("Usage: dispute [minerAddress index postIndex]")
return nil
}
ctx := ReqContext(cctx)
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
toa, err := address.NewFromString(cctx.Args().First())
if err != nil {
return fmt.Errorf("given 'miner' address %q was invalid: %w", cctx.Args().First(), err)
}
deadline, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
postIndex, err := strconv.ParseUint(cctx.Args().Get(2), 10, 64)
if err != nil {
return err
}
fromAddr, err := getSender(ctx, api, cctx.String("from"))
if err != nil {
return err
}
dpp, aerr := actors.SerializeParams(&miner3.DisputeWindowedPoStParams{
Deadline: deadline,
PoStIndex: postIndex,
})
if aerr != nil {
return xerrors.Errorf("failed to serailize params: %w", aerr)
}
dmsg := &types.Message{
To: toa,
From: fromAddr,
Value: big.Zero(),
Method: builtin3.MethodsMiner.DisputeWindowedPoSt,
Params: dpp,
}
rslt, err := api.StateCall(ctx, dmsg, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to simulate dispute: %w", err)
}
if rslt.MsgRct.ExitCode == 0 {
mss, err := getMaxFee(cctx.String("max-fee"))
if err != nil {
return err
}
sm, err := api.MpoolPushMessage(ctx, dmsg, mss)
if err != nil {
return err
}
fmt.Println("dispute message ", sm.Cid())
} else {
fmt.Println("dispute is unsuccessful")
}
return nil
},
}
var disputerStartCmd = &cli.Command{
Name: "start",
Usage: "Start the window post disputer",
ArgsUsage: "[minerAddress]",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "start-epoch",
Usage: "only start disputing PoSts after this epoch ",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
fromAddr, err := getSender(ctx, api, cctx.String("from"))
if err != nil {
return err
}
mss, err := getMaxFee(cctx.String("max-fee"))
if err != nil {
return err
}
startEpoch := abi.ChainEpoch(0)
if cctx.IsSet("height") {
startEpoch = abi.ChainEpoch(cctx.Uint64("height"))
}
fmt.Println("checking sync status")
if err := SyncWait(ctx, api, false); err != nil {
return xerrors.Errorf("sync wait: %w", err)
}
fmt.Println("setting up window post disputer")
// subscribe to head changes and validate the current value
headChanges, err := api.ChainNotify(ctx)
if err != nil {
return err
}
head, ok := <-headChanges
if !ok {
return xerrors.Errorf("Notify stream was invalid")
}
if len(head) != 1 {
return xerrors.Errorf("Notify first entry should have been one item")
}
if head[0].Type != store.HCCurrent {
return xerrors.Errorf("expected current head on Notify stream (got %s)", head[0].Type)
}
lastEpoch := head[0].Val.Height()
lastStatusCheckEpoch := lastEpoch
// build initial deadlineMap
minerList, err := api.StateListMiners(ctx, types.EmptyTSK)
if err != nil {
return err
}
knownMiners := make(map[address.Address]struct{})
deadlineMap := make(map[abi.ChainEpoch][]minerDeadline)
for _, miner := range minerList {
dClose, dl, err := makeMinerDeadline(ctx, api, miner)
if err != nil {
return xerrors.Errorf("making deadline: %w", err)
}
deadlineMap[dClose+Confidence] = append(deadlineMap[dClose+Confidence], *dl)
knownMiners[miner] = struct{}{}
}
// when this fires, check for newly created miners, and purge any "missed" epochs from deadlineMap
statusCheckTicker := time.NewTicker(time.Hour)
defer statusCheckTicker.Stop()
fmt.Println("starting up window post disputer")
applyTsk := func(tsk types.TipSetKey) error {
log.Infof("last checked height: %d", lastEpoch)
dls, ok := deadlineMap[lastEpoch]
delete(deadlineMap, lastEpoch)
if !ok || startEpoch >= lastEpoch {
// no deadlines closed at this epoch - Confidence, or we haven't reached the start cutoff yet
return nil
}
dpmsgs := make([]*types.Message, 0)
// TODO: Parallelizeable
for _, dl := range dls {
fullDeadlines, err := api.StateMinerDeadlines(ctx, dl.miner, tsk)
if err != nil {
return xerrors.Errorf("failed to load deadlines: %w", err)
}
if int(dl.index) >= len(fullDeadlines) {
return xerrors.Errorf("deadline index %d not found in deadlines", dl.index)
}
ms, err := makeDisputeWindowedPosts(ctx, api, dl, fullDeadlines[dl.index].DisputableProofCount, fromAddr)
if err != nil {
return xerrors.Errorf("failed to check for disputes: %w", err)
}
dpmsgs = append(dpmsgs, ms...)
dClose, dl, err := makeMinerDeadline(ctx, api, dl.miner)
if err != nil {
return xerrors.Errorf("making deadline: %w", err)
}
deadlineMap[dClose+Confidence] = append(deadlineMap[dClose+Confidence], *dl)
}
// TODO: Parallelizeable / can be integrated into the previous deadline-iterating for loop
for _, dpmsg := range dpmsgs {
log.Infof("disputing a PoSt from miner %s", dpmsg.To)
m, err := api.MpoolPushMessage(ctx, dpmsg, mss)
if err != nil {
log.Infof("failed to dispute post message: %s", err.Error())
} else {
log.Infof("disputed a PoSt in message: %s", m.Cid())
}
}
return nil
}
disputeLoop := func() error {
select {
case notif, ok := <-headChanges:
if !ok {
return xerrors.Errorf("head change channel errored")
}
for _, val := range notif {
switch val.Type {
case store.HCApply:
for ; lastEpoch <= val.Val.Height(); lastEpoch++ {
err := applyTsk(val.Val.Key())
if err != nil {
return err
}
}
case store.HCRevert:
// do nothing
default:
return xerrors.Errorf("unexpected head change type %s", val.Type)
}
}
case <-statusCheckTicker.C:
log.Infof("Running status check: ")
minerList, err = api.StateListMiners(ctx, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("getting miner list: %w", err)
}
for _, m := range minerList {
_, ok := knownMiners[m]
if !ok {
dClose, dl, err := makeMinerDeadline(ctx, api, m)
if err != nil {
return xerrors.Errorf("making deadline: %w", err)
}
deadlineMap[dClose+Confidence] = append(deadlineMap[dClose+Confidence], *dl)
knownMiners[m] = struct{}{}
}
}
for ; lastStatusCheckEpoch < lastEpoch; lastStatusCheckEpoch++ {
// if an epoch got "skipped" from the deadlineMap somehow, just fry it now instead of letting it sit around forever
_, ok := deadlineMap[lastStatusCheckEpoch]
if ok {
log.Infof("epoch %d was skipped during execution, deleting it from deadlineMap")
delete(deadlineMap, lastStatusCheckEpoch)
}
}
log.Infof("Status check complete")
case <-ctx.Done():
return xerrors.Errorf("context cancelled")
}
return nil
}
for {
err := disputeLoop()
if err != nil {
fmt.Println("disputer shutting down: ", err)
break
}
}
return nil
},
}
// for a given miner, index, and maxPostIndex, tries to dispute posts from 0...postsSnapshotted-1
// returns a list of DisputeWindowedPoSt msgs that are expected to succeed if sent
func makeDisputeWindowedPosts(ctx context.Context, api lapi.FullNode, dl minerDeadline, postsSnapshotted uint64, sender address.Address) ([]*types.Message, error) {
disputes := make([]*types.Message, 0)
for i := uint64(0); i < postsSnapshotted; i++ {
dpp, aerr := actors.SerializeParams(&miner3.DisputeWindowedPoStParams{
Deadline: dl.index,
PoStIndex: i,
})
if aerr != nil {
return nil, xerrors.Errorf("failed to serailize params: %w", aerr)
}
dispute := &types.Message{
To: dl.miner,
From: sender,
Value: big.Zero(),
Method: builtin3.MethodsMiner.DisputeWindowedPoSt,
Params: dpp,
}
rslt, err := api.StateCall(ctx, dispute, types.EmptyTSK)
if err == nil && rslt.MsgRct.ExitCode == 0 {
disputes = append(disputes, dispute)
}
}
return disputes, nil
}
func makeMinerDeadline(ctx context.Context, api lapi.FullNode, mAddr address.Address) (abi.ChainEpoch, *minerDeadline, error) {
dl, err := api.StateMinerProvingDeadline(ctx, mAddr, types.EmptyTSK)
if err != nil {
return -1, nil, xerrors.Errorf("getting proving index list: %w", err)
}
return dl.Close, &minerDeadline{
miner: mAddr,
index: dl.Index,
}, nil
}
func getSender(ctx context.Context, api lapi.FullNode, fromStr string) (address.Address, error) {
if fromStr == "" {
return api.WalletDefaultAddress(ctx)
}
addr, err := address.NewFromString(fromStr)
if err != nil {
return address.Undef, err
}
has, err := api.WalletHas(ctx, addr)
if err != nil {
return address.Undef, err
}
if !has {
return address.Undef, xerrors.Errorf("wallet doesn't contain: %s ", addr)
}
return addr, nil
}
func getMaxFee(maxStr string) (*lapi.MessageSendSpec, error) {
if maxStr != "" {
maxFee, err := types.ParseFIL(maxStr)
if err != nil {
return nil, xerrors.Errorf("parsing max-fee: %w", err)
}
return &lapi.MessageSendSpec{
MaxFee: types.BigInt(maxFee),
}, nil
}
return nil, nil
}

View File

@ -170,8 +170,14 @@ func (a *StateAPI) StateMinerDeadlines(ctx context.Context, m address.Address, t
return err
}
l, err := dl.DisputableProofCount()
if err != nil {
return err
}
out[i] = api.Deadline{
PostSubmissions: ps,
PostSubmissions: ps,
DisputableProofCount: l,
}
return nil
}); err != nil {