From f0c5ff80779b8e431f53f3de5196c0c0555568cf Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 19 Feb 2020 16:32:59 -0600 Subject: [PATCH] begin wasm watcher engine --- pkg/ipfs/ipld/trie_node.go | 440 ++++++++++++++++++++ pkg/super_node/watcher/config.go | 40 ++ pkg/super_node/watcher/constructors.go | 43 ++ pkg/super_node/watcher/eth/repository.go | 58 +++ pkg/super_node/watcher/service.go | 177 ++++++++ pkg/super_node/watcher/shared/interfaces.go | 37 ++ pkg/wasm/instantiator.go | 5 +- 7 files changed, 799 insertions(+), 1 deletion(-) create mode 100644 pkg/ipfs/ipld/trie_node.go create mode 100644 pkg/super_node/watcher/config.go create mode 100644 pkg/super_node/watcher/constructors.go create mode 100644 pkg/super_node/watcher/eth/repository.go create mode 100644 pkg/super_node/watcher/service.go create mode 100644 pkg/super_node/watcher/shared/interfaces.go diff --git a/pkg/ipfs/ipld/trie_node.go b/pkg/ipfs/ipld/trie_node.go new file mode 100644 index 00000000..1f26f89d --- /dev/null +++ b/pkg/ipfs/ipld/trie_node.go @@ -0,0 +1,440 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipld + +import ( + "encoding/json" + "fmt" + + "github.com/ethereum/go-ethereum/rlp" + "github.com/ipfs/go-cid" + node "github.com/ipfs/go-ipld-format" +) + +// TrieNode is the general abstraction for +//ethereum IPLD trie nodes. +type TrieNode struct { + // leaf, extension or branch + nodeKind string + + // If leaf or extension: [0] is key, [1] is val. + // If branch: [0] - [16] are children. + elements []interface{} + + // IPLD block information + cid cid.Cid + rawdata []byte +} + +/* + OUTPUT +*/ + +type trieNodeLeafDecoder func([]interface{}) ([]interface{}, error) + +// decodeTrieNode returns a TrieNode object from an IPLD block's +// cid and rawdata. +func decodeTrieNode(c cid.Cid, b []byte, + leafDecoder trieNodeLeafDecoder) (*TrieNode, error) { + var ( + i, decoded, elements []interface{} + nodeKind string + err error + ) + + err = rlp.DecodeBytes(b, &i) + if err != nil { + return nil, err + } + + codec := c.Type() + switch len(i) { + case 2: + nodeKind, decoded, err = decodeCompactKey(i) + if err != nil { + return nil, err + } + + if nodeKind == "extension" { + elements, err = parseTrieNodeExtension(decoded, codec) + } + if nodeKind == "leaf" { + elements, err = leafDecoder(decoded) + } + if nodeKind != "extension" && nodeKind != "leaf" { + return nil, fmt.Errorf("unexpected nodeKind returned from decoder") + } + case 17: + nodeKind = "branch" + elements, err = parseTrieNodeBranch(i, codec) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unknown trie node type") + } + + return &TrieNode{ + nodeKind: nodeKind, + elements: elements, + rawdata: b, + cid: c, + }, nil +} + +// decodeCompactKey takes a compact key, and returns its nodeKind and value. +func decodeCompactKey(i []interface{}) (string, []interface{}, error) { + first := i[0].([]byte) + last := i[1].([]byte) + + switch first[0] / 16 { + case '\x00': + return "extension", []interface{}{ + nibbleToByte(first)[2:], + last, + }, nil + case '\x01': + return "extension", []interface{}{ + nibbleToByte(first)[1:], + last, + }, nil + case '\x02': + return "leaf", []interface{}{ + nibbleToByte(first)[2:], + last, + }, nil + case '\x03': + return "leaf", []interface{}{ + nibbleToByte(first)[1:], + last, + }, nil + default: + return "", nil, fmt.Errorf("unknown hex prefix") + } +} + +// parseTrieNodeExtension helper improves readability +func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error) { + return []interface{}{ + i[0].([]byte), + keccak256ToCid(codec, i[1].([]byte)), + }, nil +} + +// parseTrieNodeBranch helper improves readability +func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) { + var out []interface{} + + for _, vi := range i { + v := vi.([]byte) + + switch len(v) { + case 0: + out = append(out, nil) + case 32: + out = append(out, keccak256ToCid(codec, v)) + default: + return nil, fmt.Errorf("unrecognized object: %v", v) + } + } + + return out, nil +} + +/* + Node INTERFACE +*/ + +// Resolve resolves a path through this node, stopping at any link boundary +// and returning the object found as well as the remaining path to traverse +func (t *TrieNode) Resolve(p []string) (interface{}, []string, error) { + switch t.nodeKind { + case "extension": + return t.resolveTrieNodeExtension(p) + case "leaf": + return t.resolveTrieNodeLeaf(p) + case "branch": + return t.resolveTrieNodeBranch(p) + default: + return nil, nil, fmt.Errorf("nodeKind case not implemented") + } +} + +// Tree lists all paths within the object under 'path', and up to the given depth. +// To list the entire object (similar to `find .`) pass "" and -1 +func (t *TrieNode) Tree(p string, depth int) []string { + if p != "" || depth == 0 { + return nil + } + + var out []string + + switch t.nodeKind { + case "extension": + var val string + for _, e := range t.elements[0].([]byte) { + val += fmt.Sprintf("%x", e) + } + return []string{val} + case "branch": + for i, elem := range t.elements { + if _, ok := elem.(*cid.Cid); ok { + out = append(out, fmt.Sprintf("%x", i)) + } + } + return out + + default: + return nil + } +} + +// ResolveLink is a helper function that calls resolve and asserts the +// output is a link +func (t *TrieNode) ResolveLink(p []string) (*node.Link, []string, error) { + obj, rest, err := t.Resolve(p) + if err != nil { + return nil, nil, err + } + + lnk, ok := obj.(*node.Link) + if !ok { + return nil, nil, fmt.Errorf("was not a link") + } + + return lnk, rest, nil +} + +// Copy will go away. It is here to comply with the interface. +func (t *TrieNode) Copy() node.Node { + panic("dont use this yet") +} + +// Links is a helper function that returns all links within this object +func (t *TrieNode) Links() []*node.Link { + var out []*node.Link + + for _, i := range t.elements { + c, ok := i.(cid.Cid) + if ok { + out = append(out, &node.Link{Cid: c}) + } + } + + return out +} + +// Stat will go away. It is here to comply with the interface. +func (t *TrieNode) Stat() (*node.NodeStat, error) { + return &node.NodeStat{}, nil +} + +// Size will go away. It is here to comply with the interface. +func (t *TrieNode) Size() (uint64, error) { + return 0, nil +} + +/* + TrieNode functions +*/ + +// MarshalJSON processes the transaction trie into readable JSON format. +func (t *TrieNode) MarshalJSON() ([]byte, error) { + var out map[string]interface{} + + switch t.nodeKind { + case "extension": + fallthrough + case "leaf": + var hexPrefix string + for _, e := range t.elements[0].([]byte) { + hexPrefix += fmt.Sprintf("%x", e) + } + + // if we got a byte we need to do this casting otherwise + // it will be marshaled to a base64 encoded value + if _, ok := t.elements[1].([]byte); ok { + var hexVal string + for _, e := range t.elements[1].([]byte) { + hexVal += fmt.Sprintf("%x", e) + } + + t.elements[1] = hexVal + } + + out = map[string]interface{}{ + "type": t.nodeKind, + hexPrefix: t.elements[1], + } + + case "branch": + out = map[string]interface{}{ + "type": "branch", + "0": t.elements[0], + "1": t.elements[1], + "2": t.elements[2], + "3": t.elements[3], + "4": t.elements[4], + "5": t.elements[5], + "6": t.elements[6], + "7": t.elements[7], + "8": t.elements[8], + "9": t.elements[9], + "a": t.elements[10], + "b": t.elements[11], + "c": t.elements[12], + "d": t.elements[13], + "e": t.elements[14], + "f": t.elements[15], + } + default: + return nil, fmt.Errorf("nodeKind %s not supported", t.nodeKind) + } + + return json.Marshal(out) +} + +// nibbleToByte expands the nibbles of a byte slice into their own bytes. +func nibbleToByte(k []byte) []byte { + var out []byte + + for _, b := range k { + out = append(out, b/16) + out = append(out, b%16) + } + + return out +} + +// Resolve reading conveniences +func (t *TrieNode) resolveTrieNodeExtension(p []string) (interface{}, []string, error) { + nibbles := t.elements[0].([]byte) + idx, rest := shiftFromPath(p, len(nibbles)) + if len(idx) < len(nibbles) { + return nil, nil, fmt.Errorf("not enough nibbles to traverse this extension") + } + + for _, i := range idx { + if getHexIndex(string(i)) == -1 { + return nil, nil, fmt.Errorf("invalid path element") + } + } + + for i, n := range nibbles { + if string(idx[i]) != fmt.Sprintf("%x", n) { + return nil, nil, fmt.Errorf("no such link in this extension") + } + } + + return &node.Link{Cid: t.elements[1].(cid.Cid)}, rest, nil +} + +func (t *TrieNode) resolveTrieNodeLeaf(p []string) (interface{}, []string, error) { + nibbles := t.elements[0].([]byte) + + if len(nibbles) != 0 { + idx, rest := shiftFromPath(p, len(nibbles)) + if len(idx) < len(nibbles) { + return nil, nil, fmt.Errorf("not enough nibbles to traverse this leaf") + } + + for _, i := range idx { + if getHexIndex(string(i)) == -1 { + return nil, nil, fmt.Errorf("invalid path element") + } + } + + for i, n := range nibbles { + if string(idx[i]) != fmt.Sprintf("%x", n) { + return nil, nil, fmt.Errorf("no such link in this extension") + } + } + + p = rest + } + + link, ok := t.elements[1].(node.Node) + if !ok { + return nil, nil, fmt.Errorf("leaf children is not an IPLD node") + } + + return link.Resolve(p) +} + +func (t *TrieNode) resolveTrieNodeBranch(p []string) (interface{}, []string, error) { + idx, rest := shiftFromPath(p, 1) + hidx := getHexIndex(idx) + if hidx == -1 { + return nil, nil, fmt.Errorf("incorrect path") + } + + child := t.elements[hidx] + if child != nil { + return &node.Link{Cid: child.(cid.Cid)}, rest, nil + } + return nil, nil, fmt.Errorf("no such link in this branch") +} + +// shiftFromPath extracts from a given path (as a slice of strings) +// the given number of elements as a single string, returning whatever +// it has not taken. +// +// Examples: +// ["0", "a", "something"] and 1 -> "0" and ["a", "something"] +// ["ab", "c", "d", "1"] and 2 -> "ab" and ["c", "d", "1"] +// ["abc", "d", "1"] and 2 -> "ab" and ["c", "d", "1"] +func shiftFromPath(p []string, i int) (string, []string) { + var ( + out string + rest []string + ) + + for _, pe := range p { + re := "" + for _, c := range pe { + if len(out) < i { + out += string(c) + } else { + re += string(c) + } + } + + if len(out) == i && re != "" { + rest = append(rest, re) + } + } + + return out, rest +} + +// getHexIndex returns to you the integer 0 - 15 equivalent to your +// string character if applicable, or -1 otherwise. +func getHexIndex(s string) int { + if len(s) != 1 { + return -1 + } + + c := byte(s[0]) + switch { + case '0' <= c && c <= '9': + return int(c - '0') + case 'a' <= c && c <= 'f': + return int(c - 'a' + 10) + } + + return -1 +} \ No newline at end of file diff --git a/pkg/super_node/watcher/config.go b/pkg/super_node/watcher/config.go new file mode 100644 index 00000000..7190ac9a --- /dev/null +++ b/pkg/super_node/watcher/config.go @@ -0,0 +1,40 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package watcher + +import ( + "github.com/vulcanize/vulcanizedb/pkg/config" + "github.com/vulcanize/vulcanizedb/pkg/eth/core" + "github.com/vulcanize/vulcanizedb/pkg/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// Config holds all of the parameters necessary for defining and running an instance of a watcher +type Config struct { + // Subscription settings + SubscriptionConfig shared.SubscriptionSettings + // Database settings + DBConfig config.Database + // DB itself + DB *postgres.DB + // Subscription client + Client core.RPCClient + // WASM instantiation paths and namespaces + WASMInstances [][2]string + // Path and names for trigger functions (sql files) that (can) use the instantiated wasm namespaces + TriggerFunctions [][2]string +} diff --git a/pkg/super_node/watcher/constructors.go b/pkg/super_node/watcher/constructors.go new file mode 100644 index 00000000..d8b14588 --- /dev/null +++ b/pkg/super_node/watcher/constructors.go @@ -0,0 +1,43 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package watcher + +import ( + "fmt" + + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" + "github.com/vulcanize/vulcanizedb/pkg/eth/core" + "github.com/vulcanize/vulcanizedb/pkg/postgres" + shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" + "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth" + "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" +) + +// NewSuperNodeStreamer returns a new shared.SuperNodeStreamer +func NewSuperNodeStreamer(client core.RPCClient) shared.SuperNodeStreamer { + return streamer.NewSuperNodeStreamer(client) +} + +// NewRepository constructs and returns a new Repository that satisfies the shared.Repository interface for the specified chain +func NewRepository(chain shared2.ChainType, db *postgres.DB, triggerFuncs [][2]string) (shared.Repository, error) { + switch chain { + case shared2.Ethereum: + return eth.NewRepository(db, triggerFuncs), nil + default: + return nil, fmt.Errorf("NewRepository constructor unexpected chain type %s", chain.String()) + } +} diff --git a/pkg/super_node/watcher/eth/repository.go b/pkg/super_node/watcher/eth/repository.go new file mode 100644 index 00000000..7ee0199c --- /dev/null +++ b/pkg/super_node/watcher/eth/repository.go @@ -0,0 +1,58 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package eth + +import ( + "github.com/vulcanize/vulcanizedb/pkg/postgres" + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" +) + +// Repository is the underlying struct for satisfying the shared.Repository interface for eth +type Repository struct { + db *postgres.DB + triggerFunctions [][2]string +} + +// NewRepository returns a new eth.Repository that satisfies the shared.Repository interface +func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Repository { + return &Repository{ + db: db, + triggerFunctions: triggerFunctions, + } +} + +// LoadTriggers is used to initialize Postgres trigger function +// this needs to be called after the wasm functions these triggers invoke have been instantiated +func (r *Repository) LoadTriggers() error { + panic("implement me") +} + +// QueueData puts super node payload data into the db queue +func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error { + panic("implement me") +} + +// GetQueueData grabs super node payload data from the db queue +func (r *Repository) GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) { + panic("implement me") +} + +// ReadyData puts super node payload data in the tables ready for processing by trigger functions +func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error { + panic("implement me") +} diff --git a/pkg/super_node/watcher/service.go b/pkg/super_node/watcher/service.go new file mode 100644 index 00000000..2cbbcbc2 --- /dev/null +++ b/pkg/super_node/watcher/service.go @@ -0,0 +1,177 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package watcher + +import ( + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/rpc" + "github.com/sirupsen/logrus" + + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared" + "github.com/vulcanize/vulcanizedb/pkg/wasm" +) + +// SuperNodeWatcher is the top level interface for watching data from super node +type SuperNodeWatcher interface { + Init() error + Watch(wg *sync.WaitGroup) error +} + +// Service is the underlying struct for the SuperNodeWatcher +type Service struct { + // Config + WatcherConfig Config + // Interface for streaming data from super node + SuperNodeStreamer shared.SuperNodeStreamer + // Interface for db operations + Repository shared.Repository + // WASM instantiator + WASMIniter *wasm.Instantiator + + // Channels for process communication/data relay + PayloadChan chan super_node.SubscriptionPayload + QuitChan chan bool + + // Indexes + // use atomic operations on these ONLY + payloadIndex *int64 + endingIndex *int64 + backFilling *int32 // 0 => not backfilling; 1 => backfilling +} + +// NewSuperNodeWatcher returns a new Service which satisfies the SuperNodeWatcher interface +func NewSuperNodeWatcher(c Config, quitChan chan bool) (SuperNodeWatcher, error) { + repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions) + if err != nil { + return nil, err + } + return &Service{ + WatcherConfig: c, + SuperNodeStreamer: NewSuperNodeStreamer(c.Client), + Repository: repo, + WASMIniter: wasm.NewWASMInstantiator(c.DB, c.WASMInstances), + PayloadChan: make(chan super_node.SubscriptionPayload, super_node.PayloadChanBufferSize), + QuitChan: quitChan, + }, nil +} + +// Init is used to initialize the Postgres WASM and trigger functions +func (s *Service) Init() error { + // Instantiate the Postgres WASM functions + if err := s.WASMIniter.Instantiate(); err != nil { + return err + } + // Load the Postgres trigger functions that (can) use + return s.Repository.LoadTriggers() +} + +// Watch is the top level loop for watching super node +func (s *Service) Watch(wg *sync.WaitGroup) error { + sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, s.WatcherConfig.SubscriptionConfig) + if err != nil { + return err + } + atomic.StoreInt64(s.payloadIndex, s.WatcherConfig.SubscriptionConfig.StartingBlock().Int64()) + atomic.StoreInt64(s.endingIndex, s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64()) // less than 0 => never end + backFilling := s.WatcherConfig.SubscriptionConfig.HistoricalData() + if backFilling { + atomic.StoreInt32(s.backFilling, 1) + } else { + atomic.StoreInt32(s.backFilling, 0) + } + backFillOnly := s.WatcherConfig.SubscriptionConfig.HistoricalDataOnly() + if backFillOnly { // we are only processing historical data => handle single contiguous stream + s.backFillOnlyQueuing(wg, sub) + } else { // otherwise we need to be prepared to handle out-of-order data + s.combinedQueuing(wg, sub) + } + return nil +} + +// combinedQueuing assumes data is not necessarily going to come in linear order +// this is true when we are backfilling and streaming at the head or when we are +// only streaming at the head since reorgs can occur +func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) { + wg.Add(1) + go func() { + for { + select { + case payload := <-s.PayloadChan: + // If there is an error associated with the payload, log it and continue + if payload.Error() != nil { + logrus.Error(payload.Error()) + continue + } + if payload.Data.Height() == atomic.LoadInt64(s.payloadIndex) { + // If the data is at our current index it is ready to be processed; add it to the ready data queue + if err := s.Repository.ReadyData(payload); err != nil { + logrus.Error(err) + } + atomic.AddInt64(s.payloadIndex, 1) + } else { // otherwise add it to the wait queue + if err := s.Repository.QueueData(payload); err != nil { + logrus.Error(err) + } + } + case err := <-sub.Err(): + logrus.Error(err) + case <-s.QuitChan: + logrus.Info("WatchContract shutting down") + wg.Done() + return + } + } + }() +} + +// backFillOnlyQueuing assumes the data is coming in contiguously and behind the head +// it puts all data on the ready queue +// it continues until the watcher is told to quit or we receive notification that the backfill is finished +func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) { + wg.Add(1) + go func() { + for { + select { + case payload := <-s.PayloadChan: + // If there is an error associated with the payload, log it and continue + if payload.Error() != nil { + logrus.Error(payload.Error()) + continue + } + // If the payload signals that backfilling has completed, shut down the process + if payload.BackFillComplete() { + logrus.Info("Backfill complete, WatchContract shutting down") + wg.Done() + return + } + // Add the payload the ready data queue + if err := s.Repository.ReadyData(payload); err != nil { + logrus.Error(err) + } + case err := <-sub.Err(): + logrus.Error(err) + case <-s.QuitChan: + logrus.Info("WatchContract shutting down") + wg.Done() + return + } + } + }() +} diff --git a/pkg/super_node/watcher/shared/interfaces.go b/pkg/super_node/watcher/shared/interfaces.go new file mode 100644 index 00000000..871d6fab --- /dev/null +++ b/pkg/super_node/watcher/shared/interfaces.go @@ -0,0 +1,37 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package shared + +import ( + "github.com/ethereum/go-ethereum/rpc" + + "github.com/vulcanize/vulcanizedb/pkg/super_node" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" +) + +// Repository is the interface for the Postgres database +type Repository interface { + LoadTriggers() error + QueueData(payload super_node.SubscriptionPayload) error + GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) + ReadyData(payload super_node.SubscriptionPayload) error +} + +// SuperNodeStreamer is the interface for streaming data from a vulcanizeDB super node +type SuperNodeStreamer interface { + Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error) +} diff --git a/pkg/wasm/instantiator.go b/pkg/wasm/instantiator.go index f136fa94..89542880 100644 --- a/pkg/wasm/instantiator.go +++ b/pkg/wasm/instantiator.go @@ -18,11 +18,13 @@ package wasm import "github.com/vulcanize/vulcanizedb/pkg/postgres" +// Instantiator is used to instantiate WASM functions in Postgres type Instantiator struct { db *postgres.DB instances [][2]string // list of WASM file paths and namespaces } +// NewWASMInstantiator returns a pointer to a new Instantiator func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator { return &Instantiator{ db: db, @@ -30,13 +32,14 @@ func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator { } } +// Instantiate is used to load the WASM functions into Postgres func (i *Instantiator) Instantiate() error { tx, err := i.db.Beginx() if err != nil { return err } for _, pn := range i.instances { - _, err := i.db.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1]) + _, err := tx.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1]) if err != nil { return err }