update higher level interfaces and constructs

This commit is contained in:
Ian Norden 2020-04-27 15:37:41 -05:00
parent 475ca3e6d1
commit 65da8cafe7
4 changed files with 59 additions and 15 deletions

View File

@ -69,11 +69,11 @@ type BackFillService struct {
// NewBackFillService returns a new BackFillInterface
func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) {
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath)
publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := NewCIDIndexer(settings.Chain, settings.DB)
indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}

View File

@ -44,12 +44,27 @@ func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error
}
// NewCIDIndexer constructs a CIDIndexer for the provided chain type
func NewCIDIndexer(chain shared.ChainType, db *postgres.DB) (shared.CIDIndexer, error) {
func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error) {
switch chain {
case shared.Ethereum:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return eth.NewCIDIndexer(db), nil
case shared.DirectPostgres:
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("ethereum CIDIndexer unexpected ipfs mode %s", ipfsMode.String())
}
case shared.Bitcoin:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
return btc.NewCIDIndexer(db), nil
case shared.DirectPostgres:
// TODO
return nil, nil
default:
return nil, fmt.Errorf("bitcoin CIDIndexer unexpected ipfs mode %s", ipfsMode.String())
}
default:
return nil, fmt.Errorf("invalid chain %s for indexer constructor", chain.String())
}
@ -134,12 +149,42 @@ func NewIPLDFetcher(chain shared.ChainType, ipfsPath string) (shared.IPLDFetcher
}
// NewIPLDPublisher constructs an IPLDPublisher for the provided chain type
func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPublisher, error) {
func NewIPLDPublisher(chain shared.ChainType, arg interface{}, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error) {
switch chain {
case shared.Ethereum:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
ipfsPath, ok := arg.(string)
if !ok {
var s string
return nil, fmt.Errorf("ethereum IPLDPublisher expected argument type %T got %T", s, arg)
}
return eth.NewIPLDPublisher(ipfsPath)
case shared.DirectPostgres:
db, ok := arg.(*postgres.DB)
if !ok {
var pgdb *postgres.DB
return nil, fmt.Errorf("ethereum IPLDPublisher expected argument type %T got %T", pgdb, arg)
}
return eth.NewIPLDPublisherAndIndexer(db), nil
default:
return nil, fmt.Errorf("ethereum IPLDPublisher unexpected ipfs mode %s", ipfsMode.String())
}
case shared.Bitcoin:
switch ipfsMode {
case shared.LocalInterface, shared.RemoteClient:
ipfsPath, ok := arg.(string)
if !ok {
var s string
return nil, fmt.Errorf("bitcoin IPLDPublisher expected argument type %T got %T", s, arg)
}
return btc.NewIPLDPublisher(ipfsPath)
case shared.DirectPostgres:
// TODO
return nil, nil
default:
return nil, fmt.Errorf("bitcoin IPLDPublisher unexpected ipfs mode %s", ipfsMode.String())
}
default:
return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String())
}

View File

@ -64,11 +64,11 @@ type Service struct {
// NewResyncService creates and returns a resync service from the provided settings
func NewResyncService(settings *Config) (Resync, error) {
publisher, err := super_node.NewIPLDPublisher(settings.Chain, settings.IPFSPath)
publisher, err := super_node.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode)
if err != nil {
return nil, err
}
indexer, err := super_node.NewCIDIndexer(settings.Chain, settings.DB)
indexer, err := super_node.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}

View File

@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) {
if err != nil {
return nil, err
}
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath)
sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode)
if err != nil {
return nil, err
}
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB)
sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode)
if err != nil {
return nil, err
}
@ -227,7 +227,6 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared
<-publishAndIndexPayload
publishAndIndexPayload <- ipldPayload
}
publishAndIndexPayload <- ipldPayload
case err := <-sub.Err():
log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err)
case <-sap.QuitChan:
@ -251,12 +250,12 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared
log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height())
cidPayload, err := sap.Publisher.Publish(payload)
if err != nil {
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err)
continue
}
log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height())
if err := sap.Indexer.Index(cidPayload); err != nil {
log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err)
log.Errorf("super node publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err)
}
}
}