lpwindow: Simple adder

This commit is contained in:
Łukasz Magiera 2023-10-25 20:58:16 +02:00
parent 9ec9360895
commit ad9c05e30b
8 changed files with 393 additions and 55 deletions

View File

@ -820,3 +820,21 @@ func AggregatePreCommitNetworkFee(nwVer network.Version, aggregateSize int, base
return big.Zero(), xerrors.Errorf("unsupported network version")
}
}
var PoStToSealMap map[abi.RegisteredPoStProof]abi.RegisteredSealProof
func init() {
PoStToSealMap = make(map[abi.RegisteredPoStProof]abi.RegisteredSealProof)
for sealProof, info := range abi.SealProofInfos {
PoStToSealMap[info.WinningPoStProof] = sealProof
PoStToSealMap[info.WindowPoStProof] = sealProof
}
}
func GetSealProofFromPoStProof(postProof abi.RegisteredPoStProof) (abi.RegisteredSealProof, error) {
sealProof, exists := PoStToSealMap[postProof]
if !exists {
return 0, xerrors.New("no corresponding RegisteredSealProof for the given RegisteredPoStProof")
}
return sealProof, nil
}

View File

@ -329,3 +329,20 @@ func AggregatePreCommitNetworkFee(nwVer network.Version, aggregateSize int, base
return big.Zero(), xerrors.Errorf("unsupported network version")
}
}
var PoStToSealMap map[abi.RegisteredPoStProof]abi.RegisteredSealProof
func init() {
PoStToSealMap = make(map[abi.RegisteredPoStProof]abi.RegisteredSealProof)
for sealProof, info := range abi.SealProofInfos {
PoStToSealMap[info.WinningPoStProof] = sealProof
PoStToSealMap[info.WindowPoStProof] = sealProof
}
}
func GetSealProofFromPoStProof(postProof abi.RegisteredPoStProof) (abi.RegisteredSealProof, error) {
sealProof, exists := PoStToSealMap[postProof]
if !exists {
return 0, xerrors.New("no corresponding RegisteredSealProof for the given RegisteredPoStProof")
}
return sealProof, nil
}

47
lib/promise/promise.go Normal file
View File

@ -0,0 +1,47 @@
package promise
import (
"context"
"sync"
)
type Promise[T any] struct {
val T
done chan struct{}
mu sync.Mutex
}
func (p *Promise[T]) Set(val T) {
p.mu.Lock()
defer p.mu.Unlock()
// Set value
p.val = val
// Initialize the done channel if it hasn't been initialized
if p.done == nil {
p.done = make(chan struct{})
}
// Signal that the value is set
close(p.done)
}
func (p *Promise[T]) Val(ctx context.Context) T {
p.mu.Lock()
// Initialize the done channel if it hasn't been initialized
if p.done == nil {
p.done = make(chan struct{})
}
p.mu.Unlock()
select {
case <-ctx.Done():
return *new(T)
case <-p.done:
p.mu.Lock()
val := p.val
p.mu.Unlock()
return val
}
}

View File

@ -0,0 +1,65 @@
package promise
import (
"context"
"sync"
"testing"
"time"
)
func TestPromiseSet(t *testing.T) {
p := &Promise[int]{}
p.Set(42)
if p.val != 42 {
t.Fatalf("expected 42, got %v", p.val)
}
}
func TestPromiseVal(t *testing.T) {
p := &Promise[int]{}
p.Set(42)
ctx := context.Background()
val := p.Val(ctx)
if val != 42 {
t.Fatalf("expected 42, got %v", val)
}
}
func TestPromiseValWaitsForSet(t *testing.T) {
p := &Promise[int]{}
var val int
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ctx := context.Background()
val = p.Val(ctx)
}()
time.Sleep(100 * time.Millisecond) // Give some time for the above goroutine to execute
p.Set(42)
wg.Wait()
if val != 42 {
t.Fatalf("expected 42, got %v", val)
}
}
func TestPromiseValContextCancel(t *testing.T) {
p := &Promise[int]{}
ctx, cancel := context.WithCancel(context.Background())
cancel() // Cancel the context
val := p.Val(ctx)
var zeroValue int
if val != zeroValue {
t.Fatalf("expected zero-value, got %v", val)
}
}

View File

@ -7,29 +7,21 @@ import (
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpwindow"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
logging "github.com/ipfs/go-log/v2"
)
var log = logging.Logger("provider")
func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*lpwindow.WdPostTask, error) {
/*fc2 := config.MinerFeeConfig{
MaxPreCommitGasFee: fc.MaxPreCommitGasFee,
MaxCommitGasFee: fc.MaxCommitGasFee,
MaxTerminateGasFee: fc.MaxTerminateGasFee,
MaxPublishDealsFee: fc.MaxPublishDealsFee,
}*/
ts := lpwindow.NewWdPostTask(db, nil, max)
panic("change handler thing")
/*go fps.RunV2(ctx, func(api wdpost.WdPoStCommands, actor address.Address) wdpost.ChangeHandlerIface {
return wdpost.NewChangeHandler2(api, actor, ts)
})*/
return ts, nil
chainSched := chainsched.New(api)
return lpwindow.NewWdPostTask(db, nil, chainSched, maddr)
}

View File

@ -0,0 +1,134 @@
package chainsched
import (
"context"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"time"
)
var log = logging.Logger("chainsched")
type NodeAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
}
type ProviderChainSched struct {
api NodeAPI
callbacks []UpdateFunc
started bool
}
func New(api NodeAPI) *ProviderChainSched {
return &ProviderChainSched{
api: api,
}
}
type UpdateFunc func(ctx context.Context, revert, apply *types.TipSet) error
func (s *ProviderChainSched) AddHandler(ch UpdateFunc) error {
if s.started {
return xerrors.Errorf("cannot add handler after start")
}
s.callbacks = append(s.callbacks, ch)
return nil
}
func (s *ProviderChainSched) Run(ctx context.Context) {
s.started = true
var (
notifs <-chan []*api.HeadChange
err error
gotCur bool
)
// not fine to panic after this point
for {
if notifs == nil {
notifs, err = s.api.ChainNotify(ctx)
if err != nil {
log.Errorf("ChainNotify error: %+v", err)
build.Clock.Sleep(10 * time.Second)
continue
}
gotCur = false
log.Info("restarting window post scheduler")
}
select {
case changes, ok := <-notifs:
if !ok {
log.Warn("window post scheduler notifs channel closed")
notifs = nil
continue
}
if !gotCur {
if len(changes) != 1 {
log.Errorf("expected first notif to have len = 1")
continue
}
chg := changes[0]
if chg.Type != store.HCCurrent {
log.Errorf("expected first notif to tell current ts")
continue
}
ctx, span := trace.StartSpan(ctx, "ProviderChainSched.headChange")
s.update(ctx, nil, chg.Val)
span.End()
gotCur = true
continue
}
ctx, span := trace.StartSpan(ctx, "ProviderChainSched.headChange")
var lowest, highest *types.TipSet = nil, nil
for _, change := range changes {
if change.Val == nil {
log.Errorf("change.Val was nil")
}
switch change.Type {
case store.HCRevert:
lowest = change.Val
case store.HCApply:
highest = change.Val
}
}
s.update(ctx, lowest, highest)
span.End()
case <-ctx.Done():
return
}
}
}
func (s *ProviderChainSched) update(ctx context.Context, revert, apply *types.TipSet) {
if apply == nil {
log.Error("no new tipset in window post ProviderChainSched.update")
return
}
for _, ch := range s.callbacks {
if err := ch(ctx, revert, apply); err != nil {
log.Errorf("handling head updates in provider chain sched: %+v", err)
}
}
}

View File

@ -1,30 +1,31 @@
package lpwindow
/*
import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/storage/wdpost"
)
type changeHandler struct {
api WDPoStAPI
actor address.Address
actors []dtypes.MinerAddress
proveHdlr *proveHandler
}
func newChangeHandler(api WDPoStAPI, actor address.Address) *changeHandler {
func newChangeHandler(api WDPoStAPI, actors []dtypes.MinerAddress) *changeHandler {
p := newProver(api)
return &changeHandler{api: api, actor: actor, proveHdlr: p}
return &changeHandler{api: api, actors: actors, proveHdlr: p}
}
func (ch *changeHandler) start() {
go ch.proveHdlr.run()
}
func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
func (ch *changeHandler) Update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
// Get the current deadline period
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
if err != nil {
@ -48,26 +49,20 @@ func (ch *changeHandler) update(ctx context.Context, revert *types.TipSet, advan
case <-ctx.Done():
}
select {
case ch.submitHdlr.hcs <- hc:
case <-ch.submitHdlr.shutdownCtx.Done():
case <-ctx.Done():
}
return nil
}
type proveHandler struct {
api WdPoStCommands
api WDPoStAPI
posts *postsCache
postResults chan *postResult
hcs chan *headChange
//postResults chan *postResult
hcs chan *headChange
current *currentPost
shutdownCtx context.Context
shutdown context.CancelFunc
shutdownFn context.CancelFunc
}
type headChange struct {
@ -87,16 +82,16 @@ func newProver(
) *proveHandler {
ctx, cancel := context.WithCancel(context.Background())
return &proveHandler{
api: api,
postResults: make(chan *postResult),
api: api,
//postResults: make(chan *postResult),
hcs: make(chan *headChange),
shutdownCtx: ctx,
shutdown: cancel,
shutdownFn: cancel,
}
}
func (p *proveHandler) run() {
// Abort proving on shutdown
// Abort proving on shutdownFn
defer func() {
if p.current != nil {
p.current.abort()
@ -179,3 +174,8 @@ func (p *proveHandler) processPostResult(res *postResult) {
// Add the proofs to the cache
p.posts.add(di, res.posts)
}
func (ch *changeHandler) Shutdown() {
ch.proveHdlr.shutdownFn()
}
*/

View File

@ -5,6 +5,10 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/lib/promise"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/storage/wdpost"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
@ -38,13 +42,23 @@ type WDPoStAPI interface {
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
}
type WdPostTask struct {
api WDPoStAPI
db *harmonydb.DB
max int
windowPoStTF promise.Promise[harmonytask.AddTaskFunc]
actors []dtypes.MinerAddress
}
type wdTaskIdentity struct {
Sp_id uint64
Proving_period_start abi.ChainEpoch
Deadline_index uint64
Partition_index uint64
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
@ -185,7 +199,13 @@ func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEng
return false
}
return res[mi.WindowPoStProofType].MaxMemory <= freeRAM
spt, err := policy.GetSealProofFromPoStProof(mi.WindowPoStProofType)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to GetSealProofFromPoStProof: %v", err)
return false
}
return res[spt].MaxMemory <= freeRAM
})
if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task")
@ -217,7 +237,7 @@ var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
Max: t.max,
Max: 1, // TODO
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
@ -235,27 +255,72 @@ func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
}
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
// wait for any channels on t.tasks and call taskFunc on them
for taskDetails := range t.tasks {
//log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails)
taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx)
})
}
t.windowPoStTF.Set(taskFunc)
}
func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask {
return &WdPostTask{
func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
for _, act := range t.actors {
maddr := address.Address(act)
aid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner ID: %w", err)
}
di, err := t.api.StateMinerProvingDeadline(ctx, maddr, apply.Key())
if err != nil {
return err
}
if !di.PeriodStarted() {
return nil // not proving anything yet
}
partitions, err := t.api.StateMinerPartitions(ctx, maddr, di.Index, apply.Key())
if err != nil {
return xerrors.Errorf("getting partitions: %w", err)
}
// TODO: Batch Partitions??
for pidx := range partitions {
tid := wdTaskIdentity{
Sp_id: aid,
Proving_period_start: di.PeriodStart,
Deadline_index: di.Index,
Partition_index: uint64(pidx),
}
tf := t.windowPoStTF.Val(ctx)
if tf == nil {
return xerrors.Errorf("no task func")
}
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(id, tid, tx)
})
}
}
return nil
}
func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, pcs *chainsched.ProviderChainSched, actors []dtypes.MinerAddress) (*WdPostTask, error) {
t := &WdPostTask{
db: db,
api: api,
max: max,
actors: actors,
}
if err := pcs.AddHandler(t.processHeadChange); err != nil {
return nil, err
}
return t, nil
}
func (t *WdPostTask) addTaskToDB(deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
_, err := tx.Exec(
`INSERT INTO wdpost_tasks (
@ -267,10 +332,10 @@ func (t *WdPostTask) addTaskToDB(deadline *dline.Info, taskId harmonytask.TaskID
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
spID,
deadline.PeriodStart,
deadline.Index,
partID,
taskIdent.Sp_id,
taskIdent.Proving_period_start,
taskIdent.Deadline_index,
taskIdent.Partition_index,
)
if err != nil {
return false, err