Merge branch 'feat/wdpost-adder2' into wdpost-can-accept
commit 88c3dda736
@@ -1028,7 +1028,7 @@ workflows:
       requires:
         - build
       suite: utest-unit-rest
-      target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./tools/..."
+      target: "./blockstore/... ./build/... ./chain/... ./conformance/... ./gateway/... ./journal/... ./lib/... ./markets/... ./paychmgr/... ./provider/... ./tools/..."

   - test:
       name: test-unit-storage
@@ -820,3 +820,21 @@ func AggregatePreCommitNetworkFee(nwVer network.Version, aggregateSize int, base
 		return big.Zero(), xerrors.Errorf("unsupported network version")
 	}
 }
+
+var PoStToSealMap map[abi.RegisteredPoStProof]abi.RegisteredSealProof
+
+func init() {
+	PoStToSealMap = make(map[abi.RegisteredPoStProof]abi.RegisteredSealProof)
+	for sealProof, info := range abi.SealProofInfos {
+		PoStToSealMap[info.WinningPoStProof] = sealProof
+		PoStToSealMap[info.WindowPoStProof] = sealProof
+	}
+}
+
+func GetSealProofFromPoStProof(postProof abi.RegisteredPoStProof) (abi.RegisteredSealProof, error) {
+	sealProof, exists := PoStToSealMap[postProof]
+	if !exists {
+		return 0, xerrors.New("no corresponding RegisteredSealProof for the given RegisteredPoStProof")
+	}
+	return sealProof, nil
+}
@@ -329,3 +329,20 @@ func AggregatePreCommitNetworkFee(nwVer network.Version, aggregateSize int, base
 		return big.Zero(), xerrors.Errorf("unsupported network version")
 	}
 }
+
+var PoStToSealMap map[abi.RegisteredPoStProof]abi.RegisteredSealProof
+func init() {
+	PoStToSealMap = make(map[abi.RegisteredPoStProof]abi.RegisteredSealProof)
+	for sealProof, info := range abi.SealProofInfos {
+		PoStToSealMap[info.WinningPoStProof] = sealProof
+		PoStToSealMap[info.WindowPoStProof] = sealProof
+	}
+}
+
+func GetSealProofFromPoStProof(postProof abi.RegisteredPoStProof) (abi.RegisteredSealProof, error) {
+	sealProof, exists := PoStToSealMap[postProof]
+	if !exists {
+		return 0, xerrors.New("no corresponding RegisteredSealProof for the given RegisteredPoStProof")
+	}
+	return sealProof, nil
+}
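Both hunks add the same helper, so a brief usage sketch follows. Everything here other than policy.GetSealProofFromPoStProof and the abi types is illustrative: the proof constant is just a sample value, and the chain/actors/policy import path is assumed from the lpwindow code later in this diff.

package main

import (
	"fmt"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/chain/actors/policy"
)

func main() {
	// Example input: the WindowPoSt proof type a miner reports (e.g. via
	// StateMinerInfo); a 32GiB window proof is used here purely as a sample.
	wpt := abi.RegisteredPoStProof_StackedDrgWindow32GiBV1

	// Map the PoSt proof back to its RegisteredSealProof, e.g. to look up
	// per-proof resource requirements as lpwindow does later in this diff.
	spt, err := policy.GetSealProofFromPoStProof(wpt)
	if err != nil {
		fmt.Println("no matching seal proof:", err)
		return
	}
	fmt.Println("seal proof:", spt)
}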
@@ -38,7 +38,7 @@ type TipSetKey struct {
 	// self-describing, wrapped as a string.
 	// These gymnastics make the a TipSetKey usable as a map key.
 	// The empty key has value "".
-	Value string
+	value string
 }

 // NewTipSetKey builds a new key from a slice of CIDs.
@@ -59,7 +59,7 @@ func TipSetKeyFromBytes(encoded []byte) (TipSetKey, error) {

 // Cids returns a slice of the CIDs comprising this key.
 func (k TipSetKey) Cids() []cid.Cid {
-	cids, err := decodeKey([]byte(k.Value))
+	cids, err := decodeKey([]byte(k.value))
 	if err != nil {
 		panic("invalid tipset key: " + err.Error())
 	}
@@ -83,7 +83,7 @@ func (k TipSetKey) String() string {

 // Bytes() returns a binary representation of the key.
 func (k TipSetKey) Bytes() []byte {
-	return []byte(k.Value)
+	return []byte(k.value)
 }

 func (k TipSetKey) MarshalJSON() ([]byte, error) {
@@ -95,7 +95,7 @@ func (k *TipSetKey) UnmarshalJSON(b []byte) error {
 	if err := json.Unmarshal(b, &cids); err != nil {
 		return err
 	}
-	k.Value = string(encodeKey(cids))
+	k.value = string(encodeKey(cids))
 	return nil
 }

@@ -161,7 +161,7 @@ func (k *TipSetKey) UnmarshalCBOR(reader io.Reader) error {
 }

 func (k TipSetKey) IsEmpty() bool {
-	return len(k.Value) == 0
+	return len(k.value) == 0
 }

 func encodeKey(cids []cid.Cid) []byte {
@@ -37,6 +37,7 @@ import (
 	"github.com/filecoin-project/lotus/node/modules"
 	"github.com/filecoin-project/lotus/node/modules/dtypes"
 	"github.com/filecoin-project/lotus/node/repo"
+	"github.com/filecoin-project/lotus/provider"
 	"github.com/filecoin-project/lotus/storage/paths"
 	"github.com/filecoin-project/lotus/storage/sealer"
 	"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
@@ -222,7 +223,7 @@ var runCmd = &cli.Command{

 			var verif storiface.Verifier = ffiwrapper.ProofVerifier

-			as, err := modules.LotusProvderAddressSelector(&cfg.Addresses)()
+			as, err := provider.AddressSelector(&cfg.Addresses)()
 			if err != nil {
 				return err
 			}
@@ -285,7 +286,7 @@ var runCmd = &cli.Command{
 			}

 			if cfg.Subsystems.EnableWindowPost {
-				wdPostTask, err := modules.WindowPostSchedulerV2(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
+				wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, sealer, verif, j,
 					as, maddrs, db, cfg.Subsystems.WindowPostMaxTasks)
 				if err != nil {
 					return err
@@ -1,25 +1,22 @@
-create table wdpost_tasks
+create table wdpost_partition_tasks
 (
-    task_id int not null
-        constraint wdpost_tasks_pkey
+    task_id bigint not null
+        constraint wdpost_partition_tasks_pk
             primary key,
-    tskey bytea not null,
-    current_epoch bigint not null,
-    period_start bigint not null,
-    index bigint not null
-        constraint wdpost_tasks_index_key
-            unique,
-    open bigint not null,
-    close bigint not null,
-    challenge bigint not null,
-    fault_cutoff bigint,
-    wpost_period_deadlines bigint,
-    wpost_proving_period bigint,
-    wpost_challenge_window bigint,
-    wpost_challenge_lookback bigint,
-    fault_declaration_cutoff bigint
+    sp_id bigint not null,
+    proving_period_start bigint not null,
+    deadline_index bigint not null,
+    partition_index bigint not null,
+    constraint wdpost_partition_tasks_identity_key
+        unique (sp_id, proving_period_start, deadline_index, partition_index)
 );

+comment on column wdpost_partition_tasks.task_id is 'harmonytask task ID';
+comment on column wdpost_partition_tasks.sp_id is 'storage provider ID';
+comment on column wdpost_partition_tasks.proving_period_start is 'proving period start';
+comment on column wdpost_partition_tasks.deadline_index is 'deadline index within the proving period';
+comment on column wdpost_partition_tasks.partition_index is 'partition index within the deadline';
+
 create table wdpost_proofs
 (
     deadline bigint not null,
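For orientation, the sketch below shows how one row of the renamed wdpost_partition_tasks table could be written through harmonydb's parameterized Exec. It is illustrative only: the helper name and argument values are hypothetical, and in this branch the real insert happens inside a harmonytask transaction that supplies the task_id.

package example

import (
	"context"

	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)

// insertPartitionTask writes one wdpost_partition_tasks row (hypothetical
// helper; column names follow the schema above, values come from the caller).
func insertPartitionTask(ctx context.Context, db *harmonydb.DB,
	taskID int64, spID uint64, periodStart int64, dlIdx, partIdx uint64) error {
	_, err := db.Exec(ctx, `INSERT INTO wdpost_partition_tasks (
			task_id, sp_id, proving_period_start, deadline_index, partition_index
		) VALUES ($1, $2, $3, $4, $5)`,
		taskID, spID, periodStart, dlIdx, partIdx)
	return err
}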
lib/promise/promise.go (new file, 47 lines)

package promise

import (
	"context"
	"sync"
)

type Promise[T any] struct {
	val  T
	done chan struct{}
	mu   sync.Mutex
}

func (p *Promise[T]) Set(val T) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Set value
	p.val = val

	// Initialize the done channel if it hasn't been initialized
	if p.done == nil {
		p.done = make(chan struct{})
	}

	// Signal that the value is set
	close(p.done)
}

func (p *Promise[T]) Val(ctx context.Context) T {
	p.mu.Lock()
	// Initialize the done channel if it hasn't been initialized
	if p.done == nil {
		p.done = make(chan struct{})
	}
	p.mu.Unlock()

	select {
	case <-ctx.Done():
		return *new(T)
	case <-p.done:
		p.mu.Lock()
		val := p.val
		p.mu.Unlock()
		return val
	}
}
lib/promise/promise_test.go (new file, 65 lines)

package promise

import (
	"context"
	"sync"
	"testing"
	"time"
)

func TestPromiseSet(t *testing.T) {
	p := &Promise[int]{}

	p.Set(42)
	if p.val != 42 {
		t.Fatalf("expected 42, got %v", p.val)
	}
}

func TestPromiseVal(t *testing.T) {
	p := &Promise[int]{}

	p.Set(42)

	ctx := context.Background()
	val := p.Val(ctx)

	if val != 42 {
		t.Fatalf("expected 42, got %v", val)
	}
}

func TestPromiseValWaitsForSet(t *testing.T) {
	p := &Promise[int]{}
	var val int

	var wg sync.WaitGroup
	wg.Add(1)

	go func() {
		defer wg.Done()
		ctx := context.Background()
		val = p.Val(ctx)
	}()

	time.Sleep(100 * time.Millisecond) // Give some time for the above goroutine to execute
	p.Set(42)
	wg.Wait()

	if val != 42 {
		t.Fatalf("expected 42, got %v", val)
	}
}

func TestPromiseValContextCancel(t *testing.T) {
	p := &Promise[int]{}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // Cancel the context

	val := p.Val(ctx)

	var zeroValue int
	if val != zeroValue {
		t.Fatalf("expected zero-value, got %v", val)
	}
}
@ -29,6 +29,14 @@ var Doc = map[string][]DocField{
|
||||
Comment: ``,
|
||||
},
|
||||
},
|
||||
"ApisConfig": {
|
||||
{
|
||||
Name: "FULLNODE_API_INFO",
|
||||
Type: "[]string",
|
||||
|
||||
Comment: `FULLNODE_API_INFO is the API endpoint for the Lotus daemon.`,
|
||||
},
|
||||
},
|
||||
"Backup": {
|
||||
{
|
||||
Name: "DisableMetadataLog",
|
||||
@ -590,6 +598,14 @@ starts. By default, the cache is rehydrated from previously cached entries store
|
||||
datastore if any is present.`,
|
||||
},
|
||||
},
|
||||
"JournalConfig": {
|
||||
{
|
||||
Name: "DisabledEvents",
|
||||
Type: "string",
|
||||
|
||||
Comment: `Events of the form: "system1:event1,system1:event2[,...]"`,
|
||||
},
|
||||
},
|
||||
"Libp2p": {
|
||||
{
|
||||
Name: "ListenAddresses",
|
||||
@ -700,6 +716,12 @@ sent automatically, if control addresses are configured.
|
||||
A control address that doesn't have enough funds will still be chosen
|
||||
over the worker address if this flag is set.`,
|
||||
},
|
||||
{
|
||||
Name: "MinerAddresses",
|
||||
Type: "[]string",
|
||||
|
||||
Comment: `MinerAddresses are the addresses of the miner actors to use for sending messages`,
|
||||
},
|
||||
},
|
||||
"LotusProviderConfig": {
|
||||
{
|
||||
@ -724,6 +746,30 @@ over the worker address if this flag is set.`,
|
||||
Name: "Proving",
|
||||
Type: "ProvingConfig",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "SealingParams",
|
||||
Type: "SealingConfig",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "SealerConfig",
|
||||
Type: "//",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "Journal",
|
||||
Type: "JournalConfig",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "Apis",
|
||||
Type: "ApisConfig",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
},
|
||||
@ -922,6 +968,12 @@ This is useful to allow workers to bypass the lotus miner to access sector infor
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "WindowPostMaxTasks",
|
||||
Type: "int",
|
||||
|
||||
Comment: ``,
|
||||
},
|
||||
{
|
||||
Name: "EnableWinningPost",
|
||||
Type: "bool",
|
||||
|
@@ -57,22 +57,3 @@ func Datastore(disableLog bool) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r
 		return bds, nil
 	}
 }
-
-func DatastoreV2(ctx context.Context, disableLog bool, r repo.LockedRepo) (dtypes.MetadataDS, func() error, error) {
-	mds, err := r.Datastore(ctx, "/metadata")
-	if err != nil {
-		return nil, nil, err
-	}
-
-	var logdir string
-	if !disableLog {
-		logdir = filepath.Join(r.Path(), "kvlog/metadata")
-	}
-
-	bds, err := backupds.Wrap(mds, logdir)
-	if err != nil {
-		return nil, nil, xerrors.Errorf("opening backupds: %w", err)
-	}
-
-	return bds, bds.CloseLog, nil
-}
@ -11,8 +11,6 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
@ -211,46 +209,6 @@ func AddressSelector(addrConf *config.MinerAddressConfig) func() (*ctladdr.Addre
|
||||
return as, nil
|
||||
}
|
||||
}
|
||||
func LotusProvderAddressSelector(addrConf *config.LotusProviderAddresses) func() (*ctladdr.AddressSelector, error) {
|
||||
return func() (*ctladdr.AddressSelector, error) {
|
||||
as := &ctladdr.AddressSelector{}
|
||||
if addrConf == nil {
|
||||
return as, nil
|
||||
}
|
||||
|
||||
as.DisableOwnerFallback = addrConf.DisableOwnerFallback
|
||||
as.DisableWorkerFallback = addrConf.DisableWorkerFallback
|
||||
|
||||
for _, s := range addrConf.PreCommitControl {
|
||||
addr, err := address.NewFromString(s)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing precommit control address: %w", err)
|
||||
}
|
||||
|
||||
as.PreCommitControl = append(as.PreCommitControl, addr)
|
||||
}
|
||||
|
||||
for _, s := range addrConf.CommitControl {
|
||||
addr, err := address.NewFromString(s)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing commit control address: %w", err)
|
||||
}
|
||||
|
||||
as.CommitControl = append(as.CommitControl, addr)
|
||||
}
|
||||
|
||||
for _, s := range addrConf.TerminateControl {
|
||||
addr, err := address.NewFromString(s)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing terminate control address: %w", err)
|
||||
}
|
||||
|
||||
as.TerminateControl = append(as.TerminateControl, addr)
|
||||
}
|
||||
|
||||
return as, nil
|
||||
}
|
||||
}
|
||||
|
||||
func PreflightChecks(mctx helpers.MetricsCtx, lc fx.Lifecycle, api v1api.FullNode, maddr dtypes.MinerAddress) error {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
@ -342,8 +300,8 @@ func SealingPipeline(fc config.MinerFeeConfig) func(params SealingPipelineParams
|
||||
}
|
||||
}
|
||||
|
||||
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
|
||||
return func(params SealingPipelineParams, db *harmonydb.DB) (*wdpost.WindowPoStScheduler, error) {
|
||||
func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
|
||||
return func(params SealingPipelineParams) (*wdpost.WindowPoStScheduler, error) {
|
||||
var (
|
||||
mctx = params.MetricsCtx
|
||||
lc = params.Lifecycle
|
||||
@ -356,7 +314,7 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
|
||||
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr}, db, nil)
|
||||
fps, err := wdpost.NewWindowedPoStScheduler(api, fc, pc, as, sealer, verif, sealer, j, []dtypes.MinerAddress{params.Maddr})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -373,29 +331,6 @@ func WindowPostScheduler(fc config.MinerFeeConfig, pc config.ProvingConfig) func
|
||||
}
|
||||
}
|
||||
|
||||
func WindowPostSchedulerV2(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
|
||||
api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
|
||||
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*wdpost.WdPostTask, error) {
|
||||
|
||||
fc2 := config.MinerFeeConfig{
|
||||
MaxPreCommitGasFee: fc.MaxPreCommitGasFee,
|
||||
MaxCommitGasFee: fc.MaxCommitGasFee,
|
||||
MaxTerminateGasFee: fc.MaxTerminateGasFee,
|
||||
MaxPublishDealsFee: fc.MaxPublishDealsFee,
|
||||
}
|
||||
ts := wdpost.NewWdPostTask(db, nil, max)
|
||||
fps, err := wdpost.NewWindowedPoStScheduler(api, fc2, pc, as, sealer, verif, sealer, j, maddr, db, ts)
|
||||
|
||||
ts.Scheduler = fps
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go fps.RunV2(ctx, func(api wdpost.WdPoStCommands, actor address.Address) wdpost.ChangeHandlerIface {
|
||||
return wdpost.NewChangeHandler2(api, actor, ts)
|
||||
})
|
||||
return ts, nil
|
||||
|
||||
}
|
||||
func HandleRetrieval(host host.Host, lc fx.Lifecycle, m retrievalmarket.RetrievalProvider, j journal.Journal) {
|
||||
m.OnReady(marketevents.ReadyLogger("retrieval provider"))
|
||||
lc.Append(fx.Hook{
|
||||
|
provider/address.go (new file, 51 lines)

package provider

import (
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-address"

	"github.com/filecoin-project/lotus/node/config"
	"github.com/filecoin-project/lotus/storage/ctladdr"
)

func AddressSelector(addrConf *config.LotusProviderAddresses) func() (*ctladdr.AddressSelector, error) {
	return func() (*ctladdr.AddressSelector, error) {
		as := &ctladdr.AddressSelector{}
		if addrConf == nil {
			return as, nil
		}

		as.DisableOwnerFallback = addrConf.DisableOwnerFallback
		as.DisableWorkerFallback = addrConf.DisableWorkerFallback

		for _, s := range addrConf.PreCommitControl {
			addr, err := address.NewFromString(s)
			if err != nil {
				return nil, xerrors.Errorf("parsing precommit control address: %w", err)
			}

			as.PreCommitControl = append(as.PreCommitControl, addr)
		}

		for _, s := range addrConf.CommitControl {
			addr, err := address.NewFromString(s)
			if err != nil {
				return nil, xerrors.Errorf("parsing commit control address: %w", err)
			}

			as.CommitControl = append(as.CommitControl, addr)
		}

		for _, s := range addrConf.TerminateControl {
			addr, err := address.NewFromString(s)
			if err != nil {
				return nil, xerrors.Errorf("parsing terminate control address: %w", err)
			}

			as.TerminateControl = append(as.TerminateControl, addr)
		}

		return as, nil
	}
}
provider/builder.go (new file, 29 lines)

package provider

import (
	"context"

	logging "github.com/ipfs/go-log/v2"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/journal"
	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/node/config"
	dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/provider/chainsched"
	"github.com/filecoin-project/lotus/provider/lpwindow"
	"github.com/filecoin-project/lotus/storage/ctladdr"
	"github.com/filecoin-project/lotus/storage/sealer"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
	api api.FullNode, sealer sealer.SectorManager, verif storiface.Verifier, j journal.Journal,
	as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, max int) (*lpwindow.WdPostTask, error) {

	chainSched := chainsched.New(api)

	return lpwindow.NewWdPostTask(db, nil, chainSched, maddr)
}
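WindowPostScheduler returns the lpwindow.WdPostTask rather than a running scheduler; the caller is expected to register the task with a harmonytask engine, as the test change later in this diff does. A minimal wiring sketch, assuming only the calls visible in this diff (the listen address is the placeholder used by that test, and the helper name is hypothetical):

package example

import (
	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
	"github.com/filecoin-project/lotus/provider/lpwindow"
)

// startWdPost registers the window-post task with a harmonytask engine so the
// engine can drive CanAccept()/Do(). The engine handle is discarded in this
// sketch; a real caller would keep it around for shutdown.
func startWdPost(db *harmonydb.DB, wdPostTask *lpwindow.WdPostTask) error {
	_, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
	return err
}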
provider/chainsched/chain_sched.go (new file, 136 lines)
|
||||
package chainsched
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
var log = logging.Logger("chainsched")
|
||||
|
||||
type NodeAPI interface {
|
||||
ChainHead(context.Context) (*types.TipSet, error)
|
||||
ChainNotify(context.Context) (<-chan []*api.HeadChange, error)
|
||||
}
|
||||
|
||||
type ProviderChainSched struct {
|
||||
api NodeAPI
|
||||
|
||||
callbacks []UpdateFunc
|
||||
started bool
|
||||
}
|
||||
|
||||
func New(api NodeAPI) *ProviderChainSched {
|
||||
return &ProviderChainSched{
|
||||
api: api,
|
||||
}
|
||||
}
|
||||
|
||||
type UpdateFunc func(ctx context.Context, revert, apply *types.TipSet) error
|
||||
|
||||
func (s *ProviderChainSched) AddHandler(ch UpdateFunc) error {
|
||||
if s.started {
|
||||
return xerrors.Errorf("cannot add handler after start")
|
||||
}
|
||||
|
||||
s.callbacks = append(s.callbacks, ch)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ProviderChainSched) Run(ctx context.Context) {
|
||||
s.started = true
|
||||
|
||||
var (
|
||||
notifs <-chan []*api.HeadChange
|
||||
err error
|
||||
gotCur bool
|
||||
)
|
||||
|
||||
// not fine to panic after this point
|
||||
for {
|
||||
if notifs == nil {
|
||||
notifs, err = s.api.ChainNotify(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("ChainNotify error: %+v", err)
|
||||
|
||||
build.Clock.Sleep(10 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
gotCur = false
|
||||
log.Info("restarting window post scheduler")
|
||||
}
|
||||
|
||||
select {
|
||||
case changes, ok := <-notifs:
|
||||
if !ok {
|
||||
log.Warn("window post scheduler notifs channel closed")
|
||||
notifs = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if !gotCur {
|
||||
if len(changes) != 1 {
|
||||
log.Errorf("expected first notif to have len = 1")
|
||||
continue
|
||||
}
|
||||
chg := changes[0]
|
||||
if chg.Type != store.HCCurrent {
|
||||
log.Errorf("expected first notif to tell current ts")
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "ProviderChainSched.headChange")
|
||||
|
||||
s.update(ctx, nil, chg.Val)
|
||||
|
||||
span.End()
|
||||
gotCur = true
|
||||
continue
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(ctx, "ProviderChainSched.headChange")
|
||||
|
||||
var lowest, highest *types.TipSet = nil, nil
|
||||
|
||||
for _, change := range changes {
|
||||
if change.Val == nil {
|
||||
log.Errorf("change.Val was nil")
|
||||
}
|
||||
switch change.Type {
|
||||
case store.HCRevert:
|
||||
lowest = change.Val
|
||||
case store.HCApply:
|
||||
highest = change.Val
|
||||
}
|
||||
}
|
||||
|
||||
s.update(ctx, lowest, highest)
|
||||
|
||||
span.End()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ProviderChainSched) update(ctx context.Context, revert, apply *types.TipSet) {
|
||||
if apply == nil {
|
||||
log.Error("no new tipset in window post ProviderChainSched.update")
|
||||
return
|
||||
}
|
||||
|
||||
for _, ch := range s.callbacks {
|
||||
if err := ch(ctx, revert, apply); err != nil {
|
||||
log.Errorf("handling head updates in provider chain sched: %+v", err)
|
||||
}
|
||||
}
|
||||
}
|
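The scheduler above only fans head changes out to registered callbacks; nothing in this file starts it. A minimal usage sketch, assuming only New, AddHandler and Run from the file above (the handler body and the surrounding function are hypothetical):

package example

import (
	"context"

	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/provider/chainsched"
)

// runChainSched registers a handler and then starts the notification loop.
// Handlers must be added before Run: AddHandler returns an error once started.
func runChainSched(ctx context.Context, api chainsched.NodeAPI) error {
	sched := chainsched.New(api)

	if err := sched.AddHandler(func(ctx context.Context, revert, apply *types.TipSet) error {
		// React to the newly applied tipset here; revert may be nil.
		return nil
	}); err != nil {
		return err
	}

	go sched.Run(ctx)
	return nil
}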
provider/lpwindow/task.go (new file, 349 lines)
|
||||
package lpwindow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/samber/lo"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/taskhelp"
|
||||
"github.com/filecoin-project/lotus/lib/promise"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/provider/chainsched"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||
)
|
||||
|
||||
var log = logging.Logger("lpwindow")
|
||||
|
||||
var EpochsPerDeadline = miner.WPoStProvingPeriod / abi.ChainEpoch(miner.WPoStPeriodDeadlines)
|
||||
|
||||
type WdPostTaskDetails struct {
|
||||
Ts *types.TipSet
|
||||
Deadline *dline.Info
|
||||
}
|
||||
|
||||
type WDPoStAPI interface {
|
||||
ChainHead(context.Context) (*types.TipSet, error)
|
||||
ChainGetTipSet(context.Context, types.TipSetKey) (*types.TipSet, error)
|
||||
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
|
||||
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
|
||||
ChainGetTipSetAfterHeight(context.Context, abi.ChainEpoch, types.TipSetKey) (*types.TipSet, error)
|
||||
StateMinerPartitions(context.Context, address.Address, uint64, types.TipSetKey) ([]api.Partition, error)
|
||||
}
|
||||
|
||||
type WdPostTask struct {
|
||||
api WDPoStAPI
|
||||
db *harmonydb.DB
|
||||
|
||||
windowPoStTF promise.Promise[harmonytask.AddTaskFunc]
|
||||
|
||||
actors []dtypes.MinerAddress
|
||||
}
|
||||
|
||||
type wdTaskIdentity struct {
|
||||
Sp_id uint64
|
||||
Proving_period_start abi.ChainEpoch
|
||||
Deadline_index uint64
|
||||
Partition_index uint64
|
||||
}
|
||||
|
||||
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)
|
||||
|
||||
var deadline dline.Info
|
||||
|
||||
var spID, pps, dlIdx, partIdx uint64
|
||||
|
||||
err = t.db.QueryRow(context.Background(),
|
||||
`Select sp_id, proving_period_start, deadline_index, partition_index
|
||||
from wdpost_tasks
|
||||
where task_id = $1`, taskID).Scan(
|
||||
&spID, &pps, &dlIdx, &partIdx,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
head, err := t.api.ChainHead(context.Background())
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
wdpost.NewDeadlineInfo(abi.ChainEpoch(pps), dlIdx, head.Height())
|
||||
|
||||
if deadline.PeriodElapsed() {
|
||||
log.Errorf("WdPost removed stale task: %v %v", taskID, deadline)
|
||||
return true, nil
|
||||
}
|
||||
|
||||
panic("todo")
|
||||
|
||||
/*submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)
|
||||
|
||||
// Enter an entry for each wdpost message proof into the wdpost_proofs table
|
||||
for _, params := range submitWdPostParams {
|
||||
|
||||
// Convert submitWdPostParams.Partitions to a byte array using CBOR
|
||||
buf := new(bytes.Buffer)
|
||||
scratch := make([]byte, 9)
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
|
||||
return false, err
|
||||
}
|
||||
for _, v := range params.Partitions {
|
||||
if err := v.MarshalCBOR(buf); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// Insert into wdpost_proofs table
|
||||
_, err = t.db.Exec(context.Background(),
|
||||
`INSERT INTO wdpost_proofs (
|
||||
deadline,
|
||||
partitions,
|
||||
proof_type,
|
||||
proof_bytes)
|
||||
VALUES ($1, $2, $3, $4)`,
|
||||
params.Deadline,
|
||||
buf.Bytes(),
|
||||
params.Proofs[0].PoStProof,
|
||||
params.Proofs[0].ProofBytes)
|
||||
}*/
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
|
||||
// GetEpoch
|
||||
ts, err := t.api.ChainHead(context.Background())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// GetData for tasks
|
||||
type wdTaskDef struct {
|
||||
Task_id harmonytask.TaskID
|
||||
Sp_id uint64
|
||||
Proving_period_start abi.ChainEpoch
|
||||
Deadline_index uint64
|
||||
Partition_index uint64
|
||||
|
||||
dlInfo *dline.Info `pgx:"-"`
|
||||
openTs *types.TipSet
|
||||
}
|
||||
var tasks []wdTaskDef
|
||||
|
||||
err = t.db.Select(context.Background(), &tasks,
|
||||
`Select
|
||||
task_id,
|
||||
sp_id,
|
||||
proving_period_start,
|
||||
deadline_index,
|
||||
partition_index
|
||||
from wdpost_tasks
|
||||
where task_id IN $1`, ids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Accept those past deadline, then delete them in Do().
|
||||
for i := range tasks {
|
||||
tasks[i].dlInfo = wdpost.NewDeadlineInfo(tasks[i].Proving_period_start, tasks[i].Deadline_index, ts.Height())
|
||||
|
||||
if tasks[i].dlInfo.PeriodElapsed() {
|
||||
return &tasks[i].Task_id, nil
|
||||
}
|
||||
|
||||
tasks[i].openTs, err = t.api.ChainGetTipSetAfterHeight(context.Background(), tasks[i].dlInfo.Open, ts.Key())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting task open tipset: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Discard those too big for our free RAM
|
||||
freeRAM := te.ResourcesAvailable().Ram
|
||||
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
|
||||
maddr, err := address.NewIDAddress(tasks[0].Sp_id)
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.CanAccept() failed to NewIDAddress: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
mi, err := t.api.StateMinerInfo(context.Background(), maddr, ts.Key())
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.CanAccept() failed to StateMinerInfo: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
spt, err := policy.GetSealProofFromPoStProof(mi.WindowPoStProofType)
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.CanAccept() failed to GetSealProofFromPoStProof: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return res[spt].MaxMemory <= freeRAM
|
||||
})
|
||||
if len(tasks) == 0 {
|
||||
log.Infof("RAM too small for any WDPost task")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Ignore those with too many failures unless they are the only ones left.
|
||||
tasks, _ = taskhelp.SliceIfFound(tasks, func(d wdTaskDef) bool {
|
||||
var r int
|
||||
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
|
||||
FROM harmony_task_history
|
||||
WHERE task_id = $1 AND success = false`, d.Task_id).Scan(&r)
|
||||
if err != nil {
|
||||
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
|
||||
}
|
||||
return r < 2
|
||||
})
|
||||
|
||||
// Select the one closest to the deadline
|
||||
sort.Slice(tasks, func(i, j int) bool {
|
||||
return tasks[i].dlInfo.Open < tasks[j].dlInfo.Open
|
||||
})
|
||||
|
||||
return &tasks[0].Task_id, nil
|
||||
}
|
||||
|
||||
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]
|
||||
|
||||
func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
|
||||
return harmonytask.TaskTypeDetails{
|
||||
Name: "WdPost",
|
||||
Max: 1, // TODO
|
||||
MaxFailures: 3,
|
||||
Follows: nil,
|
||||
Cost: resources.Resources{
|
||||
Cpu: 1,
|
||||
Gpu: 1,
|
||||
// RAM of smallest proof's max is listed here
|
||||
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
|
||||
if res[k].MaxMemory < i {
|
||||
return res[k].MaxMemory
|
||||
}
|
||||
return i
|
||||
}, 1<<63),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {
|
||||
t.windowPoStTF.Set(taskFunc)
|
||||
}
|
||||
|
||||
func (t *WdPostTask) processHeadChange(ctx context.Context, revert, apply *types.TipSet) error {
|
||||
for _, act := range t.actors {
|
||||
maddr := address.Address(act)
|
||||
|
||||
aid, err := address.IDFromAddress(maddr)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner ID: %w", err)
|
||||
}
|
||||
|
||||
di, err := t.api.StateMinerProvingDeadline(ctx, maddr, apply.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !di.PeriodStarted() {
|
||||
return nil // not proving anything yet
|
||||
}
|
||||
|
||||
partitions, err := t.api.StateMinerPartitions(ctx, maddr, di.Index, apply.Key())
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting partitions: %w", err)
|
||||
}
|
||||
|
||||
// TODO: Batch Partitions??
|
||||
|
||||
for pidx := range partitions {
|
||||
tid := wdTaskIdentity{
|
||||
Sp_id: aid,
|
||||
Proving_period_start: di.PeriodStart,
|
||||
Deadline_index: di.Index,
|
||||
Partition_index: uint64(pidx),
|
||||
}
|
||||
|
||||
tf := t.windowPoStTF.Val(ctx)
|
||||
if tf == nil {
|
||||
return xerrors.Errorf("no task func")
|
||||
}
|
||||
|
||||
tf(func(id harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
|
||||
return t.addTaskToDB(id, tid, tx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewWdPostTask(db *harmonydb.DB, api WDPoStAPI, pcs *chainsched.ProviderChainSched, actors []dtypes.MinerAddress) (*WdPostTask, error) {
|
||||
t := &WdPostTask{
|
||||
db: db,
|
||||
api: api,
|
||||
|
||||
actors: actors,
|
||||
}
|
||||
|
||||
if err := pcs.AddHandler(t.processHeadChange); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *WdPostTask) addTaskToDB(taskId harmonytask.TaskID, taskIdent wdTaskIdentity, tx *harmonydb.Tx) (bool, error) {
|
||||
|
||||
_, err := tx.Exec(
|
||||
`INSERT INTO wdpost_tasks (
|
||||
task_id,
|
||||
sp_id,
|
||||
proving_period_start,
|
||||
deadline_index,
|
||||
partition_index,
|
||||
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
|
||||
taskId,
|
||||
taskIdent.Sp_id,
|
||||
taskIdent.Proving_period_start,
|
||||
taskIdent.Deadline_index,
|
||||
taskIdent.Partition_index,
|
||||
)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
var _ harmonytask.TaskInterface = &WdPostTask{}
|
@@ -1,15 +1,16 @@
-package wdpost
+package lpwindow

 import (
 	"context"
 	"testing"

+	"github.com/stretchr/testify/require"
+
 	"github.com/filecoin-project/go-state-types/dline"

 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
 	"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
 	"github.com/filecoin-project/lotus/node/config"
-	"github.com/stretchr/testify/require"
 )

 // test to create WDPostTask, invoke AddTask and check if the task is added to the DB
@@ -24,9 +25,9 @@ func TestAddTask(t *testing.T) {
 	require.NoError(t, err)
 	wdPostTask := NewWdPostTask(db, nil, 0)
 	taskEngine, err := harmonytask.New(db, []harmonytask.TaskInterface{wdPostTask}, "localhost:12300")
 	_ = taskEngine
 	ts := types.TipSet{}
 	deadline := dline.Info{}
 	err := wdPostTask.AddTask(context.Background(), &ts, &deadline)

 	require.NoError(t, err)
 }
@ -31,15 +31,6 @@ type WdPoStCommands interface {
|
||||
recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
|
||||
}
|
||||
|
||||
type ChangeHandlerIface interface {
|
||||
start()
|
||||
update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error
|
||||
shutdown()
|
||||
currentTSDI() (*types.TipSet, *dline.Info)
|
||||
}
|
||||
|
||||
var _ ChangeHandlerIface = &changeHandler{}
|
||||
|
||||
type changeHandler struct {
|
||||
api WdPoStCommands
|
||||
actor address.Address
|
||||
@ -171,8 +162,6 @@ type proveHandler struct {
|
||||
// Used for testing
|
||||
processedHeadChanges chan *headChange
|
||||
processedPostResults chan *postResult
|
||||
|
||||
wdPostTask *WdPostTask
|
||||
}
|
||||
|
||||
func newProver(
|
||||
@ -222,8 +211,6 @@ func (p *proveHandler) run() {
|
||||
|
||||
func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
|
||||
// If the post window has expired, abort the current proof
|
||||
//log.Errorf("--------------------WINDOW POST CHANGE HANDLER PROCESS HC----------------------")
|
||||
|
||||
if p.current != nil && newTS.Height() >= p.current.di.Close {
|
||||
// Cancel the context on the current proof
|
||||
p.current.abort()
|
||||
@ -243,15 +230,10 @@ func (p *proveHandler) processHeadChange(ctx context.Context, newTS *types.TipSe
|
||||
// next post window
|
||||
_, complete := p.posts.get(di)
|
||||
for complete {
|
||||
di = nextDeadline(di)
|
||||
di = NextDeadline(di)
|
||||
_, complete = p.posts.get(di)
|
||||
}
|
||||
|
||||
//err := p.wdPostTask.AddTask(ctx, newTS, di)
|
||||
//if err != nil {
|
||||
// log.Errorf("AddTask failed: %v", err)
|
||||
//}
|
||||
|
||||
// Check if the chain is above the Challenge height for the post window
|
||||
if newTS.Height() < di.Challenge+ChallengeConfidence {
|
||||
return
|
||||
@ -543,8 +525,8 @@ func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
|
||||
return <-out
|
||||
}
|
||||
|
||||
// nextDeadline gets deadline info for the subsequent deadline
|
||||
func nextDeadline(currentDeadline *dline.Info) *dline.Info {
|
||||
// NextDeadline gets deadline info for the subsequent deadline
|
||||
func NextDeadline(currentDeadline *dline.Info) *dline.Info {
|
||||
periodStart := currentDeadline.PeriodStart
|
||||
newDeadline := currentDeadline.Index + 1
|
||||
if newDeadline == miner.WPoStPeriodDeadlines {
|
||||
|
@ -1,565 +0,0 @@
|
||||
package wdpost
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
//const (
|
||||
// SubmitConfidence = 4
|
||||
// ChallengeConfidence = 1
|
||||
//)
|
||||
//
|
||||
//type CompleteGeneratePoSTCb func(posts []miner.SubmitWindowedPoStParams, err error)
|
||||
//type CompleteSubmitPoSTCb func(err error)
|
||||
//
|
||||
//// wdPoStCommands is the subset of the WindowPoStScheduler + full node APIs used
|
||||
//// by the changeHandler to execute actions and query state.
|
||||
//type wdPoStCommands interface {
|
||||
// StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error)
|
||||
//
|
||||
// startGeneratePoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, onComplete CompleteGeneratePoSTCb) context.CancelFunc
|
||||
// startSubmitPoST(ctx context.Context, ts *types.TipSet, deadline *dline.Info, posts []miner.SubmitWindowedPoStParams, onComplete CompleteSubmitPoSTCb) context.CancelFunc
|
||||
// onAbort(ts *types.TipSet, deadline *dline.Info)
|
||||
// recordPoStFailure(err error, ts *types.TipSet, deadline *dline.Info)
|
||||
//}
|
||||
|
||||
var _ ChangeHandlerIface = &changeHandler2{}
|
||||
|
||||
type changeHandler2 struct {
|
||||
api WdPoStCommands
|
||||
actor address.Address
|
||||
proveHdlr *proveHandler2
|
||||
//submitHdlr *submitHandler
|
||||
|
||||
db *harmonydb.DB
|
||||
}
|
||||
|
||||
func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewChangeHandler2(api WdPoStCommands, actor address.Address, task *WdPostTask) *changeHandler2 {
|
||||
log.Errorf("NewChangeHandler2() called with api: %v, actor: %v", api, actor)
|
||||
//posts := newPostsCache()
|
||||
p := newProver2(api, task)
|
||||
//s := newSubmitter(api, posts)
|
||||
return &changeHandler2{api: api, actor: actor, proveHdlr: p}
|
||||
}
|
||||
|
||||
func (ch *changeHandler2) start() {
|
||||
go ch.proveHdlr.run()
|
||||
//go ch.submitHdlr.run()
|
||||
}
|
||||
|
||||
func (ch *changeHandler2) update(ctx context.Context, revert *types.TipSet, advance *types.TipSet) error {
|
||||
// Get the current deadline period
|
||||
di, err := ch.api.StateMinerProvingDeadline(ctx, ch.actor, advance.Key())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !di.PeriodStarted() {
|
||||
return nil // not proving anything yet
|
||||
}
|
||||
|
||||
hc := &headChange{
|
||||
ctx: ctx,
|
||||
revert: revert,
|
||||
advance: advance,
|
||||
di: di,
|
||||
}
|
||||
|
||||
select {
|
||||
case ch.proveHdlr.hcs <- hc:
|
||||
case <-ch.proveHdlr.shutdownCtx.Done():
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
//select {
|
||||
//case ch.submitHdlr.hcs <- hc:
|
||||
//case <-ch.submitHdlr.shutdownCtx.Done():
|
||||
//case <-ctx.Done():
|
||||
//}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ch *changeHandler2) shutdown() {
|
||||
ch.proveHdlr.shutdown()
|
||||
//ch.submitHdlr.shutdown()
|
||||
}
|
||||
|
||||
//func (ch *changeHandler2) currentTSDI() (*types.TipSet, *dline.Info) {
|
||||
// return ch.submitHdlr.currentTSDI()
|
||||
//}
|
||||
|
||||
// postsCache keeps a cache of PoSTs for each proving window
|
||||
//type postsCache struct {
|
||||
// added chan *postInfo
|
||||
// lk sync.RWMutex
|
||||
// cache map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams
|
||||
//}
|
||||
|
||||
//func newPostsCache() *postsCache {
|
||||
// return &postsCache{
|
||||
// added: make(chan *postInfo, 16),
|
||||
// cache: make(map[abi.ChainEpoch][]miner.SubmitWindowedPoStParams),
|
||||
// }
|
||||
//}
|
||||
|
||||
//func (c *postsCache) add(di *dline.Info, posts []miner.SubmitWindowedPoStParams) {
|
||||
// c.lk.Lock()
|
||||
// defer c.lk.Unlock()
|
||||
//
|
||||
// // TODO: clear cache entries older than chain finality
|
||||
// c.cache[di.Open] = posts
|
||||
//
|
||||
// c.added <- &postInfo{
|
||||
// di: di,
|
||||
// posts: posts,
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func (c *postsCache) get(di *dline.Info) ([]miner.SubmitWindowedPoStParams, bool) {
|
||||
// c.lk.RLock()
|
||||
// defer c.lk.RUnlock()
|
||||
//
|
||||
// posts, ok := c.cache[di.Open]
|
||||
// return posts, ok
|
||||
//}
|
||||
|
||||
//type headChange struct {
|
||||
// ctx context.Context
|
||||
// revert *types.TipSet
|
||||
// advance *types.TipSet
|
||||
// di *dline.Info
|
||||
//}
|
||||
//
|
||||
//type currentPost struct {
|
||||
// di *dline.Info
|
||||
// abort context.CancelFunc
|
||||
//}
|
||||
//
|
||||
//type postResult struct {
|
||||
// ts *types.TipSet
|
||||
// currPost *currentPost
|
||||
// posts []miner.SubmitWindowedPoStParams
|
||||
// err error
|
||||
//}
|
||||
|
||||
// proveHandler generates proofs
|
||||
type proveHandler2 struct {
|
||||
api WdPoStCommands
|
||||
//posts *postsCache
|
||||
|
||||
//postResults chan *postResult
|
||||
hcs chan *headChange
|
||||
|
||||
current *currentPost
|
||||
|
||||
shutdownCtx context.Context
|
||||
shutdown context.CancelFunc
|
||||
|
||||
// Used for testing
|
||||
processedHeadChanges chan *headChange
|
||||
processedPostResults chan *postResult
|
||||
|
||||
wdPostTask *WdPostTask
|
||||
currDeadline *dline.Info
|
||||
}
|
||||
|
||||
func newProver2(
|
||||
api WdPoStCommands,
|
||||
//posts *postsCache,
|
||||
//db *harmonydb.DB,
|
||||
wdPostTask *WdPostTask,
|
||||
) *proveHandler2 {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &proveHandler2{
|
||||
api: api,
|
||||
//posts: posts,
|
||||
//postResults: make(chan *postResult),
|
||||
hcs: make(chan *headChange),
|
||||
shutdownCtx: ctx,
|
||||
shutdown: cancel,
|
||||
wdPostTask: wdPostTask,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proveHandler2) run() {
|
||||
// Abort proving on shutdown
|
||||
defer func() {
|
||||
if p.current != nil {
|
||||
p.current.abort()
|
||||
}
|
||||
}()
|
||||
|
||||
for p.shutdownCtx.Err() == nil {
|
||||
select {
|
||||
case <-p.shutdownCtx.Done():
|
||||
return
|
||||
|
||||
case hc := <-p.hcs:
|
||||
// Head changed
|
||||
p.processHeadChange(hc.ctx, hc.advance, hc.di)
|
||||
if p.processedHeadChanges != nil {
|
||||
p.processedHeadChanges <- hc
|
||||
}
|
||||
|
||||
//case res := <-p.postResults:
|
||||
// // Proof generation complete
|
||||
// p.processPostResult(res)
|
||||
// if p.processedPostResults != nil {
|
||||
// p.processedPostResults <- res
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *proveHandler2) processHeadChange(ctx context.Context, newTS *types.TipSet, di *dline.Info) {
|
||||
// If the post window has expired, abort the current proof
|
||||
//if p.current != nil && newTS.Height() >= p.current.di.Close {
|
||||
// log.Errorf("Aborted window post Proving (Deadline: %+v), newTs: %+v", p.current.di, newTS.Height())
|
||||
// // Cancel the context on the current proof
|
||||
// p.current.abort()
|
||||
//
|
||||
// // Clear out the reference to the proof so that we can immediately
|
||||
// // start generating a new proof, without having to worry about state
|
||||
// // getting clobbered when the abort completes
|
||||
// p.current = nil
|
||||
//}
|
||||
//
|
||||
//// Only generate one proof at a time
|
||||
//log.Errorf("p.current: %+v", p.current)
|
||||
//if p.current != nil {
|
||||
// return
|
||||
//}
|
||||
|
||||
// If the proof for the current post window has been generated, check the
|
||||
// next post window
|
||||
//_, complete := p.posts.get(di)
|
||||
//for complete {
|
||||
// di = nextDeadline(di)
|
||||
// _, complete = p.posts.get(di)
|
||||
//}
|
||||
|
||||
// Check if the chain is above the Challenge height for the post window
|
||||
if newTS.Height() < di.Challenge+ChallengeConfidence {
|
||||
return
|
||||
}
|
||||
|
||||
//p.current = ¤tPost{di: di}
|
||||
|
||||
err := p.wdPostTask.AddTask(ctx, newTS, di)
|
||||
if err != nil {
|
||||
log.Errorf("AddTask failed: %v", err)
|
||||
}
|
||||
|
||||
//
|
||||
//curr := p.current
|
||||
//p.current.abort = p.api.startGeneratePoST(ctx, newTS, di, func(posts []miner.SubmitWindowedPoStParams, err error) {
|
||||
// p.postResults <- &postResult{ts: newTS, currPost: curr, posts: posts, err: err}
|
||||
//})
|
||||
}
|
||||
|
||||
//func (p *proveHandler) processPostResult(res *postResult) {
|
||||
// di := res.currPost.di
|
||||
// if res.err != nil {
|
||||
// // Proving failed so inform the API
|
||||
// p.api.recordPoStFailure(res.err, res.ts, di)
|
||||
// log.Warnf("Aborted window post Proving (Deadline: %+v)", di)
|
||||
// p.api.onAbort(res.ts, di)
|
||||
//
|
||||
// // Check if the current post has already been aborted
|
||||
// if p.current == res.currPost {
|
||||
// // If the current post was not already aborted, setting it to nil
|
||||
// // marks it as complete so that a new post can be started
|
||||
// p.current = nil
|
||||
// }
|
||||
// return
|
||||
// }
|
||||
//
|
||||
// // Completed processing this proving window
|
||||
// p.current = nil
|
||||
//
|
||||
// // Add the proofs to the cache
|
||||
// p.posts.add(di, res.posts)
|
||||
//}
|
||||
//
|
||||
//type submitResult struct {
|
||||
// pw *postWindow
|
||||
// err error
|
||||
//}
|
||||
//
|
||||
//type SubmitState string
|
||||
//
|
||||
//const (
|
||||
// SubmitStateStart SubmitState = "SubmitStateStart"
|
||||
// SubmitStateSubmitting SubmitState = "SubmitStateSubmitting"
|
||||
// SubmitStateComplete SubmitState = "SubmitStateComplete"
|
||||
//)
|
||||
//
|
||||
//type postWindow struct {
|
||||
// ts *types.TipSet
|
||||
// di *dline.Info
|
||||
// submitState SubmitState
|
||||
// abort context.CancelFunc
|
||||
//}
|
||||
//
|
||||
//type postInfo struct {
|
||||
// di *dline.Info
|
||||
// posts []miner.SubmitWindowedPoStParams
|
||||
//}
|
||||
//
|
||||
//// submitHandler submits proofs on-chain
|
||||
//type submitHandler struct {
|
||||
// api wdPoStCommands
|
||||
// posts *postsCache
|
||||
//
|
||||
// submitResults chan *submitResult
|
||||
// hcs chan *headChange
|
||||
//
|
||||
// postWindows map[abi.ChainEpoch]*postWindow
|
||||
// getPostWindowReqs chan *getPWReq
|
||||
//
|
||||
// shutdownCtx context.Context
|
||||
// shutdown context.CancelFunc
|
||||
//
|
||||
// currentCtx context.Context
|
||||
// currentTS *types.TipSet
|
||||
// currentDI *dline.Info
|
||||
// getTSDIReq chan chan *tsdi
|
||||
//
|
||||
// // Used for testing
|
||||
// processedHeadChanges chan *headChange
|
||||
// processedSubmitResults chan *submitResult
|
||||
// processedPostReady chan *postInfo
|
||||
//}
|
||||
//
|
||||
//func newSubmitter(
|
||||
// api wdPoStCommands,
|
||||
// posts *postsCache,
|
||||
//) *submitHandler {
|
||||
// ctx, cancel := context.WithCancel(context.Background())
|
||||
// return &submitHandler{
|
||||
// api: api,
|
||||
// posts: posts,
|
||||
// submitResults: make(chan *submitResult),
|
||||
// hcs: make(chan *headChange),
|
||||
// postWindows: make(map[abi.ChainEpoch]*postWindow),
|
||||
// getPostWindowReqs: make(chan *getPWReq),
|
||||
// getTSDIReq: make(chan chan *tsdi),
|
||||
// shutdownCtx: ctx,
|
||||
// shutdown: cancel,
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func (s *submitHandler) run() {
|
||||
// // On shutdown, abort in-progress submits
|
||||
// defer func() {
|
||||
// for _, pw := range s.postWindows {
|
||||
// if pw.abort != nil {
|
||||
// pw.abort()
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
//
|
||||
// for s.shutdownCtx.Err() == nil {
|
||||
// select {
|
||||
// case <-s.shutdownCtx.Done():
|
||||
// return
|
||||
//
|
||||
// case hc := <-s.hcs:
|
||||
// // Head change
|
||||
// s.processHeadChange(hc.ctx, hc.revert, hc.advance, hc.di)
|
||||
// if s.processedHeadChanges != nil {
|
||||
// s.processedHeadChanges <- hc
|
||||
// }
|
||||
//
|
||||
// case pi := <-s.posts.added:
|
||||
// // Proof generated
|
||||
// s.processPostReady(pi)
|
||||
// if s.processedPostReady != nil {
|
||||
// s.processedPostReady <- pi
|
||||
// }
|
||||
//
|
||||
// case res := <-s.submitResults:
|
||||
// // Submit complete
|
||||
// s.processSubmitResult(res)
|
||||
// if s.processedSubmitResults != nil {
|
||||
// s.processedSubmitResults <- res
|
||||
// }
|
||||
//
|
||||
// case pwreq := <-s.getPostWindowReqs:
|
||||
// // used by getPostWindow() to sync with run loop
|
||||
// pwreq.out <- s.postWindows[pwreq.di.Open]
|
||||
//
|
||||
// case out := <-s.getTSDIReq:
|
||||
// // used by currentTSDI() to sync with run loop
|
||||
// out <- &tsdi{ts: s.currentTS, di: s.currentDI}
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//// processHeadChange is called when the chain head changes
|
||||
//func (s *submitHandler) processHeadChange(ctx context.Context, revert *types.TipSet, advance *types.TipSet, di *dline.Info) {
|
||||
// s.currentCtx = ctx
|
||||
// s.currentTS = advance
|
||||
// s.currentDI = di
|
||||
//
|
||||
// // Start tracking the current post window if we're not already
|
||||
// // TODO: clear post windows older than chain finality
|
||||
// if _, ok := s.postWindows[di.Open]; !ok {
|
||||
// s.postWindows[di.Open] = &postWindow{
|
||||
// di: di,
|
||||
// ts: advance,
|
||||
// submitState: SubmitStateStart,
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Apply the change to all post windows
|
||||
// for _, pw := range s.postWindows {
|
||||
// s.processHeadChangeForPW(ctx, revert, advance, pw)
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//func (s *submitHandler) processHeadChangeForPW(ctx context.Context, revert *types.TipSet, advance *types.TipSet, pw *postWindow) {
|
||||
// revertedToPrevDL := revert != nil && revert.Height() < pw.di.Open
|
||||
// expired := advance.Height() >= pw.di.Close
|
||||
//
|
||||
// // If the chain was reverted back to the previous deadline, or if the post
|
||||
// // window has expired, abort submit
|
||||
// if pw.submitState == SubmitStateSubmitting && (revertedToPrevDL || expired) {
|
||||
// // Replace the aborted postWindow with a new one so that we can
|
||||
// // submit again at any time without the state getting clobbered
|
||||
// // when the abort completes
|
||||
// abort := pw.abort
|
||||
// if abort != nil {
|
||||
// pw = &postWindow{
|
||||
// di: pw.di,
|
||||
// ts: advance,
|
||||
// submitState: SubmitStateStart,
|
||||
// }
|
||||
// s.postWindows[pw.di.Open] = pw
|
||||
//
|
||||
// // Abort the current submit
|
||||
// abort()
|
||||
// }
|
||||
// } else if pw.submitState == SubmitStateComplete && revertedToPrevDL {
|
||||
// // If submit for this deadline has completed, but the chain was
|
||||
// // reverted back to the previous deadline, reset the submit state to the
|
||||
// // starting state, so that it can be resubmitted
|
||||
// pw.submitState = SubmitStateStart
|
||||
// }
|
||||
//
|
||||
// // Submit the proof to chain if the proof has been generated and the chain
|
||||
// // height is above confidence
|
||||
// s.submitIfReady(ctx, advance, pw)
//}
//
//// processPostReady is called when a proof generation completes
//func (s *submitHandler) processPostReady(pi *postInfo) {
// pw, ok := s.postWindows[pi.di.Open]
// if ok {
// s.submitIfReady(s.currentCtx, s.currentTS, pw)
// }
//}
//
//// submitIfReady submits a proof if the chain is high enough and the proof
//// has been generated for this deadline
//func (s *submitHandler) submitIfReady(ctx context.Context, advance *types.TipSet, pw *postWindow) {
// // If the window has expired, there's nothing more to do.
// if advance.Height() >= pw.di.Close {
// return
// }
//
// // Check if we're already submitting, or already completed submit
// if pw.submitState != SubmitStateStart {
// return
// }
//
// // Check if we've reached the confidence height to submit
// if advance.Height() < pw.di.Open+SubmitConfidence {
// return
// }
//
// // Check if the proofs have been generated for this deadline
// posts, ok := s.posts.get(pw.di)
// if !ok {
// return
// }
//
// // If there was nothing to prove, move straight to the complete state
// if len(posts) == 0 {
// pw.submitState = SubmitStateComplete
// return
// }
//
// // Start submitting post
// pw.submitState = SubmitStateSubmitting
// pw.abort = s.api.startSubmitPoST(ctx, advance, pw.di, posts, func(err error) {
// s.submitResults <- &submitResult{pw: pw, err: err}
// })
//}
//
//// processSubmitResult is called with the response to a submit
//func (s *submitHandler) processSubmitResult(res *submitResult) {
// if res.err != nil {
// // Submit failed so inform the API and go back to the start state
// s.api.recordPoStFailure(res.err, res.pw.ts, res.pw.di)
// log.Warnf("Aborted window post Submitting (Deadline: %+v)", res.pw.di)
// s.api.onAbort(res.pw.ts, res.pw.di)
//
// res.pw.submitState = SubmitStateStart
// return
// }
//
// // Submit succeeded so move to complete state
// res.pw.submitState = SubmitStateComplete
//}
//
//type tsdi struct {
// ts *types.TipSet
// di *dline.Info
//}
//
//func (s *submitHandler) currentTSDI() (*types.TipSet, *dline.Info) {
// out := make(chan *tsdi)
// s.getTSDIReq <- out
// res := <-out
// return res.ts, res.di
//}
//
//type getPWReq struct {
// di *dline.Info
// out chan *postWindow
//}
//
//func (s *submitHandler) getPostWindow(di *dline.Info) *postWindow {
// out := make(chan *postWindow)
// s.getPostWindowReqs <- &getPWReq{di: di, out: out}
// return <-out
//}
//
//// nextDeadline gets deadline info for the subsequent deadline
//func nextDeadline(currentDeadline *dline.Info) *dline.Info {
// periodStart := currentDeadline.PeriodStart
// newDeadline := currentDeadline.Index + 1
// if newDeadline == miner.WPoStPeriodDeadlines {
// newDeadline = 0
// periodStart = periodStart + miner.WPoStProvingPeriod
// }
//
// return NewDeadlineInfo(periodStart, newDeadline, currentDeadline.CurrentEpoch)
//}
//
//func NewDeadlineInfo(periodStart abi.ChainEpoch, deadlineIdx uint64, currEpoch abi.ChainEpoch) *dline.Info {
// return dline.NewInfo(periodStart, deadlineIdx, currEpoch, miner.WPoStPeriodDeadlines, miner.WPoStProvingPeriod, miner.WPoStChallengeWindow, miner.WPoStChallengeLookback, miner.FaultDeclarationCutoff)
//}
@ -441,7 +441,7 @@ func TestChangeHandlerStartProvingNextDeadline(t *testing.T) {
// Trigger head change that advances the chain to the Challenge epoch for
// the next deadline
go func() {
di = nextDeadline(di)
di = NextDeadline(di)
currentEpoch = di.Challenge + ChallengeConfidence
triggerHeadAdvance(t, s, currentEpoch)
}()

@ -474,7 +474,7 @@ func TestChangeHandlerProvingRounds(t *testing.T) {
<-s.ch.proveHdlr.processedHeadChanges

completeProofEpoch := di.Open + completeProofIndex
next := nextDeadline(di)
next := NextDeadline(di)
//fmt.Println("epoch", currentEpoch, s.mock.getPostStatus(di), "next", s.mock.getPostStatus(next))
if currentEpoch >= next.Challenge {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(di))

@ -962,7 +962,7 @@ func TestChangeHandlerSubmitRevertTwoEpochs(t *testing.T) {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1))

// Move to the challenge epoch for the next deadline
diE2 := nextDeadline(diE1)
diE2 := NextDeadline(diE1)
currentEpoch = diE2.Challenge + ChallengeConfidence
go triggerHeadAdvance(t, s, currentEpoch)

@ -1067,7 +1067,7 @@ func TestChangeHandlerSubmitRevertAdvanceLess(t *testing.T) {
require.Equal(t, postStatusComplete, s.mock.getPostStatus(diE1))

// Move to the challenge epoch for the next deadline
diE2 := nextDeadline(diE1)
diE2 := NextDeadline(diE1)
currentEpoch = diE2.Challenge + ChallengeConfidence
go triggerHeadAdvance(t, s, currentEpoch)
@ -24,7 +24,7 @@ func TestNextDeadline(t *testing.T) {

for i := 1; i < 1+int(minertypes.WPoStPeriodDeadlines)*2; i++ {
//stm: @WDPOST_NEXT_DEADLINE_001
di = nextDeadline(di)
di = NextDeadline(di)
deadlineIdx = i % int(minertypes.WPoStPeriodDeadlines)
expPeriodStart := int(minertypes.WPoStProvingPeriod) * (i / int(minertypes.WPoStPeriodDeadlines))
expOpen := expPeriodStart + deadlineIdx*int(minertypes.WPoStChallengeWindow)
@ -272,8 +272,6 @@ func (s *WindowPoStScheduler) runPoStCycle(ctx context.Context, manual bool, di

start := time.Now()

log.Errorf("runPoStCycle called with manual: %v, di: %v, ts: %v", manual, di, ts)

log := log.WithOptions(zap.Fields(zap.Time("cycle", start)))
log.Infow("starting PoSt cycle", "manual", manual, "ts", ts, "deadline", di.Index)
defer func() {
@ -4,8 +4,6 @@ import (
"context"
"time"

"github.com/filecoin-project/lotus/lib/harmony/harmonydb"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/trace"

@ -80,7 +78,7 @@ type WindowPoStScheduler struct {
maxPartitionsPerPostMessage int
maxPartitionsPerRecoveryMessage int
singleRecoveringPartitionPerPostMessage bool
ch ChangeHandlerIface
ch *changeHandler

actor address.Address

@ -89,8 +87,6 @@ type WindowPoStScheduler struct {

// failed abi.ChainEpoch // eps
// failLk sync.Mutex
db *harmonydb.DB
wdPostTask *WdPostTask
}

type ActorInfo struct {

@ -107,9 +103,7 @@ func NewWindowedPoStScheduler(api NodeAPI,
verif storiface.Verifier,
ft sealer.FaultTracker,
j journal.Journal,
actors []dtypes.MinerAddress,
db *harmonydb.DB,
task *WdPostTask) (*WindowPoStScheduler, error) {
actors []dtypes.MinerAddress) (*WindowPoStScheduler, error) {
var actorInfos []ActorInfo

for _, actor := range actors {

@ -141,26 +135,17 @@ func NewWindowedPoStScheduler(api NodeAPI,
evtTypeWdPoStFaults: j.RegisterEventType("wdpost", "faults_processed"),
},
journal: j,
wdPostTask: task,
db: db,
}, nil
}

func (s *WindowPoStScheduler) Run(ctx context.Context) {
// Initialize change handler.

s.RunV2(ctx, func(api WdPoStCommands, actor address.Address) ChangeHandlerIface {
return newChangeHandler(api, actor)
})
}
func (s *WindowPoStScheduler) RunV2(ctx context.Context, f func(api WdPoStCommands, actor address.Address) ChangeHandlerIface) {
// callbacks is a union of the fullNodeFilteredAPI and ourselves.
callbacks := struct {
NodeAPI
*WindowPoStScheduler
}{s.api, s}

s.ch = f(callbacks, s.actor)
s.ch = newChangeHandler(callbacks, s.actor)
defer s.ch.shutdown()
s.ch.start()

@ -247,11 +232,6 @@ func (s *WindowPoStScheduler) update(ctx context.Context, revert, apply *types.T
if err != nil {
log.Errorf("handling head updates in window post sched: %+v", err)
}

//err = s.ch2.update(ctx, revert, apply)
//if err != nil {
// log.Errorf("handling head updates in window post sched: %+v", err)
//}
}

// onAbort is called when generating proofs or submitting proofs is aborted
@ -1,348 +0,0 @@
package wdpost

import (
"bytes"
"context"
"sort"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/dline"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
"github.com/filecoin-project/lotus/lib/harmony/resources"
"github.com/filecoin-project/lotus/lib/harmony/taskhelp"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/samber/lo"
cbg "github.com/whyrusleeping/cbor-gen"
)

type WdPostTaskDetails struct {
Ts *types.TipSet
Deadline *dline.Info
}

type WdPostTask struct {
tasks chan *WdPostTaskDetails
db *harmonydb.DB
Scheduler *WindowPoStScheduler
max int
}
func (t *WdPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {

time.Sleep(5 * time.Second)
log.Errorf("WdPostTask.Do() called with taskID: %v", taskID)

var tsKeyBytes []byte
var deadline dline.Info

err = t.db.QueryRow(context.Background(),
`Select tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
from wdpost_tasks
where task_id = $1`, taskID).Scan(
&tsKeyBytes,
&deadline.CurrentEpoch,
&deadline.PeriodStart,
&deadline.Index,
&deadline.Open,
&deadline.Close,
&deadline.Challenge,
&deadline.FaultCutoff,
&deadline.WPoStPeriodDeadlines,
&deadline.WPoStProvingPeriod,
&deadline.WPoStChallengeWindow,
&deadline.WPoStChallengeLookback,
&deadline.FaultDeclarationCutoff,
)
if err != nil {
log.Errorf("WdPostTask.Do() failed to queryRow: %v", err)
return false, err
}

log.Errorf("tskEY: %v", tsKeyBytes)
tsKey, err := types.TipSetKeyFromBytes(tsKeyBytes)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset key: %v", err)
return false, err
}
head, err := t.Scheduler.api.ChainHead(context.Background())
if err != nil {
log.Errorf("WdPostTask.Do() failed to get chain head: %v", err)
return false, err
}

if deadline.Close > head.Height() {
log.Errorf("WdPost removed stale task: %v %v", taskID, tsKey)
return true, nil
}

ts, err := t.Scheduler.api.ChainGetTipSet(context.Background(), tsKey)
if err != nil {
log.Errorf("WdPostTask.Do() failed to get tipset: %v", err)
return false, err
}
submitWdPostParams, err := t.Scheduler.runPoStCycle(context.Background(), false, deadline, ts)
if err != nil {
log.Errorf("WdPostTask.Do() failed to runPoStCycle: %v", err)
return false, err
}

log.Errorf("WdPostTask.Do() called with taskID: %v, submitWdPostParams: %v", taskID, submitWdPostParams)

// Enter an entry for each wdpost message proof into the wdpost_proofs table
for _, params := range submitWdPostParams {

// Convert submitWdPostParams.Partitions to a byte array using CBOR
buf := new(bytes.Buffer)
scratch := make([]byte, 9)
if err := cbg.WriteMajorTypeHeaderBuf(scratch, buf, cbg.MajArray, uint64(len(params.Partitions))); err != nil {
return false, err
}
for _, v := range params.Partitions {
if err := v.MarshalCBOR(buf); err != nil {
return false, err
}
}

// Insert into wdpost_proofs table
_, err = t.db.Exec(context.Background(),
`INSERT INTO wdpost_proofs (
deadline,
partitions,
proof_type,
proof_bytes)
VALUES ($1, $2, $3, $4)`,
params.Deadline,
buf.Bytes(),
params.Proofs[0].PoStProof,
params.Proofs[0].ProofBytes)
}

return true, nil
}
func (t *WdPostTask) CanAccept(ids []harmonytask.TaskID, te *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
// GetEpoch
ts, err := t.Scheduler.api.ChainHead(context.Background())

if err != nil {
return nil, err
}

// GetData for tasks
type wdTaskDef struct {
abi.RegisteredSealProof
Task_id harmonytask.TaskID
Tskey []byte
Open abi.ChainEpoch
Close abi.ChainEpoch
}
var tasks []wdTaskDef
err = t.db.Select(context.Background(), &tasks,
`Select tskey,
task_id,
period_start,
open,
close
from wdpost_tasks
where task_id IN $1`, ids)
if err != nil {
return nil, err
}

// Accept those past deadline, then delete them in Do().
for _, task := range tasks {
if task.Close < ts.Height() {
return &task.Task_id, nil
}
}

// Discard those too big for our free RAM
freeRAM := te.ResourcesAvailable().Ram
tasks = lo.Filter(tasks, func(d wdTaskDef, _ int) bool {
return res[d.RegisteredSealProof].MaxMemory <= freeRAM
})
if len(tasks) == 0 {
log.Infof("RAM too small for any WDPost task")
return nil, nil
}

// Ignore those with too many failures unless they are the only ones left.
tasks, _ = taskhelp.SliceIfFound(tasks, func(d wdTaskDef) bool {
var r int
err := t.db.QueryRow(context.Background(), `SELECT COUNT(*)
FROM harmony_task_history
WHERE task_id = $1 AND success = false`, d.Task_id).Scan(&r)
if err != nil {
log.Errorf("WdPostTask.CanAccept() failed to queryRow: %v", err)
}
return r < 2
})

// Select the one closest to the deadline
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Close < tasks[j].Close
})

return &tasks[0].Task_id, nil
}
var res = storiface.ResourceTable[sealtasks.TTGenerateWindowPoSt]

func (t *WdPostTask) TypeDetails() harmonytask.TaskTypeDetails {
return harmonytask.TaskTypeDetails{
Name: "WdPost",
Max: t.max,
MaxFailures: 3,
Follows: nil,
Cost: resources.Resources{
Cpu: 1,
Gpu: 1,
// RAM of smallest proof's max is listed here
Ram: lo.Reduce(lo.Keys(res), func(i uint64, k abi.RegisteredSealProof, _ int) uint64 {
if res[k].MaxMemory < i {
return res[k].MaxMemory
}
return i
}, 1<<63),
},
}
}
func (t *WdPostTask) Adder(taskFunc harmonytask.AddTaskFunc) {

// wait for any channels on t.tasks and call taskFunc on them
for taskDetails := range t.tasks {

//log.Errorf("WdPostTask.Adder() received taskDetails: %v", taskDetails)

taskFunc(func(tID harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {
return t.addTaskToDB(taskDetails.Ts, taskDetails.Deadline, tID, tx)
})
}
}

func NewWdPostTask(db *harmonydb.DB, scheduler *WindowPoStScheduler, max int) *WdPostTask {
return &WdPostTask{
tasks: make(chan *WdPostTaskDetails, 2),
db: db,
Scheduler: scheduler,
max: max,
}
}

func (t *WdPostTask) AddTask(ctx context.Context, ts *types.TipSet, deadline *dline.Info) error {

t.tasks <- &WdPostTaskDetails{
Ts: ts,
Deadline: deadline,
}

//log.Errorf("WdPostTask.AddTask() called with ts: %v, deadline: %v, taskList: %v", ts, deadline, t.tasks)

return nil
}
func (t *WdPostTask) addTaskToDB(ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID, tx *harmonydb.Tx) (bool, error) {

tsKey := ts.Key()

//log.Errorf("WdPostTask.addTaskToDB() called with tsKey: %v, taskId: %v", tsKey, taskId)

_, err := tx.Exec(
`INSERT INTO wdpost_tasks (
task_id,
tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
tsKey.Bytes(),
deadline.CurrentEpoch,
deadline.PeriodStart,
deadline.Index,
deadline.Open,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
)
if err != nil {
return false, err
}

return true, nil
}
func (t *WdPostTask) AddTaskOld(ctx context.Context, ts *types.TipSet, deadline *dline.Info, taskId harmonytask.TaskID) error {

tsKey := ts.Key()
_, err := t.db.Exec(ctx,
`INSERT INTO wdpost_tasks (
task_id,
tskey,
current_epoch,
period_start,
index,
open,
close,
challenge,
fault_cutoff,
wpost_period_deadlines,
wpost_proving_period,
wpost_challenge_window,
wpost_challenge_lookback,
fault_declaration_cutoff
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10 , $11, $12, $13, $14)`,
taskId,
tsKey.Bytes(),
deadline.CurrentEpoch,
deadline.PeriodStart,
deadline.Index,
deadline.Open,
deadline.Close,
deadline.Challenge,
deadline.FaultCutoff,
deadline.WPoStPeriodDeadlines,
deadline.WPoStProvingPeriod,
deadline.WPoStChallengeWindow,
deadline.WPoStChallengeLookback,
deadline.FaultDeclarationCutoff,
)
if err != nil {
return err
}

return nil
}

var _ harmonytask.TaskInterface = &WdPostTask{}