Merge branch 'master' of github.com:filecoin-project/lotus

eshon 2019-10-10 19:35:01 +02:00
commit 3f155ccc9a
74 changed files with 1810 additions and 675 deletions


@ -63,7 +63,7 @@ lotus: $(BUILD_DEPS)
.PHONY: lotus
CLEAN+=lotus
lotus-sotrage-miner: $(BUILD_DEPS)
lotus-storage-miner: $(BUILD_DEPS)
rm -f lotus-storage-miner
go build -o lotus-storage-miner ./cmd/lotus-storage-miner
go run github.com/GeertJohan/go.rice/rice append --exec lotus-storage-miner -i ./build
@ -72,7 +72,7 @@ lotus-sotrage-miner: $(BUILD_DEPS)
CLEAN+=lotus-storage-miner
build: lotus lotus-sotrage-miner
build: lotus lotus-storage-miner
.PHONY: build


@ -2,13 +2,13 @@
Lotus is an experimental implementation of the Filecoin Distributed Storage
Network. For more details, check out the
[spec](https://github.com/filecoin-project/spec).
[spec](https://github.com/filecoin-project/specs).
## Development
All work is tracked via issues, and an attempt to keep an up to date view on
this exists in the [lotus testnet github project
board](https://github.com/filecoin-project/go-lotus/projects/1).
All work is tracked via issues. An attempt at keeping an up-to-date view on
remaining work is in the [lotus testnet github project
board](https://github.com/filecoin-project/lotus/projects/1).
## Building
@ -38,10 +38,10 @@ Start full node daemon
$ lotus daemon
```
Connect to the network:
Check that you are connected to the network
```sh
$ lotus net connect /ip4/147.75.80.29/tcp/1347/p2p/12D3KooWGThG7Ct5aX4tTRkgvjr3pT2JyCyyvK77GhXVQ9Cfjzj2
$ lotus net connect /ip4/147.75.80.17/tcp/1347/p2p/12D3KooWRNm4a6ESBr9bbTpSC2CfLfoWKRpABJi7FR3GhHw7usKW
$ lotus net peers | wc -l
2 # number of peers
```
[wait for the chain to finish syncing]
@ -190,4 +190,4 @@ open up localhost:16686 in your browser.
For more details, see [this document](./docs/tracing.md).
## License
MIT + Apache
Dual-licensed under [MIT](https://github.com/filecoin-project/lotus/blob/master/LICENSE-MIT) + [Apache 2.0](https://github.com/filecoin-project/lotus/blob/master/LICENSE-APACHE)


@ -2,6 +2,7 @@ package api
import (
"context"
"fmt"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
@ -9,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
@ -51,13 +53,13 @@ type FullNode interface {
ChainHead(context.Context) (*types.TipSet, error) // TODO: check serialization
ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error // TODO: check serialization
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
ChainGetBlock(context.Context, cid.Cid) (*types.BlockHeader, error)
ChainGetBlockMessages(context.Context, cid.Cid) (*BlockMessages, error)
ChainGetParentReceipts(context.Context, cid.Cid) ([]*types.MessageReceipt, error)
ChainGetParentMessages(context.Context, cid.Cid) ([]cid.Cid, error)
ChainGetParentMessages(context.Context, cid.Cid) ([]Message, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
ChainSetHead(context.Context, *types.TipSet) error
// syncer
SyncState(context.Context) (*SyncState, error)
@ -65,7 +67,7 @@ type FullNode interface {
// messages
MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error
MpoolPush(context.Context, *types.SignedMessage) error // TODO: remove
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) // get nonce, sign, push
MpoolGetNonce(context.Context, address.Address) (uint64, error)
@ -89,6 +91,8 @@ type FullNode interface {
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
WalletDefaultAddress(context.Context) (address.Address, error)
WalletExport(context.Context, address.Address) (*types.KeyInfo, error)
WalletImport(context.Context, *types.KeyInfo) (address.Address, error)
// Other
@ -122,6 +126,7 @@ type FullNode interface {
StateMinerPeerID(ctx context.Context, m address.Address, ts *types.TipSet) (peer.ID, error)
StateMinerProvingPeriodEnd(ctx context.Context, actor address.Address, ts *types.TipSet) (uint64, error)
StatePledgeCollateral(context.Context, *types.TipSet) (types.BigInt, error)
StateWaitMsg(context.Context, cid.Cid) (*MsgWait, error)
PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error)
PaychList(context.Context) ([]address.Address, error)
@ -171,6 +176,11 @@ type Version struct {
// TODO: git commit / os / genesis cid?
}
func (v Version) String() string {
vM, vm, vp := build.VersionInts(v.APIVersion)
return fmt.Sprintf("%s+api%d.%d.%d", v.Version, vM, vm, vp)
}
type Import struct {
Status filestore.Status
Key cid.Cid
@ -193,6 +203,7 @@ type DealInfo struct {
type MsgWait struct {
Receipt types.MessageReceipt
TipSet *types.TipSet
}
type BlockMessages struct {
@ -202,6 +213,11 @@ type BlockMessages struct {
Cids []cid.Cid
}
type Message struct {
Cid cid.Cid
Message *types.Message
}
type SectorInfo struct {
SectorID uint64
CommD []byte

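For illustration only, not part of this commit: a minimal sketch of how a FullNode API caller adapts to the changes above, with StateWaitMsg taking over from ChainWaitMsg and ChainGetParentMessages now returning full Message records. The helper function, the `full` client handle and the block CID are assumptions.

```go
package example

import (
	"context"
	"fmt"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/go-lotus/api"
	"github.com/filecoin-project/go-lotus/chain/types"
)

// waitAndInspect is a hypothetical helper showing the updated API surface.
func waitAndInspect(ctx context.Context, full api.FullNode, msg *types.Message, blk cid.Cid) error {
	// MpoolPushMessage assigns the nonce, signs and pushes in one call.
	smsg, err := full.MpoolPushMessage(ctx, msg)
	if err != nil {
		return err
	}

	// StateWaitMsg replaces the old ChainWaitMsg.
	wait, err := full.StateWaitMsg(ctx, smsg.Cid())
	if err != nil {
		return err
	}
	fmt.Println("message executed in tipset:", wait.TipSet.Cids())

	// ChainGetParentMessages now returns full Message records, not just CIDs.
	parents, err := full.ChainGetParentMessages(ctx, blk)
	if err != nil {
		return err
	}
	for _, m := range parents {
		fmt.Println(m.Cid, "from", m.Message.From, "nonce", m.Message.Nonce)
	}
	return nil
}
```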

@ -41,13 +41,13 @@ type FullNodeStruct struct {
ChainSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
ChainHead func(context.Context) (*types.TipSet, error) `perm:"read"`
ChainGetRandomness func(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error) `perm:"read"`
ChainWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
ChainGetBlock func(context.Context, cid.Cid) (*types.BlockHeader, error) `perm:"read"`
ChainGetBlockMessages func(context.Context, cid.Cid) (*BlockMessages, error) `perm:"read"`
ChainGetParentReceipts func(context.Context, cid.Cid) ([]*types.MessageReceipt, error) `perm:"read"`
ChainGetParentMessages func(context.Context, cid.Cid) ([]cid.Cid, error) `perm:"read"`
ChainGetParentMessages func(context.Context, cid.Cid) ([]Message, error) `perm:"read"`
ChainGetTipSetByHeight func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) `perm:"read"`
ChainReadObj func(context.Context, cid.Cid) ([]byte, error) `perm:"read"`
ChainSetHead func(context.Context, *types.TipSet) error `perm:"admin"`
SyncState func(context.Context) (*SyncState, error) `perm:"read"`
@ -67,7 +67,10 @@ type FullNodeStruct struct {
WalletSign func(context.Context, address.Address, []byte) (*types.Signature, error) `perm:"sign"`
WalletSignMessage func(context.Context, address.Address, *types.Message) (*types.SignedMessage, error) `perm:"sign"`
WalletDefaultAddress func(context.Context) (address.Address, error) `perm:"write"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
WalletExport func(context.Context, address.Address) (*types.KeyInfo, error) `perm:"admin"`
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"write"`
ClientListImports func(ctx context.Context) ([]Import, error) `perm:"write"`
@ -89,6 +92,7 @@ type FullNodeStruct struct {
StateGetActor func(context.Context, address.Address, *types.TipSet) (*types.Actor, error) `perm:"read"`
StateReadState func(context.Context, *types.Actor, *types.TipSet) (*ActorState, error) `perm:"read"`
StatePledgeCollateral func(context.Context, *types.TipSet) (types.BigInt, error) `perm:"read"`
StateWaitMsg func(context.Context, cid.Cid) (*MsgWait, error) `perm:"read"`
PaychGet func(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) `perm:"sign"`
PaychList func(context.Context) ([]address.Address, error) `perm:"read"`
@ -232,10 +236,6 @@ func (c *FullNodeStruct) ChainGetRandomness(ctx context.Context, pts *types.TipS
return c.Internal.ChainGetRandomness(ctx, pts, ticks, lb)
}
func (c *FullNodeStruct) ChainWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.ChainWaitMsg(ctx, msgc)
}
func (c *FullNodeStruct) ChainGetTipSetByHeight(ctx context.Context, h uint64, ts *types.TipSet) (*types.TipSet, error) {
return c.Internal.ChainGetTipSetByHeight(ctx, h, ts)
}
@ -268,6 +268,14 @@ func (c *FullNodeStruct) WalletDefaultAddress(ctx context.Context) (address.Addr
return c.Internal.WalletDefaultAddress(ctx)
}
func (c *FullNodeStruct) WalletExport(ctx context.Context, a address.Address) (*types.KeyInfo, error) {
return c.Internal.WalletExport(ctx, a)
}
func (c *FullNodeStruct) WalletImport(ctx context.Context, ki *types.KeyInfo) (address.Address, error) {
return c.Internal.WalletImport(ctx, ki)
}
func (c *FullNodeStruct) MpoolGetNonce(ctx context.Context, addr address.Address) (uint64, error) {
return c.Internal.MpoolGetNonce(ctx, addr)
}
@ -284,7 +292,7 @@ func (c *FullNodeStruct) ChainGetParentReceipts(ctx context.Context, b cid.Cid)
return c.Internal.ChainGetParentReceipts(ctx, b)
}
func (c *FullNodeStruct) ChainGetParentMessages(ctx context.Context, b cid.Cid) ([]cid.Cid, error) {
func (c *FullNodeStruct) ChainGetParentMessages(ctx context.Context, b cid.Cid) ([]Message, error) {
return c.Internal.ChainGetParentMessages(ctx, b)
}
@ -296,6 +304,10 @@ func (c *FullNodeStruct) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte,
return c.Internal.ChainReadObj(ctx, obj)
}
func (c *FullNodeStruct) ChainSetHead(ctx context.Context, ts *types.TipSet) error {
return c.Internal.ChainSetHead(ctx, ts)
}
func (c *FullNodeStruct) SyncState(ctx context.Context) (*SyncState, error) {
return c.Internal.SyncState(ctx)
}
@ -343,6 +355,10 @@ func (c *FullNodeStruct) StatePledgeCollateral(ctx context.Context, ts *types.Ti
return c.Internal.StatePledgeCollateral(ctx, ts)
}
func (c *FullNodeStruct) StateWaitMsg(ctx context.Context, msgc cid.Cid) (*MsgWait, error) {
return c.Internal.StateWaitMsg(ctx, msgc)
}
func (c *FullNodeStruct) PaychGet(ctx context.Context, from, to address.Address, ensureFunds types.BigInt) (*ChannelInfo, error) {
return c.Internal.PaychGet(ctx, from, to, ensureFunds)
}

build/bootstrap.go (new file)

@ -0,0 +1,30 @@
package build
import (
"context"
"github.com/filecoin-project/go-lotus/lib/addrutil"
"os"
"strings"
rice "github.com/GeertJohan/go.rice"
"github.com/libp2p/go-libp2p-core/peer"
)
func BuiltinBootstrap() ([]peer.AddrInfo, error) {
var out []peer.AddrInfo
b := rice.MustFindBox("bootstrap")
err := b.Walk("", func(path string, info os.FileInfo, err error) error {
if !strings.HasSuffix(path, ".pi") {
return nil
}
spi := b.MustString(path)
if spi == "" {
return nil
}
pi, err := addrutil.ParseAddresses(context.TODO(), strings.Split(strings.TrimSpace(spi), "\n"))
out = append(out, pi...)
return err
})
return out, err
}
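For illustration: a minimal sketch of consuming BuiltinBootstrap from a node, assuming a libp2p host `h` constructed elsewhere; this wiring is not part of the commit.

```go
package example

import (
	"context"
	"log"

	"github.com/libp2p/go-libp2p-core/host"

	"github.com/filecoin-project/go-lotus/build"
)

// bootstrapHost dials every peer in the embedded bootstrap list. A failed
// dial to one bootstrapper should not abort the others.
func bootstrapHost(ctx context.Context, h host.Host) error {
	infos, err := build.BuiltinBootstrap()
	if err != nil {
		return err
	}
	for _, pi := range infos {
		if err := h.Connect(ctx, pi); err != nil {
			log.Printf("bootstrap connect to %s failed: %s", pi.ID, err)
		}
	}
	return nil
}
```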

build/bootstrap/.gitkeep (new file)


@ -0,0 +1 @@
/ip4/147.75.80.17/tcp/1347/p2p/12D3KooWJdB9SxJfUb327CsBMCyEqzkDCHaqcrwaHGZ88wUuoc5F

build/bootstrap/root.pi (new file)

@ -0,0 +1 @@
/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWAShT7qd3oM7QPC8AsQffs6ThH69fZGui4xCW68E35rDP

Binary file not shown.


@ -36,7 +36,7 @@ const BlockDelay = 6
const AllowableClockDrift = BlockDelay * 2
// Blocks
const ForkLengthThreshold = 20
const ForkLengthThreshold = 100
// /////
// Proofs / Mining
@ -85,3 +85,6 @@ func init() {
panic("could not parse InitialRewardStr")
}
}
// Sync
const BadBlockCacheSize = 8192


@ -19,3 +19,8 @@ const (
MinorMask = 0xffff00
PatchMask = 0xffffff
)
// VersionInts returns (major, minor, patch) versions
func VersionInts(version uint32) (uint32, uint32, uint32) {
return (version & MajorMask) >> 16, (version & MinorMask) >> 8, version & PatchMask
}

chain/badtscache.go (new file)

@ -0,0 +1,30 @@
package chain
import (
"github.com/filecoin-project/go-lotus/build"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
)
type BadBlockCache struct {
badBlocks *lru.ARCCache
}
func NewBadBlockCache() *BadBlockCache {
cache, err := lru.NewARC(build.BadBlockCacheSize)
if err != nil {
panic(err)
}
return &BadBlockCache{
badBlocks: cache,
}
}
func (bts *BadBlockCache) Add(c cid.Cid) {
bts.badBlocks.Add(c, nil)
}
func (bts *BadBlockCache) Has(c cid.Cid) bool {
return bts.badBlocks.Contains(c)
}
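For illustration: how the syncer is expected to use this cache (compare the ValidateTipSet and collectHeaders hunks in chain/sync.go further down). The helper below is hypothetical, not part of the commit.

```go
package example

import (
	"fmt"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/go-lotus/chain"
)

// markIfInvalid runs a caller-supplied validation step and records the block
// in the ARC-backed cache on failure, so later header walks can reject any
// chain containing it.
func markIfInvalid(bad *chain.BadBlockCache, blk cid.Cid, validate func(cid.Cid) error) error {
	if bad.Has(blk) {
		return fmt.Errorf("block %s was previously marked bad", blk)
	}
	if err := validate(blk); err != nil {
		bad.Add(blk)
		return fmt.Errorf("validating block %s: %w", blk, err)
	}
	return nil
}
```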


@ -277,7 +277,7 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
}
oerr = bs.processStatus(req, res)
if oerr != nil {
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), err)
log.Warnf("BlockSync peer %s response was an error: %s", peers[p].String(), oerr)
}
}
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", oerr)
@ -361,6 +361,10 @@ func bstsToFullTipSet(bts *BSTipSet) (*store.FullTipSet, error) {
for _, mi := range bts.BlsMsgIncludes[i] {
fb.BlsMessages = append(fb.BlsMessages, bts.BlsMessages[mi])
}
for _, mi := range bts.SecpkMsgIncludes[i] {
fb.SecpkMessages = append(fb.SecpkMessages, bts.SecpkMessages[mi])
}
fts.Blocks = append(fts.Blocks, fb)
}


@ -153,7 +153,7 @@ func (h *Handler) accept(ctx context.Context, deal MinerDeal) (func(*MinerDeal),
if deal.Proposal.Payment.ChannelMessage != nil {
log.Info("waiting for channel message to appear on chain")
if _, err := h.full.ChainWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
if _, err := h.full.StateWaitMsg(ctx, *deal.Proposal.Payment.ChannelMessage); err != nil {
return nil, xerrors.Errorf("waiting for paych message: %w", err)
}
}


@ -27,16 +27,34 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
// TODO: log error if h below gcconfidence
// revert height-based triggers
for _, tid := range e.htHeights[ts.Height()] {
// don't revert if newH is above this ts
if newH >= ts.Height() {
continue
revert := func(h uint64, ts *types.TipSet) {
for _, tid := range e.htHeights[h] {
// don't revert if newH is above this ts
if newH >= h {
continue
}
err := e.heightTriggers[tid].revert(ts)
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", h, err)
}
}
}
revert(ts.Height(), ts)
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
err := e.heightTriggers[tid].revert(ts)
if err != nil {
log.Errorf("reverting chain trigger (@H %d): %s", ts.Height(), err)
if cts != nil {
break
}
revert(subh, ts)
subh--
}
if err := e.tsc.revert(ts); err != nil {
@ -54,19 +72,44 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
// height triggers
for _, tid := range e.htTriggerHeights[ts.Height()] {
hnd := e.heightTriggers[tid]
triggerH := ts.Height() - uint64(hnd.confidence)
apply := func(h uint64, ts *types.TipSet) error {
for _, tid := range e.htTriggerHeights[h] {
hnd := e.heightTriggers[tid]
triggerH := h - uint64(hnd.confidence)
incTs, err := e.tsc.get(triggerH)
incTs, err := e.tsc.getNonNull(triggerH)
if err != nil {
return err
}
if err := hnd.handle(incTs, h); err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
}
}
return nil
}
if err := apply(ts.Height(), ts); err != nil {
return err
}
subh := ts.Height() - 1
for {
cts, err := e.tsc.get(subh)
if err != nil {
return err
}
if err := hnd.handle(incTs, ts.Height()); err != nil {
log.Errorf("chain trigger (@H %d, called @ %d) failed: %s", triggerH, ts.Height(), err)
if cts != nil {
break
}
if err := apply(subh, ts); err != nil {
return err
}
subh--
}
}
return nil
@ -75,6 +118,8 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h uint64) error {
e.lk.Lock()
defer e.lk.Unlock()

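For illustration: a usage sketch of ChainAt matching the doc comment above and the tests below. It assumes the events wrapper exposes ChainAt the same way the tests use it; the handler bodies are placeholders and not part of the commit.

```go
package example

import (
	"fmt"

	"github.com/filecoin-project/go-lotus/chain/events"
	"github.com/filecoin-project/go-lotus/chain/types"
)

// watchHeight asks to be called back 5 tipsets after the chain passes height
// 100, and to be told if a reorg later drops the chain back below it.
func watchHeight(ev *events.Events) error {
	return ev.ChainAt(func(ts *types.TipSet, curH uint64) error {
		// If height 100 itself was a null round, ts is the first non-null
		// tipset above it (see getNonNull).
		fmt.Println("triggered: tipset height", ts.Height(), "current height", curH)
		return nil
	}, func(ts *types.TipSet) error {
		fmt.Println("reverted below height", ts.Height())
		return nil
	}, 5, 100)
}
```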

@ -114,21 +114,28 @@ func (fcs *fakeCS) fakeMsgs(m fakeMsg) cid.Cid {
return c
}
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow msgs
func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid, nulls ...int) { // todo: allow msgs
if fcs.sub == nil {
fcs.t.Fatal("sub not be nil")
}
nullm := map[int]struct{}{}
for _, v := range nulls {
nullm[v] = struct{}{}
}
var revs []*types.TipSet
for i := 0; i < rev; i++ {
ts := fcs.tsc.best()
revs = append(revs, ts)
if _, ok := nullm[int(ts.Height())]; !ok {
revs = append(revs, ts)
require.NoError(fcs.t, fcs.tsc.revert(ts))
}
fcs.h--
require.NoError(fcs.t, fcs.tsc.revert(ts))
}
apps := make([]*types.TipSet, app)
var apps []*types.TipSet
for i := 0; i < app; i++ {
fcs.h++
@ -137,6 +144,10 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
mc = dummyCid
}
if _, ok := nullm[int(fcs.h)]; ok {
continue
}
ts := makeTs(fcs.t, fcs.h, mc)
require.NoError(fcs.t, fcs.tsc.add(ts))
@ -144,7 +155,11 @@ func (fcs *fakeCS) advance(rev, app int, msgs map[int]cid.Cid) { // todo: allow
fcs.blkMsgs[ts.Blocks()[0].Cid()] = mc
}
apps[app-i-1] = ts
apps = append(apps, ts)
}
for i, j := 0, len(apps)-1; i < j; i, j = i+1, j-1 {
apps[i], apps[j] = apps[j], apps[i]
}
fcs.sub(revs, apps)
@ -212,6 +227,79 @@ func TestAt(t *testing.T) {
require.Equal(t, false, reverted)
}
func TestAtNullTrigger(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, uint64(6), ts.Height())
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 6, nil, 5)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
}
func TestAtNullConf(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(makeTs(t, 1, dummyCid)))
events := NewEvents(context.Background(), fcs)
var applied bool
var reverted bool
err := events.ChainAt(func(ts *types.TipSet, curH uint64) error {
require.Equal(t, 5, int(ts.Height()))
require.Equal(t, 8, int(curH))
applied = true
return nil
}, func(ts *types.TipSet) error {
reverted = true
return nil
}, 3, 5)
require.NoError(t, err)
fcs.advance(0, 6, nil)
require.Equal(t, false, applied)
require.Equal(t, false, reverted)
fcs.advance(0, 3, nil, 8)
require.Equal(t, true, applied)
require.Equal(t, false, reverted)
applied = false
fcs.advance(7, 1, nil)
require.Equal(t, false, applied)
require.Equal(t, true, reverted)
reverted = false
}
func TestAtStart(t *testing.T) {
fcs := &fakeCS{
t: t,


@ -2,6 +2,7 @@ package events
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-lotus/chain/types"
@ -31,11 +32,26 @@ func newTSCache(cap int, storage tsByHFunc) *tipSetCache {
func (tsc *tipSetCache) add(ts *types.TipSet) error {
if tsc.len > 0 {
if tsc.cache[tsc.start].Height()+1 != ts.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
if tsc.cache[tsc.start].Height() >= ts.Height() {
return xerrors.Errorf("tipSetCache.add: expected new tipset height to be at least %d, was %d", tsc.cache[tsc.start].Height()+1, ts.Height())
}
}
nextH := ts.Height()
if tsc.len > 0 {
nextH = tsc.cache[tsc.start].Height() + 1
}
// fill null blocks
for nextH != ts.Height() {
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = nil
if tsc.len < len(tsc.cache) {
tsc.len++
}
nextH++
}
tsc.start = normalModulo(tsc.start+1, len(tsc.cache))
tsc.cache[tsc.start] = ts
if tsc.len < len(tsc.cache) {
@ -56,9 +72,24 @@ func (tsc *tipSetCache) revert(ts *types.TipSet) error {
tsc.cache[tsc.start] = nil
tsc.start = normalModulo(tsc.start-1, len(tsc.cache))
tsc.len--
_ = tsc.revert(nil) // revert null block gap
return nil
}
func (tsc *tipSetCache) getNonNull(height uint64) (*types.TipSet, error) {
for {
ts, err := tsc.get(height)
if err != nil {
return nil, err
}
if ts != nil {
return ts, nil
}
height++
}
}
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
if tsc.len == 0 {
return nil, xerrors.New("tipSetCache.get: cache is empty")
@ -71,7 +102,13 @@ func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
}
clen := len(tsc.cache)
tail := tsc.cache[normalModulo(tsc.start-tsc.len+1, clen)]
var tail *types.TipSet
for i := 1; i <= tsc.len; i++ {
tail = tsc.cache[normalModulo(tsc.start-tsc.len+i, clen)]
if tail != nil {
break
}
}
if height < tail.Height() {
log.Warnf("tipSetCache.get: requested tipset not in cache, requesting from storage (h=%d; tail=%d)", height, tail.Height())


@ -2,7 +2,7 @@ package events
import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"testing"
"github.com/filecoin-project/go-lotus/chain/types"
@ -33,8 +33,6 @@ func TestTsCache(t *testing.T) {
}
for i := 0; i < 9000; i++ {
fmt.Printf("i=%d; tl=%d; tcl=%d\n", i, tsc.len, len(tsc.cache))
if i%90 > 60 {
if err := tsc.revert(tsc.best()); err != nil {
t.Fatal(err, "; i:", i)
@ -47,3 +45,65 @@ func TestTsCache(t *testing.T) {
}
}
func TestTsCacheNulls(t *testing.T) {
tsc := newTSCache(50, func(context.Context, uint64, *types.TipSet) (*types.TipSet, error) {
t.Fatal("storage call")
return &types.TipSet{}, nil
})
h := uint64(75)
add := func() {
ts, err := types.NewTipSet([]*types.BlockHeader{{
Height: h,
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
}})
if err != nil {
t.Fatal(err)
}
if err := tsc.add(ts); err != nil {
t.Fatal(err)
}
h++
}
add()
add()
add()
h += 5
add()
add()
require.Equal(t, h-1, tsc.best().Height())
ts, err := tsc.get(h - 1)
require.NoError(t, err)
require.Equal(t, h-1, ts.Height())
ts, err = tsc.get(h - 2)
require.NoError(t, err)
require.Equal(t, h-2, ts.Height())
ts, err = tsc.get(h - 3)
require.NoError(t, err)
require.Nil(t, ts)
ts, err = tsc.get(h - 8)
require.NoError(t, err)
require.Equal(t, h-8, ts.Height())
require.NoError(t, tsc.revert(tsc.best()))
require.NoError(t, tsc.revert(tsc.best()))
require.Equal(t, h-8, tsc.best().Height())
h += 50
add()
ts, err = tsc.get(h - 1)
require.NoError(t, err)
require.Equal(t, h-1, ts.Height())
}


@ -20,7 +20,6 @@ import (
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
"github.com/filecoin-project/go-lotus/lib/vdf"
"github.com/filecoin-project/go-lotus/node/repo"
block "github.com/ipfs/go-block-format"
@ -45,11 +44,13 @@ type ChainGen struct {
sm *stmgr.StateManager
genesis *types.BlockHeader
curTipset *store.FullTipSet
CurTipset *store.FullTipSet
Timestamper func(*types.TipSet, int) uint64
w *wallet.Wallet
miners []address.Address
Miners []address.Address
mworkers []address.Address
receivers []address.Address
banker address.Address
@ -104,7 +105,12 @@ func NewGenerator() (*ChainGen, error) {
return nil, xerrors.Errorf("creating memrepo wallet failed: %w", err)
}
worker, err := w.GenerateKey(types.KTBLS)
worker1, err := w.GenerateKey(types.KTBLS)
if err != nil {
return nil, xerrors.Errorf("failed to generate worker key: %w", err)
}
worker2, err := w.GenerateKey(types.KTBLS)
if err != nil {
return nil, xerrors.Errorf("failed to generate worker key: %w", err)
}
@ -123,14 +129,15 @@ func NewGenerator() (*ChainGen, error) {
}
minercfg := &GenMinerCfg{
Workers: []address.Address{worker, worker},
Owners: []address.Address{worker, worker},
Workers: []address.Address{worker1, worker2},
Owners: []address.Address{worker1, worker2},
PeerIDs: []peer.ID{"peerID1", "peerID2"},
}
genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
worker: types.FromFil(40000),
banker: types.FromFil(50000),
worker1: types.FromFil(40000),
worker2: types.FromFil(40000),
banker: types.FromFil(50000),
}, minercfg, 100000)
if err != nil {
return nil, xerrors.Errorf("make genesis block failed: %w", err)
@ -159,12 +166,12 @@ func NewGenerator() (*ChainGen, error) {
genesis: genb.Genesis,
w: w,
miners: minercfg.MinerAddrs,
Miners: minercfg.MinerAddrs,
mworkers: minercfg.Workers,
banker: banker,
receivers: receievers,
curTipset: gents,
CurTipset: gents,
r: mr,
lr: lr,
@ -191,8 +198,7 @@ func (cg *ChainGen) GenesisCar() ([]byte, error) {
return out.Bytes(), nil
}
func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) {
pts := cg.curTipset.TipSet()
func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m address.Address, ticks []*types.Ticket) (types.ElectionProof, *types.Ticket, error) {
var lastTicket *types.Ticket
if len(ticks) == 0 {
@ -201,27 +207,20 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, m address.Address, ticks
lastTicket = ticks[len(ticks)-1]
}
st := cg.curTipset.TipSet().ParentState()
st := pts.ParentState()
worker, err := stmgr.GetMinerWorker(ctx, cg.sm, st, m)
if err != nil {
return nil, nil, err
}
vrfout, err := ComputeVRF(ctx, cg.w.Sign, worker, lastTicket.VDFResult)
if err != nil {
return nil, nil, err
}
out, proof, err := vdf.Run(vrfout)
vrfout, err := ComputeVRF(ctx, cg.w.Sign, worker, lastTicket.VRFProof)
if err != nil {
return nil, nil, err
}
tick := &types.Ticket{
VRFProof: vrfout,
VDFProof: proof,
VDFResult: out,
VRFProof: vrfout,
}
win, eproof, err := IsRoundWinner(ctx, pts, append(ticks, tick), m, &mca{w: cg.w, sm: cg.sm})
@ -241,19 +240,27 @@ type MinedTipSet struct {
}
func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
mts, err := cg.NextTipSetFromMiners(cg.CurTipset.TipSet(), cg.Miners)
if err != nil {
return nil, err
}
cg.CurTipset = mts.TipSet
return mts, nil
}
func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Address) (*MinedTipSet, error) {
var blks []*types.FullBlock
ticketSets := make([][]*types.Ticket, len(cg.miners))
ticketSets := make([][]*types.Ticket, len(miners))
msgs, err := cg.getRandomMessages()
if err != nil {
return nil, err
}
base := cg.curTipset.TipSet()
for len(blks) == 0 {
for i, m := range cg.miners {
proof, t, err := cg.nextBlockProof(context.TODO(), m, ticketSets[i])
for i, m := range miners {
proof, t, err := cg.nextBlockProof(context.TODO(), base, m, ticketSets[i])
if err != nil {
return nil, xerrors.Errorf("next block proof: %w", err)
}
@ -265,7 +272,7 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
return nil, xerrors.Errorf("making a block for next tipset failed: %w", err)
}
if err := cg.cs.AddBlock(fblk.Header); err != nil {
if err := cg.cs.PersistBlockHeader(fblk.Header); err != nil {
return nil, xerrors.Errorf("chainstore AddBlock: %w", err)
}
@ -274,17 +281,22 @@ func (cg *ChainGen) NextTipSet() (*MinedTipSet, error) {
}
}
cg.curTipset = store.NewFullTipSet(blks)
fts := store.NewFullTipSet(blks)
return &MinedTipSet{
TipSet: cg.curTipset,
TipSet: fts,
Messages: msgs,
}, nil
}
func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, eproof types.ElectionProof, tickets []*types.Ticket, msgs []*types.SignedMessage) (*types.FullBlock, error) {
ts := parents.MinTimestamp() + (uint64(len(tickets)) * build.BlockDelay)
var ts uint64
if cg.Timestamper != nil {
ts = cg.Timestamper(parents, len(tickets))
} else {
ts = parents.MinTimestamp() + (uint64(len(tickets)) * build.BlockDelay)
}
fblk, err := MinerCreateBlock(context.TODO(), cg.sm, cg.w, m, parents, tickets, eproof, msgs, ts)
if err != nil {
@ -294,6 +306,18 @@ func (cg *ChainGen) makeBlock(parents *types.TipSet, m address.Address, eproof t
return fblk, err
}
// This function is awkward. It's used to deal with messages made when
// simulating forks
func (cg *ChainGen) ResyncBankerNonce(ts *types.TipSet) error {
act, err := cg.sm.GetActor(cg.banker, ts)
if err != nil {
return err
}
cg.bankerNonce = act.Nonce
return nil
}
func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
for m := range msgs {
@ -311,12 +335,7 @@ func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
GasPrice: types.NewInt(0),
}
unsigned, err := msg.Serialize()
if err != nil {
return nil, err
}
sig, err := cg.w.Sign(context.TODO(), cg.banker, unsigned)
sig, err := cg.w.Sign(context.TODO(), cg.banker, msg.Cid().Bytes())
if err != nil {
return nil, err
}
@ -325,10 +344,6 @@ func (cg *ChainGen) getRandomMessages() ([]*types.SignedMessage, error) {
Message: msg,
Signature: *sig,
}
if _, err := cg.cs.PutMessage(msgs[m]); err != nil {
return nil, err
}
}
return msgs, nil


@ -60,8 +60,14 @@ func MinerCreateBlock(ctx context.Context, sm *stmgr.StateManager, w *wallet.Wal
blsMsgCids = append(blsMsgCids, c)
} else {
secpkMsgCids = append(secpkMsgCids, msg.Cid())
c, err := sm.ChainStore().PutMessage(msg)
if err != nil {
return nil, err
}
secpkMsgCids = append(secpkMsgCids, c)
secpkMessages = append(secpkMessages, msg)
}
}


@ -202,7 +202,7 @@ func SetupStorageMiners(ctx context.Context, cs *store.ChainStore, sroot cid.Cid
// TODO: hardcoding 7000000 here is a little fragile, it changes any
// time anyone changes the initial account allocations
rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6000), actors.SMAMethods.CreateStorageMiner, params)
rval, err := doExecValue(ctx, vm, actors.StorageMarketAddress, owner, types.FromFil(6500), actors.SMAMethods.CreateStorageMiner, params)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create genesis miner: %w", err)
}
@ -329,9 +329,7 @@ func MakeGenesisBlock(bs bstore.Blockstore, balances map[address.Address]types.B
log.Infof("Empty Genesis root: %s", emptyroot)
genesisticket := &types.Ticket{
VRFProof: []byte("vrf proof"),
VDFResult: []byte("i am a vdf result"),
VDFProof: []byte("vdf proof"),
VRFProof: []byte("vrf proof"),
}
b := &types.BlockHeader{


@ -1,7 +1,6 @@
package chain
import (
"encoding/base64"
"sync"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -82,14 +81,9 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
}
func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
data, err := m.Message.Serialize()
if err != nil {
return err
}
log.Debugf("mpooladd: %s %s", m.Message.From, m.Message.Nonce)
log.Infof("mpooladd: %d %s", m.Message.Nonce, base64.StdEncoding.EncodeToString(data))
if err := m.Signature.Verify(m.Message.From, data); err != nil {
if err := m.Signature.Verify(m.Message.From, m.Message.Cid().Bytes()); err != nil {
log.Warnf("mpooladd signature verification failed: %s", err)
return err
}

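This hunk, together with the chain/gen change above, switches secp256k1 message signatures from the serialized message body to the message's CID bytes. A small sketch of the matching sign/verify round trip, assuming the wallet handle and the SignedMessage layout used in chain/gen above; it is not part of the commit.

```go
package example

import (
	"context"

	"github.com/filecoin-project/go-lotus/chain/types"
	"github.com/filecoin-project/go-lotus/chain/wallet"
)

// signOverCid signs the unsigned message's CID bytes, the convention this
// commit switches the message pool and the test chain generator to, and then
// verifies the signature the same way addLocked does above.
func signOverCid(ctx context.Context, w *wallet.Wallet, msg *types.Message) (*types.SignedMessage, error) {
	sig, err := w.Sign(ctx, msg.From, msg.Cid().Bytes())
	if err != nil {
		return nil, err
	}
	smsg := &types.SignedMessage{Message: *msg, Signature: *sig}
	if err := smsg.Signature.Verify(smsg.Message.From, smsg.Message.Cid().Bytes()); err != nil {
		return nil, err
	}
	return smsg, nil
}
```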

@ -2,6 +2,7 @@ package stmgr
import (
"context"
"fmt"
"sync"
amt "github.com/filecoin-project/go-amt-ipld"
@ -14,6 +15,7 @@ import (
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
bls "github.com/filecoin-project/go-bls-sigs"
"github.com/ipfs/go-cid"
hamt "github.com/ipfs/go-hamt-ipld"
logging "github.com/ipfs/go-log"
@ -233,3 +235,191 @@ func (sm *StateManager) LoadActorState(ctx context.Context, a address.Address, o
return act, nil
}
func (sm *StateManager) ResolveToKeyAddress(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
switch addr.Protocol() {
case address.BLS, address.SECP256K1:
return addr, nil
case address.Actor:
return address.Undef, xerrors.New("cannot resolve actor address to key address")
default:
}
if ts == nil {
ts = sm.cs.GetHeaviestTipSet()
}
st, _, err := sm.TipSetState(ts)
if err != nil {
return address.Undef, xerrors.Errorf("resolve address failed to get tipset state: %w", err)
}
cst := hamt.CSTFromBstore(sm.cs.Blockstore())
tree, err := state.LoadStateTree(cst, st)
if err != nil {
return address.Undef, xerrors.Errorf("failed to load state tree")
}
return vm.ResolveToKeyAddr(tree, cst, addr)
}
func (sm *StateManager) GetBlsPublicKey(ctx context.Context, addr address.Address, ts *types.TipSet) (pubk bls.PublicKey, err error) {
kaddr, err := sm.ResolveToKeyAddress(ctx, addr, ts)
if err != nil {
return pubk, xerrors.Errorf("failed to resolve address to key address: %w", err)
}
if kaddr.Protocol() != address.BLS {
return pubk, xerrors.Errorf("address must be BLS address to load bls public key")
}
copy(pubk[:], kaddr.Payload())
return pubk, nil
}
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.TipSet, *types.MessageReceipt, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
msg, err := sm.cs.GetCMessage(mcid)
if err != nil {
return nil, nil, fmt.Errorf("failed to load message: %w", err)
}
tsub := sm.cs.SubHeadChanges(ctx)
head, ok := <-tsub
if !ok {
return nil, nil, fmt.Errorf("SubHeadChanges stream was invalid")
}
if len(head) != 1 {
return nil, nil, fmt.Errorf("SubHeadChanges first entry should have been one item")
}
if head[0].Type != store.HCCurrent {
return nil, nil, fmt.Errorf("expected current head on SHC stream (got %s)", head[0].Type)
}
r, err := sm.tipsetExecutedMessage(head[0].Val, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return head[0].Val, r, nil
}
var backTs *types.TipSet
var backRcp *types.MessageReceipt
backSearchWait := make(chan struct{})
go func() {
fts, r, err := sm.searchBackForMsg(ctx, head[0].Val, msg)
if err != nil {
log.Warnf("failed to look back through chain for message: %w", err)
return
}
backTs = fts
backRcp = r
close(backSearchWait)
}()
for {
select {
case notif, ok := <-tsub:
if !ok {
return nil, nil, ctx.Err()
}
for _, val := range notif {
switch val.Type {
case store.HCRevert:
continue
case store.HCApply:
r, err := sm.tipsetExecutedMessage(val.Val, mcid)
if err != nil {
return nil, nil, err
}
if r != nil {
return val.Val, r, nil
}
}
}
case <-backSearchWait:
if backTs != nil {
return backTs, backRcp, nil
}
backSearchWait = nil
case <-ctx.Done():
return nil, nil, ctx.Err()
}
}
}
func (sm *StateManager) searchBackForMsg(ctx context.Context, from *types.TipSet, m store.ChainMsg) (*types.TipSet, *types.MessageReceipt, error) {
cur := from
for {
if cur.Height() == 0 {
// it ain't here!
return nil, nil, nil
}
select {
case <-ctx.Done():
return nil, nil, nil
default:
}
act, err := sm.GetActor(m.VMMessage().From, cur)
if err != nil {
return nil, nil, err
}
if act.Nonce < m.VMMessage().Nonce {
// nonce on chain is before message nonce we're looking for, its
// not going to be here
return nil, nil, nil
}
ts, err := sm.cs.LoadTipSet(cur.Parents())
if err != nil {
return nil, nil, fmt.Errorf("failed to load tipset during msg wait searchback: %w", err)
}
r, err := sm.tipsetExecutedMessage(ts, m.Cid())
if err != nil {
return nil, nil, fmt.Errorf("checking for message execution during lookback: %w", err)
}
if r != nil {
return ts, r, nil
}
cur = ts
}
}
func (sm *StateManager) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) {
// The genesis block did not execute any messages
if ts.Height() == 0 {
return nil, nil
}
pts, err := sm.cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
cm, err := sm.cs.MessagesForTipset(pts)
if err != nil {
return nil, err
}
for i, m := range cm {
if m.Cid() == msg {
return sm.cs.GetParentReceipt(ts.Blocks()[0], i)
}
}
return nil, nil
}
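For illustration: a minimal caller of the new StateManager.WaitForMessage. It assumes an existing *stmgr.StateManager and a known message CID; the helper is not part of the commit.

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/go-lotus/chain/stmgr"
)

// awaitReceipt blocks until the message lands on chain: WaitForMessage first
// searches back through history, then watches new tipsets as they apply.
func awaitReceipt(sm *stmgr.StateManager, mcid cid.Cid) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	ts, rcpt, err := sm.WaitForMessage(ctx, mcid)
	if err != nil {
		return err
	}
	fmt.Printf("message %s executed in tipset %s with exit code %d\n", mcid, ts.Cids(), rcpt.ExitCode)
	return nil
}
```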


@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/state"
"go.uber.org/zap"
amt "github.com/filecoin-project/go-amt-ipld"
"github.com/filecoin-project/go-lotus/chain/types"
@ -173,31 +174,30 @@ func (cs *ChainStore) SubscribeHeadChanges(f func(rev, app []*types.TipSet) erro
}
func (cs *ChainStore) SetGenesis(b *types.BlockHeader) error {
fts := &FullTipSet{
Blocks: []*types.FullBlock{
{Header: b},
},
ts, err := types.NewTipSet([]*types.BlockHeader{b})
if err != nil {
return err
}
if err := cs.PutTipSet(fts); err != nil {
if err := cs.PutTipSet(ts); err != nil {
return err
}
return cs.ds.Put(dstore.NewKey("0"), b.Cid().Bytes())
}
func (cs *ChainStore) PutTipSet(ts *FullTipSet) error {
for _, b := range ts.Blocks {
if err := cs.persistBlock(b); err != nil {
func (cs *ChainStore) PutTipSet(ts *types.TipSet) error {
for _, b := range ts.Blocks() {
if err := cs.PersistBlockHeader(b); err != nil {
return err
}
}
expanded, err := cs.expandTipset(ts.TipSet().Blocks()[0])
expanded, err := cs.expandTipset(ts.Blocks()[0])
if err != nil {
return xerrors.Errorf("errored while expanding tipset: %w", err)
}
log.Debugf("expanded %s into %s\n", ts.TipSet().Cids(), expanded.Cids())
log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())
if err := cs.MaybeTakeHeavierTipSet(expanded); err != nil {
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
@ -212,31 +212,45 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
if cs.heaviest != nil {
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
}
}
} else {
log.Warn("no heaviest tipset found, using %s", ts.Cids())
}
log.Debugf("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
if err := cs.writeHead(ts); err != nil {
log.Errorf("failed to write chain head: %s", err)
return nil
}
return cs.takeHeaviestTipSet(ts)
}
return nil
}
func (cs *ChainStore) takeHeaviestTipSet(ts *types.TipSet) error {
if cs.heaviest != nil {
revert, apply, err := cs.ReorgOps(cs.heaviest, ts)
if err != nil {
return errors.Wrap(err, "computing reorg ops failed")
}
for _, hcf := range cs.headChangeNotifs {
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
}
}
} else {
log.Warn("no heaviest tipset found, using %s", ts.Cids())
}
log.Debugf("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
if err := cs.writeHead(ts); err != nil {
log.Errorf("failed to write chain head: %s", err)
return nil
}
return nil
}
// SetHead sets the chainstores current 'best' head node.
// This should only be called if something is broken and needs fixing
func (cs *ChainStore) SetHead(ts *types.TipSet) error {
cs.heaviestLk.Lock()
defer cs.heaviestLk.Unlock()
return cs.takeHeaviestTipSet(ts)
}
func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
for _, c := range ts.Cids() {
has, err := cs.bs.Has(c)
@ -341,7 +355,7 @@ func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet {
return cs.heaviest
}
func (cs *ChainStore) addToTipSetTracker(b *types.BlockHeader) error {
func (cs *ChainStore) AddToTipSetTracker(b *types.BlockHeader) error {
cs.tstLk.Lock()
defer cs.tstLk.Unlock()
@ -366,31 +380,9 @@ func (cs *ChainStore) PersistBlockHeader(b *types.BlockHeader) error {
return err
}
if err := cs.addToTipSetTracker(b); err != nil {
return xerrors.Errorf("failed to insert new block (%s) into tipset tracker: %w", b.Cid(), err)
}
return cs.bs.Put(sb)
}
func (cs *ChainStore) persistBlock(b *types.FullBlock) error {
if err := cs.PersistBlockHeader(b.Header); err != nil {
return err
}
for _, m := range b.BlsMessages {
if _, err := cs.PutMessage(m); err != nil {
return err
}
}
for _, m := range b.SecpkMessages {
if _, err := cs.PutMessage(m); err != nil {
return err
}
}
return nil
}
type storable interface {
ToStorageBlock() (block.Block, error)
}
@ -481,6 +473,15 @@ func (cs *ChainStore) GetGenesis() (*types.BlockHeader, error) {
return types.DecodeBlock(genb.RawData())
}
func (cs *ChainStore) GetCMessage(c cid.Cid) (ChainMsg, error) {
m, err := cs.GetMessage(c)
if err == nil {
return m, nil
}
return cs.GetSignedMessage(c)
}
func (cs *ChainStore) GetMessage(c cid.Cid) (*types.Message, error) {
sb, err := cs.bs.Get(c)
if err != nil {
@ -660,71 +661,6 @@ func (cs *ChainStore) LoadSignedMessagesFromCids(cids []cid.Cid) ([]*types.Signe
return msgs, nil
}
func (cs *ChainStore) WaitForMessage(ctx context.Context, mcid cid.Cid) (*types.MessageReceipt, error) {
tsub := cs.SubHeadChanges(ctx)
head := cs.GetHeaviestTipSet()
r, err := cs.tipsetExecutedMessage(head, mcid)
if err != nil {
return nil, err
}
if r != nil {
return r, nil
}
for {
select {
case notif, ok := <-tsub:
if !ok {
return nil, ctx.Err()
}
for _, val := range notif {
switch val.Type {
case HCRevert:
continue
case HCApply:
r, err := cs.tipsetExecutedMessage(val.Val, mcid)
if err != nil {
return nil, err
}
if r != nil {
return r, nil
}
}
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
func (cs *ChainStore) tipsetExecutedMessage(ts *types.TipSet, msg cid.Cid) (*types.MessageReceipt, error) {
// The genesis block did not execute any messages
if ts.Height() == 0 {
return nil, nil
}
pts, err := cs.LoadTipSet(ts.Parents())
if err != nil {
return nil, err
}
cm, err := cs.MessagesForTipset(pts)
if err != nil {
return nil, err
}
for i, m := range cm {
if m.Cid() == msg {
return cs.GetParentReceipt(ts.Blocks()[0], i)
}
}
return nil, nil
}
func (cs *ChainStore) Blockstore() blockstore.Blockstore {
return cs.bs
}
@ -757,11 +693,11 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets
}
lt := int64(len(tickets))
if lb < lt {
log.Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug")
log.Desugar().Warn("self sampling randomness. this should be extremely rare, if you see this often it may be a bug", zap.Stack("call-stack"))
t := tickets[lt-(1+lb)]
return t.VDFResult, nil
return t.VRFProof, nil
}
nv := lb - lt
@ -776,7 +712,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets
lt := int64(len(mtb.Tickets))
if nv < lt {
t := mtb.Tickets[lt-(1+nv)]
return t.VDFResult, nil
return t.VRFProof, nil
}
nv -= lt
@ -787,7 +723,7 @@ func (cs *ChainStore) GetRandomness(ctx context.Context, blks []cid.Cid, tickets
t := mtb.Tickets[0]
rval := t.VDFResult
rval := t.VRFProof
for i := int64(0); i < nv; i++ {
h := sha256.Sum256(rval)
rval = h[:]

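The fallback at the end of the hunk above stretches the last available ticket into further randomness by repeatedly hashing it. A standalone sketch of just that extension step, for illustration only:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// extendRandomness mirrors the loop above: starting from a ticket's VRF
// proof, each extra lookback round hashes the previous value with SHA-256.
func extendRandomness(seed []byte, rounds int64) []byte {
	rval := seed
	for i := int64(0); i < rounds; i++ {
		h := sha256.Sum256(rval)
		rval = h[:]
	}
	return rval
}

func main() {
	fmt.Printf("%x\n", extendRandomness([]byte("vrf proof"), 3))
}
```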

@ -44,7 +44,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
return
}
log.Debug("inform new block over pubsub")
log.Infof("inform new block over pubsub: %s from %s", blk.Header.Cid(), msg.GetFrom())
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
BlsMessages: bmsgs,


@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/filecoin-project/go-bls-sigs"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
@ -14,7 +15,6 @@ import (
"github.com/filecoin-project/go-lotus/chain/stmgr"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/vdf"
amt "github.com/filecoin-project/go-amt-ipld"
"github.com/ipfs/go-cid"
@ -23,7 +23,6 @@ import (
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/pkg/errors"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)
@ -45,7 +44,7 @@ type Syncer struct {
syncLock sync.Mutex
// TipSets known to be invalid
bad BadTipSetCache
bad *BadBlockCache
// handle to the block sync service
Bsync *BlockSync
@ -72,6 +71,7 @@ func NewSyncer(sm *stmgr.StateManager, bsync *BlockSync, self peer.ID) (*Syncer,
}
return &Syncer{
bad: NewBadBlockCache(),
Genesis: gent,
Bsync: bsync,
peerHeads: make(map[peer.ID]*types.TipSet),
@ -81,10 +81,6 @@ func NewSyncer(sm *stmgr.StateManager, bsync *BlockSync, self peer.ID) (*Syncer,
}, nil
}
type BadTipSetCache struct {
badBlocks map[cid.Cid]struct{}
}
const BootstrapPeerThreshold = 1
// InformNewHead informs the syncer about a new potential tipset
@ -94,12 +90,20 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
if fts == nil {
panic("bad")
}
for _, b := range fts.Blocks {
if err := syncer.validateMsgMeta(b); err != nil {
log.Warnf("invalid block received: %s", err)
return
}
}
if from == syncer.self {
// TODO: this is kindof a hack...
log.Debug("got block from ourselves")
log.Info("got block from ourselves")
if err := syncer.Sync(fts); err != nil {
log.Errorf("failed to sync our own block: %+v", err)
if err := syncer.Sync(fts.TipSet()); err != nil {
log.Errorf("failed to sync our own block %s: %+v", fts.TipSet().Cids(), err)
}
return
@ -110,12 +114,37 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
syncer.Bsync.AddPeer(from)
go func() {
if err := syncer.Sync(fts); err != nil {
if err := syncer.Sync(fts.TipSet()); err != nil {
log.Errorf("sync error: %+v", err)
}
}()
}
func (syncer *Syncer) validateMsgMeta(fblk *types.FullBlock) error {
var bcids, scids []cbg.CBORMarshaler
for _, m := range fblk.BlsMessages {
c := cbg.CborCid(m.Cid())
bcids = append(bcids, &c)
}
for _, m := range fblk.SecpkMessages {
c := cbg.CborCid(m.Cid())
scids = append(scids, &c)
}
bs := amt.WrapBlockstore(syncer.store.Blockstore())
smroot, err := computeMsgMeta(bs, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}
if fblk.Header.Messages != smroot {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}
return nil
}
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) {
// TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead
@ -171,11 +200,6 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
smsgCids = append(smsgCids, &c)
}
smroot, err := amt.FromArray(bs, smsgCids)
if err != nil {
return nil, err
}
var bmsgs []*types.Message
var bmsgCids []cbg.CBORMarshaler
for _, m := range bmi[bi] {
@ -184,15 +208,7 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
bmsgCids = append(bmsgCids, &c)
}
bmroot, err := amt.FromArray(bs, bmsgCids)
if err != nil {
return nil, err
}
mrcid, err := bs.Put(&types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
mrcid, err := computeMsgMeta(bs, bmsgCids, smsgCids)
if err != nil {
return nil, err
}
@ -213,6 +229,28 @@ func zipTipSetAndMessages(bs amt.Blocks, ts *types.TipSet, allbmsgs []*types.Mes
return fts, nil
}
func computeMsgMeta(bs amt.Blocks, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) {
bmroot, err := amt.FromArray(bs, bmsgCids)
if err != nil {
return cid.Undef, err
}
smroot, err := amt.FromArray(bs, smsgCids)
if err != nil {
return cid.Undef, err
}
mrcid, err := bs.Put(&types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return cid.Undef, xerrors.Errorf("failed to put msgmeta: %w", err)
}
return mrcid, nil
}
func (syncer *Syncer) selectHead(heads map[peer.ID]*types.TipSet) (*types.TipSet, error) {
var headsArr []*types.TipSet
for _, ts := range heads {
@ -289,21 +327,22 @@ func (syncer *Syncer) tryLoadFullTipSet(cids []cid.Cid) (*store.FullTipSet, erro
return fts, nil
}
func (syncer *Syncer) Sync(maybeHead *store.FullTipSet) error {
func (syncer *Syncer) Sync(maybeHead *types.TipSet) error {
ctx := context.TODO()
syncer.syncLock.Lock()
defer syncer.syncLock.Unlock()
ts := maybeHead.TipSet()
if syncer.Genesis.Equals(ts) || syncer.store.GetHeaviestTipSet().Equals(ts) {
if syncer.Genesis.Equals(maybeHead) || syncer.store.GetHeaviestTipSet().Equals(maybeHead) {
return nil
}
if err := syncer.collectChain(maybeHead); err != nil {
if err := syncer.collectChain(ctx, maybeHead); err != nil {
return xerrors.Errorf("collectChain failed: %w", err)
}
if err := syncer.store.PutTipSet(maybeHead); err != nil {
return errors.Wrap(err, "failed to put synced tipset to chainstore")
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
}
return nil
@ -317,8 +356,13 @@ func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet)
for _, b := range fts.Blocks {
if err := syncer.ValidateBlock(ctx, b); err != nil {
syncer.bad.Add(b.Cid())
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}
if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil {
return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err)
}
}
return nil
}
@ -363,15 +407,11 @@ func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Addre
Data: next.VRFProof,
}
if err := sig.Verify(mworker, cur.VDFResult); err != nil {
// TODO: ticket signatures should also include miner address
if err := sig.Verify(mworker, cur.VRFProof); err != nil {
return xerrors.Errorf("invalid ticket, VRFProof invalid: %w", err)
}
// now verify the VDF
if err := vdf.Verify(next.VRFProof, next.VDFResult, next.VDFProof); err != nil {
return xerrors.Errorf("ticket %d had invalid VDF: %w", err)
}
cur = next
}
@ -384,7 +424,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
baseTs, err := syncer.store.LoadTipSet(h.Parents)
if err != nil {
return xerrors.Errorf("load tipset failed: %w", err)
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
}
stateroot, precp, err := syncer.sm.TipSetState(baseTs)
@ -464,7 +504,7 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
}
if nonces[m.From] != m.Nonce {
return xerrors.Errorf("wrong nonce")
return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
}
nonces[m.From]++
@ -476,22 +516,92 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
return nil
}
bs := amt.WrapBlockstore(syncer.store.Blockstore())
var blsCids []cbg.CBORMarshaler
var sigCids []cid.Cid // this is what we get for people not wanting the marshalcbor method on the cid type
var pubks []bls.PublicKey
for i, m := range b.BlsMessages {
if err := checkMsg(m); err != nil {
xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
}
sigCids = append(sigCids, m.Cid())
c := cbg.CborCid(m.Cid())
blsCids = append(blsCids, &c)
pubk, err := syncer.sm.GetBlsPublicKey(ctx, m.From, baseTs)
if err != nil {
return xerrors.Errorf("failed to load bls public to validate block: %w", err)
}
pubks = append(pubks, pubk)
}
if err := syncer.verifyBlsAggregate(h.BLSAggregate, sigCids, pubks); err != nil {
return xerrors.Errorf("bls aggregate signature was invalid: %w", err)
}
var secpkCids []cbg.CBORMarshaler
for i, m := range b.SecpkMessages {
if err := checkMsg(&m.Message); err != nil {
xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
}
kaddr, err := syncer.sm.ResolveToKeyAddress(ctx, m.Message.From, baseTs)
if err != nil {
return xerrors.Errorf("failed to resolve key addr: %w", err)
}
if err := m.Signature.Verify(kaddr, m.Message.Cid().Bytes()); err != nil {
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
}
c := cbg.CborCid(m.Cid())
secpkCids = append(secpkCids, &c)
}
bmroot, err := amt.FromArray(bs, blsCids)
if err != nil {
return xerrors.Errorf("failed to build amt from bls msg cids: %w", err)
}
smroot, err := amt.FromArray(bs, secpkCids)
if err != nil {
return xerrors.Errorf("failed to build amt from bls msg cids: %w", err)
}
mrcid, err := bs.Put(&types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return err
}
if h.Messages != mrcid {
return fmt.Errorf("messages didnt match message root in header")
}
return nil
}
func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
func (syncer *Syncer) verifyBlsAggregate(sig types.Signature, msgs []cid.Cid, pubks []bls.PublicKey) error {
var digests []bls.Digest
for _, c := range msgs {
digests = append(digests, bls.Hash(bls.Message(c.Bytes())))
}
var bsig bls.Signature
copy(bsig[:], sig.Data)
if !bls.Verify(bsig, digests, pubks) {
return xerrors.New("bls aggregate signature failed to verify")
}
return nil
}
func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
blockSet := []*types.TipSet{from}
at := from.Parents()
@ -502,6 +612,12 @@ func (syncer *Syncer) collectHeaders(from *types.TipSet, to *types.TipSet) ([]*t
// If, for some reason, we have a suffix of the chain locally, handle that here
for blockSet[len(blockSet)-1].Height() > untilHeight {
log.Warn("syncing local: ", at)
for _, bc := range at {
if syncer.bad.Has(bc) {
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
}
}
ts, err := syncer.store.LoadTipSet(at)
if err != nil {
if xerrors.Is(err, bstore.ErrNotFound) {
@ -543,6 +659,11 @@ loop:
if b.Height() < untilHeight {
break loop
}
for _, bc := range b.Cids() {
if syncer.bad.Has(bc) {
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s)", from.Cids(), bc)
}
}
blockSet = append(blockSet, b)
}
@ -558,15 +679,47 @@ loop:
return blockSet, nil
}
// TODO: handle the case where we are on a fork and its not a simple fast forward
// need to walk back to either a common ancestor, or until we hit the fork length threshold.
return nil, xerrors.Errorf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
fork, err := syncer.syncFork(ctx, last, to)
if err != nil {
return nil, xerrors.Errorf("failed to sync fork: %w", err)
}
blockSet = append(blockSet, fork...)
}
return blockSet, nil
}
func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), build.ForkLengthThreshold)
if err != nil {
return nil, err
}
nts, err := syncer.store.LoadTipSet(to.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
for cur := 0; cur < len(tips); {
if nts.Equals(tips[cur]) {
return tips[:cur+1], nil
}
if nts.Height() < tips[cur].Height() {
cur++
} else {
nts, err = syncer.store.LoadTipSet(nts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading next local tipset: %w", err)
}
}
}
return nil, xerrors.Errorf("fork was longer than our threshold")
}
func (syncer *Syncer) syncMessagesAndCheckState(headers []*types.TipSet) error {
syncer.syncState.SetHeight(0)
return syncer.iterFullTipsets(headers, func(fts *store.FullTipSet) error {
@ -643,7 +796,6 @@ func (syncer *Syncer) iterFullTipsets(headers []*types.TipSet, cb func(*store.Fu
return xerrors.Errorf("message processing failed: %w", err)
}
}
}
return nil
@ -671,14 +823,18 @@ func persistMessages(bs bstore.Blockstore, bst *BSTipSet) error {
return nil
}
func (syncer *Syncer) collectChain(fts *store.FullTipSet) error {
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), fts.TipSet())
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
syncer.syncState.Init(syncer.store.GetHeaviestTipSet(), ts)
headers, err := syncer.collectHeaders(fts.TipSet(), syncer.store.GetHeaviestTipSet())
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
if err != nil {
return err
}
if !headers[0].Equals(ts) {
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
}
syncer.syncState.SetStage(api.StagePersistHeaders)
for _, ts := range headers {

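For illustration: the new syncFork above resolves forks by walking the fetched headers and the local chain back in lock-step until they meet, bounded by ForkLengthThreshold. A simplified sketch of that walk over plain height/parent records; the types are hypothetical and this is not the commit's code.

```go
package main

import (
	"errors"
	"fmt"
)

// header is a stand-in for a tipset: a height plus a parent link.
type header struct {
	height uint64
	parent *header
}

// commonAncestor advances whichever side is currently higher (syncFork steps
// through the fetched fork while the local tipset is lower, and loads the
// local parent otherwise) until the two pointers meet or the budget runs out.
func commonAncestor(fork, local *header, maxBack int) (*header, error) {
	for i := 0; i < maxBack; i++ {
		if fork == nil || local == nil {
			return nil, errors.New("ran out of headers")
		}
		if fork == local {
			return fork, nil
		}
		if local.height < fork.height {
			fork = fork.parent
		} else {
			local = local.parent
		}
	}
	return nil, errors.New("fork was longer than our threshold")
}

func main() {
	root := &header{height: 10}
	a := &header{height: 11, parent: root}
	b := &header{height: 11, parent: root}

	anc, err := commonAncestor(&header{height: 12, parent: a}, b, 100)
	if err != nil {
		fmt.Println("no common ancestor:", err)
		return
	}
	fmt.Println("common ancestor at height", anc.height)
}
```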

@ -4,16 +4,20 @@ import (
"context"
"fmt"
"testing"
"time"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node"
"github.com/filecoin-project/go-lotus/node/impl"
"github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/repo"
)
@ -31,7 +35,6 @@ func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, [
ts := mts.TipSet.TipSet()
fmt.Printf("tipset at H:%d: %s\n", ts.Height(), ts.Cids())
}
r, err := tu.g.YieldRepo()
@ -91,17 +94,72 @@ func (tu *syncTestUtil) Shutdown() {
tu.cancel()
}
func (tu *syncTestUtil) mineNewBlock(src int) {
mts, err := tu.g.NextTipSet()
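// pushFtsAndWait submits every block of the FullTipSet to node `to` (persisting its messages
// first), and, if wait is set, polls ChainHead until the node adopts that tipset or a 10s
// timeout fires.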
func (tu *syncTestUtil) pushFtsAndWait(to int, fts *store.FullTipSet, wait bool) {
// TODO: would be great if we could pass a whole tipset here...
for _, fb := range fts.Blocks {
var b types.BlockMsg
// -1 to match block.Height
b.Header = fb.Header
for _, msg := range fb.SecpkMessages {
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
require.NoError(tu.t, err)
b.SecpkMessages = append(b.SecpkMessages, c)
}
for _, msg := range fb.BlsMessages {
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
require.NoError(tu.t, err)
b.BlsMessages = append(b.BlsMessages, c)
}
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
}
if wait {
start := time.Now()
h, err := tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
for !h.Equals(fts.TipSet()) {
time.Sleep(time.Millisecond * 50)
h, err = tu.nds[to].ChainHead(tu.ctx)
require.NoError(tu.t, err)
if time.Since(start) > time.Second*10 {
tu.t.Fatal("took too long waiting for block to be accepted")
}
}
}
}
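// mineOnBlock mines a new tipset on top of blk using the given miner indices (nil means all
// of the test genesis miners), pushes the produced messages to node src's mpool, and submits
// the resulting blocks to src.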
func (tu *syncTestUtil) mineOnBlock(blk *store.FullTipSet, src int, miners []int, wait bool) *store.FullTipSet {
if miners == nil {
for i := range tu.g.Miners {
miners = append(miners, i)
}
}
var maddrs []address.Address
for _, i := range miners {
maddrs = append(maddrs, tu.g.Miners[i])
}
fmt.Println("Miner mining block: ", maddrs)
mts, err := tu.g.NextTipSetFromMiners(blk.TipSet(), maddrs)
require.NoError(tu.t, err)
for _, msg := range mts.Messages {
require.NoError(tu.t, tu.nds[src].MpoolPush(context.TODO(), msg))
}
tu.pushFtsAndWait(src, mts.TipSet, wait)
for _, fblk := range mts.TipSet.Blocks {
require.NoError(tu.t, tu.nds[src].ChainSubmitBlock(context.TODO(), fblkToBlkMsg(fblk)))
}
return mts.TipSet
}
func (tu *syncTestUtil) mineNewBlock(src int, miners []int) {
mts := tu.mineOnBlock(tu.g.CurTipset, src, miners, true)
tu.g.CurTipset = mts
}
func fblkToBlkMsg(fb *types.FullBlock) *types.BlockMsg {
@ -137,6 +195,12 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
)
require.NoError(tu.t, err)
lastTs := blocks[len(blocks)-1].Blocks
for _, lastB := range lastTs {
err = out.(*impl.FullNodeAPI).ChainAPI.Chain.AddBlock(lastB.Header)
require.NoError(tu.t, err)
}
tu.genesis = genesis
tu.blocks = blocks
tu.nds = append(tu.nds, out) // always at 0
@ -164,6 +228,13 @@ func (tu *syncTestUtil) addClientNode() int {
return len(tu.nds) - 1
}
func (tu *syncTestUtil) pid(n int) peer.ID {
nal, err := tu.nds[n].NetAddrsListen(tu.ctx)
require.NoError(tu.t, err)
return nal.ID
}
func (tu *syncTestUtil) connect(from, to int) {
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
require.NoError(tu.t, err)
@ -172,6 +243,14 @@ func (tu *syncTestUtil) connect(from, to int) {
require.NoError(tu.t, err)
}
func (tu *syncTestUtil) disconnect(from, to int) {
toPI, err := tu.nds[to].NetAddrsListen(tu.ctx)
require.NoError(tu.t, err)
err = tu.nds[from].NetDisconnect(tu.ctx, toPI.ID)
require.NoError(tu.t, err)
}
func (tu *syncTestUtil) checkHeight(name string, n int, h int) {
b, err := tu.nds[n].ChainHead(tu.ctx)
require.NoError(tu.t, err)
@ -209,14 +288,18 @@ func (tu *syncTestUtil) compareSourceState(with int) {
}
func (tu *syncTestUtil) waitUntilSync(from, to int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
target, err := tu.nds[from].ChainHead(ctx)
target, err := tu.nds[from].ChainHead(tu.ctx)
if err != nil {
tu.t.Fatal(err)
}
tu.waitUntilSyncTarget(to, target)
}
func (tu *syncTestUtil) waitUntilSyncTarget(to int, target *types.TipSet) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
hc, err := tu.nds[to].ChainNotify(ctx)
if err != nil {
tu.t.Fatal(err)
@ -232,32 +315,6 @@ func (tu *syncTestUtil) waitUntilSync(from, to int) {
}
}
/*
func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
// utility to simulate incoming blocks without miner process
// TODO: should call syncer directly, this won't work correctly in all cases
var b chain.BlockMsg
// -1 to match block.Height
b.Header = tu.blocks[h-1].Header
for _, msg := range tu.blocks[h-1].SecpkMessages {
c, err := tu.nds[to].(*impl.FullNodeAPI).ChainAPI.Chain.PutMessage(msg)
require.NoError(tu.t, err)
b.SecpkMessages = append(b.SecpkMessages, c)
}
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
}
func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
for i := 0; i < n; i++ {
tu.submitSourceBlock(to, h+i)
}
}
*/
func TestSyncSimple(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)
@ -290,12 +347,110 @@ func TestSyncMining(t *testing.T) {
tu.compareSourceState(client)
for i := 0; i < 5; i++ {
tu.mineNewBlock(0)
tu.mineNewBlock(0, nil)
tu.waitUntilSync(0, client)
tu.compareSourceState(client)
}
}
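// TestSyncBadTimestamp mines a tipset (a1) whose blocks are timestamped only 2s after their
// parent, then a valid sibling (a2) on the same base, and checks that the node's head ends up
// on the valid tipset a2.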
func TestSyncBadTimestamp(t *testing.T) {
H := 50
tu := prepSyncTest(t, H)
client := tu.addClientNode()
require.NoError(t, tu.mn.LinkAll())
tu.connect(client, 0)
tu.waitUntilSync(0, client)
base := tu.g.CurTipset
tu.g.Timestamper = func(pts *types.TipSet, tl int) uint64 {
return pts.MinTimestamp() + 2
}
a1 := tu.mineOnBlock(base, 0, nil, false)
tu.g.Timestamper = nil
tu.g.ResyncBankerNonce(a1.TipSet())
a2 := tu.mineOnBlock(base, 0, nil, true)
tu.waitUntilSync(0, client)
head, err := tu.nds[0].ChainHead(tu.ctx)
require.NoError(t, err)
if !head.Equals(a2.TipSet()) {
t.Fatalf("expected head to be %s, but got %s", a2.Cids(), head.Cids())
}
}
func (tu *syncTestUtil) loadChainToNode(to int) {
// utility to simulate incoming blocks without miner process
// TODO: should call syncer directly, this won't work correctly in all cases
for i := 0; i < len(tu.blocks); i++ {
tu.pushFtsAndWait(to, tu.blocks[i], true)
}
}
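// TestSyncFork builds two competing chains from the same base on two disconnected nodes
// (three tipsets on p1, four on p2, so chain b is heavier), then connects the nodes and waits
// for both to converge on b.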
func TestSyncFork(t *testing.T) {
H := 10
tu := prepSyncTest(t, H)
p1 := tu.addClientNode()
p2 := tu.addClientNode()
fmt.Println("GENESIS: ", tu.g.Genesis().Cid())
tu.loadChainToNode(p1)
tu.loadChainToNode(p2)
phead := func() {
h1, err := tu.nds[1].ChainHead(tu.ctx)
require.NoError(tu.t, err)
h2, err := tu.nds[2].ChainHead(tu.ctx)
require.NoError(tu.t, err)
fmt.Println("Node 1: ", h1.Cids(), h1.Parents(), h1.Height())
fmt.Println("Node 2: ", h2.Cids(), h1.Parents(), h2.Height())
//time.Sleep(time.Second * 2)
fmt.Println()
fmt.Println()
fmt.Println()
fmt.Println()
}
phead()
base := tu.g.CurTipset
fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height())
// The two nodes fork at this point into 'a' and 'b'
a1 := tu.mineOnBlock(base, p1, []int{0}, true)
a := tu.mineOnBlock(a1, p1, []int{0}, true)
a = tu.mineOnBlock(a, p1, []int{0}, true)
tu.g.ResyncBankerNonce(a1.TipSet())
// chain B will now be heaviest
b := tu.mineOnBlock(base, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
b = tu.mineOnBlock(b, p2, []int{1}, true)
fmt.Println("A: ", a.Cids(), a.TipSet().Height())
fmt.Println("B: ", b.Cids(), b.TipSet().Height())
// Now for the fun part!!
require.NoError(t, tu.mn.LinkAll())
tu.connect(p1, p2)
tu.waitUntilSyncTarget(p1, b.TipSet())
tu.waitUntilSyncTarget(p2, b.TipSet())
phead()
}
func BenchmarkSyncBasic(b *testing.B) {
for i := 0; i < b.N; i++ {
runSyncBenchLength(b, 100)
@ -315,44 +470,3 @@ func runSyncBenchLength(b *testing.B, l int) {
tu.waitUntilSync(0, client)
}
/*
TODO: this is broken because of how tu.submitSourceBlock works now
func TestSyncManual(t *testing.T) {
H := 20
tu := prepSyncTest(t, H)
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
tu.submitSourceBlocks(client, 1, H)
time.Sleep(time.Second * 1)
tu.checkHeight("client", client, H)
tu.compareSourceState(client)
}
func TestSyncIncoming(t *testing.T) {
H := 1
tu := prepSyncTest(t, H)
producer := tu.addClientNode()
client := tu.addClientNode()
tu.mn.LinkAll()
tu.connect(client, producer)
for h := 0; h < H; h++ {
tu.submitSourceBlock(producer, h + 1)
time.Sleep(time.Millisecond * 200)
}
tu.checkHeight("client", client, H)
tu.checkHeight("producer", producer, H)
tu.compareSourceState(client)
}
*/

View File

@ -14,9 +14,7 @@ import (
)
type Ticket struct {
VRFProof []byte
VDFResult []byte
VDFProof []byte
VRFProof []byte
}
type ElectionProof []byte
@ -176,5 +174,5 @@ func PowerCmp(eproof ElectionProof, mpow, totpow BigInt) bool {
}
func (t *Ticket) Equals(ot *Ticket) bool {
return bytes.Equal(t.VDFResult, ot.VDFResult)
return bytes.Equal(t.VRFProof, ot.VRFProof)
}

View File

@ -27,9 +27,7 @@ func testBlockHeader(t testing.TB) *BlockHeader {
ElectionProof: []byte("cats won the election"),
Tickets: []*Ticket{
&Ticket{
VRFProof: []byte("vrf proof"),
VDFResult: []byte("vdf result"),
VDFProof: []byte("vrf proof"),
VRFProof: []byte("vrf proof"),
},
},
Parents: []cid.Cid{c, c},

View File

@ -5,7 +5,7 @@ import (
"io"
"math"
cid "github.com/ipfs/go-cid"
"github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)
@ -284,7 +284,7 @@ func (t *Ticket) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{131}); err != nil {
if _, err := w.Write([]byte{129}); err != nil {
return err
}
@ -295,22 +295,6 @@ func (t *Ticket) MarshalCBOR(w io.Writer) error {
if _, err := w.Write(t.VRFProof); err != nil {
return err
}
// t.t.VDFResult ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.VDFResult)))); err != nil {
return err
}
if _, err := w.Write(t.VDFResult); err != nil {
return err
}
// t.t.VDFProof ([]uint8)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajByteString, uint64(len(t.VDFProof)))); err != nil {
return err
}
if _, err := w.Write(t.VDFProof); err != nil {
return err
}
return nil
}
@ -325,7 +309,7 @@ func (t *Ticket) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 3 {
if extra != 1 {
return fmt.Errorf("cbor input had wrong number of fields")
}
@ -346,40 +330,6 @@ func (t *Ticket) UnmarshalCBOR(r io.Reader) error {
if _, err := io.ReadFull(br, t.VRFProof); err != nil {
return err
}
// t.t.VDFResult ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.VDFResult: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.VDFResult = make([]byte, extra)
if _, err := io.ReadFull(br, t.VDFResult); err != nil {
return err
}
// t.t.VDFProof ([]uint8)
maj, extra, err = cbg.CborReadHeader(br)
if err != nil {
return err
}
if extra > 8192 {
return fmt.Errorf("t.VDFProof: array too large (%d)", extra)
}
if maj != cbg.MajByteString {
return fmt.Errorf("expected byte array")
}
t.VDFProof = make([]byte, extra)
if _, err := io.ReadFull(br, t.VDFProof); err != nil {
return err
}
return nil
}

View File

@ -9,6 +9,10 @@ import (
)
func (m *SignedMessage) ToStorageBlock() (block.Block, error) {
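// BLS-signed messages are stored as the bare unsigned message, since BLS signatures are
// aggregated at the block level rather than kept per message.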
if m.Signature.Type == KTBLS {
return m.Message.ToStorageBlock()
}
data, err := m.Serialize()
if err != nil {
return nil, err

View File

@ -56,7 +56,7 @@ func tipsetSortFunc(blks []*BlockHeader) func(i, j int) bool {
tj := blks[j].LastTicket()
if ti.Equals(tj) {
log.Warn("blocks have same ticket")
log.Warnf("blocks have same ticket (%s %s)", blks[i].Miner, blks[j].Miner)
return blks[i].Cid().KeyString() < blks[j].Cid().KeyString()
}
@ -104,6 +104,13 @@ func (ts *TipSet) Blocks() []*BlockHeader {
}
func (ts *TipSet) Equals(ots *TipSet) bool {
if ts == nil && ots == nil {
return true
}
if ts == nil || ots == nil {
return false
}
if len(ts.blks) != len(ots.blks) {
return false
}
@ -118,7 +125,7 @@ func (ts *TipSet) Equals(ots *TipSet) bool {
}
func (t *Ticket) Less(o *Ticket) bool {
return bytes.Compare(t.VDFResult, o.VDFResult) < 0
return bytes.Compare(t.VRFProof, o.VRFProof) < 0
}
func (ts *TipSet) MinTicket() *Ticket {

View File

@ -471,7 +471,7 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*ApplyRet,
return nil, xerrors.Errorf("fatal error: %w", actorErr)
}
if actorErr != nil {
log.Warn("[%d] Send actor error: %s", vm.blockHeight, actorErr)
log.Warnf("[%d] Send actor error: %+v", vm.blockHeight, actorErr)
}
var errcode uint8

View File

@ -4,6 +4,7 @@ import (
"context"
"sort"
"strings"
"sync"
"github.com/filecoin-project/go-bls-sigs"
"github.com/filecoin-project/go-lotus/node/repo"
@ -23,6 +24,8 @@ const (
type Wallet struct {
keys map[address.Address]*Key
keystore types.KeyStore
lk sync.Mutex
}
func NewWallet(keystore types.KeyStore) (*Wallet, error) {
@ -71,6 +74,9 @@ func (w *Wallet) Sign(ctx context.Context, addr address.Address, msg []byte) (*t
}
func (w *Wallet) findKey(addr address.Address) (*Key, error) {
w.lk.Lock()
defer w.lk.Unlock()
k, ok := w.keys[addr]
if ok {
return k, nil
@ -90,12 +96,29 @@ func (w *Wallet) findKey(addr address.Address) (*Key, error) {
return k, nil
}
func (w *Wallet) Export(addr address.Address) ([]byte, error) {
panic("nyi")
func (w *Wallet) Export(addr address.Address) (*types.KeyInfo, error) {
k, err := w.findKey(addr)
if err != nil {
return nil, xerrors.Errorf("failed to find key to export: %w", err)
}
return &k.KeyInfo, nil
}
func (w *Wallet) Import(kdata []byte) (address.Address, error) {
panic("nyi")
func (w *Wallet) Import(ki *types.KeyInfo) (address.Address, error) {
w.lk.Lock()
defer w.lk.Unlock()
k, err := NewKey(*ki)
if err != nil {
return address.Undef, xerrors.Errorf("failed to make key: %w", err)
}
if err := w.keystore.Put(KNamePrefix+k.Address.String(), k.KeyInfo); err != nil {
return address.Undef, xerrors.Errorf("saving to keystore: %w", err)
}
return k.Address, nil
}
func (w *Wallet) ListAddrs() ([]address.Address, error) {
@ -148,6 +171,9 @@ func GenerateKey(typ string) (*Key, error) {
}
func (w *Wallet) GenerateKey(typ string) (address.Address, error) {
w.lk.Lock()
defer w.lk.Unlock()
k, err := GenerateKey(typ)
if err != nil {
return address.Undef, err

View File

@ -1,14 +1,16 @@
package cli
import (
"context"
"encoding/json"
"fmt"
cid "github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/api"
types "github.com/filecoin-project/go-lotus/chain/types"
cid "github.com/ipfs/go-cid"
)
var chainCmd = &cli.Command{
@ -18,6 +20,8 @@ var chainCmd = &cli.Command{
chainHeadCmd,
chainGetBlock,
chainReadObjCmd,
chainGetMsgCmd,
chainSetHeadCmd,
},
}
@ -113,7 +117,7 @@ var chainGetBlock = &cli.Command{
cblock.BlsMessages = msgs.BlsMessages
cblock.SecpkMessages = msgs.SecpkMessages
cblock.ParentReceipts = recpts
cblock.ParentMessages = pmsgs
cblock.ParentMessages = apiMsgCids(pmsgs)
out, err := json.MarshalIndent(cblock, "", " ")
if err != nil {
@ -126,6 +130,14 @@ var chainGetBlock = &cli.Command{
},
}
func apiMsgCids(in []api.Message) []cid.Cid {
out := make([]cid.Cid, len(in))
for k, v := range in {
out[k] = v.Cid
}
return out
}
var chainReadObjCmd = &cli.Command{
Name: "read-obj",
Usage: "Read the raw bytes of an object",
@ -151,3 +163,93 @@ var chainReadObjCmd = &cli.Command{
return nil
},
}
var chainGetMsgCmd = &cli.Command{
Name: "getmessage",
Usage: "Get and print a message by its cid",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
c, err := cid.Decode(cctx.Args().First())
if err != nil {
return xerrors.Errorf("failed to parse cid input: %w", err)
}
mb, err := api.ChainReadObj(ctx, c)
if err != nil {
return xerrors.Errorf("failed to read object: %w", err)
}
var i interface{}
m, err := types.DecodeMessage(mb)
if err != nil {
sm, err := types.DecodeSignedMessage(mb)
if err != nil {
return xerrors.Errorf("failed to decode object as a message: %w", err)
}
i = sm
} else {
i = m
}
enc, err := json.MarshalIndent(i, "", " ")
if err != nil {
return err
}
fmt.Println(string(enc))
return nil
},
}
var chainSetHeadCmd = &cli.Command{
Name: "sethead",
Usage: "manually set the local nodes head tipset (Caution: normally only used for recovery)",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must pass cids for tipset to set as head")
}
ts, err := parseTipSet(api, ctx, cctx.Args().Slice())
if err != nil {
return err
}
if err := api.ChainSetHead(ctx, ts); err != nil {
return err
}
return nil
},
}
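// parseTipSet fetches each of the given block CIDs from the node and assembles the headers into a TipSet.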
func parseTipSet(api api.FullNode, ctx context.Context, vals []string) (*types.TipSet, error) {
var headers []*types.BlockHeader
for _, c := range vals {
blkc, err := cid.Decode(c)
if err != nil {
return nil, err
}
bh, err := api.ChainGetBlock(ctx, blkc)
if err != nil {
return nil, err
}
headers = append(headers, bh)
}
return types.NewTipSet(headers)
}

View File

@ -68,42 +68,22 @@ var createMinerCmd = &cli.Command{
return err
}
nonce, err := api.MpoolGetNonce(ctx, addr)
if err != nil {
return xerrors.Errorf("failed to get account nonce: %w", err)
}
msg := types.Message{
msg := &types.Message{
To: actors.StorageMarketAddress,
From: addr,
Method: actors.SMAMethods.CreateStorageMiner,
Params: params,
Value: types.NewInt(0),
Nonce: nonce,
GasPrice: types.NewInt(1),
GasLimit: types.NewInt(1),
GasPrice: types.NewInt(0),
GasLimit: types.NewInt(10000),
}
msgbytes, err := msg.Serialize()
smsg, err := api.MpoolPushMessage(ctx, msg)
if err != nil {
return err
}
sig, err := api.WalletSign(ctx, addr, msgbytes)
if err != nil {
return xerrors.Errorf("failed to sign message: %w", err)
}
smsg := &types.SignedMessage{
Message: msg,
Signature: *sig,
}
if err := api.MpoolPush(ctx, smsg); err != nil {
return xerrors.Errorf("failed to push signed message: %w", err)
}
mwait, err := api.ChainWaitMsg(ctx, smsg.Cid())
mwait, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return xerrors.Errorf("failed waiting for message inclusion: %w", err)
}

View File

@ -1,15 +1,11 @@
package cli
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"gopkg.in/urfave/cli.v2"
"github.com/filecoin-project/go-lotus/lib/addrutil"
)
var netCmd = &cli.Command{
@ -70,8 +66,9 @@ var netListen = &cli.Command{
}
var netConnect = &cli.Command{
Name: "connect",
Usage: "Connect to a peer",
Name: "connect",
Usage: "Connect to a peer",
ArgsUsage: "<peer multiaddr>",
Action: func(cctx *cli.Context) error {
api, closer, err := GetAPI(cctx)
if err != nil {
@ -80,7 +77,7 @@ var netConnect = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
pis, err := parseAddresses(ctx, cctx.Args().Slice())
pis, err := addrutil.ParseAddresses(ctx, cctx.Args().Slice())
if err != nil {
return err
}
@ -120,80 +117,3 @@ var netId = &cli.Command{
return nil
},
}
// parseAddresses is a function that takes in a slice of string peer addresses
// (multiaddr + peerid) and returns a slice of properly constructed peers
func parseAddresses(ctx context.Context, addrs []string) ([]peer.AddrInfo, error) {
// resolve addresses
maddrs, err := resolveAddresses(ctx, addrs)
if err != nil {
return nil, err
}
return peer.AddrInfosFromP2pAddrs(maddrs...)
}
const (
dnsResolveTimeout = 10 * time.Second
)
// resolveAddresses resolves addresses in parallel
func resolveAddresses(ctx context.Context, addrs []string) ([]ma.Multiaddr, error) {
ctx, cancel := context.WithTimeout(ctx, dnsResolveTimeout)
defer cancel()
var maddrs []ma.Multiaddr
var wg sync.WaitGroup
resolveErrC := make(chan error, len(addrs))
maddrC := make(chan ma.Multiaddr)
for _, addr := range addrs {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
// check whether address ends in `ipfs/Qm...`
if _, last := ma.SplitLast(maddr); last.Protocol().Code == ma.P_IPFS {
maddrs = append(maddrs, maddr)
continue
}
wg.Add(1)
go func(maddr ma.Multiaddr) {
defer wg.Done()
raddrs, err := madns.Resolve(ctx, maddr)
if err != nil {
resolveErrC <- err
return
}
// filter out addresses that still don't end in `ipfs/Qm...`
found := 0
for _, raddr := range raddrs {
if _, last := ma.SplitLast(raddr); last != nil && last.Protocol().Code == ma.P_IPFS {
maddrC <- raddr
found++
}
}
if found == 0 {
resolveErrC <- fmt.Errorf("found no ipfs peers at %s", maddr)
}
}(maddr)
}
go func() {
wg.Wait()
close(maddrC)
}()
for maddr := range maddrC {
maddrs = append(maddrs, maddr)
}
select {
case err := <-resolveErrC:
return nil, err
default:
}
return maddrs, nil
}

View File

@ -350,7 +350,7 @@ var paychVoucherSubmitCmd = &cli.Command{
return err
}
mwait, err := api.ChainWaitMsg(ctx, mcid)
mwait, err := api.StateWaitMsg(ctx, mcid)
if err != nil {
return err
}

View File

@ -9,8 +9,9 @@ import (
)
var sendCmd = &cli.Command{
Name: "send",
Usage: "send funds between accounts",
Name: "send",
Usage: "Send funds between accounts",
ArgsUsage: "<target> <amount>",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "source",

View File

@ -1,9 +1,15 @@
package cli
import (
"encoding/hex"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/filecoin-project/go-lotus/chain/address"
types "github.com/filecoin-project/go-lotus/chain/types"
"gopkg.in/urfave/cli.v2"
)
@ -14,12 +20,15 @@ var walletCmd = &cli.Command{
walletNew,
walletList,
walletBalance,
walletExport,
walletImport,
},
}
var walletNew = &cli.Command{
Name: "new",
Usage: "Generate a new key of the given type (bls or secp256k1)",
Name: "new",
Usage: "Generate a new key of the given type",
ArgsUsage: "[bls|secp256k1]",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -68,8 +77,9 @@ var walletList = &cli.Command{
}
var walletBalance = &cli.Command{
Name: "balance",
Usage: "get account balance",
Name: "balance",
Usage: "Get account balance",
ArgsUsage: "[account address]",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -97,3 +107,84 @@ var walletBalance = &cli.Command{
return nil
},
}
var walletExport = &cli.Command{
Name: "export",
Usage: "export keys",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
if !cctx.Args().Present() {
return fmt.Errorf("must specify key to export")
}
addr, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
ki, err := api.WalletExport(ctx, addr)
if err != nil {
return err
}
b, err := json.Marshal(ki)
if err != nil {
return err
}
fmt.Println(hex.EncodeToString(b))
return nil
},
}
var walletImport = &cli.Command{
Name: "import",
Usage: "import keys",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
var data []byte
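// Key data is read either as hex on stdin (the format printed by `lotus wallet export`)
// or from a JSON KeyInfo file named on the command line.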
if !cctx.Args().Present() || cctx.Args().First() == "-" {
indata, err := ioutil.ReadAll(os.Stdin)
if err != nil {
return err
}
dec, err := hex.DecodeString(strings.TrimSpace(string(indata)))
if err != nil {
return err
}
data = dec
} else {
fdata, err := ioutil.ReadFile(cctx.Args().First())
if err != nil {
return err
}
data = fdata
}
var ki types.KeyInfo
if err := json.Unmarshal(data, &ki); err != nil {
return err
}
addr, err := api.WalletImport(ctx, &ki)
if err != nil {
return err
}
fmt.Printf("imported key %s successfully!", addr)
return nil
},
}

View File

@ -64,7 +64,7 @@ var runCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
nodeApi, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"crypto/rand"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/ipfs/go-datastore"
@ -232,7 +233,7 @@ func configureStorageMiner(ctx context.Context, api api.FullNode, addr address.A
}
log.Info("Waiting for message: ", smsg.Cid())
ret, err := api.ChainWaitMsg(ctx, smsg.Cid())
ret, err := api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}
@ -303,7 +304,7 @@ func createStorageMiner(ctx context.Context, api api.FullNode, peerid peer.ID, c
log.Infof("Pushed StorageMarket.CreateStorageMiner, %s to Mpool", signed.Cid())
log.Infof("Waiting for confirmation")
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
mw, err := api.StateWaitMsg(ctx, signed.Cid())
if err != nil {
return address.Undef, err
}

View File

@ -40,6 +40,10 @@ var DaemonCmd = &cli.Command{
Name: "genesis",
Usage: "genesis file to use for first node run",
},
&cli.BoolFlag{
Name: "bootstrap",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
@ -95,7 +99,32 @@ var DaemonCmd = &cli.Command{
return err
}
go func() {
if !cctx.Bool("bootstrap") {
return
}
err := bootstrap(ctx, api)
if err != nil {
log.Error("Bootstrap failed: ", err)
}
}()
// TODO: properly parse api endpoint (or make it a URL)
return serveRPC(api, stop, "127.0.0.1:"+cctx.String("api"))
},
}
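// bootstrap dials each of the bootstrap peers compiled into the binary via build.BuiltinBootstrap.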
func bootstrap(ctx context.Context, api api.FullNode) error {
pis, err := build.BuiltinBootstrap()
if err != nil {
return err
}
for _, pi := range pis {
if err := api.NetConnect(ctx, pi); err != nil {
return err
}
}
return nil
}

go.mod
View File

@ -14,6 +14,7 @@ require (
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gorilla/websocket v1.4.0
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-bitswap v0.1.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
@ -76,6 +77,7 @@ require (
go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0
go.uber.org/goleak v0.10.0 // indirect
go.uber.org/zap v1.10.0
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/crypto v0.0.0-20190829043050-9756ffdc2472 // indirect
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd // indirect

lib/addrutil/parse.go Normal file
View File

@ -0,0 +1,89 @@
package addrutil
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
)
// ParseAddresses takes a slice of string peer addresses (multiaddr + peer ID)
// and returns a slice of properly constructed peer.AddrInfos
func ParseAddresses(ctx context.Context, addrs []string) ([]peer.AddrInfo, error) {
// resolve addresses
maddrs, err := resolveAddresses(ctx, addrs)
if err != nil {
return nil, err
}
return peer.AddrInfosFromP2pAddrs(maddrs...)
}
const (
dnsResolveTimeout = 10 * time.Second
)
// resolveAddresses resolves addresses in parallel
func resolveAddresses(ctx context.Context, addrs []string) ([]ma.Multiaddr, error) {
ctx, cancel := context.WithTimeout(ctx, dnsResolveTimeout)
defer cancel()
var maddrs []ma.Multiaddr
var wg sync.WaitGroup
resolveErrC := make(chan error, len(addrs))
maddrC := make(chan ma.Multiaddr)
for _, addr := range addrs {
maddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
// check whether address ends in `ipfs/Qm...`
if _, last := ma.SplitLast(maddr); last.Protocol().Code == ma.P_IPFS {
maddrs = append(maddrs, maddr)
continue
}
wg.Add(1)
go func(maddr ma.Multiaddr) {
defer wg.Done()
raddrs, err := madns.Resolve(ctx, maddr)
if err != nil {
resolveErrC <- err
return
}
// filter out addresses that still don't end in `ipfs/Qm...`
found := 0
for _, raddr := range raddrs {
if _, last := ma.SplitLast(raddr); last != nil && last.Protocol().Code == ma.P_IPFS {
maddrC <- raddr
found++
}
}
if found == 0 {
resolveErrC <- fmt.Errorf("found no ipfs peers at %s", maddr)
}
}(maddr)
}
go func() {
wg.Wait()
close(maddrC)
}()
for maddr := range maddrC {
maddrs = append(maddrs, maddr)
}
select {
case err := <-resolveErrC:
return nil, err
default:
}
return maddrs, nil
}

View File

@ -88,6 +88,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
}
stop := make(chan struct{})
exiting := make(chan struct{})
c.requests = make(chan clientRequest)
handlers := map[string]rpcHandler{}
@ -96,6 +97,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
handler: handlers,
requests: c.requests,
stop: stop,
exiting: exiting,
}).handleWsConn(context.TODO())
for _, handler := range outs {
@ -122,6 +124,7 @@ func NewMergeClient(addr string, namespace string, outs []interface{}, requestHe
return func() {
close(stop)
<-exiting
}, nil
}

View File

@ -52,6 +52,7 @@ func (s *RPCServer) handleWS(ctx context.Context, w http.ResponseWriter, r *http
(&wsConn{
conn: c,
handler: s.methods,
exiting: make(chan struct{}),
}).handleWsConn(ctx)
if err := c.Close(); err != nil {

View File

@ -43,6 +43,7 @@ type wsConn struct {
handler handlers
requests <-chan clientRequest
stop <-chan struct{}
exiting chan struct{}
// incoming messages
incoming chan io.Reader
@ -389,6 +390,7 @@ func (c *wsConn) handleWsConn(ctx context.Context) {
c.registerCh = make(chan outChanReg)
defer close(c.registerCh)
defer close(c.exiting)
// ////

View File

@ -1,28 +0,0 @@
package vdf
import (
"bytes"
"crypto/sha256"
"fmt"
)
func Run(input []byte) ([]byte, []byte, error) {
h := sha256.Sum256(input)
// TODO: THIS IS A FAKE VDF. THE SPEC IS UNCLEAR ON WHAT TO REALLY DO HERE
return h[:], []byte("proof"), nil
}
func Verify(input []byte, out []byte, proof []byte) error {
// this is a fake VDF
h := sha256.Sum256(input)
if !bytes.Equal(h[:], out) {
return fmt.Errorf("vdf output incorrect")
}
if !bytes.Equal(proof, []byte("proof")) {
return fmt.Errorf("vdf proof failed to validate")
}
return nil
}

View File

@ -168,7 +168,7 @@ a:hover {
.Block {
user-select: text;
font-family: monospace;
min-width: 50em;
min-width: 60em;
display: inline-block;
}
@ -210,10 +210,18 @@ a:hover {
background: #440000
}
.ChainExplorer-after:hover {
background: #770000
}
.ChainExplorer-before {
background: #444400
}
.ChainExplorer-before:hover {
background: #777700
}
.Logs {
width: 100%;
height: 100%;

View File

@ -14,21 +14,19 @@ class Block extends React.Component {
async loadHeader() {
const header = await this.props.conn.call('Filecoin.ChainGetBlock', [this.props.cid])
let messages = await this.props.conn.call('Filecoin.ChainGetBlockMessages', [this.props.cid])
let receipts = await this.props.conn.call('Filecoin.ChainGetBlockReceipts', [this.props.cid])
let messages = await this.props.conn.call('Filecoin.ChainGetParentMessages', [this.props.cid])
let receipts = await this.props.conn.call('Filecoin.ChainGetParentReceipts', [this.props.cid])
const mcids = messages.Cids
if (!messages) {
messages = []
}
messages = [
...(messages.BlsMessages.map(m => ({...m, type: 'BLS'}))),
...(messages.SecpkMessages.map(m => ({...(m.Message), type: 'Secpk'})))
]
messages = messages.map((msg, k) => ({...msg, receipt: receipts[k]}))
messages = messages.map((msg, k) => ({...msg.Message, cid: msg.Cid, receipt: receipts[k]}))
messages = await Promise.all(messages.map(async (msg, i) => {
if (msg.receipt.ExitCode !== 0) {
let reply = await this.props.conn.call('Filecoin.StateReplay', [{Cids: [this.props.cid], Blocks: [header], Height: header.Height}, mcids[i]])
let reply = await this.props.conn.call('Filecoin.StateReplay', [{Cids: [this.props.cid], Blocks: [header], Height: header.Height}, msg.Cid])
if(!reply.Error) {
reply.Error = "reply: no error"
}
@ -45,8 +43,8 @@ class Block extends React.Component {
if (this.state.header) {
let head = this.state.header
const messages = this.state.messages.map(m => (
<div>
const messages = this.state.messages.map((m, k) => (
<div key={k}>
<div>
<Address client={this.props.conn} addr={m.From} mountWindow={this.props.mountWindow}/><b>&nbsp;=>&nbsp;</b>
<Address client={this.props.conn} addr={m.To} mountWindow={this.props.mountWindow} transfer={m.Value} method={m.Method}/>
@ -65,15 +63,15 @@ class Block extends React.Component {
<div>Weight: {head.ParentWeight}</div>
<div>Miner: {<Address client={this.props.conn} addr={head.Miner} mountWindow={this.props.mountWindow}/>}</div>
<div>Messages: {head.Messages['/']} {/*TODO: link to message explorer */}</div>
<div>Receipts: {head.MessageReceipts['/']}</div>
<div>State Root:&nbsp;{head.StateRoot['/']}</div>
<div>Parent Receipts: {head.ParentMessageReceipts['/']}</div>
<div>Parent State Root:&nbsp;{head.ParentStateRoot['/']}</div>
<div>----</div>
<div>{messages}</div>
</div>
)
}
return (<Window className="CristalScroll" initialSize={{width: 700, height: 400}} onClose={this.props.onClose} title={`Block ${this.props.cid['/']}`}>
return (<Window className="CristalScroll" initialSize={{width: 950, height: 400}} onClose={this.props.onClose} title={`Block ${this.props.cid['/']}`}>
{content}
</Window>)
}

View File

@ -12,7 +12,7 @@ export class BlockLinks extends React.Component {
block = this.props.blocks[k]
}
return <BlockLink key={c} block={block} conn={this.props.conn} cid={c} mountWindow={this.props.mountWindow}/>
return <BlockLink key={c + '-' + k} block={block} conn={this.props.conn} cid={c} mountWindow={this.props.mountWindow}/>
})
}
}

View File

@ -5,6 +5,8 @@ import Window from "./Window";
const rows = 32
class ChainExplorer extends React.Component {
fetching = []
constructor(props) {
super(props)
@ -38,7 +40,7 @@ class ChainExplorer extends React.Component {
}
async updateMessages(cids, msgcache) {
const msgs = await Promise.all(cids.map(async cid => [cid['/'], await this.props.client.call('Filecoin.ChainGetBlockMessages', [cid])]))
const msgs = await Promise.all(cids.map(async cid => [cid['/'], await this.props.client.call('Filecoin.ChainGetParentMessages', [cid])]))
msgs.forEach(([cid, msg]) => msgcache[cid] = msg)
}
@ -57,20 +59,41 @@ class ChainExplorer extends React.Component {
await this.fetchVisible()
}
async fetch(h, cache, msgcache) {
async fetch(h, base, cache, msgcache) {
console.log(h, base, cache)
if (this.fetching[h]) {
return
}
this.fetching[h] = true
if (h < 0) {
return
}
const cids = cache[h + 1].Blocks.map(b => b.Parents).reduce((acc, val) => acc.concat(val), [])
if(!base.Blocks) {
console.log("base for H is nll blk", h, base)
return
}
let cids = base.Blocks.map(b => b.Parents)
.reduce((acc, val) => {
let out = {...acc}
val.forEach(c => out[c['/']] = 8)
return out
}, {})
cids = Object.keys(cids).map(k => ({'/': k}))
console.log("parents", cids)
const blocks = await Promise.all(cids.map(cid => this.props.client.call('Filecoin.ChainGetBlock', [cid])))
cache[h] = {
Height: h,
Height: blocks[0].Height,
Cids: cids,
Blocks: blocks,
}
await this.updateMessages(cids, msgcache)
return cache[h]
}
async fetchVisible() {
@ -93,7 +116,11 @@ class ChainExplorer extends React.Component {
let cache = {...this.state.cache}
let msgcache = {...this.state.messages}
await tofetch.reduce(async (prev, next) => [...await prev, await this.fetch(next, cache, msgcache)], Promise.resolve([]))
await tofetch.reduce(async (prev, next) => {
let prevts = await prev
let newts = await this.fetch(next, prevts, cache, msgcache)
return newts ? newts : prevts
}, Promise.resolve(cache[top]))
this.setState({cache: cache, messages: msgcache})
}
@ -113,23 +140,30 @@ class ChainExplorer extends React.Component {
const base = this.state.at - this.state.at % rows
const className = row === this.state.at ? 'ChainExplorer-at' : (row < base ? 'ChainExplorer-after' : 'ChainExplorer-before')
let info = <span>(fetching)</span>
let h = <i>{row}</i>
if(this.state.cache[row]) {
const ts = this.state.cache[row]
h = ts.Height
let msgc = -1
if(ts.Cids[0] && this.state.messages[ts.Cids[0]['/']]) { // TODO: get from all blks
msgc = this.state.messages[ts.Cids[0]['/']].SecpkMessages.length + this.state.messages[ts.Cids[0]['/']].BlsMessages.length
msgc = this.state.messages[ts.Cids[0]['/']].length
}
if(msgc > 0) {
msgc = <b>{msgc}</b>
}
let time = '?'
if(this.state.cache[row - 1]){
time = <span>{ts.Blocks[0].Timestamp - this.state.cache[row - 1].Blocks[0].Timestamp}s</span>
}
info = <span>
<BlockLinks cids={ts.Cids} blocks={ts.Blocks} conn={this.props.client} mountWindow={this.props.mountWindow} /> Msgs: {msgc}
<BlockLinks cids={ts.Cids} blocks={ts.Blocks} conn={this.props.client} mountWindow={this.props.mountWindow} /> Msgs: {msgc} ΔT:{time}
</span>
}
return <div key={row} className={className}>@{row} {info}</div>
return <div key={row} className={className}>@{h} {info}</div>
})}</div>
return (<Window onClose={this.props.onClose} title={`Chain Explorer ${this.state.follow ? '(Following)' : ''}`}>

View File

@ -46,7 +46,7 @@ func (api *api) Spawn() (nodeInfo, error) {
mux := newWsMux()
cmd := exec.Command("./lotus", "daemon", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd := exec.Command("./lotus", "daemon", "--bootstrap=false", genParam, "--api", fmt.Sprintf("%d", 2500+id))
cmd.Stderr = io.MultiWriter(os.Stderr, errlogfile, mux.errpw)
cmd.Stdout = io.MultiWriter(os.Stdout, logfile, mux.outpw)
cmd.Env = []string{"LOTUS_PATH=" + dir}

View File

@ -10,7 +10,6 @@ import (
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/lib/vdf"
"github.com/filecoin-project/go-lotus/node/impl/full"
logging "github.com/ipfs/go-log"
@ -22,7 +21,7 @@ import (
var log = logging.Logger("miner")
type vdfFunc func(ctx context.Context, input []byte) ([]byte, []byte, error)
type waitFunc func(ctx context.Context) error
type api struct {
fx.In
@ -36,9 +35,11 @@ type api struct {
func NewMiner(api api) *Miner {
return &Miner{
api: api,
// time between blocks, network parameter
runVDF: delayVDF(build.BlockDelay * time.Second),
waitFunc: func(ctx context.Context) error {
// Wait around for half the block time in case other parents come in
time.Sleep(build.BlockDelay * time.Second / 2)
return nil
},
}
}
@ -50,7 +51,7 @@ type Miner struct {
stop chan struct{}
stopping chan struct{}
runVDF vdfFunc
waitFunc waitFunc
lastWork *MiningBase
}
@ -130,6 +131,8 @@ func (m *Miner) mine(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "/mine")
defer span.End()
var lastBase MiningBase
for {
select {
case <-m.stop:
@ -145,11 +148,23 @@ func (m *Miner) mine(ctx context.Context) {
default:
}
// Sleep a small amount in order to wait for other blocks to arrive
if err := m.waitFunc(ctx); err != nil {
log.Error(err)
return
}
base, err := m.GetBestMiningCandidate()
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
continue
}
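// If the best candidate is unchanged since the previous round (same tipset and ticket count),
// back off for one block delay instead of re-mining the same base in a tight loop.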
if base.ts.Equals(lastBase.ts) && len(lastBase.tickets) == len(base.tickets) {
log.Error("BestMiningCandidate from the previous round: %s (tkts:%d)", lastBase.ts.Cids(), len(lastBase.tickets))
time.Sleep(build.BlockDelay * time.Second)
continue
}
lastBase = *base
b, err := m.mineOne(ctx, base)
if err != nil {
@ -160,9 +175,19 @@ func (m *Miner) mine(ctx context.Context) {
}
if b != nil {
btime := time.Unix(int64(b.Header.Timestamp), 0)
if time.Now().Before(btime) {
time.Sleep(time.Until(btime))
} else {
log.Warnf("Mined block in the past: b.T: %s, T: %s, dT: %s", btime, time.Now(), time.Now().Sub(btime))
}
if err := m.api.ChainSubmitBlock(ctx, b); err != nil {
log.Errorf("failed to submit newly mined block: %s", err)
}
} else {
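// No block was won this round: sleep until the next round is due (based on the parent
// timestamp and the number of tickets so far) before trying again.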
nextRound := time.Unix(int64(base.ts.MinTimestamp()+uint64(build.BlockDelay*len(base.tickets))), 0)
time.Sleep(time.Until(nextRound))
}
}
}
@ -194,7 +219,7 @@ func (m *Miner) GetBestMiningCandidate() (*MiningBase, error) {
}
func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
log.Debug("attempting to mine a block on:", base.ts.Cids())
log.Info("attempting to mine a block on:", base.ts.Cids())
ticket, err := m.scratchTicket(ctx, base)
if err != nil {
return nil, errors.Wrap(err, "scratching ticket failed")
@ -255,18 +280,6 @@ func (m *Miner) getMinerWorker(ctx context.Context, addr address.Address, ts *ty
return w, nil
}
func delayVDF(delay time.Duration) func(ctx context.Context, input []byte) ([]byte, []byte, error) {
return func(ctx context.Context, input []byte) ([]byte, []byte, error) {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
case <-time.After(delay):
}
return vdf.Run(input)
}
}
func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Ticket, error) {
var lastTicket *types.Ticket
if len(base.tickets) > 0 {
@ -275,20 +288,13 @@ func (m *Miner) scratchTicket(ctx context.Context, base *MiningBase) (*types.Tic
lastTicket = base.ts.MinTicket()
}
vrfOut, err := m.computeVRF(ctx, lastTicket.VDFResult)
if err != nil {
return nil, err
}
res, proof, err := m.runVDF(ctx, vrfOut)
vrfOut, err := m.computeVRF(ctx, lastTicket.VRFProof)
if err != nil {
return nil, err
}
return &types.Ticket{
VRFProof: vrfOut,
VDFResult: res,
VDFProof: proof,
VRFProof: vrfOut,
}, nil
}
@ -304,7 +310,7 @@ func (m *Miner) createBlock(base *MiningBase, ticket *types.Ticket, proof types.
return nil, xerrors.Errorf("message filtering failed: %w", err)
}
uts := time.Now().Unix() // TODO: put smallest valid timestamp
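// Deterministic timestamp: the parent tipset's minimum timestamp plus one BlockDelay for
// each ticket used in this round (including the new one).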
uts := base.ts.MinTimestamp() + uint64(build.BlockDelay*(len(base.tickets)+1))
// why even return this? that api call could just submit it for us
return m.api.MinerCreateBlock(context.TODO(), m.addresses[0], base.ts, append(base.tickets, ticket), proof, msgs, uint64(uts))

View File

@ -2,27 +2,25 @@ package miner
import (
"context"
"github.com/filecoin-project/go-lotus/lib/vdf"
)
func NewTestMiner(nextCh <-chan struct{}) func(api api) *Miner {
return func(api api) *Miner {
return &Miner{
api: api,
runVDF: chanVDF(nextCh),
api: api,
waitFunc: chanWaiter(nextCh),
}
}
}
func chanVDF(next <-chan struct{}) func(ctx context.Context, input []byte) ([]byte, []byte, error) {
return func(ctx context.Context, input []byte) ([]byte, []byte, error) {
func chanWaiter(next <-chan struct{}) func(ctx context.Context) error {
return func(ctx context.Context) error {
select {
case <-ctx.Done():
return nil, nil, ctx.Err()
return ctx.Err()
case <-next:
}
return vdf.Run(input)
return nil
}
}

View File

@ -171,7 +171,7 @@ func libp2p() Option {
Override(BaseRoutingKey, lp2p.BaseRouting),
Override(new(routing.Routing), lp2p.Routing),
//Override(NatPortMapKey, lp2p.NatPortMap), //TODO: reenable when closing logic is actually there
Override(NatPortMapKey, lp2p.NatPortMap),
Override(ConnectionManagerKey, lp2p.ConnectionManager(50, 200, 20*time.Second)),
Override(new(*pubsub.PubSub), lp2p.GossipSub()),

View File

@ -71,6 +71,7 @@ func (hs *Service) HandleStream(s inet.Stream) {
return
}
log.Infof("Got new tipset through Hello: %s from %s", ts.Cids(), s.Conn().RemotePeer())
hs.syncer.InformNewHead(s.Conn().RemotePeer(), ts)
}

View File

@ -85,7 +85,8 @@ func (a *CommonAPI) ID(context.Context) (peer.ID, error) {
func (a *CommonAPI) Version(context.Context) (api.Version, error) {
return api.Version{
Version: build.Version,
Version: build.Version,
APIVersion: build.APIVersion,
}, nil
}

View File

@ -27,9 +27,7 @@ func (a *ChainAPI) ChainNotify(ctx context.Context) (<-chan []*store.HeadChange,
}
func (a *ChainAPI) ChainSubmitBlock(ctx context.Context, blk *types.BlockMsg) error {
if err := a.Chain.AddBlock(blk.Header); err != nil {
return xerrors.Errorf("AddBlock failed: %w", err)
}
// TODO: should we have some sort of fast path to adding a local block?
b, err := blk.Serialize()
if err != nil {
@ -48,19 +46,6 @@ func (a *ChainAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet, ti
return a.Chain.GetRandomness(ctx, pts.Cids(), tickets, int64(lb))
}
func (a *ChainAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
// TODO: consider using event system for this, expose confidence
recpt, err := a.Chain.WaitForMessage(ctx, msg)
if err != nil {
return nil, err
}
return &api.MsgWait{
Receipt: *recpt,
}, nil
}
func (a *ChainAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
return a.Chain.GetBlock(msg)
}
@ -93,12 +78,17 @@ func (a *ChainAPI) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api
}, nil
}
func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]cid.Cid, error) {
func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]api.Message, error) {
b, err := a.Chain.GetBlock(bcid)
if err != nil {
return nil, err
}
// genesis block has no parent messages...
if b.Height == 0 {
return nil, nil
}
// TODO: need to get the number of messages better than this
pts, err := a.Chain.LoadTipSet(b.Parents)
if err != nil {
@ -110,9 +100,12 @@ func (a *ChainAPI) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]
return nil, err
}
var out []cid.Cid
var out []api.Message
for _, m := range cm {
out = append(out, m.Cid())
out = append(out, api.Message{
Cid: m.Cid(),
Message: m.VMMessage(),
})
}
return out, nil
@ -124,6 +117,10 @@ func (a *ChainAPI) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]
return nil, err
}
if b.Height == 0 {
return nil, nil
}
// TODO: need to get the number of messages better than this
pts, err := a.Chain.LoadTipSet(b.Parents)
if err != nil {
@ -160,3 +157,7 @@ func (a *ChainAPI) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error
return blk.RawData(), nil
}
func (a *ChainAPI) ChainSetHead(ctx context.Context, ts *types.TipSet) error {
return a.Chain.SetHead(ts)
}

View File

@ -36,6 +36,16 @@ func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message) (*t
return a.Mpool.PushWithNonce(msg.From, func(nonce uint64) (*types.SignedMessage, error) {
msg.Nonce = nonce
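// Best-effort check that the sender can cover the transferred value before signing;
// gas costs are not included in this comparison.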
b, err := a.WalletBalance(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}
if b.LessThan(msg.Value) {
return nil, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, msg.Value)
}
return a.WalletSignMessage(ctx, msg.From, msg)
})
}

View File

@ -193,3 +193,17 @@ func (a *StateAPI) MinerCreateBlock(ctx context.Context, addr address.Address, p
return &out, nil
}
func (a *StateAPI) StateWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
// TODO: consider using event system for this, expose confidence
ts, recpt, err := a.StateManager.WaitForMessage(ctx, msg)
if err != nil {
return nil, err
}
return &api.MsgWait{
Receipt: *recpt,
TipSet: ts,
}, nil
}

View File

@ -40,12 +40,9 @@ func (a *WalletAPI) WalletSign(ctx context.Context, k address.Address, msg []byt
}
func (a *WalletAPI) WalletSignMessage(ctx context.Context, k address.Address, msg *types.Message) (*types.SignedMessage, error) {
msgbytes, err := msg.Serialize()
if err != nil {
return nil, err
}
mcid := msg.Cid()
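// Sign the message CID rather than the full serialized message; the CID already commits
// to the message contents.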
sig, err := a.WalletSign(ctx, k, msgbytes)
sig, err := a.WalletSign(ctx, k, mcid.Bytes())
if err != nil {
return nil, xerrors.Errorf("failed to sign message: %w", err)
}
@ -68,3 +65,11 @@ func (a *WalletAPI) WalletDefaultAddress(ctx context.Context) (address.Address,
// TODO: store a default address in the config or 'wallet' portion of the repo
return addrs[0], nil
}
func (a *WalletAPI) WalletExport(ctx context.Context, addr address.Address) (*types.KeyInfo, error) {
return a.Wallet.Export(addr)
}
func (a *WalletAPI) WalletImport(ctx context.Context, ki *types.KeyInfo) (address.Address, error) {
return a.Wallet.Import(ki)
}

View File

@ -43,8 +43,8 @@ func PNetChecker(repo repo.Repo, ph host.Host, lc fx.Lifecycle) error {
select {
case <-t.C:
if len(ph.Network().Peers()) == 0 {
log.Warning("We are in private network and have no peers.")
log.Warning("This might be configuration mistake.")
log.Warn("We are in private network and have no peers.")
log.Warn("This might be configuration mistake.")
}
case <-done:
return

View File

@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"time"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car"
@ -79,7 +80,7 @@ func MakeGenesis(outFile string) func(bs dtypes.ChainBlockstore, w *wallet.Walle
minerAddr: types.FromFil(100000),
}
b, err := gen.MakeGenesisBlock(bs, addrs, gmc, 100000)
b, err := gen.MakeGenesisBlock(bs, addrs, gmc, uint64(time.Now().Unix()))
if err != nil {
return nil, err
}

View File

@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
"golang.org/x/xerrors"
"math"
"strconv"
"golang.org/x/xerrors"
logging "github.com/ipfs/go-log"
"go.uber.org/fx"
@ -25,7 +26,7 @@ type ManagerApi struct {
full.MpoolAPI
full.WalletAPI
full.ChainAPI
full.StateAPI
}
type Manager struct {
@ -34,7 +35,7 @@ type Manager struct {
mpool full.MpoolAPI
wallet full.WalletAPI
chain full.ChainAPI
state full.StateAPI
}
func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manager {
@ -44,7 +45,7 @@ func NewManager(sm *stmgr.StateManager, pchstore *Store, api ManagerApi) *Manage
mpool: api.MpoolAPI,
wallet: api.WalletAPI,
chain: api.ChainAPI,
state: api.StateAPI,
}
}

View File

@ -44,7 +44,7 @@ func (pm *Manager) createPaych(ctx context.Context, from, to address.Address, am
// TODO: wait outside the store lock!
// (tricky because we need to setup channel tracking before we know it's address)
mwait, err := pm.chain.ChainWaitMsg(ctx, mcid)
mwait, err := pm.state.StateWaitMsg(ctx, mcid)
if err != nil {
return address.Undef, cid.Undef, err
}
@ -85,7 +85,7 @@ func (pm *Manager) addFunds(ctx context.Context, ch address.Address, from addres
return err
}
mwait, err := pm.chain.ChainWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
mwait, err := pm.state.StateWaitMsg(ctx, smsg.Cid()) // TODO: wait outside the store lock!
if err != nil {
return err
}

scripts/bootstrap.toml Normal file
View File

@ -0,0 +1,2 @@
[Libp2p]
ListenAddresses = ["/ip4/0.0.0.0/tcp/1347"]

scripts/daemon.service Normal file
View File

@ -0,0 +1,9 @@
[Unit]
Description=Lotus Daemon
After=network.target
[Service]
ExecStart=/usr/local/bin/lotus daemon
[Install]
WantedBy=multi-user.target

scripts/deploy-devnet.sh Executable file
View File

@ -0,0 +1,110 @@
#!/usr/bin/env bash
############
## Settings
GENESIS_HOST=root@147.75.80.29
BOOTSTRAPPERS=( root@147.75.80.17 )
############
log() {
echo -e "\e[33m$1\e[39m"
}
rm -f build/bootstrap/*.pi
log '> Generating genesis'
make
GENPATH=$(mktemp -d)
log 'Starting temp daemon'
./lotus --repo="${GENPATH}" daemon --lotus-make-random-genesis="${GENPATH}/devnet.car" &
GDPID=$!
sleep 3
log 'Extracting genesis miner private key'
ADDR=$(./lotus --repo="${GENPATH}" wallet list)
./lotus --repo="${GENPATH}" wallet export "$ADDR" > "${GENPATH}/wallet.key"
kill "$GDPID"
wait
log '> Creating genesis binary'
cp "${GENPATH}/devnet.car" build/genesis/devnet.car
rm -f build/bootstrap/*.pi
make
log '> Deploying and starting genesis miner'
ssh $GENESIS_HOST 'systemctl stop lotus-daemon' &
ssh $GENESIS_HOST 'systemctl stop lotus-storage-miner' &
wait
ssh $GENESIS_HOST 'rm -rf .lotus' &
ssh $GENESIS_HOST 'rm -rf .lotusstorage' &
scp -C lotus "${GENESIS_HOST}":/usr/local/bin/lotus &
scp -C lotus-storage-miner "${GENESIS_HOST}":/usr/local/bin/lotus-storage-miner &
wait
log 'Initializing genesis miner repo'
ssh $GENESIS_HOST 'systemctl start lotus-daemon'
scp scripts/bootstrap.toml "${GENESIS_HOST}:.lotus/config.toml" &
ssh < "${GENPATH}/wallet.key" $GENESIS_HOST '/usr/local/bin/lotus wallet import' &
wait
ssh $GENESIS_HOST 'systemctl restart lotus-daemon'
log 'Starting genesis mining'
ssh $GENESIS_HOST '/usr/local/bin/lotus-storage-miner init --genesis-miner --actor=t0101'
ssh $GENESIS_HOST 'systemctl start lotus-storage-miner'
log 'Getting genesis addr info'
ssh $GENESIS_HOST './lotus net listen' | grep -v '/10' | grep -v '/127' > build/bootstrap/root.pi
log '> Creating bootstrap binaries'
make
for host in "${BOOTSTRAPPERS[@]}"
do
log "> Deploying bootstrap node $host"
log "Stopping lotus daemon"
ssh "$host" 'systemctl stop lotus-daemon' &
ssh "$host" 'systemctl stop lotus-storage-miner' &
wait
ssh "$host" 'rm -rf .lotus' &
ssh "$host" 'rm -rf .lotusstorage' &
scp -C lotus "${host}":/usr/local/bin/lotus &
scp -C lotus-storage-miner "${host}":/usr/local/bin/lotus-storage-miner &
wait
log 'Initializing repo'
ssh "$host" 'systemctl start lotus-daemon'
scp scripts/bootstrap.toml "${host}:.lotus/config.toml"
ssh "$host" 'systemctl restart lotus-daemon'
log 'Extracting addr info'
ssh "$host" './lotus net listen' | grep -v '/10' | grep -v '/127' >> build/bootstrap/bootstrappers.pi
done

scripts/setup-host.sh Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env bash
HOST=$1
scp scripts/daemon.service "${HOST}:/etc/systemd/system/lotus-daemon.service"
scp scripts/sminer.service "${HOST}:/etc/systemd/system/lotus-storage-miner.service"

scripts/sminer.service Normal file
View File

@ -0,0 +1,9 @@
[Unit]
Description=Lotus Storage Miner
After=network.target
[Service]
ExecStart=/usr/local/bin/lotus-storage-miner run
[Install]
WantedBy=multi-user.target

View File

@ -54,11 +54,11 @@ type storageMinerApi interface {
StateMinerWorker(context.Context, address.Address, *types.TipSet) (address.Address, error)
StateMinerProvingPeriodEnd(context.Context, address.Address, *types.TipSet) (uint64, error)
StateMinerProvingSet(context.Context, address.Address, *types.TipSet) ([]*api.SectorInfo, error)
StateWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
ChainHead(context.Context) (*types.TipSet, error)
ChainWaitMsg(context.Context, cid.Cid) (*api.MsgWait, error)
ChainNotify(context.Context) (<-chan []*store.HeadChange, error)
ChainGetRandomness(context.Context, *types.TipSet, []*types.Ticket, int) ([]byte, error)
ChainGetTipSetByHeight(context.Context, uint64, *types.TipSet) (*types.TipSet, error)
@ -102,7 +102,7 @@ func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
if !ok {
// TODO: set some state variable so that this state can be
// visible via some status command
log.Warning("sealed sector channel closed, aborting process")
log.Warn("sealed sector channel closed, aborting process")
return
}
@ -112,7 +112,7 @@ func (m *Miner) handlePostingSealedSectors(ctx context.Context) {
}
case <-ctx.Done():
log.Warning("exiting seal posting routine")
log.Warn("exiting seal posting routine")
return
}
}
@ -161,7 +161,7 @@ func (m *Miner) commitSector(ctx context.Context, sinfo sectorbuilder.SectorSeal
}
go func() {
_, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
_, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return
}

View File

@ -149,7 +149,7 @@ func (m *Miner) computePost(ppe uint64) func(ts *types.TipSet, curH uint64) erro
log.Infof("Waiting for post %s to appear on chain", smsg.Cid())
// make sure it succeeds...
rec, err := m.api.ChainWaitMsg(ctx, smsg.Cid())
rec, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return err
}