Merge branch 'master' into feat/sector-recovery

This commit is contained in:
Łukasz Magiera 2020-01-22 03:27:00 +01:00
commit 3c934ea21f
37 changed files with 2032 additions and 905 deletions

View File

@ -14,7 +14,7 @@ MODULES:=
CLEAN:=
BINS:=
GOFLAGS+=-ldflags='-X="github.com/filecoin-project/lotus/build".CurrentCommit=+git$(subst -,.,$(shell git describe --always --match=NeVeRmAtCh --dirty 2>/dev/null || git rev-parse --short HEAD 2>/dev/null))'
GOFLAGS+=-ldflags=-X="github.com/filecoin-project/lotus/build".CurrentCommit="+git$(subst -,.,$(shell git describe --always --match=NeVeRmAtCh --dirty 2>/dev/null || git rev-parse --short HEAD 2>/dev/null))"
## FFI

View File

@ -38,6 +38,7 @@ type FullNode interface {
ChainGetNode(ctx context.Context, p string) (interface{}, error)
ChainGetMessage(context.Context, cid.Cid) (*types.Message, error)
ChainGetPath(ctx context.Context, from types.TipSetKey, to types.TipSetKey) ([]*store.HeadChange, error)
ChainExport(context.Context, *types.TipSet) (<-chan []byte, error)
// syncer
SyncState(context.Context) (*SyncState, error)
@ -94,7 +95,7 @@ type FullNode interface {
//ClientListAsks() []Ask
// if tipset is nil, we'll use heaviest
StateCall(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error)
StateCall(context.Context, *types.Message, *types.TipSet) (*MethodCall, error)
StateReplay(context.Context, *types.TipSet, cid.Cid) (*ReplayResults, error)
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*ActorState, error)
@ -270,6 +271,11 @@ type ReplayResults struct {
Error string
}
type MethodCall struct {
types.MessageReceipt
Error string
}
type ActiveSync struct {
Base *types.TipSet
Target *types.TipSet

View File

@ -56,6 +56,7 @@ type FullNodeStruct struct {
ChainGetNode func(ctx context.Context, p string) (interface{}, error) `perm:"read"`
ChainGetMessage func(context.Context, cid.Cid) (*types.Message, error) `perm:"read"`
ChainGetPath func(context.Context, types.TipSetKey, types.TipSetKey) ([]*store.HeadChange, error) `perm:"read"`
ChainExport func(context.Context, *types.TipSet) (<-chan []byte, error) `perm:"read"`
SyncState func(context.Context) (*api.SyncState, error) `perm:"read"`
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
@ -98,7 +99,7 @@ type FullNodeStruct struct {
StateMinerPeerID func(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error) `perm:"read"`
StateMinerElectionPeriodStart func(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error) `perm:"read"`
StateMinerSectorSize func(context.Context, address.Address, *types.TipSet) (uint64, error) `perm:"read"`
StateCall func(context.Context, *types.Message, *types.TipSet) (*types.MessageReceipt, error) `perm:"read"`
StateCall func(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error) `perm:"read"`
StateReplay func(context.Context, *types.TipSet, cid.Cid) (*api.ReplayResults, error) `perm:"read"`
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*api.ActorState, error) `perm:"read"`
@ -361,6 +362,10 @@ func (c *FullNodeStruct) ChainGetPath(ctx context.Context, from types.TipSetKey,
return c.Internal.ChainGetPath(ctx, from, to)
}
func (c *FullNodeStruct) ChainExport(ctx context.Context, ts *types.TipSet) (<-chan []byte, error) {
return c.Internal.ChainExport(ctx, ts)
}
func (c *FullNodeStruct) SyncState(ctx context.Context) (*api.SyncState, error) {
return c.Internal.SyncState(ctx)
}
@ -405,7 +410,7 @@ func (c *FullNodeStruct) StateMinerSectorSize(ctx context.Context, actor address
return c.Internal.StateMinerSectorSize(ctx, actor, ts)
}
func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
func (c *FullNodeStruct) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) {
return c.Internal.StateCall(ctx, msg, ts)
}

View File

@ -98,114 +98,95 @@ func (t *PaymentInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("PaymentInfo: map struct too large (%d)", extra)
}
var name string
n := extra
// t.Channel (address.Address) (struct)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Channel" {
return fmt.Errorf("expected struct map entry %s to be Channel", name)
}
{
if err := t.Channel.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.ChannelMessage (cid.Cid) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "ChannelMessage" {
return fmt.Errorf("expected struct map entry %s to be ChannelMessage", name)
}
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
name = string(sval)
}
switch name {
// t.Channel (address.Address) (struct)
case "Channel":
{
if err := t.Channel.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.ChannelMessage (cid.Cid) (struct)
case "ChannelMessage":
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.ChannelMessage: %w", err)
}
t.ChannelMessage = &c
}
}
// t.Vouchers ([]*types.SignedVoucher) (slice)
case "Vouchers":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.ChannelMessage: %w", err)
return err
}
t.ChannelMessage = &c
if extra > cbg.MaxLength {
return fmt.Errorf("t.Vouchers: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Vouchers = make([]*types.SignedVoucher, extra)
}
for i := 0; i < int(extra); i++ {
var v types.SignedVoucher
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Vouchers[i] = &v
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
// t.Vouchers ([]*types.SignedVoucher) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Vouchers" {
return fmt.Errorf("expected struct map entry %s to be Vouchers", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Vouchers: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Vouchers = make([]*types.SignedVoucher, extra)
}
for i := 0; i < int(extra); i++ {
var v types.SignedVoucher
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Vouchers[i] = &v
}
return nil
}
func (t *SealedRef) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -276,84 +257,66 @@ func (t *SealedRef) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("SealedRef: map struct too large (%d)", extra)
}
var name string
n := extra
// t.SectorID (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.SectorID (uint64) (uint64)
case "SectorID":
if name != "SectorID" {
return fmt.Errorf("expected struct map entry %s to be SectorID", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
// t.Offset (uint64) (uint64)
case "Offset":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
// t.Offset (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Offset = uint64(extra)
// t.Size (uint64) (uint64)
case "Size":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = uint64(extra)
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
name = string(sval)
}
if name != "Offset" {
return fmt.Errorf("expected struct map entry %s to be Offset", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Offset = uint64(extra)
// t.Size (uint64) (uint64)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Size" {
return fmt.Errorf("expected struct map entry %s to be Size", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = uint64(extra)
return nil
}
func (t *SealedRefs) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -401,50 +364,56 @@ func (t *SealedRefs) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 1 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("SealedRefs: map struct too large (%d)", extra)
}
var name string
n := extra
// t.Refs ([]api.SealedRef) (slice)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.Refs ([]api.SealedRef) (slice)
case "Refs":
if name != "Refs" {
return fmt.Errorf("expected struct map entry %s to be Refs", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Refs: array too large (%d)", extra)
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Refs: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Refs = make([]SealedRef, extra)
}
for i := 0; i < int(extra); i++ {
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Refs = make([]SealedRef, extra)
}
for i := 0; i < int(extra); i++ {
var v SealedRef
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
var v SealedRef
if err := v.UnmarshalCBOR(br); err != nil {
return err
t.Refs[i] = v
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
t.Refs[i] = v
}
return nil

View File

@ -65,7 +65,6 @@ func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration) {
defer close(done)
for mine {
time.Sleep(blocktime)
fmt.Println("mining a block now")
if err := sn[0].MineOne(ctx); err != nil {
t.Error(err)
}

View File

@ -13,7 +13,7 @@ import (
"github.com/ipfs/go-hamt-ipld"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-sectorbuilder"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/aerrors"

View File

@ -8,13 +8,14 @@ import (
"go.opencensus.io/trace"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
)
func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, r vm.Rand, bheight uint64) (*types.MessageReceipt, error) {
func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate cid.Cid, r vm.Rand, bheight uint64) (*api.MethodCall, error) {
ctx, span := trace.StartSpan(ctx, "statemanager.CallRaw")
defer span.End()
@ -54,14 +55,19 @@ func (sm *StateManager) CallRaw(ctx context.Context, msg *types.Message, bstate
return nil, xerrors.Errorf("apply message failed: %w", err)
}
var errs string
if ret.ActorErr != nil {
errs = ret.ActorErr.Error()
log.Warnf("chain call failed: %s", ret.ActorErr)
}
return &ret.MessageReceipt, nil
return &api.MethodCall{
MessageReceipt: ret.MessageReceipt,
Error: errs,
}, nil
}
func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
func (sm *StateManager) Call(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) {
if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
}

View File

@ -607,3 +607,32 @@ func (sm *StateManager) MarketBalance(ctx context.Context, addr address.Address,
return b[0], nil
}
func (sm *StateManager) ValidateChain(ctx context.Context, ts *types.TipSet) error {
tschain := []*types.TipSet{ts}
for ts.Height() != 0 {
next, err := sm.cs.LoadTipSet(ts.Parents())
if err != nil {
return err
}
tschain = append(tschain, next)
ts = next
}
lastState := tschain[len(tschain)-1].ParentState()
for i := len(tschain) - 1; i >= 0; i-- {
cur := tschain[i]
log.Infof("computing state (height: %d, ts=%s)", cur.Height(), cur.Cids())
if cur.ParentState() != lastState {
return xerrors.Errorf("tipset chain had state mismatch at height %d", cur.Height())
}
st, _, err := sm.TipSetState(ctx, cur)
if err != nil {
return err
}
lastState = st
}
return nil
}

View File

@ -1,10 +1,12 @@
package store
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"io"
"sync"
"github.com/filecoin-project/go-address"
@ -19,11 +21,16 @@ import (
lru "github.com/hashicorp/golang-lru"
block "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
car "github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
hamt "github.com/ipfs/go-hamt-ipld"
blockstore "github.com/ipfs/go-ipfs-blockstore"
bstore "github.com/ipfs/go-ipfs-blockstore"
format "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
dag "github.com/ipfs/go-merkledag"
cbg "github.com/whyrusleeping/cbor-gen"
pubsub "github.com/whyrusleeping/pubsub"
"golang.org/x/xerrors"
@ -917,6 +924,84 @@ func (cs *ChainStore) GetTipsetByHeight(ctx context.Context, h uint64, ts *types
}
}
func recurseLinks(bs blockstore.Blockstore, root cid.Cid, in []cid.Cid) ([]cid.Cid, error) {
data, err := bs.Get(root)
if err != nil {
return nil, err
}
top, err := cbg.ScanForLinks(bytes.NewReader(data.RawData()))
if err != nil {
return nil, err
}
in = append(in, top...)
for _, c := range top {
var err error
in, err = recurseLinks(bs, c, in)
if err != nil {
return nil, err
}
}
return in, nil
}
func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, w io.Writer) error {
if ts == nil {
ts = cs.GetHeaviestTipSet()
}
bsrv := blockservice.New(cs.bs, nil)
dserv := dag.NewDAGService(bsrv)
return car.WriteCarWithWalker(ctx, dserv, ts.Cids(), w, func(nd format.Node) ([]*format.Link, error) {
var b types.BlockHeader
if err := b.UnmarshalCBOR(bytes.NewBuffer(nd.RawData())); err != nil {
return nil, err
}
var out []*format.Link
for _, p := range b.Parents {
out = append(out, &format.Link{Cid: p})
}
cids, err := recurseLinks(cs.bs, b.Messages, nil)
if err != nil {
return nil, err
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
}
if b.Height == 0 {
cids, err := recurseLinks(cs.bs, b.ParentStateRoot, nil)
if err != nil {
return nil, err
}
for _, c := range cids {
out = append(out, &format.Link{Cid: c})
}
}
return out, nil
})
}
func (cs *ChainStore) Import(r io.Reader) (*types.TipSet, error) {
header, err := car.LoadCar(cs.Blockstore(), r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
}
root, err := cs.LoadTipSet(types.NewTipSetKey(header.Roots...))
if err != nil {
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
}
return root, nil
}
type chainRand struct {
cs *ChainStore
blks []cid.Cid

View File

@ -161,6 +161,10 @@ func DecodeParams(b []byte, out interface{}) error {
}
func DumpActorState(code cid.Cid, b []byte) (interface{}, error) {
if code == actors.AccountCodeCid { // Account code special case
return nil, nil
}
i := newInvoker() // TODO: register builtins in init block
typ, ok := i.builtInState[code]

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"time"
@ -26,6 +27,7 @@ var chainCmd = &cli.Command{
chainSetHeadCmd,
chainListCmd,
chainGetCmd,
chainExportCmd,
},
}
@ -172,6 +174,10 @@ var chainGetMsgCmd = &cli.Command{
Name: "getmessage",
Usage: "Get and print a message by its cid",
Action: func(cctx *cli.Context) error {
if !cctx.Args().Present() {
return fmt.Errorf("must pass a cid of a message to get")
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
@ -385,3 +391,50 @@ func printTipSet(format string, ts *types.TipSet) {
fmt.Println(format)
}
var chainExportCmd = &cli.Command{
Name: "export",
Usage: "export chain to a car file",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "tipset",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must specify filename to export chain to")
}
fi, err := os.Create(cctx.Args().First())
if err != nil {
return err
}
defer fi.Close()
ts, err := loadTipSet(ctx, cctx, api)
if err != nil {
return err
}
stream, err := api.ChainExport(ctx, ts)
if err != nil {
return err
}
for b := range stream {
_, err := fi.Write(b)
if err != nil {
return err
}
}
return nil
},
}

View File

@ -202,6 +202,7 @@ var Commands = []*cli.Command{
clientCmd,
fetchParamCmd,
mpoolCmd,
multisigCmd,
netCmd,
paychCmd,
sendCmd,

View File

@ -124,7 +124,8 @@ var mpoolStat = &cli.Command{
for a, bkt := range buckets {
act, err := api.StateGetActor(ctx, a, ts)
if err != nil {
return err
fmt.Printf("%s, err: %s\n", a, err)
continue
}
cur := act.Nonce

400
cli/multisig.go Normal file
View File

@ -0,0 +1,400 @@
package cli
import (
"bytes"
"encoding/hex"
"fmt"
"os"
"strconv"
"text/tabwriter"
"github.com/filecoin-project/go-address"
actors "github.com/filecoin-project/lotus/chain/actors"
types "github.com/filecoin-project/lotus/chain/types"
cbg "github.com/whyrusleeping/cbor-gen"
"gopkg.in/urfave/cli.v2"
)
var multisigCmd = &cli.Command{
Name: "msig",
Usage: "Interact with a multisig wallet",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "source",
Usage: "specify the account to send propose from",
},
},
Subcommands: []*cli.Command{
msigCreateCmd,
msigInspectCmd,
msigProposeCmd,
msigApproveCmd,
},
}
var msigCreateCmd = &cli.Command{
Name: "create",
Usage: "Create a new multisig wallet",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "required",
},
&cli.StringFlag{
Name: "value",
Usage: "initial funds to give to multisig",
Value: "0",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var addrs []address.Address
for _, a := range cctx.Args().Slice() {
addr, err := address.NewFromString(a)
if err != nil {
return err
}
addrs = append(addrs, addr)
}
// get the address we're going to use to create the multisig (can be one of the above, as long as they have funds)
sendAddr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err
}
val := cctx.String("value")
filval, err := types.ParseFIL(val)
if err != nil {
return err
}
required := cctx.Uint64("required")
if required == 0 {
required = uint64(len(addrs))
}
// Set up constructor parameters for multisig
msigParams := &actors.MultiSigConstructorParams{
Signers: addrs,
Required: required,
}
enc, err := actors.SerializeParams(msigParams)
if err != nil {
return err
}
// new actors are created by invoking 'exec' on the init actor with the constructor params
execParams := &actors.ExecParams{
Code: actors.MultisigCodeCid,
Params: enc,
}
enc, err = actors.SerializeParams(execParams)
if err != nil {
return err
}
// now we create the message to send this with
msg := types.Message{
To: actors.InitAddress,
From: sendAddr,
Method: actors.IAMethods.Exec,
Params: enc,
GasPrice: types.NewInt(1),
GasLimit: types.NewInt(1000000),
Value: types.BigInt(filval),
}
// send the message out to the network
smsg, err := api.MpoolPushMessage(ctx, &msg)
if err != nil {
return err
}
// wait for it to get mined into a block
wait, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
// check it executed successfully
if wait.Receipt.ExitCode != 0 {
fmt.Println("actor creation failed!")
return err
}
// get address of newly created miner
msigaddr, err := address.NewFromBytes(wait.Receipt.Return)
if err != nil {
return err
}
fmt.Println("Created new multisig: ", msigaddr.String())
// TODO: maybe register this somewhere
return nil
},
}
var msigInspectCmd = &cli.Command{
Name: "inspect",
Usage: "Inspect a multisig wallet",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must specify address of multisig to inspect")
}
maddr, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
act, err := api.StateGetActor(ctx, maddr, nil)
if err != nil {
return err
}
obj, err := api.ChainReadObj(ctx, act.Head)
if err != nil {
return err
}
var mstate actors.MultiSigActorState
if err := mstate.UnmarshalCBOR(bytes.NewReader(obj)); err != nil {
return err
}
fmt.Printf("Balance: %sfil\n", types.FIL(act.Balance))
fmt.Printf("Threshold: %d / %d\n", mstate.Required, len(mstate.Signers))
fmt.Println("Signers:")
for _, s := range mstate.Signers {
fmt.Printf("\t%s\n", s)
}
fmt.Println("Transactions: ", len(mstate.Transactions))
if len(mstate.Transactions) > 0 {
w := tabwriter.NewWriter(os.Stdout, 8, 4, 0, ' ', 0)
fmt.Fprintf(w, "ID\tState\tTo\tValue\tMethod\tParams\n")
for _, tx := range mstate.Transactions {
fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%d\t%x\n", tx.TxID, state(tx), tx.To, types.FIL(tx.Value), tx.Method, tx.Params)
}
w.Flush()
}
return nil
},
}
func state(tx actors.MTransaction) string {
if tx.Complete {
return "done"
}
if tx.Canceled {
return "canceled"
}
return "pending"
}
var msigProposeCmd = &cli.Command{
Name: "propose",
Usage: "Propose a multisig transaction",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if cctx.Args().Len() < 3 {
return fmt.Errorf("must pass multisig address, destination, and value")
}
if cctx.Args().Len() > 3 && cctx.Args().Len() != 5 {
return fmt.Errorf("usage: msig propose <msig addr> <desination> <value> [ <method> <params> ]")
}
msig, err := address.NewFromString(cctx.Args().Get(0))
if err != nil {
return err
}
dest, err := address.NewFromString(cctx.Args().Get(1))
if err != nil {
return err
}
value, err := types.ParseFIL(cctx.Args().Get(2))
if err != nil {
return err
}
var method uint64
var params []byte
if cctx.Args().Len() == 5 {
m, err := strconv.ParseUint(cctx.Args().Get(3), 10, 64)
if err != nil {
return err
}
method = m
p, err := hex.DecodeString(cctx.Args().Get(4))
if err != nil {
return err
}
params = p
}
enc, err := actors.SerializeParams(&actors.MultiSigProposeParams{
To: dest,
Value: types.BigInt(value),
Method: method,
Params: params,
})
if err != nil {
return err
}
var from address.Address
if cctx.IsSet("source") {
f, err := address.NewFromString(cctx.String("source"))
if err != nil {
return err
}
from = f
} else {
defaddr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err
}
from = defaddr
}
msg := &types.Message{
To: msig,
From: from,
Value: types.NewInt(0),
Method: actors.MultiSigMethods.Propose,
Params: enc,
GasLimit: types.NewInt(100000),
GasPrice: types.NewInt(1),
}
smsg, err := api.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
fmt.Println("send proposal in message: ", smsg.Cid())
wait, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
if wait.Receipt.ExitCode != 0 {
return fmt.Errorf("proposal returned exit %d", wait.Receipt.ExitCode)
}
_, v, err := cbg.CborReadHeader(bytes.NewReader(wait.Receipt.Return))
if err != nil {
return err
}
fmt.Printf("Transaction ID: %d\n", v)
return nil
},
}
var msigApproveCmd = &cli.Command{
Name: "approve",
Usage: "Approve a multisig transaction",
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if cctx.Args().Len() < 2 {
return fmt.Errorf("must pass multisig address and transaction ID")
}
msig, err := address.NewFromString(cctx.Args().Get(0))
if err != nil {
return err
}
txid, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return err
}
enc, err := actors.SerializeParams(&actors.MultiSigTxID{
TxID: txid,
})
if err != nil {
return err
}
var from address.Address
if cctx.IsSet("source") {
f, err := address.NewFromString(cctx.String("source"))
if err != nil {
return err
}
from = f
} else {
defaddr, err := api.WalletDefaultAddress(ctx)
if err != nil {
return err
}
from = defaddr
}
msg := &types.Message{
To: msig,
From: from,
Value: types.NewInt(0),
Method: actors.MultiSigMethods.Approve,
Params: enc,
GasLimit: types.NewInt(100000),
GasPrice: types.NewInt(1),
}
smsg, err := api.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
fmt.Println("sent approval in message: ", smsg.Cid())
wait, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
if wait.Receipt.ExitCode != 0 {
return fmt.Errorf("approve returned exit %d", wait.Receipt.ExitCode)
}
return nil
},
}

View File

@ -15,7 +15,7 @@ var sendCmd = &cli.Command{
Flags: []cli.Flag{
&cli.StringFlag{
Name: "source",
Usage: "optinally specifiy the account to send funds from",
Usage: "optionally specify the account to send funds from",
},
},
Action: func(cctx *cli.Context) error {

View File

@ -1,18 +1,24 @@
package cli
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
actors "github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/miner"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
"gopkg.in/urfave/cli.v2"
)
@ -39,6 +45,7 @@ var stateCmd = &cli.Command{
stateReadStateCmd,
stateListMessagesCmd,
stateComputeStateCmd,
stateCallCmd,
},
}
@ -528,6 +535,10 @@ var stateListMessagesCmd = &cli.Command{
Name: "toheight",
Usage: "don't look before given block height",
},
&cli.BoolFlag{
Name: "cids",
Usage: "print message CIDs instead of messages",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
@ -568,6 +579,11 @@ var stateListMessagesCmd = &cli.Command{
}
for _, c := range msgs {
if cctx.Bool("cids") {
fmt.Println(c.String())
continue
}
m, err := api.ChainGetMessage(ctx, c)
if err != nil {
return err
@ -648,3 +664,209 @@ var stateComputeStateCmd = &cli.Command{
return nil
},
}
var stateCallCmd = &cli.Command{
Name: "call",
Usage: "Invoke a method on an actor locally",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "from",
Usage: "",
Value: actors.NetworkAddress.String(),
},
&cli.StringFlag{
Name: "value",
Usage: "specify value field for invocation",
Value: "0",
},
&cli.StringFlag{
Name: "ret",
Usage: "specify how to parse output (auto, raw, addr, big)",
Value: "auto",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 2 {
return fmt.Errorf("must specify at least actor and method to invoke")
}
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
toa, err := address.NewFromString(cctx.Args().First())
if err != nil {
return fmt.Errorf("given 'to' address %q was invalid: %w", cctx.Args().First(), err)
}
froma, err := address.NewFromString(cctx.String("from"))
if err != nil {
return fmt.Errorf("given 'from' address %q was invalid: %w", cctx.String("from"), err)
}
ts, err := loadTipSet(ctx, cctx, api)
if err != nil {
return err
}
method, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
if err != nil {
return fmt.Errorf("must pass method as a number")
}
value, err := types.ParseFIL(cctx.String("value"))
if err != nil {
return fmt.Errorf("failed to parse 'value': %s", err)
}
act, err := api.StateGetActor(ctx, toa, ts)
if err != nil {
return fmt.Errorf("failed to lookup target actor: %s", err)
}
params, err := parseParamsForMethod(act.Code, method, cctx.Args().Slice()[2:])
if err != nil {
return fmt.Errorf("failed to parse params: %s", err)
}
ret, err := api.StateCall(ctx, &types.Message{
From: froma,
To: toa,
Value: types.BigInt(value),
GasLimit: types.NewInt(10000000000),
GasPrice: types.NewInt(0),
Method: method,
Params: params,
}, ts)
if err != nil {
return fmt.Errorf("state call failed: %s", err)
}
if ret.ExitCode != 0 {
return fmt.Errorf("invocation failed (exit: %d): %s", ret.ExitCode, ret.Error)
}
s, err := formatOutput(cctx.String("ret"), ret.Return)
if err != nil {
return fmt.Errorf("failed to format output: %s", err)
}
fmt.Printf("return: %s\n", s)
return nil
},
}
func formatOutput(t string, val []byte) (string, error) {
switch t {
case "raw", "hex":
return fmt.Sprintf("%x", val), nil
case "address", "addr", "a":
a, err := address.NewFromBytes(val)
if err != nil {
return "", err
}
return a.String(), nil
case "big", "int", "bigint":
bi := types.BigFromBytes(val)
return bi.String(), nil
case "fil":
bi := types.FIL(types.BigFromBytes(val))
return bi.String(), nil
case "pid", "peerid", "peer":
pid, err := peer.IDFromBytes(val)
if err != nil {
return "", err
}
return pid.Pretty(), nil
case "auto":
if len(val) == 0 {
return "", nil
}
a, err := address.NewFromBytes(val)
if err == nil {
return "address: " + a.String(), nil
}
pid, err := peer.IDFromBytes(val)
if err == nil {
return "peerID: " + pid.Pretty(), nil
}
bi := types.BigFromBytes(val)
return "bigint: " + bi.String(), nil
default:
return "", fmt.Errorf("unrecognized output type: %q", t)
}
}
func parseParamsForMethod(act cid.Cid, method uint64, args []string) ([]byte, error) {
if len(args) == 0 {
return nil, nil
}
var f interface{}
switch act {
case actors.StorageMarketCodeCid:
f = actors.StorageMarketActor{}.Exports()[method]
case actors.StorageMinerCodeCid:
f = actors.StorageMinerActor{}.Exports()[method]
case actors.StoragePowerCodeCid:
f = actors.StoragePowerActor{}.Exports()[method]
case actors.MultisigCodeCid:
f = actors.MultiSigActor{}.Exports()[method]
case actors.PaymentChannelCodeCid:
f = actors.PaymentChannelActor{}.Exports()[method]
default:
return nil, fmt.Errorf("the lazy devs didnt add support for that actor to this call yet")
}
rf := reflect.TypeOf(f)
if rf.NumIn() != 3 {
return nil, fmt.Errorf("expected referenced method to have three arguments")
}
paramObj := rf.In(2).Elem()
if paramObj.NumField() != len(args) {
return nil, fmt.Errorf("not enough arguments given to call that method (expecting %d)", paramObj.NumField())
}
p := reflect.New(paramObj)
for i := 0; i < len(args); i++ {
switch paramObj.Field(i).Type {
case reflect.TypeOf(address.Address{}):
a, err := address.NewFromString(args[i])
if err != nil {
return nil, fmt.Errorf("failed to parse address: %s", err)
}
p.Elem().Field(i).Set(reflect.ValueOf(a))
case reflect.TypeOf(uint64(0)):
val, err := strconv.ParseUint(args[i], 10, 64)
if err != nil {
return nil, err
}
p.Elem().Field(i).Set(reflect.ValueOf(val))
case reflect.TypeOf(peer.ID("")):
pid, err := peer.IDB58Decode(args[i])
if err != nil {
return nil, fmt.Errorf("failed to parse peer ID: %s", err)
}
p.Elem().Field(i).Set(reflect.ValueOf(pid))
default:
return nil, fmt.Errorf("unsupported type for call (TODO): %s", paramObj.Field(i).Type)
}
}
m := p.Interface().(cbg.CBORMarshaler)
buf := new(bytes.Buffer)
if err := m.MarshalCBOR(buf); err != nil {
return nil, fmt.Errorf("failed to marshal param object: %s", err)
}
return buf.Bytes(), nil
}

View File

@ -2,16 +2,18 @@ package main
import (
"database/sql"
"encoding/hex"
"fmt"
"github.com/filecoin-project/lotus/api"
"golang.org/x/xerrors"
"sync"
"time"
"github.com/filecoin-project/go-address"
"github.com/ipfs/go-cid"
_ "github.com/lib/pq"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
)
@ -75,8 +77,8 @@ create table if not exists blocks
eprof bytea,
prand bytea,
ep0partial bytea,
ep0sector bigint not null,
ep0challangei bigint not null
ep0sector numeric not null,
ep0challangei numeric not null
);
create unique index if not exists block_cid_uindex
@ -117,6 +119,22 @@ create index if not exists id_address_map_address_index
create index if not exists id_address_map_id_index
on id_address_map (id);
create table if not exists actor_states
(
head text not null,
code text not null,
state json not null
);
create unique index if not exists actor_states_head_code_uindex
on actor_states (head, code);
create index if not exists actor_states_head_index
on actor_states (head);
create index if not exists actor_states_code_head_index
on actor_states (head, code);
create table if not exists messages
(
cid text not null
@ -124,11 +142,11 @@ create table if not exists messages
primary key,
"from" text not null,
"to" text not null,
nonce int not null,
nonce bigint not null,
value text not null,
gasprice int not null,
gaslimit int not null,
method int,
gasprice bigint not null,
gaslimit bigint not null,
method bigint,
params bytea
);
@ -184,7 +202,7 @@ create table if not exists miner_heads
worker text not null,
peerid text not null,
sectorsize bigint not null,
power bigint not null,
power decimal not null,
active bool,
ppe bigint not null,
slashed_at bigint not null,
@ -192,6 +210,33 @@ create table if not exists miner_heads
primary key (head, addr)
);
create table if not exists deals
(
id int not null,
pieceRef text not null,
pieceSize bigint not null,
client text not null,
provider text not null,
expiration decimal not null,
duration decimal not null,
epochPrice decimal not null,
collateral decimal not null,
constraint deals_pk
primary key (id)
);
create index if not exists deals_client_index
on deals (client);
create unique index if not exists deals_id_uindex
on deals (id);
create index if not exists deals_pieceRef_index
on deals (pieceRef);
create index if not exists deals_provider_index
on deals (provider);
`)
if err != nil {
return err
@ -226,17 +271,15 @@ func (st *storage) hasList() map[cid.Cid]struct{} {
return out
}
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error {
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error {
// Basic
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
@ -247,7 +290,7 @@ create temp table a (like actors excluding constraints) on commit drop;
for addr, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil {
return err
}
}
@ -261,7 +304,47 @@ create temp table a (like actors excluding constraints) on commit drop;
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
if err := tx.Commit(); err != nil {
return err
}
// States
tx, err = st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `)
if err != nil {
return err
}
for _, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
@ -602,11 +685,8 @@ func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
}
if _, err := tx.Exec(`
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
@ -639,6 +719,59 @@ create temp table mi (like mpool_messages excluding constraints) on commit drop;
return tx.Commit()
}
func (st *storage) storeDeals(deals map[string]actors.OnChainDeal) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table d (like deals excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy d (id, pieceref, piecesize, client, "provider", expiration, duration, epochprice, collateral) from stdin `)
if err != nil {
return err
}
var bloat uint64
for id, deal := range deals {
if len(deal.PieceRef) > 40 {
bloat += uint64(len(deal.PieceRef))
continue
}
if _, err := stmt.Exec(
id,
hex.EncodeToString(deal.PieceRef),
deal.PieceSize,
deal.Client.String(),
deal.Provider.String(),
fmt.Sprint(deal.ProposalExpiration),
fmt.Sprint(deal.Duration),
deal.StoragePricePerEpoch.String(),
deal.StorageCollateral.String(),
); err != nil {
return err
}
}
if bloat > 0 {
log.Warnf("deal PieceRefs had %d bytes of garbage", bloat)
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into deals select * from d on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
func (st *storage) close() error {
return st.db.Close()
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"container/list"
"context"
"encoding/json"
"math"
"sync"
@ -60,9 +61,14 @@ type minerInfo struct {
psize uint64
}
type actorInfo struct {
stateroot cid.Cid
state string
}
func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipSet) {
addresses := map[address.Address]address.Address{}
actors := map[address.Address]map[types.Actor]cid.Cid{}
actors := map[address.Address]map[types.Actor]actorInfo{}
var alk sync.Mutex
log.Infof("Getting synced block list")
@ -150,12 +156,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][*act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
}
actors[addr][*act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
})
@ -181,13 +201,26 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Error(err)
return
}
ast, err := api.StateReadState(ctx, &act, ts)
if err != nil {
log.Error(err)
return
}
state, err := json.Marshal(ast.State)
if err != nil {
log.Error(err)
return
}
alk.Lock()
_, ok := actors[addr]
if !ok {
actors[addr] = map[types.Actor]cid.Cid{}
actors[addr] = map[types.Actor]actorInfo{}
}
actors[addr][act] = actorInfo{
stateroot: bh.ParentStateRoot,
state: string(state),
}
actors[addr][act] = bh.ParentStateRoot
addresses[addr] = address.Undef
alk.Unlock()
}
@ -228,7 +261,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
miners[minerKey{
addr: addr,
act: actor,
stateroot: c,
stateroot: c.stateroot,
}] = &minerInfo{}
}
}
@ -322,6 +355,22 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Sync stage done")
}
log.Infof("Get deals")
// TODO: incremental, gather expired
deals, err := api.StateMarketDeals(ctx, ts)
if err != nil {
log.Error(err)
return
}
log.Infof("Store deals")
if err := st.storeDeals(deals); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}

View File

@ -111,7 +111,7 @@ var watchHeadCmd = &cli.Command{
}()
for {
ok := checkWindow(ch, int(interval))
ok := checkWindow(ch, threshold)
if !ok {
log.Warn("chain head has not updated. Restarting systemd service")
aCh <- nil
@ -135,14 +135,14 @@ func checkWindow(ch chan CidWindow, t int) bool {
windowLen := len(window)
if windowLen >= t {
cidWindow:
for i, cids := range window {
for i := range window {
next := windowLen - 1 - i
// if array length is different, head is changing
if next >= 1 && len(window[next]) != len(window[next-1]) {
break cidWindow
}
// if cids are different, head is changing
for j := range cids {
for j := range window[next] {
if next >= 1 && window[next][j] != window[next-1][j] {
break cidWindow
}
@ -161,9 +161,9 @@ func checkWindow(ch chan CidWindow, t int) bool {
}
/*
* reads channel of slices of slices of Cids
* compares Cids when len of window is greater or equal to `t` - threshold
* if all slices are the equal, head has not updated and returns false
* get chain head from API
* returns a slice of slices of Cids
* len of slice <= `t` - threshold
*/
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) {
head, err := a.ChainHead(ctx)

View File

@ -31,6 +31,7 @@ func TestCheckWindow(t *testing.T) {
ch := make(chan CidWindow, 1)
och := make(chan bool, 1)
threshold := 3
go func() {
och <- checkWindow(ch, threshold)
}()
@ -51,6 +52,107 @@ func TestCheckWindow(t *testing.T) {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow1 CidWindow
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
makeCID("abcd"),
}, threshold)
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- healthyHeadCheckWindow1
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow2 CidWindow
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- healthyHeadCheckWindow2
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow3 CidWindow
healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{
makeCID("abcd"),
}, threshold)
healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow3
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow4 CidWindow
healthyHeadCheckWindow4 = appendCIDsToWindow(healthyHeadCheckWindow4, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow4
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, 5)
}()
var healthyHeadCheckWindow5 CidWindow
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
makeCID("bbff"),
}, 5)
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, 5)
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("abcd"),
}, 5)
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("cbcd"),
makeCID("cbfe"),
}, 5)
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("cbcd"),
makeCID("cbfe"),
}, 5)
ch <- healthyHeadCheckWindow5
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
@ -73,6 +175,42 @@ func TestCheckWindow(t *testing.T) {
assert.False(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow1 CidWindow
unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow1
select {
case ok := <-och:
assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow2 CidWindow
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow2
select {
case ok := <-och:
assert.False(ok)
}
}
func makeCID(s string) cid.Cid {

View File

@ -5,14 +5,20 @@ package main
import (
"context"
"io/ioutil"
"os"
paramfetch "github.com/filecoin-project/go-paramfetch"
"github.com/filecoin-project/go-sectorbuilder"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/multiformats/go-multiaddr"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/testing"
@ -56,6 +62,14 @@ var DaemonCmd = &cli.Command{
Name: "bootstrap",
Value: true,
},
&cli.StringFlag{
Name: "import-chain",
Usage: "on first run, load chain from given file",
},
&cli.BoolFlag{
Name: "halt-after-import",
Usage: "halt the process after importing chain from file",
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
@ -81,6 +95,16 @@ var DaemonCmd = &cli.Command{
}
}
chainfile := cctx.String("import-chain")
if chainfile != "" {
if err := ImportChain(r, chainfile); err != nil {
return err
}
if cctx.Bool("halt-after-import") {
return nil
}
}
genesis := node.Options()
if len(genBytes) > 0 {
genesis = node.Override(new(modules.Genesis), modules.LoadGenesis(genBytes))
@ -129,3 +153,50 @@ var DaemonCmd = &cli.Command{
return serveRPC(api, stop, endpoint)
},
}
func ImportChain(r repo.Repo, fname string) error {
fi, err := os.Open(fname)
if err != nil {
return err
}
lr, err := r.Lock(repo.FullNode)
if err != nil {
return err
}
defer lr.Close()
ds, err := lr.Datastore("/blocks")
if err != nil {
return err
}
mds, err := lr.Datastore("/metadata")
if err != nil {
return err
}
bs := blockstore.NewBlockstore(ds)
cst := store.NewChainStore(bs, mds, vm.Syscalls(sectorbuilder.ProofVerifier))
log.Info("importing chain from file...")
ts, err := cst.Import(fi)
if err != nil {
return xerrors.Errorf("importing chain failed: %w", err)
}
stm := stmgr.NewStateManager(cst)
log.Infof("validating imported chain...")
if err := stm.ValidateChain(context.TODO(), ts); err != nil {
return xerrors.Errorf("chain validation failed: %w", err)
}
log.Info("accepting %s as new head", ts.Cids())
if err := cst.SetHead(ts); err != nil {
return err
}
return nil
}

View File

@ -122,7 +122,7 @@ func main() {
os.Exit(1)
}
err = gen.WriteMapEncodersToFile("./storage/sectors/cbor_gen.go", "sectors",
err = gen.WriteMapEncodersToFile("./storage/sealing/cbor_gen.go", "sealing",
sealing.SealTicket{},
sealing.SealSeed{},
sealing.Piece{},

4
go.mod
View File

@ -32,7 +32,7 @@ require (
github.com/ipfs/go-bitswap v0.1.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291
github.com/ipfs/go-cid v0.0.4
github.com/ipfs/go-datastore v0.3.1
github.com/ipfs/go-ds-badger2 v0.0.0-20200108185345-7f650e6b2521
@ -86,7 +86,7 @@ require (
github.com/prometheus/common v0.2.0
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
github.com/whyrusleeping/cbor-gen v0.0.0-20200106232624-282db0d37dbe
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/pubsub v0.0.0-20131020042734-02de8aa2db3d
go.opencensus.io v0.22.2

26
go.sum
View File

@ -106,27 +106,11 @@ github.com/filecoin-project/go-amt-ipld/v2 v2.0.0/go.mod h1:PAZ5tvSfMfWE327osqFX
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce h1:Jdejrx6XVSTRy2PiX08HCU5y68p3wx2hNMJJc/J7kZY=
github.com/filecoin-project/go-data-transfer v0.0.0-20191219005021-4accf56bd2ce/go.mod h1:b14UWxhxVCAjrQUYvVGrQRRsjAh79wXYejw9RbUcAww=
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8 h1:g3oodvSz+Ou+ObwcVBB2wyt8SHdWpwzMiNJ19U1zZNA=
github.com/filecoin-project/go-fil-markets v0.0.0-20200114015428-74d100f305f8/go.mod h1:c8NTjvFVy1Ud02mmGDjOiMeawY2t6ALfrrdvAB01FQc=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU=
github.com/filecoin-project/go-paramfetch v0.0.1 h1:gV7bs5YaqlgpGFMiLxInGK2L1FyCXUE0rimz4L7ghoE=
github.com/filecoin-project/go-paramfetch v0.0.1/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
@ -134,8 +118,6 @@ github.com/filecoin-project/go-sectorbuilder v0.0.1/go.mod h1:3OZ4E3B2OuwhJjtxR4
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689 h1:2cT5bhm/5I0RY+HBIPdRRrtjCwLj33Qx6DHRs9TCslY=
github.com/filecoin-project/go-sectorbuilder v0.0.2-0.20200114015900-4103afa82689/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -227,8 +209,9 @@ github.com/ipfs/go-blockservice v0.0.7/go.mod h1:EOfb9k/Y878ZTRY/CH0x5+ATtaipfbR
github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7sYcvYaKbop39M=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1 h1:Nq8xEW+2KZq7IkRlkOh0rTEUI8FgunhMoLj5EMkJzbQ=
github.com/ipfs/go-car v0.0.3-0.20191203022317-23b0a85fd1b1/go.mod h1:rmd887mJxQRDfndfDEY3Liyx8gQVyfFFRSHdsnDSAlk=
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291 h1:Yy0dcFWw8oDV/WJ4S/rkMQRWnJ3tGr9EbgDDv2JhVQw=
github.com/ipfs/go-car v0.0.3-0.20200121013634-f188c0e24291/go.mod h1:AG6sBpd2PWMccpAG7XLFBBQ/4rfBEtzUNeO2GSMesYk=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
@ -320,6 +303,7 @@ github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb h1:tmWYgjltxwM7PD
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb/go.mod h1:IwAAgul1UQIcNZzKPYZWOCijryFBeCV79cNubPzol+k=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-ipld-prime v0.0.1/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785 h1:fASnkvtR+SmB2y453RxmDD3Uvd4LonVUgFGk9JoDaZs=
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5 h1:lSip43rAdyGA+yRQuy6ju0ucZkWpYc1F2CTQtZTVW/4=
@ -727,8 +711,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20190917003517-d78d67427694/go.mod h1:x
github.com/whyrusleeping/cbor-gen v0.0.0-20191116002219-891f55cd449d/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
github.com/whyrusleeping/cbor-gen v0.0.0-20191212224538-d370462a7e8a/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
github.com/whyrusleeping/cbor-gen v0.0.0-20191216205031-b047b6acb3c0/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
github.com/whyrusleeping/cbor-gen v0.0.0-20200106232624-282db0d37dbe h1:n7En1uyDtknjLRDXebWlPGJoHvwL8AkNcSQzuOoYYYQ=
github.com/whyrusleeping/cbor-gen v0.0.0-20200106232624-282db0d37dbe/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8 h1:PHZv8Nu+95MBVNu3qSgg3ncxIv8hy4DzGAOBR9xYQRc=
github.com/whyrusleeping/cbor-gen v0.0.0-20200121162646-b63bacf5eaf8/go.mod h1:xdlJQaiqipF0HW+Mzpg7XRM3fWbGvfgFlcppuvlkIvY=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=

View File

@ -99,7 +99,7 @@ func (h handlers) register(namespace string, r interface{}) {
// Handle
type rpcErrFunc func(w func(func(io.Writer)), req *request, code int, err error)
type chanOut func(reflect.Value) interface{}
type chanOut func(reflect.Value, int64) error
func (h handlers) handleReader(ctx context.Context, r io.Reader, w io.Writer, rpcError rpcErrFunc) {
wf := func(cb func(io.Writer)) {
@ -222,16 +222,25 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
if handler.valOut != -1 {
resp.Result = callResult[handler.valOut].Interface()
}
if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
// Channel responses are sent from channel control goroutine.
// Sending responses here could cause deadlocks on writeLk, or allow
// sending channel messages before this rpc call returns
w(func(w io.Writer) {
if resp.Result != nil && reflect.TypeOf(resp.Result).Kind() == reflect.Chan {
// this must happen in the writer callback, otherwise we may start sending
// channel messages before we send this response
//noinspection GoNilness // already checked above
resp.Result = chOut(callResult[handler.valOut])
//noinspection GoNilness // already checked above
err = chOut(callResult[handler.valOut], *req.ID)
if err == nil {
return // channel goroutine handles responding
}
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
resp.Error = &respError{
Code: 1,
Message: err.(error).Error(),
}
}
w(func(w io.Writer) {
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Error(err)
return

View File

@ -375,5 +375,51 @@ func TestChan(t *testing.T) {
serverHandler.wait <- struct{}{}
_, ok = <-sub
require.Equal(t, false, ok)
}
func TestControlChanDeadlock(t *testing.T) {
for r := 0; r < 20; r++ {
testControlChanDeadlock(t)
}
}
func testControlChanDeadlock(t *testing.T) {
var client struct {
Sub func(context.Context, int, int) (<-chan int, error)
}
n := 5000
serverHandler := &ChanHandler{
wait: make(chan struct{}, n),
}
rpcServer := NewServer()
rpcServer.Register("ChanHandler", serverHandler)
testServ := httptest.NewServer(rpcServer)
defer testServ.Close()
closer, err := NewClient("ws://"+testServ.Listener.Addr().String(), "ChanHandler", &client, nil)
require.NoError(t, err)
defer closer()
for i := 0; i < n; i++ {
serverHandler.wait <- struct{}{}
}
ctx, _ := context.WithCancel(context.Background())
sub, err := client.Sub(ctx, 1, -1)
require.NoError(t, err)
go func() {
for i := 0; i < n; i++ {
require.Equal(t, i+1, <-sub)
}
}()
_, err = client.Sub(ctx, 2, -1)
require.NoError(t, err)
}

View File

@ -11,6 +11,7 @@ import (
"sync/atomic"
"github.com/gorilla/websocket"
"golang.org/x/xerrors"
)
const wsCancel = "xrpc.cancel"
@ -33,8 +34,10 @@ type frame struct {
}
type outChanReg struct {
id uint64
ch reflect.Value
reqID int64
chID uint64
ch reflect.Value
}
type wsConn struct {
@ -134,51 +137,80 @@ func (c *wsConn) sendRequest(req request) {
// (forwards channel messages to client)
func (c *wsConn) handleOutChans() {
regV := reflect.ValueOf(c.registerCh)
exitV := reflect.ValueOf(c.exiting)
cases := []reflect.SelectCase{
{ // registration chan always 0
Dir: reflect.SelectRecv,
Chan: regV,
},
{ // exit chan always 1
Dir: reflect.SelectRecv,
Chan: exitV,
},
}
internal := len(cases)
var caseToID []uint64
for {
chosen, val, ok := reflect.Select(cases)
if chosen == 0 { // control channel
switch chosen {
case 0: // registration channel
if !ok {
// control channel closed - signals closed connection
// This shouldn't happen, instead the exiting channel should get closed
log.Warn("control channel closed")
return
}
registration := val.Interface().(outChanReg)
caseToID = append(caseToID, registration.chID)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: registration.ch,
})
c.nextWriter(func(w io.Writer) {
resp := &response{
Jsonrpc: "2.0",
ID: registration.reqID,
Result: registration.chID,
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Error(err)
return
}
})
continue
case 1: // exiting channel
if !ok {
// exiting channel closed - signals closed connection
//
// We're not closing any channels as we're on receiving end.
// Also, context cancellation below should take care of any running
// requests
return
}
registration := val.Interface().(outChanReg)
caseToID = append(caseToID, registration.id)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: registration.ch,
})
log.Warn("exiting channel received a message")
continue
}
if !ok {
// Output channel closed, cleanup, and tell remote that this happened
n := len(caseToID)
n := len(cases) - 1
if n > 0 {
cases[chosen] = cases[n]
caseToID[chosen-1] = caseToID[n-1]
caseToID[chosen-internal] = caseToID[n-internal]
}
id := caseToID[chosen-1]
id := caseToID[chosen-internal]
cases = cases[:n]
caseToID = caseToID[:n-1]
caseToID = caseToID[:n-internal]
c.sendRequest(request{
Jsonrpc: "2.0",
@ -194,24 +226,29 @@ func (c *wsConn) handleOutChans() {
Jsonrpc: "2.0",
ID: nil, // notification
Method: chValue,
Params: []param{{v: reflect.ValueOf(caseToID[chosen-1])}, {v: val}},
Params: []param{{v: reflect.ValueOf(caseToID[chosen-internal])}, {v: val}},
})
}
}
// handleChanOut registers output channel for forwarding to client
func (c *wsConn) handleChanOut(ch reflect.Value) interface{} {
func (c *wsConn) handleChanOut(ch reflect.Value, req int64) error {
c.spawnOutChanHandlerOnce.Do(func() {
go c.handleOutChans()
})
id := atomic.AddUint64(&c.chanCtr, 1)
c.registerCh <- outChanReg{
id: id,
ch: ch,
}
select {
case c.registerCh <- outChanReg{
reqID: req,
return id
chID: id,
ch: ch,
}:
return nil
case <-c.exiting:
return xerrors.New("connection closing")
}
}
// //
@ -389,7 +426,6 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.chanHandlers = map[uint64]func(m []byte, ok bool){}
c.registerCh = make(chan outChanReg)
defer close(c.registerCh)
defer close(c.exiting)
// ////

View File

@ -66,61 +66,55 @@ func (t *TestState) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("TestState: map struct too large (%d)", extra)
}
var name string
n := extra
// t.A (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.A (uint64) (uint64)
case "A":
if name != "A" {
return fmt.Errorf("expected struct map entry %s to be A", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.A = uint64(extra)
// t.B (uint64) (uint64)
case "B":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.A = uint64(extra)
// t.B (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.B = uint64(extra)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
name = string(sval)
}
if name != "B" {
return fmt.Errorf("expected struct map entry %s to be B", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.B = uint64(extra)
return nil
}
func (t *TestEvent) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -182,57 +176,52 @@ func (t *TestEvent) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("TestEvent: map struct too large (%d)", extra)
}
var name string
n := extra
// t.A (string) (string)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.A (string) (string)
case "A":
if name != "A" {
return fmt.Errorf("expected struct map entry %s to be A", name)
}
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
t.A = string(sval)
}
// t.Val (uint64) (uint64)
case "Val":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Val = uint64(extra)
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
t.A = string(sval)
}
// t.Val (uint64) (uint64)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Val" {
return fmt.Errorf("expected struct map entry %s to be Val", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Val = uint64(extra)
return nil
}

View File

@ -3,6 +3,7 @@ package full
import (
"context"
"fmt"
"io"
"strconv"
"strings"
@ -18,6 +19,7 @@ import (
"github.com/ipfs/go-path"
"github.com/ipfs/go-path/resolver"
mh "github.com/multiformats/go-multihash"
"github.com/prometheus/common/log"
"go.uber.org/fx"
"golang.org/x/xerrors"
@ -312,3 +314,34 @@ func (a *ChainAPI) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Mess
return cm.VMMessage(), nil
}
func (a *ChainAPI) ChainExport(ctx context.Context, ts *types.TipSet) (<-chan []byte, error) {
r, w := io.Pipe()
out := make(chan []byte)
go func() {
defer w.Close()
if err := a.Chain.Export(ctx, ts, w); err != nil {
log.Errorf("chain export call failed: %s", err)
return
}
}()
go func() {
defer close(out)
for {
buf := make([]byte, 4096)
n, err := r.Read(buf)
if err != nil {
log.Errorf("chain export pipe read failed: %s", err)
return
}
select {
case out <- buf[:n]:
case <-ctx.Done():
log.Warnf("export writer failed: %s", ctx.Err())
}
}
}()
return out, nil
}

View File

@ -108,7 +108,7 @@ func (a *StateAPI) StatePledgeCollateral(ctx context.Context, ts *types.TipSet)
return types.BigFromBytes(ret.Return), nil
}
func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error) {
func (a *StateAPI) StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*api.MethodCall, error) {
return a.StateManager.Call(ctx, msg, ts)
}

View File

@ -393,6 +393,12 @@ func TestAPIRPC(t *testing.T) {
}
func TestAPIDealFlow(t *testing.T) {
logging.SetLogLevel("miner", "ERROR")
logging.SetLogLevel("chainstore", "ERROR")
logging.SetLogLevel("chain", "ERROR")
logging.SetLogLevel("sub", "ERROR")
logging.SetLogLevel("storageminer", "ERROR")
test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond)
}

View File

@ -117,7 +117,10 @@ func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipS
return nil, xerrors.Errorf("getting sorted sector info: %w", err)
}
log.Infow("running fPoSt", "chain-random", rand, "eps", eps, "height", ts.Height())
log.Infow("running fPoSt",
"chain-random", rand,
"eps", eps,
"height", ts.Height())
faults, err := s.checkFaults(ctx, ssi)
if err != nil {
@ -129,6 +132,10 @@ func (s *fpostScheduler) runPost(ctx context.Context, eps uint64, ts *types.TipS
var seed [32]byte
copy(seed[:], rand)
log.Infow("generating fPoSt",
"sectors", len(ssi.Values()),
"faults", len(faults))
scandidates, proof, err := s.sb.GenerateFallbackPoSt(ssi, seed, faults)
if err != nil {
return nil, xerrors.Errorf("running post failed: %w", err)

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/store"
@ -42,7 +43,7 @@ type Miner struct {
type storageMinerApi interface {
// Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateCall(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
@ -51,6 +52,7 @@ type storageMinerApi interface {
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error) // TODO: removeme eventually
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
@ -100,6 +102,8 @@ func (m *Miner) Run(ctx context.Context) error {
evts := events.NewEvents(ctx, m.api)
m.sealing = sealing.New(m.api, evts, m.maddr, m.worker, m.ds, m.sb, m.tktFn)
go m.sealing.Run(ctx)
return nil
}

View File

@ -73,68 +73,62 @@ func (t *SealTicket) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("SealTicket: map struct too large (%d)", extra)
}
var name string
n := extra
// t.BlockHeight (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.BlockHeight (uint64) (uint64)
case "BlockHeight":
if name != "BlockHeight" {
return fmt.Errorf("expected struct map entry %s to be BlockHeight", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BlockHeight = uint64(extra)
// t.TicketBytes ([]uint8) (slice)
case "TicketBytes":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BlockHeight = uint64(extra)
// t.TicketBytes ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.TicketBytes = make([]byte, extra)
if _, err := io.ReadFull(br, t.TicketBytes); err != nil {
return err
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
name = string(sval)
}
if name != "TicketBytes" {
return fmt.Errorf("expected struct map entry %s to be TicketBytes", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.TicketBytes = make([]byte, extra)
if _, err := io.ReadFull(br, t.TicketBytes); err != nil {
return err
}
return nil
}
func (t *SealSeed) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -196,68 +190,62 @@ func (t *SealSeed) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 2 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("SealSeed: map struct too large (%d)", extra)
}
var name string
n := extra
// t.BlockHeight (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.BlockHeight (uint64) (uint64)
case "BlockHeight":
if name != "BlockHeight" {
return fmt.Errorf("expected struct map entry %s to be BlockHeight", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BlockHeight = uint64(extra)
// t.TicketBytes ([]uint8) (slice)
case "TicketBytes":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.BlockHeight = uint64(extra)
// t.TicketBytes ([]uint8) (slice)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.TicketBytes = make([]byte, extra)
if _, err := io.ReadFull(br, t.TicketBytes); err != nil {
return err
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
name = string(sval)
}
if name != "TicketBytes" {
return fmt.Errorf("expected struct map entry %s to be TicketBytes", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.TicketBytes: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.TicketBytes = make([]byte, extra)
if _, err := io.ReadFull(br, t.TicketBytes); err != nil {
return err
}
return nil
}
func (t *Piece) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -335,91 +323,73 @@ func (t *Piece) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 3 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("Piece: map struct too large (%d)", extra)
}
var name string
n := extra
// t.DealID (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
name = string(sval)
}
switch name {
// t.DealID (uint64) (uint64)
case "DealID":
if name != "DealID" {
return fmt.Errorf("expected struct map entry %s to be DealID", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.Size (uint64) (uint64)
case "Size":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.DealID = uint64(extra)
// t.Size (uint64) (uint64)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = uint64(extra)
// t.CommP ([]uint8) (slice)
case "CommP":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommP: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommP = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommP); err != nil {
return err
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
name = string(sval)
}
if name != "Size" {
return fmt.Errorf("expected struct map entry %s to be Size", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Size = uint64(extra)
// t.CommP ([]uint8) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "CommP" {
return fmt.Errorf("expected struct map entry %s to be CommP", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommP: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommP = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommP); err != nil {
return err
}
return nil
}
func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
@ -477,7 +447,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.Pieces ([]storage.Piece) (slice)
// t.Pieces ([]sealing.Piece) (slice)
if len("Pieces") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Pieces\" was too long")
}
@ -571,7 +541,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.Ticket (storage.SealTicket) (struct)
// t.Ticket (sealing.SealTicket) (struct)
if len("Ticket") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Ticket\" was too long")
}
@ -609,7 +579,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.Seed (storage.SealSeed) (struct)
// t.Seed (sealing.SealSeed) (struct)
if len("Seed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"Seed\" was too long")
}
@ -705,388 +675,251 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type map")
}
if extra != 13 {
return fmt.Errorf("cbor input had wrong number of fields")
if extra > cbg.MaxLength {
return fmt.Errorf("SectorInfo: map struct too large (%d)", extra)
}
var name string
n := extra
// t.State (uint64) (uint64)
for i := uint64(0); i < n; i++ {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "State" {
return fmt.Errorf("expected struct map entry %s to be State", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.SectorID (uint64) (uint64)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "SectorID" {
return fmt.Errorf("expected struct map entry %s to be SectorID", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
// t.Nonce (uint64) (uint64)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Nonce" {
return fmt.Errorf("expected struct map entry %s to be Nonce", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Nonce = uint64(extra)
// t.Pieces ([]storage.Piece) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Pieces" {
return fmt.Errorf("expected struct map entry %s to be Pieces", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.MaxLength {
return fmt.Errorf("t.Pieces: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Pieces = make([]Piece, extra)
}
for i := 0; i < int(extra); i++ {
var v Piece
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Pieces[i] = v
}
// t.CommD ([]uint8) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "CommD" {
return fmt.Errorf("expected struct map entry %s to be CommD", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommD: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommD = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommD); err != nil {
return err
}
// t.CommR ([]uint8) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "CommR" {
return fmt.Errorf("expected struct map entry %s to be CommR", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommR: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommR = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommR); err != nil {
return err
}
// t.Proof ([]uint8) (slice)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Proof" {
return fmt.Errorf("expected struct map entry %s to be Proof", name)
}
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Proof: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.Proof = make([]byte, extra)
if _, err := io.ReadFull(br, t.Proof); err != nil {
return err
}
// t.Ticket (storage.SealTicket) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Ticket" {
return fmt.Errorf("expected struct map entry %s to be Ticket", name)
}
{
if err := t.Ticket.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.PreCommitMessage (cid.Cid) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "PreCommitMessage" {
return fmt.Errorf("expected struct map entry %s to be PreCommitMessage", name)
}
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
name = string(sval)
}
switch name {
// t.State (uint64) (uint64)
case "State":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PreCommitMessage: %w", err)
}
t.PreCommitMessage = &c
}
}
// t.Seed (storage.SealSeed) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "Seed" {
return fmt.Errorf("expected struct map entry %s to be Seed", name)
}
{
if err := t.Seed.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.CommitMessage (cid.Cid) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "CommitMessage" {
return fmt.Errorf("expected struct map entry %s to be CommitMessage", name)
}
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.State = uint64(extra)
// t.SectorID (uint64) (uint64)
case "SectorID":
t.CommitMessage = &c
}
}
// t.FaultReportMsg (cid.Cid) (struct)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "FaultReportMsg" {
return fmt.Errorf("expected struct map entry %s to be FaultReportMsg", name)
}
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
} else {
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.SectorID = uint64(extra)
// t.Nonce (uint64) (uint64)
case "Nonce":
c, err := cbg.ReadCid(br)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.FaultReportMsg: %w", err)
return err
}
if maj != cbg.MajUnsignedInt {
return fmt.Errorf("wrong type for uint64 field")
}
t.Nonce = uint64(extra)
// t.Pieces ([]sealing.Piece) (slice)
case "Pieces":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
t.FaultReportMsg = &c
if extra > cbg.MaxLength {
return fmt.Errorf("t.Pieces: array too large (%d)", extra)
}
if maj != cbg.MajArray {
return fmt.Errorf("expected cbor array")
}
if extra > 0 {
t.Pieces = make([]Piece, extra)
}
for i := 0; i < int(extra); i++ {
var v Piece
if err := v.UnmarshalCBOR(br); err != nil {
return err
}
t.Pieces[i] = v
}
// t.CommD ([]uint8) (slice)
case "CommD":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommD: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommD = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommD); err != nil {
return err
}
// t.CommR ([]uint8) (slice)
case "CommR":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.CommR: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.CommR = make([]byte, extra)
if _, err := io.ReadFull(br, t.CommR); err != nil {
return err
}
// t.Proof ([]uint8) (slice)
case "Proof":
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > cbg.ByteArrayMaxLen {
return fmt.Errorf("t.Proof: byte array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.Proof = make([]byte, extra)
if _, err := io.ReadFull(br, t.Proof); err != nil {
return err
}
// t.Ticket (sealing.SealTicket) (struct)
case "Ticket":
{
if err := t.Ticket.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.PreCommitMessage (cid.Cid) (struct)
case "PreCommitMessage":
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.PreCommitMessage: %w", err)
}
t.PreCommitMessage = &c
}
}
// t.Seed (sealing.SealSeed) (struct)
case "Seed":
{
if err := t.Seed.UnmarshalCBOR(br); err != nil {
return err
}
}
// t.CommitMessage (cid.Cid) (struct)
case "CommitMessage":
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.CommitMessage: %w", err)
}
t.CommitMessage = &c
}
}
// t.FaultReportMsg (cid.Cid) (struct)
case "FaultReportMsg":
{
pb, err := br.PeekByte()
if err != nil {
return err
}
if pb == cbg.CborNull[0] {
var nbuf [1]byte
if _, err := br.Read(nbuf[:]); err != nil {
return err
}
} else {
c, err := cbg.ReadCid(br)
if err != nil {
return xerrors.Errorf("failed to read cid field t.FaultReportMsg: %w", err)
}
t.FaultReportMsg = &c
}
}
// t.LastErr (string) (string)
case "LastErr":
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.LastErr = string(sval)
}
default:
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
}
}
// t.LastErr (string) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
name = string(sval)
}
if name != "LastErr" {
return fmt.Errorf("expected struct map entry %s to be LastErr", name)
}
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.LastErr = string(sval)
}
return nil
}

View File

@ -44,7 +44,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
on(SectorPreCommitFailed{}, api.PreCommitFailed),
),
api.Committing: planCommitting,
api.CommitWait: planOne(on(SectorProving{}, api.Proving)),
api.CommitWait: planOne(
on(SectorProving{}, api.Proving),
on(SectorCommitFailed{}, api.CommitFailed),
),
api.Proving: planOne(
on(SectorFaultReported{}, api.FaultReported),
@ -233,11 +236,15 @@ func planOne(ts ...func() (mut mutator, next api.SectorState)) func(events []sta
continue
}
if err, iserr := events[0].User.(error); iserr {
log.Warnf("sector %d got error event %T: %+v", state.SectorID, events[0].User, err)
}
events[0].User.(mutator).apply(state)
state.State = next
return nil
}
return xerrors.Errorf("planner for state %s received unexpected event %+v", events[0])
return xerrors.Errorf("planner for state %s received unexpected event %+v", api.SectorStates[state.State], events[0])
}
}

View File

@ -97,7 +97,11 @@ func (evt SectorSeedReady) apply(state *SectorInfo) {
}
type SectorComputeProofFailed struct{ error }
type SectorCommitFailed struct{ error }
func (evt SectorCommitFailed) apply(*SectorInfo) {}
type SectorCommitted struct {
message cid.Cid
proof []byte

View File

@ -29,7 +29,7 @@ type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type sealingApi interface { // TODO: trim down
// Call a read only method on actors (no interaction with the chain required)
StateCall(ctx context.Context, msg *types.Message, ts *types.TipSet) (*types.MessageReceipt, error)
StateCall(context.Context, *types.Message, *types.TipSet) (*api.MethodCall, error)
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerElectionPeriodStart(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StateMinerSectors(context.Context, address.Address, *types.TipSet) ([]*api.ChainSectorInfo, error)
@ -82,8 +82,6 @@ func New(api sealingApi, events *events.Events, maddr address.Address, worker ad
}
func (m *Sealing) Run(ctx context.Context) error {
m.events = events.NewEvents(ctx, m.api)
if err := m.restartSectors(ctx); err != nil {
log.Errorf("%+v", err)
return xerrors.Errorf("failed load sector states: %w", err)