we should always load worker address from the chain

This commit is contained in:
laser 2020-04-09 10:34:07 -07:00
parent 1c35692235
commit 5f3a3c76dd
4 changed files with 61 additions and 24 deletions

View File

@ -67,12 +67,7 @@ func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
// checkPrecommit checks that data commitment generated in the sealing process // checkPrecommit checks that data commitment generated in the sealing process
// matches pieces, and that the seal ticket isn't expired // matches pieces, and that the seal ticket isn't expired
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) (err error) { func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) {
tok, height, err := api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok) commD, err := api.StateComputeDataCommitment(ctx, maddr, si.SectorType, si.dealIDs(), tok)
if err != nil { if err != nil {
return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)} return &ErrApi{xerrors.Errorf("calling StateComputeDataCommitment: %w", err)}
@ -89,12 +84,7 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, a
return nil return nil
} }
func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte) (err error) { func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, tok TipSetToken) (err error) {
tok, _, err := m.api.ChainHead(ctx)
if err != nil {
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
}
if si.SeedEpoch == 0 { if si.SeedEpoch == 0 {
return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")} return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")}
} }

View File

@ -31,6 +31,7 @@ type SealingAPI interface {
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
StateMinerSectorSize(context.Context, address.Address, TipSetToken) (abi.SectorSize, error) StateMinerSectorSize(context.Context, address.Address, TipSetToken) (abi.SectorSize, error)
StateMinerWorkerAddress(ctx context.Context, maddr address.Address, tok TipSetToken) (address.Address, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, market.DealState, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, market.DealState, error)
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, gasPrice big.Int, gasLimit int64, params []byte) (cid.Cid, error)
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
@ -41,8 +42,7 @@ type Sealing struct {
api SealingAPI api SealingAPI
events Events events Events
maddr address.Address maddr address.Address
worker address.Address
sealer sectorstorage.SectorManager sealer sectorstorage.SectorManager
sectors *statemachine.StateGroup sectors *statemachine.StateGroup
@ -53,13 +53,12 @@ type Sealing struct {
pcp PreCommitPolicy pcp PreCommitPolicy
} }
func New(api SealingAPI, events Events, maddr address.Address, worker address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn, pcp PreCommitPolicy) *Sealing { func New(api SealingAPI, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, tktFn TicketFn, pcp PreCommitPolicy) *Sealing {
s := &Sealing{ s := &Sealing{
api: api, api: api,
events: events, events: events,
maddr: maddr, maddr: maddr,
worker: worker,
sealer: sealer, sealer: sealer,
sc: sc, sc: sc,
verif: verif, verif: verif,

View File

@ -98,7 +98,19 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo)
} }
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil { tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
}
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
@ -133,7 +145,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
} }
log.Info("submitting precommit for sector: ", sector.SectorNumber) log.Info("submitting precommit for sector: ", sector.SectorNumber)
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil { if err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
} }
@ -206,7 +218,13 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)}) return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
} }
if err := m.checkCommit(ctx.Context(), sector, proof); err != nil { tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := m.checkCommit(ctx.Context(), sector, proof, tok); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
} }
@ -222,8 +240,14 @@ func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo)
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
} }
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
// TODO: check seed / ticket are up to date // TODO: check seed / ticket are up to date
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil { if err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
} }
@ -279,7 +303,19 @@ func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) erro
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)}) return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
} }
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes()) tok, _, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
}
waddr, err := m.api.StateMinerWorkerAddress(ctx.Context(), m.maddr, tok)
if err != nil {
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
}
mcid, err := m.api.SendMsg(ctx.Context(), waddr, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil { if err != nil {
return xerrors.Errorf("failed to push declare faults message to network: %w", err) return xerrors.Errorf("failed to push declare faults message to network: %w", err)
} }

View File

@ -57,7 +57,13 @@ func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo)
} }
func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil { tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
return nil
}
if err := checkPrecommit(ctx.Context(), m.Address(), sector, tok, height, m.api); err != nil {
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err) log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
@ -114,7 +120,13 @@ func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector Sect
} }
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil { tok, height, err := m.api.ChainHead(ctx.Context())
if err != nil {
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
return nil
}
if err := checkPrecommit(ctx.Context(), m.maddr, sector, tok, height, m.api); err != nil {
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err) log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
@ -128,7 +140,7 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
} }
} }
if err := m.checkCommit(ctx.Context(), sector, sector.Proof); err != nil { if err := m.checkCommit(ctx.Context(), sector, sector.Proof, tok); err != nil {
switch err.(type) { switch err.(type) {
case *ErrApi: case *ErrApi:
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err) log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)