diff --git a/cmd/lotus-bench/amt_internal.go b/cmd/lotus-bench/amt_internal.go new file mode 100644 index 000000000..892e4bf06 --- /dev/null +++ b/cmd/lotus-bench/amt_internal.go @@ -0,0 +1,309 @@ +package main + +import ( + "fmt" + "io" + "math" + "sort" + + cid "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +type AMTRoot struct { + BitWidth uint64 + Height uint64 + Count uint64 + AMTNode AMTNode +} + +type AMTNode struct { + Bmap []byte + Links []cid.Cid + Values []*cbg.Deferred +} + +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +var _ = xerrors.Errorf +var _ = cid.Undef +var _ = math.E +var _ = sort.Sort + +var lengthBufAMTRoot = []byte{132} + +func (t *AMTRoot) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufAMTRoot); err != nil { + return err + } + + // t.BitWidth (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.BitWidth)); err != nil { + return err + } + + // t.Height (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Height)); err != nil { + return err + } + + // t.Count (uint64) (uint64) + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Count)); err != nil { + return err + } + + // t.AMTNode (internal.AMTNode) (struct) + if err := t.AMTNode.MarshalCBOR(cw); err != nil { + return err + } + return nil +} + +func (t *AMTRoot) UnmarshalCBOR(r io.Reader) (err error) { + *t = AMTRoot{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 4 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.BitWidth (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.BitWidth = uint64(extra) + + } + // t.Height (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Height = uint64(extra) + + } + // t.Count (uint64) (uint64) + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.Count = uint64(extra) + + } + // t.AMTNode (internal.AMTNode) (struct) + + { + + if err := t.AMTNode.UnmarshalCBOR(cr); err != nil { + return xerrors.Errorf("unmarshaling t.AMTNode: %w", err) + } + + } + return nil +} + +var lengthBufAMTNode = []byte{131} + +func (t *AMTNode) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + + cw := cbg.NewCborWriter(w) + + if _, err := cw.Write(lengthBufAMTNode); err != nil { + return err + } + + // t.Bmap ([]uint8) (slice) + if len(t.Bmap) > cbg.ByteArrayMaxLen { + return xerrors.Errorf("Byte array in field t.Bmap was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajByteString, uint64(len(t.Bmap))); err != nil { + return err + } + + if _, err := cw.Write(t.Bmap[:]); err != nil { + return err + } + + // t.Links ([]cid.Cid) (slice) + if len(t.Links) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Links was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Links))); err != nil { + return err + } + for _, v := range t.Links { + if err := cbg.WriteCid(w, v); err != nil { + return xerrors.Errorf("failed writing cid field t.Links: %w", err) + } + } + + // t.Values ([]*typegen.Deferred) (slice) + if len(t.Values) > cbg.MaxLength { + return xerrors.Errorf("Slice value in field t.Values was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajArray, uint64(len(t.Values))); err != nil { + return err + } + for _, v := range t.Values { + if err := v.MarshalCBOR(cw); err != nil { + return err + } + } + return nil +} + +func (t *AMTNode) UnmarshalCBOR(r io.Reader) (err error) { + *t = AMTNode{} + + cr := cbg.NewCborReader(r) + + maj, extra, err := cr.ReadHeader() + if err != nil { + return err + } + defer func() { + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + }() + + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Bmap ([]uint8) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.ByteArrayMaxLen { + return fmt.Errorf("t.Bmap: byte array too large (%d)", extra) + } + if maj != cbg.MajByteString { + return fmt.Errorf("expected byte array") + } + + if extra > 0 { + t.Bmap = make([]uint8, extra) + } + + if _, err := io.ReadFull(cr, t.Bmap[:]); err != nil { + return err + } + // t.Links ([]cid.Cid) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Links: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Links = make([]cid.Cid, extra) + } + + for i := 0; i < int(extra); i++ { + + c, err := cbg.ReadCid(cr) + if err != nil { + return xerrors.Errorf("reading cid field t.Links failed: %w", err) + } + t.Links[i] = c + } + + // t.Values ([]*typegen.Deferred) (slice) + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + + if extra > cbg.MaxLength { + return fmt.Errorf("t.Values: array too large (%d)", extra) + } + + if maj != cbg.MajArray { + return fmt.Errorf("expected cbor array") + } + + if extra > 0 { + t.Values = make([]*cbg.Deferred, extra) + } + + for i := 0; i < int(extra); i++ { + + var v cbg.Deferred + if err := v.UnmarshalCBOR(cr); err != nil { + return err + } + + t.Values[i] = &v + } + + return nil +} diff --git a/cmd/lotus-bench/main.go b/cmd/lotus-bench/main.go index 6e7e274f2..623c95eab 100644 --- a/cmd/lotus-bench/main.go +++ b/cmd/lotus-bench/main.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" @@ -8,9 +9,14 @@ import ( "math/rand" "os" "path/filepath" + "sync" "time" "github.com/docker/go-units" + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/go-cid" + offline "github.com/ipfs/go-ipfs-exchange-offline" + format "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/minio/blake2b-simd" "github.com/mitchellh/go-homedir" @@ -20,7 +26,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-paramfetch" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" prooftypes "github.com/filecoin-project/go-state-types/proof" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + adt "github.com/filecoin-project/specs-actors/v6/actors/util/adt" + "github.com/ipfs/boxo/ipld/merkledag" + cbor "github.com/ipfs/go-ipld-cbor" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -104,6 +116,7 @@ func main() { DisableSliceFlagSeparator: true, Commands: []*cli.Command{ proveCmd, + amtBenchCmd, sealBenchCmd, simpleCmd, importBenchCmd, @@ -117,6 +130,202 @@ func main() { } } +type amtStatCollector struct { + ds format.NodeGetter + walk func(format.Node) ([]*format.Link, error) + + statsLk sync.Mutex + totalAMTLinks int + totalAMTValues int + totalAMTLinkNodes int + totalAMTValueNodes int + totalAMTLinkNodeSize int + totalAMTValueNodeSize int +} + +func (asc *amtStatCollector) String() string { + asc.statsLk.Lock() + defer asc.statsLk.Unlock() + + str := "\n------------\n" + str += fmt.Sprintf("Link Count: %d\n", asc.totalAMTLinks) + str += fmt.Sprintf("Value Count: %d\n", asc.totalAMTValues) + str += fmt.Sprintf("%d link nodes %d bytes\n", asc.totalAMTLinkNodes, asc.totalAMTLinkNodeSize) + str += fmt.Sprintf("%d value nodes %d bytes\n", asc.totalAMTValueNodes, asc.totalAMTValueNodeSize) + str += fmt.Sprintf("Total bytes: %d\n------------\n", asc.totalAMTLinkNodeSize+asc.totalAMTValueNodeSize) + return str +} + +func (asc *amtStatCollector) record(ctx context.Context, nd format.Node) error { + size, err := nd.Size() + if err != nil { + return err + } + + var node AMTNode + if err := node.UnmarshalCBOR(bytes.NewReader(nd.RawData())); err != nil { + // try to deserialize root + var root AMTRoot + if err := root.UnmarshalCBOR(bytes.NewReader(nd.RawData())); err != nil { + return err + } + node = root.AMTNode + } + + asc.statsLk.Lock() + defer asc.statsLk.Unlock() + + link := len(node.Links) > 0 + value := len(node.Values) > 0 + + if link { + asc.totalAMTLinks += len(node.Links) + asc.totalAMTLinkNodes += 1 + asc.totalAMTLinkNodeSize += int(size) + } else if value { + asc.totalAMTValues += len(node.Values) + asc.totalAMTValueNodes += 1 + asc.totalAMTValueNodeSize += int(size) + } else { + return xerrors.Errorf("unexpected AMT node %x: neither link nor value", nd.RawData()) + } + + return nil +} + +func (asc *amtStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*format.Link, error) { + nd, err := asc.ds.Get(ctx, c) + if err != nil { + return nil, err + } + + if err := asc.record(ctx, nd); err != nil { + return nil, err + } + + return asc.walk(nd) +} + +func carWalkFunc(nd format.Node) (out []*format.Link, err error) { + for _, link := range nd.Links() { + if link.Cid.Prefix().Codec == cid.FilCommitmentSealed || link.Cid.Prefix().Codec == cid.FilCommitmentUnsealed { + continue + } + out = append(out, link) + } + return out, nil +} + +var amtBenchCmd = &cli.Command{ + Name: "amt", + Usage: "Benchmark AMT churn", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "rounds", + Usage: "rounds of churn to measure", + Value: 1, + }, + &cli.IntFlag{ + Name: "interval", + Usage: "AMT idx interval for churning values", + Value: 2880, + }, + }, + Action: func(c *cli.Context) error { + bs := blockstore.NewMemory() + ctx := c.Context + store := adt.WrapStore(ctx, cbor.NewCborStore(bs)) + + // Setup in memory blockstore + const bitwidth = 6 + array, err := adt.MakeEmptyArray(store, bitwidth) + if err != nil { + return err + } + + // Using motivating empirical example: market actor states AMT + // Create 40,000,000 states for realistic workload + fmt.Printf("Populating AMT\n") + for i := 0; i < 40000000; i++ { + array.Set(uint64(i), &market.DealState{ + SectorStartEpoch: abi.ChainEpoch(2000000 + i), + LastUpdatedEpoch: abi.ChainEpoch(-1), + SlashEpoch: -1, + VerifiedClaim: verifreg.AllocationId(i), + }) + } + + r, err := array.Root() + if err != nil { + return err + } + + // Measure ratio of internal / leaf nodes / sizes + dag := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + asc := &amtStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + fmt.Printf("Measuring AMT\n") + seen := cid.NewSet() + if err := merkledag.Walk(ctx, asc.walkLinks, r, seen.Visit, merkledag.Concurrent()); err != nil { + return err + } + + fmt.Printf("%s\n", asc) + + // Overwrite ids with idx % interval: one epoch of market cron + rounds := c.Int("rounds") + interval := c.Int("interval") + + fmt.Printf("Overwrite 1 out of %d values for %d rounds\n", interval, rounds) + array, err = adt.AsArray(store, r, bitwidth) + if err != nil { + return err + } + roots := make([]cid.Cid, rounds) + for j := 0; j < rounds; j++ { + if j%10 == 0 { + fmt.Printf("round: %d\n", j) + } + for i := j; i < 40000000; i += interval { + if i%interval == j { + array.Set(uint64(i), &market.DealState{ + SectorStartEpoch: abi.ChainEpoch(2000000 + i), + LastUpdatedEpoch: abi.ChainEpoch(1), + SlashEpoch: -1, + VerifiedClaim: verifreg.AllocationId(i), + }) + } + } + roots[j], err = array.Root() + if err != nil { + return err + } + + } + + // Measure churn + dag = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + asc = &amtStatCollector{ + ds: dag, + walk: carWalkFunc, + } + + fmt.Printf("Measuring %d rounds of churn\n", rounds) + + for _, r := range roots { + if err := merkledag.Walk(ctx, asc.walkLinks, r, seen.Visit, merkledag.Concurrent()); err != nil { + return err + } + } + + fmt.Printf("%s\n", asc) + return nil + }, +} + var sealBenchCmd = &cli.Command{ Name: "sealing", Usage: "Benchmark seal and winning post and window post", diff --git a/go.mod b/go.mod index ab1f60b6e..fa17e9616 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,7 @@ require ( github.com/ipfs/go-fs-lock v0.0.7 github.com/ipfs/go-graphsync v0.14.6 github.com/ipfs/go-ipfs-blocksutil v0.0.1 + github.com/ipfs/go-ipfs-exchange-offline v0.3.0 github.com/ipfs/go-ipld-cbor v0.0.6 github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 diff --git a/go.sum b/go.sum index 71341203d..c5dc47a98 100644 --- a/go.sum +++ b/go.sum @@ -759,6 +759,7 @@ github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSO github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAzpUws3x7UeEGkzQc3iNkM0= github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY= github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA= +github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s= github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4= github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4= github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=