Merge pull request #4513 from filecoin-project/steb/feat-change-worker-key

Add commands to change the worker key
This commit is contained in:
Steven Allen 2020-10-26 18:41:04 -07:00 committed by GitHub
commit 89f6da14bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 419 additions and 53 deletions

View File

@ -40,6 +40,8 @@ var actorCmd = &cli.Command{
actorSetPeeridCmd,
actorSetOwnerCmd,
actorControl,
actorProposeChangeWorker,
actorConfirmChangeWorker,
},
}
@ -698,3 +700,221 @@ var actorSetOwnerCmd = &cli.Command{
return nil
},
}
var actorProposeChangeWorker = &cli.Command{
Name: "propose-change-worker",
Usage: "Propose a worker address change",
ArgsUsage: "[address]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must pass address of new worker address")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
na, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
if err != nil {
return err
}
maddr, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
if mi.NewWorker.Empty() {
if mi.Worker == newAddr {
return fmt.Errorf("worker address already set to %s", na)
}
} else {
if mi.NewWorker == newAddr {
return fmt.Errorf("change to worker address %s already pending", na)
}
}
if !cctx.Bool("really-do-it") {
fmt.Fprintln(cctx.App.Writer, "Pass --really-do-it to actually execute this action")
return nil
}
cwp := &miner2.ChangeWorkerAddressParams{
NewWorker: newAddr,
NewControlAddrs: mi.ControlAddresses,
}
sp, err := actors.SerializeParams(cwp)
if err != nil {
return xerrors.Errorf("serializing params: %w", err)
}
smsg, err := api.MpoolPushMessage(ctx, &types.Message{
From: mi.Owner,
To: maddr,
Method: miner.Methods.ChangeWorkerAddress,
Value: big.Zero(),
Params: sp,
}, nil)
if err != nil {
return xerrors.Errorf("mpool push: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", smsg.Cid())
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Propose worker change failed!")
return err
}
mi, err = api.StateMinerInfo(ctx, maddr, wait.TipSet)
if err != nil {
return err
}
if mi.NewWorker != newAddr {
return fmt.Errorf("Proposed worker address change not reflected on chain: expected '%s', found '%s'", na, mi.NewWorker)
}
fmt.Fprintf(cctx.App.Writer, "Worker key change to %s successfully proposed.\n", na)
fmt.Fprintf(cctx.App.Writer, "Call 'confirm-change-worker' at or after height %d to complete.\n", mi.WorkerChangeEpoch)
return nil
},
}
var actorConfirmChangeWorker = &cli.Command{
Name: "confirm-change-worker",
Usage: "Confirm a worker address change",
ArgsUsage: "[address]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must pass address of new worker address")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
api, acloser, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer acloser()
ctx := lcli.ReqContext(cctx)
na, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK)
if err != nil {
return err
}
maddr, err := nodeApi.ActorAddress(ctx)
if err != nil {
return err
}
mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}
if mi.NewWorker.Empty() {
return xerrors.Errorf("no worker key change proposed")
} else if mi.NewWorker != newAddr {
return xerrors.Errorf("worker key %s does not match current worker key proposal %s", newAddr, mi.NewWorker)
}
if head, err := api.ChainHead(ctx); err != nil {
return xerrors.Errorf("failed to get the chain head: %w", err)
} else if head.Height() < mi.WorkerChangeEpoch {
return xerrors.Errorf("worker key change cannot be confirmed until %d, current height is %d", mi.WorkerChangeEpoch, head.Height())
}
if !cctx.Bool("really-do-it") {
fmt.Fprintln(cctx.App.Writer, "Pass --really-do-it to actually execute this action")
return nil
}
smsg, err := api.MpoolPushMessage(ctx, &types.Message{
From: mi.Owner,
To: maddr,
Method: miner.Methods.ConfirmUpdateWorkerKey,
Value: big.Zero(),
}, nil)
if err != nil {
return xerrors.Errorf("mpool push: %w", err)
}
fmt.Fprintln(cctx.App.Writer, "Confirm Message CID:", smsg.Cid())
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence)
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Fprintln(cctx.App.Writer, "Worker change failed!")
return err
}
mi, err = api.StateMinerInfo(ctx, maddr, wait.TipSet)
if err != nil {
return err
}
if mi.Worker != newAddr {
return fmt.Errorf("Confirmed worker address change not reflected on chain: expected '%s', found '%s'", newAddr, mi.Worker)
}
return nil
},
}

View File

@ -0,0 +1,164 @@
package main
import (
"bytes"
"context"
"flag"
"fmt"
"regexp"
"strconv"
"sync/atomic"
"testing"
"time"
logging "github.com/ipfs/go-log/v2"
"github.com/stretchr/testify/require"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api/test"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/node/repo"
builder "github.com/filecoin-project/lotus/node/test"
)
func TestWorkerKeyChange(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = logging.SetLogLevel("*", "INFO")
policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048))
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
policy.SetMinVerifiedDealSize(abi.NewStoragePower(256))
lotuslog.SetupLogLevels()
logging.SetLogLevel("miner", "ERROR")
logging.SetLogLevel("chainstore", "ERROR")
logging.SetLogLevel("chain", "ERROR")
logging.SetLogLevel("pubsub", "ERROR")
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
blocktime := 1 * time.Millisecond
n, sn := builder.MockSbBuilder(t, []test.FullNodeOpts{test.FullNodeWithUpgradeAt(1), test.FullNodeWithUpgradeAt(1)}, test.OneMiner)
client1 := n[0]
client2 := n[1]
// Connect the nodes.
addrinfo, err := client1.NetAddrsListen(ctx)
require.NoError(t, err)
err = client2.NetConnect(ctx, addrinfo)
require.NoError(t, err)
output := bytes.NewBuffer(nil)
run := func(cmd *cli.Command, args ...string) error {
app := cli.NewApp()
app.Metadata = map[string]interface{}{
"repoType": repo.StorageMiner,
"testnode-full": n[0],
"testnode-storage": sn[0],
}
app.Writer = output
build.RunningNodeType = build.NodeMiner
fs := flag.NewFlagSet("", flag.ContinueOnError)
for _, f := range cmd.Flags {
if err := f.Apply(fs); err != nil {
return err
}
}
require.NoError(t, fs.Parse(args))
cctx := cli.NewContext(app, fs, nil)
return cmd.Action(cctx)
}
// setup miner
mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, test.MineNext); err != nil {
t.Error(err)
}
}
}()
defer func() {
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}()
newKey, err := client1.WalletNew(ctx, types.KTBLS)
require.NoError(t, err)
// Initialize wallet.
test.SendFunds(ctx, t, client1, newKey, abi.NewTokenAmount(0))
require.NoError(t, run(actorProposeChangeWorker, "--really-do-it", newKey.String()))
result := output.String()
output.Reset()
require.Contains(t, result, fmt.Sprintf("Worker key change to %s successfully proposed.", newKey))
epochRe := regexp.MustCompile("at or after height (?P<epoch>[0-9]+) to complete")
matches := epochRe.FindStringSubmatch(result)
require.NotNil(t, matches)
targetEpoch, err := strconv.Atoi(matches[1])
require.NoError(t, err)
require.NotZero(t, targetEpoch)
// Too early.
require.Error(t, run(actorConfirmChangeWorker, "--really-do-it", newKey.String()))
output.Reset()
for {
head, err := client1.ChainHead(ctx)
require.NoError(t, err)
if head.Height() >= abi.ChainEpoch(targetEpoch) {
break
}
build.Clock.Sleep(10 * blocktime)
}
require.NoError(t, run(actorConfirmChangeWorker, "--really-do-it", newKey.String()))
output.Reset()
head, err := client1.ChainHead(ctx)
require.NoError(t, err)
// Wait for finality (worker key switch).
targetHeight := head.Height() + policy.ChainFinality
for {
head, err := client1.ChainHead(ctx)
require.NoError(t, err)
if head.Height() >= targetHeight {
break
}
build.Clock.Sleep(10 * blocktime)
}
// Make sure the other node can catch up.
for i := 0; i < 20; i++ {
head, err := client2.ChainHead(ctx)
require.NoError(t, err)
if head.Height() >= targetHeight {
return
}
build.Clock.Sleep(10 * blocktime)
}
t.Fatal("failed to reach target epoch on the second miner")
}

View File

@ -186,22 +186,12 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st
ctx := helpers.LifecycleCtx(mctx, lc)
mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr)
if err != nil {
return nil, err
}
worker, err := api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return nil, err
}
fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr, worker)
if err != nil {
return nil, err
}
sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc, j)
sm, err := storage.NewMiner(api, maddr, h, ds, sealer, sc, verif, gsd, fc, j)
if err != nil {
return nil, err
}

View File

@ -49,8 +49,7 @@ type Miner struct {
sc sealing.SectorIDCounter
verif ffiwrapper.Verifier
maddr address.Address
worker address.Address
maddr address.Address
getSealConfig dtypes.GetSealingConfigFunc
sealing *sealing.Sealing
@ -111,7 +110,7 @@ type storageMinerApi interface {
WalletHas(context.Context, address.Address) (bool, error)
}
func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) {
func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) {
m := &Miner{
api: api,
feeCfg: feeCfg,
@ -122,7 +121,6 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
verif: verif,
maddr: maddr,
worker: worker,
getSealConfig: gsd,
journal: journal,
sealingEvtType: journal.RegisterEventType("storage", "sealing_states"),
@ -174,7 +172,17 @@ func (m *Miner) Stop(ctx context.Context) error {
}
func (m *Miner) runPreflightChecks(ctx context.Context) error {
has, err := m.api.WalletHas(ctx, m.worker)
mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve miner info: %w", err)
}
workerKey, err := m.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
return xerrors.Errorf("failed to resolve worker key: %w", err)
}
has, err := m.api.WalletHas(ctx, workerKey)
if err != nil {
return xerrors.Errorf("failed to check wallet for worker key: %w", err)
}
@ -183,7 +191,7 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error {
return errors.New("key for worker not found in local wallet")
}
log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker)
log.Infof("starting up miner %s, worker addr %s", m.maddr, workerKey)
return nil
}

View File

@ -292,13 +292,14 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin
msg := &types.Message{
To: s.actor,
From: s.worker,
Method: miner.Methods.DeclareFaultsRecovered,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
s.setSender(ctx, msg, spec)
if err := s.setSender(ctx, msg, spec); err != nil {
return recoveries, nil, err
}
sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)})
if err != nil {
@ -376,13 +377,14 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64,
msg := &types.Message{
To: s.actor,
From: s.worker,
Method: miner.Methods.DeclareFaults,
Params: enc,
Value: types.NewInt(0), // TODO: Is there a fee?
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
s.setSender(ctx, msg, spec)
if err := s.setSender(ctx, msg, spec); err != nil {
return faults, nil, err
}
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
if err != nil {
@ -716,13 +718,14 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
msg := &types.Message{
To: s.actor,
From: s.worker,
Method: miner.Methods.SubmitWindowedPoSt,
Params: enc,
Value: types.NewInt(0),
}
spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}
s.setSender(ctx, msg, spec)
if err := s.setSender(ctx, msg, spec); err != nil {
return nil, err
}
// TODO: consider maybe caring about the output
sm, err := s.api.MpoolPushMessage(ctx, msg, spec)
@ -750,33 +753,18 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi
return sm, nil
}
func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) {
func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error {
mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK)
if err != nil {
log.Errorw("error getting miner info", "error", err)
// better than just failing
msg.From = s.worker
return
}
worker, err := s.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK)
if err != nil {
log.Errorw("error getting account key", "error", err)
msg.From = s.worker
return
}
// worker should keep sync with state on chain, if not, will met error when estimate msg gas.
if s.worker != worker {
s.worker = worker
msg.From = s.worker
return xerrors.Errorf("error getting miner info: %w", err)
}
// use the worker as a fallback
msg.From = mi.Worker
gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK)
if err != nil {
log.Errorw("estimating gas", "error", err)
msg.From = s.worker
return
return nil
}
*msg = *gm
@ -785,9 +773,9 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
pa, err := AddressFor(ctx, s.api, mi, PoStAddr, minFunds)
if err != nil {
log.Errorw("error selecting address for window post", "error", err)
msg.From = s.worker
return
return nil
}
msg.From = pa
return nil
}

View File

@ -128,7 +128,6 @@ func TestWDPostDoPost(t *testing.T) {
proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1
postAct := tutils.NewIDAddr(t, 100)
workerAct := tutils.NewIDAddr(t, 101)
mockStgMinerAPI := newMockStorageMinerAPI()
@ -169,7 +168,6 @@ func TestWDPostDoPost(t *testing.T) {
faultTracker: &mockFaultTracker{},
proofType: proofType,
actor: postAct,
worker: workerAct,
journal: journal.NilJournal(),
}

View File

@ -31,8 +31,7 @@ type WindowPoStScheduler struct {
partitionSectors uint64
ch *changeHandler
actor address.Address
worker address.Address
actor address.Address
evtTypes [4]journal.EventType
journal journal.Journal
@ -41,7 +40,7 @@ type WindowPoStScheduler struct {
// failLk sync.Mutex
}
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) {
func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) {
mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK)
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
@ -60,8 +59,7 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb
proofType: rt,
partitionSectors: mi.WindowPoStPartitionSectors,
actor: actor,
worker: worker,
actor: actor,
evtTypes: [...]journal.EventType{
evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"),
evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),