lpseal: Poller for PoRep
This commit is contained in:
parent
41f80a66f9
commit
f2e1b5a35c
@ -49,6 +49,7 @@ create table sectors_sdr_pipeline (
|
|||||||
-- Commit (PoRep snark)
|
-- Commit (PoRep snark)
|
||||||
task_id_porep bigint,
|
task_id_porep bigint,
|
||||||
porep_proof bytea,
|
porep_proof bytea,
|
||||||
|
after_porep bool not null default false,
|
||||||
|
|
||||||
-- Commit message sending
|
-- Commit message sending
|
||||||
commit_msg_cid text,
|
commit_msg_cid text,
|
||||||
@ -58,8 +59,6 @@ create table sectors_sdr_pipeline (
|
|||||||
|
|
||||||
-- Commit message wait
|
-- Commit message wait
|
||||||
commit_msg_tsk bytea,
|
commit_msg_tsk bytea,
|
||||||
|
|
||||||
task_id_commit_msg_wait bigint,
|
|
||||||
after_commit_msg_success bool not null default false,
|
after_commit_msg_success bool not null default false,
|
||||||
|
|
||||||
-- Failure handling
|
-- Failure handling
|
||||||
|
@ -28,9 +28,12 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const sealPollerInterval = 10 * time.Second
|
const sealPollerInterval = 10 * time.Second
|
||||||
|
const seedEpochConfidence = 3
|
||||||
|
|
||||||
type SealPollerAPI interface {
|
type SealPollerAPI interface {
|
||||||
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
|
StateSectorPreCommitInfo(context.Context, address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error)
|
||||||
|
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
|
||||||
|
ChainHead(context.Context) (*types.TipSet, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type SealPoller struct {
|
type SealPoller struct {
|
||||||
@ -83,16 +86,17 @@ func (s *SealPoller) poll(ctx context.Context) error {
|
|||||||
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
|
TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"`
|
||||||
AfterPrecommitMsg bool `db:"after_precommit_msg"`
|
AfterPrecommitMsg bool `db:"after_precommit_msg"`
|
||||||
|
|
||||||
AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
|
AfterPrecommitMsgSuccess bool `db:"after_precommit_msg_success"`
|
||||||
|
SeedEpoch *int64 `db:"seed_epoch"`
|
||||||
|
|
||||||
TaskPoRep *int64 `db:"task_id_porep"`
|
TaskPoRep *int64 `db:"task_id_porep"`
|
||||||
PoRepProof []byte `db:"porep_proof"`
|
PoRepProof []byte `db:"porep_proof"`
|
||||||
|
AfterPoRep bool `db:"after_porep"`
|
||||||
|
|
||||||
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
|
TaskCommitMsg *int64 `db:"task_id_commit_msg"`
|
||||||
AfterCommitMsg bool `db:"after_commit_msg"`
|
AfterCommitMsg bool `db:"after_commit_msg"`
|
||||||
|
|
||||||
TaskCommitMsgWait *int64 `db:"task_id_commit_msg_wait"`
|
AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`
|
||||||
AfterCommitMsgSuccess bool `db:"after_commit_msg_success"`
|
|
||||||
|
|
||||||
Failed bool `db:"failed"`
|
Failed bool `db:"failed"`
|
||||||
FailedReason string `db:"failed_reason"`
|
FailedReason string `db:"failed_reason"`
|
||||||
@ -105,10 +109,10 @@ func (s *SealPoller) poll(ctx context.Context) error {
|
|||||||
task_id_tree_c, after_tree_c,
|
task_id_tree_c, after_tree_c,
|
||||||
task_id_tree_r, after_tree_r,
|
task_id_tree_r, after_tree_r,
|
||||||
task_id_precommit_msg, after_precommit_msg,
|
task_id_precommit_msg, after_precommit_msg,
|
||||||
after_precommit_msg_success,
|
after_precommit_msg_success, seed_epoch,
|
||||||
task_id_porep, porep_proof,
|
task_id_porep, porep_proof, after_porep,
|
||||||
task_id_commit_msg, after_commit_msg,
|
task_id_commit_msg, after_commit_msg,
|
||||||
task_id_commit_msg_wait, after_commit_msg_success,
|
after_commit_msg_success,
|
||||||
failed, failed_reason
|
failed, failed_reason
|
||||||
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`)
|
FROM sectors_sdr_pipeline WHERE after_commit_msg_success != true`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -163,8 +167,6 @@ func (s *SealPoller) poll(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
|
if task.TaskPrecommitMsg != nil && !task.AfterPrecommitMsgSuccess {
|
||||||
// join pipeline precommit_msg_cid with message_waits signed_message_cid, if executed_tsk_epoch is not null, return result
|
|
||||||
|
|
||||||
var execResult []struct {
|
var execResult []struct {
|
||||||
ExecutedTskCID string `db:"executed_tsk_cid"`
|
ExecutedTskCID string `db:"executed_tsk_cid"`
|
||||||
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
|
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
|
||||||
@ -208,9 +210,82 @@ func (s *SealPoller) poll(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
todoWaitSeed := false
|
ts, err := s.api.ChainHead(ctx)
|
||||||
if task.TaskPoRep != nil && todoWaitSeed {
|
if err != nil {
|
||||||
// todo start porep task
|
return xerrors.Errorf("getting chain head: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.pollers[pollerPoRep].IsSet() && task.AfterPrecommitMsgSuccess && task.SeedEpoch != nil && task.TaskPoRep == nil && ts.Height() >= abi.ChainEpoch(*task.SeedEpoch+seedEpochConfidence) {
|
||||||
|
s.pollers[pollerPoRep].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
||||||
|
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_porep = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_porep is null`, id, task.SpID, task.SectorNumber)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
|
||||||
|
}
|
||||||
|
if n != 1 {
|
||||||
|
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.AfterPoRep && len(task.PoRepProof) > 0 && task.TaskPrecommitMsg == nil {
|
||||||
|
s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
|
||||||
|
n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 and task_id_commit_msg is null`, id, task.SpID, task.SectorNumber)
|
||||||
|
if err != nil {
|
||||||
|
return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
|
||||||
|
}
|
||||||
|
if n != 1 {
|
||||||
|
return false, xerrors.Errorf("expected to update 1 row, updated %d", n)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if task.AfterCommitMsg && !task.AfterCommitMsgSuccess {
|
||||||
|
var execResult []struct {
|
||||||
|
ExecutedTskCID string `db:"executed_tsk_cid"`
|
||||||
|
ExecutedTskEpoch int64 `db:"executed_tsk_epoch"`
|
||||||
|
ExecutedMsgCID string `db:"executed_msg_cid"`
|
||||||
|
|
||||||
|
ExecutedRcptExitCode int64 `db:"executed_rcpt_exitcode"`
|
||||||
|
ExecutedRcptGasUsed int64 `db:"executed_rcpt_gas_used"`
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.db.Select(ctx, &execResult, `SELECT executed_tsk_cid, executed_tsk_epoch, executed_msg_cid, executed_rcpt_exitcode, executed_rcpt_gas_used
|
||||||
|
FROM sectors_sdr_pipeline
|
||||||
|
JOIN message_waits ON sectors_sdr_pipeline.commit_msg_cid = message_waits.signed_message_cid
|
||||||
|
WHERE sp_id = $1 AND sector_number = $2 AND executed_tsk_epoch is not null`, task.SpID, task.SectorNumber)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorw("failed to query message_waits", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(execResult) > 0 {
|
||||||
|
maddr, err := address.NewIDAddress(uint64(task.SpID))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("get sector info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if si != nil {
|
||||||
|
// todo handdle missing sector info (not found after cron)
|
||||||
|
} else {
|
||||||
|
// yay!
|
||||||
|
|
||||||
|
_, err := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET
|
||||||
|
after_commit_msg_success = true, commit_msg_tsk = $1
|
||||||
|
WHERE sp_id = $2 AND sector_number = $3 and after_commit_msg_success is NULL`,
|
||||||
|
execResult[0].ExecutedTskCID, task.SpID, task.SectorNumber)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("update sectors_sdr_pipeline: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ func (p *PoRepTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
|
|||||||
|
|
||||||
// store success!
|
// store success!
|
||||||
n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
|
n, err := p.db.Exec(ctx, `UPDATE sectors_sdr_pipeline
|
||||||
SET after_sdr = true, seed_value = $3, porep_proof = $4
|
SET after_porep = true, seed_value = $3, porep_proof = $4
|
||||||
WHERE sp_id = $1 AND sector_number = $2`,
|
WHERE sp_id = $1 AND sector_number = $2`,
|
||||||
sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof)
|
sectorParams.SpID, sectorParams.SectorNumber, []byte(rand), proof)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user