storageminer: Drop commitment tracker
This commit is contained in:
parent
a4c0610cc3
commit
46d782b30b
10
api/api.go
10
api/api.go
@ -168,8 +168,6 @@ type StorageMiner interface {
|
||||
SectorsList(context.Context) ([]uint64, error)
|
||||
|
||||
SectorsRefs(context.Context) (map[string][]SealedRef, error)
|
||||
|
||||
CommitmentsList(context.Context) ([]SectorCommitment, error)
|
||||
}
|
||||
|
||||
// Version provides various build-time information
|
||||
@ -332,14 +330,6 @@ type SyncState struct {
|
||||
Height uint64
|
||||
}
|
||||
|
||||
type SectorCommitment struct {
|
||||
SectorID uint64
|
||||
Miner address.Address
|
||||
|
||||
CommitMsg cid.Cid
|
||||
DealIDs []uint64
|
||||
}
|
||||
|
||||
type SyncStateStage int
|
||||
|
||||
const (
|
||||
|
@ -133,8 +133,6 @@ type StorageMinerStruct struct {
|
||||
SectorsList func(context.Context) ([]uint64, error) `perm:"read"`
|
||||
|
||||
SectorsRefs func(context.Context) (map[string][]SealedRef, error) `perm:"read"`
|
||||
|
||||
CommitmentsList func(context.Context) ([]SectorCommitment, error) `perm:"read"`
|
||||
}
|
||||
}
|
||||
|
||||
@ -481,10 +479,6 @@ func (c *StorageMinerStruct) SectorsRefs(ctx context.Context) (map[string][]Seal
|
||||
return c.Internal.SectorsRefs(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) CommitmentsList(ctx context.Context) ([]SectorCommitment, error) {
|
||||
return c.Internal.CommitmentsList(ctx)
|
||||
}
|
||||
|
||||
var _ Common = &CommonStruct{}
|
||||
var _ FullNode = &FullNodeStruct{}
|
||||
var _ StorageMiner = &StorageMinerStruct{}
|
||||
|
@ -17,7 +17,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/cborrpc"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
@ -43,7 +42,6 @@ type Provider struct {
|
||||
askLk sync.Mutex
|
||||
|
||||
secst *sectorblocks.SectorBlocks
|
||||
commt *commitment.Tracker
|
||||
full api.FullNode
|
||||
|
||||
// TODO: Use a custom protocol or graphsync in the future
|
||||
@ -70,7 +68,7 @@ type minerDealUpdate struct {
|
||||
mut func(*MinerDeal)
|
||||
}
|
||||
|
||||
func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *commitment.Tracker, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
||||
func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, dag dtypes.StagingDAG, fullNode api.FullNode) (*Provider, error) {
|
||||
addr, err := ds.Get(datastore.NewKey("miner-address"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -82,7 +80,6 @@ func NewProvider(ds dtypes.MetadataDS, secst *sectorblocks.SectorBlocks, commt *
|
||||
|
||||
h := &Provider{
|
||||
secst: secst,
|
||||
commt: commt,
|
||||
dag: dag,
|
||||
full: fullNode,
|
||||
|
||||
|
@ -290,12 +290,14 @@ func (p *Provider) sealing(ctx context.Context, deal MinerDeal) (func(*MinerDeal
|
||||
|
||||
func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
|
||||
// TODO: Add dealID to commtracker (probably before sealing)
|
||||
mcid, err := p.commt.WaitCommit(ctx, deal.Proposal.Provider, deal.SectorID)
|
||||
/*mcid, err := p.commt.WaitCommit(ctx, deal.Proposal.Provider, deal.SectorID)
|
||||
if err != nil {
|
||||
log.Warnf("Waiting for sector commitment message: %s", err)
|
||||
}
|
||||
}*/
|
||||
|
||||
err = p.sendSignedResponse(&Response{
|
||||
panic("fixme")
|
||||
|
||||
/*err = p.sendSignedResponse(&Response{
|
||||
State: api.DealComplete,
|
||||
Proposal: deal.ProposalCid,
|
||||
|
||||
@ -303,7 +305,7 @@ func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDea
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("Sending deal response failed: %s", err)
|
||||
}
|
||||
}*/
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -1,41 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
|
||||
"gopkg.in/urfave/cli.v2"
|
||||
)
|
||||
|
||||
var commitmentsCmd = &cli.Command{
|
||||
Name: "commitments",
|
||||
Usage: "interact with commitment tracker",
|
||||
Subcommands: []*cli.Command{
|
||||
commitmentsListCmd,
|
||||
},
|
||||
}
|
||||
|
||||
var commitmentsListCmd = &cli.Command{
|
||||
Name: "list",
|
||||
Usage: "List tracked sector commitments",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
comms, err := api.CommitmentsList(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, comm := range comms {
|
||||
fmt.Printf("%s:%d msg:%s, deals: %v\n", comm.Miner, comm.SectorID, comm.CommitMsg, comm.DealIDs)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
@ -26,7 +26,6 @@ func main() {
|
||||
infoCmd,
|
||||
storeGarbageCmd,
|
||||
sectorsCmd,
|
||||
commitmentsCmd,
|
||||
}
|
||||
jaeger := tracing.SetupJaegerTracing("lotus")
|
||||
defer func() {
|
||||
|
@ -40,7 +40,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/retrieval/discovery"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
@ -234,7 +233,6 @@ func Online() Option {
|
||||
Override(new(*sectorbuilder.SectorBuilder), sectorbuilder.New),
|
||||
Override(new(*sector.Store), sector.NewStore),
|
||||
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
|
||||
Override(new(*commitment.Tracker), commitment.NewTracker),
|
||||
Override(new(sector.TicketFn), modules.SealTicketGen),
|
||||
Override(new(*storage.Miner), modules.StorageMiner),
|
||||
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
|
||||
@ -24,7 +23,6 @@ type StorageMinerAPI struct {
|
||||
SectorBuilder *sectorbuilder.SectorBuilder
|
||||
Sectors *sector.Store
|
||||
SectorBlocks *sectorblocks.SectorBlocks
|
||||
CommitmentTracker *commitment.Tracker
|
||||
|
||||
Miner *storage.Miner
|
||||
}
|
||||
@ -83,8 +81,4 @@ func (sm *StorageMinerAPI) SectorsRefs(context.Context) (map[string][]api.Sealed
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) CommitmentsList(ctx context.Context) ([]api.SectorCommitment, error) {
|
||||
return sm.CommitmentTracker.List()
|
||||
}
|
||||
|
||||
var _ api.StorageMiner = &StorageMinerAPI{}
|
||||
|
@ -26,7 +26,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/retrieval"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
)
|
||||
|
||||
@ -72,13 +71,13 @@ func SectorBuilderConfig(storagePath string) func(dtypes.MetadataDS, api.FullNod
|
||||
}
|
||||
}
|
||||
|
||||
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store, commt *commitment.Tracker) (*storage.Miner, error) {
|
||||
func StorageMiner(mctx helpers.MetricsCtx, lc fx.Lifecycle, api api.FullNode, h host.Host, ds dtypes.MetadataDS, secst *sector.Store) (*storage.Miner, error) {
|
||||
maddr, err := minerAddrFromDS(ds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sm, err := storage.NewMiner(api, maddr, h, ds, secst, commt)
|
||||
sm, err := storage.NewMiner(api, maddr, h, ds, secst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,198 +0,0 @@
|
||||
package commitment
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
cbor "github.com/ipfs/go-ipld-cbor"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/address"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
dsq "github.com/ipfs/go-datastore/query"
|
||||
)
|
||||
|
||||
var log = logging.Logger("commitment")
|
||||
|
||||
func init() {
|
||||
cbor.RegisterCborType(commitment{})
|
||||
}
|
||||
|
||||
var commitmentDsPrefix = datastore.NewKey("/commitments")
|
||||
|
||||
type Tracker struct {
|
||||
commitments datastore.Datastore
|
||||
|
||||
lk sync.Mutex
|
||||
|
||||
waits map[datastore.Key]chan struct{}
|
||||
}
|
||||
|
||||
func NewTracker(ds dtypes.MetadataDS) *Tracker {
|
||||
return &Tracker{
|
||||
commitments: namespace.Wrap(ds, commitmentDsPrefix),
|
||||
waits: map[datastore.Key]chan struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
type commitment struct {
|
||||
DealIDs []uint64
|
||||
Msg cid.Cid
|
||||
}
|
||||
|
||||
func commitmentKey(miner address.Address, sectorId uint64) datastore.Key {
|
||||
return commitmentDsPrefix.ChildString(miner.String()).ChildString(fmt.Sprintf("%d", sectorId))
|
||||
}
|
||||
|
||||
func (ct *Tracker) TrackCommitSectorMsg(miner address.Address, sectorId uint64, commitMsg cid.Cid) error {
|
||||
key := commitmentKey(miner, sectorId)
|
||||
|
||||
ct.lk.Lock()
|
||||
defer ct.lk.Unlock()
|
||||
|
||||
tracking, err := ct.commitments.Get(key)
|
||||
switch err {
|
||||
case nil:
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !comm.Msg.Equals(commitMsg) {
|
||||
log.Errorf("commitment tracking for miner %s, sector %d: already tracking %s, got another commitment message: %s", miner, sectorId, comm.Msg, commitMsg)
|
||||
}
|
||||
|
||||
log.Warnf("commitment.TrackCommitSectorMsg called more than once for miner %s, sector %d, message %s", miner, sectorId, commitMsg)
|
||||
|
||||
// we still want to store it
|
||||
fallthrough // TODO: ideally we'd keep around both (even though we'll
|
||||
// usually only need the new one)
|
||||
case datastore.ErrNotFound:
|
||||
comm := &commitment{Msg: commitMsg}
|
||||
commB, err := cbor.DumpObject(comm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ct.commitments.Put(key, commB); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waits, ok := ct.waits[key]
|
||||
if ok {
|
||||
close(waits)
|
||||
delete(ct.waits, key)
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *Tracker) WaitCommit(ctx context.Context, miner address.Address, sectorId uint64) (cid.Cid, error) {
|
||||
key := commitmentKey(miner, sectorId)
|
||||
|
||||
ct.lk.Lock()
|
||||
|
||||
tracking, err := ct.commitments.Get(key)
|
||||
if err != datastore.ErrNotFound {
|
||||
ct.lk.Unlock()
|
||||
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
return comm.Msg, nil
|
||||
}
|
||||
|
||||
wait, ok := ct.waits[key]
|
||||
if !ok {
|
||||
wait = make(chan struct{})
|
||||
ct.waits[key] = wait
|
||||
}
|
||||
|
||||
ct.lk.Unlock()
|
||||
|
||||
select {
|
||||
case <-wait:
|
||||
tracking, err := ct.commitments.Get(key)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("failed to get commitment after waiting: %w", err)
|
||||
}
|
||||
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(tracking, &comm); err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
return comm.Msg, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (ct *Tracker) List() ([]api.SectorCommitment, error) {
|
||||
out := make([]api.SectorCommitment, 0)
|
||||
|
||||
ct.lk.Lock()
|
||||
defer ct.lk.Unlock()
|
||||
|
||||
res, err := ct.commitments.Query(dsq.Query{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Close()
|
||||
|
||||
for {
|
||||
res, ok := res.NextSync()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
if res.Error != nil {
|
||||
return nil, xerrors.Errorf("iterating commitments: %w", err)
|
||||
}
|
||||
|
||||
parts := strings.Split(res.Key, "/")
|
||||
if len(parts) != 4 {
|
||||
return nil, xerrors.Errorf("expected commitment key to be 4 parts, Key %s", res.Key)
|
||||
}
|
||||
|
||||
miner, err := address.NewFromString(parts[2])
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing miner address: %w", err)
|
||||
}
|
||||
|
||||
sectorID, err := strconv.ParseInt(parts[3], 10, 64)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("parsing sector id: %w", err)
|
||||
}
|
||||
|
||||
var comm commitment
|
||||
if err := cbor.DecodeInto(res.Value, &comm); err != nil {
|
||||
return nil, xerrors.Errorf("decoding commitment %s (`% X`): %w", res.Key, res.Value, err)
|
||||
}
|
||||
|
||||
out = append(out, api.SectorCommitment{
|
||||
SectorID: uint64(sectorID),
|
||||
Miner: miner,
|
||||
CommitMsg: comm.Msg,
|
||||
DealIDs: comm.DealIDs,
|
||||
})
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
@ -18,7 +18,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/store"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/sectorbuilder"
|
||||
"github.com/filecoin-project/lotus/storage/commitment"
|
||||
"github.com/filecoin-project/lotus/storage/sector"
|
||||
)
|
||||
|
||||
@ -31,7 +30,6 @@ type Miner struct {
|
||||
events *events.Events
|
||||
|
||||
secst *sector.Store
|
||||
commt *commitment.Tracker
|
||||
|
||||
maddr address.Address
|
||||
|
||||
@ -70,7 +68,7 @@ type storageMinerApi interface {
|
||||
WalletHas(context.Context, address.Address) (bool, error)
|
||||
}
|
||||
|
||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store, commt *commitment.Tracker) (*Miner, error) {
|
||||
func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datastore.Batching, secst *sector.Store) (*Miner, error) {
|
||||
return &Miner{
|
||||
api: api,
|
||||
|
||||
@ -78,7 +76,6 @@ func NewMiner(api storageMinerApi, addr address.Address, h host.Host, ds datasto
|
||||
h: h,
|
||||
ds: ds,
|
||||
secst: secst,
|
||||
commt: commt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -214,10 +211,6 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
|
||||
m.beginPosting(ctx)
|
||||
}()
|
||||
|
||||
if err := m.commt.TrackCommitSectorMsg(m.maddr, sinfo.SectorID, smsg.Cid()); err != nil {
|
||||
return xerrors.Errorf("tracking sector commitment: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user