Merge remote-tracking branch 'origin/master' into feat/deal-await-precommit

Łukasz Magiera 2020-11-26 11:51:36 +01:00
commit fcec665267
9 changed files with 136 additions and 88 deletions


@@ -2,6 +2,7 @@ package api
import (
	"context"
+	"encoding/json"
	"fmt"
	"time"

@@ -785,6 +786,22 @@ type StartDealParams struct {
	VerifiedDeal bool
}

+func (s *StartDealParams) UnmarshalJSON(raw []byte) (err error) {
+	type sdpAlias StartDealParams
+
+	sdp := sdpAlias{
+		FastRetrieval: true,
+	}
+
+	if err := json.Unmarshal(raw, &sdp); err != nil {
+		return err
+	}
+
+	*s = StartDealParams(sdp)
+
+	return nil
+}
+
type IpldObject struct {
	Cid cid.Cid
	Obj interface{}
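The UnmarshalJSON hook above makes FastRetrieval default to true when the field is absent from the JSON while still honouring an explicit false: the alias type has the same fields but no methods, so decoding into it does not recurse, and only keys present in the input overwrite the pre-set values. A minimal, self-contained sketch of the same trick (the Params type here is hypothetical, not the real StartDealParams):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Params stands in for StartDealParams; only the field relevant to the
// default-on-unmarshal trick is shown.
type Params struct {
	FastRetrieval bool
}

// UnmarshalJSON decodes into an alias type. The alias has the same fields but
// no methods, so json.Unmarshal on it does not recurse back into this method,
// and any field the JSON omits keeps the value we pre-set.
func (p *Params) UnmarshalJSON(raw []byte) error {
	type alias Params
	a := alias{FastRetrieval: true} // default when the key is absent
	if err := json.Unmarshal(raw, &a); err != nil {
		return err
	}
	*p = Params(a)
	return nil
}

func main() {
	var omitted, explicit Params
	_ = json.Unmarshal([]byte(`{}`), &omitted)                       // key absent  -> stays true
	_ = json.Unmarshal([]byte(`{"FastRetrieval":false}`), &explicit) // explicit false is respected
	fmt.Println(omitted.FastRetrieval, explicit.FastRetrieval)       // true false
}
```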


@@ -466,6 +466,9 @@ var chainInspectUsage = &cli.Command{
			code, err := lookupActorCode(m.Message.To)
			if err != nil {
+				if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
+					continue
+				}
				return err
			}
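The added branch lets chainInspectUsage skip messages addressed to actors whose code cannot be resolved instead of aborting the whole report. A rough, self-contained sketch of that skip-on-not-found shape; lookup, codesFor and errActorNotFound are made-up stand-ins, and the substring match mirrors the CLI change (in-process code would normally prefer errors.Is):

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errActorNotFound = errors.New("actor not found")

// codesFor resolves a code identifier for each address, skipping addresses the
// lookup reports as not found and failing on any other error.
func codesFor(lookup func(addr string) (string, error), addrs []string) ([]string, error) {
	var out []string
	for _, a := range addrs {
		code, err := lookup(a)
		if err != nil {
			// Matching on the error text mirrors the CLI change above, where
			// the error may have crossed an RPC boundary and lost its type.
			if strings.Contains(err.Error(), errActorNotFound.Error()) {
				continue
			}
			return nil, err
		}
		out = append(out, code)
	}
	return out, nil
}

func main() {
	lookup := func(addr string) (string, error) {
		if addr == "t0999" {
			return "", fmt.Errorf("resolving %s: %w", addr, errActorNotFound)
		}
		return "account", nil
	}
	codes, err := codesFor(lookup, []string{"t01000", "t0999"})
	fmt.Println(codes, err) // [account] <nil>
}
```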


@@ -283,29 +283,52 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
// Caller should hold m.unsealedInfoMap.lk
func (m *Sealing) getSectorAndPadding(ctx context.Context, size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
-	for k, v := range m.unsealedInfoMap.infos {
-		pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
-		if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
-			return k, pads, nil
-		}
-	}
-
-	ns, ssize, err := m.newDealSector(ctx)
-	if err != nil {
-		return 0, nil, err
-	}
-
-	m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
-		numDeals:   0,
-		stored:     0,
-		pieceSizes: nil,
-		ssize:      ssize,
-	}
-
-	return ns, nil, nil
+	for tries := 0; tries < 100; tries++ {
+		for k, v := range m.unsealedInfoMap.infos {
+			pads, padLength := ffiwrapper.GetRequiredPadding(v.stored, size.Padded())
+			if v.stored+size.Padded()+padLength <= abi.PaddedPieceSize(v.ssize) {
+				return k, pads, nil
+			}
+		}
+
+		if len(m.unsealedInfoMap.infos) > 0 {
+			log.Infow("tried to put a piece into an open sector, found none with enough space", "open", len(m.unsealedInfoMap.infos), "size", size, "tries", tries)
+		}
+
+		ns, ssize, err := m.newDealSector(ctx)
+		switch err {
+		case nil:
+			m.unsealedInfoMap.infos[ns] = UnsealedSectorInfo{
+				numDeals:   0,
+				stored:     0,
+				pieceSizes: nil,
+				ssize:      ssize,
+			}
+		case errTooManySealing:
+			m.unsealedInfoMap.lk.Unlock()
+
+			select {
+			case <-time.After(2 * time.Second):
+			case <-ctx.Done():
+				m.unsealedInfoMap.lk.Lock()
+				return 0, nil, xerrors.Errorf("getting sector for piece: %w", ctx.Err())
+			}
+
+			m.unsealedInfoMap.lk.Lock()
+			continue
+		default:
+			return 0, nil, xerrors.Errorf("creating new sector: %w", err)
+		}
+
+		return ns, nil, nil
+	}
+
+	return 0, nil, xerrors.Errorf("failed to allocate piece to a sector")
}

+var errTooManySealing = errors.New("too many sectors sealing")
+
// newDealSector creates a new sector for deal storage
func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.SectorSize, error) {
	// First make sure we don't have too many 'open' sectors

@@ -321,47 +344,34 @@ func (m *Sealing) newDealSector(ctx context.Context) (abi.SectorNumber, abi.Sect
		}
	}

-	if cfg.MaxWaitDealsSectors > 0 {
-		// run in a loop because we have to drop the map lock here for a bit
-		tries := 0
-
-		// we have to run in a loop as we're dropping unsealedInfoMap.lk
-		// to actually call StartPacking. When we do that, another entry can
-		// get added to unsealedInfoMap.
-		for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
-			if tries > 10 {
-				// whatever...
-				break
-			}
-
-			if tries > 0 {
-				m.unsealedInfoMap.lk.Unlock()
-				time.Sleep(time.Second)
-				m.unsealedInfoMap.lk.Lock()
-			}
-
-			tries++
-			var mostStored abi.PaddedPieceSize = math.MaxUint64
-			var best abi.SectorNumber = math.MaxUint64
-
-			for sn, info := range m.unsealedInfoMap.infos {
-				if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
-					best = sn
-				}
-			}
-
-			if best == math.MaxUint64 {
-				// probably not possible, but who knows
-				break
-			}
-
-			m.unsealedInfoMap.lk.Unlock()
-			if err := m.StartPacking(best); err != nil {
-				log.Errorf("newDealSector StartPacking error: %+v", err)
-				continue // let's pretend this is fine
-			}
-			m.unsealedInfoMap.lk.Lock()
-		}
-	}
+	if cfg.MaxWaitDealsSectors > 0 && uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
+		// Too many sectors are sealing in parallel. Start sealing one, and retry
+		// allocating the piece to a sector (we're dropping the lock here, so in
+		// case other goroutines are also trying to create a sector, we retry in
+		// getSectorAndPadding instead of here - otherwise if we have lots of
+		// parallel deals in progress, we can start creating a ton of sectors
+		// with just a single deal in them)
+		var mostStored abi.PaddedPieceSize = math.MaxUint64
+		var best abi.SectorNumber = math.MaxUint64
+
+		for sn, info := range m.unsealedInfoMap.infos {
+			if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
+				best = sn
+			}
+		}
+
+		if best != math.MaxUint64 {
+			m.unsealedInfoMap.lk.Unlock()
+			err := m.StartPacking(best)
+			m.unsealedInfoMap.lk.Lock()
+
+			if err != nil {
+				log.Errorf("newDealSector StartPacking error: %+v", err)
+				// let's pretend this is fine
+			}
+		}
+
+		return 0, 0, errTooManySealing // will wait a bit and retry
+	}

	spt, err := m.currentSealProof(ctx)
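Taken together, the two hunks replace the old wait-loop inside newDealSector with a sentinel-error handshake: once MaxWaitDealsSectors is hit, newDealSector asks StartPacking to seal one of the currently open sectors and returns errTooManySealing, and getSectorAndPadding reacts by dropping unsealedInfoMap.lk, sleeping briefly, re-locking and retrying, so many parallel deals stop fanning out into nearly empty sectors. A stripped-down sketch of that retry contract, with hypothetical names (mu, tryAlloc, errTooManyOpen) standing in for the sealing-specific pieces:

```go
package sealingretry

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTooManyOpen = errors.New("too many open sectors") // stand-in for errTooManySealing

// allocateWithRetry sketches the retry contract above: mu is held on entry and
// is held again on every return, mirroring getSectorAndPadding's
// caller-holds-unsealedInfoMap.lk convention.
func allocateWithRetry(ctx context.Context, mu *sync.Mutex, tryAlloc func() error) error {
	for tries := 0; tries < 100; tries++ {
		err := tryAlloc()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errTooManyOpen) {
			return fmt.Errorf("creating new sector: %w", err)
		}

		// Too many sectors are open: drop the lock so another goroutine can
		// finish packing one, wait, then re-acquire and try again.
		mu.Unlock()
		select {
		case <-time.After(2 * time.Second):
		case <-ctx.Done():
			mu.Lock()
			return fmt.Errorf("getting sector for piece: %w", ctx.Err())
		}
		mu.Lock()
	}
	return errors.New("failed to allocate piece to a sector")
}
```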

go.mod

@@ -69,7 +69,7 @@ require (
	github.com/ipfs/go-ds-pebble v0.0.2-0.20200921225637-ce220f8ac459
	github.com/ipfs/go-filestore v1.0.0
	github.com/ipfs/go-fs-lock v0.0.6
-	github.com/ipfs/go-graphsync v0.5.0
+	github.com/ipfs/go-graphsync v0.5.1
	github.com/ipfs/go-ipfs-blockstore v1.0.3
	github.com/ipfs/go-ipfs-chunker v0.0.5
	github.com/ipfs/go-ipfs-ds-help v1.0.0

go.sum

@@ -557,6 +557,9 @@ github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZ
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-graphsync v0.5.1 h1:4fXBRvRKicTgTmCFMmEua/H5jvmAOLgU9Z7PCPWt2ec=
+github.com/ipfs/go-graphsync v0.5.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=


@@ -423,6 +423,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
		return nil, xerrors.Errorf("failed to compute winning post proof: %w", err)
	}

+	tProof := build.Clock.Now()
+
	// get pending messages early,
	msgs, err := m.api.MpoolSelect(context.TODO(), base.TipSet.Key(), ticket.Quality())
	if err != nil {

@@ -451,7 +453,8 @@ func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg,
		"tPowercheck ", tPowercheck.Sub(tDrand),
		"tTicket ", tTicket.Sub(tPowercheck),
		"tSeed ", tSeed.Sub(tTicket),
-		"tPending ", tPending.Sub(tSeed),
+		"tProof ", tProof.Sub(tSeed),
+		"tPending ", tPending.Sub(tProof),
		"tCreateBlock ", tCreateBlock.Sub(tPending))
}
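The new tProof checkpoint separates winning-PoSt computation from message selection in the duration log: what used to be reported as tPending now shows up as tProof plus tPending. A minimal sketch of that checkpoint-and-delta pattern, using the standard library clock and made-up stage functions rather than lotus' build.Clock and mining internals:

```go
package main

import (
	"log"
	"time"
)

func computeProof()   { time.Sleep(30 * time.Millisecond) } // stand-in for winning PoSt computation
func selectMessages() { time.Sleep(10 * time.Millisecond) } // stand-in for MpoolSelect
func createBlock()    { time.Sleep(5 * time.Millisecond) }  // stand-in for block assembly

func main() {
	tSeed := time.Now()
	computeProof()
	tProof := time.Now()
	selectMessages()
	tPending := time.Now()
	createBlock()
	tCreateBlock := time.Now()

	// Each value is the delta to the previous checkpoint, as in the mineOne log.
	log.Printf("tProof=%s tPending=%s tCreateBlock=%s",
		tProof.Sub(tSeed), tPending.Sub(tProof), tCreateBlock.Sub(tPending))
}
```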


@@ -290,7 +290,7 @@ func Online() Option {
		Override(new(exchange.Server), exchange.NewServer),
		Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
-		Override(new(dtypes.Graphsync), modules.Graphsync),
+		Override(new(dtypes.Graphsync), modules.Graphsync(config.DefaultFullNode().Client.SimultaneousTransfers)),
		Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
		Override(new(*discoveryimpl.Local), modules.NewLocalDiscovery),
		Override(new(discovery.PeerResolver), modules.RetrievalResolver),

@@ -465,12 +465,15 @@ func ConfigFullNode(c interface{}) Option {
	ipfsMaddr := cfg.Client.IpfsMAddr
	return Options(
		ConfigCommon(&cfg.Common),
		If(cfg.Client.UseIpfs,
			Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, cfg.Client.IpfsOnlineMode)),
			If(cfg.Client.IpfsUseForRetrieval,
				Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
			),
		),

+		Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfers)),

		If(cfg.Metrics.HeadNotifs,
			Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
		),
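With modules.Graphsync now taking the transfer limit up front, Online() registers it pre-applied with the compiled-in default (config.DefaultFullNode().Client.SimultaneousTransfers) and ConfigFullNode overrides that binding with the operator's configured value. A toy, dependency-free sketch of that constructor-factory idea; Exchange and NewExchange are hypothetical names, not lotus APIs:

```go
package main

import "fmt"

// Exchange is a hypothetical service whose constructor needs a config value
// that the dependency-injection container does not itself provide.
type Exchange struct {
	host        string
	maxParallel uint64
}

// NewExchange closes over maxParallel and returns the constructor the
// container actually invokes with the remaining dependencies (here just host).
func NewExchange(maxParallel uint64) func(host string) *Exchange {
	return func(host string) *Exchange {
		return &Exchange{host: host, maxParallel: maxParallel}
	}
}

func main() {
	const defaultTransfers = 20 // analogous to DefaultSimultaneousTransfers
	construct := NewExchange(defaultTransfers)

	// A later "override" simply replaces the registered constructor with one
	// built from the loaded config, as ConfigFullNode does for Graphsync.
	configured := uint64(50)
	construct = NewExchange(configured)

	fmt.Println(construct("libp2p-host").maxParallel) // 50
}
```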


@@ -105,10 +105,11 @@ type Metrics struct {
}

type Client struct {
	UseIpfs               bool
	IpfsOnlineMode        bool
	IpfsMAddr             string
	IpfsUseForRetrieval   bool
+	SimultaneousTransfers uint64
}

type Wallet struct {

@@ -149,6 +150,7 @@ func defCommon() Common {
}

var DefaultDefaultMaxFee = types.MustParseFIL("0.007")
+var DefaultSimultaneousTransfers = uint64(20)

// DefaultFullNode returns the default config
func DefaultFullNode() *FullNode {

@@ -157,6 +159,9 @@ func DefaultFullNode() *FullNode {
		Fees: FeeConfig{
			DefaultMaxFee: DefaultDefaultMaxFee,
		},
+		Client: Client{
+			SimultaneousTransfers: DefaultSimultaneousTransfers,
+		},
	}
}
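SimultaneousTransfers follows the usual config pattern: a package-level default (DefaultSimultaneousTransfers = 20) is baked into DefaultFullNode(), and whatever the config file contains is decoded over that pre-populated struct, so absent keys keep their defaults. A small sketch of that default-then-decode flow with a hypothetical trimmed-down config type; JSON is used here purely to keep the example dependency-free, while the real node config is TOML:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClientCfg is a trimmed-down, hypothetical stand-in for the Client section.
type ClientCfg struct {
	SimultaneousTransfers uint64
}

type NodeCfg struct {
	Client ClientCfg
}

const defaultSimultaneousTransfers = uint64(20) // mirrors DefaultSimultaneousTransfers

// defaultNodeCfg plays the role of DefaultFullNode: defaults are set up front,
// and decoding a config document over the struct only overwrites keys it contains.
func defaultNodeCfg() NodeCfg {
	return NodeCfg{Client: ClientCfg{SimultaneousTransfers: defaultSimultaneousTransfers}}
}

func main() {
	cfg := defaultNodeCfg()

	_ = json.Unmarshal([]byte(`{}`), &cfg) // empty document leaves the default in place
	fmt.Println(cfg.Client.SimultaneousTransfers) // 20

	_ = json.Unmarshal([]byte(`{"Client":{"SimultaneousTransfers":5}}`), &cfg)
	fmt.Println(cfg.Client.SimultaneousTransfers) // 5
}
```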


@@ -3,6 +3,7 @@ package modules
import (
	"github.com/filecoin-project/lotus/node/modules/dtypes"
	"github.com/filecoin-project/lotus/node/modules/helpers"
+	"github.com/filecoin-project/lotus/node/repo"
	"github.com/ipfs/go-graphsync"
	graphsyncimpl "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"

@@ -13,31 +14,34 @@ import (
)

// Graphsync creates a graphsync instance from the given loader and storer
-func Graphsync(mctx helpers.MetricsCtx, lc fx.Lifecycle, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
-	graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
-	loader := storeutil.LoaderForBlockstore(clientBs)
-	storer := storeutil.StorerForBlockstore(clientBs)
-
-	gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault())
-	chainLoader := storeutil.LoaderForBlockstore(chainBs)
-	chainStorer := storeutil.StorerForBlockstore(chainBs)
-	err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
-	if err != nil {
-		return nil, err
-	}
-	gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
-		_, has := requestData.Extension("chainsync")
-		if has {
-			// TODO: we should confirm the selector is a reasonable one before we validate
-			// TODO: this code will get more complicated and should probably not live here eventually
-			hookActions.ValidateRequest()
-			hookActions.UsePersistenceOption("chainstore")
-		}
-	})
-	gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
-		_, has := requestData.Extension("chainsync")
-		if has {
-			hookActions.UsePersistenceOption("chainstore")
-		}
-	})
-	return gs, nil
+func Graphsync(parallelTransfers uint64) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
+	return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, r repo.LockedRepo, clientBs dtypes.ClientBlockstore, chainBs dtypes.ChainBlockstore, h host.Host) (dtypes.Graphsync, error) {
+		graphsyncNetwork := gsnet.NewFromLibp2pHost(h)
+		loader := storeutil.LoaderForBlockstore(clientBs)
+		storer := storeutil.StorerForBlockstore(clientBs)
+
+		gs := graphsyncimpl.New(helpers.LifecycleCtx(mctx, lc), graphsyncNetwork, loader, storer, graphsyncimpl.RejectAllRequestsByDefault(), graphsyncimpl.MaxInProgressRequests(parallelTransfers))
+		chainLoader := storeutil.LoaderForBlockstore(chainBs)
+		chainStorer := storeutil.StorerForBlockstore(chainBs)
+		err := gs.RegisterPersistenceOption("chainstore", chainLoader, chainStorer)
+		if err != nil {
+			return nil, err
+		}
+		gs.RegisterIncomingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
+			_, has := requestData.Extension("chainsync")
+			if has {
+				// TODO: we should confirm the selector is a reasonable one before we validate
+				// TODO: this code will get more complicated and should probably not live here eventually
+				hookActions.ValidateRequest()
+				hookActions.UsePersistenceOption("chainstore")
+			}
+		})
+		gs.RegisterOutgoingRequestHook(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
+			_, has := requestData.Extension("chainsync")
+			if has {
+				hookActions.UsePersistenceOption("chainstore")
+			}
+		})
+
+		return gs, nil
+	}
}
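modules.Graphsync is now a factory: the outer call captures parallelTransfers, and the returned closure is the constructor the node's DI container invokes, passing the limit to graphsync as the MaxInProgressRequests option next to RejectAllRequestsByDefault. A dependency-free sketch of that functional-options-plus-factory shape; Service, Option, New and NewServiceFactory are invented stand-ins for the go-graphsync types:

```go
package main

import "fmt"

// Service stands in for the graphsync instance; Option mirrors the functional
// options it accepts (RejectAllRequestsByDefault, MaxInProgressRequests, ...).
type Service struct {
	rejectByDefault bool
	maxInProgress   uint64
}

type Option func(*Service)

func RejectAllByDefault() Option {
	return func(s *Service) { s.rejectByDefault = true }
}

func MaxInProgress(n uint64) Option {
	return func(s *Service) { s.maxInProgress = n }
}

// New applies the options over a built-in default, as option-based
// constructors usually do.
func New(opts ...Option) *Service {
	s := &Service{maxInProgress: 6}
	for _, o := range opts {
		o(s)
	}
	return s
}

// NewServiceFactory mirrors the reworked modules.Graphsync: the outer call
// captures the config value, the returned constructor is what the container
// invokes with the rest of the dependencies (omitted here).
func NewServiceFactory(parallelTransfers uint64) func() (*Service, error) {
	return func() (*Service, error) {
		return New(RejectAllByDefault(), MaxInProgress(parallelTransfers)), nil
	}
}

func main() {
	construct := NewServiceFactory(20)
	svc, _ := construct()
	fmt.Println(svc.rejectByDefault, svc.maxInProgress) // true 20
}
```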