chainwatch: store mpool message additions

This commit is contained in:
Łukasz Magiera 2019-11-17 13:01:10 +01:00
parent 1f913b8df2
commit f40a1afac8
7 changed files with 176 additions and 7 deletions

View File

@ -12,6 +12,7 @@ import (
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/minio/blake2b-simd"
"github.com/polydawn/refmt/obj/atlas"
"golang.org/x/xerrors"
cbg "github.com/whyrusleeping/cbor-gen"
)
@ -147,6 +148,22 @@ func (a Address) Format(f fmt.State, c rune) {
}
}
func (a *Address) Scan(value interface{}) error {
switch value := value.(type) {
case string:
a1, err := decode(value)
if err != nil {
return err
}
*a = a1
return nil
default:
return xerrors.New("non-string types unsupported")
}
}
// NewIDAddress returns an address using the ID protocol.
func NewIDAddress(id uint64) (Address, error) {
return newAddress(ID, leb128.FromUInt64(id))

View File

@ -124,6 +124,28 @@ func (bi *BigInt) UnmarshalJSON(b []byte) error {
return nil
}
func (bi *BigInt) Scan(value interface{}) error {
switch value := value.(type) {
case string:
i, ok := big.NewInt(0).SetString(value, 10)
if !ok {
if value == "<nil>" {
return nil
}
return xerrors.Errorf("failed to parse bigint string: '%s'", value)
}
bi.Int = i
return nil
case int64:
bi.Int = big.NewInt(value)
return nil
default:
return xerrors.Errorf("non-string types unsupported: %T", value)
}
}
func (bi *BigInt) cborBytes() []byte {
if bi.Int == nil {
return []byte{}

View File

@ -74,6 +74,7 @@ var runCmd = &cli.Command{
defer st.close()
runSyncer(ctx, api, st)
go subMpool(ctx, api, st)
h, err := newHandler(api, st)
if err != nil {

View File

@ -0,0 +1,36 @@
package main
import (
"context"
"github.com/ipfs/go-cid"
aapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)
func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
sub, err := api.MpoolSub(ctx)
if err != nil {
return
}
for change := range sub {
if change.Type != aapi.MpoolAdd {
continue
}
err := st.storeMessages(map[cid.Cid]*types.Message{
change.Message.Message.Cid(): &change.Message.Message,
})
if err != nil {
log.Error(err)
continue
}
if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil {
log.Error(err)
continue
}
}
}

View File

@ -19,9 +19,20 @@
</div>
<div class="Index-node">
Messages:
{{ range strings "messages" "case when `to` = ? then `from` else `to` end" "`from` = ? or `to` = ?" $wallet $wallet $wallet}}
<div>to/from {{.}}</div>
{{end}}
<table>
<tr><td>Dir</td><td>Peer</td><td>Nonce</td><td>Value</td><td>Block</td><td>Mpool Wait</td></tr>
{{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}}
<tr>
{{ if eq .From.String $wallet }}
<td>To</td><td>{{.To.String}}</td>
{{else}}
<td>From</td><td>{{.From.String}}</td>
{{end}}
<td>{{.Nonce}}</td>
<td>{{.Value}}</td>
</tr>
{{end}}
</table>
</div>
</div>
</div>

View File

@ -2,6 +2,7 @@ package main
import (
"database/sql"
"time"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
@ -115,6 +116,19 @@ create table if not exists block_messages
primary key (block, message)
);
create table if not exists mpool_messages
(
msg text not null
constraint mpool_messages_pk
primary key
constraint mpool_messages_messages_cid_fk
references messages,
add_ts int not null
);
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
create table if not exists miner_heads
(
head text not null
@ -321,6 +335,27 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
return tx.Commit()
}
func (st *storage) storeMpoolInclusion(msg cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?)`)
if err != nil {
return err
}
defer stmt.Close()
if _, err := stmt.Exec(
msg.String(),
time.Now().Unix(),
); err != nil {
return err
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -2,15 +2,17 @@ package main
import (
"fmt"
rice "github.com/GeertJohan/go.rice"
"github.com/filecoin-project/lotus/chain/types"
"golang.org/x/xerrors"
"html/template"
"net/http"
"os"
"path/filepath"
rice "github.com/GeertJohan/go.rice"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
)
type handler struct {
@ -40,6 +42,8 @@ func newHandler(api api.FullNode, st *storage) (*handler, error) {
"queryNum": h.queryNum,
"sizeStr": sizeStr,
"strings": h.strings,
"messages": h.messages,
"param": func(string) string { return "" }, // replaced in request handler
}
@ -90,7 +94,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
})
if err := t.Execute(w, nil); err != nil {
log.Error(err)
log.Errorf("%+v", err)
return
}
@ -205,4 +209,47 @@ func (h *handler) strings(table string, col string, filter string, args ...inter
return
}
func (h *handler) messages(filter string, args ...interface{}) (out []types.Message, err error) {
if len(filter) > 0 {
filter = " where " + filter
}
rws, err := h.st.db.Query("select * from messages "+filter, args...)
if err != nil {
return nil, err
}
for rws.Next() {
var r types.Message
var cs string
if err := rws.Scan(
&cs,
&r.From,
&r.To,
&r.Nonce,
&r.Value,
&r.GasPrice,
&r.GasLimit,
&r.Method,
&r.Params,
); err != nil {
return nil, err
}
c, err := cid.Parse(cs)
if err != nil {
return nil, err
}
if c != r.Cid() {
log.Warn("msg cid doesn't match")
}
out = append(out, r)
}
return
}
var _ http.Handler = &handler{}