From 0c75dd3865822e1f4f351855053853f32a03ffbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 16:06:53 +0200 Subject: [PATCH 01/10] Support AddPiece on workers --- api/api_worker.go | 2 + api/apistruct/struct.go | 5 + api/client/client.go | 21 +++- cmd/lotus-seal-worker/main.go | 13 ++- extern/storage-fsm/garbage.go | 9 +- extern/storage-fsm/nullreader.go | 20 ++++ extern/storage-fsm/sealing.go | 2 +- go.mod | 2 +- go.sum | 5 +- lib/rpcenc/reader.go | 170 +++++++++++++++++++++++++++++++ lib/rpcenc/reader_test.go | 90 ++++++++++++++++ node/impl/remoteworker.go | 5 - 12 files changed, 325 insertions(+), 19 deletions(-) create mode 100644 extern/storage-fsm/nullreader.go create mode 100644 lib/rpcenc/reader.go create mode 100644 lib/rpcenc/reader_test.go diff --git a/api/api_worker.go b/api/api_worker.go index 3eb0e15f8..884e0c348 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -23,6 +23,8 @@ type WorkerAPI interface { Paths(context.Context) ([]stores.StoragePath, error) Info(context.Context) (storiface.WorkerInfo, error) + AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) + storage.Sealer MoveStorage(ctx context.Context, sector abi.SectorID) error diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 4ecea5bde..62acd9a4b 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -300,6 +300,7 @@ type WorkerStruct struct { Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"` + AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"` SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"` SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"` @@ -1147,6 +1148,10 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) { return w.Internal.Info(ctx) } +func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) +} + func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces) } diff --git a/api/client/client.go b/api/client/client.go index 4029838fa..fae6b6765 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -2,11 +2,14 @@ package client import ( "net/http" + "net/url" + "path" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/apistruct" + "github.com/filecoin-project/lotus/lib/rpcenc" ) // NewCommonRPC creates a new http jsonrpc client. @@ -49,12 +52,28 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine } func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrpc.ClientCloser, error) { + u, err := url.Parse(addr) + if err != nil { + return nil, nil, err + } + switch u.Scheme { + case "ws": + u.Scheme = "http" + case "wss": + u.Scheme = "https" + } + ///rpc/v0 -> /rpc/streams/v0/push + + u.Path = path.Join(u.Path, "../streams/v0/push") + + readerEncoder := rpcenc.ReaderParamEncoder(u.String()) + var res apistruct.WorkerStruct closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.Internal, }, - requestHeader, + requestHeader, readerEncoder, ) return &res, closer, err diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 3fb588b26..128862e0d 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -29,6 +29,7 @@ import ( "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/lib/lotuslog" + "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/node/repo" sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/sector-storage/sealtasks" @@ -105,6 +106,11 @@ var runCmd = &cli.Command{ Name: "no-local-storage", Usage: "don't use storageminer repo for sector storage", }, + &cli.BoolFlag{ + Name: "addpiece", + Usage: "enable addpiece", + Value: true, + }, &cli.BoolFlag{ Name: "precommit1", Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)", @@ -204,6 +210,9 @@ var runCmd = &cli.Command{ taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize) + if cctx.Bool("addpiece") { + taskTypes = append(taskTypes, sealtasks.TTAddPiece) + } if cctx.Bool("precommit1") { taskTypes = append(taskTypes, sealtasks.TTPreCommit1) } @@ -337,10 +346,12 @@ var runCmd = &cli.Command{ log.Info("Setting up control endpoint at " + address) - rpcServer := jsonrpc.NewServer() + readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder() + rpcServer := jsonrpc.NewServer(readerServerOpt) rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi)) mux.Handle("/rpc/v0", rpcServer) + mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP) mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof diff --git a/extern/storage-fsm/garbage.go b/extern/storage-fsm/garbage.go index 0464259c3..d8cdb4248 100644 --- a/extern/storage-fsm/garbage.go +++ b/extern/storage-fsm/garbage.go @@ -2,19 +2,12 @@ package sealing import ( "context" - "io" "golang.org/x/xerrors" "github.com/filecoin-project/specs-actors/actors/abi" - - nr "github.com/filecoin-project/storage-fsm/lib/nullreader" ) -func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader { - return io.LimitReader(&nr.Reader{}, int64(size)) -} - func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { if len(sizes) == 0 { return nil, nil @@ -24,7 +17,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist out := make([]abi.PieceInfo, len(sizes)) for i, size := range sizes { - ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, m.pledgeReader(size)) + ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size)) if err != nil { return nil, xerrors.Errorf("add piece: %w", err) } diff --git a/extern/storage-fsm/nullreader.go b/extern/storage-fsm/nullreader.go new file mode 100644 index 000000000..518584599 --- /dev/null +++ b/extern/storage-fsm/nullreader.go @@ -0,0 +1,20 @@ +package sealing + +import ( + "io" + + "github.com/filecoin-project/specs-actors/actors/abi" + nr "github.com/filecoin-project/storage-fsm/lib/nullreader" +) + +type NullReader struct { + *io.LimitedReader +} + +func NewNullReader(size abi.UnpaddedPieceSize) io.Reader { + return &NullReader{(io.LimitReader(&nr.Reader{}, int64(size))).(*io.LimitedReader)} +} + +func (m NullReader) NullBytes() int64 { + return m.N +} \ No newline at end of file diff --git a/extern/storage-fsm/sealing.go b/extern/storage-fsm/sealing.go index 1144ce334..c704207f6 100644 --- a/extern/storage-fsm/sealing.go +++ b/extern/storage-fsm/sealing.go @@ -146,7 +146,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec } for _, p := range pads { - err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil) + err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) if err != nil { m.unsealedInfoMap.mux.Unlock() return 0, 0, xerrors.Errorf("writing pads: %w", err) diff --git a/go.mod b/go.mod index 13b154c26..c3f44af0d 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.6.1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb - github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 + github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 diff --git a/go.sum b/go.sum index 808632267..20ad09ed2 100644 --- a/go.sum +++ b/go.sum @@ -242,8 +242,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb h1:eCLqJb1tmhMCWUFAfJuSyyv/qLrqiAhICLjhUcbi4x8= github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= -github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= -github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b h1:t0PXF+dR91A+yXJ16cX5E5ByQwaMcnVjPMcpY2qXDYQ= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b/go.mod h1:qABZ0IE+F15++XdVZgKCE919XWw54DKrbe4Wdx3jWrE= github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI= github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= @@ -385,6 +385,7 @@ github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORR github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk= github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YARg= github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go new file mode 100644 index 000000000..2b205f0e8 --- /dev/null +++ b/lib/rpcenc/reader.go @@ -0,0 +1,170 @@ +package rpcenc + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "path" + "reflect" + "strconv" + "sync" + + "github.com/google/uuid" + logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-jsonrpc" + "github.com/filecoin-project/specs-actors/actors/abi" + sealing "github.com/filecoin-project/storage-fsm" +) + +var log = logging.Logger("rpc") + +type StreamType string +const ( + Null StreamType = "null" + PushStream StreamType = "push" + // TODO: Data transfer handoff to workers? +) + +type ReaderStream struct { + Type StreamType + Info string +} + +func ReaderParamEncoder(addr string) jsonrpc.Option { + return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) { + r := value.Interface().(io.Reader) + + if r, ok := r.(*sealing.NullReader); ok { + return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil + } + + reqID := uuid.New() + u, _ := url.Parse(addr) + u.Path = path.Join(u.Path, reqID.String()) + + go func() { + // TODO: figure out errors here + + resp, err := http.Post(u.String(), "application/octet-stream", r) + if err != nil { + log.Errorf("sending reader param: %+v", err) + return + } + + defer resp.Body.Close() + + if resp.StatusCode != 200 { + log.Errorf("sending reader param: non-200 status: ", resp.Status) + return + } + + }() + + return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil + }) +} + +type waitReadCloser struct { + io.ReadCloser + wait chan struct{} +} + +func (w *waitReadCloser) Read(p []byte) (int, error) { + n, err := w.ReadCloser.Read(p) + if err != nil { + close(w.wait) + } + return n, err +} + +func (w *waitReadCloser) Close() error { + close(w.wait) + return w.ReadCloser.Close() +} + +func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { + var readersLk sync.Mutex + readers := map[uuid.UUID]chan *waitReadCloser{} + + hnd := func(resp http.ResponseWriter, req *http.Request) { + strId := path.Base(req.URL.Path) + u, err := uuid.Parse(strId) + if err != nil { + http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400) + } + + readersLk.Lock() + ch, found := readers[u] + if !found { + ch = make(chan *waitReadCloser) + readers[u] = ch + } + readersLk.Unlock() + + wr := &waitReadCloser{ + ReadCloser: req.Body, + wait: make(chan struct{}), + } + + select { + case ch <- wr: + case <-req.Context().Done(): + log.Error("context error in reader stream handler (1): %v", req.Context().Err()) + resp.WriteHeader(500) + return + } + + select { + case <-wr.wait: + case <-req.Context().Done(): + log.Error("context error in reader stream handler (2): %v", req.Context().Err()) + resp.WriteHeader(500) + return + } + + resp.WriteHeader(200) + } + + dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) { + var rs ReaderStream + if err := json.Unmarshal(b, &rs); err != nil { + return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err) + } + + if rs.Type == Null { + n, err := strconv.ParseInt(rs.Info, 10, 64) + if err != nil { + return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err) + } + + return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil + } + + u, err := uuid.Parse(rs.Info) + if err != nil { + return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err) + } + + readersLk.Lock() + ch, found := readers[u] + if !found { + ch = make(chan *waitReadCloser) + readers[u] = ch + } + readersLk.Unlock() + + select { + case wr := <-ch: + return reflect.ValueOf(wr), nil + case <-ctx.Done(): + return reflect.Value{}, ctx.Err() + } + }) + + return hnd, dec +} diff --git a/lib/rpcenc/reader_test.go b/lib/rpcenc/reader_test.go new file mode 100644 index 000000000..0748f51ef --- /dev/null +++ b/lib/rpcenc/reader_test.go @@ -0,0 +1,90 @@ +package rpcenc + +import ( + "context" + "io" + "io/ioutil" + "net/http/httptest" + "strings" + "testing" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-jsonrpc" + sealing "github.com/filecoin-project/storage-fsm" +) + +type ReaderHandler struct { +} + +func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error) { + return ioutil.ReadAll(r) +} + +func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) { + return r.(*sealing.NullReader).N, nil +} + +func (h *ReaderHandler) ReadUrl(ctx context.Context, u string) (string, error) { + return u, nil +} + +func TestReaderProxy(t *testing.T) { + var client struct { + ReadAll func(ctx context.Context, r io.Reader) ([]byte, error) + } + + serverHandler := &ReaderHandler{} + + readerHandler, readerServerOpt := ReaderParamDecoder() + rpcServer := jsonrpc.NewServer(readerServerOpt) + rpcServer.Register("ReaderHandler", serverHandler) + + mux := mux.NewRouter() + mux.Handle("/rpc/v0", rpcServer) + mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) + + testServ := httptest.NewServer(mux) + defer testServ.Close() + + re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push") + closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re) + require.NoError(t, err) + + defer closer() + + read, err := client.ReadAll(context.TODO(), strings.NewReader("pooooootato")) + require.NoError(t, err) + require.Equal(t, "pooooootato", string(read), "potatos weren't equal") +} + +func TestNullReaderProxy(t *testing.T) { + var client struct { + ReadAll func(ctx context.Context, r io.Reader) ([]byte, error) + ReadNullLen func(ctx context.Context, r io.Reader) (int64, error) + } + + serverHandler := &ReaderHandler{} + + readerHandler, readerServerOpt := ReaderParamDecoder() + rpcServer := jsonrpc.NewServer(readerServerOpt) + rpcServer.Register("ReaderHandler", serverHandler) + + mux := mux.NewRouter() + mux.Handle("/rpc/v0", rpcServer) + mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler) + + testServ := httptest.NewServer(mux) + defer testServ.Close() + + re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push") + closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re) + require.NoError(t, err) + + defer closer() + + n, err := client.ReadNullLen(context.TODO(), sealing.NewNullReader(1016)) + require.NoError(t, err) + require.Equal(t, int64(1016), n) +} diff --git a/node/impl/remoteworker.go b/node/impl/remoteworker.go index b7cee83ef..a665e7caf 100644 --- a/node/impl/remoteworker.go +++ b/node/impl/remoteworker.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/specs-actors/actors/abi" - storage2 "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/client" @@ -25,10 +24,6 @@ func (r *remoteWorker) NewSector(ctx context.Context, sector abi.SectorID) error return xerrors.New("unsupported") } -func (r *remoteWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) { - return abi.PieceInfo{}, xerrors.New("unsupported") -} - func connectRemoteWorker(ctx context.Context, fa api.Common, url string) (*remoteWorker, error) { token, err := fa.AuthNew(ctx, []auth.Permission{"admin"}) if err != nil { From 6b5525b8d27876accf57d888eeed7fe1a137714d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 13 Aug 2020 21:11:32 -0700 Subject: [PATCH 02/10] feat(markets): update markets 0.5.6 update markets to 0.5.6, removing use of validators that moved inside markets --- go.mod | 2 +- go.sum | 4 ++-- node/builder.go | 8 -------- node/modules/client.go | 10 ---------- node/modules/storageminer.go | 20 -------------------- 5 files changed, 3 insertions(+), 41 deletions(-) diff --git a/go.mod b/go.mod index 13b154c26..93fc8a2a6 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v0.6.1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f - github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb + github.com/filecoin-project/go-fil-markets v0.5.6 github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 diff --git a/go.sum b/go.sum index 808632267..8c905c508 100644 --- a/go.sum +++ b/go.sum @@ -240,8 +240,8 @@ github.com/filecoin-project/go-data-transfer v0.6.1 h1:EA6X8fSiBRNVVwKm5pA7+njZn github.com/filecoin-project/go-data-transfer v0.6.1/go.mod h1:uRYBRKVBVM12CSusBtVrzDHkVw/3DKZpkxKJVP1Ydas= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s= github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= -github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb h1:eCLqJb1tmhMCWUFAfJuSyyv/qLrqiAhICLjhUcbi4x8= -github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= +github.com/filecoin-project/go-fil-markets v0.5.6 h1:WmBbV0qBU4NvLJ64xROpzrKUbkZxZqszZiEiCGmCEIY= +github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms= github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM= github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI= diff --git a/node/builder.go b/node/builder.go index f70072596..cce2a76e5 100644 --- a/node/builder.go +++ b/node/builder.go @@ -108,7 +108,6 @@ const ( HandleIncomingBlocksKey HandleIncomingMessagesKey - RegisterClientValidatorKey HandlePaymentChannelManagerKey // miner @@ -116,7 +115,6 @@ const ( HandleDealsKey HandleRetrievalKey RunSectorServiceKey - RegisterProviderValidatorKey // daemon ExtractApiKey @@ -266,14 +264,11 @@ func Online() Option { Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver), Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), - Override(new(dtypes.ClientDealStore), modules.NewClientDealStore), Override(new(dtypes.ClientDatastore), modules.NewClientDatastore), Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer), - Override(new(dtypes.ClientRequestValidator), modules.NewClientRequestValidator), Override(new(modules.ClientDealFunds), modules.NewClientDealFunds), Override(new(storagemarket.StorageClient), modules.StorageClient), Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter), - Override(RegisterClientValidatorKey, modules.RegisterClientValidator), Override(new(beacon.RandomBeacon), modules.RandomBeacon), Override(new(*paychmgr.Store), paychmgr.NewStore), @@ -310,16 +305,13 @@ func Online() Option { Override(new(dtypes.StagingDAG), modules.StagingDAG), Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync), Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider), - Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore), Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), - Override(new(dtypes.ProviderRequestValidator), modules.NewProviderRequestValidator), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(*storedask.StoredAsk), modules.NewStorageAsk), Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)), Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds), Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter), - Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator), Override(HandleRetrievalKey, modules.HandleRetrieval), Override(GetParamsKey, modules.GetParams), Override(HandleDealsKey, modules.HandleDeals), diff --git a/node/modules/client.go b/node/modules/client.go index 1d017d61f..80675a3d2 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -21,7 +21,6 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" - "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -101,20 +100,11 @@ func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Grap return dt, nil } -// NewClientDealStore creates a statestore for the client to store its deals -func NewClientDealStore(ds dtypes.ClientDatastore) dtypes.ClientDealStore { - return statestore.New(ds) -} - // NewClientDatastore creates a datastore for the client to store its deals func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore { return namespace.Wrap(ds, datastore.NewKey("/deals/client")) } -func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator { - return requestvalidation.NewUnifiedRequestValidator(nil, deals) -} - type ClientDealFunds funds.DealFunds func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index ebb0c4986..71ec8fc75 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -36,13 +36,11 @@ import ( "github.com/filecoin-project/go-fil-markets/storagemarket" storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds" - "github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask" smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/go-multistore" paramfetch "github.com/filecoin-project/go-paramfetch" - "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/go-storedcounter" sectorstorage "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/sector-storage/ffiwrapper" @@ -209,15 +207,6 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto }) } -// RegisterProviderValidator is an initialization hook that registers the provider -// request validator with the data transfer module as the validator for -// StorageDataTransferVoucher types -func RegisterProviderValidator(mrv dtypes.ProviderRequestValidator, dtm dtypes.ProviderDataTransfer) { - if err := dtm.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, (*requestvalidation.UnifiedRequestValidator)(mrv)); err != nil { - panic(err) - } -} - // NewProviderDAGServiceDataTransfer returns a data transfer manager that just // uses the provider's Staging DAG service for transfers func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) { @@ -242,11 +231,6 @@ func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.S return dt, nil } -// NewProviderDealStore creates a statestore for the client to store its deals -func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore { - return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/provider"))) -} - // NewProviderPieceStore creates a statestore for storing metadata about pieces // shared by the storage and retrieval providers func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore { @@ -337,10 +321,6 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode return m, nil } -func NewProviderRequestValidator(deals dtypes.ProviderDealStore) dtypes.ProviderRequestValidator { - return requestvalidation.NewUnifiedRequestValidator(deals, nil) -} - func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.MetadataDS, minerAddress dtypes.MinerAddress, spn storagemarket.StorageProviderNode) (*storedask.StoredAsk, error) { mi, err := fapi.StateMinerInfo(ctx, address.Address(minerAddress), types.EmptyTSK) From 25cb3a15e786e5869133fbdb83bf694f421f1178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 21:03:20 +0200 Subject: [PATCH 03/10] wdpost: Wait for the correct confidence --- storage/wdpost_run.go | 17 +++++++++-------- storage/wdpost_sched.go | 7 +++++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index e9e626533..13f5547eb 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -317,12 +317,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err) } - commEpoch := di.Open - commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) - if err != nil { - return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err) - } - partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key()) if err != nil { return nil, xerrors.Errorf("getting partitions: %w", err) @@ -332,8 +326,6 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo Deadline: di.Index, Partitions: make([]miner.PoStPartition, 0, len(partitions)), Proofs: nil, - ChainCommitEpoch: commEpoch, - ChainCommitRand: commRand, } var sinfos []abi.SectorInfo @@ -425,6 +417,15 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo } elapsed := time.Since(tsStart) + + commEpoch := di.Open + commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil) + if err != nil { + return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err) + } + params.ChainCommitEpoch = commEpoch + params.ChainCommitRand = commRand + log.Infow("submitting window PoSt", "elapsed", elapsed) return params, nil diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 8adfe2f76..2bf7799d9 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -190,8 +190,11 @@ func (s *WindowPoStScheduler) update(ctx context.Context, new *types.TipSet) err s.abortActivePoSt() - if di.Challenge+StartConfidence >= new.Height() { - log.Info("not starting windowPost yet, waiting for startconfidence", di.Challenge, di.Challenge+StartConfidence, new.Height()) + // TODO: wait for di.Challenge here, will give us ~10min more to compute windowpost + // (Need to get correct deadline above, which is tricky) + + if di.Open+StartConfidence >= new.Height() { + log.Info("not starting windowPost yet, waiting for startconfidence", di.Open, di.Open+StartConfidence, new.Height()) return nil } From ec242c1ba75a919f47abb2eb675cac27cef49fed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 21:16:42 +0200 Subject: [PATCH 04/10] Update jsonrpc with stacktraces on panic --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c3f44af0d..2bc83c54d 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.6.1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb - github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b + github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 diff --git a/go.sum b/go.sum index 20ad09ed2..c2b5478e2 100644 --- a/go.sum +++ b/go.sum @@ -242,8 +242,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb h1:eCLqJb1tmhMCWUFAfJuSyyv/qLrqiAhICLjhUcbi4x8= github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= -github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b h1:t0PXF+dR91A+yXJ16cX5E5ByQwaMcnVjPMcpY2qXDYQ= -github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b/go.mod h1:qABZ0IE+F15++XdVZgKCE919XWw54DKrbe4Wdx3jWrE= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd h1:kgRzfncFVQmIScJjmmh/sYTWeQMUDBzgvSyYslbVAjU= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI= github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= From 0359a458e4b41c3920e5768c11f63671ee1b67ca Mon Sep 17 00:00:00 2001 From: Aayush Rajasekaran Date: Fri, 14 Aug 2020 16:44:33 -0400 Subject: [PATCH 05/10] Include more info in StateCirculatingSupply --- api/api_full.go | 10 +++++++++- api/apistruct/struct.go | 4 ++-- chain/stmgr/stmgr.go | 29 ++++++++++++++++++++++------- cli/state.go | 6 +++++- cmd/lotus-chainwatch/syncer/sync.go | 3 ++- node/impl/full/state.go | 12 ++++++------ 6 files changed, 46 insertions(+), 18 deletions(-) diff --git a/api/api_full.go b/api/api_full.go index fe61afa26..2a1d830ed 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -359,7 +359,7 @@ type FullNode interface { StateDealProviderCollateralBounds(context.Context, abi.PaddedPieceSize, bool, types.TipSetKey) (DealCollateralBounds, error) // StateCirculatingSupply returns the circulating supply of Filecoin at the given tipset - StateCirculatingSupply(context.Context, types.TipSetKey) (abi.TokenAmount, error) + StateCirculatingSupply(context.Context, types.TipSetKey) (CirculatingSupply, error) // MethodGroup: Msig // The Msig methods are used to interact with multisig wallets on the @@ -674,6 +674,14 @@ type DealCollateralBounds struct { Max abi.TokenAmount } +type CirculatingSupply struct { + FilVested abi.TokenAmount + FilMined abi.TokenAmount + FilBurnt abi.TokenAmount + FilLocked abi.TokenAmount + FilCirculating abi.TokenAmount +} + type MiningBaseInfo struct { MinerPower types.BigInt NetworkPower types.BigInt diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 4ecea5bde..952cef8ce 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -181,7 +181,7 @@ type FullNodeStruct struct { StateCompute func(context.Context, abi.ChainEpoch, []*types.Message, types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"` StateVerifiedClientStatus func(context.Context, address.Address, types.TipSetKey) (*verifreg.DataCap, error) `perm:"read"` StateDealProviderCollateralBounds func(context.Context, abi.PaddedPieceSize, bool, types.TipSetKey) (api.DealCollateralBounds, error) `perm:"read"` - StateCirculatingSupply func(context.Context, types.TipSetKey) (abi.TokenAmount, error) `perm:"read"` + StateCirculatingSupply func(context.Context, types.TipSetKey) (api.CirculatingSupply, error) `perm:"read"` MsigGetAvailableBalance func(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) `perm:"read"` MsigCreate func(context.Context, uint64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"` @@ -801,7 +801,7 @@ func (c *FullNodeStruct) StateDealProviderCollateralBounds(ctx context.Context, return c.Internal.StateDealProviderCollateralBounds(ctx, size, verified, tsk) } -func (c *FullNodeStruct) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) { +func (c *FullNodeStruct) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) { return c.Internal.StateCirculatingSupply(ctx, tsk) } diff --git a/chain/stmgr/stmgr.go b/chain/stmgr/stmgr.go index a3bd9e023..f4943cba3 100644 --- a/chain/stmgr/stmgr.go +++ b/chain/stmgr/stmgr.go @@ -984,34 +984,34 @@ func GetFilBurnt(ctx context.Context, st *state.StateTree) (abi.TokenAmount, err return burnt.Balance, nil } -func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (abi.TokenAmount, error) { +func (sm *StateManager) GetCirculatingSupplyDetailed(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (api.CirculatingSupply, error) { sm.genesisMsigLk.Lock() defer sm.genesisMsigLk.Unlock() if sm.genInfo == nil { err := sm.setupGenesisActors(ctx) if err != nil { - return big.Zero(), xerrors.Errorf("failed to setup genesis information: %w", err) + return api.CirculatingSupply{}, xerrors.Errorf("failed to setup genesis information: %w", err) } } filVested, err := sm.GetFilVested(ctx, height, st) if err != nil { - return big.Zero(), xerrors.Errorf("failed to calculate filVested: %w", err) + return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filVested: %w", err) } filMined, err := GetFilMined(ctx, st) if err != nil { - return big.Zero(), xerrors.Errorf("failed to calculate filMined: %w", err) + return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filMined: %w", err) } filBurnt, err := GetFilBurnt(ctx, st) if err != nil { - return big.Zero(), xerrors.Errorf("failed to calculate filBurnt: %w", err) + return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filBurnt: %w", err) } filLocked, err := sm.GetFilLocked(ctx, st) if err != nil { - return big.Zero(), xerrors.Errorf("failed to calculate filLocked: %w", err) + return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filLocked: %w", err) } ret := types.BigAdd(filVested, filMined) @@ -1022,5 +1022,20 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha ret = big.Zero() } - return ret, nil + return api.CirculatingSupply{ + FilVested: filVested, + FilMined: filMined, + FilBurnt: filBurnt, + FilLocked: filLocked, + FilCirculating: ret, + }, nil +} + +func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (abi.TokenAmount, error) { + csi, err := sm.GetCirculatingSupplyDetailed(ctx, height, st) + if err != nil { + return big.Zero(), err + } + + return csi.FilCirculating, nil } diff --git a/cli/state.go b/cli/state.go index 57b1409c2..0354131f9 100644 --- a/cli/state.go +++ b/cli/state.go @@ -1515,7 +1515,11 @@ var stateCircSupplyCmd = &cli.Command{ return err } - fmt.Println(types.FIL(circ)) + fmt.Println("Circulating supply: ", circ.FilCirculating) + fmt.Println("Mined: ", circ.FilMined) + fmt.Println("Vested: ", circ.FilVested) + fmt.Println("Burnt: ", circ.FilBurnt) + fmt.Println("Locked: ", circ.FilLocked) return nil }, diff --git a/cmd/lotus-chainwatch/syncer/sync.go b/cmd/lotus-chainwatch/syncer/sync.go index 99c0220b0..81cd9e269 100644 --- a/cmd/lotus-chainwatch/syncer/sync.go +++ b/cmd/lotus-chainwatch/syncer/sync.go @@ -287,7 +287,8 @@ func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSe if _, err := s.db.Exec(fmt.Sprintf(ceInsert, tipset.ParentState().String(), - supply.String(), + // TODO: Include all the details maybe? + supply.FilCirculating.String(), )); err != nil { return xerrors.Errorf("insert circulating supply for tipset (%s): %w", tipset.Key().String(), err) } diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 4f08d324d..5140ba22a 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -1052,7 +1052,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr powerState.ThisEpochPledgeCollateral, rewardState.ThisEpochRewardSmoothed, powerState.ThisEpochQAPowerSmoothed, - circSupply, + circSupply.FilCirculating, ) return types.BigDiv(types.BigMul(initialPledge, initialPledgeNum), initialPledgeDen), nil @@ -1149,26 +1149,26 @@ func (a *StateAPI) StateDealProviderCollateralBounds(ctx context.Context, size a return api.DealCollateralBounds{}, xerrors.Errorf("getting total circulating supply: %w") } - min, max := market.DealProviderCollateralBounds(size, verified, powerState.ThisEpochQualityAdjPower, rewardState.ThisEpochBaselinePower, circ) + min, max := market.DealProviderCollateralBounds(size, verified, powerState.ThisEpochQualityAdjPower, rewardState.ThisEpochBaselinePower, circ.FilCirculating) return api.DealCollateralBounds{ Min: types.BigDiv(types.BigMul(min, dealProviderCollateralNum), dealProviderCollateralDen), Max: max, }, nil } -func (a *StateAPI) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) { +func (a *StateAPI) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) { ts, err := a.Chain.GetTipSetFromKey(tsk) if err != nil { - return abi.TokenAmount{}, xerrors.Errorf("loading tipset %s: %w", tsk, err) + return api.CirculatingSupply{}, xerrors.Errorf("loading tipset %s: %w", tsk, err) } st, _, err := a.StateManager.TipSetState(ctx, ts) if err != nil { - return big.Zero(), err + return api.CirculatingSupply{}, err } cst := cbor.NewCborStore(a.Chain.Blockstore()) sTree, err := state.LoadStateTree(cst, st) - return a.StateManager.GetCirculatingSupply(ctx, ts.Height(), sTree) + return a.StateManager.GetCirculatingSupplyDetailed(ctx, ts.Height(), sTree) } From aa611e49fdd76db047a98b71126b22f1a9ecae00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:12:37 +0200 Subject: [PATCH 06/10] rpcenc: Add timeout for readers --- lib/rpcenc/reader.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 2b205f0e8..346c551a1 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -11,6 +11,7 @@ import ( "reflect" "strconv" "sync" + "time" "github.com/google/uuid" logging "github.com/ipfs/go-log/v2" @@ -21,7 +22,9 @@ import ( sealing "github.com/filecoin-project/storage-fsm" ) -var log = logging.Logger("rpc") +var log = logging.Logger("rpcenc") + +var Timeout = 30 * time.Second type StreamType string const ( @@ -111,10 +114,14 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { wait: make(chan struct{}), } + tctx, cancel := context.WithTimeout(req.Context(), Timeout) + defer cancel() + select { case ch <- wr: - case <-req.Context().Done(): - log.Error("context error in reader stream handler (1): %v", req.Context().Err()) + case <-tctx.Done(): + close(ch) + log.Error("context error in reader stream handler (1): %v", tctx.Err()) resp.WriteHeader(500) return } @@ -158,8 +165,15 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { } readersLk.Unlock() + ctx, cancel := context.WithTimeout(ctx, Timeout) + defer cancel() + select { - case wr := <-ch: + case wr, ok := <-ch: + if !ok { + return reflect.Value{}, xerrors.Errorf("handler timed out") + } + return reflect.ValueOf(wr), nil case <-ctx.Done(): return reflect.Value{}, ctx.Err() From d9ad6fb8c87a20a7b47f0c7ea0c3a1fd4ec20fb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:24:41 +0200 Subject: [PATCH 07/10] worker: Make disconnects less bad --- api/client/client.go | 8 +++++--- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index fae6b6765..b62a9fe59 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -4,6 +4,7 @@ import ( "net/http" "net/url" "path" + "time" "github.com/filecoin-project/go-jsonrpc" @@ -66,14 +67,15 @@ func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrp u.Path = path.Join(u.Path, "../streams/v0/push") - readerEncoder := rpcenc.ReaderParamEncoder(u.String()) - var res apistruct.WorkerStruct closer, err := jsonrpc.NewMergeClient(addr, "Filecoin", []interface{}{ &res.Internal, }, - requestHeader, readerEncoder, + requestHeader, + rpcenc.ReaderParamEncoder(u.String()), + jsonrpc.WithNoReconnect(), + jsonrpc.WithWriteTimeout(30 * time.Second), ) return &res, closer, err diff --git a/go.mod b/go.mod index a51d28e8b..018a79e68 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/filecoin-project/go-data-transfer v0.6.1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f github.com/filecoin-project/go-fil-markets v0.5.6 - github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd + github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 diff --git a/go.sum b/go.sum index ba1f217bd..b5c2566b6 100644 --- a/go.sum +++ b/go.sum @@ -242,8 +242,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v0.5.6 h1:WmBbV0qBU4NvLJ64xROpzrKUbkZxZqszZiEiCGmCEIY= github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E= -github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd h1:kgRzfncFVQmIScJjmmh/sYTWeQMUDBzgvSyYslbVAjU= -github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814191451-7f179ed3d4fd/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c h1:FB8d8m2qbmdOovxEu40m4yvpLuHDDpLJKQyck9IPFSg= +github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI= github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ= github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs= From 1ecec49885358c9a1a607c70b1e538da859975f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:40:11 +0200 Subject: [PATCH 08/10] build: Bump versions --- build/version.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/version.go b/build/version.go index 21cec2494..2657853e4 100644 --- a/build/version.go +++ b/build/version.go @@ -25,7 +25,7 @@ func buildType() string { } // BuildVersion is the local build version, set by build system -const BuildVersion = "0.4.4" +const BuildVersion = "0.4.5" func UserVersion() string { return BuildVersion + buildType() + CurrentCommit @@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool { } // APIVersion is a semver version of the rpc api exposed -var APIVersion Version = newVer(0, 10, 0) +var APIVersion Version = newVer(0, 11, 0) //nolint:varcheck,deadcode const ( From 42bb91c2e6d26851146b4d93c3f4b3663e62ce1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:40:41 +0200 Subject: [PATCH 09/10] gofmt --- api/client/client.go | 2 +- lib/rpcenc/reader.go | 1 + lib/rpcenc/reader_test.go | 6 +++--- storage/wdpost_run.go | 6 +++--- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index b62a9fe59..02fb7c775 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -75,7 +75,7 @@ func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrp requestHeader, rpcenc.ReaderParamEncoder(u.String()), jsonrpc.WithNoReconnect(), - jsonrpc.WithWriteTimeout(30 * time.Second), + jsonrpc.WithWriteTimeout(30*time.Second), ) return &res, closer, err diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 346c551a1..010884423 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -27,6 +27,7 @@ var log = logging.Logger("rpcenc") var Timeout = 30 * time.Second type StreamType string + const ( Null StreamType = "null" PushStream StreamType = "push" diff --git a/lib/rpcenc/reader_test.go b/lib/rpcenc/reader_test.go index 0748f51ef..ec4ead52d 100644 --- a/lib/rpcenc/reader_test.go +++ b/lib/rpcenc/reader_test.go @@ -48,7 +48,7 @@ func TestReaderProxy(t *testing.T) { testServ := httptest.NewServer(mux) defer testServ.Close() - re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push") + re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push") closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re) require.NoError(t, err) @@ -61,7 +61,7 @@ func TestReaderProxy(t *testing.T) { func TestNullReaderProxy(t *testing.T) { var client struct { - ReadAll func(ctx context.Context, r io.Reader) ([]byte, error) + ReadAll func(ctx context.Context, r io.Reader) ([]byte, error) ReadNullLen func(ctx context.Context, r io.Reader) (int64, error) } @@ -78,7 +78,7 @@ func TestNullReaderProxy(t *testing.T) { testServ := httptest.NewServer(mux) defer testServ.Close() - re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push") + re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push") closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re) require.NoError(t, err) diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 13f5547eb..dff0fc9fe 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -323,9 +323,9 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo } params := &miner.SubmitWindowedPoStParams{ - Deadline: di.Index, - Partitions: make([]miner.PoStPartition, 0, len(partitions)), - Proofs: nil, + Deadline: di.Index, + Partitions: make([]miner.PoStPartition, 0, len(partitions)), + Proofs: nil, } var sinfos []abi.SectorInfo From 4ca5fef0deda26df8f00f497ff10316f7e12c6ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Aug 2020 23:49:08 +0200 Subject: [PATCH 10/10] rpcenc: fix bug --- lib/rpcenc/reader.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 010884423..c5e2438c5 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -100,6 +100,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { u, err := uuid.Parse(strId) if err != nil { http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400) + return } readersLk.Lock()