forked from cerc-io/ipld-eth-server
begin wasm watcher engine
This commit is contained in:
parent
94aefafd7c
commit
f0c5ff8077
440
pkg/ipfs/ipld/trie_node.go
Normal file
440
pkg/ipfs/ipld/trie_node.go
Normal file
@ -0,0 +1,440 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package ipld
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
node "github.com/ipfs/go-ipld-format"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TrieNode is the general abstraction for
|
||||||
|
//ethereum IPLD trie nodes.
|
||||||
|
type TrieNode struct {
|
||||||
|
// leaf, extension or branch
|
||||||
|
nodeKind string
|
||||||
|
|
||||||
|
// If leaf or extension: [0] is key, [1] is val.
|
||||||
|
// If branch: [0] - [16] are children.
|
||||||
|
elements []interface{}
|
||||||
|
|
||||||
|
// IPLD block information
|
||||||
|
cid cid.Cid
|
||||||
|
rawdata []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
OUTPUT
|
||||||
|
*/
|
||||||
|
|
||||||
|
type trieNodeLeafDecoder func([]interface{}) ([]interface{}, error)
|
||||||
|
|
||||||
|
// decodeTrieNode returns a TrieNode object from an IPLD block's
|
||||||
|
// cid and rawdata.
|
||||||
|
func decodeTrieNode(c cid.Cid, b []byte,
|
||||||
|
leafDecoder trieNodeLeafDecoder) (*TrieNode, error) {
|
||||||
|
var (
|
||||||
|
i, decoded, elements []interface{}
|
||||||
|
nodeKind string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
err = rlp.DecodeBytes(b, &i)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
codec := c.Type()
|
||||||
|
switch len(i) {
|
||||||
|
case 2:
|
||||||
|
nodeKind, decoded, err = decodeCompactKey(i)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if nodeKind == "extension" {
|
||||||
|
elements, err = parseTrieNodeExtension(decoded, codec)
|
||||||
|
}
|
||||||
|
if nodeKind == "leaf" {
|
||||||
|
elements, err = leafDecoder(decoded)
|
||||||
|
}
|
||||||
|
if nodeKind != "extension" && nodeKind != "leaf" {
|
||||||
|
return nil, fmt.Errorf("unexpected nodeKind returned from decoder")
|
||||||
|
}
|
||||||
|
case 17:
|
||||||
|
nodeKind = "branch"
|
||||||
|
elements, err = parseTrieNodeBranch(i, codec)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown trie node type")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TrieNode{
|
||||||
|
nodeKind: nodeKind,
|
||||||
|
elements: elements,
|
||||||
|
rawdata: b,
|
||||||
|
cid: c,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// decodeCompactKey takes a compact key, and returns its nodeKind and value.
|
||||||
|
func decodeCompactKey(i []interface{}) (string, []interface{}, error) {
|
||||||
|
first := i[0].([]byte)
|
||||||
|
last := i[1].([]byte)
|
||||||
|
|
||||||
|
switch first[0] / 16 {
|
||||||
|
case '\x00':
|
||||||
|
return "extension", []interface{}{
|
||||||
|
nibbleToByte(first)[2:],
|
||||||
|
last,
|
||||||
|
}, nil
|
||||||
|
case '\x01':
|
||||||
|
return "extension", []interface{}{
|
||||||
|
nibbleToByte(first)[1:],
|
||||||
|
last,
|
||||||
|
}, nil
|
||||||
|
case '\x02':
|
||||||
|
return "leaf", []interface{}{
|
||||||
|
nibbleToByte(first)[2:],
|
||||||
|
last,
|
||||||
|
}, nil
|
||||||
|
case '\x03':
|
||||||
|
return "leaf", []interface{}{
|
||||||
|
nibbleToByte(first)[1:],
|
||||||
|
last,
|
||||||
|
}, nil
|
||||||
|
default:
|
||||||
|
return "", nil, fmt.Errorf("unknown hex prefix")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTrieNodeExtension helper improves readability
|
||||||
|
func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error) {
|
||||||
|
return []interface{}{
|
||||||
|
i[0].([]byte),
|
||||||
|
keccak256ToCid(codec, i[1].([]byte)),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseTrieNodeBranch helper improves readability
|
||||||
|
func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) {
|
||||||
|
var out []interface{}
|
||||||
|
|
||||||
|
for _, vi := range i {
|
||||||
|
v := vi.([]byte)
|
||||||
|
|
||||||
|
switch len(v) {
|
||||||
|
case 0:
|
||||||
|
out = append(out, nil)
|
||||||
|
case 32:
|
||||||
|
out = append(out, keccak256ToCid(codec, v))
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unrecognized object: %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
Node INTERFACE
|
||||||
|
*/
|
||||||
|
|
||||||
|
// Resolve resolves a path through this node, stopping at any link boundary
|
||||||
|
// and returning the object found as well as the remaining path to traverse
|
||||||
|
func (t *TrieNode) Resolve(p []string) (interface{}, []string, error) {
|
||||||
|
switch t.nodeKind {
|
||||||
|
case "extension":
|
||||||
|
return t.resolveTrieNodeExtension(p)
|
||||||
|
case "leaf":
|
||||||
|
return t.resolveTrieNodeLeaf(p)
|
||||||
|
case "branch":
|
||||||
|
return t.resolveTrieNodeBranch(p)
|
||||||
|
default:
|
||||||
|
return nil, nil, fmt.Errorf("nodeKind case not implemented")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tree lists all paths within the object under 'path', and up to the given depth.
|
||||||
|
// To list the entire object (similar to `find .`) pass "" and -1
|
||||||
|
func (t *TrieNode) Tree(p string, depth int) []string {
|
||||||
|
if p != "" || depth == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var out []string
|
||||||
|
|
||||||
|
switch t.nodeKind {
|
||||||
|
case "extension":
|
||||||
|
var val string
|
||||||
|
for _, e := range t.elements[0].([]byte) {
|
||||||
|
val += fmt.Sprintf("%x", e)
|
||||||
|
}
|
||||||
|
return []string{val}
|
||||||
|
case "branch":
|
||||||
|
for i, elem := range t.elements {
|
||||||
|
if _, ok := elem.(*cid.Cid); ok {
|
||||||
|
out = append(out, fmt.Sprintf("%x", i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResolveLink is a helper function that calls resolve and asserts the
|
||||||
|
// output is a link
|
||||||
|
func (t *TrieNode) ResolveLink(p []string) (*node.Link, []string, error) {
|
||||||
|
obj, rest, err := t.Resolve(p)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lnk, ok := obj.(*node.Link)
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, fmt.Errorf("was not a link")
|
||||||
|
}
|
||||||
|
|
||||||
|
return lnk, rest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy will go away. It is here to comply with the interface.
|
||||||
|
func (t *TrieNode) Copy() node.Node {
|
||||||
|
panic("dont use this yet")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Links is a helper function that returns all links within this object
|
||||||
|
func (t *TrieNode) Links() []*node.Link {
|
||||||
|
var out []*node.Link
|
||||||
|
|
||||||
|
for _, i := range t.elements {
|
||||||
|
c, ok := i.(cid.Cid)
|
||||||
|
if ok {
|
||||||
|
out = append(out, &node.Link{Cid: c})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat will go away. It is here to comply with the interface.
|
||||||
|
func (t *TrieNode) Stat() (*node.NodeStat, error) {
|
||||||
|
return &node.NodeStat{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size will go away. It is here to comply with the interface.
|
||||||
|
func (t *TrieNode) Size() (uint64, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
TrieNode functions
|
||||||
|
*/
|
||||||
|
|
||||||
|
// MarshalJSON processes the transaction trie into readable JSON format.
|
||||||
|
func (t *TrieNode) MarshalJSON() ([]byte, error) {
|
||||||
|
var out map[string]interface{}
|
||||||
|
|
||||||
|
switch t.nodeKind {
|
||||||
|
case "extension":
|
||||||
|
fallthrough
|
||||||
|
case "leaf":
|
||||||
|
var hexPrefix string
|
||||||
|
for _, e := range t.elements[0].([]byte) {
|
||||||
|
hexPrefix += fmt.Sprintf("%x", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we got a byte we need to do this casting otherwise
|
||||||
|
// it will be marshaled to a base64 encoded value
|
||||||
|
if _, ok := t.elements[1].([]byte); ok {
|
||||||
|
var hexVal string
|
||||||
|
for _, e := range t.elements[1].([]byte) {
|
||||||
|
hexVal += fmt.Sprintf("%x", e)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.elements[1] = hexVal
|
||||||
|
}
|
||||||
|
|
||||||
|
out = map[string]interface{}{
|
||||||
|
"type": t.nodeKind,
|
||||||
|
hexPrefix: t.elements[1],
|
||||||
|
}
|
||||||
|
|
||||||
|
case "branch":
|
||||||
|
out = map[string]interface{}{
|
||||||
|
"type": "branch",
|
||||||
|
"0": t.elements[0],
|
||||||
|
"1": t.elements[1],
|
||||||
|
"2": t.elements[2],
|
||||||
|
"3": t.elements[3],
|
||||||
|
"4": t.elements[4],
|
||||||
|
"5": t.elements[5],
|
||||||
|
"6": t.elements[6],
|
||||||
|
"7": t.elements[7],
|
||||||
|
"8": t.elements[8],
|
||||||
|
"9": t.elements[9],
|
||||||
|
"a": t.elements[10],
|
||||||
|
"b": t.elements[11],
|
||||||
|
"c": t.elements[12],
|
||||||
|
"d": t.elements[13],
|
||||||
|
"e": t.elements[14],
|
||||||
|
"f": t.elements[15],
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("nodeKind %s not supported", t.nodeKind)
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Marshal(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// nibbleToByte expands the nibbles of a byte slice into their own bytes.
|
||||||
|
func nibbleToByte(k []byte) []byte {
|
||||||
|
var out []byte
|
||||||
|
|
||||||
|
for _, b := range k {
|
||||||
|
out = append(out, b/16)
|
||||||
|
out = append(out, b%16)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve reading conveniences
|
||||||
|
func (t *TrieNode) resolveTrieNodeExtension(p []string) (interface{}, []string, error) {
|
||||||
|
nibbles := t.elements[0].([]byte)
|
||||||
|
idx, rest := shiftFromPath(p, len(nibbles))
|
||||||
|
if len(idx) < len(nibbles) {
|
||||||
|
return nil, nil, fmt.Errorf("not enough nibbles to traverse this extension")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, i := range idx {
|
||||||
|
if getHexIndex(string(i)) == -1 {
|
||||||
|
return nil, nil, fmt.Errorf("invalid path element")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, n := range nibbles {
|
||||||
|
if string(idx[i]) != fmt.Sprintf("%x", n) {
|
||||||
|
return nil, nil, fmt.Errorf("no such link in this extension")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &node.Link{Cid: t.elements[1].(cid.Cid)}, rest, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TrieNode) resolveTrieNodeLeaf(p []string) (interface{}, []string, error) {
|
||||||
|
nibbles := t.elements[0].([]byte)
|
||||||
|
|
||||||
|
if len(nibbles) != 0 {
|
||||||
|
idx, rest := shiftFromPath(p, len(nibbles))
|
||||||
|
if len(idx) < len(nibbles) {
|
||||||
|
return nil, nil, fmt.Errorf("not enough nibbles to traverse this leaf")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, i := range idx {
|
||||||
|
if getHexIndex(string(i)) == -1 {
|
||||||
|
return nil, nil, fmt.Errorf("invalid path element")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, n := range nibbles {
|
||||||
|
if string(idx[i]) != fmt.Sprintf("%x", n) {
|
||||||
|
return nil, nil, fmt.Errorf("no such link in this extension")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p = rest
|
||||||
|
}
|
||||||
|
|
||||||
|
link, ok := t.elements[1].(node.Node)
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, fmt.Errorf("leaf children is not an IPLD node")
|
||||||
|
}
|
||||||
|
|
||||||
|
return link.Resolve(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TrieNode) resolveTrieNodeBranch(p []string) (interface{}, []string, error) {
|
||||||
|
idx, rest := shiftFromPath(p, 1)
|
||||||
|
hidx := getHexIndex(idx)
|
||||||
|
if hidx == -1 {
|
||||||
|
return nil, nil, fmt.Errorf("incorrect path")
|
||||||
|
}
|
||||||
|
|
||||||
|
child := t.elements[hidx]
|
||||||
|
if child != nil {
|
||||||
|
return &node.Link{Cid: child.(cid.Cid)}, rest, nil
|
||||||
|
}
|
||||||
|
return nil, nil, fmt.Errorf("no such link in this branch")
|
||||||
|
}
|
||||||
|
|
||||||
|
// shiftFromPath extracts from a given path (as a slice of strings)
|
||||||
|
// the given number of elements as a single string, returning whatever
|
||||||
|
// it has not taken.
|
||||||
|
//
|
||||||
|
// Examples:
|
||||||
|
// ["0", "a", "something"] and 1 -> "0" and ["a", "something"]
|
||||||
|
// ["ab", "c", "d", "1"] and 2 -> "ab" and ["c", "d", "1"]
|
||||||
|
// ["abc", "d", "1"] and 2 -> "ab" and ["c", "d", "1"]
|
||||||
|
func shiftFromPath(p []string, i int) (string, []string) {
|
||||||
|
var (
|
||||||
|
out string
|
||||||
|
rest []string
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, pe := range p {
|
||||||
|
re := ""
|
||||||
|
for _, c := range pe {
|
||||||
|
if len(out) < i {
|
||||||
|
out += string(c)
|
||||||
|
} else {
|
||||||
|
re += string(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(out) == i && re != "" {
|
||||||
|
rest = append(rest, re)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, rest
|
||||||
|
}
|
||||||
|
|
||||||
|
// getHexIndex returns to you the integer 0 - 15 equivalent to your
|
||||||
|
// string character if applicable, or -1 otherwise.
|
||||||
|
func getHexIndex(s string) int {
|
||||||
|
if len(s) != 1 {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
c := byte(s[0])
|
||||||
|
switch {
|
||||||
|
case '0' <= c && c <= '9':
|
||||||
|
return int(c - '0')
|
||||||
|
case 'a' <= c && c <= 'f':
|
||||||
|
return int(c - 'a' + 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1
|
||||||
|
}
|
40
pkg/super_node/watcher/config.go
Normal file
40
pkg/super_node/watcher/config.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/config"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Config holds all of the parameters necessary for defining and running an instance of a watcher
|
||||||
|
type Config struct {
|
||||||
|
// Subscription settings
|
||||||
|
SubscriptionConfig shared.SubscriptionSettings
|
||||||
|
// Database settings
|
||||||
|
DBConfig config.Database
|
||||||
|
// DB itself
|
||||||
|
DB *postgres.DB
|
||||||
|
// Subscription client
|
||||||
|
Client core.RPCClient
|
||||||
|
// WASM instantiation paths and namespaces
|
||||||
|
WASMInstances [][2]string
|
||||||
|
// Path and names for trigger functions (sql files) that (can) use the instantiated wasm namespaces
|
||||||
|
TriggerFunctions [][2]string
|
||||||
|
}
|
43
pkg/super_node/watcher/constructors.go
Normal file
43
pkg/super_node/watcher/constructors.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/eth/core"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||||
|
shared2 "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/eth"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewSuperNodeStreamer returns a new shared.SuperNodeStreamer
|
||||||
|
func NewSuperNodeStreamer(client core.RPCClient) shared.SuperNodeStreamer {
|
||||||
|
return streamer.NewSuperNodeStreamer(client)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRepository constructs and returns a new Repository that satisfies the shared.Repository interface for the specified chain
|
||||||
|
func NewRepository(chain shared2.ChainType, db *postgres.DB, triggerFuncs [][2]string) (shared.Repository, error) {
|
||||||
|
switch chain {
|
||||||
|
case shared2.Ethereum:
|
||||||
|
return eth.NewRepository(db, triggerFuncs), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("NewRepository constructor unexpected chain type %s", chain.String())
|
||||||
|
}
|
||||||
|
}
|
58
pkg/super_node/watcher/eth/repository.go
Normal file
58
pkg/super_node/watcher/eth/repository.go
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package eth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Repository is the underlying struct for satisfying the shared.Repository interface for eth
|
||||||
|
type Repository struct {
|
||||||
|
db *postgres.DB
|
||||||
|
triggerFunctions [][2]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRepository returns a new eth.Repository that satisfies the shared.Repository interface
|
||||||
|
func NewRepository(db *postgres.DB, triggerFunctions [][2]string) shared.Repository {
|
||||||
|
return &Repository{
|
||||||
|
db: db,
|
||||||
|
triggerFunctions: triggerFunctions,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadTriggers is used to initialize Postgres trigger function
|
||||||
|
// this needs to be called after the wasm functions these triggers invoke have been instantiated
|
||||||
|
func (r *Repository) LoadTriggers() error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueData puts super node payload data into the db queue
|
||||||
|
func (r *Repository) QueueData(payload super_node.SubscriptionPayload) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetQueueData grabs super node payload data from the db queue
|
||||||
|
func (r *Repository) GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error) {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadyData puts super node payload data in the tables ready for processing by trigger functions
|
||||||
|
func (r *Repository) ReadyData(payload super_node.SubscriptionPayload) error {
|
||||||
|
panic("implement me")
|
||||||
|
}
|
177
pkg/super_node/watcher/service.go
Normal file
177
pkg/super_node/watcher/service.go
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package watcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/watcher/shared"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/wasm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SuperNodeWatcher is the top level interface for watching data from super node
|
||||||
|
type SuperNodeWatcher interface {
|
||||||
|
Init() error
|
||||||
|
Watch(wg *sync.WaitGroup) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Service is the underlying struct for the SuperNodeWatcher
|
||||||
|
type Service struct {
|
||||||
|
// Config
|
||||||
|
WatcherConfig Config
|
||||||
|
// Interface for streaming data from super node
|
||||||
|
SuperNodeStreamer shared.SuperNodeStreamer
|
||||||
|
// Interface for db operations
|
||||||
|
Repository shared.Repository
|
||||||
|
// WASM instantiator
|
||||||
|
WASMIniter *wasm.Instantiator
|
||||||
|
|
||||||
|
// Channels for process communication/data relay
|
||||||
|
PayloadChan chan super_node.SubscriptionPayload
|
||||||
|
QuitChan chan bool
|
||||||
|
|
||||||
|
// Indexes
|
||||||
|
// use atomic operations on these ONLY
|
||||||
|
payloadIndex *int64
|
||||||
|
endingIndex *int64
|
||||||
|
backFilling *int32 // 0 => not backfilling; 1 => backfilling
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSuperNodeWatcher returns a new Service which satisfies the SuperNodeWatcher interface
|
||||||
|
func NewSuperNodeWatcher(c Config, quitChan chan bool) (SuperNodeWatcher, error) {
|
||||||
|
repo, err := NewRepository(c.SubscriptionConfig.ChainType(), c.DB, c.TriggerFunctions)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &Service{
|
||||||
|
WatcherConfig: c,
|
||||||
|
SuperNodeStreamer: NewSuperNodeStreamer(c.Client),
|
||||||
|
Repository: repo,
|
||||||
|
WASMIniter: wasm.NewWASMInstantiator(c.DB, c.WASMInstances),
|
||||||
|
PayloadChan: make(chan super_node.SubscriptionPayload, super_node.PayloadChanBufferSize),
|
||||||
|
QuitChan: quitChan,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init is used to initialize the Postgres WASM and trigger functions
|
||||||
|
func (s *Service) Init() error {
|
||||||
|
// Instantiate the Postgres WASM functions
|
||||||
|
if err := s.WASMIniter.Instantiate(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Load the Postgres trigger functions that (can) use
|
||||||
|
return s.Repository.LoadTriggers()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watch is the top level loop for watching super node
|
||||||
|
func (s *Service) Watch(wg *sync.WaitGroup) error {
|
||||||
|
sub, err := s.SuperNodeStreamer.Stream(s.PayloadChan, s.WatcherConfig.SubscriptionConfig)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(s.payloadIndex, s.WatcherConfig.SubscriptionConfig.StartingBlock().Int64())
|
||||||
|
atomic.StoreInt64(s.endingIndex, s.WatcherConfig.SubscriptionConfig.EndingBlock().Int64()) // less than 0 => never end
|
||||||
|
backFilling := s.WatcherConfig.SubscriptionConfig.HistoricalData()
|
||||||
|
if backFilling {
|
||||||
|
atomic.StoreInt32(s.backFilling, 1)
|
||||||
|
} else {
|
||||||
|
atomic.StoreInt32(s.backFilling, 0)
|
||||||
|
}
|
||||||
|
backFillOnly := s.WatcherConfig.SubscriptionConfig.HistoricalDataOnly()
|
||||||
|
if backFillOnly { // we are only processing historical data => handle single contiguous stream
|
||||||
|
s.backFillOnlyQueuing(wg, sub)
|
||||||
|
} else { // otherwise we need to be prepared to handle out-of-order data
|
||||||
|
s.combinedQueuing(wg, sub)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// combinedQueuing assumes data is not necessarily going to come in linear order
|
||||||
|
// this is true when we are backfilling and streaming at the head or when we are
|
||||||
|
// only streaming at the head since reorgs can occur
|
||||||
|
func (s *Service) combinedQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case payload := <-s.PayloadChan:
|
||||||
|
// If there is an error associated with the payload, log it and continue
|
||||||
|
if payload.Error() != nil {
|
||||||
|
logrus.Error(payload.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if payload.Data.Height() == atomic.LoadInt64(s.payloadIndex) {
|
||||||
|
// If the data is at our current index it is ready to be processed; add it to the ready data queue
|
||||||
|
if err := s.Repository.ReadyData(payload); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
atomic.AddInt64(s.payloadIndex, 1)
|
||||||
|
} else { // otherwise add it to the wait queue
|
||||||
|
if err := s.Repository.QueueData(payload); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case err := <-sub.Err():
|
||||||
|
logrus.Error(err)
|
||||||
|
case <-s.QuitChan:
|
||||||
|
logrus.Info("WatchContract shutting down")
|
||||||
|
wg.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// backFillOnlyQueuing assumes the data is coming in contiguously and behind the head
|
||||||
|
// it puts all data on the ready queue
|
||||||
|
// it continues until the watcher is told to quit or we receive notification that the backfill is finished
|
||||||
|
func (s *Service) backFillOnlyQueuing(wg *sync.WaitGroup, sub *rpc.ClientSubscription) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case payload := <-s.PayloadChan:
|
||||||
|
// If there is an error associated with the payload, log it and continue
|
||||||
|
if payload.Error() != nil {
|
||||||
|
logrus.Error(payload.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// If the payload signals that backfilling has completed, shut down the process
|
||||||
|
if payload.BackFillComplete() {
|
||||||
|
logrus.Info("Backfill complete, WatchContract shutting down")
|
||||||
|
wg.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Add the payload the ready data queue
|
||||||
|
if err := s.Repository.ReadyData(payload); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
case err := <-sub.Err():
|
||||||
|
logrus.Error(err)
|
||||||
|
case <-s.QuitChan:
|
||||||
|
logrus.Info("WatchContract shutting down")
|
||||||
|
wg.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
37
pkg/super_node/watcher/shared/interfaces.go
Normal file
37
pkg/super_node/watcher/shared/interfaces.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package shared
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
|
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node"
|
||||||
|
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Repository is the interface for the Postgres database
|
||||||
|
type Repository interface {
|
||||||
|
LoadTriggers() error
|
||||||
|
QueueData(payload super_node.SubscriptionPayload) error
|
||||||
|
GetQueueData(height int64, hash string) (super_node.SubscriptionPayload, error)
|
||||||
|
ReadyData(payload super_node.SubscriptionPayload) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// SuperNodeStreamer is the interface for streaming data from a vulcanizeDB super node
|
||||||
|
type SuperNodeStreamer interface {
|
||||||
|
Stream(payloadChan chan super_node.SubscriptionPayload, params shared.SubscriptionSettings) (*rpc.ClientSubscription, error)
|
||||||
|
}
|
@ -18,11 +18,13 @@ package wasm
|
|||||||
|
|
||||||
import "github.com/vulcanize/vulcanizedb/pkg/postgres"
|
import "github.com/vulcanize/vulcanizedb/pkg/postgres"
|
||||||
|
|
||||||
|
// Instantiator is used to instantiate WASM functions in Postgres
|
||||||
type Instantiator struct {
|
type Instantiator struct {
|
||||||
db *postgres.DB
|
db *postgres.DB
|
||||||
instances [][2]string // list of WASM file paths and namespaces
|
instances [][2]string // list of WASM file paths and namespaces
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewWASMInstantiator returns a pointer to a new Instantiator
|
||||||
func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
|
func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
|
||||||
return &Instantiator{
|
return &Instantiator{
|
||||||
db: db,
|
db: db,
|
||||||
@ -30,13 +32,14 @@ func NewWASMInstantiator(db *postgres.DB, instances [][2]string) *Instantiator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Instantiate is used to load the WASM functions into Postgres
|
||||||
func (i *Instantiator) Instantiate() error {
|
func (i *Instantiator) Instantiate() error {
|
||||||
tx, err := i.db.Beginx()
|
tx, err := i.db.Beginx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, pn := range i.instances {
|
for _, pn := range i.instances {
|
||||||
_, err := i.db.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1])
|
_, err := tx.Exec(`SELECT wasm_new_instance('$1', '$2')`, pn[0], pn[1])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user