lotus/curiosrc/window/compute_task.go

444 lines
13 KiB
Go
Raw Normal View History

package window
2023-08-30 16:34:11 +00:00
import (
2023-10-31 08:23:10 +00:00
"bytes"
2023-08-30 16:34:11 +00:00
"context"
2023-11-28 01:54:08 +00:00
"encoding/json"
2023-10-31 08:05:32 +00:00
"fmt"
"sort"
"strings"
2023-10-25 19:13:56 +00:00
logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
2023-11-04 10:04:46 +00:00
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
2023-11-04 10:04:46 +00:00
"github.com/filecoin-project/go-state-types/crypto"
2023-08-30 16:34:11 +00:00
"github.com/filecoin-project/go-state-types/dline"
2023-11-04 10:04:46 +00:00
"github.com/filecoin-project/go-state-types/network"
2023-10-25 19:13:56 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
2023-08-30 16:34:11 +00:00
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/curiosrc/chainsched"
2023-08-30 16:34:11 +00:00
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/harmony/taskhelp"
2023-10-25 19:13:56 +00:00
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2023-11-04 10:04:46 +00:00
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2023-10-25 19:13:56 +00:00
"github.com/filecoin-project/lotus/storage/wdpost"
2023-08-30 16:34:11 +00:00
)
var (
	// log is the package logger, registered under the "curio/window" subsystem.
	log = logging.Logger("curio/window")

	// EpochsPerDeadline is the length of one proving deadline in epochs: the
	// window PoSt proving period divided evenly by the number of deadlines.
	EpochsPerDeadline = miner.WPoStProvingPeriod() / abi.ChainEpoch(miner.WPoStPeriodDeadlines)
)
2023-08-30 16:34:11 +00:00
// WdPostTaskDetails bundles a tipset with the proving-deadline information
// that goes with it. It is not referenced by the code visible in this file;
// presumably it is used elsewhere in the package.
type WdPostTaskDetails struct {
	Ts       *types.TipSet
	Deadline *dline.Info
}
type WDPoStAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
2023-10-25 14:00:49 +00:00
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
2023-10-25 18:58:16 +00:00
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
2023-10-27 14:10:55 +00:00
StateGetRandomnessFromBeacon(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error)
StateMinerSectors(context.Context, address.Address, *bitfield.BitField, types.TipSetKey) ([]*miner.SectorOnChainInfo, error)
}
// ProverPoSt abstracts window PoSt proof generation.
//
// GenerateWindowPoStAdv proves the given sector challenges for the partition
// at partitionIdx, using the supplied proof type, miner actor ID and PoSt
// randomness. NOTE(review): allowSkip presumably permits skipping unprovable
// sectors instead of failing outright — confirm against the implementation.
type ProverPoSt interface {
	GenerateWindowPoStAdv(
		ctx context.Context,
		ppt abi.RegisteredPoStProof,
		mid abi.ActorID,
		sectors []storiface.PostSectorChallenge,
		partitionIdx int,
		randomness abi.PoStRandomness,
		allowSkip bool,
	) (storiface.WindowPoStResult, error)
}
2023-08-30 16:34:11 +00:00
type WdPostTask struct {
api WDPoStAPI
2023-10-25 14:00:49 +00:00
db *harmonydb.DB
2023-10-25 18:58:16 +00:00
2023-10-27 14:10:55 +00:00
faultTracker sealer.FaultTracker
prover ProverPoSt
verifier storiface.Verifier
2023-10-25 18:58:16 +00:00
windowPoStTF promise.Promise[harmonytask.AddTaskFunc]
2024-02-13 01:03:45 +00:00
actors map[dtypes.MinerAddress]bool
2023-10-27 23:08:18 +00:00
max int
2023-10-25 18:58:16 +00:00
}
type wdTaskIdentity struct {
2023-11-15 16:04:04 +00:00
SpID uint64 `db:"sp_id"`
ProvingPeriodStart abi.ChainEpoch `db:"proving_period_start"`
DeadlineIndex uint64 `db:"deadline_index"`
PartitionIndex uint64 `db:"partition_index"`
2023-08-30 16:34:11 +00:00
}
2023-11-03 21:40:28 +00:00
func NewWdPostTask(db *harmonydb.DB,
api WDPoStAPI,
faultTracker sealer.FaultTracker,
prover ProverPoSt,
verifier storiface.Verifier,
pcs *chainsched.CurioChainSched,
2024-02-13 01:03:45 +00:00
actors map[dtypes.MinerAddress]bool,
2023-11-03 21:40:28 +00:00
max int,
) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
faultTracker: faultTracker,
prover: prover,
verifier: verifier,
actors: actors,
max: max,
}
2024-01-05 15:10:34 +00:00
if pcs != nil {
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
2023-11-03 21:40:28 +00:00
}
return t, nil
}
2023-08-30 16:34:11 +00:00
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
2023-11-03 21:40:28 +00:00
log.Debugw("WdPostTask.Do()", "taskID", taskID)
2023-10-05 15:52:22 +00:00
var spID, pps, dlIdx, partIdx uint64
2023-10-05 15:52:22 +00:00
err = t.db.QueryRow(context.Background(),
`Select sp_id, proving_period_start, deadline_index, partition_index
from wdpost_partition_tasks
2023-10-05 15:52:22 +00:00
where task_id = $1`, taskID).Scan(
&spID, &pps, &dlIdx, &partIdx,
2023-10-05 15:52:22 +00:00
)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false, err
}
head, err := t.api.ChainHead(context.Background())
if err != nil {
log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
return false, err
}
2023-10-27 14:10:55 +00:00
deadline := wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())
2024-02-20 16:35:52 +00:00
var testTask *int
isTestTask := func() bool {
if testTask != nil {
return *testTask > 0
}
testTask = new(int)
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_test WHERE task_id = $1`, taskID).Scan(testTask)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false
}
return *testTask > 0
}
if deadline.PeriodElapsed() && !isTestTask() {
log.Errorf("WdPost removed stale task: %v %v", taskID, deadline)
return true, nil
2023-10-05 15:52:22 +00:00
}
2024-02-20 16:35:52 +00:00
if deadline.Challenge > head.Height() {
if isTestTask() {
deadline = wdpost.NewDeadlineInfo(abi.ChainEpoch(pps)-deadline.WPoStProvingPeriod, dlIdx, head.Height()-deadline.WPoStProvingPeriod)
log.Warnw("Test task is in the future, adjusting to past", "taskID", taskID, "deadline", deadline)
}
}
2023-10-27 14:10:55 +00:00
maddr, err := address.NewIDAddress(spID)
if err != nil {
log.Errorf("WdPostTask.Do() failed to NewIDAddress: %v", err)
return false, err
}
ts, err := t.api.ChainGetTipSetAfterHeight(context.Background(), deadline.Challenge, head.Key())
if err != nil {
log.Errorf("WdPostTask.Do() failed to ChainGetTipSetAfterHeight: %v", err)
return false, err
}
postOut, err := t.DoPartition(context.Background(), ts, maddr, deadline, partIdx)
2023-10-27 14:10:55 +00:00
if err != nil {
log.Errorf("WdPostTask.Do() failed to doPartition: %v", err)
return false, err
}
2023-10-31 08:23:10 +00:00
var msgbuf bytes.Buffer
if err := postOut.MarshalCBOR(&msgbuf); err != nil {
return false, xerrors.Errorf("marshaling PoSt: %w", err)
}
2023-10-27 14:10:55 +00:00
2024-02-20 16:35:52 +00:00
if isTestTask() {
2023-11-28 01:54:08 +00:00
// Do not send test tasks to the chain but to harmony_test & stdout.
data, err := json.MarshalIndent(map[string]any{
"sp_id": spID,
"proving_period_start": pps,
"deadline": deadline.Index,
"partition": partIdx,
"submit_at_epoch": deadline.Open,
"submit_by_epoch": deadline.Close,
"proof_params": msgbuf.Bytes(),
}, "", " ")
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
ctx := context.Background()
_, err = t.db.Exec(ctx, `UPDATE harmony_test SET result=$1 WHERE task_id=$2`, string(data), taskID)
if err != nil {
return false, xerrors.Errorf("updating harmony_test: %w", err)
}
log.Infof("SKIPPED sending test message to chain. SELECT * FROM harmony_test WHERE task_id= %v", taskID)
return true, nil // nothing committed
2023-11-22 04:43:13 +00:00
}
2023-10-31 08:23:10 +00:00
// Insert into wdpost_proofs table
2023-11-04 11:32:27 +00:00
n, err := t.db.Exec(context.Background(),
2023-10-31 08:23:10 +00:00
`INSERT INTO wdpost_proofs (
sp_id,
2023-11-04 11:32:27 +00:00
proving_period_start,
2023-10-31 08:23:10 +00:00
deadline,
partition,
submit_at_epoch,
submit_by_epoch,
2023-11-28 01:54:08 +00:00
proof_params)
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
2023-10-31 08:23:10 +00:00
spID,
2023-11-04 11:32:27 +00:00
pps,
2023-10-31 08:23:10 +00:00
deadline.Index,
partIdx,
deadline.Open,
deadline.Close,
2023-11-22 04:43:13 +00:00
msgbuf.Bytes(),
)
2023-10-12 17:35:10 +00:00
2023-11-04 11:32:27 +00:00
if err != nil {
log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
return false, err
}
if n != 1 {
log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
return false, err
}
2023-08-30 16:34:11 +00:00
return true, nil
}
2023-10-31 08:05:32 +00:00
// entToStr renders t in its default (%v) format. The second parameter is
// ignored; it exists so the function fits lo.Map's (element, index) callback
// shape.
func entToStr[T any](t T, _ int) string {
	return fmt.Sprintf("%v", t)
}
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// GetEpoch
ts, err := t.api.ChainHead(context.Background())
if err != nil {
return nil, err
}
// GetData for tasks
type wdTaskDef struct {
2023-11-15 04:58:43 +00:00
TaskID harmonytask.TaskID
SpID uint64
ProvingPeriodStart abi.ChainEpoch
DeadlineIndex uint64
PartitionIndex uint64
dlInfo *dline.Info `pgx:"-"`
}
var tasks []wdTaskDef
2023-10-23 04:03:06 +00:00
err = t.db.Select(context.Background(), &tasks,
`Select
task_id,
sp_id,
proving_period_start,
deadline_index,
partition_index
from wdpost_partition_tasks
2023-10-31 08:05:32 +00:00
where task_id IN (SELECT unnest(string_to_array($1, ','))::bigint)`, strings.Join(lo.Map(ids, entToStr[harmonytask.TaskID]), ","))
if err != nil {
return nil, err
}
// Accept those past deadline, then delete them in Do().
for i := range tasks {
2023-11-15 04:58:43 +00:00
tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].ProvingPeriodStart, tasks[i].DeadlineIndex, ts.Height())
if tasks[i].dlInfo.PeriodElapsed() {
2024-02-20 16:35:52 +00:00
// note: Those may be test tasks
2023-11-15 04:58:43 +00:00
return &tasks[i].TaskID, nil
}
}
2023-10-31 09:08:13 +00:00
// todo fix the block below
// workAdderMutex is held by taskTypeHandler.considerWork, which calls this CanAccept
// te.ResourcesAvailable will try to get that lock again, which will deadlock
// Discard those too big for our free RAM
2023-10-31 09:08:13 +00:00
/*freeRAM := te.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
maddr, err := address.NewIDAddress(tasks[0].Sp_id)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to NewIDAddress: %v", err)
return false
}
mi, err := t.api.StateMinerInfo(context.Background(), maddr, ts.Key())
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to StateMinerInfo: %v", err)
return false
}
2023-10-25 18:58:16 +00:00
spt, err := policy.GetSealProofFromPoStProof(mi.WindowPoStProofType)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to GetSealProofFromPoStProof: %v", err)
return false
}
return res[spt].MaxMemory <= freeRAM
2023-10-31 09:08:13 +00:00
})*/
if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task")
return nil, nil
}
// Ignore those with too many failures unless they are the only ones left.
tasks, _ = taskhelp.SliceIfFound(tasks, func(d wdTaskDef) bool {
var r int
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
FROM harmony_task_history
2023-11-15 04:58:43 +00:00
WHERE task_id = $1 AND result = false`, d.TaskID).Scan(&r)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
}
return r < 2
})
// Select the one closest to the deadline
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open
})
2023-11-15 04:58:43 +00:00
return &tasks[0].TaskID, nil
2023-08-30 16:34:11 +00:00
}
// res is the storiface resource-table entry for window PoSt generation. It is
// indexed by seal proof type; TypeDetails ranges over its keys to find the
// smallest MaxMemory.
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
2023-08-30 16:34:11 +00:00
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
2023-10-27 23:08:18 +00:00
Max: t.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
2023-10-31 08:05:32 +00:00
// todo set to something for 32/64G sector sizes? Technically windowPoSt is happy on a CPU
// but it will use a GPU if available
Gpu: 0,
// RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i {
return res[k].MaxMemory
}
return i
}, 1<<63),
},
2023-08-30 16:34:11 +00:00
}
}
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
2023-10-25 18:58:16 +00:00
t.windowPoStTF.Set(taskFunc)
}
func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
2024-02-13 01:03:45 +00:00
for act := range t.actors {
2023-10-25 18:58:16 +00:00
maddr := address.Address(act)
aid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner ID: %w", err)
}
di, err := t.api.StateMinerProvingDeadline(ctx, maddr, apply.Key())
if err != nil {
return err
}
if !di.PeriodStarted() {
return nil // not proving anything yet
}
partitions, err := t.api.StateMinerPartitions(ctx, maddr, di.Index, apply.Key())
if err != nil {
return xerrors.Errorf("getting partitions: %w", err)
}
2023-08-30 16:34:11 +00:00
2023-10-25 18:58:16 +00:00
// TODO: Batch Partitions??
for pidx := range partitions {
tid := wdTaskIdentity{
2023-11-15 04:58:43 +00:00
SpID: aid,
ProvingPeriodStart: di.PeriodStart,
DeadlineIndex: di.Index,
PartitionIndex: uint64(pidx),
2023-10-25 18:58:16 +00:00
}
2023-08-30 16:34:11 +00:00
2023-10-25 18:58:16 +00:00
tf := t.windowPoStTF.Val(ctx)
if tf == nil {
return xerrors.Errorf("no task func")
}
2023-08-30 16:34:11 +00:00
2023-10-25 18:58:16 +00:00
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(id, tid, tx)
})
}
2023-08-30 16:34:11 +00:00
}
2023-10-25 18:58:16 +00:00
return nil
2023-08-30 16:34:11 +00:00
}
2023-10-25 18:58:16 +00:00
func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
2023-09-05 16:29:39 +00:00
2023-08-30 16:34:11 +00:00
_, err := tx.Exec(
`INSERT INTO wdpost_partition_tasks (
2023-08-30 16:34:11 +00:00
task_id,
sp_id,
proving_period_start,
deadline_index,
2023-10-27 16:22:39 +00:00
partition_index
) VALUES ($1, $2, $3, $4, $5)`,
2023-08-30 16:34:11 +00:00
taskId,
2023-11-15 04:58:43 +00:00
taskIdent.SpID,
taskIdent.ProvingPeriodStart,
taskIdent.DeadlineIndex,
taskIdent.PartitionIndex,
2023-08-30 16:34:11 +00:00
)
if err != nil {
2023-11-04 11:32:27 +00:00
return false, xerrors.Errorf("insert partition task: %w", err)
2023-08-30 16:34:11 +00:00
}
return true, nil
}
// Compile-time check that *WdPostTask implements harmonytask.TaskInterface.
var _ harmonytask.TaskInterface = (*WdPostTask)(nil)