on chain deals: Put dealIDs in CommitSector messages
This commit is contained in:
parent
7c78105df5
commit
76f1e6e207
@ -16,13 +16,11 @@ const (
|
|||||||
DealSealing // Data in process of being sealed
|
DealSealing // Data in process of being sealed
|
||||||
|
|
||||||
DealFailed
|
DealFailed
|
||||||
|
|
||||||
DealComplete
|
DealComplete
|
||||||
|
|
||||||
// Internal
|
// Internal
|
||||||
|
|
||||||
DealError // deal failed with an unexpected error
|
DealError // deal failed with an unexpected error
|
||||||
DealExpired
|
|
||||||
|
|
||||||
DealNoUpdate = DealUnknown
|
DealNoUpdate = DealUnknown
|
||||||
)
|
)
|
||||||
|
@ -409,7 +409,7 @@ func (self *StorageMarketState) validateDeal(vmctx types.VMContext, deal Storage
|
|||||||
|
|
||||||
bcid, aerr := setMarketBalances(vmctx, bnd, map[address.Address]StorageParticipantBalance{
|
bcid, aerr := setMarketBalances(vmctx, bnd, map[address.Address]StorageParticipantBalance{
|
||||||
deal.Proposal.Client: clientBalance,
|
deal.Proposal.Client: clientBalance,
|
||||||
deal.Proposal.Provider: providerBalance,
|
providerWorker: providerBalance,
|
||||||
})
|
})
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return aerr
|
return aerr
|
||||||
@ -443,17 +443,8 @@ func (sma StorageMarketActor) ActivateStorageDeals(act *types.Actor, vmctx types
|
|||||||
return nil, aerrors.HandleExternalError(err, "getting del info failed")
|
return nil, aerrors.HandleExternalError(err, "getting del info failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
workerBytes, err := vmctx.Send(dealInfo.Deal.Proposal.Provider, MAMethods.GetWorkerAddr, types.NewInt(0), nil)
|
if vmctx.Message().From != dealInfo.Deal.Proposal.Provider {
|
||||||
if err != nil {
|
return nil, aerrors.New(1, "ActivateStorageDeals can only be called by the deal provider")
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
providerWorker, eerr := address.NewFromBytes(workerBytes)
|
|
||||||
if eerr != nil {
|
|
||||||
return nil, aerrors.HandleExternalError(eerr, "parsing provider worker address bytes")
|
|
||||||
}
|
|
||||||
|
|
||||||
if vmctx.Message().From != providerWorker {
|
|
||||||
return nil, aerrors.New(1, "ActivateStorageDeals can only be called by deal provider")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if vmctx.BlockHeight() > dealInfo.Deal.Proposal.ProposalExpiration {
|
if vmctx.BlockHeight() > dealInfo.Deal.Proposal.ProposalExpiration {
|
||||||
@ -467,7 +458,7 @@ func (sma StorageMarketActor) ActivateStorageDeals(act *types.Actor, vmctx types
|
|||||||
|
|
||||||
dealInfo.ActivationEpoch = vmctx.BlockHeight()
|
dealInfo.ActivationEpoch = vmctx.BlockHeight()
|
||||||
|
|
||||||
if err := deals.Set(deal, dealInfo); err != nil {
|
if err := deals.Set(deal, &dealInfo); err != nil {
|
||||||
return nil, aerrors.HandleExternalError(err, "setting deal info in AMT failed")
|
return nil, aerrors.HandleExternalError(err, "setting deal info in AMT failed")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -540,7 +531,17 @@ func (sma StorageMarketActor) ProcessStorageDealsPayment(act *types.Actor, vmctx
|
|||||||
// TODO: division is hard, this more than likely has some off-by-one issue
|
// TODO: division is hard, this more than likely has some off-by-one issue
|
||||||
toPay := types.BigDiv(types.BigMul(dealInfo.Deal.Proposal.StoragePrice, types.NewInt(build.ProvingPeriodDuration)), types.NewInt(dealInfo.Deal.Proposal.Duration))
|
toPay := types.BigDiv(types.BigMul(dealInfo.Deal.Proposal.StoragePrice, types.NewInt(build.ProvingPeriodDuration)), types.NewInt(dealInfo.Deal.Proposal.Duration))
|
||||||
|
|
||||||
b, bnd, err := GetMarketBalances(vmctx.Context(), vmctx.Ipld(), self.Balances, dealInfo.Deal.Proposal.Client, dealInfo.Deal.Proposal.Provider)
|
// TODO: cache somehow to conserve gas
|
||||||
|
workerBytes, err := vmctx.Send(dealInfo.Deal.Proposal.Provider, MAMethods.GetWorkerAddr, types.NewInt(0), nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
providerWorker, eerr := address.NewFromBytes(workerBytes)
|
||||||
|
if eerr != nil {
|
||||||
|
return nil, aerrors.HandleExternalError(eerr, "parsing provider worker address bytes")
|
||||||
|
}
|
||||||
|
|
||||||
|
b, bnd, err := GetMarketBalances(vmctx.Context(), vmctx.Ipld(), self.Balances, dealInfo.Deal.Proposal.Client, providerWorker)
|
||||||
clientBal := b[0]
|
clientBal := b[0]
|
||||||
providerBal := b[1]
|
providerBal := b[1]
|
||||||
|
|
||||||
@ -549,7 +550,7 @@ func (sma StorageMarketActor) ProcessStorageDealsPayment(act *types.Actor, vmctx
|
|||||||
// TODO: call set once
|
// TODO: call set once
|
||||||
bcid, aerr := setMarketBalances(vmctx, bnd, map[address.Address]StorageParticipantBalance{
|
bcid, aerr := setMarketBalances(vmctx, bnd, map[address.Address]StorageParticipantBalance{
|
||||||
dealInfo.Deal.Proposal.Client: clientBal,
|
dealInfo.Deal.Proposal.Client: clientBal,
|
||||||
dealInfo.Deal.Proposal.Provider: providerBal,
|
providerWorker: providerBal,
|
||||||
})
|
})
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
return nil, aerr
|
return nil, aerr
|
||||||
|
@ -640,7 +640,7 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
|
|||||||
_, err := w.Write(cbg.CborNull)
|
_, err := w.Write(cbg.CborNull)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err := w.Write([]byte{134}); err != nil {
|
if _, err := w.Write([]byte{135}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -674,6 +674,11 @@ func (t *MinerDeal) MarshalCBOR(w io.Writer) error {
|
|||||||
return xerrors.Errorf("failed to write cid field t.Ref: %w", err)
|
return xerrors.Errorf("failed to write cid field t.Ref: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// t.t.DealID (uint64)
|
||||||
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.DealID)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// t.t.SectorID (uint64)
|
// t.t.SectorID (uint64)
|
||||||
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorID)); err != nil {
|
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, t.SectorID)); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -692,7 +697,7 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
return fmt.Errorf("cbor input should be of type array")
|
return fmt.Errorf("cbor input should be of type array")
|
||||||
}
|
}
|
||||||
|
|
||||||
if extra != 6 {
|
if extra != 7 {
|
||||||
return fmt.Errorf("cbor input had wrong number of fields")
|
return fmt.Errorf("cbor input had wrong number of fields")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -749,6 +754,16 @@ func (t *MinerDeal) UnmarshalCBOR(r io.Reader) error {
|
|||||||
t.Ref = c
|
t.Ref = c
|
||||||
|
|
||||||
}
|
}
|
||||||
|
// t.t.DealID (uint64)
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajUnsignedInt {
|
||||||
|
return fmt.Errorf("wrong type for uint64 field")
|
||||||
|
}
|
||||||
|
t.DealID = extra
|
||||||
// t.t.SectorID (uint64)
|
// t.t.SectorID (uint64)
|
||||||
|
|
||||||
maj, extra, err = cbg.CborReadHeader(br)
|
maj, extra, err = cbg.CborReadHeader(br)
|
||||||
|
@ -29,6 +29,7 @@ type MinerDeal struct {
|
|||||||
|
|
||||||
Ref cid.Cid
|
Ref cid.Cid
|
||||||
|
|
||||||
|
DealID uint64
|
||||||
SectorID uint64 // Set when State >= DealStaged
|
SectorID uint64 // Set when State >= DealStaged
|
||||||
|
|
||||||
s inet.Stream
|
s inet.Stream
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package deals
|
package deals
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
|
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
|
||||||
@ -151,6 +152,13 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
if r.Receipt.ExitCode != 0 {
|
if r.Receipt.ExitCode != 0 {
|
||||||
return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
|
return nil, xerrors.Errorf("publishing deal failed: exit %d", r.Receipt.ExitCode)
|
||||||
}
|
}
|
||||||
|
var resp actors.PublishStorageDealResponse
|
||||||
|
if err := resp.UnmarshalCBOR(bytes.NewReader(r.Receipt.Return)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(resp.DealIDs) != 1 {
|
||||||
|
return nil, xerrors.Errorf("got unexpected number of DealIDs from")
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("fetching data for a deal")
|
log.Info("fetching data for a deal")
|
||||||
mcid := smsg.Cid()
|
mcid := smsg.Cid()
|
||||||
@ -164,7 +172,9 @@ func (p *Provider) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, merkledag.FetchGraph(ctx, deal.Ref, p.dag)
|
return func(deal *MinerDeal) {
|
||||||
|
deal.DealID = resp.DealIDs[0]
|
||||||
|
}, merkledag.FetchGraph(ctx, deal.Ref, p.dag)
|
||||||
}
|
}
|
||||||
|
|
||||||
// STAGED
|
// STAGED
|
||||||
@ -210,7 +220,7 @@ func (p *Provider) staged(ctx context.Context, deal MinerDeal) (func(*MinerDeal)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sectorID, err := p.secst.AddUnixfsPiece(pcid, uf, deal.Proposal.Duration)
|
sectorID, err := p.secst.AddUnixfsPiece(pcid, uf, deal.DealID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
return nil, xerrors.Errorf("AddPiece failed: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,8 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/address"
|
"github.com/filecoin-project/lotus/chain/address"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
"github.com/filecoin-project/lotus/storage/sector"
|
"github.com/filecoin-project/lotus/storage/sector"
|
||||||
@ -40,7 +42,7 @@ func TestSealAndVerify(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
store := sector.NewStore(sb)
|
store := sector.NewStore(sb, datastore.NewMapDatastore())
|
||||||
store.Service()
|
store.Service()
|
||||||
ssinfo := <-store.Incoming()
|
ssinfo := <-store.Incoming()
|
||||||
|
|
||||||
|
@ -33,8 +33,9 @@ func (sm *StorageMinerAPI) ActorAddress(context.Context) (address.Address, error
|
|||||||
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
func (sm *StorageMinerAPI) StoreGarbageData(ctx context.Context) (uint64, error) {
|
||||||
size := sectorbuilder.UserBytesForSectorSize(build.SectorSize)
|
size := sectorbuilder.UserBytesForSectorSize(build.SectorSize)
|
||||||
|
|
||||||
|
// TODO: create a deal
|
||||||
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
name := fmt.Sprintf("fake-file-%d", rand.Intn(100000000))
|
||||||
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)))
|
sectorId, err := sm.Sectors.AddPiece(name, size, io.LimitReader(rand.New(rand.NewSource(42)), int64(size)), 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ func init() {
|
|||||||
var commitmentDsPrefix = datastore.NewKey("/commitments")
|
var commitmentDsPrefix = datastore.NewKey("/commitments")
|
||||||
|
|
||||||
type Tracker struct {
|
type Tracker struct {
|
||||||
commitDs datastore.Datastore
|
commitments datastore.Datastore
|
||||||
|
|
||||||
lk sync.Mutex
|
lk sync.Mutex
|
||||||
|
|
||||||
@ -34,12 +34,13 @@ type Tracker struct {
|
|||||||
|
|
||||||
func NewTracker(ds dtypes.MetadataDS) *Tracker {
|
func NewTracker(ds dtypes.MetadataDS) *Tracker {
|
||||||
return &Tracker{
|
return &Tracker{
|
||||||
commitDs: namespace.Wrap(ds, commitmentDsPrefix),
|
commitments: namespace.Wrap(ds, commitmentDsPrefix),
|
||||||
waits: map[datastore.Key]chan struct{}{},
|
waits: map[datastore.Key]chan struct{}{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type commitment struct {
|
type commitment struct {
|
||||||
|
DealIDs []uint64
|
||||||
Msg cid.Cid
|
Msg cid.Cid
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -47,22 +48,22 @@ func commitmentKey(miner address.Address, sectorId uint64) datastore.Key {
|
|||||||
return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId))
|
return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, mcid cid.Cid) error {
|
func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, commitMsg cid.Cid) error {
|
||||||
key := commitmentKey(miner, sectorId)
|
key := commitmentKey(miner, sectorId)
|
||||||
|
|
||||||
ct.lk.Lock()
|
ct.lk.Lock()
|
||||||
defer ct.lk.Unlock()
|
defer ct.lk.Unlock()
|
||||||
|
|
||||||
tracking, err := ct.commitDs.Get(key)
|
tracking, err := ct.commitments.Get(key)
|
||||||
switch err {
|
switch err {
|
||||||
case datastore.ErrNotFound:
|
case datastore.ErrNotFound:
|
||||||
comm := &commitment{Msg: mcid}
|
comm := &commitment{Msg: commitMsg}
|
||||||
commB, err := cbor.DumpObject(comm)
|
commB, err := cbor.DumpObject(comm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ct.commitDs.Put(key, commB); err != nil {
|
if err := ct.commitments.Put(key, commB); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -78,11 +79,11 @@ func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !comm.Msg.Equals(mcid) {
|
if !comm.Msg.Equals(commitMsg) {
|
||||||
return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, mcid)
|
return xerrors.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, mcid)
|
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg)
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
return err
|
return err
|
||||||
@ -94,7 +95,7 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector
|
|||||||
|
|
||||||
ct.lk.Lock()
|
ct.lk.Lock()
|
||||||
|
|
||||||
tracking, err := ct.commitDs.Get(key)
|
tracking, err := ct.commitments.Get(key)
|
||||||
if err != datastore.ErrNotFound {
|
if err != datastore.ErrNotFound {
|
||||||
ct.lk.Unlock()
|
ct.lk.Unlock()
|
||||||
|
|
||||||
@ -120,7 +121,7 @@ func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sector
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-wait:
|
case <-wait:
|
||||||
tracking, err := ct.commitDs.Get(key)
|
tracking, err := ct.commitments.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
logging "github.com/ipfs/go-log"
|
logging "github.com/ipfs/go-log"
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
@ -129,12 +130,19 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
|||||||
log.Error("seal we just created failed verification")
|
log.Error("seal we just created failed verification")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deals, err := m.secst.DealsForCommit(sinfo.SectorID)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting sector deals failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
params := &actors.OnChainSealVerifyInfo{
|
params := &actors.OnChainSealVerifyInfo{
|
||||||
SectorNumber: sinfo.SectorID,
|
|
||||||
CommD: sinfo.CommD[:],
|
CommD: sinfo.CommD[:],
|
||||||
CommR: sinfo.CommR[:],
|
CommR: sinfo.CommR[:],
|
||||||
CommRStar: sinfo.CommRStar[:],
|
CommRStar: sinfo.CommRStar[:],
|
||||||
Proof: sinfo.Proof,
|
Proof: sinfo.Proof,
|
||||||
|
|
||||||
|
DealIDs: deals,
|
||||||
|
SectorNumber: sinfo.SectorID,
|
||||||
}
|
}
|
||||||
enc, aerr := actors.SerializeParams(params)
|
enc, aerr := actors.SerializeParams(params)
|
||||||
if aerr != nil {
|
if aerr != nil {
|
||||||
|
@ -2,25 +2,45 @@ package sector
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
|
"fmt"
|
||||||
"golang.org/x/xerrors"
|
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
|
cbor "github.com/ipfs/go-ipld-cbor"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
logging "github.com/ipfs/go-log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cbor.RegisterCborType(dealMapping{})
|
||||||
|
}
|
||||||
|
|
||||||
var log = logging.Logger("sectorstore")
|
var log = logging.Logger("sectorstore")
|
||||||
|
|
||||||
|
var sectorDealsPrefix = datastore.NewKey("/sectordeals")
|
||||||
|
|
||||||
|
type dealMapping struct {
|
||||||
|
DealIDs []uint64
|
||||||
|
Committed bool
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
|
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
|
||||||
type Store struct {
|
type Store struct {
|
||||||
lk sync.Mutex
|
waitingLk sync.Mutex
|
||||||
|
|
||||||
sb *sectorbuilder.SectorBuilder
|
sb *sectorbuilder.SectorBuilder
|
||||||
|
|
||||||
|
dealsLk sync.Mutex
|
||||||
|
deals datastore.Datastore
|
||||||
|
|
||||||
waiting map[uint64]chan struct{}
|
waiting map[uint64]chan struct{}
|
||||||
incoming []chan sectorbuilder.SectorSealingStatus
|
incoming []chan sectorbuilder.SectorSealingStatus
|
||||||
// TODO: outdated chan
|
// TODO: outdated chan
|
||||||
@ -28,9 +48,10 @@ type Store struct {
|
|||||||
closeCh chan struct{}
|
closeCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStore(sb *sectorbuilder.SectorBuilder) *Store {
|
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS) *Store {
|
||||||
return &Store{
|
return &Store{
|
||||||
sb: sb,
|
sb: sb,
|
||||||
|
deals: namespace.Wrap(ds, sectorDealsPrefix),
|
||||||
waiting: map[uint64]chan struct{}{},
|
waiting: map[uint64]chan struct{}{},
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
@ -44,13 +65,13 @@ func (s *Store) poll() {
|
|||||||
log.Debug("polling for sealed sectors...")
|
log.Debug("polling for sealed sectors...")
|
||||||
|
|
||||||
// get a list of sectors to poll
|
// get a list of sectors to poll
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
toPoll := make([]uint64, 0, len(s.waiting))
|
toPoll := make([]uint64, 0, len(s.waiting))
|
||||||
|
|
||||||
for id := range s.waiting {
|
for id := range s.waiting {
|
||||||
toPoll = append(toPoll, id)
|
toPoll = append(toPoll, id)
|
||||||
}
|
}
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
|
|
||||||
var done []sectorbuilder.SectorSealingStatus
|
var done []sectorbuilder.SectorSealingStatus
|
||||||
|
|
||||||
@ -68,7 +89,7 @@ func (s *Store) poll() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// send updates
|
// send updates
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
for _, sector := range done {
|
for _, sector := range done {
|
||||||
watch, ok := s.waiting[sector.SectorID]
|
watch, ok := s.waiting[sector.SectorID]
|
||||||
if ok {
|
if ok {
|
||||||
@ -79,7 +100,7 @@ func (s *Store) poll() {
|
|||||||
c <- sector // TODO: ctx!
|
c <- sector // TODO: ctx!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) service() {
|
func (s *Store) service() {
|
||||||
@ -90,35 +111,97 @@ func (s *Store) service() {
|
|||||||
case <-poll:
|
case <-poll:
|
||||||
s.poll()
|
s.poll()
|
||||||
case <-s.closeCh:
|
case <-s.closeCh:
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
for _, c := range s.incoming {
|
for _, c := range s.incoming {
|
||||||
close(c)
|
close(c)
|
||||||
}
|
}
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) AddPiece(ref string, size uint64, r io.Reader) (sectorID uint64, err error) {
|
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealID uint64) (sectorID uint64, err error) {
|
||||||
sectorID, err = s.sb.AddPiece(ref, size, r)
|
sectorID, err = s.sb.AddPiece(ref, size, r)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
_, exists := s.waiting[sectorID]
|
_, exists := s.waiting[sectorID]
|
||||||
if !exists { // pieces can share sectors
|
if !exists { // pieces can share sectors
|
||||||
s.waiting[sectorID] = make(chan struct{})
|
s.waiting[sectorID] = make(chan struct{})
|
||||||
}
|
}
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
|
|
||||||
|
s.dealsLk.Lock()
|
||||||
|
defer s.dealsLk.Unlock()
|
||||||
|
|
||||||
|
k := datastore.NewKey(fmt.Sprint(sectorID))
|
||||||
|
e, err := s.deals.Get(k)
|
||||||
|
var deals dealMapping
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
if err := cbor.DecodeInto(e, &deals); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if deals.Committed {
|
||||||
|
return 0, xerrors.Errorf("sector %d already committed", sectorID)
|
||||||
|
}
|
||||||
|
fallthrough
|
||||||
|
case datastore.ErrNotFound:
|
||||||
|
deals.DealIDs = append(deals.DealIDs, dealID)
|
||||||
|
d, err := cbor.DumpObject(&deals)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if err := s.deals.Put(k, d); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
return sectorID, nil
|
return sectorID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) {
|
||||||
|
s.dealsLk.Lock()
|
||||||
|
defer s.dealsLk.Unlock()
|
||||||
|
|
||||||
|
k := datastore.NewKey(fmt.Sprint(sectorID))
|
||||||
|
e, err := s.deals.Get(k)
|
||||||
|
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
var deals dealMapping
|
||||||
|
if err := cbor.DecodeInto(e, &deals); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if deals.Committed {
|
||||||
|
log.Errorf("getting deal IDs for sector %d: sector already marked as committed", sectorID)
|
||||||
|
}
|
||||||
|
|
||||||
|
deals.Committed = true
|
||||||
|
d, err := cbor.DumpObject(&deals)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := s.deals.Put(k, d); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return deals.DealIDs, nil
|
||||||
|
case datastore.ErrNotFound:
|
||||||
|
log.Errorf("getting deal IDs for sector %d failed: %s", err)
|
||||||
|
return []uint64{}, nil
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
|
func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
var at = -1
|
var at = -1
|
||||||
for i, ch := range s.incoming {
|
for i, ch := range s.incoming {
|
||||||
if ch == c {
|
if ch == c {
|
||||||
@ -126,7 +209,7 @@ func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if at == -1 {
|
if at == -1 {
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(s.incoming) > 1 {
|
if len(s.incoming) > 1 {
|
||||||
@ -135,21 +218,21 @@ func (s *Store) CloseIncoming(c <-chan sectorbuilder.SectorSealingStatus) {
|
|||||||
s.incoming[last] = nil
|
s.incoming[last] = nil
|
||||||
}
|
}
|
||||||
s.incoming = s.incoming[:len(s.incoming)-1]
|
s.incoming = s.incoming[:len(s.incoming)-1]
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus {
|
func (s *Store) Incoming() <-chan sectorbuilder.SectorSealingStatus {
|
||||||
ch := make(chan sectorbuilder.SectorSealingStatus, 8)
|
ch := make(chan sectorbuilder.SectorSealingStatus, 8)
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
s.incoming = append(s.incoming, ch)
|
s.incoming = append(s.incoming, ch)
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
|
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
|
||||||
s.lk.Lock()
|
s.waitingLk.Lock()
|
||||||
watch, ok := s.waiting[sector]
|
watch, ok := s.waiting[sector]
|
||||||
s.lk.Unlock()
|
s.waitingLk.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
select {
|
select {
|
||||||
case <-watch:
|
case <-watch:
|
||||||
|
@ -153,7 +153,7 @@ func (r *refStorer) Read(p []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast uint64) (sectorID uint64, err error) {
|
func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, dealID uint64) (sectorID uint64, err error) {
|
||||||
size, err := r.Size()
|
size, err := r.Size()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@ -166,7 +166,7 @@ func (st *SectorBlocks) AddUnixfsPiece(ref cid.Cid, r UnixfsReader, keepAtLeast
|
|||||||
intermediate: st.intermediate,
|
intermediate: st.intermediate,
|
||||||
}
|
}
|
||||||
|
|
||||||
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst)
|
return st.Store.AddPiece(refst.pieceRef, uint64(size), refst, dealID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
|
func (st *SectorBlocks) List() (map[cid.Cid][]api.SealedRef, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user