tvx simulate: execute raw message against height.

This commit is contained in:
Raúl Kripalani 2020-10-21 10:58:07 +01:00
parent 2c3d80494f
commit 92395745d3
4 changed files with 309 additions and 59 deletions

View File

@ -19,7 +19,6 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/reward"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/conformance"
"github.com/filecoin-project/test-vectors/schema"
@ -100,41 +99,24 @@ var extractCmd = &cli.Command{
}
func runExtract(c *cli.Context) error {
// LOTUS_DISABLE_VM_BUF disables what's called "VM state tree buffering",
// which stashes write operations in a BufferedBlockstore
// (https://github.com/filecoin-project/lotus/blob/b7a4dbb07fd8332b4492313a617e3458f8003b2a/lib/bufbstore/buf_bstore.go#L21)
// such that they're not written until the VM is actually flushed.
//
// For some reason, the standard behaviour was not working for me (raulk),
// and disabling it (such that the state transformations are written immediately
// to the blockstore) worked.
_ = os.Setenv("LOTUS_DISABLE_VM_BUF", "iknowitsabadidea")
return doExtract(extractFlags)
}
func doExtract(opts extractOpts) error {
ctx := context.Background()
// Make the API client.
fapi, closer, err := lcli.GetFullNodeAPI(c)
if err != nil {
return err
}
defer closer()
return doExtract(ctx, fapi, extractFlags)
}
func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
mcid, err := cid.Decode(opts.cid)
if err != nil {
return err
}
msg, execTs, incTs, err := resolveFromChain(ctx, fapi, mcid, opts.block)
msg, execTs, incTs, err := resolveFromChain(ctx, FullAPI, mcid, opts.block)
if err != nil {
return fmt.Errorf("failed to resolve message and tipsets from chain: %w", err)
}
// get the circulating supply before the message was executed.
circSupplyDetail, err := fapi.StateVMCirculatingSupplyInternal(ctx, incTs.Key())
circSupplyDetail, err := FullAPI.StateVMCirculatingSupplyInternal(ctx, incTs.Key())
if err != nil {
return fmt.Errorf("failed while fetching circulating supply: %w", err)
}
@ -147,7 +129,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
log.Printf("finding precursor messages using mode: %s", opts.precursor)
// Fetch messages in canonical order from inclusion tipset.
msgs, err := fapi.ChainGetParentMessages(ctx, execTs.Blocks()[0].Cid())
msgs, err := FullAPI.ChainGetParentMessages(ctx, execTs.Blocks()[0].Cid())
if err != nil {
return fmt.Errorf("failed to fetch messages in canonical order from inclusion tipset: %w", err)
}
@ -174,8 +156,8 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
var (
// create a read-through store that uses ChainGetObject to fetch unknown CIDs.
pst = NewProxyingStores(ctx, fapi)
g = NewSurgeon(ctx, fapi, pst)
pst = NewProxyingStores(ctx, FullAPI)
g = NewSurgeon(ctx, FullAPI, pst)
)
driver := conformance.NewDriver(ctx, schema.Selector{}, conformance.DriverOpts{
@ -200,7 +182,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
CircSupply: circSupplyDetail.FilCirculating,
BaseFee: basefee,
// recorded randomness will be discarded.
Rand: conformance.NewRecordingRand(new(conformance.LogReporter), fapi),
Rand: conformance.NewRecordingRand(new(conformance.LogReporter), FullAPI),
})
if err != nil {
return fmt.Errorf("failed to execute precursor message: %w", err)
@ -215,7 +197,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
retention = opts.retain
// recordingRand will record randomness so we can embed it in the test vector.
recordingRand = conformance.NewRecordingRand(new(conformance.LogReporter), fapi)
recordingRand = conformance.NewRecordingRand(new(conformance.LogReporter), FullAPI)
)
log.Printf("using state retention strategy: %s", retention)
@ -248,7 +230,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
case "accessed-actors":
log.Printf("calculating accessed actors")
// get actors accessed by message.
retain, err := g.GetAccessedActors(ctx, fapi, mcid)
retain, err := g.GetAccessedActors(ctx, FullAPI, mcid)
if err != nil {
return fmt.Errorf("failed to calculate accessed actors: %w", err)
}
@ -286,7 +268,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
// TODO sometimes this returns a nil receipt and no error ¯\_(ツ)_/¯
// ex: https://filfox.info/en/message/bafy2bzacebpxw3yiaxzy2bako62akig46x3imji7fewszen6fryiz6nymu2b2
// This code is lenient and skips receipt comparison in case of a nil receipt.
rec, err := fapi.StateGetReceipt(ctx, mcid, execTs.Key())
rec, err := FullAPI.StateGetReceipt(ctx, mcid, execTs.Key())
if err != nil {
return fmt.Errorf("failed to find receipt on chain: %w", err)
}
@ -336,17 +318,17 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
return err
}
version, err := fapi.Version(ctx)
version, err := FullAPI.Version(ctx)
if err != nil {
return err
}
ntwkName, err := fapi.StateNetworkName(ctx)
ntwkName, err := FullAPI.StateNetworkName(ctx)
if err != nil {
return err
}
nv, err := fapi.StateNetworkVersion(ctx, execTs.Key())
nv, err := FullAPI.StateNetworkVersion(ctx, execTs.Key())
if err != nil {
return err
}
@ -399,8 +381,12 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
},
}
return writeVector(vector, opts.file)
}
func writeVector(vector schema.TestVector, file string) (err error) {
output := io.WriteCloser(os.Stdout)
if file := opts.file; file != "" {
if file := file; file != "" {
dir := filepath.Dir(file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("unable to create directory %s: %w", dir, err)
@ -415,11 +401,7 @@ func doExtract(ctx context.Context, fapi api.FullNode, opts extractOpts) error {
enc := json.NewEncoder(output)
enc.SetIndent("", " ")
if err := enc.Encode(&vector); err != nil {
return err
}
return nil
return enc.Encode(&vector)
}
// resolveFromChain queries the chain for the provided message, using the block CID to

View File

@ -1,7 +1,6 @@
package main
import (
"context"
"encoding/csv"
"fmt"
"io"
@ -20,7 +19,6 @@ import (
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/chain/stmgr"
lcli "github.com/filecoin-project/lotus/cli"
)
var extractManyFlags struct {
@ -77,15 +75,6 @@ func runExtractMany(c *cli.Context) error {
// to the blockstore) worked.
_ = os.Setenv("LOTUS_DISABLE_VM_BUF", "iknowitsabadidea")
ctx := context.Background()
// Make the API client.
fapi, closer, err := lcli.GetFullNodeAPI(c)
if err != nil {
return err
}
defer closer()
var (
in = extractManyFlags.in
outdir = extractManyFlags.outdir
@ -198,8 +187,8 @@ func runExtractMany(c *cli.Context) error {
precursor: PrecursorSelectSender,
}
if err := doExtract(ctx, fapi, opts); err != nil {
log.Println(color.RedString("failed to extract vector for message %s: %s; queuing for 'canonical' precursor selection", mcid, err))
if err := doExtract(opts); err != nil {
log.Println(color.RedString("failed to extract vector for message %s: %s; queuing for 'all' precursor selection", mcid, err))
retry = append(retry, opts)
continue
}
@ -215,7 +204,7 @@ func runExtractMany(c *cli.Context) error {
log.Printf("retrying %s: %s", r.cid, r.id)
r.precursor = PrecursorSelectAll
if err := doExtract(ctx, fapi, r); err != nil {
if err := doExtract(r); err != nil {
merr = multierror.Append(merr, fmt.Errorf("failed to extract vector for message %s: %w", r.cid, err))
continue
}

View File

@ -5,9 +5,21 @@ import (
"os"
"sort"
"github.com/filecoin-project/go-jsonrpc"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
)
// FullAPI is a JSON-RPC client targeting a full node. It's initialized in a
// cli.BeforeFunc.
var FullAPI api.FullNode
// Closer is the closer for the JSON-RPC client, which must be called on
// cli.AfterFunc.
var Closer jsonrpc.ClientCloser
// DefaultLotusRepoPath is where the fallback path where to look for a Lotus
// client repo. It is expanded with mitchellh/go-homedir, so it'll work with all
// OSes despite the Unix twiddle notation.
@ -23,7 +35,7 @@ var repoFlag = cli.StringFlag{
func main() {
app := &cli.App{
Name: "tvx",
Description: `tvx is a tool for extracting and executing test vectors. It has three subcommands.
Description: `tvx is a tool for extracting and executing test vectors. It has four subcommands.
tvx extract extracts a test vector from a live network. It requires access to
a Filecoin client that exposes the standard JSON-RPC API endpoint. Only
@ -35,6 +47,15 @@ func main() {
tvx extract-many performs a batch extraction of many messages, supplied in a
CSV file. Refer to the help of that subcommand for more info.
tvx project projects an existing test vector against a different protocol
version, reporting the result, optionally appending a new variant to the
vector in place if deemed equivalent, or producing a new vector if
non-equivalent.
tvx simulate takes a raw message and simulates it on top of the supplied
epoch, reporting the result on stderr and writing a test vector on stdout
or into the specified file.
SETTING THE JSON-RPC API ENDPOINT
You can set the JSON-RPC API endpoint through one of the following methods.
@ -57,7 +78,10 @@ func main() {
extractCmd,
execCmd,
extractManyCmd,
simulateCmd,
},
Before: initialize,
After: destroy,
}
sort.Sort(cli.CommandsByName(app.Commands))
@ -69,3 +93,27 @@ func main() {
log.Fatal(err)
}
}
func initialize(c *cli.Context) error {
// LOTUS_DISABLE_VM_BUF disables what's called "VM state tree buffering",
// which stashes write operations in a BufferedBlockstore
// (https://github.com/filecoin-project/lotus/blob/b7a4dbb07fd8332b4492313a617e3458f8003b2a/lib/bufbstore/buf_bstore.go#L21)
// such that they're not written until the VM is actually flushed.
//
// For some reason, the standard behaviour was not working for me (raulk),
// and disabling it (such that the state transformations are written immediately
// to the blockstore) worked.
_ = os.Setenv("LOTUS_DISABLE_VM_BUF", "iknowitsabadidea")
// Make the API client.
var err error
FullAPI, Closer, err = lcli.GetFullNodeAPI(c)
return err
}
func destroy(_ *cli.Context) error {
if Closer != nil {
Closer()
}
return nil
}

231
cmd/tvx/simulate.go Normal file
View File

@ -0,0 +1,231 @@
package main
import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"os/exec"
"github.com/fatih/color"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/test-vectors/schema"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/conformance"
)
var simulateFlags struct {
msg string
epoch int64
out string
statediff bool
}
var simulateCmd = &cli.Command{
Name: "simulate",
Description: "simulate a raw message on top of the supplied" +
"epoch, reporting the result on stderr and writing a test vector on stdout" +
"or into the specified file",
Action: runSimulateCmd,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "msg",
Usage: "base64 cbor-encoded message",
Destination: &simulateFlags.msg,
},
&cli.Int64Flag{
Name: "at-epoch",
Usage: "epoch at which to run this message (or HEAD if not provided)",
Destination: &simulateFlags.epoch,
},
&cli.StringFlag{
Name: "out",
Usage: "file to write the test vector to; if nil, the vector will be written to stdout",
TakesFile: true,
Destination: &simulateFlags.out,
},
&cli.BoolFlag{
Name: "statediff",
Usage: "display a statediff of the precondition and postcondition states",
Destination: &simulateFlags.statediff,
},
},
}
func runSimulateCmd(_ *cli.Context) error {
ctx := context.Background()
r := new(conformance.LogReporter)
msgb, err := base64.StdEncoding.DecodeString(simulateFlags.msg)
if err != nil {
return fmt.Errorf("failed to base64-decode message: %w", err)
}
msg, err := types.DecodeMessage(msgb)
if err != nil {
return fmt.Errorf("failed to deserialize message: %w", err)
}
log.Printf("message to simulate has CID: %s", msg.Cid())
msgjson, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to serialize message to json for printing: %w", err)
}
log.Printf("message to simulate: %s", string(msgjson))
// Resolve the tipset, root, epoch.
var ts *types.TipSet
if epochIn := simulateFlags.epoch; epochIn == 0 {
ts, err = FullAPI.ChainHead(ctx)
} else {
ts, err = FullAPI.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(epochIn), types.EmptyTSK)
}
if err != nil {
return fmt.Errorf("failed to get tipset: %w", err)
}
var (
preroot = ts.ParentState()
epoch = ts.Height()
baseFee = ts.Blocks()[0].ParentBaseFee
circSupply api.CirculatingSupply
)
// Get circulating supply.
circSupply, err = FullAPI.StateVMCirculatingSupplyInternal(ctx, ts.Key())
if err != nil {
return fmt.Errorf("failed to get circulating supply for tipset %s: %w", ts.Key(), err)
}
// Create the driver.
stores := NewProxyingStores(ctx, FullAPI)
driver := conformance.NewDriver(ctx, schema.Selector{}, conformance.DriverOpts{
DisableVMFlush: true,
})
rand := conformance.NewRecordingRand(r, FullAPI)
tbs, ok := stores.Blockstore.(TracingBlockstore)
if !ok {
return fmt.Errorf("no tracing blockstore available")
}
tbs.StartTracing()
applyret, postroot, err := driver.ExecuteMessage(stores.Blockstore, conformance.ExecuteMessageParams{
Preroot: preroot,
Epoch: epoch,
Message: msg,
CircSupply: circSupply.FilCirculating,
BaseFee: baseFee,
Rand: rand,
})
if err != nil {
return fmt.Errorf("failed to apply message: %w", err)
}
accessed := tbs.FinishTracing()
var (
out = new(bytes.Buffer)
gw = gzip.NewWriter(out)
g = NewSurgeon(ctx, FullAPI, stores)
)
if err := g.WriteCARIncluding(gw, accessed, preroot, postroot); err != nil {
return err
}
if err = gw.Flush(); err != nil {
return err
}
if err = gw.Close(); err != nil {
return err
}
version, err := FullAPI.Version(ctx)
if err != nil {
log.Printf("failed to get node version: %s; falling back to unknown", err)
version = api.Version{}
}
nv, err := FullAPI.StateNetworkVersion(ctx, ts.Key())
if err != nil {
return err
}
codename := GetProtocolCodename(epoch)
// Write out the test vector.
vector := schema.TestVector{
Class: schema.ClassMessage,
Meta: &schema.Metadata{
ID: fmt.Sprintf("simulated-%s", msg.Cid()),
Gen: []schema.GenerationData{
{Source: "github.com/filecoin-project/lotus", Version: version.String()}},
},
Selector: schema.Selector{
schema.SelectorMinProtocolVersion: codename,
},
Randomness: rand.Recorded(),
CAR: out.Bytes(),
Pre: &schema.Preconditions{
Variants: []schema.Variant{
{ID: codename, Epoch: int64(epoch), NetworkVersion: uint(nv)},
},
CircSupply: circSupply.FilCirculating.Int,
BaseFee: baseFee.Int,
StateTree: &schema.StateTree{
RootCID: preroot,
},
},
ApplyMessages: []schema.Message{{Bytes: msgb}},
Post: &schema.Postconditions{
StateTree: &schema.StateTree{
RootCID: postroot,
},
Receipts: []*schema.Receipt{
{
ExitCode: int64(applyret.ExitCode),
ReturnValue: applyret.Return,
GasUsed: applyret.GasUsed,
},
},
},
}
if err := writeVector(vector, simulateFlags.out); err != nil {
return fmt.Errorf("failed to write vector: %w", err)
}
log.Printf(color.GreenString("wrote vector at: %s"), simulateFlags.out)
if !simulateFlags.statediff {
return nil
}
if simulateFlags.out == "" {
log.Print("omitting statediff in non-file mode")
return nil
}
// check if statediff is installed; if not, skip.
if err := exec.Command("statediff", "--help").Run(); err != nil {
log.Printf("could not perform statediff on generated vector; command not found (%s)", err)
log.Printf("install statediff with:")
log.Printf("$ GOMODULE111=off go get github.com/filecoin-project/statediff/cmd/statediff")
return err
}
stdiff, err := exec.Command("statediff", "vector", "--file", simulateFlags.out).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to statediff: %w", err)
}
log.Print(string(stdiff))
return nil
}