Merge remote-tracking branch 'origin/master' into feat/post-worker

This commit is contained in:
Łukasz Magiera 2022-03-11 17:04:58 +01:00
commit 135aef78d7
34 changed files with 1190 additions and 105 deletions

View File

@ -121,7 +121,7 @@ func init() {
addExample(api.FullAPIVersion1)
addExample(api.PCHInbound)
addExample(time.Minute)
addExample(graphsync.RequestID(4))
addExample(graphsync.NewRequestID())
addExample(datatransfer.TransferID(3))
addExample(datatransfer.Ongoing)
addExample(storeIDExample)

View File

@ -58,7 +58,7 @@ type MessageSendSpec struct {
// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
type GraphSyncDataTransfer struct {
// GraphSync request id for this transfer
RequestID graphsync.RequestID
RequestID *graphsync.RequestID
// Graphsync state for this transfer
RequestState string
// If a channel ID is present, indicates whether this is the current graphsync request for this channel

Binary file not shown.

View File

@ -18,6 +18,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)
func (cs *ChainStore) UnionStore() bstore.Blockstore {
return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
h := &car.CarHeader{
Roots: ts.Cids(),
@ -28,7 +32,7 @@ func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRo
return xerrors.Errorf("failed to write car header: %s", err)
}
unionBs := bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
unionBs := cs.UnionStore()
return cs.WalkSnapshot(ctx, ts, inclRecentRoots, skipOldMsgs, true, func(c cid.Cid) error {
blk, err := unionBs.Get(ctx, c)
if err != nil {

View File

@ -0,0 +1,342 @@
package main
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"hash"
"hash/crc32"
"io"
"os"
"strings"
"github.com/dgraph-io/badger/v2/y"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-base32"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
)
var datastoreVlog2CarCmd = &cli.Command{
Name: "vlog2car",
Usage: "convert badger blockstore .vlog to .car",
Flags: []cli.Flag{
&cli.PathFlag{
Name: "vlog",
Usage: "vlog file",
Required: true,
},
&cli.PathFlag{
Name: "car",
Usage: "out car file name (no .car)",
Required: true,
},
&cli.StringFlag{
Name: "key-prefix",
Usage: "datastore prefix",
Value: "/blocks/",
},
},
Action: func(cctx *cli.Context) error {
ctx := cctx.Context
maxSz := uint64(1 << 20)
carb := &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
cars := 0
pref := cctx.String("key-prefix")
plen := len(pref)
{
// NOTE: Some bits of code in this code block come from https://github.com/dgraph-io/badger, which is licensed
// under Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE
vf, err := os.Open(cctx.Path("vlog"))
if err != nil {
return xerrors.Errorf("open vlog file: %w", err)
}
if _, err := vf.Seek(20, io.SeekStart); err != nil {
return xerrors.Errorf("seek past vlog start: %w", err)
}
reader := bufio.NewReader(vf)
read := &safeRead{
k: make([]byte, 10),
v: make([]byte, 10),
recordOffset: 20,
}
loop:
for {
e, err := read.Entry(reader)
switch {
case err == io.EOF:
break loop
case err == io.ErrUnexpectedEOF || err == errTruncate:
break loop
case err != nil:
return xerrors.Errorf("entry read error: %w", err)
case e == nil:
continue
}
if e.meta&0x40 > 0 {
e.Key = e.Key[:len(e.Key)-8]
} else if e.meta > 0 {
if e.meta&0x3f > 0 {
log.Infof("unk meta m:%x; k:%x, v:%60x", e.meta, e.Key, e.Value)
}
continue
}
{
if plen > 0 && !strings.HasPrefix(string(e.Key), pref) {
log.Infow("no blocks prefix", "key", string(e.Key))
continue
}
h, err := base32.RawStdEncoding.DecodeString(string(e.Key[plen:]))
if err != nil {
return xerrors.Errorf("decode b32 ds key %x: %w", e.Key, err)
}
c := cid.NewCidV1(cid.Raw, h)
b, err := block.NewBlockWithCid(e.Value, c)
if err != nil {
return xerrors.Errorf("readblk: %w", err)
}
err = carb.consume(c, b)
switch err {
case nil:
case errFullCar:
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
cars++
carb = &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
default:
return xerrors.Errorf("carb consume: %w", err)
}
}
}
if err := vf.Close(); err != nil {
return err
}
}
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, fmt.Sprintf("%s%d.car", cctx.Path("car"), cars), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
return nil
},
}
// NOTE: Code below comes (with slight modifications) from https://github.com/dgraph-io/badger/blob/master/value.go
// Apache 2.0; See https://github.com/dgraph-io/badger/blob/master/LICENSE
var errTruncate = errors.New("do truncate")
// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
// bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
r io.Reader
h hash.Hash32
bytesRead int // Number of bytes read.
}
func newHashReader(r io.Reader) *hashReader {
hash := crc32.New(y.CastagnoliCrcTable)
return &hashReader{
r: r,
h: hash,
}
}
// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
func (t *hashReader) Read(p []byte) (int, error) {
n, err := t.r.Read(p)
if err != nil {
return n, err
}
t.bytesRead += n
return t.h.Write(p[:n])
}
// ReadByte reads exactly one byte from the reader. Returns error on failure.
func (t *hashReader) ReadByte() (byte, error) {
b := make([]byte, 1)
_, err := t.Read(b)
return b[0], err
}
// Sum32 returns the sum32 of the underlying hash.
func (t *hashReader) Sum32() uint32 {
return t.h.Sum32()
}
type safeRead struct {
k []byte
v []byte
recordOffset uint32
}
// Entry provides Key, Value, UserMeta and ExpiresAt. This struct can be used by
// the user to set data.
type Entry struct {
Key []byte
Value []byte
UserMeta byte
ExpiresAt uint64 // time.Unix
meta byte
// Fields maintained internally.
offset uint32
hlen int // Length of the header.
}
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
tee := newHashReader(reader)
var h header
hlen, err := h.DecodeFrom(tee)
if err != nil {
return nil, err
}
if h.klen > uint32(1<<16) { // Key length must be below uint16.
return nil, errTruncate
}
kl := int(h.klen)
if cap(r.k) < kl {
r.k = make([]byte, 2*kl)
}
vl := int(h.vlen)
if cap(r.v) < vl {
r.v = make([]byte, 2*vl)
}
e := &Entry{}
e.offset = r.recordOffset
e.hlen = hlen
buf := make([]byte, h.klen+h.vlen)
if _, err := io.ReadFull(tee, buf[:]); err != nil {
if err == io.EOF {
err = errTruncate
}
return nil, err
}
e.Key = buf[:h.klen]
e.Value = buf[h.klen:]
var crcBuf [crc32.Size]byte
if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
if err == io.EOF {
err = errTruncate
}
return nil, err
}
crc := y.BytesToU32(crcBuf[:])
if crc != tee.Sum32() {
return nil, errTruncate
}
e.meta = h.meta
e.UserMeta = h.userMeta
e.ExpiresAt = h.expiresAt
return e, nil
}
// header is used in value log as a header before Entry.
type header struct {
klen uint32
vlen uint32
expiresAt uint64
meta byte
userMeta byte
}
// Encode encodes the header into []byte. The provided []byte should be atleast 5 bytes. The
// function will panic if out []byte isn't large enough to hold all the values.
// The encoded header looks like
// +------+----------+------------+--------------+-----------+
// | Meta | UserMeta | Key Length | Value Length | ExpiresAt |
// +------+----------+------------+--------------+-----------+
func (h header) Encode(out []byte) int {
out[0], out[1] = h.meta, h.userMeta
index := 2
index += binary.PutUvarint(out[index:], uint64(h.klen))
index += binary.PutUvarint(out[index:], uint64(h.vlen))
index += binary.PutUvarint(out[index:], h.expiresAt)
return index
}
// Decode decodes the given header from the provided byte slice.
// Returns the number of bytes read.
func (h *header) Decode(buf []byte) int {
h.meta, h.userMeta = buf[0], buf[1]
index := 2
klen, count := binary.Uvarint(buf[index:])
h.klen = uint32(klen)
index += count
vlen, count := binary.Uvarint(buf[index:])
h.vlen = uint32(vlen)
index += count
h.expiresAt, count = binary.Uvarint(buf[index:])
return index + count
}
// DecodeFrom reads the header from the hashReader.
// Returns the number of bytes read.
func (h *header) DecodeFrom(reader *hashReader) (int, error) {
var err error
h.meta, err = reader.ReadByte()
if err != nil {
return 0, err
}
h.userMeta, err = reader.ReadByte()
if err != nil {
return 0, err
}
klen, err := binary.ReadUvarint(reader)
if err != nil {
return 0, err
}
h.klen = uint32(klen)
vlen, err := binary.ReadUvarint(reader)
if err != nil {
return 0, err
}
h.vlen = uint32(vlen)
h.expiresAt, err = binary.ReadUvarint(reader)
if err != nil {
return 0, err
}
return reader.bytesRead, nil
}

View File

@ -32,6 +32,7 @@ var datastoreCmd = &cli.Command{
datastoreListCmd,
datastoreGetCmd,
datastoreRewriteCmd,
datastoreVlog2CarCmd,
},
}

View File

@ -1,19 +1,38 @@
package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"
"github.com/dustin/go-humanize"
"github.com/filecoin-project/go-state-types/abi"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"
"github.com/ipld/go-car"
"github.com/multiformats/go-base32"
mh "github.com/multiformats/go-multihash"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
"github.com/filecoin-project/lotus/node/repo"
)
@ -39,6 +58,9 @@ var exportChainCmd = &cli.Command{
Name: "skip-old-msgs",
},
},
Subcommands: []*cli.Command{
exportRawCmd,
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
@ -130,3 +152,351 @@ var exportChainCmd = &cli.Command{
return nil
},
}
var exportRawCmd = &cli.Command{
Name: "raw",
Description: "Export raw blocks from repo (requires node to be offline)",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
},
&cli.StringFlag{
Name: "car-size",
Value: "50M",
},
},
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return lcli.ShowHelp(cctx, fmt.Errorf("must specify file name to write export to"))
}
ctx := context.TODO()
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("opening fs repo: %w", err)
}
exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.LockRO(repo.FullNode)
if err != nil {
return err
}
defer lr.Close() //nolint:errcheck
out := cctx.Args().First()
err = os.Mkdir(out, 0755)
if err != nil {
return xerrors.Errorf("creating output dir: %w", err)
}
maxSz, err := humanize.ParseBytes(cctx.String("car-size"))
if err != nil {
return xerrors.Errorf("parse --car-size: %w", err)
}
cars := 0
carb := &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
{
consume := func(c cid.Cid, b block.Block) error {
err = carb.consume(c, b)
switch err {
case nil:
case errFullCar:
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
cars++
if cars > 10 {
return xerrors.Errorf("enough")
}
carb = &rawCarb{
max: maxSz,
blocks: map[cid.Cid]block.Block{},
}
log.Infow("gc")
go runtime.GC()
default:
return xerrors.Errorf("carb consume: %w", err)
}
return nil
}
{
path := filepath.Join(lr.Path(), "datastore", "chain")
opts, err := repo.BadgerBlockstoreOptions(repo.UniversalBlockstore, path, false)
if err != nil {
return err
}
opts.Logger = &badgerLog{
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(),
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(),
}
log.Infow("open db")
db, err := badger.Open(opts.Options)
if err != nil {
return fmt.Errorf("failed to open badger blockstore: %w", err)
}
defer db.Close() // nolint:errcheck
log.Infow("new stream")
var wlk sync.Mutex
str := db.NewStream()
str.NumGo = 16
str.LogPrefix = "bstream"
str.Send = func(list *pb.KVList) (err error) {
defer func() {
if err != nil {
log.Errorw("send error", "err", err)
}
}()
for _, kv := range list.Kv {
if kv.Key == nil || kv.Value == nil {
continue
}
if !strings.HasPrefix(string(kv.Key), "/blocks/") {
log.Infow("no blocks prefix", "key", string(kv.Key))
continue
}
h, err := base32.RawStdEncoding.DecodeString(string(kv.Key[len("/blocks/"):]))
if err != nil {
return xerrors.Errorf("decode b32 ds key %x: %w", kv.Key, err)
}
c := cid.NewCidV1(cid.Raw, h)
b, err := block.NewBlockWithCid(kv.Value, c)
if err != nil {
return xerrors.Errorf("readblk: %w", err)
}
wlk.Lock()
err = consume(c, b)
wlk.Unlock()
if err != nil {
return xerrors.Errorf("consume stream block: %w", err)
}
}
return nil
}
if err := str.Orchestrate(ctx); err != nil {
return xerrors.Errorf("orchestrate stream: %w", err)
}
}
}
log.Infow("write last")
root, err := carb.finalize()
if err != nil {
return xerrors.Errorf("carb finalize: %w", err)
}
if err := carb.writeCar(ctx, filepath.Join(out, fmt.Sprintf("chain%d.car", cars)), root); err != nil {
return xerrors.Errorf("writeCar: %w", err)
}
return nil
},
}
var errFullCar = errors.New("full")
const maxlinks = 16
type rawCarb struct {
blockstore.Blockstore
max, cur uint64
nodes []*shedgen.CarbNode
blocks map[cid.Cid]block.Block
}
func (rc *rawCarb) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, has := rc.blocks[c]
return has, nil
}
func (rc *rawCarb) Get(ctx context.Context, c cid.Cid) (block.Block, error) {
b, has := rc.blocks[c]
if !has {
return nil, blockstore.ErrNotFound
}
return b, nil
}
func (rc *rawCarb) GetSize(ctx context.Context, c cid.Cid) (int, error) {
b, has := rc.blocks[c]
if !has {
return 0, blockstore.ErrNotFound
}
return len(b.RawData()), nil
}
func (rc *rawCarb) checkNodes(maxl int) error {
if len(rc.nodes) == 0 {
log.Infow("add level", "l", 0)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
for i := 0; i < len(rc.nodes); i++ {
if len(rc.nodes[i].Sub) <= maxl {
break
}
if len(rc.nodes) <= i+1 {
log.Infow("add level", "l", i+1)
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return err
}
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
}.Sum(bb.Bytes())
if err != nil {
return xerrors.Errorf("gen cid: %w", err)
}
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return xerrors.Errorf("new block: %w", err)
}
if i > 1 {
log.Infow("compact", "from", i, "to", i+1, "sub", c.String())
}
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
}
return nil
}
func (rc *rawCarb) consume(c cid.Cid, b block.Block) error {
if err := rc.checkNodes(maxlinks); err != nil {
return err
}
if rc.cur+uint64(len(b.RawData())) > rc.max {
return errFullCar
}
rc.cur += uint64(len(b.RawData()))
b, err := block.NewBlockWithCid(b.RawData(), c)
if err != nil {
return xerrors.Errorf("create raw block: %w", err)
}
rc.blocks[c] = b
rc.nodes[0].Sub = append(rc.nodes[0].Sub, c)
return nil
}
func (rc *rawCarb) finalize() (cid.Cid, error) {
if len(rc.nodes) == 0 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
for i := 0; i < len(rc.nodes); i++ {
var bb bytes.Buffer
if err := rc.nodes[i].MarshalCBOR(&bb); err != nil {
return cid.Undef, err
}
c, err := cid.Prefix{
Version: 1,
Codec: cid.DagCBOR,
MhType: mh.SHA2_256,
MhLength: -1,
}.Sum(bb.Bytes())
if err != nil {
return cid.Undef, xerrors.Errorf("gen cid: %w", err)
}
b, err := block.NewBlockWithCid(bb.Bytes(), c)
if err != nil {
return cid.Undef, xerrors.Errorf("new block: %w", err)
}
log.Infow("fin", "level", i, "cid", c.String())
rc.blocks[c] = b
rc.nodes[i] = new(shedgen.CarbNode)
rc.cur += uint64(bb.Len())
if len(rc.nodes[i].Sub) <= 1 && i == len(rc.nodes)-1 {
return c, err
}
if len(rc.nodes) <= i+1 {
rc.nodes = append(rc.nodes, new(shedgen.CarbNode))
}
rc.nodes[i+1].Sub = append(rc.nodes[i+1].Sub, c)
}
return cid.Undef, xerrors.Errorf("failed to finalize")
}
func (rc *rawCarb) writeCar(ctx context.Context, path string, root cid.Cid) error {
f, err := os.Create(path)
if err != nil {
return xerrors.Errorf("create out car: %w", err)
}
bs := rc
ds := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
log.Infow("write car", "path", path, "root", root.String(), "blocks", len(rc.blocks))
return car.WriteCar(ctx, ds, []cid.Cid{root}, f)
}
var _ blockstore.Blockstore = &rawCarb{}
type badgerLog struct {
*zap.SugaredLogger
skip2 *zap.SugaredLogger
}
func (b *badgerLog) Warningf(format string, args ...interface{}) {
b.skip2.Warnf(format, args...)
}

104
cmd/lotus-shed/itestd.go Normal file
View File

@ -0,0 +1,104 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"net"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"github.com/chzyer/readline"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/itests/kit"
)
var itestdCmd = &cli.Command{
Name: "itestd",
Description: "Integration test debug env",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Value: "127.0.0.1:5674",
},
},
Action: func(cctx *cli.Context) error {
var nodes []kit.ItestdNotif
m := http.NewServeMux()
m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var notif kit.ItestdNotif
if err := json.NewDecoder(r.Body).Decode(&notif); err != nil {
fmt.Printf("!! Decode itest notif: %s\n", err)
return
}
fmt.Printf("%d @%s '%s=%s'\n", len(nodes), notif.TestName, notif.NodeType, notif.Api)
nodes = append(nodes, notif)
})
l, err := net.Listen("tcp", cctx.String("listen"))
if err != nil {
return xerrors.Errorf("net listen: %w", err)
}
s := &httptest.Server{
Listener: l,
Config: &http.Server{Handler: m},
}
s.Start()
fmt.Printf("ITest env:\n\nLOTUS_ITESTD=%s\n\nSay 'sh' to spawn a shell connected to test nodes\n--- waiting for clients\n", s.URL)
cs := readline.NewCancelableStdin(os.Stdin)
go func() {
<-cctx.Done()
cs.Close() // nolint:errcheck
}()
rl := bufio.NewReader(cs)
for {
cmd, _, err := rl.ReadLine()
if err != nil {
return xerrors.Errorf("readline: %w", err)
}
switch string(cmd) {
case "sh":
shell := "/bin/sh"
if os.Getenv("SHELL") != "" {
shell = os.Getenv("SHELL")
}
p := exec.Command(shell, "-i")
p.Env = append(p.Env, os.Environ()...)
lastNodes := map[string]string{}
for _, node := range nodes {
lastNodes[node.NodeType] = node.Api
}
if _, found := lastNodes["MARKETS_API_INFO"]; !found {
lastNodes["MARKETS_API_INFO"] = lastNodes["MINER_API_INFO"]
}
for typ, api := range lastNodes {
p.Env = append(p.Env, fmt.Sprintf("%s=%s", typ, api))
}
p.Stdout = os.Stdout
p.Stderr = os.Stderr
p.Stdin = os.Stdin
if err := p.Start(); err != nil {
return xerrors.Errorf("start shell: %w", err)
}
if err := p.Wait(); err != nil {
fmt.Printf("wait for shell: %s\n", err)
}
fmt.Println("\n--- shell quit")
default:
fmt.Println("!! Unknown command")
}
}
},
}

View File

@ -69,6 +69,7 @@ func main() {
terminationsCmd,
migrationsCmd,
diffCmd,
itestdCmd,
}
app := &cli.App{

View File

@ -0,0 +1,128 @@
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
package shedgen
import (
"fmt"
"io"
"math"
"sort"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
var _ = xerrors.Errorf
var _ = cid.Undef
var _ = math.E
var _ = sort.Sort
func (t *CarbNode) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{161}); err != nil {
return err
}
scratch := make([]byte, 9)
// t.Sub ([]cid.Cid) (slice)
if len("Sub") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Sub\" was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Sub"))); err != nil {
return err
}
if _, err := io.WriteString(w, string("Sub")); err != nil {
return err
}
if len(t.Sub) > cbg.MaxLength {
return xerrors.Errorf("Slice value in field t.Sub was too long")
}
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.Sub))); err != nil {
return err
}
for _, v := range t.Sub {
if err := cbg.WriteCidBuf(scratch, w, v); err != nil {
return xerrors.Errorf("failed writing cid field t.Sub: %w", err)
}
}
return nil
}
func (t *CarbNode) UnmarshalCBOR(r io.Reader) error {
*t = CarbNode{}
br := cbg.GetPeeker(r)
scratch := make([]byte, 8)
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if maj != cbg.MajMap {
return fmt.Errorf("cbor input should be of type map")
}
if extra > cbg.MaxLength {
return fmt.Errorf("CarbNode: map struct too large (%d)", extra)
}
var name string
n := extra
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadStringBuf(br, scratch)
if err != nil {
return err
}
name = string(sval)
}
switch name {
// t.Sub ([]cid.Cid) (slice)
case "Sub":
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Sub: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Sub = make([]cid.Cid, extra)
}
for i := 0; i < int(extra); i++ {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("reading cid field t.Sub failed: %w", err)
}
t.Sub[i] = c
}
default:
// Field doesn't exist on this type, so ignore it
cbg.ScanForLinks(r, func(cid.Cid) {})
}
}
return nil
}

View File

@ -0,0 +1,7 @@
package shedgen
import "github.com/ipfs/go-cid"
type CarbNode struct {
Sub []cid.Cid
}

View File

@ -939,7 +939,7 @@ Response:
{
"ReceivingTransfers": [
{
"RequestID": 4,
"RequestID": {},
"RequestState": "string value",
"IsCurrentChannelRequest": true,
"ChannelID": {
@ -983,7 +983,7 @@ Response:
],
"SendingTransfers": [
{
"RequestID": 4,
"RequestID": {},
"RequestState": "string value",
"IsCurrentChannelRequest": true,
"ChannelID": {

View File

@ -23,6 +23,12 @@
#DisableMetadataLog = false
[Logging]
[Logging.SubsystemLevels]
# env var: LOTUS_LOGGING_SUBSYSTEMLEVELS_EXAMPLE-SUBSYSTEM
#example-subsystem = "INFO"
[Libp2p]
# Binding address for the libp2p host - 0 means random port.
# Format: multiaddress; see https://multiformats.io/multiaddr/

View File

@ -23,6 +23,12 @@
#DisableMetadataLog = false
[Logging]
[Logging.SubsystemLevels]
# env var: LOTUS_LOGGING_SUBSYSTEMLEVELS_EXAMPLE-SUBSYSTEM
#example-subsystem = "INFO"
[Libp2p]
# Binding address for the libp2p host - 0 means random port.
# Format: multiaddress; see https://multiformats.io/multiaddr/

View File

@ -434,11 +434,19 @@ func (mgr *SectorMgr) GenerateWindowPoStWithVanilla(ctx context.Context, proofTy
}
func (mgr *SectorMgr) ReadPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error) {
if uint64(offset) != 0 {
panic("implme")
off := storiface.UnpaddedByteIndex(0)
var piece cid.Cid
for _, c := range mgr.sectors[sector.ID].pieces {
piece = c
if off >= offset {
break
}
off += storiface.UnpaddedByteIndex(len(mgr.pieces[piece]))
}
br := bytes.NewReader(mgr.pieces[mgr.sectors[sector.ID].pieces[0]][:size])
if off > offset {
panic("non-aligned offset todo")
}
br := bytes.NewReader(mgr.pieces[piece][:size])
return struct {
io.ReadCloser

View File

@ -166,7 +166,7 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef,
r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size)
log.Debugf("result of first tryReadUnsealedPiece: r=%+v, err=%s", r, err)
log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err)
if xerrors.Is(err, storiface.ErrSectorNotFound) {
log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size)

View File

@ -315,25 +315,21 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
m.inputLk.Unlock()
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
for {
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
// there was an error waiting for a pre-existing add piece call, let's retry
if res.err != nil {
m.inputLk.Lock()
pp = m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
continue
}
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
if res.err == nil {
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
// if there was an error waiting for a pre-existing add piece call, let's retry
m.inputLk.Lock()
}
// addPendingPiece takes over m.inputLk
pp := m.addPendingPiece(ctx, size, data, deal, sp)
m.inputLk.Unlock()
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
@ -341,6 +337,7 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
// called with m.inputLk; transfers the lock to another goroutine!
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storage.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
@ -357,6 +354,7 @@ func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSiz
m.pendingPieces[proposalCID(deal)] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/cmd/lotus-shed/shedgen"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/node/hello"
@ -106,4 +107,11 @@ func main() {
fmt.Println(err)
os.Exit(1)
}
err = gen.WriteMapEncodersToFile("./cmd/lotus-shed/shedgen/cbor_gen.go", "shedgen",
shedgen.CarbNode{},
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
}

12
go.mod
View File

@ -35,7 +35,7 @@ require (
github.com/filecoin-project/go-cbor-util v0.0.1
github.com/filecoin-project/go-commp-utils v0.1.3
github.com/filecoin-project/go-crypto v0.0.1
github.com/filecoin-project/go-data-transfer v1.14.0
github.com/filecoin-project/go-data-transfer v1.15.0
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.20.1
@ -82,7 +82,7 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ds-measure v0.2.0
github.com/ipfs/go-fs-lock v0.0.7
github.com/ipfs/go-graphsync v0.12.0
github.com/ipfs/go-graphsync v0.13.0
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
@ -101,18 +101,18 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-metrics-prometheus v0.0.2
github.com/ipfs/go-unixfs v0.3.1
github.com/ipfs/go-unixfsnode v1.2.0
github.com/ipfs/go-unixfsnode v1.4.0
github.com/ipfs/interface-go-ipfs-core v0.5.2
github.com/ipld/go-car v0.3.3
github.com/ipld/go-car/v2 v2.1.1
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.4
github.com/ipld/go-ipld-prime v0.16.0
github.com/ipld/go-ipld-selector-text-lite v0.0.1
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.18.0-rc5
github.com/libp2p/go-libp2p v0.18.0-rc6
github.com/libp2p/go-libp2p-connmgr v0.3.1 // indirect
github.com/libp2p/go-libp2p-core v0.14.0
github.com/libp2p/go-libp2p-discovery v0.6.0
@ -122,7 +122,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.6.1
github.com/libp2p/go-libp2p-quic-transport v0.16.1
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.1.4
github.com/libp2p/go-libp2p-resource-manager v0.1.5
github.com/libp2p/go-libp2p-routing-helpers v0.2.3
github.com/libp2p/go-libp2p-swarm v0.10.2
github.com/libp2p/go-libp2p-tls v0.3.1

35
go.sum
View File

@ -324,8 +324,9 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod
github.com/filecoin-project/go-crypto v0.0.1 h1:AcvpSGGCgjaY8y1az6AMfKQWreF/pWO2JJGLl6gCq6o=
github.com/filecoin-project/go-crypto v0.0.1/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-dagaggregator-unixfs v0.2.0/go.mod h1:WTuJWgBQY0omnQqa8kRPT9O0Uj5wQOgslVMUuTeHdJ8=
github.com/filecoin-project/go-data-transfer v1.14.0 h1:4pnfJk8FYtqcdAg+QRGzaz57seUC/Tz+HJgPuGB7zdg=
github.com/filecoin-project/go-data-transfer v1.14.0/go.mod h1:wNJKhaLLYBJDM3VFvgvYi4iUjPa69pz/1Q5Q4HzX2wE=
github.com/filecoin-project/go-data-transfer v1.15.0 h1:gVH7MxEgoj/qXPz+S6ggFlHlDv1mLlRZuJtTvcq8r1o=
github.com/filecoin-project/go-data-transfer v1.15.0/go.mod h1:RaJIYjh6x6z+FXKNvUULOdUZdN+JutKigfcMMbfykWA=
github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-ds-versioning v0.1.1 h1:JiyBqaQlwC+UM0WhcBtVEeT3XrX59mQhT8U3p7nu86o=
github.com/filecoin-project/go-ds-versioning v0.1.1/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
@ -410,8 +411,9 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goblin v0.0.0-20210519012713-85d372ac71e2/go.mod h1:VzmDKDJVZI3aJmnRI9VjAn9nJ8qPPsN1fqzr9dqInIo=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns=
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@ -551,8 +553,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@ -754,8 +757,9 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
github.com/ipfs/go-fs-lock v0.0.7 h1:6BR3dajORFrFTkb5EpCUFIAypsoxpGpDSVUdFwzgL9U=
github.com/ipfs/go-fs-lock v0.0.7/go.mod h1:Js8ka+FNYmgQRLrRXzU3CB/+Csr1BwrRilEcvYrHhhc=
github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMYcQNNmMxDqE=
github.com/ipfs/go-graphsync v0.12.0 h1:QCsVHVzb9FTkcm3NEa8GjXnUeGit1L9s08HcSVQ4m/g=
github.com/ipfs/go-graphsync v0.12.0/go.mod h1:nASYWYETgsnMbQ3+DirNImOHQ8TY0a5AhAqyOY55tUg=
github.com/ipfs/go-graphsync v0.13.0 h1:8reYjVKxKocJ9jD471xs9XNuegquPrnBFuGZmCqT8zU=
github.com/ipfs/go-graphsync v0.13.0/go.mod h1:oPBU9JGNlyWHyH9lWYmyl19M++5yiXjBnNC4boh5nVU=
github.com/ipfs/go-ipfs v0.11.0/go.mod h1:g68Thu2Ho11AWoHsN34P5fSK7iA6OWWRy3T/g8HLixc=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
@ -886,8 +890,9 @@ github.com/ipfs/go-unixfs v0.3.1 h1:LrfED0OGfG98ZEegO4/xiprx2O+yS+krCMQSp7zLVv8=
github.com/ipfs/go-unixfs v0.3.1/go.mod h1:h4qfQYzghiIc8ZNFKiLMFWOTzrWIAtzYQ59W/pCFf1o=
github.com/ipfs/go-unixfsnode v1.1.2/go.mod h1:5dcE2x03pyjHk4JjamXmunTMzz+VUtqvPwZjIEkfV6s=
github.com/ipfs/go-unixfsnode v1.1.3/go.mod h1:ZZxUM5wXBC+G0Co9FjrYTOm+UlhZTjxLfRYdWY9veZ4=
github.com/ipfs/go-unixfsnode v1.2.0 h1:tHHBJftsJyHGa8bS62PpkYNqHy/Sug3c/vxxC8NaGQY=
github.com/ipfs/go-unixfsnode v1.2.0/go.mod h1:mQEgLjxkV/1mohkC4p7taRRBYPBeXu97SA3YaerT2q0=
github.com/ipfs/go-unixfsnode v1.4.0 h1:9BUxHBXrbNi8mWHc6j+5C580WJqtVw9uoeEKn4tMhwA=
github.com/ipfs/go-unixfsnode v1.4.0/go.mod h1:qc7YFFZ8tABc58p62HnIYbUMwj9chhUuFWmxSokfePo=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipfs/interface-go-ipfs-core v0.4.0/go.mod h1:UJBcU6iNennuI05amq3FQ7g0JHUkibHFAfhfUIy927o=
@ -918,8 +923,9 @@ github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD
github.com/ipld/go-ipld-prime v0.14.1/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.2/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.3-0.20211207234443-319145880958/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.4 h1:bqhmume8+nbNsX4/+J6eohktfZHAI8GKrF3rQ0xgOyc=
github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.16.0 h1:RS5hhjB/mcpeEPJvfyj0qbOj/QL+/j05heZ0qa97dVo=
github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
@ -1060,8 +1066,8 @@ github.com/libp2p/go-libp2p v0.16.0/go.mod h1:ump42BsirwAWxKzsCiFnTtN1Yc+DuPu76f
github.com/libp2p/go-libp2p v0.17.0/go.mod h1:Fkin50rsGdv5mm5BshBUtPRZknt9esfmYXBOYcwOTgw=
github.com/libp2p/go-libp2p v0.18.0-rc1/go.mod h1:RgYlH7IIWHXREimC92bw5Lg1V2R5XmSzuLHb5fTnr+8=
github.com/libp2p/go-libp2p v0.18.0-rc3/go.mod h1:WYL+Xw1iuwi6rdfzw5VIEpD+HqzYucHZ6fcUuumbI3M=
github.com/libp2p/go-libp2p v0.18.0-rc5 h1:88wWDHb9nNo0vBNCupLde3OTnFAkugOCNkrDfl3ivK4=
github.com/libp2p/go-libp2p v0.18.0-rc5/go.mod h1:aZPS5l84bDvCvP4jkyEUT/J6YOpUq33Fgqrs3K59mpI=
github.com/libp2p/go-libp2p v0.18.0-rc6 h1:IR6TVPYGo1wDY0tY61gyPQVxK1koOkXh49ejVfAnH7A=
github.com/libp2p/go-libp2p v0.18.0-rc6/go.mod h1:oOUOAlBrm1L0+jxT10h2TMUMTDz6pV3EvmkJ3beDYGQ=
github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo=
github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E=
github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I=
@ -1173,8 +1179,9 @@ github.com/libp2p/go-libp2p-mplex v0.2.3/go.mod h1:CK3p2+9qH9x+7ER/gWWDYJ3QW5ZxW
github.com/libp2p/go-libp2p-mplex v0.3.0/go.mod h1:l9QWxRbbb5/hQMECEb908GbS9Sm2UAR2KFZKUJEynEs=
github.com/libp2p/go-libp2p-mplex v0.4.0/go.mod h1:yCyWJE2sc6TBTnFpjvLuEJgTSw/u+MamvzILKdX7asw=
github.com/libp2p/go-libp2p-mplex v0.4.1/go.mod h1:cmy+3GfqfM1PceHTLL7zQzAAYaryDu6iPSC+CIb094g=
github.com/libp2p/go-libp2p-mplex v0.5.0 h1:vt3k4E4HSND9XH4Z8rUpacPJFSAgLOv6HDvG8W9Ks9E=
github.com/libp2p/go-libp2p-mplex v0.5.0/go.mod h1:eLImPJLkj3iG5t5lq68w3Vm5NAQ5BcKwrrb2VmOYb3M=
github.com/libp2p/go-libp2p-mplex v0.6.0 h1:5ubK4/vLE2JkogKlJ2JLeXcSfA6qY6mE2HMJV9ve/Sk=
github.com/libp2p/go-libp2p-mplex v0.6.0/go.mod h1:i3usuPrBbh9FD2fLZjGpotyNkwr42KStYZQY7BeTiu4=
github.com/libp2p/go-libp2p-nat v0.0.4/go.mod h1:N9Js/zVtAXqaeT99cXgTV9e75KpnWCvVOiGzlcHmBbY=
github.com/libp2p/go-libp2p-nat v0.0.5/go.mod h1:1qubaE5bTZMJE+E/uu2URroMbzdubFz1ChgiN79yKPE=
github.com/libp2p/go-libp2p-nat v0.0.6/go.mod h1:iV59LVhB3IkFvS6S6sauVTSOrNEANnINbI/fkaLimiw=
@ -1232,8 +1239,8 @@ github.com/libp2p/go-libp2p-record v0.1.3 h1:R27hoScIhQf/A8XJZ8lYpnqh9LatJ5YbHs2
github.com/libp2p/go-libp2p-record v0.1.3/go.mod h1:yNUff/adKIfPnYQXgp6FQmNu3gLJ6EMg7+/vv2+9pY4=
github.com/libp2p/go-libp2p-resource-manager v0.1.0/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.3/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.4 h1:RcxMD0pytOUimx3BqTVs6IqItb3H5Qg44SD7XyT68lw=
github.com/libp2p/go-libp2p-resource-manager v0.1.4/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-resource-manager v0.1.5 h1:7J6t9KLFS0MxXDTfqA6rwfVCZl/yLQnXW5LpZjHAANI=
github.com/libp2p/go-libp2p-resource-manager v0.1.5/go.mod h1:wJPNjeE4XQlxeidwqVY5G6DLOKqFK33u2n8blpl0I6Y=
github.com/libp2p/go-libp2p-routing v0.0.1/go.mod h1:N51q3yTr4Zdr7V8Jt2JIktVU+3xBBylx1MZeVA6t1Ys=
github.com/libp2p/go-libp2p-routing v0.1.0/go.mod h1:zfLhI1RI8RLEzmEaaPwzonRvXeeSHddONWkcTcB54nE=
github.com/libp2p/go-libp2p-routing-helpers v0.2.3 h1:xY61alxJ6PurSi+MXbywZpelvuU4U4p/gPTxjqCqTzY=
@ -1272,8 +1279,9 @@ github.com/libp2p/go-libp2p-testing v0.4.0/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotl
github.com/libp2p/go-libp2p-testing v0.4.2/go.mod h1:Q+PFXYoiYFN5CAEG2w3gLPEzotlKsNSbKQ/lImlOWF0=
github.com/libp2p/go-libp2p-testing v0.5.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-testing v0.6.0/go.mod h1:QBk8fqIL1XNcno/l3/hhaIEn4aLRijpYOR+zVjjlh+A=
github.com/libp2p/go-libp2p-testing v0.7.0 h1:9bfyhNINizxuLrKsenzGaZalXRXIaAEmx1BP/PzF1gM=
github.com/libp2p/go-libp2p-testing v0.7.0/go.mod h1:OLbdn9DbgdMwv00v+tlp1l3oe2Cl+FAjoWIA2pa0X6E=
github.com/libp2p/go-libp2p-testing v0.8.0 h1:/te8SOIyj5sGH5Jr1Uoo+qYB00aK8O4+yHGzLgfE3kc=
github.com/libp2p/go-libp2p-testing v0.8.0/go.mod h1:gRdsNxQSxAZowTgcLY7CC33xPmleZzoBpqSYbWenqPc=
github.com/libp2p/go-libp2p-tls v0.1.3/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M=
github.com/libp2p/go-libp2p-tls v0.3.0/go.mod h1:fwF5X6PWGxm6IDRwF3V8AVCCj/hOd5oFlg+wo2FxJDY=
github.com/libp2p/go-libp2p-tls v0.3.1 h1:lsE2zYte+rZCEOHF72J1Fg3XK3dGQyKvI6i5ehJfEp0=
@ -1325,8 +1333,9 @@ github.com/libp2p/go-mplex v0.1.1/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3
github.com/libp2p/go-mplex v0.1.2/go.mod h1:Xgz2RDCi3co0LeZfgjm4OgUF15+sVR8SRcu3SFXI1lk=
github.com/libp2p/go-mplex v0.2.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
github.com/libp2p/go-mplex v0.3.0/go.mod h1:0Oy/A9PQlwBytDRp4wSkFnzHYDKcpLot35JQ6msjvYQ=
github.com/libp2p/go-mplex v0.4.0 h1:Ukkez9/4EOX5rTw4sHefNJp10dksftAA05ZgyjplUbM=
github.com/libp2p/go-mplex v0.4.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E=
github.com/libp2p/go-mplex v0.6.0 h1:5kKp029zrsLVJT5q6ASt4LwuZFxj3B13wXXaGmFrWg0=
github.com/libp2p/go-mplex v0.6.0/go.mod h1:y26Lx+wNVtMYMaPu300Cbot5LkEZ4tJaNYeHeT9dh6E=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.3/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=

View File

@ -9,16 +9,18 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/stretchr/testify/require"
)
func TestBatchDealInput(t *testing.T) {
t.Skip("this test is disabled as it's flaky: #4611")
kit.QuietMiningLogs()
var (
@ -47,17 +49,20 @@ func TestBatchDealInput(t *testing.T) {
})),
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 2,
MaxSealingSectors: 1,
MaxSealingSectorsForDeals: 3,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
}, nil
sc := modules.ToSealingConfig(config.DefaultStorageMiner())
sc.MaxWaitDealsSectors = 2
sc.MaxSealingSectors = 1
sc.MaxSealingSectorsForDeals = 3
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour
sc.BatchPreCommits = false
sc.AggregateCommits = false
return sc, nil
}, nil
}),
))
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts)
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), opts, kit.ThroughRPC())
ens.InterconnectAll().BeginMining(blockTime)
dh := kit.NewDealHarness(t, client, miner, miner)
@ -126,9 +131,9 @@ func TestBatchDealInput(t *testing.T) {
t.Run("4-p513B", run(513, 4, 2))
if !testing.Short() {
t.Run("32-p257B", run(257, 32, 8))
t.Run("32-p10B", run(10, 32, 2))
// fixme: this appears to break data-transfer / markets in some really creative ways
//t.Run("32-p10B", run(10, 32, 2))
// t.Run("128-p10B", run(10, 128, 8))
}
}

36
itests/kit/itestd.go Normal file
View File

@ -0,0 +1,36 @@
package kit
import (
"bytes"
"encoding/json"
"net/http"
"os"
)
type ItestdNotif struct {
NodeType string // api env var name
TestName string
Api string
}
func sendItestdNotif(nodeType, testName, apiAddr string) {
td := os.Getenv("LOTUS_ITESTD")
if td == "" {
// not running
return
}
notif := ItestdNotif{
NodeType: nodeType,
TestName: testName,
Api: apiAddr,
}
nb, err := json.Marshal(&notif)
if err != nil {
return
}
if _, err := http.Post(td, "application/json", bytes.NewReader(nb)); err != nil { // nolint:gosec
return
}
}

View File

@ -43,6 +43,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
srv, maddr := CreateRPCServer(t, handler, l)
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("FULLNODE_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
@ -60,6 +61,7 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String())
fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
sendItestdNotif("MINER_API_INFO", t.Name(), "ws://"+srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)

View File

@ -17,6 +17,8 @@ import (
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
@ -40,29 +42,31 @@ func TestMinerBalanceCollateral(t *testing.T) {
opts := kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 4,
MaxSealingSectors: 4,
MaxSealingSectorsForDeals: 4,
AlwaysKeepUnsealedCopy: true,
WaitDealsDelay: time.Hour,
sc := modules.ToSealingConfig(config.DefaultStorageMiner())
BatchPreCommits: batching,
AggregateCommits: batching,
sc.MaxWaitDealsSectors = 4
sc.MaxSealingSectors = 4
sc.MaxSealingSectorsForDeals = 4
sc.AlwaysKeepUnsealedCopy = true
sc.WaitDealsDelay = time.Hour
PreCommitBatchWait: time.Hour,
CommitBatchWait: time.Hour,
sc.BatchPreCommits = batching
sc.AggregateCommits = batching
MinCommitBatch: nSectors,
MaxPreCommitBatch: nSectors,
MaxCommitBatch: nSectors,
sc.PreCommitBatchWait = time.Hour
sc.CommitBatchWait = time.Hour
CollateralFromMinerBalance: enabled,
AvailableBalanceBuffer: big.Zero(),
DisableCollateralFallback: false,
AggregateAboveBaseFee: big.Zero(),
BatchPreCommitAboveBaseFee: big.Zero(),
}, nil
sc.MinCommitBatch = nSectors
sc.MaxPreCommitBatch = nSectors
sc.MaxCommitBatch = nSectors
sc.CollateralFromMinerBalance = enabled
sc.AvailableBalanceBuffer = big.Zero()
sc.DisableCollateralFallback = false
sc.AggregateAboveBaseFee = big.Zero()
sc.BatchPreCommitAboveBaseFee = big.Zero()
return sc, nil
}, nil
})),
)

11
lib/lotuslog/config.go Normal file
View File

@ -0,0 +1,11 @@
package lotuslog
import logging "github.com/ipfs/go-log/v2"
func SetLevelsFromConfig(l map[string]string) {
for sys, level := range l {
if err := logging.SetLogLevel(sys, level); err != nil {
continue
}
}
}

View File

@ -33,6 +33,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/peermgr"
_ "github.com/filecoin-project/lotus/lib/sigs/bls"
_ "github.com/filecoin-project/lotus/lib/sigs/secp"
@ -249,6 +250,9 @@ func Base() Option {
// Config sets up constructors based on the provided Config
func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
// setup logging early
lotuslog.SetLevelsFromConfig(cfg.Logging.SubsystemLevels)
return Options(
func(s *Settings) error { s.Config = true; return nil },
Override(new(dtypes.APIEndpoint), func() (dtypes.APIEndpoint, error) {

View File

@ -47,6 +47,11 @@ func defCommon() Common {
ListenAddress: "/ip4/127.0.0.1/tcp/1234/http",
Timeout: Duration(30 * time.Second),
},
Logging: Logging{
SubsystemLevels: map[string]string{
"example-subsystem": "INFO",
},
},
Libp2p: Libp2p{
ListenAddresses: []string{
"/ip4/0.0.0.0/tcp/0",

View File

@ -32,6 +32,24 @@ func TestDefaultFullNodeRoundtrip(t *testing.T) {
require.True(t, reflect.DeepEqual(c, c2))
}
func TestDefaultFullNodeCommentRoundtrip(t *testing.T) {
c := DefaultFullNode()
var s string
{
c, err := ConfigComment(DefaultFullNode())
require.NoError(t, err)
s = string(c)
}
c2, err := FromReader(strings.NewReader(s), DefaultFullNode())
require.NoError(t, err)
fmt.Println(s)
require.True(t, reflect.DeepEqual(c, c2))
}
func TestDefaultMinerRoundtrip(t *testing.T) {
c := DefaultStorageMiner()

View File

@ -127,6 +127,12 @@ of automatically performing on-chain operations.`,
Comment: ``,
},
{
Name: "Logging",
Type: "Logging",
Comment: ``,
},
{
Name: "Libp2p",
Type: "Libp2p",
@ -484,6 +490,14 @@ count towards this limit.`,
closed by the connection manager.`,
},
},
"Logging": []DocField{
{
Name: "SubsystemLevels",
Type: "map[string]string",
Comment: `SubsystemLevels specify per-subsystem log levels`,
},
},
"MinerAddressConfig": []DocField{
{
Name: "PreCommitControl",

View File

@ -16,7 +16,7 @@ func findDoc(root interface{}, section, name string) *DocField {
return findDocSect("Common", section, name)
}
func findDocSect(root string, section, name string) *DocField {
func findDocSect(root, section, name string) *DocField {
path := strings.Split(section, ".")
docSection := Doc[root]

View File

@ -69,7 +69,7 @@ func ConfigUpdate(cfgCur, cfgDef interface{}, comment bool) ([]byte, error) {
}
if comment {
// create a map of default lines so we can comment those out later
// create a map of default lines, so we can comment those out later
defLines := strings.Split(defStr, "\n")
defaults := map[string]struct{}{}
for i := range defLines {

View File

@ -13,10 +13,11 @@ import (
// Common is common config between full node and miner
type Common struct {
API API
Backup Backup
Libp2p Libp2p
Pubsub Pubsub
API API
Backup Backup
Logging Logging
Libp2p Libp2p
Pubsub Pubsub
}
// FullNode is a full node config
@ -39,6 +40,12 @@ type Backup struct {
DisableMetadataLog bool
}
// Logging is the logging system config
type Logging struct {
// SubsystemLevels specify per-subsystem log levels
SubsystemLevels map[string]string
}
// StorageMiner is a miner config
type StorageMiner struct {
Common

View File

@ -47,7 +47,6 @@ import (
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-commp-utils/ffiwrapper"
"github.com/filecoin-project/go-commp-utils/writer"
datatransfer "github.com/filecoin-project/go-data-transfer"
@ -1263,28 +1262,12 @@ func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Addre
}
func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet, error) {
// Hard-code the sector type to 32GiBV1_1, because:
// - ffiwrapper.GeneratePieceCIDFromFile requires a RegisteredSealProof
// - commP itself is sector-size independent, with rather low probability of that changing
// ( note how the final rust call is identical for every RegSP type )
// https://github.com/filecoin-project/rust-filecoin-proofs-api/blob/v5.0.0/src/seal.rs#L1040-L1050
//
// IF/WHEN this changes in the future we will have to be able to calculate
// "old style" commP, and thus will need to introduce a version switch or similar
arbitraryProofType := abi.RegisteredSealProof_StackedDrg64GiBV1_1
rdr, err := os.Open(inpath)
if err != nil {
return nil, err
}
defer rdr.Close() //nolint:errcheck
stat, err := rdr.Stat()
if err != nil {
return nil, err
}
// check that the data is a car file; if it's not, retrieval won't work
_, err = car.ReadHeader(bufio.NewReader(rdr))
if err != nil {
@ -1295,16 +1278,20 @@ func (a *API) ClientCalcCommP(ctx context.Context, inpath string) (*api.CommPRet
return nil, xerrors.Errorf("seek to start: %w", err)
}
pieceReader, pieceSize := padreader.New(rdr, uint64(stat.Size()))
commP, err := ffiwrapper.GeneratePieceCIDFromFile(arbitraryProofType, pieceReader, pieceSize)
w := &writer.Writer{}
_, err = io.CopyBuffer(w, rdr, make([]byte, writer.CommPBuf))
if err != nil {
return nil, xerrors.Errorf("copy into commp writer: %w", err)
}
commp, err := w.Sum()
if err != nil {
return nil, xerrors.Errorf("computing commP failed: %w", err)
}
return &api.CommPRet{
Root: commP,
Size: pieceSize,
Root: commp.PieceCID,
Size: commp.PieceSize.Unpadded(),
}, nil
}

View File

@ -676,18 +676,18 @@ func (tc *transferConverter) convertTransfer(channelID datatransfer.ChannelID, h
}
var channelIDPtr *datatransfer.ChannelID
if !hasChannelID {
diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %d", requestID))
diagnostics = append(diagnostics, fmt.Sprintf("No data transfer channel id for GraphSync request ID %s", requestID))
} else {
channelIDPtr = &channelID
if isCurrentChannelRequest && !hasState {
diagnostics = append(diagnostics, fmt.Sprintf("No current request state for data transfer channel id %s", channelID))
} else if !isCurrentChannelRequest && hasState {
diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %d is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID))
diagnostics = append(diagnostics, fmt.Sprintf("Graphsync request %s is a previous request on data transfer channel id %s that was restarted, but it is still running", requestID, channelID))
}
}
diagnostics = append(diagnostics, tc.gsDiagnostics[requestID]...)
transfer := &api.GraphSyncDataTransfer{
RequestID: requestID,
RequestID: &requestID,
RequestState: stateString,
IsCurrentChannelRequest: isCurrentChannelRequest,
ChannelID: channelIDPtr,
@ -717,7 +717,7 @@ func (tc *transferConverter) collectRemainingTransfers() {
channelID := channelID
cs := api.NewDataTransferChannel(channelState.SelfPeer(), channelState)
transfer := &api.GraphSyncDataTransfer{
RequestID: graphsync.RequestID(-1),
RequestID: nil,
RequestState: "graphsync state unknown",
IsCurrentChannelRequest: false,
ChannelID: &channelID,