Mostly fix deals
This commit is contained in:
parent
3020f7a203
commit
630134486d
@ -2,7 +2,6 @@ package deals
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/lotus/lib/statestore"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
cid "github.com/ipfs/go-cid"
|
cid "github.com/ipfs/go-cid"
|
||||||
@ -17,8 +16,10 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/cborrpc"
|
"github.com/filecoin-project/lotus/lib/cborrpc"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statestore"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/storage"
|
"github.com/filecoin-project/lotus/storage"
|
||||||
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MinerDeal struct {
|
type MinerDeal struct {
|
||||||
@ -42,6 +43,7 @@ type Provider struct {
|
|||||||
ask *types.SignedStorageAsk
|
ask *types.SignedStorageAsk
|
||||||
askLk sync.Mutex
|
askLk sync.Mutex
|
||||||
|
|
||||||
|
secb *sectorblocks.SectorBlocks
|
||||||
sminer *storage.Miner
|
sminer *storage.Miner
|
||||||
full api.FullNode
|
full api.FullNode
|
||||||
|
|
||||||
@ -69,7 +71,7 @@ type minerDealUpdate struct {
|
|||||||
mut func(*MinerDeal)
|
mut func(*MinerDeal)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, secb *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
||||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -83,6 +85,7 @@ func NewProvider(ds dtypes.MetadataDS, sminer *storage.Miner, dag dtypes.Staging
|
|||||||
sminer: sminer,
|
sminer: sminer,
|
||||||
dag: dag,
|
dag: dag,
|
||||||
full: fullNode,
|
full: fullNode,
|
||||||
|
secb: secb,
|
||||||
|
|
||||||
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
|
pricePerByteBlock: types.NewInt(3), // TODO: allow setting
|
||||||
minPieceSize: 1,
|
minPieceSize: 1,
|
||||||
|
@ -13,7 +13,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
|
||||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -228,54 +227,26 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_ = pcid
|
|
||||||
|
|
||||||
/*
|
sectorID, err := p.secb.AddUnixfsPiece(pcid, uf, deal.DealID)
|
||||||
sectorID, err := p.sminer.AddUnixfsPiece(pcid, uf, deal.DealID)
|
if err != nil {
|
||||||
if err != nil {
|
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
||||||
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
}
|
||||||
}
|
log.Warnf("New Sector: %d", sectorID)
|
||||||
log.Warnf("New Sector: %d", sectorID)
|
|
||||||
|
return func(deal *MinerDeal) {
|
||||||
|
deal.SectorID = sectorID
|
||||||
|
}, nil
|
||||||
|
|
||||||
return func(deal *MinerDeal) {
|
|
||||||
deal.SectorID = sectorID
|
|
||||||
}, nil
|
|
||||||
*/
|
|
||||||
panic("fixme")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SEALING
|
// SEALING
|
||||||
|
|
||||||
func (p *Provider) waitSealed(ctx context.Context, deal MinerDeal) (sectorbuilder.SectorSealingStatus, error) {
|
|
||||||
panic("fixme")
|
|
||||||
|
|
||||||
/*
|
|
||||||
status, err := p.sminer.WaitSeal(ctx, deal.SectorID)
|
|
||||||
if err != nil {
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch status.State {
|
|
||||||
case sealing_state.Sealed:
|
|
||||||
case sealing_state.Failed:
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sealing sector %d for deal %s (ref=%s) failed: %s", deal.SectorID, deal.ProposalCid, deal.Ref, status.SealErrorMsg)
|
|
||||||
case sealing_state.Pending:
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'pending' after call to WaitSeal (for sector %d)", deal.SectorID)
|
|
||||||
case sealing_state.Sealing:
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("sector status was 'wait' after call to WaitSeal (for sector %d)", deal.SectorID)
|
|
||||||
default:
|
|
||||||
return sectorbuilder.SectorSealingStatus{}, xerrors.Errorf("unknown SealStatusCode: %d", status.SectorID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return status, nil
|
|
||||||
*/
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||||
err := p.sendSignedResponse(&Response{
|
err := p.sendSignedResponse(&Response{
|
||||||
State: api.DealSealing,
|
State: api.DealSealing,
|
||||||
Proposal: deal.ProposalCid,
|
Proposal: deal.ProposalCid,
|
||||||
|
// TODO: Send sector ID
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Sending deal response failed: %s", err)
|
log.Warnf("Sending deal response failed: %s", err)
|
||||||
@ -284,11 +255,12 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
|
|||||||
if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil {
|
if err := p.sminer.SealSector(ctx, deal.SectorID); err != nil {
|
||||||
return nil, xerrors.Errorf("sealing sector failed: %w", err)
|
return nil, xerrors.Errorf("sealing sector failed: %w", err)
|
||||||
}
|
}
|
||||||
|
// TODO: Let's not care after this point, for now at least, client can watch the chain
|
||||||
|
|
||||||
_, err = p.waitSealed(ctx, deal)
|
/*_, err = p.waitSealed(ctx, deal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}*/
|
||||||
// TODO: Spec doesn't say anything about inclusion proofs anywhere
|
// TODO: Spec doesn't say anything about inclusion proofs anywhere
|
||||||
// Not sure what mechanisms prevents miner from storing data that isn't
|
// Not sure what mechanisms prevents miner from storing data that isn't
|
||||||
// clients' data
|
// clients' data
|
||||||
@ -297,13 +269,12 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||||
// TODO: Add dealID to commtracker (probably before sealing)
|
|
||||||
/*mcid, err := p.commt.WaitCommit(ctx, deal.Proposal.Provider, deal.SectorID)
|
/*mcid, err := p.commt.WaitCommit(ctx, deal.Proposal.Provider, deal.SectorID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Waiting for sector commitment message: %s", err)
|
log.Warnf("Waiting for sector commitment message: %s", err)
|
||||||
}*/
|
}*/
|
||||||
|
|
||||||
panic("fixme")
|
//panic("fixme")
|
||||||
|
|
||||||
/*err = p.sendSignedResponse(&Response{
|
/*err = p.sendSignedResponse(&Response{
|
||||||
State: api.DealComplete,
|
State: api.DealComplete,
|
||||||
|
@ -177,9 +177,9 @@ func (c *client) makeOutChan(ctx context.Context, ftyp reflect.Type, valOut int)
|
|||||||
front := buf.Front()
|
front := buf.Front()
|
||||||
bufLk.Unlock()
|
bufLk.Unlock()
|
||||||
|
|
||||||
cases := []reflect.SelectCase{
|
cases := []reflect.SelectCase{
|
||||||
{
|
{
|
||||||
Dir: reflect.SelectRecv,
|
Dir: reflect.SelectRecv,
|
||||||
Chan: reflect.ValueOf(chCtx.Done()),
|
Chan: reflect.ValueOf(chCtx.Done()),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -32,7 +32,7 @@ func TempSectorbuilder(sectorSize uint64) (*SectorBuilder, func(), error) {
|
|||||||
CacheDir: cache,
|
CacheDir: cache,
|
||||||
|
|
||||||
WorkerThreads: 2,
|
WorkerThreads: 2,
|
||||||
Miner: addr,
|
Miner: addr,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -71,8 +71,8 @@ func New(cfg *Config) (*SectorBuilder, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &SectorBuilder{
|
return &SectorBuilder{
|
||||||
handle: sbp,
|
handle: sbp,
|
||||||
Miner: cfg.Miner,
|
Miner: cfg.Miner,
|
||||||
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
|
rateLimit: make(chan struct{}, cfg.WorkerThreads-PoStReservedWorkers),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ func TestList(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
st := &StateStore{ds:ds}
|
st := &StateStore{ds: ds}
|
||||||
|
|
||||||
var out []types.BigInt
|
var out []types.BigInt
|
||||||
if err := st.List(&out); err != nil {
|
if err := st.List(&out); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user