Merge branch 'feat/wdpost-adder2' into wdpost-can-accept

This commit is contained in:
Andrew Jackson (Ajax) 2023-10-31 17:16:04 -05:00
commit e4f09bc1a4
5 changed files with 105 additions and 38 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/base64"
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
@ -9,8 +10,10 @@ import (
"time" "time"
"github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-statestore"
"github.com/gbrlsnchs/jwt/v3"
ds "github.com/ipfs/go-datastore" ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync" dssync "github.com/ipfs/go-datastore/sync"
"golang.org/x/xerrors"
"github.com/gin-contrib/pprof" "github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@ -34,7 +37,6 @@ import (
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/provider" "github.com/filecoin-project/lotus/provider"
@ -200,9 +202,9 @@ var runCmd = &cli.Command{
} }
defer fullCloser() defer fullCloser()
sa, err := modules.StorageAuth(ctx, full) sa, err := StorageAuth(cfg.Apis.StorageRPCSecret)
if err != nil { if err != nil {
return err return xerrors.Errorf("parsing Apis.StorageRPCSecret config: %w", err)
} }
al := alerting.NewAlertingSystem(j) al := alerting.NewAlertingSystem(j)
@ -309,5 +311,34 @@ func makeDB(cctx *cli.Context) (*harmonydb.DB, error) {
Port: cctx.String("db-port"), Port: cctx.String("db-port"),
} }
return harmonydb.NewFromConfig(dbConfig) return harmonydb.NewFromConfig(dbConfig)
}
type jwtPayload struct {
Allow []auth.Permission
}
func StorageAuth(apiKey string) (sealer.StorageAuth, error) {
if apiKey == "" {
return nil, xerrors.Errorf("no api key provided")
}
rawKey, err := base64.StdEncoding.DecodeString(apiKey)
if err != nil {
return nil, xerrors.Errorf("decoding api key: %w", err)
}
key := jwt.NewHS256(rawKey)
p := jwtPayload{
Allow: []auth.Permission{"admin"},
}
token, err := jwt.Sign(&p, key)
if err != nil {
return nil, err
}
headers := http.Header{}
headers.Add("Authorization", "Bearer "+string(token))
return sealer.StorageAuth(headers), nil
} }

View File

@ -26,7 +26,3 @@ create table wdpost_proofs
submit_by_epoch bigint not null, submit_by_epoch bigint not null,
proof_message bytea proof_message bytea
); );

View File

@ -3,7 +3,6 @@ package resources
import ( import (
"bytes" "bytes"
"context" "context"
"fmt"
"os/exec" "os/exec"
"regexp" "regexp"
"runtime" "runtime"
@ -43,29 +42,33 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
} }
ctx := context.Background() ctx := context.Background()
{ // Learn our owner_id while updating harmony_machines { // Learn our owner_id while updating harmony_machines
var ownerID []int var ownerID int
err := db.Select(ctx, &ownerID, `SELECT id FROM harmony_machines WHERE host_and_port=$1`, hostnameAndPort)
if err != nil {
return nil, fmt.Errorf("could not read from harmony_machines: %w", err)
}
if len(ownerID) == 0 {
err = db.QueryRow(ctx, `INSERT INTO harmony_machines
(host_and_port, cpu, ram, gpu, gpuram) VALUES
($1,$2,$3,$4,0) RETURNING id`,
hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&reg.Resources.MachineID)
if err != nil {
return nil, err
}
} else { // Upsert query with last_contact update, fetch the machine ID
reg.MachineID = ownerID[0] // (note this isn't a simple insert .. on conflict because host_and_port isn't unique)
_, err := db.Exec(ctx, `UPDATE harmony_machines SET err := db.QueryRow(ctx, `
cpu=$1, ram=$2, gpu=$3 WHERE id=$5`, WITH upsert AS (
reg.Cpu, reg.Ram, reg.Gpu, reg.Resources.MachineID) UPDATE harmony_machines
if err != nil { SET cpu = $2, ram = $3, gpu = $4, last_contact = CURRENT_TIMESTAMP
return nil, err WHERE host_and_port = $1
} RETURNING id
),
inserted AS (
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram, last_contact)
SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP
WHERE NOT EXISTS (SELECT id FROM upsert)
RETURNING id
)
SELECT id FROM upsert
UNION ALL
SELECT id FROM inserted;
`, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&ownerID)
if err != nil {
return nil, err
} }
reg.MachineID = ownerID
cleaned := CleanupMachines(context.Background(), db) cleaned := CleanupMachines(context.Background(), db)
logger.Infow("Cleaned up machines", "count", cleaned) logger.Infow("Cleaned up machines", "count", cleaned)
} }

View File

@ -79,6 +79,11 @@ type LotusProviderConfig struct {
type ApisConfig struct { type ApisConfig struct {
// FULLNODE_API_INFO is the API endpoint for the Lotus daemon. // FULLNODE_API_INFO is the API endpoint for the Lotus daemon.
FULLNODE_API_INFO []string FULLNODE_API_INFO []string
// RPC Secret for the storage subsystem.
// If integrating with lotus-miner this must match the value from
// cat ~/.lotusminer/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU | jq -r .PrivateKey
StorageRPCSecret string
} }
type JournalConfig struct { type JournalConfig struct {

View File

@ -1,8 +1,11 @@
package lpwindow package lpwindow
import ( import (
"bytes"
"context" "context"
"fmt"
"sort" "sort"
"strings"
"time" "time"
"github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-bitfield"
@ -20,7 +23,6 @@ import (
"github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/harmonytask"
@ -131,9 +133,27 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, err return false, err
} }
panic("todo record") var msgbuf bytes.Buffer
if err := postOut.MarshalCBOR(&msgbuf); err != nil {
return false, xerrors.Errorf("marshaling PoSt: %w", err)
}
_ = postOut // Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs (
sp_id,
deadline,
partition,
submit_at_epoch,
submit_by_epoch,
proof_message)
VALUES ($1, $2, $3, $4, $5, $6)`,
spID,
deadline.Index,
partIdx,
deadline.Open,
deadline.Close,
msgbuf.Bytes())
/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts) /*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil { if err != nil {
@ -175,6 +195,10 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return true, nil return true, nil
} }
func entToStr[T any](t T, i int) string {
return fmt.Sprint(t)
}
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
log.Errorw("WDPOST CANACCEPT", "ids", ids) log.Errorw("WDPOST CANACCEPT", "ids", ids)
@ -207,7 +231,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
deadline_index, deadline_index,
partition_index partition_index
from wdpost_partition_tasks from wdpost_partition_tasks
where task_id IN $1`, ids) where task_id IN (SELECT unnest(string_to_array($1, ','))::bigint)`, strings.Join(lo.Map(ids, entToStr[harmonytask.TaskID]), ","))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -226,8 +250,12 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
} }
} }
// todo fix the block below
// workAdderMutex is held by taskTypeHandler.considerWork, which calls this CanAccept
// te.ResourcesAvailable will try to get that lock again, which will deadlock
// Discard those too big for our free RAM // Discard those too big for our free RAM
freeRAM := te.ResourcesAvailable().Ram /*freeRAM := te.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool { tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
maddr, err := address.NewIDAddress(tasks[0].Sp_id) maddr, err := address.NewIDAddress(tasks[0].Sp_id)
if err != nil { if err != nil {
@ -248,7 +276,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
} }
return res[spt].MaxMemory <= freeRAM return res[spt].MaxMemory <= freeRAM
}) })*/
if len(tasks) == 0 { if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task") log.Infof("RAM too small for any WDPost task")
return nil, nil return nil, nil
@ -259,7 +287,7 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
var r int var r int
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*) err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
FROM harmony_task_history FROM harmony_task_history
WHERE task_id = $1 AND success = false`, d.Task_id).Scan(&r) WHERE task_id = $1 AND result = false`, d.Task_id).Scan(&r)
if err != nil { if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err) log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
} }
@ -284,7 +312,11 @@ func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
Follows: nil, Follows: nil,
Cost: resources.Resources{ Cost: resources.Resources{
Cpu: 1, Cpu: 1,
Gpu: 1,
// todo set to something for 32/64G sector sizes? Technically windowPoSt is happy on a CPU
// but it will use a GPU if available
Gpu: 0,
// RAM of smallest proof's max is listed here // RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 { Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i { if res[k].MaxMemory < i {