Merge pull request #3065 from filecoin-project/next

Update calibration branch from latest next
This commit is contained in:
Łukasz Magiera 2020-08-14 23:51:39 +02:00 committed by GitHub
commit 82033fd5ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 410 additions and 92 deletions

View File

@ -359,7 +359,7 @@ type FullNode interface {
StateDealProviderCollateralBounds(context.Context, abi.PaddedPieceSize, bool, types.TipSetKey) (DealCollateralBounds, error)
// StateCirculatingSupply returns the circulating supply of Filecoin at the given tipset
StateCirculatingSupply(context.Context, types.TipSetKey) (abi.TokenAmount, error)
StateCirculatingSupply(context.Context, types.TipSetKey) (CirculatingSupply, error)
// MethodGroup: Msig
// The Msig methods are used to interact with multisig wallets on the
@ -674,6 +674,14 @@ type DealCollateralBounds struct {
Max abi.TokenAmount
}
type CirculatingSupply struct {
FilVested abi.TokenAmount
FilMined abi.TokenAmount
FilBurnt abi.TokenAmount
FilLocked abi.TokenAmount
FilCirculating abi.TokenAmount
}
type MiningBaseInfo struct {
MinerPower types.BigInt
NetworkPower types.BigInt

View File

@ -23,6 +23,8 @@ type WorkerAPI interface {
Paths(context.Context) ([]stores.StoragePath, error)
Info(context.Context) (storiface.WorkerInfo, error)
AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error)
storage.Sealer
MoveStorage(ctx context.Context, sector abi.SectorID) error

View File

@ -181,7 +181,7 @@ type FullNodeStruct struct {
StateCompute func(context.Context, abi.ChainEpoch, []*types.Message, types.TipSetKey) (*api.ComputeStateOutput, error) `perm:"read"`
StateVerifiedClientStatus func(context.Context, address.Address, types.TipSetKey) (*verifreg.DataCap, error) `perm:"read"`
StateDealProviderCollateralBounds func(context.Context, abi.PaddedPieceSize, bool, types.TipSetKey) (api.DealCollateralBounds, error) `perm:"read"`
StateCirculatingSupply func(context.Context, types.TipSetKey) (abi.TokenAmount, error) `perm:"read"`
StateCirculatingSupply func(context.Context, types.TipSetKey) (api.CirculatingSupply, error) `perm:"read"`
MsigGetAvailableBalance func(context.Context, address.Address, types.TipSetKey) (types.BigInt, error) `perm:"read"`
MsigCreate func(context.Context, uint64, []address.Address, abi.ChainEpoch, types.BigInt, address.Address, types.BigInt) (cid.Cid, error) `perm:"sign"`
@ -300,6 +300,7 @@ type WorkerStruct struct {
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
@ -801,7 +802,7 @@ func (c *FullNodeStruct) StateDealProviderCollateralBounds(ctx context.Context,
return c.Internal.StateDealProviderCollateralBounds(ctx, size, verified, tsk)
}
func (c *FullNodeStruct) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) {
func (c *FullNodeStruct) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) {
return c.Internal.StateCirculatingSupply(ctx, tsk)
}
@ -1147,6 +1148,10 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return w.Internal.Info(ctx)
}
func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
}

View File

@ -2,11 +2,15 @@ package client
import (
"net/http"
"net/url"
"path"
"time"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/rpcenc"
)
// NewCommonRPC creates a new http jsonrpc client.
@ -49,12 +53,29 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine
}
func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, nil, err
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
}
///rpc/v0 -> /rpc/streams/v0/push
u.Path = path.Join(u.Path, "../streams/v0/push")
var res apistruct.WorkerStruct
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.Internal,
},
requestHeader,
rpcenc.ReaderParamEncoder(u.String()),
jsonrpc.WithNoReconnect(),
jsonrpc.WithWriteTimeout(30*time.Second),
)
return &res, closer, err

View File

@ -25,7 +25,7 @@ func buildType() string {
}
// BuildVersion is the local build version, set by build system
const BuildVersion = "0.4.4"
const BuildVersion = "0.4.5"
func UserVersion() string {
return BuildVersion + buildType() + CurrentCommit
@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool {
}
// APIVersion is a semver version of the rpc api exposed
var APIVersion Version = newVer(0, 10, 0)
var APIVersion Version = newVer(0, 11, 0)
//nolint:varcheck,deadcode
const (

View File

@ -984,34 +984,34 @@ func GetFilBurnt(ctx context.Context, st *state.StateTree) (abi.TokenAmount, err
return burnt.Balance, nil
}
func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (abi.TokenAmount, error) {
func (sm *StateManager) GetCirculatingSupplyDetailed(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (api.CirculatingSupply, error) {
sm.genesisMsigLk.Lock()
defer sm.genesisMsigLk.Unlock()
if sm.genInfo == nil {
err := sm.setupGenesisActors(ctx)
if err != nil {
return big.Zero(), xerrors.Errorf("failed to setup genesis information: %w", err)
return api.CirculatingSupply{}, xerrors.Errorf("failed to setup genesis information: %w", err)
}
}
filVested, err := sm.GetFilVested(ctx, height, st)
if err != nil {
return big.Zero(), xerrors.Errorf("failed to calculate filVested: %w", err)
return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filVested: %w", err)
}
filMined, err := GetFilMined(ctx, st)
if err != nil {
return big.Zero(), xerrors.Errorf("failed to calculate filMined: %w", err)
return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filMined: %w", err)
}
filBurnt, err := GetFilBurnt(ctx, st)
if err != nil {
return big.Zero(), xerrors.Errorf("failed to calculate filBurnt: %w", err)
return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filBurnt: %w", err)
}
filLocked, err := sm.GetFilLocked(ctx, st)
if err != nil {
return big.Zero(), xerrors.Errorf("failed to calculate filLocked: %w", err)
return api.CirculatingSupply{}, xerrors.Errorf("failed to calculate filLocked: %w", err)
}
ret := types.BigAdd(filVested, filMined)
@ -1022,5 +1022,20 @@ func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.Cha
ret = big.Zero()
}
return ret, nil
return api.CirculatingSupply{
FilVested: filVested,
FilMined: filMined,
FilBurnt: filBurnt,
FilLocked: filLocked,
FilCirculating: ret,
}, nil
}
func (sm *StateManager) GetCirculatingSupply(ctx context.Context, height abi.ChainEpoch, st *state.StateTree) (abi.TokenAmount, error) {
csi, err := sm.GetCirculatingSupplyDetailed(ctx, height, st)
if err != nil {
return big.Zero(), err
}
return csi.FilCirculating, nil
}

View File

@ -1515,7 +1515,11 @@ var stateCircSupplyCmd = &cli.Command{
return err
}
fmt.Println(types.FIL(circ))
fmt.Println("Circulating supply: ", circ.FilCirculating)
fmt.Println("Mined: ", circ.FilMined)
fmt.Println("Vested: ", circ.FilVested)
fmt.Println("Burnt: ", circ.FilBurnt)
fmt.Println("Locked: ", circ.FilLocked)
return nil
},

View File

@ -287,7 +287,8 @@ func (s *Syncer) storeCirculatingSupply(ctx context.Context, tipset *types.TipSe
if _, err := s.db.Exec(fmt.Sprintf(ceInsert,
tipset.ParentState().String(),
supply.String(),
// TODO: Include all the details maybe?
supply.FilCirculating.String(),
)); err != nil {
return xerrors.Errorf("insert circulating supply for tipset (%s): %w", tipset.Key().String(), err)
}

View File

@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/node/repo"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/sector-storage/sealtasks"
@ -105,6 +106,11 @@ var runCmd = &cli.Command{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
&cli.BoolFlag{
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
},
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
@ -204,6 +210,9 @@ var runCmd = &cli.Command{
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
}
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}
@ -337,10 +346,12 @@ var runCmd = &cli.Command{
log.Info("Setting up control endpoint at " + address)
rpcServer := jsonrpc.NewServer()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

View File

@ -2,19 +2,12 @@ package sealing
import (
"context"
"io"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
)
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
return io.LimitReader(&nr.Reader{}, int64(size))
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
@ -24,7 +17,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist
out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, m.pledgeReader(size))
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

20
extern/storage-fsm/nullreader.go vendored Normal file
View File

@ -0,0 +1,20 @@
package sealing
import (
"io"
"github.com/filecoin-project/specs-actors/actors/abi"
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
)
type NullReader struct {
*io.LimitedReader
}
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
return &NullReader{(io.LimitReader(&nr.Reader{}, int64(size))).(*io.LimitedReader)}
}
func (m NullReader) NullBytes() int64 {
return m.N
}

View File

@ -146,7 +146,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
}
for _, p := range pads {
err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil)
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
if err != nil {
m.unsealedInfoMap.mux.Unlock()
return 0, 0, xerrors.Errorf("writing pads: %w", err)

4
go.mod
View File

@ -27,8 +27,8 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v0.6.1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-fil-markets v0.5.6
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261

9
go.sum
View File

@ -240,10 +240,10 @@ github.com/filecoin-project/go-data-transfer v0.6.1 h1:EA6X8fSiBRNVVwKm5pA7+njZn
github.com/filecoin-project/go-data-transfer v0.6.1/go.mod h1:uRYBRKVBVM12CSusBtVrzDHkVw/3DKZpkxKJVP1Ydas=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb h1:eCLqJb1tmhMCWUFAfJuSyyv/qLrqiAhICLjhUcbi4x8=
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-fil-markets v0.5.6 h1:WmBbV0qBU4NvLJ64xROpzrKUbkZxZqszZiEiCGmCEIY=
github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c h1:FB8d8m2qbmdOovxEu40m4yvpLuHDDpLJKQyck9IPFSg=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI=
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
@ -385,6 +385,7 @@ github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORR
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk=
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YARg=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=

186
lib/rpcenc/reader.go Normal file
View File

@ -0,0 +1,186 @@
package rpcenc
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"sync"
"time"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/specs-actors/actors/abi"
sealing "github.com/filecoin-project/storage-fsm"
)
var log = logging.Logger("rpcenc")
var Timeout = 30 * time.Second
type StreamType string
const (
Null StreamType = "null"
PushStream StreamType = "push"
// TODO: Data transfer handoff to workers?
)
type ReaderStream struct {
Type StreamType
Info string
}
func ReaderParamEncoder(addr string) jsonrpc.Option {
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
r := value.Interface().(io.Reader)
if r, ok := r.(*sealing.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
}
reqID := uuid.New()
u, _ := url.Parse(addr)
u.Path = path.Join(u.Path, reqID.String())
go func() {
// TODO: figure out errors here
resp, err := http.Post(u.String(), "application/octet-stream", r)
if err != nil {
log.Errorf("sending reader param: %+v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Errorf("sending reader param: non-200 status: ", resp.Status)
return
}
}()
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
})
}
type waitReadCloser struct {
io.ReadCloser
wait chan struct{}
}
func (w *waitReadCloser) Read(p []byte) (int, error) {
n, err := w.ReadCloser.Read(p)
if err != nil {
close(w.wait)
}
return n, err
}
func (w *waitReadCloser) Close() error {
close(w.wait)
return w.ReadCloser.Close()
}
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
var readersLk sync.Mutex
readers := map[uuid.UUID]chan *waitReadCloser{}
hnd := func(resp http.ResponseWriter, req *http.Request) {
strId := path.Base(req.URL.Path)
u, err := uuid.Parse(strId)
if err != nil {
http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
return
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
wr := &waitReadCloser{
ReadCloser: req.Body,
wait: make(chan struct{}),
}
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
defer cancel()
select {
case ch <- wr:
case <-tctx.Done():
close(ch)
log.Error("context error in reader stream handler (1): %v", tctx.Err())
resp.WriteHeader(500)
return
}
select {
case <-wr.wait:
case <-req.Context().Done():
log.Error("context error in reader stream handler (2): %v", req.Context().Err())
resp.WriteHeader(500)
return
}
resp.WriteHeader(200)
}
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
var rs ReaderStream
if err := json.Unmarshal(b, &rs); err != nil {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
}
if rs.Type == Null {
n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
}
u, err := uuid.Parse(rs.Info)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
ctx, cancel := context.WithTimeout(ctx, Timeout)
defer cancel()
select {
case wr, ok := <-ch:
if !ok {
return reflect.Value{}, xerrors.Errorf("handler timed out")
}
return reflect.ValueOf(wr), nil
case <-ctx.Done():
return reflect.Value{}, ctx.Err()
}
})
return hnd, dec
}

90
lib/rpcenc/reader_test.go Normal file
View File

@ -0,0 +1,90 @@
package rpcenc
import (
"context"
"io"
"io/ioutil"
"net/http/httptest"
"strings"
"testing"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
sealing "github.com/filecoin-project/storage-fsm"
)
type ReaderHandler struct {
}
func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error) {
return ioutil.ReadAll(r)
}
func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) {
return r.(*sealing.NullReader).N, nil
}
func (h *ReaderHandler) ReadUrl(ctx context.Context, u string) (string, error) {
return u, nil
}
func TestReaderProxy(t *testing.T) {
var client struct {
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
}
serverHandler := &ReaderHandler{}
readerHandler, readerServerOpt := ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("ReaderHandler", serverHandler)
mux := mux.NewRouter()
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
testServ := httptest.NewServer(mux)
defer testServ.Close()
re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push")
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
require.NoError(t, err)
defer closer()
read, err := client.ReadAll(context.TODO(), strings.NewReader("pooooootato"))
require.NoError(t, err)
require.Equal(t, "pooooootato", string(read), "potatos weren't equal")
}
func TestNullReaderProxy(t *testing.T) {
var client struct {
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
ReadNullLen func(ctx context.Context, r io.Reader) (int64, error)
}
serverHandler := &ReaderHandler{}
readerHandler, readerServerOpt := ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("ReaderHandler", serverHandler)
mux := mux.NewRouter()
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
testServ := httptest.NewServer(mux)
defer testServ.Close()
re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push")
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
require.NoError(t, err)
defer closer()
n, err := client.ReadNullLen(context.TODO(), sealing.NewNullReader(1016))
require.NoError(t, err)
require.Equal(t, int64(1016), n)
}

View File

@ -108,7 +108,6 @@ const (
HandleIncomingBlocksKey
HandleIncomingMessagesKey
RegisterClientValidatorKey
HandlePaymentChannelManagerKey
// miner
@ -116,7 +115,6 @@ const (
HandleDealsKey
HandleRetrievalKey
RunSectorServiceKey
RegisterProviderValidatorKey
// daemon
ExtractApiKey
@ -266,14 +264,11 @@ func Online() Option {
Override(new(retrievalmarket.PeerResolver), modules.RetrievalResolver),
Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient),
Override(new(dtypes.ClientDealStore), modules.NewClientDealStore),
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer),
Override(new(dtypes.ClientRequestValidator), modules.NewClientRequestValidator),
Override(new(modules.ClientDealFunds), modules.NewClientDealFunds),
Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
Override(RegisterClientValidatorKey, modules.RegisterClientValidator),
Override(new(beacon.RandomBeacon), modules.RandomBeacon),
Override(new(*paychmgr.Store), paychmgr.NewStore),
@ -310,16 +305,13 @@ func Online() Option {
Override(new(dtypes.StagingDAG), modules.StagingDAG),
Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync),
Override(new(retrievalmarket.RetrievalProvider), modules.RetrievalProvider),
Override(new(dtypes.ProviderDealStore), modules.NewProviderDealStore),
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
Override(new(dtypes.ProviderRequestValidator), modules.NewProviderRequestValidator),
Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore),
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)),
Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter),
Override(RegisterProviderValidatorKey, modules.RegisterProviderValidator),
Override(HandleRetrievalKey, modules.HandleRetrieval),
Override(GetParamsKey, modules.GetParams),
Override(HandleDealsKey, modules.HandleDeals),

View File

@ -1052,7 +1052,7 @@ func (a *StateAPI) StateMinerInitialPledgeCollateral(ctx context.Context, maddr
powerState.ThisEpochPledgeCollateral,
rewardState.ThisEpochRewardSmoothed,
powerState.ThisEpochQAPowerSmoothed,
circSupply,
circSupply.FilCirculating,
)
return types.BigDiv(types.BigMul(initialPledge, initialPledgeNum), initialPledgeDen), nil
@ -1149,26 +1149,26 @@ func (a *StateAPI) StateDealProviderCollateralBounds(ctx context.Context, size a
return api.DealCollateralBounds{}, xerrors.Errorf("getting total circulating supply: %w")
}
min, max := market.DealProviderCollateralBounds(size, verified, powerState.ThisEpochQualityAdjPower, rewardState.ThisEpochBaselinePower, circ)
min, max := market.DealProviderCollateralBounds(size, verified, powerState.ThisEpochQualityAdjPower, rewardState.ThisEpochBaselinePower, circ.FilCirculating)
return api.DealCollateralBounds{
Min: types.BigDiv(types.BigMul(min, dealProviderCollateralNum), dealProviderCollateralDen),
Max: max,
}, nil
}
func (a *StateAPI) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (abi.TokenAmount, error) {
func (a *StateAPI) StateCirculatingSupply(ctx context.Context, tsk types.TipSetKey) (api.CirculatingSupply, error) {
ts, err := a.Chain.GetTipSetFromKey(tsk)
if err != nil {
return abi.TokenAmount{}, xerrors.Errorf("loading tipset %s: %w", tsk, err)
return api.CirculatingSupply{}, xerrors.Errorf("loading tipset %s: %w", tsk, err)
}
st, _, err := a.StateManager.TipSetState(ctx, ts)
if err != nil {
return big.Zero(), err
return api.CirculatingSupply{}, err
}
cst := cbor.NewCborStore(a.Chain.Blockstore())
sTree, err := state.LoadStateTree(cst, st)
return a.StateManager.GetCirculatingSupply(ctx, ts.Height(), sTree)
return a.StateManager.GetCirculatingSupplyDetailed(ctx, ts.Height(), sTree)
}

View File

@ -9,7 +9,6 @@ import (
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
@ -25,10 +24,6 @@ func (r *remoteWorker) NewSector(ctx context.Context, sector abi.SectorID) error
return xerrors.New("unsupported")
}
func (r *remoteWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{}, xerrors.New("unsupported")
}
func connectRemoteWorker(ctx context.Context, fa api.Common, url string) (*remoteWorker, error) {
token, err := fa.AuthNew(ctx, []auth.Permission{"admin"})
if err != nil {

View File

@ -21,7 +21,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
@ -101,20 +100,11 @@ func NewClientGraphsyncDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.Grap
return dt, nil
}
// NewClientDealStore creates a statestore for the client to store its deals
func NewClientDealStore(ds dtypes.ClientDatastore) dtypes.ClientDealStore {
return statestore.New(ds)
}
// NewClientDatastore creates a datastore for the client to store its deals
func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
}
func NewClientRequestValidator(deals dtypes.ClientDealStore) dtypes.ClientRequestValidator {
return requestvalidation.NewUnifiedRequestValidator(nil, deals)
}
type ClientDealFunds funds.DealFunds
func NewClientDealFunds(ds dtypes.MetadataDS) (ClientDealFunds, error) {

View File

@ -36,13 +36,11 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
storageimpl "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/funds"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/requestvalidation"
"github.com/filecoin-project/go-fil-markets/storagemarket/impl/storedask"
smnet "github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/go-multistore"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/sector-storage/ffiwrapper"
@ -209,15 +207,6 @@ func HandleDeals(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, h sto
})
}
// RegisterProviderValidator is an initialization hook that registers the provider
// request validator with the data transfer module as the validator for
// StorageDataTransferVoucher types
func RegisterProviderValidator(mrv dtypes.ProviderRequestValidator, dtm dtypes.ProviderDataTransfer) {
if err := dtm.RegisterVoucherType(&requestvalidation.StorageDataTransferVoucher{}, (*requestvalidation.UnifiedRequestValidator)(mrv)); err != nil {
panic(err)
}
}
// NewProviderDAGServiceDataTransfer returns a data transfer manager that just
// uses the provider's Staging DAG service for transfers
func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.StagingGraphsync, ds dtypes.MetadataDS) (dtypes.ProviderDataTransfer, error) {
@ -242,11 +231,6 @@ func NewProviderDAGServiceDataTransfer(lc fx.Lifecycle, h host.Host, gs dtypes.S
return dt, nil
}
// NewProviderDealStore creates a statestore for the client to store its deals
func NewProviderDealStore(ds dtypes.MetadataDS) dtypes.ProviderDealStore {
return statestore.New(namespace.Wrap(ds, datastore.NewKey("/deals/provider")))
}
// NewProviderPieceStore creates a statestore for storing metadata about pieces
// shared by the storage and retrieval providers
func NewProviderPieceStore(ds dtypes.MetadataDS) dtypes.ProviderPieceStore {
@ -337,10 +321,6 @@ func SetupBlockProducer(lc fx.Lifecycle, ds dtypes.MetadataDS, api lapi.FullNode
return m, nil
}
func NewProviderRequestValidator(deals dtypes.ProviderDealStore) dtypes.ProviderRequestValidator {
return requestvalidation.NewUnifiedRequestValidator(deals, nil)
}
func NewStorageAsk(ctx helpers.MetricsCtx, fapi lapi.FullNode, ds dtypes.MetadataDS, minerAddress dtypes.MinerAddress, spn storagemarket.StorageProviderNode) (*storedask.StoredAsk, error) {
mi, err := fapi.StateMinerInfo(ctx, address.Address(minerAddress), types.EmptyTSK)

View File

@ -317,23 +317,15 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
commEpoch := di.Open
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
partitions, err := s.api.StateMinerPartitions(ctx, s.actor, di.Index, ts.Key())
if err != nil {
return nil, xerrors.Errorf("getting partitions: %w", err)
}
params := &miner.SubmitWindowedPoStParams{
Deadline: di.Index,
Partitions: make([]miner.PoStPartition, 0, len(partitions)),
Proofs: nil,
ChainCommitEpoch: commEpoch,
ChainCommitRand: commRand,
Deadline: di.Index,
Partitions: make([]miner.PoStPartition, 0, len(partitions)),
Proofs: nil,
}
var sinfos []abi.SectorInfo
@ -425,6 +417,15 @@ func (s *WindowPoStScheduler) runPost(ctx context.Context, di miner.DeadlineInfo
}
elapsed := time.Since(tsStart)
commEpoch := di.Open
commRand, err := s.api.ChainGetRandomnessFromTickets(ctx, ts.Key(), crypto.DomainSeparationTag_PoStChainCommit, commEpoch, nil)
if err != nil {
return nil, xerrors.Errorf("failed to get chain randomness for windowPost (ts=%d; deadline=%d): %w", ts.Height(), di, err)
}
params.ChainCommitEpoch = commEpoch
params.ChainCommitRand = commRand
log.Infow("submitting window PoSt", "elapsed", elapsed)
return params, nil

View File

@ -190,8 +190,11 @@ func (s *WindowPoStScheduler) update(ctx context.Context, new *types.TipSet) err
s.abortActivePoSt()
if di.Challenge+StartConfidence >= new.Height() {
log.Info("not starting windowPost yet, waiting for startconfidence", di.Challenge, di.Challenge+StartConfidence, new.Height())
// TODO: wait for di.Challenge here, will give us ~10min more to compute windowpost
// (Need to get correct deadline above, which is tricky)
if di.Open+StartConfidence >= new.Height() {
log.Info("not starting windowPost yet, waiting for startconfidence", di.Open, di.Open+StartConfidence, new.Height())
return nil
}