diff --git a/pkg/ipfs/ipld/trie_node.go b/pkg/ipfs/ipld/trie_node.go
new file mode 100644
index 00000000..1f26f89d
--- /dev/null
+++ b/pkg/ipfs/ipld/trie_node.go
@@ -0,0 +1,440 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package ipld
+
+import (
+ "encoding/json"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ipfs/go-cid"
+ node "github.com/ipfs/go-ipld-format"
+)
+
+// TrieNode is the general abstraction for
+// Ethereum IPLD trie nodes.
+type TrieNode struct {
+ // leaf, extension or branch
+ nodeKind string
+
+ // If leaf or extension: [0] is key, [1] is val.
+ // If branch: [0] - [16] are children.
+ elements []interface{}
+
+ // IPLD block information
+ cid cid.Cid
+ rawdata []byte
+}
+
+/*
+ OUTPUT
+*/
+
+type trieNodeLeafDecoder func([]interface{}) ([]interface{}, error)
+
+// decodeTrieNode returns a TrieNode object from an IPLD block's
+// cid and rawdata.
+func decodeTrieNode(c cid.Cid, b []byte,
+ leafDecoder trieNodeLeafDecoder) (*TrieNode, error) {
+ var (
+ i, decoded, elements []interface{}
+ nodeKind string
+ err error
+ )
+
+ err = rlp.DecodeBytes(b, &i)
+ if err != nil {
+ return nil, err
+ }
+
+ codec := c.Type()
+ switch len(i) {
+ case 2:
+ nodeKind, decoded, err = decodeCompactKey(i)
+ if err != nil {
+ return nil, err
+ }
+
+ if nodeKind == "extension" {
+ elements, err = parseTrieNodeExtension(decoded, codec)
+ }
+ if nodeKind == "leaf" {
+ elements, err = leafDecoder(decoded)
+ }
+ if nodeKind != "extension" && nodeKind != "leaf" {
+ return nil, fmt.Errorf("unexpected nodeKind returned from decoder")
+ }
+ // surface any error from the extension or leaf decoding above
+ if err != nil {
+ return nil, err
+ }
+ case 17:
+ nodeKind = "branch"
+ elements, err = parseTrieNodeBranch(i, codec)
+ if err != nil {
+ return nil, err
+ }
+ default:
+ return nil, fmt.Errorf("unknown trie node type")
+ }
+
+ return &TrieNode{
+ nodeKind: nodeKind,
+ elements: elements,
+ rawdata: b,
+ cid: c,
+ }, nil
+}
+
+// decodeCompactKey takes a compact key, and returns its nodeKind and value.
+func decodeCompactKey(i []interface{}) (string, []interface{}, error) {
+ first := i[0].([]byte)
+ last := i[1].([]byte)
+
+ switch first[0] / 16 {
+ case '\x00':
+ return "extension", []interface{}{
+ nibbleToByte(first)[2:],
+ last,
+ }, nil
+ case '\x01':
+ return "extension", []interface{}{
+ nibbleToByte(first)[1:],
+ last,
+ }, nil
+ case '\x02':
+ return "leaf", []interface{}{
+ nibbleToByte(first)[2:],
+ last,
+ }, nil
+ case '\x03':
+ return "leaf", []interface{}{
+ nibbleToByte(first)[1:],
+ last,
+ }, nil
+ default:
+ return "", nil, fmt.Errorf("unknown hex prefix")
+ }
+}
+
+// parseTrieNodeExtension parses the decoded key of an extension node and converts its child hash into a CID
+func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error) {
+ return []interface{}{
+ i[0].([]byte),
+ keccak256ToCid(codec, i[1].([]byte)),
+ }, nil
+}
+
+// parseTrieNodeBranch converts each decoded branch child hash into a CID, keeping nil for empty slots
+func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) {
+ var out []interface{}
+
+ for _, vi := range i {
+ v := vi.([]byte)
+
+ switch len(v) {
+ case 0:
+ out = append(out, nil)
+ case 32:
+ out = append(out, keccak256ToCid(codec, v))
+ default:
+ return nil, fmt.Errorf("unrecognized object: %v", v)
+ }
+ }
+
+ return out, nil
+}
+
+/*
+ Node INTERFACE
+*/
+
+// Resolve resolves a path through this node, stopping at any link boundary
+// and returning the object found as well as the remaining path to traverse
+func (t *TrieNode) Resolve(p []string) (interface{}, []string, error) {
+ switch t.nodeKind {
+ case "extension":
+ return t.resolveTrieNodeExtension(p)
+ case "leaf":
+ return t.resolveTrieNodeLeaf(p)
+ case "branch":
+ return t.resolveTrieNodeBranch(p)
+ default:
+ return nil, nil, fmt.Errorf("nodeKind case not implemented")
+ }
+}
+
+// Tree lists all paths within the object under 'path', and up to the given depth.
+// To list the entire object (similar to `find .`) pass "" and -1
+func (t *TrieNode) Tree(p string, depth int) []string {
+ if p != "" || depth == 0 {
+ return nil
+ }
+
+ var out []string
+
+ switch t.nodeKind {
+ case "extension":
+ var val string
+ for _, e := range t.elements[0].([]byte) {
+ val += fmt.Sprintf("%x", e)
+ }
+ return []string{val}
+ case "branch":
+ for i, elem := range t.elements {
+ if _, ok := elem.(cid.Cid); ok {
+ out = append(out, fmt.Sprintf("%x", i))
+ }
+ }
+ return out
+
+ default:
+ return nil
+ }
+}
+
+// ResolveLink is a helper function that calls resolve and asserts the
+// output is a link
+func (t *TrieNode) ResolveLink(p []string) (*node.Link, []string, error) {
+ obj, rest, err := t.Resolve(p)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ lnk, ok := obj.(*node.Link)
+ if !ok {
+ return nil, nil, fmt.Errorf("was not a link")
+ }
+
+ return lnk, rest, nil
+}
+
+// Copy will go away. It is here to comply with the interface.
+func (t *TrieNode) Copy() node.Node {
+ panic("dont use this yet")
+}
+
+// Links is a helper function that returns all links within this object
+func (t *TrieNode) Links() []*node.Link {
+ var out []*node.Link
+
+ for _, i := range t.elements {
+ c, ok := i.(cid.Cid)
+ if ok {
+ out = append(out, &node.Link{Cid: c})
+ }
+ }
+
+ return out
+}
+
+// Stat will go away. It is here to comply with the interface.
+func (t *TrieNode) Stat() (*node.NodeStat, error) {
+ return &node.NodeStat{}, nil
+}
+
+// Size will go away. It is here to comply with the interface.
+func (t *TrieNode) Size() (uint64, error) {
+ return 0, nil
+}
+
+/*
+ TrieNode functions
+*/
+
+// MarshalJSON marshals the trie node into a readable JSON format.
+func (t *TrieNode) MarshalJSON() ([]byte, error) {
+ var out map[string]interface{}
+
+ switch t.nodeKind {
+ case "extension":
+ fallthrough
+ case "leaf":
+ var hexPrefix string
+ for _, e := range t.elements[0].([]byte) {
+ hexPrefix += fmt.Sprintf("%x", e)
+ }
+
+ // if we got a byte we need to do this casting otherwise
+ // it will be marshaled to a base64 encoded value
+ if _, ok := t.elements[1].([]byte); ok {
+ var hexVal string
+ for _, e := range t.elements[1].([]byte) {
+ hexVal += fmt.Sprintf("%x", e)
+ }
+
+ t.elements[1] = hexVal
+ }
+
+ out = map[string]interface{}{
+ "type": t.nodeKind,
+ hexPrefix: t.elements[1],
+ }
+
+ case "branch":
+ out = map[string]interface{}{
+ "type": "branch",
+ "0": t.elements[0],
+ "1": t.elements[1],
+ "2": t.elements[2],
+ "3": t.elements[3],
+ "4": t.elements[4],
+ "5": t.elements[5],
+ "6": t.elements[6],
+ "7": t.elements[7],
+ "8": t.elements[8],
+ "9": t.elements[9],
+ "a": t.elements[10],
+ "b": t.elements[11],
+ "c": t.elements[12],
+ "d": t.elements[13],
+ "e": t.elements[14],
+ "f": t.elements[15],
+ }
+ default:
+ return nil, fmt.Errorf("nodeKind %s not supported", t.nodeKind)
+ }
+
+ return json.Marshal(out)
+}
+
+// nibbleToByte expands the nibbles of a byte slice into their own bytes.
+func nibbleToByte(k []byte) []byte {
+ var out []byte
+
+ for _, b := range k {
+ out = append(out, b/16)
+ out = append(out, b%16)
+ }
+
+ return out
+}
+
+// Resolve reading conveniences
+func (t *TrieNode) resolveTrieNodeExtension(p []string) (interface{}, []string, error) {
+ nibbles := t.elements[0].([]byte)
+ idx, rest := shiftFromPath(p, len(nibbles))
+ if len(idx) < len(nibbles) {
+ return nil, nil, fmt.Errorf("not enough nibbles to traverse this extension")
+ }
+
+ for _, i := range idx {
+ if getHexIndex(string(i)) == -1 {
+ return nil, nil, fmt.Errorf("invalid path element")
+ }
+ }
+
+ for i, n := range nibbles {
+ if string(idx[i]) != fmt.Sprintf("%x", n) {
+ return nil, nil, fmt.Errorf("no such link in this extension")
+ }
+ }
+
+ return &node.Link{Cid: t.elements[1].(cid.Cid)}, rest, nil
+}
+
+func (t *TrieNode) resolveTrieNodeLeaf(p []string) (interface{}, []string, error) {
+ nibbles := t.elements[0].([]byte)
+
+ if len(nibbles) != 0 {
+ idx, rest := shiftFromPath(p, len(nibbles))
+ if len(idx) < len(nibbles) {
+ return nil, nil, fmt.Errorf("not enough nibbles to traverse this leaf")
+ }
+
+ for _, i := range idx {
+ if getHexIndex(string(i)) == -1 {
+ return nil, nil, fmt.Errorf("invalid path element")
+ }
+ }
+
+ for i, n := range nibbles {
+ if string(idx[i]) != fmt.Sprintf("%x", n) {
+ return nil, nil, fmt.Errorf("no such link in this extension")
+ }
+ }
+
+ p = rest
+ }
+
+ link, ok := t.elements[1].(node.Node)
+ if !ok {
+ return nil, nil, fmt.Errorf("leaf children is not an IPLD node")
+ }
+
+ return link.Resolve(p)
+}
+
+func (t *TrieNode) resolveTrieNodeBranch(p []string) (interface{}, []string, error) {
+ idx, rest := shiftFromPath(p, 1)
+ hidx := getHexIndex(idx)
+ if hidx == -1 {
+ return nil, nil, fmt.Errorf("incorrect path")
+ }
+
+ child := t.elements[hidx]
+ if child != nil {
+ return &node.Link{Cid: child.(cid.Cid)}, rest, nil
+ }
+ return nil, nil, fmt.Errorf("no such link in this branch")
+}
+
+// shiftFromPath extracts from a given path (as a slice of strings)
+// the given number of elements as a single string, returning whatever
+// it has not taken.
+//
+// Examples:
+// ["0", "a", "something"] and 1 -> "0" and ["a", "something"]
+// ["ab", "c", "d", "1"] and 2 -> "ab" and ["c", "d", "1"]
+// ["abc", "d", "1"] and 2 -> "ab" and ["c", "d", "1"]
+func shiftFromPath(p []string, i int) (string, []string) {
+ var (
+ out string
+ rest []string
+ )
+
+ for _, pe := range p {
+ re := ""
+ for _, c := range pe {
+ if len(out) < i {
+ out += string(c)
+ } else {
+ re += string(c)
+ }
+ }
+
+ if len(out) == i && re != "" {
+ rest = append(rest, re)
+ }
+ }
+
+ return out, rest
+}
+
+// getHexIndex returns the integer 0-15 equivalent of the given
+// single-character hex string, or -1 otherwise.
+func getHexIndex(s string) int {
+ if len(s) != 1 {
+ return -1
+ }
+
+ c := byte(s[0])
+ switch {
+ case '0' <= c && c <= '9':
+ return int(c - '0')
+ case 'a' <= c && c <= 'f':
+ return int(c - 'a' + 10)
+ }
+
+ return -1
+}
\ No newline at end of file
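A minimal sketch (not part of the diff) of how the compact-key and path helpers above behave; it assumes it is saved as a _test.go file in the same ipld package so it can reach the unexported functions, and the example name is hypothetical:

package ipld

import "fmt"

func Example_trieHelpers() {
	// A hex-prefix key whose first nibble is 2 marks a leaf with an even-length key.
	kind, decoded, _ := decodeCompactKey([]interface{}{[]byte{0x20, 0xab}, []byte{0x01}})
	fmt.Println(kind, decoded[0])

	// shiftFromPath takes the first two path characters and returns the remainder.
	head, rest := shiftFromPath([]string{"abc", "d", "1"}, 2)
	fmt.Println(head, rest)

	// Hex characters map to 0-15; anything else yields -1.
	fmt.Println(getHexIndex("a"), getHexIndex("z"))
	// Output:
	// leaf [10 11]
	// ab [c d 1]
	// 10 -1
}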
diff --git a/pkg/super_node/watcher/config.go b/pkg/super_node/watcher/config.go
new file mode 100644
index 00000000..7190ac9a
--- /dev/null
+++ b/pkg/super_node/watcher/config.go
@@ -0,0 +1,40 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package watcher
+
+import (
+ "github.com/vulcanize/vulcanizedb/pkg/config"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/core"
+ "github.com/vulcanize/vulcanizedb/pkg/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+)
+
+// Config holds all of the parameters necessary for defining and running an instance of a watcher
+type Config struct {
+ // Subscription settings
+ SubscriptionConfig shared.SubscriptionSettings
+ // Database settings
+ DBConfig config.Database
+ // DB itself
+ DB *postgres.DB
+ // Subscription client
+ Client core.RPCClient
+ // WASM instantiation paths and namespaces
+ WASMInstances [][2]string
+ // Paths and names for the trigger functions (SQL files) that (can) use the instantiated WASM namespaces
+ TriggerFunctions [][2]string
+}
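A hedged sketch of how the WASM and trigger pairs in this Config are shaped; the field names come from the struct above, while the package name, file paths, namespace, and function name are hypothetical placeholders:

package example

import "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher"

func exampleConfig() watcher.Config {
	return watcher.Config{
		// Each WASM instance is a (file path, Postgres namespace) pair.
		WASMInstances: [][2]string{
			{"/usr/local/share/vdb/transformer.wasm", "example_namespace"},
		},
		// Each trigger function is assumed here to be a (SQL file path, function name) pair.
		TriggerFunctions: [][2]string{
			{"/usr/local/share/vdb/notify_trigger.sql", "example_trigger"},
		},
	}
}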
diff --git a/pkg/super_node/watcher/constructors.go b/pkg/super_node/watcher/constructors.go
new file mode 100644
index 00000000..d8b14588
--- /dev/null
+++ b/pkg/super_node/watcher/constructors.go
@@ -0,0 +1,43 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package watcher
+
+import (
+ "fmt"
+
+ "github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
+ "github.com/vulcanize/vulcanizedb/pkg/eth/core"
+ "github.com/vulcanize/vulcanizedb/pkg/postgres"
+ shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
+)
+
+// NewSuperNodeStreamer returns a new shared.SuperNodeStreamer
+func NewSuperNodeStreamer(client core.RPCClient) shared.SuperNodeStreamer {
+ return streamer.NewSuperNodeStreamer(client)
+}
+
+// NewRepository constructs and returns a new Repository that satisfies the shared.Repository interface for the specified chain
+func NewRepository(chain shared2.ChainType, db *postgres.DB, triggerFuncs [][2]string) (shared.Repository, error) {
+ switch chain {
+ case shared2.Ethereum:
+ return eth.NewRepository(db, triggerFuncs), nil
+ default:
+ return nil, fmt.Errorf("NewRepository constructor unexpected chain type %s", chain.String())
+ }
+}
diff --git a/pkg/super_node/watcher/eth/repository.go b/pkg/super_node/watcher/eth/repository.go
new file mode 100644
index 00000000..7ee0199c
--- /dev/null
+++ b/pkg/super_node/watcher/eth/repository.go
@@ -0,0 +1,58 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "github.com/vulcanize/vulcanizedb/pkg/postgres"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
+)
+
+// Repository is the underlying struct for satisfying the shared.Repository interface for eth
+type Repository struct {
+ db *postgres.DB
+ triggerFunctions [][2]string
+}
+
+// NewRepository returns a new eth.Repository that satisfies the shared.Repository interface
+func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Repository {
+ return &Repository{
+ db: db,
+ triggerFunctions: triggerFunctions,
+ }
+}
+
+// LoadTriggers is used to initialize the Postgres trigger functions;
+// it needs to be called after the WASM functions these triggers invoke have been instantiated
+func (r *Repository) LoadTriggers() error {
+ panic("implement me")
+}
+
+// QueueData puts super node payload data into the db queue
+func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error {
+ panic("implement me")
+}
+
+// GetQueueData grabs super node payload data from the db queue
+func (r *Repository) GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) {
+ panic("implement me")
+}
+
+// ReadyData puts super node payload data in the tables ready for processing by trigger functions
+func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error {
+ panic("implement me")
+}
diff --git a/pkg/super_node/watcher/service.go b/pkg/super_node/watcher/service.go
new file mode 100644
index 00000000..2cbbcbc2
--- /dev/null
+++ b/pkg/super_node/watcher/service.go
@@ -0,0 +1,177 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package watcher
+
+import (
+ "sync"
+ "sync/atomic"
+
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/sirupsen/logrus"
+
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
+ "github.com/vulcanize/vulcanizedb/pkg/wasm"
+)
+
+// SuperNodeWatcher is the top level interface for watching data from super node
+type SuperNodeWatcher interface {
+ Init() error
+ Watch(wg *sync.WaitGroup) error
+}
+
+// Service is the underlying struct for the SuperNodeWatcher
+type Service struct {
+ // Config
+ WatcherConfig Config
+ // Interface for streaming data from super node
+ SuperNodeStreamer shared.SuperNodeStreamer
+ // Interface for db operations
+ Repository shared.Repository
+ // WASM instantiator
+ WASMIniter *wasm.Instantiator
+
+ // Channels for process communication/data relay
+ PayloadChan chan super_node.SubscriptionPayload
+ QuitChan chan bool
+
+ // Indexes
+ // use atomic operations on these ONLY
+ payloadIndex *int64
+ endingIndex *int64
+ backFilling *int32 // 0 => not backfilling; 1 => backfilling
+}
+
+// NewSuperNodeWatcher returns a new Service which satisfies the SuperNodeWatcher interface
+func NewSuperNodeWatcher(c Config, quitChan chan bool) (SuperNodeWatcher, error) {
+ repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions)
+ if err != nil {
+ return nil, err
+ }
+ return &Service{
+ WatcherConfig: c,
+ SuperNodeStreamer: NewSuperNodeStreamer(c.Client),
+ Repository: repo,
+ WASMIniter: wasm.NewWASMInstantiator(c.DB, c.WASMInstances),
+ PayloadChan: make(chan super_node.SubscriptionPayload, super_node.PayloadChanBufferSize),
+ QuitChan: quitChan,
+ // allocate the atomic index fields so Watch can store into them
+ payloadIndex: new(int64),
+ endingIndex: new(int64),
+ backFilling: new(int32),
+ }, nil
+}
+
+// Init is used to initialize the Postgres WASM and trigger functions
+func (s *Service) Init() error {
+ // Instantiate the Postgres WASM functions
+ if err := s.WASMIniter.Instantiate(); err != nil {
+ return err
+ }
+ // Load the Postgres trigger functions that (can) use the instantiated WASM functions
+ return s.Repository.LoadTriggers()
+}
+
+// Watch is the top level loop for watching super node
+func (s *Service) Watch(wg *sync.WaitGroup) error {
+ sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, s.WatcherConfig.SubscriptionConfig)
+ if err != nil {
+ return err
+ }
+ atomic.StoreInt64(s.payloadIndex, s.WatcherConfig.SubscriptionConfig.StartingBlock().Int64())
+ atomic.StoreInt64(s.endingIndex, s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64()) // less than 0 => never end
+ backFilling := s.WatcherConfig.SubscriptionConfig.HistoricalData()
+ if backFilling {
+ atomic.StoreInt32(s.backFilling, 1)
+ } else {
+ atomic.StoreInt32(s.backFilling, 0)
+ }
+ backFillOnly := s.WatcherConfig.SubscriptionConfig.HistoricalDataOnly()
+ if backFillOnly { // we are only processing historical data => handle single contiguous stream
+ s.backFillOnlyQueuing(wg, sub)
+ } else { // otherwise we need to be prepared to handle out-of-order data
+ s.combinedQueuing(wg, sub)
+ }
+ return nil
+}
+
+// combinedQueuing assumes data is not necessarily going to come in linear order
+// this is true when we are backfilling and streaming at the head or when we are
+// only streaming at the head since reorgs can occur
+func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
+ wg.Add(1)
+ go func() {
+ for {
+ select {
+ case payload := <-s.PayloadChan:
+ // If there is an error associated with the payload, log it and continue
+ if payload.Error() != nil {
+ logrus.Error(payload.Error())
+ continue
+ }
+ if payload.Data.Height() == atomic.LoadInt64(s.payloadIndex) {
+ // If the data is at our current index it is ready to be processed; add it to the ready data queue
+ if err := s.Repository.ReadyData(payload); err != nil {
+ logrus.Error(err)
+ }
+ atomic.AddInt64(s.payloadIndex, 1)
+ } else { // otherwise add it to the wait queue
+ if err := s.Repository.QueueData(payload); err != nil {
+ logrus.Error(err)
+ }
+ }
+ case err := <-sub.Err():
+ logrus.Error(err)
+ case <-s.QuitChan:
+ logrus.Info("WatchContract shutting down")
+ wg.Done()
+ return
+ }
+ }
+ }()
+}
+
+// backFillOnlyQueuing assumes the data is coming in contiguously and behind the head
+// it puts all data on the ready queue
+// it continues until the watcher is told to quit or we receive notification that the backfill is finished
+func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
+ wg.Add(1)
+ go func() {
+ for {
+ select {
+ case payload := <-s.PayloadChan:
+ // If there is an error associated with the payload, log it and continue
+ if payload.Error() != nil {
+ logrus.Error(payload.Error())
+ continue
+ }
+ // If the payload signals that backfilling has completed, shut down the process
+ if payload.BackFillComplete() {
+ logrus.Info("Backfill complete, WatchContract shutting down")
+ wg.Done()
+ return
+ }
+ // Add the payload to the ready data queue
+ if err := s.Repository.ReadyData(payload); err != nil {
+ logrus.Error(err)
+ }
+ case err := <-sub.Err():
+ logrus.Error(err)
+ case <-s.QuitChan:
+ logrus.Info("WatchContract shutting down")
+ wg.Done()
+ return
+ }
+ }
+ }()
+}
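A hedged sketch of how the constructor, Init, and Watch above are intended to be wired together by a caller; the wrapper function and package name are hypothetical, and cfg is assumed to be a fully populated watcher.Config:

package example

import (
	"sync"

	"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher"
)

func runWatcher(cfg watcher.Config, quit chan bool) error {
	w, err := watcher.NewSuperNodeWatcher(cfg, quit)
	if err != nil {
		return err
	}
	// Instantiate the Postgres WASM functions and load the trigger functions that use them.
	if err := w.Init(); err != nil {
		return err
	}
	// Watch subscribes to the super node and spawns the queuing goroutine;
	// block on the WaitGroup until the watcher is told to quit.
	wg := &sync.WaitGroup{}
	if err := w.Watch(wg); err != nil {
		return err
	}
	wg.Wait()
	return nil
}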
diff --git a/pkg/super_node/watcher/shared/interfaces.go b/pkg/super_node/watcher/shared/interfaces.go
new file mode 100644
index 00000000..871d6fab
--- /dev/null
+++ b/pkg/super_node/watcher/shared/interfaces.go
@@ -0,0 +1,37 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package shared
+
+import (
+ "github.com/ethereum/go-ethereum/rpc"
+
+ "github.com/vulcanize/vulcanizedb/pkg/super_node"
+ "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
+)
+
+// Repository is the interface for the Postgres database
+type Repository interface {
+ LoadTriggers() error
+ QueueData(payload super_node.SubscriptionPayload) error
+ GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error)
+ ReadyData(payload super_node.SubscriptionPayload) error
+}
+
+// SuperNodeStreamer is the interface for streaming data from a vulcanizeDB super node
+type SuperNodeStreamer interface {
+ Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error)
+}
diff --git a/pkg/wasm/instantiator.go b/pkg/wasm/instantiator.go
index f136fa94..89542880 100644
--- a/pkg/wasm/instantiator.go
+++ b/pkg/wasm/instantiator.go
@@ -18,11 +18,13 @@ package wasm
import "github.com/vulcanize/vulcanizedb/pkg/postgres"
+// Instantiator is used to instantiate WASM functions in Postgres
type Instantiator struct {
db *postgres.DB
instances [][2]string // list of WASM file paths and namespaces
}
+// NewWASMInstantiator returns a pointer to a new Instantiator
func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
return &Instantiator{
db: db,
@@ -30,13 +32,14 @@ func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
}
}
+// Instantiate is used to load the WASM functions into Postgres
func (i *Instantiator) Instantiate() error {
tx, err := i.db.Beginx()
if err != nil {
return err
}
for _, pn := range i.instances {
- _, err := i.db.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1])
+ _, err := tx.Exec(`SELECT wasm_new_instance($1, $2)`, pn[0], pn[1])
if err != nil {
return err
}