lotus/node/builder.go

543 lines
20 KiB
Go
Raw Normal View History

package node
import (
"context"
2019-07-04 15:50:48 +00:00
"errors"
"time"
2020-01-29 18:10:41 +00:00
logging "github.com/ipfs/go-log"
ci "github.com/libp2p/go-libp2p-core/crypto"
2019-07-05 10:06:28 +00:00
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
2019-07-05 10:06:28 +00:00
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
2019-07-08 14:07:09 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-07-05 10:06:28 +00:00
record "github.com/libp2p/go-libp2p-record"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/discovery"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
2020-03-06 05:30:47 +00:00
storage2 "github.com/filecoin-project/specs-storage/storage"
2020-01-13 20:47:27 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/exchange"
2019-11-25 04:45:13 +00:00
"github.com/filecoin-project/lotus/chain/gen"
2020-08-06 01:16:54 +00:00
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
2019-11-08 20:11:56 +00:00
"github.com/filecoin-project/lotus/chain/market"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/metrics"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2020-01-13 20:47:27 +00:00
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/chain/wallet"
2020-08-17 13:39:33 +00:00
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
2020-09-07 14:12:46 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-08-17 13:39:33 +00:00
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/lib/blockstore"
2020-02-22 11:36:22 +00:00
"github.com/filecoin-project/lotus/lib/peermgr"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
2020-08-06 01:16:54 +00:00
"github.com/filecoin-project/lotus/markets/dealfilter"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/modules/lp2p"
"github.com/filecoin-project/lotus/node/modules/testing"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/paychmgr"
"github.com/filecoin-project/lotus/paychmgr/settler"
"github.com/filecoin-project/lotus/storage"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
//nolint:deadcode,varcheck
2020-01-29 18:10:41 +00:00
var log = logging.Logger("builder")
2019-07-04 15:50:48 +00:00
// special is a type used to give keys to modules which
// can't really be identified by the returned type
type special struct{ id int }
//nolint:golint
2019-07-04 15:50:48 +00:00
var (
2020-04-28 13:21:56 +00:00
DefaultTransportsKey = special{0} // Libp2p option
DiscoveryHandlerKey = special{2} // Private type
AddrsFactoryKey = special{3} // Libp2p option
SmuxTransportKey = special{4} // Libp2p option
RelayKey = special{5} // Libp2p option
SecurityKey = special{6} // Libp2p option
BaseRoutingKey = special{7} // fx groups + multiret
NatPortMapKey = special{8} // Libp2p option
ConnectionManagerKey = special{9} // Libp2p option
2020-04-27 17:20:39 +00:00
AutoNATSvcKey = special{10} // Libp2p option
BandwidthReporterKey = special{11} // Libp2p option
2019-07-04 15:50:48 +00:00
)
type invoke int
2019-07-08 13:36:43 +00:00
//nolint:golint
2019-07-04 15:50:48 +00:00
const (
2019-07-08 13:36:43 +00:00
// libp2p
2019-07-04 20:06:02 +00:00
2019-07-08 13:36:43 +00:00
PstoreAddSelfKeysKey = invoke(iota)
2019-07-04 15:50:48 +00:00
StartListeningKey
2019-10-11 00:31:06 +00:00
BootstrapKey
2019-07-04 15:50:48 +00:00
2019-07-08 13:36:43 +00:00
// filecoin
2019-07-09 22:58:51 +00:00
SetGenesisKey
2019-07-08 14:07:09 +00:00
2019-07-08 13:36:43 +00:00
RunHelloKey
RunChainExchangeKey
RunChainGraphsync
2019-10-17 08:57:56 +00:00
RunPeerMgrKey
2019-07-08 14:07:09 +00:00
HandleIncomingBlocksKey
HandleIncomingMessagesKey
2019-07-08 13:36:43 +00:00
2020-07-28 23:16:47 +00:00
HandlePaymentChannelManagerKey
2019-08-14 20:27:10 +00:00
// miner
2019-12-04 19:44:15 +00:00
GetParamsKey
2019-08-02 16:25:10 +00:00
HandleDealsKey
2019-08-26 13:45:36 +00:00
HandleRetrievalKey
2019-08-14 20:27:10 +00:00
RunSectorServiceKey
2019-08-01 17:12:41 +00:00
// daemon
2019-07-23 22:34:13 +00:00
ExtractApiKey
2019-10-10 11:07:00 +00:00
HeadMetricsKey
SettlePaymentChannelsKey
2019-12-17 16:28:02 +00:00
RunPeerTaggerKey
JournalKey
2019-07-23 22:34:13 +00:00
2019-07-10 17:28:49 +00:00
SetApiEndpointKey
2019-07-04 15:50:48 +00:00
_nInvokes // keep this last
)
2019-07-10 13:06:04 +00:00
type Settings struct {
2019-07-04 20:06:02 +00:00
// modules is a map of constructors for DI
//
// In most cases the index will be a reflect. Type of element returned by
// the constructor, but for some 'constructors' it's hard to specify what's
// the return type should be (or the constructor returns fx group)
2019-07-04 15:50:48 +00:00
modules map[interface{}]fx.Option
// invokes are separate from modules as they can't be referenced by return
// type, and must be applied in correct order
invokes []fx.Option
nodeType repo.RepoType
2019-07-19 09:23:24 +00:00
2019-07-10 13:06:04 +00:00
Online bool // Online option applied
Config bool // Config option applied
2020-05-14 01:10:49 +00:00
2019-07-04 15:50:48 +00:00
}
2019-07-09 17:03:36 +00:00
func defaults() []Option {
return []Option{
Override(new(helpers.MetricsCtx), context.Background),
Override(new(record.Validator), modules.RecordValidator),
2020-05-14 01:10:49 +00:00
Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)),
Override(new(dtypes.ShutdownChan), make(chan struct{})),
Override(JournalKey, modules.SetupJournal),
2019-07-24 21:10:27 +00:00
// Filecoin modules
2019-07-09 17:03:36 +00:00
}
2019-07-04 15:50:48 +00:00
}
2019-07-19 09:23:24 +00:00
func libp2p() Option {
2019-07-04 15:50:48 +00:00
return Options(
Override(new(peerstore.Peerstore), pstoremem.NewPeerstore),
Override(DefaultTransportsKey, lp2p.DefaultTransports),
Override(new(lp2p.RawHost), lp2p.Host),
Override(new(host.Host), lp2p.RoutedHost),
Override(new(lp2p.BaseIpfsRouting), lp2p.DHTRouting(dht.ModeAuto)),
2019-07-04 15:50:48 +00:00
Override(DiscoveryHandlerKey, lp2p.DiscoveryHandler),
Override(AddrsFactoryKey, lp2p.AddrsFactory(nil, nil)),
Override(SmuxTransportKey, lp2p.SmuxTransport(true)),
2020-05-25 10:39:50 +00:00
Override(RelayKey, lp2p.NoRelay()),
2020-08-20 08:33:19 +00:00
Override(SecurityKey, lp2p.Security(true, false)),
2019-07-04 15:50:48 +00:00
Override(BaseRoutingKey, lp2p.BaseRouting),
Override(new(routing.Routing), lp2p.Routing),
Override(NatPortMapKey, lp2p.NatPortMap),
Override(BandwidthReporterKey, lp2p.BandwidthCounter),
2019-12-17 06:15:06 +00:00
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second, nil)),
2020-04-22 22:28:01 +00:00
Override(AutoNATSvcKey, lp2p.AutoNATService),
2019-07-04 15:50:48 +00:00
Override(new(*dtypes.ScoreKeeper), lp2p.ScoreKeeper),
2020-05-14 01:10:49 +00:00
Override(new(*pubsub.PubSub), lp2p.GossipSub),
Override(new(*config.Pubsub), func(bs dtypes.Bootstrapper) *config.Pubsub {
return &config.Pubsub{
Bootstrapper: bool(bs),
}
}),
2019-07-08 14:07:09 +00:00
2019-07-04 15:50:48 +00:00
Override(PstoreAddSelfKeysKey, lp2p.PstoreAddSelfKeys),
Override(StartListeningKey, lp2p.StartListening(config.DefaultFullNode().Libp2p.ListenAddresses)),
2019-07-19 09:23:24 +00:00
)
}
func isType(t repo.RepoType) func(s *Settings) bool {
return func(s *Settings) bool { return s.nodeType == t }
}
2019-07-19 09:23:24 +00:00
// Online sets up basic libp2p node
func Online() Option {
return Options(
// make sure that online is applied before Config.
// This is important because Config overrides some of Online units
func(s *Settings) error { s.Online = true; return nil },
ApplyIf(func(s *Settings) bool { return s.Config },
Error(errors.New("the Online option must be set before Config option")),
),
2019-07-08 13:36:43 +00:00
2019-07-19 09:23:24 +00:00
libp2p(),
2019-07-08 13:36:43 +00:00
// common
2020-08-06 01:14:13 +00:00
Override(new(*slashfilter.SlashFilter), modules.NewSlashFilter),
2019-07-19 09:23:24 +00:00
// Full node
2019-07-08 13:36:43 +00:00
2019-11-12 17:59:38 +00:00
ApplyIf(isType(repo.FullNode),
2019-07-23 22:34:13 +00:00
// TODO: Fix offline mode
Override(new(dtypes.BootstrapPeers), modules.BuiltinBootstrap),
Override(new(dtypes.DrandBootstrap), modules.DrandBootstrap),
Override(new(dtypes.DrandSchedule), modules.BuiltinDrandConfig),
2019-07-23 22:34:13 +00:00
Override(HandleIncomingMessagesKey, modules.HandleIncomingMessages),
2020-03-26 02:50:56 +00:00
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
2020-07-18 13:46:47 +00:00
Override(new(vm.SyscallBuilder), vm.Syscalls),
2019-07-26 04:54:22 +00:00
Override(new(*store.ChainStore), modules.ChainStore),
Override(new(*stmgr.StateManager), stmgr.NewStateManager),
2019-08-12 23:54:53 +00:00
Override(new(*wallet.Wallet), wallet.NewWallet),
2019-07-23 22:34:13 +00:00
Override(new(dtypes.ChainGCLocker), blockstore.NewGCLocker),
Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore),
Override(new(dtypes.ChainBitswap), modules.ChainBitswap),
Override(new(dtypes.ChainBlockService), modules.ChainBlockService),
2019-07-08 13:36:43 +00:00
2019-07-19 09:23:24 +00:00
// Filecoin services
2019-11-15 21:35:29 +00:00
Override(new(*chain.Syncer), modules.NewSyncer),
Override(new(exchange.Client), exchange.NewClient),
2019-12-01 23:11:43 +00:00
Override(new(*messagepool.MessagePool), modules.MessagePool),
2019-07-08 13:36:43 +00:00
2019-07-24 22:49:37 +00:00
Override(new(modules.Genesis), modules.ErrorGenesis),
2020-03-31 23:13:37 +00:00
Override(new(dtypes.AfterGenesisSet), modules.SetGenesis),
Override(SetGenesisKey, modules.DoSetGenesis),
2019-07-19 09:23:24 +00:00
2020-03-31 23:13:37 +00:00
Override(new(dtypes.NetworkName), modules.NetworkName),
2019-07-19 09:23:24 +00:00
Override(new(*hello.Service), hello.NewHelloService),
Override(new(exchange.Server), exchange.NewServer),
2019-10-17 08:57:56 +00:00
Override(new(*peermgr.PeerMgr), peermgr.NewPeerMgr),
Override(new(dtypes.Graphsync), modules.Graphsync),
Override(new(*dtypes.MpoolLocker), new(dtypes.MpoolLocker)),
2019-07-19 09:23:24 +00:00
Override(RunHelloKey, modules.RunHello),
Override(RunChainExchangeKey, modules.RunChainExchange),
2019-10-17 08:57:56 +00:00
Override(RunPeerMgrKey, modules.RunPeerMgr),
2019-07-19 09:23:24 +00:00
Override(HandleIncomingBlocksKey, modules.HandleIncomingBlocks),
2019-08-01 17:12:41 +00:00
Override(new(*discovery.Local), modules.NewLocalDiscovery),
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),
2019-08-26 13:45:36 +00:00
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
2020-03-18 18:57:22 +00:00
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
2020-07-30 12:31:31 +00:00
Override(new(modules.ClientDealFunds), modules.NewClientDealFunds),
Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
Override(new(beacon.Schedule), modules.RandomSchedule),
Override(new(*paychmgr.Store), paychmgr.NewStore),
Override(new(*paychmgr.Manager), paychmgr.NewManager),
Override(new(*market.FundMgr), market.StartFundManager),
2020-07-28 23:16:47 +00:00
Override(HandlePaymentChannelManagerKey, paychmgr.HandleManager),
Override(SettlePaymentChannelsKey, settler.SettlePaymentChannels),
2019-07-19 09:23:24 +00:00
),
// miner
2019-11-12 17:59:38 +00:00
ApplyIf(func(s *Settings) bool { return s.nodeType == repo.StorageMiner },
Override(new(api.Common), From(new(common.CommonAPI))),
Override(new(sectorstorage.StorageAuth), modules.StorageAuth),
2020-03-22 21:39:06 +00:00
Override(new(*stores.Index), stores.NewIndex),
2020-03-18 01:08:11 +00:00
Override(new(stores.SectorIndex), From(new(*stores.Index))),
Override(new(dtypes.MinerID), modules.MinerID),
Override(new(dtypes.MinerAddress), modules.MinerAddress),
2020-03-26 19:34:38 +00:00
Override(new(*ffiwrapper.Config), modules.ProofsConfig),
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
2020-03-17 20:19:52 +00:00
Override(new(sealing.SectorIDCounter), modules.SectorIDCounter),
Override(new(*sectorstorage.Manager), modules.SectorStorage),
Override(new(ffiwrapper.Verifier), ffiwrapper.ProofVerifier),
2020-03-04 19:42:49 +00:00
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))),
2020-09-07 14:12:46 +00:00
Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))),
2020-03-04 19:42:49 +00:00
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
2020-08-12 17:47:00 +00:00
Override(new(*storage.Miner), modules.StorageMiner(config.DefaultStorageMiner().Fees)),
2020-03-31 23:13:37 +00:00
Override(new(dtypes.NetworkName), modules.StorageNetworkName),
2019-08-02 16:25:10 +00:00
Override(new(dtypes.StagingMultiDstore), modules.StagingMultiDatastore),
feat(datatransfer): implement and extract feat(datatransfer): setup implementation path Sets up a path to implementation, offering both the dagservice implementation and a future graphsync implement, establishes message interfaces and network layer, and isolates the datatransfer module from the app WIP using CBOR encoding for dataxfermsg * Bring cbor-gen stuff into datatransfer package * make transferRequest private struct * add transferResponse + funcs * Rename VoucherID to VoucherType * more tests passing WIP trying out some stuff * Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway * get rid of pb stuff all message tests passing, some others in datatransfer Some cleanup for PR Cleanup for PR, clarifying and additional comments mod tidy Respond to PR comments: * Make DataTransferRequest/Response be returned in from Net * Regenerate cbor_gen and fix the generator caller so it works better * Please the linters Fix tests Initiate push and pull requests (#536) * add issue link for data TransferID generation * comment out failing but not relevant tests * finish voucher rename from Identifier --> Type tests passing cleanup for PR remove unused fmt import in graphsync_test a better reflection send data transfer response other tests passing feat(datatransfer): milestone 2 infrastructure Setup test path for all tickets for milestone 2 responses alert subscribers when request is not accepted (#607) Graphsync response is scheduled when a valid push request is received (#625) fix(datatransfer): fix tests fix an error with read buffers in tests fix(deps): fix go.sum Feat/dt graphsync pullreqs (#627) * graphsync responses to pull requests Feat/dt initiator cleanup (#645) * ChannelID.To --> ChannelID.Initiator * We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs. * InProgressChannels returns all of impl.channels, currently just for testing * Implements go-data-transfer issue * Some assertions were changed based on the above. * Renamed some variables and added some assertions based on the new understanding * Updated SHA for graphsync module * Updated fakeGraphSync test structs to use new interfaces from new SHA above Techdebt/dt split graphsync impl receiver (#651) * Split up graphsyncImpl and graphsyncReceiver * rename graphsync to utils DTM sends data over graphsync for validated push requests (#665) * create channels when a request is received. register push request hook with graphsync. fix tests. * better NewReaders * use mutex lock around impl.channels access * fix(datatransfer): fix test uncertainty * fix a data race and also don't use random bytes in basic block which can fail * privatize 3 funcs with @hannahhoward Feat/dt gs pullrequests (#693) * Implements DTM Sends Data Over Graphsync For Validated Pull Requests * rename a field in a test struct * refactor a couple of private functions (one was refactored out of existence) Feat/dt subscribe, file Xfer round trip (#720) Implements the rest of Subscriber Is Notified When Request Completed #24: * send a graphsync message within a go func and consume responses until error or transfer is complete. * notify subscribers of results. * Rename datatransfer.Event to EventCode. * datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses * Add extension data to graphsync request hook, gsReq * rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync * use a mutex lock for last transfer ID * obey the linter Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754) * update to correct graphsync version, update tests & code to call the new graphsync hooks * getExtensionData returns an empty struct + nil if we can't find our extension * Don't respond with error when we can't find the extension. * Test for same * mod tidy minor fix to go.sum feat(datatransfer): switch to graphsync implementation Move over to real graphsync implementation of data transfer, add constructors for graphsync instances on client and miner side fix(datatransfer): Fix validators Validators were checking payload cid against commP -- which are not the same any more. Added a payloadCid to client deal to maintain the record, fixed validator logic Feat/dt extraction use go-fil-components/datatransfer (#770) * Initial commit after changing to go-fil-components/datatransfer * blow away the datatransfer dir * use go-fil-components master after its PR #1 was merged * go mod tidy use a package updates after rebase with master
2019-10-30 02:42:16 +00:00
Override(new(dtypes.StagingBlockstore), modules.StagingBlockstore),
2019-08-06 22:04:21 +00:00
Override(new(dtypes.StagingDAG), modules.StagingDAG),
feat(datatransfer): implement and extract feat(datatransfer): setup implementation path Sets up a path to implementation, offering both the dagservice implementation and a future graphsync implement, establishes message interfaces and network layer, and isolates the datatransfer module from the app WIP using CBOR encoding for dataxfermsg * Bring cbor-gen stuff into datatransfer package * make transferRequest private struct * add transferResponse + funcs * Rename VoucherID to VoucherType * more tests passing WIP trying out some stuff * Embed request/response in message so all the interfaces work AND the CBOR unmarshaling works: this is more like the spec anyway * get rid of pb stuff all message tests passing, some others in datatransfer Some cleanup for PR Cleanup for PR, clarifying and additional comments mod tidy Respond to PR comments: * Make DataTransferRequest/Response be returned in from Net * Regenerate cbor_gen and fix the generator caller so it works better * Please the linters Fix tests Initiate push and pull requests (#536) * add issue link for data TransferID generation * comment out failing but not relevant tests * finish voucher rename from Identifier --> Type tests passing cleanup for PR remove unused fmt import in graphsync_test a better reflection send data transfer response other tests passing feat(datatransfer): milestone 2 infrastructure Setup test path for all tickets for milestone 2 responses alert subscribers when request is not accepted (#607) Graphsync response is scheduled when a valid push request is received (#625) fix(datatransfer): fix tests fix an error with read buffers in tests fix(deps): fix go.sum Feat/dt graphsync pullreqs (#627) * graphsync responses to pull requests Feat/dt initiator cleanup (#645) * ChannelID.To --> ChannelID.Initiator * We now store our peer ID (from host.ID()) so it can be used when creating ChannelIDs. * InProgressChannels returns all of impl.channels, currently just for testing * Implements go-data-transfer issue * Some assertions were changed based on the above. * Renamed some variables and added some assertions based on the new understanding * Updated SHA for graphsync module * Updated fakeGraphSync test structs to use new interfaces from new SHA above Techdebt/dt split graphsync impl receiver (#651) * Split up graphsyncImpl and graphsyncReceiver * rename graphsync to utils DTM sends data over graphsync for validated push requests (#665) * create channels when a request is received. register push request hook with graphsync. fix tests. * better NewReaders * use mutex lock around impl.channels access * fix(datatransfer): fix test uncertainty * fix a data race and also don't use random bytes in basic block which can fail * privatize 3 funcs with @hannahhoward Feat/dt gs pullrequests (#693) * Implements DTM Sends Data Over Graphsync For Validated Pull Requests * rename a field in a test struct * refactor a couple of private functions (one was refactored out of existence) Feat/dt subscribe, file Xfer round trip (#720) Implements the rest of Subscriber Is Notified When Request Completed #24: * send a graphsync message within a go func and consume responses until error or transfer is complete. * notify subscribers of results. * Rename datatransfer.Event to EventCode. * datatransfer.Event is now a struct that includes a message and a timestamp as well as the Event.Code int, formerly Event, update all uses * Add extension data to graphsync request hook, gsReq * rename sendRequest to sendDtRequest, to distinguish it from sendGsRequest, where Dt = datatransfer, Gs = graphsync * use a mutex lock for last transfer ID * obey the linter Don't respond with error in gsReqRcdHook when we can't find the datatransfer extension. (#754) * update to correct graphsync version, update tests & code to call the new graphsync hooks * getExtensionData returns an empty struct + nil if we can't find our extension * Don't respond with error when we can't find the extension. * Test for same * mod tidy minor fix to go.sum feat(datatransfer): switch to graphsync implementation Move over to real graphsync implementation of data transfer, add constructors for graphsync instances on client and miner side fix(datatransfer): Fix validators Validators were checking payload cid against commP -- which are not the same any more. Added a payloadCid to client deal to maintain the record, fixed validator logic Feat/dt extraction use go-fil-components/datatransfer (#770) * Initial commit after changing to go-fil-components/datatransfer * blow away the datatransfer dir * use go-fil-components master after its PR #1 was merged * go mod tidy use a package updates after rebase with master
2019-10-30 02:42:16 +00:00
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
2020-07-30 17:36:31 +00:00
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)),
2020-07-30 12:31:31 +00:00
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter),
2019-08-26 13:45:36 +00:00
Override(HandleRetrievalKey, modules.HandleRetrieval),
2019-12-04 19:44:15 +00:00
Override(GetParamsKey, modules.GetParams),
2019-08-02 16:25:10 +00:00
Override(HandleDealsKey, modules.HandleDeals),
Override(new(gen.WinningPoStProver), storage.NewWinningPoStProver),
2019-11-25 04:45:13 +00:00
Override(new(*miner.Miner), modules.SetupBlockProducer),
2020-06-11 20:18:18 +00:00
Override(new(dtypes.ConsiderOnlineStorageDealsConfigFunc), modules.NewConsiderOnlineStorageDealsConfigFunc),
Override(new(dtypes.SetConsiderOnlineStorageDealsConfigFunc), modules.NewSetConsideringOnlineStorageDealsFunc),
Override(new(dtypes.ConsiderOnlineRetrievalDealsConfigFunc), modules.NewConsiderOnlineRetrievalDealsConfigFunc),
Override(new(dtypes.SetConsiderOnlineRetrievalDealsConfigFunc), modules.NewSetConsiderOnlineRetrievalDealsConfigFunc),
2020-06-18 22:42:24 +00:00
Override(new(dtypes.StorageDealPieceCidBlocklistConfigFunc), modules.NewStorageDealPieceCidBlocklistConfigFunc),
Override(new(dtypes.SetStorageDealPieceCidBlocklistConfigFunc), modules.NewSetStorageDealPieceCidBlocklistConfigFunc),
Override(new(dtypes.ConsiderOfflineStorageDealsConfigFunc), modules.NewConsiderOfflineStorageDealsConfigFunc),
Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
2020-08-18 14:20:31 +00:00
Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc),
Override(new(dtypes.GetSealingConfigFunc), modules.NewGetSealConfigFunc),
Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc),
Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc),
),
2019-07-19 09:23:24 +00:00
)
}
2019-07-24 00:58:31 +00:00
func StorageMiner(out *api.StorageMiner) Option {
2019-07-19 09:23:24 +00:00
return Options(
ApplyIf(func(s *Settings) bool { return s.Config },
Error(errors.New("the StorageMiner option must be set before Config option")),
),
ApplyIf(func(s *Settings) bool { return s.Online },
Error(errors.New("the StorageMiner option must be set before Online option")),
),
func(s *Settings) error {
2019-11-12 17:59:38 +00:00
s.nodeType = repo.StorageMiner
2019-07-19 09:23:24 +00:00
return nil
},
2019-07-24 00:58:31 +00:00
func(s *Settings) error {
resAPI := &impl.StorageMinerAPI{}
2019-07-24 00:58:31 +00:00
s.invokes[ExtractApiKey] = fx.Extract(resAPI)
*out = resAPI
return nil
},
2019-07-04 15:50:48 +00:00
)
}
2019-07-10 13:06:04 +00:00
// Config sets up constructors based on the provided Config
func ConfigCommon(cfg *config.Common) Option {
2019-07-04 15:50:48 +00:00
return Options(
2019-07-10 13:06:04 +00:00
func(s *Settings) error { s.Config = true; return nil },
2020-03-16 17:50:07 +00:00
Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {
return multiaddr.NewMultiaddr(cfg.API.ListenAddress)
}),
Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
return lr.SetAPIEndpoint(e)
}),
Override(new(sectorstorage.URLs), func(e dtypes.APIEndpoint) (sectorstorage.URLs, error) {
ip := cfg.API.RemoteListenAddress
var urls sectorstorage.URLs
2020-04-03 05:01:39 +00:00
urls = append(urls, "http://"+ip+"/remote") // TODO: This makes no assumptions, and probably could...
2020-03-16 17:50:07 +00:00
return urls, nil
}),
2019-07-10 13:06:04 +00:00
ApplyIf(func(s *Settings) bool { return s.Online },
2019-07-04 15:50:48 +00:00
Override(StartListeningKey, lp2p.StartListening(cfg.Libp2p.ListenAddresses)),
2019-12-17 16:09:43 +00:00
Override(ConnectionManagerKey, lp2p.ConnectionManager(
cfg.Libp2p.ConnMgrLow,
cfg.Libp2p.ConnMgrHigh,
time.Duration(cfg.Libp2p.ConnMgrGrace),
cfg.Libp2p.ProtectedPeers)),
2020-05-14 01:10:49 +00:00
Override(new(*pubsub.PubSub), lp2p.GossipSub),
Override(new(*config.Pubsub), &cfg.Pubsub),
2019-10-11 02:45:45 +00:00
ApplyIf(func(s *Settings) bool { return len(cfg.Libp2p.BootstrapPeers) > 0 },
Override(new(dtypes.BootstrapPeers), modules.ConfigBootstrap(cfg.Libp2p.BootstrapPeers)),
),
2019-07-04 15:50:48 +00:00
),
Override(AddrsFactoryKey, lp2p.AddrsFactory(
cfg.Libp2p.AnnounceAddresses,
cfg.Libp2p.NoAnnounceAddresses)),
2019-07-04 15:50:48 +00:00
)
}
2019-11-12 17:59:38 +00:00
func ConfigFullNode(c interface{}) Option {
cfg, ok := c.(*config.FullNode)
if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
2019-07-10 15:38:35 +00:00
}
ipfsMaddr := cfg.Client.IpfsMAddr
2019-11-12 17:59:38 +00:00
return Options(
ConfigCommon(&cfg.Common),
If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr)),
If(cfg.Client.IpfsUseForRetrieval,
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientBlockstoreRetrievalStoreManager),
),
),
2019-12-11 15:08:50 +00:00
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),
2019-11-12 17:59:38 +00:00
)
}
2020-03-03 22:19:22 +00:00
func ConfigStorageMiner(c interface{}) Option {
cfg, ok := c.(*config.StorageMiner)
if !ok {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
2019-07-10 15:38:35 +00:00
}
return Options(
ConfigCommon(&cfg.Common),
2020-07-30 17:36:31 +00:00
If(cfg.Dealmaking.Filter != "",
Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))),
),
Override(new(sectorstorage.SealerConfig), cfg.Storage),
2020-08-12 17:47:00 +00:00
Override(new(*storage.Miner), modules.StorageMiner(cfg.Fees)),
)
}
2019-07-10 15:38:35 +00:00
func Repo(r repo.Repo) Option {
return func(settings *Settings) error {
lr, err := r.Lock(settings.nodeType)
if err != nil {
return err
}
c, err := lr.Config()
if err != nil {
return err
}
2019-07-10 17:36:17 +00:00
return Options(
Override(new(repo.LockedRepo), modules.LockedRepo(lr)), // module handles closing
2019-07-16 16:02:51 +00:00
Override(new(dtypes.MetadataDS), modules.Datastore),
Override(new(dtypes.ChainBlockstore), modules.ChainBlockstore),
2019-07-23 20:23:44 +00:00
2020-07-07 08:52:19 +00:00
Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),
2020-07-06 23:39:30 +00:00
Override(new(dtypes.ClientMultiDstore), modules.ClientMultiDatastore),
2020-07-07 08:52:19 +00:00
Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Override(new(dtypes.ClientRetrievalStoreManager), modules.ClientRetrievalStoreManager),
Override(new(ci.PrivKey), lp2p.PrivKey),
Override(new(ci.PubKey), ci.PrivKey.GetPublic),
Override(new(peer.ID), peer.IDFromPublicKey),
2019-07-23 20:23:44 +00:00
Override(new(types.KeyStore), modules.KeyStore),
Override(new(*dtypes.APIAlg), modules.APISecret),
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
)(settings)
}
2019-07-10 15:38:35 +00:00
}
2019-07-24 00:09:34 +00:00
func FullAPI(out *api.FullNode) Option {
2019-07-23 22:34:13 +00:00
return func(s *Settings) error {
resAPI := &impl.FullNodeAPI{}
2019-07-23 22:34:13 +00:00
s.invokes[ExtractApiKey] = fx.Extract(resAPI)
*out = resAPI
return nil
}
}
2019-09-17 14:23:08 +00:00
type StopFunc func(context.Context) error
2019-07-02 13:05:43 +00:00
// New builds and starts new Filecoin node
2019-09-17 14:23:08 +00:00
func New(ctx context.Context, opts ...Option) (StopFunc, error) {
2019-07-10 13:06:04 +00:00
settings := Settings{
modules: map[interface{}]fx.Option{},
invokes: make([]fx.Option, _nInvokes),
2019-11-12 17:59:38 +00:00
nodeType: repo.FullNode,
2019-07-04 15:50:48 +00:00
}
2019-07-04 20:06:02 +00:00
// apply module options in the right order
2019-07-09 17:03:36 +00:00
if err := Options(Options(defaults()...), Options(opts...))(&settings); err != nil {
2020-02-27 23:14:15 +00:00
return nil, xerrors.Errorf("applying node options failed: %w", err)
2019-07-04 15:50:48 +00:00
}
2019-07-04 20:06:02 +00:00
// gather constructors for fx.Options
2019-07-04 15:50:48 +00:00
ctors := make([]fx.Option, 0, len(settings.modules))
for _, opt := range settings.modules {
ctors = append(ctors, opt)
}
2019-07-04 20:06:02 +00:00
// fill holes in invokes for use in fx.Options
2019-07-04 15:50:48 +00:00
for i, opt := range settings.invokes {
if opt == nil {
settings.invokes[i] = fx.Options()
}
}
app := fx.New(
2019-07-04 15:50:48 +00:00
fx.Options(ctors...),
fx.Options(settings.invokes...),
2019-07-09 17:03:36 +00:00
fx.NopLogger,
)
2019-07-04 20:06:02 +00:00
// TODO: we probably should have a 'firewall' for Closing signal
// on this context, and implement closing logic through lifecycles
// correctly
if err := app.Start(ctx); err != nil {
// comment fx.NopLogger few lines above for easier debugging
2020-02-27 23:14:15 +00:00
return nil, xerrors.Errorf("starting node: %w", err)
}
2019-09-17 14:23:08 +00:00
return app.Stop, nil
}
// In-memory / testing
2019-10-23 11:02:00 +00:00
func Test() Option {
2019-07-04 15:50:48 +00:00
return Options(
2019-10-23 11:02:00 +00:00
Unset(RunPeerMgrKey),
Unset(new(*peermgr.PeerMgr)),
Override(new(beacon.Schedule), testing.RandomBeacon),
)
}