diff --git a/pkg/super_node/backfiller.go b/pkg/super_node/backfiller.go index 2684df69..7c923646 100644 --- a/pkg/super_node/backfiller.go +++ b/pkg/super_node/backfiller.go @@ -69,11 +69,11 @@ type BackFillService struct { // NewBackFillService returns a new BackFillInterface func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { - publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath) + publisher, err := NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode) if err != nil { return nil, err } - indexer, err := NewCIDIndexer(settings.Chain, settings.DB) + indexer, err := NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) if err != nil { return nil, err } diff --git a/pkg/super_node/constructors.go b/pkg/super_node/constructors.go index fa62fb7d..88726bcb 100644 --- a/pkg/super_node/constructors.go +++ b/pkg/super_node/constructors.go @@ -44,12 +44,27 @@ func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error } // NewCIDIndexer constructs a CIDIndexer for the provided chain type -func NewCIDIndexer(chain shared.ChainType, db *postgres.DB) (shared.CIDIndexer, error) { +func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error) { switch chain { case shared.Ethereum: - return eth.NewCIDIndexer(db), nil + switch ipfsMode { + case shared.LocalInterface, shared.RemoteClient: + return eth.NewCIDIndexer(db), nil + case shared.DirectPostgres: + return eth.NewIPLDPublisherAndIndexer(db), nil + default: + return nil, fmt.Errorf("ethereum CIDIndexer unexpected ipfs mode %s", ipfsMode.String()) + } case shared.Bitcoin: - return btc.NewCIDIndexer(db), nil + switch ipfsMode { + case shared.LocalInterface, shared.RemoteClient: + return btc.NewCIDIndexer(db), nil + case shared.DirectPostgres: + // TODO + return nil, nil + default: + return nil, fmt.Errorf("bitcoin CIDIndexer unexpected ipfs mode %s", ipfsMode.String()) + } default: return nil, fmt.Errorf("invalid chain %s for indexer constructor", chain.String()) } @@ -134,12 +149,42 @@ func NewIPLDFetcher(chain shared.ChainType, ipfsPath string) (shared.IPLDFetcher } // NewIPLDPublisher constructs an IPLDPublisher for the provided chain type -func NewIPLDPublisher(chain shared.ChainType, ipfsPath string) (shared.IPLDPublisher, error) { +func NewIPLDPublisher(chain shared.ChainType, arg interface{}, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error) { switch chain { case shared.Ethereum: - return eth.NewIPLDPublisher(ipfsPath) + switch ipfsMode { + case shared.LocalInterface, shared.RemoteClient: + ipfsPath, ok := arg.(string) + if !ok { + var s string + return nil, fmt.Errorf("ethereum IPLDPublisher expected argument type %T got %T", s, arg) + } + return eth.NewIPLDPublisher(ipfsPath) + case shared.DirectPostgres: + db, ok := arg.(*postgres.DB) + if !ok { + var pgdb *postgres.DB + return nil, fmt.Errorf("ethereum IPLDPublisher expected argument type %T got %T", pgdb, arg) + } + return eth.NewIPLDPublisherAndIndexer(db), nil + default: + return nil, fmt.Errorf("ethereum IPLDPublisher unexpected ipfs mode %s", ipfsMode.String()) + } case shared.Bitcoin: - return btc.NewIPLDPublisher(ipfsPath) + switch ipfsMode { + case shared.LocalInterface, shared.RemoteClient: + ipfsPath, ok := arg.(string) + if !ok { + var s string + return nil, fmt.Errorf("bitcoin IPLDPublisher expected argument type %T got %T", s, arg) + } + return btc.NewIPLDPublisher(ipfsPath) + case shared.DirectPostgres: + // TODO + return nil, nil + default: + return nil, fmt.Errorf("bitcoin IPLDPublisher unexpected ipfs mode %s", ipfsMode.String()) + } default: return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String()) } diff --git a/pkg/super_node/resync/service.go b/pkg/super_node/resync/service.go index 1e291ff1..6ab5cf37 100644 --- a/pkg/super_node/resync/service.go +++ b/pkg/super_node/resync/service.go @@ -64,11 +64,11 @@ type Service struct { // NewResyncService creates and returns a resync service from the provided settings func NewResyncService(settings *Config) (Resync, error) { - publisher, err := super_node.NewIPLDPublisher(settings.Chain, settings.IPFSPath) + publisher, err := super_node.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode) if err != nil { return nil, err } - indexer, err := super_node.NewCIDIndexer(settings.Chain, settings.DB) + indexer, err := super_node.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) if err != nil { return nil, err } diff --git a/pkg/super_node/service.go b/pkg/super_node/service.go index 508697d6..5b6302d7 100644 --- a/pkg/super_node/service.go +++ b/pkg/super_node/service.go @@ -109,11 +109,11 @@ func NewSuperNode(settings *Config) (SuperNode, error) { if err != nil { return nil, err } - sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath) + sn.Publisher, err = NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.IPFSMode) if err != nil { return nil, err } - sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB) + sn.Indexer, err = NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) if err != nil { return nil, err } @@ -227,7 +227,6 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared <-publishAndIndexPayload publishAndIndexPayload <- ipldPayload } - publishAndIndexPayload <- ipldPayload case err := <-sub.Err(): log.Errorf("super node subscription error for chain %s: %v", sap.chain.String(), err) case <-sap.QuitChan: @@ -251,12 +250,12 @@ func (sap *Service) publishAndIndex(id int, publishAndIndexPayload <-chan shared log.Debugf("publishing %s data streamed at head height %d", sap.chain.String(), payload.Height()) cidPayload, err := sap.Publisher.Publish(payload) if err != nil { - log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) + log.Errorf("super node publishAndIndex worker %d publishing error for chain %s: %v", id, sap.chain.String(), err) continue } log.Debugf("indexing %s data streamed at head height %d", sap.chain.String(), payload.Height()) if err := sap.Indexer.Index(cidPayload); err != nil { - log.Errorf("super node publishAndIndex worker %d error for chain %s: %v", id, sap.chain.String(), err) + log.Errorf("super node publishAndIndex worker %d indexing error for chain %s: %v", id, sap.chain.String(), err) } } }