Merge pull request #11380 from filecoin-project/feat/lp-wdpost-submit

[wip] feat: sturdypost: WindowPoSt Submit
This commit is contained in:
Andrew Jackson (Ajax) 2023-11-08 10:57:16 -06:00 committed by GitHub
commit 8ecee2d28d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 555 additions and 98 deletions

View File

@ -244,12 +244,12 @@ var runCmd = &cli.Command{
{ {
if cfg.Subsystems.EnableWindowPost { if cfg.Subsystems.EnableWindowPost {
wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil { if err != nil {
return err return err
} }
activeTasks = append(activeTasks, wdPostTask) activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
} }
} }
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr) taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)

1
go.sum
View File

@ -344,6 +344,7 @@ github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go
github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.11.1/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8=
github.com/filecoin-project/go-state-types v0.11.2-0.20230712101859-8f37624fa540/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8= github.com/filecoin-project/go-state-types v0.11.2-0.20230712101859-8f37624fa540/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8=
github.com/filecoin-project/go-state-types v0.12.5 h1:VQ2N2T3JeUDdIHEo/xhjnT7Q218Wl0UYIyglqT7Z9Ck= github.com/filecoin-project/go-state-types v0.12.5 h1:VQ2N2T3JeUDdIHEo/xhjnT7Q218Wl0UYIyglqT7Z9Ck=
github.com/filecoin-project/go-state-types v0.12.5/go.mod h1:iJTqGdWDvzXhuVf64Lw0hzt4TIoitMo0VgHdxdjNDZI= github.com/filecoin-project/go-state-types v0.12.5/go.mod h1:iJTqGdWDvzXhuVf64Lw0hzt4TIoitMo0VgHdxdjNDZI=

View File

@ -6,8 +6,7 @@ CREATE TABLE harmony_machines (
host_and_port varchar(300) NOT NULL, host_and_port varchar(300) NOT NULL,
cpu INTEGER NOT NULL, cpu INTEGER NOT NULL,
ram BIGINT NOT NULL, ram BIGINT NOT NULL,
gpu FLOAT NOT NULL, gpu FLOAT NOT NULL
gpuram BIGINT NOT NULL
); );
CREATE TABLE harmony_task ( CREATE TABLE harmony_task (

View File

@ -20,9 +20,16 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit
create table wdpost_proofs create table wdpost_proofs
( (
sp_id bigint not null, sp_id bigint not null,
proving_period_start bigint not null,
deadline bigint not null, deadline bigint not null,
partition bigint not null, partition bigint not null,
submit_at_epoch bigint not null, submit_at_epoch bigint not null,
submit_by_epoch bigint not null, submit_by_epoch bigint not null,
proof_message bytea proof_params bytea,
submit_task_id bigint,
message_cid text,
constraint wdpost_proofs_identity_key
unique (sp_id, proving_period_start, deadline, partition)
); );

View File

@ -0,0 +1,22 @@
create table message_sends
(
from_key text not null,
nonce bigint not null,
to_addr text not null,
signed_data bytea not null,
signed_json jsonb not null,
signed_cid text not null,
send_time timestamp default CURRENT_TIMESTAMP,
send_reason text,
send_success bool default false not null,
constraint message_sends_pk
primary key (from_key, nonce)
);
comment on column message_sends.from_key is 'text f[1/3/4]... address';
comment on column message_sends.nonce is 'assigned message nonce';
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
comment on column message_sends.signed_data is 'signed message data';
comment on column message_sends.signed_cid is 'signed message cid';
comment on column message_sends.send_reason is 'optional description of send reason';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already';

View File

@ -115,7 +115,7 @@ top:
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
stackSlice := make([]byte, 512) stackSlice := make([]byte, 4092)
sz := runtime.Stack(stackSlice, false) sz := runtime.Stack(stackSlice, false)
log.Error("Recovered from a serious error "+ log.Error("Recovered from a serious error "+
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r, "while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r,

View File

@ -3,6 +3,7 @@ package resources
import ( import (
"bytes" "bytes"
"context" "context"
"golang.org/x/xerrors"
"os/exec" "os/exec"
"regexp" "regexp"
"runtime" "runtime"
@ -42,7 +43,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
} }
ctx := context.Background() ctx := context.Background()
{ // Learn our owner_id while updating harmony_machines { // Learn our owner_id while updating harmony_machines
var ownerID int var ownerID *int
// Upsert query with last_contact update, fetch the machine ID // Upsert query with last_contact update, fetch the machine ID
// (note this isn't a simple insert .. on conflict because host_and_port isn't unique) // (note this isn't a simple insert .. on conflict because host_and_port isn't unique)
@ -54,7 +55,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
RETURNING id RETURNING id
), ),
inserted AS ( inserted AS (
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram, last_contact) INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, last_contact)
SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP
WHERE NOT EXISTS (SELECT id FROM upsert) WHERE NOT EXISTS (SELECT id FROM upsert)
RETURNING id RETURNING id
@ -64,10 +65,13 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
SELECT id FROM inserted; SELECT id FROM inserted;
`, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&ownerID) `, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&ownerID)
if err != nil { if err != nil {
return nil, err return nil, xerrors.Errorf("inserting machine entry: %w", err)
}
if ownerID == nil {
return nil, xerrors.Errorf("no owner id")
} }
reg.MachineID = ownerID reg.MachineID = *ownerID
cleaned := CleanupMachines(context.Background(), db) cleaned := CleanupMachines(context.Background(), db)
logger.Infow("Cleaned up machines", "count", cleaned) logger.Infow("Cleaned up machines", "count", cleaned)
@ -87,9 +91,10 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
return &reg, nil return &reg, nil
} }
func CleanupMachines(ctx context.Context, db *harmonydb.DB) int { func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`, ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT)) time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC())
if err != nil { if err != nil {
logger.Warn("unable to delete old machines: ", err) logger.Warn("unable to delete old machines: ", err)
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/provider/chainsched" "github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/provider/lpwindow" "github.com/filecoin-project/lotus/provider/lpwindow"
"github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/paths"
@ -22,19 +23,26 @@ var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig, func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker, api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) { as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {
chainSched := chainsched.New(api) chainSched := chainsched.New(api)
// todo config // todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second) ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)
task, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max) sender := lpmessage.NewSender(api, api, db)
computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
if err != nil { if err != nil {
return nil, err return nil, nil, err
}
submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
if err != nil {
return nil, nil, err
} }
go chainSched.Run(ctx) go chainSched.Run(ctx)
return task, nil return computeTask, submitTask, nil
} }

View File

@ -0,0 +1,177 @@
package lpmessage
import (
"context"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
var log = logging.Logger("lpmessage")
type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
}
type SignerAPI interface {
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
}
// Sender abstracts away highly-available message sending with coordination through
// HarmonyDB. It make sure that nonces are assigned transactionally, and that
// messages are correctly broadcasted to the network. It is not a Task in the sense
// of a HarmonyTask interface, just a helper for tasks which need to send messages
// to the network.
type Sender struct {
api SenderAPI
signer SignerAPI
db *harmonydb.DB
}
// NewSender creates a new Sender.
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
return &Sender{
api: api,
signer: signer,
db: db,
}
}
// Send atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
//
// When maxFee is set to 0, Send will guess appropriate fee
// based on current chain conditions
//
// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates
// through HarmonyDB, making it safe to broadcast messages from multiple independent
// API nodes
//
// Send is also currently more strict about required parameters than MpoolPushMessage
func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) {
if mss == nil {
return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil")
}
if (mss.MsgUuid != uuid.UUID{}) {
return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero")
}
fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("getting key address: %w", err)
}
msg.From = fromA
if msg.Nonce != 0 {
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
}
msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}
b, err := s.api.WalletBalance(ctx, msg.From)
if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
if b.LessThan(requiredFunds) {
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
}
var sigMsg *types.SignedMessage
// start db tx
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA)
if err != nil {
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}
// get nonce from db
var dbNonce *uint64
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}
if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = *dbNonce + 1
}
msg.Nonce = msgNonce
// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}
data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}
jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}
// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}
if c != 1 {
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
}
// commit
return true, nil
})
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
}
// push to mpool
_, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil {
// TODO: We may get nonce gaps here..
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
}
// update db recocd to say it was pushed (set send_success to true)
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
if err != nil {
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
}
log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)
return sigMsg.Cid(), nil
}

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
"time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/samber/lo" "github.com/samber/lo"
@ -81,12 +80,37 @@ type wdTaskIdentity struct {
Partition_index uint64 Partition_index uint64
} }
func NewWdPostTask(db *harmonydb.DB,
api WDPoStAPI,
faultTracker sealer.FaultTracker,
prover ProverPoSt,
verifier storiface.Verifier,
pcs *chainsched.ProviderChainSched,
actors []dtypes.MinerAddress,
max int,
) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
faultTracker: faultTracker,
prover: prover,
verifier: verifier,
actors: actors,
max: max,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
log.Debugw("WdPostTask.Do()", "taskID", taskID)
log.Errorw("WDPOST DO", "taskID", taskID)
time.Sleep(5 * time.Second)
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
var spID, pps, dlIdx, partIdx uint64 var spID, pps, dlIdx, partIdx uint64
@ -138,58 +162,32 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
} }
// Insert into wdpost_proofs table // Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(), n, err := t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs ( `INSERT INTO wdpost_proofs (
sp_id, sp_id,
proving_period_start,
deadline, deadline,
partition, partition,
submit_at_epoch, submit_at_epoch,
submit_by_epoch, submit_by_epoch,
proof_message) proof_params)
VALUES ($1, $2, $3, $4, $5, $6)`, VALUES ($1, $2, $3, $4, $5, $6, $7)`,
spID, spID,
pps,
deadline.Index, deadline.Index,
partIdx, partIdx,
deadline.Open, deadline.Open,
deadline.Close, deadline.Close,
msgbuf.Bytes()) msgbuf.Bytes())
/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil { if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err) log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
return false, err return false, err
} }
if n != 1 {
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams) log.Errorf("WdPostTask.Do() failed to insert into wdpost_proofs: %v", err)
// Enter an entry for each wdpost message proof into the wdpost_proofs table
for _, params := range submitWdPostParams {
// Convert submitWdPostParams.Partitions to a byte array using CBOR
buf := new(bytes.Buffer)
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
return false, err return false, err
} }
for _, v := range params.Partitions {
if err := v.MarshalCBOR(buf); err != nil {
return false, err
}
}
// Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs (
deadline,
partitions,
proof_type,
proof_bytes)
VALUES ($1, $2, $3, $4)`,
params.Deadline,
buf.Bytes(),
params.Proofs[0].PoStProof,
params.Proofs[0].ProofBytes)
}*/
return true, nil return true, nil
} }
@ -199,9 +197,6 @@ func entToStr[T any](t T, i int) string {
} }
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
log.Errorw("WDPOST CANACCEPT", "ids", ids)
// GetEpoch // GetEpoch
ts, err := t.api.ChainHead(context.Background()) ts, err := t.api.ChainHead(context.Background())
@ -378,35 +373,6 @@ func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types
return nil return nil
} }
func NewWdPostTask(db *harmonydb.DB,
api WDPoStAPI,
faultTracker sealer.FaultTracker,
prover ProverPoSt,
verifier storiface.Verifier,
pcs *chainsched.ProviderChainSched,
actors []dtypes.MinerAddress,
max int,
) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
faultTracker: faultTracker,
prover: prover,
verifier: verifier,
actors: actors,
max: max,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) { func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
_, err := tx.Exec( _, err := tx.Exec(
@ -424,7 +390,7 @@ func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIden
taskIdent.Partition_index, taskIdent.Partition_index,
) )
if err != nil { if err != nil {
return false, err return false, xerrors.Errorf("insert partition task: %w", err)
} }
return true, nil return true, nil

View File

@ -0,0 +1,272 @@
package lpwindow
import (
"bytes"
"context"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/storage/wdpost"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/storage/ctladdr"
)
type WdPoStSubmitTaskApi interface {
ChainHead(context.Context) (*types.TipSet, error)
WalletBalance(context.Context, address.Address) (types.BigInt, error)
WalletHas(context.Context, address.Address) (bool, error)
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error)
GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error)
GasEstimateGasPremium(_ context.Context, nblocksincl uint64, sender address.Address, gaslimit int64, tsk types.TipSetKey) (types.BigInt, error)
}
type WdPostSubmitTask struct {
sender *lpmessage.Sender
db *harmonydb.DB
api WdPoStSubmitTaskApi
maxWindowPoStGasFee types.FIL
as *ctladdr.AddressSelector
submitPoStTF promise.Promise[harmonytask.AddTaskFunc]
}
func NewWdPostSubmitTask(pcs *chainsched.ProviderChainSched, send *lpmessage.Sender, db *harmonydb.DB, api WdPoStSubmitTaskApi, maxWindowPoStGasFee types.FIL, as *ctladdr.AddressSelector) (*WdPostSubmitTask, error) {
res := &WdPostSubmitTask{
sender: send,
db: db,
api: api,
maxWindowPoStGasFee: maxWindowPoStGasFee,
as: as,
}
if err := pcs.AddHandler(res.processHeadChange); err != nil {
return nil, err
}
return res, nil
}
func (w *WdPostSubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
log.Debugw("WdPostSubmitTask.Do", "taskID", taskID)
var spID uint64
var deadline uint64
var partition uint64
var pps, submitAtEpoch, submitByEpoch abi.ChainEpoch
var earlyParamBytes []byte
var dbTask uint64
err = w.db.QueryRow(
context.Background(), `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch, submit_by_epoch, proof_params, submit_task_id
FROM wdpost_proofs WHERE submit_task_id = $1`, taskID,
).Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch, &submitByEpoch, &earlyParamBytes, &dbTask)
if err != nil {
return false, xerrors.Errorf("query post proof: %w", err)
}
if dbTask != uint64(taskID) {
return false, xerrors.Errorf("taskID mismatch: %d != %d", dbTask, taskID)
}
head, err := w.api.ChainHead(context.Background())
if err != nil {
return false, xerrors.Errorf("getting chain head: %w", err)
}
if head.Height() > submitByEpoch {
// we missed the deadline, no point in submitting
log.Errorw("missed submit deadline", "spID", spID, "deadline", deadline, "partition", partition, "submitByEpoch", submitByEpoch, "headHeight", head.Height())
return true, nil
}
if head.Height() < submitAtEpoch {
log.Errorw("submit epoch not reached", "spID", spID, "deadline", deadline, "partition", partition, "submitAtEpoch", submitAtEpoch, "headHeight", head.Height())
return false, xerrors.Errorf("submit epoch not reached: %d < %d", head.Height(), submitAtEpoch)
}
dlInfo := wdpost.NewDeadlineInfo(pps, deadline, head.Height())
var params miner.SubmitWindowedPoStParams
if err := params.UnmarshalCBOR(bytes.NewReader(earlyParamBytes)); err != nil {
return false, xerrors.Errorf("unmarshaling proof message: %w", err)
}
commEpoch := dlInfo.Challenge
commRand, err := w.api.StateGetRandomnessFromTickets(context.Background(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil, head.Key())
if err != nil {
err = xerrors.Errorf("failed to get chain randomness from tickets for windowPost (epoch=%d): %w", commEpoch, err)
log.Errorf("submitPoStMessage failed: %+v", err)
return false, xerrors.Errorf("getting post commit randomness: %w", err)
}
params.ChainCommitEpoch = commEpoch
params.ChainCommitRand = commRand
var pbuf bytes.Buffer
if err := params.MarshalCBOR(&pbuf); err != nil {
return false, xerrors.Errorf("marshaling proof message: %w", err)
}
maddr, err := address.NewIDAddress(spID)
if err != nil {
return false, xerrors.Errorf("invalid miner address: %w", err)
}
mi, err := w.api.StateMinerInfo(context.Background(), maddr, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("error getting miner info: %w", err)
}
msg := &types.Message{
To: maddr,
Method: builtin.MethodsMiner.SubmitWindowedPoSt,
Params: pbuf.Bytes(),
Value: big.Zero(),
From: mi.Worker, // set worker for now for gas estimation
}
// calculate a more frugal estimation; premium is estimated to guarantee
// inclusion within 5 tipsets, and fee cap is estimated for inclusion
// within 4 tipsets.
minGasFeeMsg := *msg
minGasFeeMsg.GasPremium, err = w.api.GasEstimateGasPremium(context.Background(), 5, msg.From, msg.GasLimit, types.EmptyTSK)
if err != nil {
log.Errorf("failed to estimate minimum gas premium: %+v", err)
minGasFeeMsg.GasPremium = msg.GasPremium
}
minGasFeeMsg.GasFeeCap, err = w.api.GasEstimateFeeCap(context.Background(), &minGasFeeMsg, 4, types.EmptyTSK)
if err != nil {
log.Errorf("failed to estimate minimum gas fee cap: %+v", err)
minGasFeeMsg.GasFeeCap = msg.GasFeeCap
}
// goodFunds = funds needed for optimal inclusion probability.
// minFunds = funds needed for more speculative inclusion probability.
goodFunds := big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value)
minFunds := big.Min(big.Add(minGasFeeMsg.RequiredFunds(), minGasFeeMsg.Value), goodFunds)
from, _, err := w.as.AddressFor(context.Background(), w.api, mi, api.PoStAddr, goodFunds, minFunds)
if err != nil {
return false, xerrors.Errorf("error getting address: %w", err)
}
msg.From = from
mss := &api.MessageSendSpec{
MaxFee: abi.TokenAmount(w.maxWindowPoStGasFee),
}
smsg, err := w.sender.Send(context.Background(), msg, mss, "wdpost")
if err != nil {
return false, xerrors.Errorf("sending proof message: %w", err)
}
// set message_cid in the wdpost_proofs entry
_, err = w.db.Exec(context.Background(), `UPDATE wdpost_proofs SET message_cid = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5`, smsg.String(), spID, pps, deadline, partition)
if err != nil {
return true, xerrors.Errorf("updating wdpost_proofs: %w", err)
}
return true, nil
}
func (w *WdPostSubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
if len(ids) == 0 {
// probably can't happen, but panicking is bad
return nil, nil
}
if w.sender == nil {
// we can't send messages
return nil, nil
}
return &ids[0], nil
}
func (w *WdPostSubmitTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Max: 128,
Name: "WdPostSubmit",
Cost: resources.Resources{
Cpu: 0,
Gpu: 0,
Ram: 10 << 20,
},
MaxFailures: 10,
Follows: nil, // ??
}
}
func (w *WdPostSubmitTask) Adder(taskFunc harmonytask.AddTaskFunc) {
w.submitPoStTF.Set(taskFunc)
}
func (w *WdPostSubmitTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
tf := w.submitPoStTF.Val(ctx)
qry, err := w.db.Query(ctx, `SELECT sp_id, proving_period_start, deadline, partition, submit_at_epoch FROM wdpost_proofs WHERE submit_task_id IS NULL AND submit_at_epoch <= $1`, apply.Height())
if err != nil {
return err
}
defer qry.Close()
for qry.Next() {
var spID int64
var pps int64
var deadline uint64
var partition uint64
var submitAtEpoch uint64
if err := qry.Scan(&spID, &pps, &deadline, &partition, &submitAtEpoch); err != nil {
return xerrors.Errorf("scan submittable posts: %w", err)
}
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, err error) {
// update in transaction iff submit_task_id is still null
res, err := tx.Exec(`UPDATE wdpost_proofs SET submit_task_id = $1 WHERE sp_id = $2 AND proving_period_start = $3 AND deadline = $4 AND partition = $5 AND submit_task_id IS NULL`, id, spID, pps, deadline, partition)
if err != nil {
return false, xerrors.Errorf("query ready proof: %w", err)
}
if res != 1 {
return false, nil
}
return true, nil
})
}
if err := qry.Err(); err != nil {
return err
}
return nil
}
var _ harmonytask.TaskInterface = &WdPostSubmitTask{}