Merge pull request #11075 from filecoin-project/bench/amt-churn

feat:lotus-bench:AMT benchmarking
This commit is contained in:
ZenGround0 2023-07-20 19:27:27 -04:00 committed by GitHub
commit 15faab8412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 532 additions and 0 deletions

View File

@ -0,0 +1,312 @@
// Copied from go-amt-ipld https://github.com/filecoin-project/go-amt-ipld/tree/master/internal
// which for some reason is a go internal package and therefore cannot be imported
package main
import (
"fmt"
"io"
"math"
"sort"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
type AMTRoot struct {
BitWidth uint64
Height uint64
Count uint64
AMTNode AMTNode
}
type AMTNode struct {
Bmap []byte
Links []cid.Cid
Values []*cbg.Deferred
}
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
var _ = xerrors.Errorf
var _ = cid.Undef
var _ = math.E
var _ = sort.Sort
var lengthBufAMTRoot = []byte{132}
func (t *AMTRoot) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write(lengthBufAMTRoot); err != nil {
return err
}
// t.BitWidth (uint64) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.BitWidth); err != nil {
return err
}
// t.Height (uint64) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.Height); err != nil {
return err
}
// t.Count (uint64) (uint64)
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, t.Count); err != nil {
return err
}
// t.AMTNode (internal.AMTNode) (struct)
if err := t.AMTNode.MarshalCBOR(cw); err != nil {
return err
}
return nil
}
func (t *AMTRoot) UnmarshalCBOR(r io.Reader) (err error) {
*t = AMTRoot{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 4 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.BitWidth (uint64) (uint64)
{
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BitWidth = extra
}
// t.Height (uint64) (uint64)
{
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Height = extra
}
// t.Count (uint64) (uint64)
{
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Count = extra
}
// t.AMTNode (internal.AMTNode) (struct)
{
if err := t.AMTNode.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.AMTNode: %w", err)
}
}
return nil
}
var lengthBufAMTNode = []byte{131}
func (t *AMTNode) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
cw := cbg.NewCborWriter(w)
if _, err := cw.Write(lengthBufAMTNode); err != nil {
return err
}
// t.Bmap ([]uint8) (slice)
if len(t.Bmap) > cbg.ByteArrayMaxLen {
return xerrors.Errorf("Byte array in field t.Bmap was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Bmap))); err != nil {
return err
}
if _, err := cw.Write(t.Bmap[:]); err != nil {
return err
}
// t.Links ([]cid.Cid) (slice)
if len(t.Links) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Links was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Links))); err != nil {
return err
}
for _, v := range t.Links {
if err := cbg.WriteCid(w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.Links: %w", err)
}
}
// t.Values ([]*typegen.Deferred) (slice)
if len(t.Values) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Values was too long")
}
if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Values))); err != nil {
return err
}
for _, v := range t.Values {
if err := v.MarshalCBOR(cw); err != nil {
return err
}
}
return nil
}
func (t *AMTNode) UnmarshalCBOR(r io.Reader) (err error) {
*t = AMTNode{}
cr := cbg.NewCborReader(r)
maj, extra, err := cr.ReadHeader()
if err != nil {
return err
}
defer func() {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
}()
if maj != cbg.MajArray {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
}
// t.Bmap ([]uint8) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Bmap: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
if extra > 0 {
t.Bmap = make([]uint8, extra)
}
if _, err := io.ReadFull(cr, t.Bmap[:]); err != nil {
return err
}
// t.Links ([]cid.Cid) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Links: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Links = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(cr)
if err != nil {
return xerrors.Errorf("reading cid field t.Links failed: %w", err)
}
t.Links[i] = c
}
// t.Values ([]*typegen.Deferred) (slice)
maj, extra, err = cr.ReadHeader()
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Values: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Values = make([]*cbg.Deferred, extra)
}
for i := 0; i < int(extra); i++ {
var v cbg.Deferred
if err := v.UnmarshalCBOR(cr); err != nil {
return err
}
t.Values[i] = &v
}
return nil
}

View File

@ -1,6 +1,7 @@
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -8,9 +9,16 @@ import (
"math/rand"
"os"
"path/filepath"
"sync"
"time"
"github.com/docker/go-units"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
cbor "github.com/ipfs/go-ipld-cbor"
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/minio/blake2b-simd"
"github.com/mitchellh/go-homedir"
@ -20,10 +28,14 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
prooftypes "github.com/filecoin-project/go-state-types/proof"
adt "github.com/filecoin-project/specs-actors/v6/actors/util/adt"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
@ -104,6 +116,7 @@ func main() {
DisableSliceFlagSeparator: true,
Commands: []*cli.Command{
proveCmd,
amtBenchCmd,
sealBenchCmd,
simpleCmd,
importBenchCmd,
@ -117,6 +130,211 @@ func main() {
}
}
type amtStatCollector struct {
ds format.NodeGetter
walk func(format.Node) ([]*format.Link, error)
statsLk sync.Mutex
totalAMTLinks int
totalAMTValues int
totalAMTLinkNodes int
totalAMTValueNodes int
totalAMTLinkNodeSize int
totalAMTValueNodeSize int
}
func (asc *amtStatCollector) String() string {
asc.statsLk.Lock()
defer asc.statsLk.Unlock()
str := "\n------------\n"
str += fmt.Sprintf("Link Count: %d\n", asc.totalAMTLinks)
str += fmt.Sprintf("Value Count: %d\n", asc.totalAMTValues)
str += fmt.Sprintf("%d link nodes %d bytes\n", asc.totalAMTLinkNodes, asc.totalAMTLinkNodeSize)
str += fmt.Sprintf("%d value nodes %d bytes\n", asc.totalAMTValueNodes, asc.totalAMTValueNodeSize)
str += fmt.Sprintf("Total bytes: %d\n------------\n", asc.totalAMTLinkNodeSize+asc.totalAMTValueNodeSize)
return str
}
func (asc *amtStatCollector) record(ctx context.Context, nd format.Node) error {
size, err := nd.Size()
if err != nil {
return err
}
var node AMTNode
if err := node.UnmarshalCBOR(bytes.NewReader(nd.RawData())); err != nil {
// try to deserialize root
var root AMTRoot
if err := root.UnmarshalCBOR(bytes.NewReader(nd.RawData())); err != nil {
return err
}
node = root.AMTNode
}
asc.statsLk.Lock()
defer asc.statsLk.Unlock()
link := len(node.Links) > 0
value := len(node.Values) > 0
if link {
asc.totalAMTLinks += len(node.Links)
asc.totalAMTLinkNodes++
asc.totalAMTLinkNodeSize += int(size)
} else if value {
asc.totalAMTValues += len(node.Values)
asc.totalAMTValueNodes++
asc.totalAMTValueNodeSize += int(size)
} else {
return xerrors.Errorf("unexpected AMT node %x: neither link nor value", nd.RawData())
}
return nil
}
func (asc *amtStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
nd, err := asc.ds.Get(ctx, c)
if err != nil {
return nil, err
}
if err := asc.record(ctx, nd); err != nil {
return nil, err
}
return asc.walk(nd)
}
func carWalkFunc(nd format.Node) (out []*format.Link, err error) {
for _, link := range nd.Links() {
if link.Cid.Prefix().Codec == cid.FilCommitmentSealed || link.Cid.Prefix().Codec == cid.FilCommitmentUnsealed {
continue
}
out = append(out, link)
}
return out, nil
}
var amtBenchCmd = &cli.Command{
Name: "amt",
Usage: "Benchmark AMT churn",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "rounds",
Usage: "rounds of churn to measure",
Value: 1,
},
&cli.IntFlag{
Name: "interval",
Usage: "AMT idx interval for churning values",
Value: 2880,
},
&cli.IntFlag{
Name: "bitwidth",
Usage: "AMT bitwidth",
Value: 6,
},
},
Action: func(c *cli.Context) error {
bs := blockstore.NewMemory()
ctx := c.Context
store := adt.WrapStore(ctx, cbor.NewCborStore(bs))
// Setup in memory blockstore
bitwidth := c.Int("bitwidth")
array, err := adt.MakeEmptyArray(store, bitwidth)
if err != nil {
return err
}
// Using motivating empirical example: market actor states AMT
// Create 40,000,000 states for realistic workload
fmt.Printf("Populating AMT\n")
for i := 0; i < 40000000; i++ {
if err := array.Set(uint64(i), &market.DealState{
SectorStartEpoch: abi.ChainEpoch(2000000 + i),
LastUpdatedEpoch: abi.ChainEpoch(-1),
SlashEpoch: -1,
VerifiedClaim: verifreg.AllocationId(i),
}); err != nil {
return err
}
}
r, err := array.Root()
if err != nil {
return err
}
// Measure ratio of internal / leaf nodes / sizes
dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
asc := &amtStatCollector{
ds: dag,
walk: carWalkFunc,
}
fmt.Printf("Measuring AMT\n")
seen := cid.NewSet()
if err := merkledag.Walk(ctx, asc.walkLinks, r, seen.Visit, merkledag.Concurrent()); err != nil {
return err
}
fmt.Printf("%s\n", asc)
// Overwrite ids with idx % interval: one epoch of market cron
rounds := c.Int("rounds")
interval := c.Int("interval")
fmt.Printf("Overwrite 1 out of %d values for %d rounds\n", interval, rounds)
array, err = adt.AsArray(store, r, bitwidth)
if err != nil {
return err
}
roots := make([]cid.Cid, rounds)
for j := 0; j < rounds; j++ {
if j%10 == 0 {
fmt.Printf("round: %d\n", j)
}
for i := j; i < 40000000; i += interval {
if i%interval == j {
if err := array.Set(uint64(i), &market.DealState{
SectorStartEpoch: abi.ChainEpoch(2000000 + i),
LastUpdatedEpoch: abi.ChainEpoch(1),
SlashEpoch: -1,
VerifiedClaim: verifreg.AllocationId(i),
}); err != nil {
return err
}
}
}
roots[j], err = array.Root()
if err != nil {
return err
}
}
// Measure churn
dag = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
asc = &amtStatCollector{
ds: dag,
walk: carWalkFunc,
}
fmt.Printf("Measuring %d rounds of churn\n", rounds)
for _, r := range roots {
if err := merkledag.Walk(ctx, asc.walkLinks, r, seen.Visit, merkledag.Concurrent()); err != nil {
return err
}
}
fmt.Printf("%s\n", asc)
return nil
},
}
var sealBenchCmd = &cli.Command{
Name: "sealing",
Usage: "Benchmark seal and winning post and window post",

1
go.mod
View File

@ -89,6 +89,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.7
github.com/ipfs/go-graphsync v0.14.6
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipld-cbor v0.0.6
github.com/ipfs/go-ipld-format v0.5.0
github.com/ipfs/go-log/v2 v2.5.1

1
go.sum
View File

@ -742,6 +742,7 @@ github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSO
github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAzpUws3x7UeEGkzQc3iNkM0=
github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s=
github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=