provider: Undo some hacks, move wdpost task to lpwindow pkg

parent c9e5720f68
commit 4fad50ad9a
@@ -2,6 +2,7 @@ package main

 import (
 	"fmt"
+	"github.com/filecoin-project/lotus/provider"
 	"net"
 	"net/http"
 	"os"
@@ -221,7 +222,7 @@ var runCmd = &cli.Command{

 	var verif storiface.Verifier = ffiwrapper.ProofVerifier

-	as, err := modules.LotusProvderAddressSelector(&cfg.Addresses)()
+	as, err := provider.AddressSelector(&cfg.Addresses)()
 	if err != nil {
 		return err
 	}
@@ -282,7 +283,7 @@ var runCmd = &cli.Command{
 	}

 	if cfg.Subsystems.EnableWindowPost {
-		wdPostTask, err := modules.WindowPostSchedulerV2(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
+		wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
 			as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks)
 		if err != nil {
 			return err
@@ -57,22 +57,3 @@ func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r
 		return bds, nil
 	}
 }
-
-func DatastoreV2(ctx context.Context, disableLog bool, r repo.LockedRepo) (dtypes.MetadataDS, func() error, error) {
-	mds, err := r.Datastore(ctx, "/metadata")
-	if err != nil {
-		return nil, nil, err
-	}
-
-	var logdir string
-	if !disableLog {
-		logdir = filepath.Join(r.Path(), "kvlog/metadata")
-	}
-
-	bds, err := backupds.Wrap(mds, logdir)
-	if err != nil {
-		return nil, nil, xerrors.Errorf("opening backupds: %w", err)
-	}
-
-	return bds, bds.CloseLog, nil
-}
@@ -11,8 +11,6 @@ import (
 	"strings"
 	"time"

-	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
-
 	"github.com/google/uuid"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
@@ -211,46 +209,6 @@ func AddressSelector(addrConf *config.MinerAddressConfig) func() (*ctladdr.AddressSelector, error)
 		return as, nil
 	}
 }
-func LotusProvderAddressSelector(addrConf *config.LotusProviderAddresses) func() (*ctladdr.AddressSelector, error) {
-	return func() (*ctladdr.AddressSelector, error) {
-		as := &ctladdr.AddressSelector{}
-		if addrConf == nil {
-			return as, nil
-		}
-
-		as.DisableOwnerFallback = addrConf.DisableOwnerFallback
-		as.DisableWorkerFallback = addrConf.DisableWorkerFallback
-
-		for _, s := range addrConf.PreCommitControl {
-			addr, err := address.NewFromString(s)
-			if err != nil {
-				return nil, xerrors.Errorf("parsing precommit control address: %w", err)
-			}
-
-			as.PreCommitControl = append(as.PreCommitControl, addr)
-		}
-
-		for _, s := range addrConf.CommitControl {
-			addr, err := address.NewFromString(s)
-			if err != nil {
-				return nil, xerrors.Errorf("parsing commit control address: %w", err)
-			}
-
-			as.CommitControl = append(as.CommitControl, addr)
-		}
-
-		for _, s := range addrConf.TerminateControl {
-			addr, err := address.NewFromString(s)
-			if err != nil {
-				return nil, xerrors.Errorf("parsing terminate control address: %w", err)
-			}
-
-			as.TerminateControl = append(as.TerminateControl, addr)
-		}
-
-		return as, nil
-	}
-}
-
 func PreflightChecks(mctx helpers.MetricsCtx, lc fx.Lifecycle, api v1api.FullNode, maddr dtypes.MinerAddress) error {
 	ctx := helpers.LifecycleCtx(mctx, lc)
@@ -342,8 +300,8 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
 	}
 }

-func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
-	return func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
+func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
+	return func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
 		var (
 			mctx = params.MetricsCtx
 			lc   = params.Lifecycle

@@ -356,7 +314,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func

 		ctx := helpers.LifecycleCtx(mctx, lc)

-		fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr}, db, nil)
+		fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr})
 		if err != nil {
 			return nil, err
@@ -373,29 +331,6 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
 	}
 }

-func WindowPostSchedulerV2(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
-	api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
-	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*wdpost.WdPostTask, error) {
-
-	fc2 := config.MinerFeeConfig{
-		MaxPreCommitGasFee: fc.MaxPreCommitGasFee,
-		MaxCommitGasFee:    fc.MaxCommitGasFee,
-		MaxTerminateGasFee: fc.MaxTerminateGasFee,
-		MaxPublishDealsFee: fc.MaxPublishDealsFee,
-	}
-	ts := wdpost.NewWdPostTask(db, nil, max)
-	fps, err := wdpost.NewWindowedPoStScheduler(api, fc2, pc, as, sealer, verif, sealer, j, maddr, db, ts)
-
-	ts.Scheduler = fps
-	if err != nil {
-		return nil, err
-	}
-	go fps.RunV2(ctx, func(api wdpost.WdPoStCommands, actor address.Address) wdpost.ChangeHandlerIface {
-		return wdpost.NewChangeHandler2(api, actor, ts)
-	})
-	return ts, nil
-
-}
 func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) {
 	m.OnReady(marketevents.ReadyLogger("retrieval provider"))
 	lc.Append(fx.Hook{
provider/address.go (new file, 49 lines)
@@ -0,0 +1,49 @@
+package provider
+
+import (
+	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/lotus/node/config"
+	"github.com/filecoin-project/lotus/storage/ctladdr"
+	"golang.org/x/xerrors"
+)
+
+func AddressSelector(addrConf *config.LotusProviderAddresses) func() (*ctladdr.AddressSelector, error) {
+	return func() (*ctladdr.AddressSelector, error) {
+		as := &ctladdr.AddressSelector{}
+		if addrConf == nil {
+			return as, nil
+		}
+
+		as.DisableOwnerFallback = addrConf.DisableOwnerFallback
+		as.DisableWorkerFallback = addrConf.DisableWorkerFallback
+
+		for _, s := range addrConf.PreCommitControl {
+			addr, err := address.NewFromString(s)
+			if err != nil {
+				return nil, xerrors.Errorf("parsing precommit control address: %w", err)
+			}
+
+			as.PreCommitControl = append(as.PreCommitControl, addr)
+		}
+
+		for _, s := range addrConf.CommitControl {
+			addr, err := address.NewFromString(s)
+			if err != nil {
+				return nil, xerrors.Errorf("parsing commit control address: %w", err)
+			}
+
+			as.CommitControl = append(as.CommitControl, addr)
+		}
+
+		for _, s := range addrConf.TerminateControl {
+			addr, err := address.NewFromString(s)
+			if err != nil {
+				return nil, xerrors.Errorf("parsing terminate control address: %w", err)
+			}
+
+			as.TerminateControl = append(as.TerminateControl, addr)
+		}
+
+		return as, nil
+	}
+}
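Aside: AddressSelector returns a constructor closure (for dependency-injection wiring) rather than the selector itself, which is why call sites such as the run.go hunk above end in an extra `()`. A minimal direct-use sketch, assuming only the types visible in this diff and a zero-value config so that no address parsing can fail:

	cfg := config.LotusProviderAddresses{}      // no control addresses configured
	as, err := provider.AddressSelector(&cfg)() // build the closure, then invoke it immediately
	if err != nil {
		return err
	}
	_ = as // hand to provider.WindowPostScheduler or similar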
provider/builder.go (new file, 35 lines)
@@ -0,0 +1,35 @@
+package provider
+
+import (
+	"context"
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/journal"
+	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
+	"github.com/filecoin-project/lotus/node/config"
+	dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
+	"github.com/filecoin-project/lotus/provider/lpwindow"
+	"github.com/filecoin-project/lotus/storage/ctladdr"
+	"github.com/filecoin-project/lotus/storage/sealer"
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+)
+
+func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
+	api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
+	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*lpwindow.WdPostTask, error) {
+
+	/*fc2 := config.MinerFeeConfig{
+		MaxPreCommitGasFee: fc.MaxPreCommitGasFee,
+		MaxCommitGasFee:    fc.MaxCommitGasFee,
+		MaxTerminateGasFee: fc.MaxTerminateGasFee,
+		MaxPublishDealsFee: fc.MaxPublishDealsFee,
+	}*/
+	ts := lpwindow.NewWdPostTask(db, nil, max)
+
+	panic("change handler thing")
+
+	/*go fps.RunV2(ctx, func(api wdpost.WdPoStCommands, actor address.Address) wdpost.ChangeHandlerIface {
+		return wdpost.NewChangeHandler2(api, actor, ts)
+	})*/
+	return ts, nil
+
+}
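Note that as committed this constructor is a stub: execution reaches panic("change handler thing") before the return ts, nil line, so the EnableWindowPost branch in run.go above will crash at startup until the change-handler wiring (the commented-out RunV2 call) is ported to lpwindow. The nil second argument to lpwindow.NewWdPostTask likewise leaves the task's chain API unset.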
@@ -1,8 +1,8 @@
-package wdpost
+package lpwindow

 import (
-	"bytes"
 	"context"
+	logging "github.com/ipfs/go-log/v2"
 	"sort"
 	"time"

@@ -16,19 +16,26 @@ import (
 	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 	"github.com/samber/lo"
-	cbg "github.com/whyrusleeping/cbor-gen"
 )

+var log = logging.Logger("lpwindow")
+
 type WdPostTaskDetails struct {
 	Ts       *types.TipSet
 	Deadline *dline.Info
 }

+type WDPoStAPI interface {
+	ChainHead(context.Context) (*types.TipSet, error)
+	ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
+}
+
 type WdPostTask struct {
-	tasks chan *WdPostTaskDetails
-	db    *harmonydb.DB
-	Scheduler *WindowPoStScheduler
-	max int
+	api   WDPoStAPI
+	tasks chan *WdPostTaskDetails
+	db    *harmonydb.DB
+	max   int
 }

 func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
@@ -80,7 +87,7 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err)
 		return false, err
 	}
-	head, err := t.Scheduler.api.ChainHead(context.Background())
+	head, err := t.api.ChainHead(context.Background())
 	if err != nil {
 		log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
 		return false, err
@@ -91,54 +98,58 @@ func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 		return true, nil
 	}

-	ts, err := t.Scheduler.api.ChainGetTipSet(context.Background(), tsKey)
+	ts, err := t.api.ChainGetTipSet(context.Background(), tsKey)
 	if err != nil {
 		log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
 		return false, err
 	}
-	submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
-	if err != nil {
-		log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
-		return false, err
-	}
-
-	log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
-
-	// Enter an entry for each wdpost message proof into the wdpost_proofs table
-	for _, params := range submitWdPostParams {
-
-		// Convert submitWdPostParams.Partitions to a byte array using CBOR
-		buf := new(bytes.Buffer)
-		scratch := make([]byte, 9)
-		if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
-			return false, err
-		}
-		for _, v := range params.Partitions {
-			if err := v.MarshalCBOR(buf); err != nil {
-				return false, err
-			}
-		}
+	_ = ts
+
+	panic("todo")
+
+	/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
+	if err != nil {
+		log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
+		return false, err
+	}
+
+	log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
+
+	// Enter an entry for each wdpost message proof into the wdpost_proofs table
+	for _, params := range submitWdPostParams {
+
+		// Convert submitWdPostParams.Partitions to a byte array using CBOR
+		buf := new(bytes.Buffer)
+		scratch := make([]byte, 9)
+		if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
+			return false, err
+		}
+		for _, v := range params.Partitions {
+			if err := v.MarshalCBOR(buf); err != nil {
+				return false, err
+			}
+		}

 		// Insert into wdpost_proofs table
 		_, err = t.db.Exec(context.Background(),
 			`INSERT INTO wdpost_proofs (
 				deadline,
 				partitions,
 				proof_type,
 				proof_bytes)
 				VALUES ($1, $2, $3, $4)`,
 			params.Deadline,
 			buf.Bytes(),
 			params.Proofs[0].PoStProof,
 			params.Proofs[0].ProofBytes)
-	}
+	}*/

 	return true, nil
 }

 func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
 	// GetEpoch
-	ts, err := t.Scheduler.api.ChainHead(context.Background())
+	ts, err := t.api.ChainHead(context.Background())

 	if err != nil {
 		return nil, err
@@ -237,12 +248,12 @@ func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
 	}
 }

-func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler, max int) *WdPostTask {
+func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, max int) *WdPostTask {
 	return &WdPostTask{
 		tasks: make(chan *WdPostTaskDetails, 2),
 		db:    db,
-		Scheduler: scheduler,
+		api:   api,
 		max: max,
 	}
 }
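Replacing the *WindowPoStScheduler back-reference with the narrow WDPoStAPI interface means a WdPostTask no longer needs a live scheduler: anything implementing ChainHead and ChainGetTipSet will do. A sketch of a hypothetical stub (not part of this commit) that satisfies the interface, e.g. for tests:

	// stubChain is a hypothetical WDPoStAPI implementation, for illustration only.
	type stubChain struct{ head *types.TipSet }

	func (s stubChain) ChainHead(context.Context) (*types.TipSet, error) { return s.head, nil }

	func (s stubChain) ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error) {
		return s.head, nil
	}

	// db comes from harmonydb; a max of 0 mirrors the test below.
	task := lpwindow.NewWdPostTask(db, stubChain{}, 0)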
@@ -1,4 +1,4 @@
-package wdpost
+package lpwindow

 import (
 	"context"

@@ -24,9 +24,10 @@ func TestAddTask(t *testing.T) {
 	require.NoError(t, err)
 	wdPostTask := NewWdPostTask(db, nil, 0)
 	taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
+	_ = taskEngine
 	ts := types.TipSet{}
 	deadline := dline.Info{}
-	err := wdPostTask.AddTask(context.Background(), &ts, &deadline)
+	err = wdPostTask.AddTask(context.Background(), &ts, &deadline)

 	require.NoError(t, err)
 }
@@ -31,15 +31,6 @@ type WdPoStCommands interface {
 	recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
 }

-type ChangeHandlerIface interface {
-	start()
-	update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error
-	shutdown()
-	currentTSDI() (*types.TipSet, *dline.Info)
-}
-
-var _ ChangeHandlerIface = &changeHandler{}
-
 type changeHandler struct {
 	api   WdPoStCommands
 	actor address.Address

@@ -171,8 +162,6 @@ type proveHandler struct {
 	// Used for testing
 	processedHeadChanges chan *headChange
 	processedPostResults chan *postResult
-
-	wdPostTask *WdPostTask
 }

 func newProver(

@@ -222,8 +211,6 @@ func (p *proveHandler) run() {

 func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
 	// If the post window has expired, abort the current proof
-	//log.Errorf("--------------------WINDOW POST CHANGE HANDLER PROCESS HC----------------------")
-
 	if p.current != nil && newTS.Height() >= p.current.di.Close {
 		// Cancel the context on the current proof
 		p.current.abort()

@@ -247,11 +234,6 @@ func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSe
 		_, complete = p.posts.get(di)
 	}

-	//err := p.wdPostTask.AddTask(ctx, newTS, di)
-	//if err != nil {
-	//	log.Errorf("AddTask failed: %v", err)
-	//}
-
 	// Check if the chain is above the Challenge height for the post window
 	if newTS.Height() < di.Challenge+ChallengeConfidence {
 		return
@@ -1,565 +0,0 @@
-package wdpost
-
-import (
-	"context"
-
-	"github.com/filecoin-project/go-address"
-	"github.com/filecoin-project/go-state-types/dline"
-	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
-
-	"github.com/filecoin-project/lotus/chain/types"
-)
-
-//const (
-// SubmitConfidence = 4
-// ChallengeConfidence = 1
-//)
-//
-//type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
-//type CompleteSubmitPoSTCb func(err error)
-//
-//// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
-//// by the changeHandler to execute actions and query state.
-//type wdPoStCommands interface {
-// StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
-//
-// startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
-// startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
-// onAbort(ts *types.TipSet, deadline *dline.Info)
-// recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
-//}
-
-var _ ChangeHandlerIface = &changeHandler2{}
-
-type changeHandler2 struct {
-	api       WdPoStCommands
-	actor     address.Address
-	proveHdlr *proveHandler2
-	//submitHdlr *submitHandler
-
-	db *harmonydb.DB
-}
-
-func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
-	//TODO implement me
-	panic("implement me")
-}
-
-func NewChangeHandler2(api WdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 {
-	log.Errorf("NewChangeHandler2() called with api: %v, actor: %v", api, actor)
-	//posts := newPostsCache()
-	p := newProver2(api, task)
-	//s := newSubmitter(api, posts)
-	return &changeHandler2{api: api, actor: actor, proveHdlr: p}
-}
-
-func (ch *changeHandler2) start() {
-	go ch.proveHdlr.run()
-	//go ch.submitHdlr.run()
-}
-
-func (ch *changeHandler2) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
-	// Get the current deadline period
-	di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
-	if err != nil {
-		return err
-	}
-
-	if !di.PeriodStarted() {
-		return nil // not proving anything yet
-	}
-
-	hc := &headChange{
-		ctx:     ctx,
-		revert:  revert,
-		advance: advance,
-		di:      di,
-	}
-
-	select {
-	case ch.proveHdlr.hcs <- hc:
-	case <-ch.proveHdlr.shutdownCtx.Done():
-	case <-ctx.Done():
-	}
-
-	//select {
-	//case ch.submitHdlr.hcs <- hc:
-	//case <-ch.submitHdlr.shutdownCtx.Done():
-	//case <-ctx.Done():
-	//}
-
-	return nil
-}
-
-func (ch *changeHandler2) shutdown() {
-	ch.proveHdlr.shutdown()
-	//ch.submitHdlr.shutdown()
-}
-
-//func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
-// return ch.submitHdlr.currentTSDI()
-//}
-
-// postsCache keeps a cache of PoSTs for each proving window
-//type postsCache struct {
-// added chan *postInfo
-// lk sync.RWMutex
-// cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams
-//}
-
-//func newPostsCache() *postsCache {
-// return &postsCache{
-// added: make(chan *postInfo, 16),
-// cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams),
-// }
-//}
-
-//func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) {
-// c.lk.Lock()
-// defer c.lk.Unlock()
-//
-// // TODO: clear cache entries older than chain finality
-// c.cache[di.Open] = posts
-//
-// c.added <- &postInfo{
-// di: di,
-// posts: posts,
-// }
-//}
-//
-//func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) {
-// c.lk.RLock()
-// defer c.lk.RUnlock()
-//
-// posts, ok := c.cache[di.Open]
-// return posts, ok
-//}
-
-//type headChange struct {
-// ctx context.Context
-// revert *types.TipSet
-// advance *types.TipSet
-// di *dline.Info
-//}
-//
-//type currentPost struct {
-// di *dline.Info
-// abort context.CancelFunc
-//}
-//
-//type postResult struct {
-// ts *types.TipSet
-// currPost *currentPost
-// posts []miner.SubmitWindowedPoStParams
-// err error
-//}
-
-// proveHandler generates proofs
-type proveHandler2 struct {
-	api WdPoStCommands
-	//posts *postsCache
-
-	//postResults chan *postResult
-	hcs chan *headChange
-
-	current *currentPost
-
-	shutdownCtx context.Context
-	shutdown    context.CancelFunc
-
-	// Used for testing
-	processedHeadChanges chan *headChange
-	processedPostResults chan *postResult
-
-	wdPostTask   *WdPostTask
-	currDeadline *dline.Info
-}
-
-func newProver2(
-	api WdPoStCommands,
-	//posts *postsCache,
-	//db *harmonydb.DB,
-	wdPostTask *WdPostTask,
-) *proveHandler2 {
-	ctx, cancel := context.WithCancel(context.Background())
-	return &proveHandler2{
-		api: api,
-		//posts: posts,
-		//postResults: make(chan *postResult),
-		hcs:         make(chan *headChange),
-		shutdownCtx: ctx,
-		shutdown:    cancel,
-		wdPostTask:  wdPostTask,
-	}
-}
-
-func (p *proveHandler2) run() {
-	// Abort proving on shutdown
-	defer func() {
-		if p.current != nil {
-			p.current.abort()
-		}
-	}()
-
-	for p.shutdownCtx.Err() == nil {
-		select {
-		case <-p.shutdownCtx.Done():
-			return
-
-		case hc := <-p.hcs:
-			// Head changed
-			p.processHeadChange(hc.ctx, hc.advance, hc.di)
-			if p.processedHeadChanges != nil {
-				p.processedHeadChanges <- hc
-			}
-
-			//case res := <-p.postResults:
-			// // Proof generation complete
-			// p.processPostResult(res)
-			// if p.processedPostResults != nil {
-			// p.processedPostResults <- res
-			// }
-		}
-	}
-}
-
-func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
-	// If the post window has expired, abort the current proof
-	//if p.current != nil && newTS.Height() >= p.current.di.Close {
-	// log.Errorf("Aborted window post Proving (Deadline: %+v), newTs: %+v", p.current.di, newTS.Height())
-	// // Cancel the context on the current proof
-	// p.current.abort()
-	//
-	// // Clear out the reference to the proof so that we can immediately
-	// // start generating a new proof, without having to worry about state
-	// // getting clobbered when the abort completes
-	// p.current = nil
-	//}
-	//
-	//// Only generate one proof at a time
-	//log.Errorf("p.current: %+v", p.current)
-	//if p.current != nil {
-	// return
-	//}
-
-	// If the proof for the current post window has been generated, check the
-	// next post window
-	//_, complete := p.posts.get(di)
-	//for complete {
-	// di = nextDeadline(di)
-	// _, complete = p.posts.get(di)
-	//}
-
-	// Check if the chain is above the Challenge height for the post window
-	if newTS.Height() < di.Challenge+ChallengeConfidence {
-		return
-	}
-
-	//p.current = &currentPost{di: di}
-
-	err := p.wdPostTask.AddTask(ctx, newTS, di)
-	if err != nil {
-		log.Errorf("AddTask failed: %v", err)
-	}
-
-	//
-	//curr := p.current
-	//p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
-	// p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
-	//})
-}
-
-//func (p *proveHandler) processPostResult(res *postResult) {
-// di := res.currPost.di
-// if res.err != nil {
-// // Proving failed so inform the API
-// p.api.recordPoStFailure(res.err, res.ts, di)
-// log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
-// p.api.onAbort(res.ts, di)
-//
-// // Check if the current post has already been aborted
-// if p.current == res.currPost {
-// // If the current post was not already aborted, setting it to nil
-// // marks it as complete so that a new post can be started
-// p.current = nil
-// }
-// return
-// }
-//
-// // Completed processing this proving window
-// p.current = nil
-//
-// // Add the proofs to the cache
-// p.posts.add(di, res.posts)
-//}
-//
-//type submitResult struct {
-// pw *postWindow
-// err error
-//}
-//
-//type SubmitState string
-//
-//const (
-// SubmitStateStart SubmitState = "SubmitStateStart"
-// SubmitStateSubmitting SubmitState = "SubmitStateSubmitting"
-// SubmitStateComplete SubmitState = "SubmitStateComplete"
-//)
-//
-//type postWindow struct {
-// ts *types.TipSet
-// di *dline.Info
-// submitState SubmitState
-// abort context.CancelFunc
-//}
-//
-//type postInfo struct {
-// di *dline.Info
-// posts []miner.SubmitWindowedPoStParams
-//}
-//
-//// submitHandler submits proofs on-chain
-//type submitHandler struct {
-// api wdPoStCommands
-// posts *postsCache
-//
-// submitResults chan *submitResult
-// hcs chan *headChange
-//
-// postWindows map[abi.ChainEpoch]*postWindow
-// getPostWindowReqs chan *getPWReq
-//
-// shutdownCtx context.Context
-// shutdown context.CancelFunc
-//
-// currentCtx context.Context
-// currentTS *types.TipSet
-// currentDI *dline.Info
-// getTSDIReq chan chan *tsdi
-//
-// // Used for testing
-// processedHeadChanges chan *headChange
-// processedSubmitResults chan *submitResult
-// processedPostReady chan *postInfo
-//}
-//
-//func newSubmitter(
-// api wdPoStCommands,
-// posts *postsCache,
-//) *submitHandler {
-// ctx, cancel := context.WithCancel(context.Background())
-// return &submitHandler{
-// api: api,
-// posts: posts,
-// submitResults: make(chan *submitResult),
-// hcs: make(chan *headChange),
-// postWindows: make(map[abi.ChainEpoch]*postWindow),
-// getPostWindowReqs: make(chan *getPWReq),
-// getTSDIReq: make(chan chan *tsdi),
-// shutdownCtx: ctx,
-// shutdown: cancel,
-// }
-//}
-//
-//func (s *submitHandler) run() {
-// // On shutdown, abort in-progress submits
-// defer func() {
-// for _, pw := range s.postWindows {
-// if pw.abort != nil {
-// pw.abort()
-// }
-// }
-// }()
-//
-// for s.shutdownCtx.Err() == nil {
-// select {
-// case <-s.shutdownCtx.Done():
-// return
-//
-// case hc := <-s.hcs:
-// // Head change
-// s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di)
-// if s.processedHeadChanges != nil {
-// s.processedHeadChanges <- hc
-// }
-//
-// case pi := <-s.posts.added:
-// // Proof generated
-// s.processPostReady(pi)
-// if s.processedPostReady != nil {
-// s.processedPostReady <- pi
-// }
-//
-// case res := <-s.submitResults:
-// // Submit complete
-// s.processSubmitResult(res)
-// if s.processedSubmitResults != nil {
-// s.processedSubmitResults <- res
-// }
-//
-// case pwreq := <-s.getPostWindowReqs:
-// // used by getPostWindow() to sync with run loop
-// pwreq.out <- s.postWindows[pwreq.di.Open]
-//
-// case out := <-s.getTSDIReq:
-// // used by currentTSDI() to sync with run loop
-// out <- &tsdi{ts: s.currentTS, di: s.currentDI}
-// }
-// }
-//}
-//
-//// processHeadChange is called when the chain head changes
-//func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
-// s.currentCtx = ctx
-// s.currentTS = advance
-// s.currentDI = di
-//
-// // Start tracking the current post window if we're not already
-// // TODO: clear post windows older than chain finality
-// if _, ok := s.postWindows[di.Open]; !ok {
-// s.postWindows[di.Open] = &postWindow{
-// di: di,
-// ts: advance,
-// submitState: SubmitStateStart,
-// }
-// }
-//
-// // Apply the change to all post windows
-// for _, pw := range s.postWindows {
-// s.processHeadChangeForPW(ctx, revert, advance, pw)
-// }
-//}
-//
-//func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) {
-// revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open
-// expired := advance.Height() >= pw.di.Close
-//
-// // If the chain was reverted back to the previous deadline, or if the post
-// // window has expired, abort submit
-// if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) {
-// // Replace the aborted postWindow with a new one so that we can
-// // submit again at any time without the state getting clobbered
-// // when the abort completes
-// abort := pw.abort
-// if abort != nil {
-// pw = &postWindow{
-// di: pw.di,
-// ts: advance,
-// submitState: SubmitStateStart,
-// }
-// s.postWindows[pw.di.Open] = pw
-//
-// // Abort the current submit
-// abort()
-// }
-// } else if pw.submitState == SubmitStateComplete && revertedToPrevDL {
-// // If submit for this deadline has completed, but the chain was
-// // reverted back to the previous deadline, reset the submit state to the
-// // starting state, so that it can be resubmitted
-// pw.submitState = SubmitStateStart
-// }
-//
-// // Submit the proof to chain if the proof has been generated and the chain
-// // height is above confidence
-// s.submitIfReady(ctx, advance, pw)
-//}
-//
-//// processPostReady is called when a proof generation completes
-//func (s *submitHandler) processPostReady(pi *postInfo) {
-// pw, ok := s.postWindows[pi.di.Open]
-// if ok {
-// s.submitIfReady(s.currentCtx, s.currentTS, pw)
-// }
-//}
-//
-//// submitIfReady submits a proof if the chain is high enough and the proof
-//// has been generated for this deadline
-//func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) {
-// // If the window has expired, there's nothing more to do.
-// if advance.Height() >= pw.di.Close {
-// return
-// }
-//
-// // Check if we're already submitting, or already completed submit
-// if pw.submitState != SubmitStateStart {
-// return
-// }
-//
-// // Check if we've reached the confidence height to submit
-// if advance.Height() < pw.di.Open+SubmitConfidence {
-// return
-// }
-//
-// // Check if the proofs have been generated for this deadline
-// posts, ok := s.posts.get(pw.di)
-// if !ok {
-// return
-// }
-//
-// // If there was nothing to prove, move straight to the complete state
-// if len(posts) == 0 {
-// pw.submitState = SubmitStateComplete
-// return
-// }
-//
-// // Start submitting post
-// pw.submitState = SubmitStateSubmitting
-// pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) {
-// s.submitResults <- &submitResult{pw: pw, err: err}
-// })
-//}
-//
-//// processSubmitResult is called with the response to a submit
-//func (s *submitHandler) processSubmitResult(res *submitResult) {
-// if res.err != nil {
-// // Submit failed so inform the API and go back to the start state
-// s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
-// log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
-// s.api.onAbort(res.pw.ts, res.pw.di)
-//
-// res.pw.submitState = SubmitStateStart
-// return
-// }
-//
-// // Submit succeeded so move to complete state
-// res.pw.submitState = SubmitStateComplete
-//}
-//
-//type tsdi struct {
-// ts *types.TipSet
-// di *dline.Info
-//}
-//
-//func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) {
-// out := make(chan *tsdi)
-// s.getTSDIReq <- out
-// res := <-out
-// return res.ts, res.di
-//}
-//
-//type getPWReq struct {
-// di *dline.Info
-// out chan *postWindow
-//}
-//
-//func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
-// out := make(chan *postWindow)
-// s.getPostWindowReqs <- &getPWReq{di: di, out: out}
-// return <-out
-//}
-//
-//// nextDeadline gets deadline info for the subsequent deadline
-//func nextDeadline(currentDeadline *dline.Info) *dline.Info {
-// periodStart := currentDeadline.PeriodStart
-// newDeadline := currentDeadline.Index + 1
-// if newDeadline == miner.WPoStPeriodDeadlines {
-// newDeadline = 0
-// periodStart = periodStart + miner.WPoStProvingPeriod
-// }
-//
-// return NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch)
-//}
-//
-//func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
-// return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod, miner.WPoStChallengeWindow, miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
-//}
@@ -272,8 +272,6 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di

 	start := time.Now()

-	log.Errorf("runPoStCycle called with manual: %v, di: %v, ts: %v", manual, di, ts)
-
 	log := log.WithOptions(zap.Fields(zap.Time("cycle", start)))
 	log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index)
 	defer func() {
@@ -4,8 +4,6 @@ import (
 	"context"
 	"time"

-	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
-
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
 	"go.opencensus.io/trace"
@@ -80,7 +78,7 @@ type WindowPoStScheduler struct {
 	maxPartitionsPerPostMessage             int
 	maxPartitionsPerRecoveryMessage         int
 	singleRecoveringPartitionPerPostMessage bool
-	ch ChangeHandlerIface
+	ch *changeHandler

 	actor address.Address
@@ -89,8 +87,6 @@ type WindowPoStScheduler struct {

 	// failed abi.ChainEpoch // eps
 	// failLk sync.Mutex
-	db *harmonydb.DB
-	wdPostTask *WdPostTask
 }

 type ActorInfo struct {
@@ -107,9 +103,7 @@ func NewWindowedPoStScheduler(api NodeAPI,
 	verif storiface.Verifier,
 	ft sealer.FaultTracker,
 	j journal.Journal,
-	actors []dtypes.MinerAddress,
-	db *harmonydb.DB,
-	task *WdPostTask) (*WindowPoStScheduler, error) {
+	actors []dtypes.MinerAddress) (*WindowPoStScheduler, error) {
 	var actorInfos []ActorInfo

 	for _, actor := range actors {
@@ -140,27 +134,19 @@ func NewWindowedPoStScheduler(api NodeAPI,
 			evtTypeWdPoStRecoveries: j.RegisterEventType("wdpost", "recoveries_processed"),
 			evtTypeWdPoStFaults:     j.RegisterEventType("wdpost", "faults_processed"),
 		},
 		journal: j,
-		wdPostTask: task,
-		db: db,
 	}, nil
 }

 func (s *WindowPoStScheduler) Run(ctx context.Context) {
-	// Initialize change handler.
-
-	s.RunV2(ctx, func(api WdPoStCommands, actor address.Address) ChangeHandlerIface {
-		return newChangeHandler(api, actor)
-	})
-}
-
-func (s *WindowPoStScheduler) RunV2(ctx context.Context, f func(api WdPoStCommands, actor address.Address) ChangeHandlerIface) {
 	// callbacks is a union of the fullNodeFilteredAPI and ourselves.
 	callbacks := struct {
 		NodeAPI
 		*WindowPoStScheduler
 	}{s.api, s}

-	s.ch = f(callbacks, s.actor)
+	// Initialize change handler.
+	s.ch = newChangeHandler(callbacks, s.actor)
 	defer s.ch.shutdown()
 	s.ch.start()