Merge branch 'master' into feat/remote-workers

Łukasz Magiera 2019-12-04 14:10:15 +01:00
commit edd30c7aa1
39 changed files with 903 additions and 374 deletions

View File

@ -6,7 +6,7 @@ executors:
golang:
docker:
- image: circleci/golang:1.13
resource_class: 2xlarge
resource_class: 2xlarge
commands:
install-deps:
@ -14,25 +14,37 @@ commands:
- go/install-ssh
- go/install: {package: git}
prepare:
parameters:
linux:
default: true
description: is a linux build environment?
type: boolean
darwin:
default: false
description: is a darwin build environment?
type: boolean
steps:
- checkout
- run: sudo apt-get update
- run: sudo apt-get install ocl-icd-opencl-dev
- when:
condition: << parameters.linux >>
steps:
- run: sudo apt-get update
- run: sudo apt-get install ocl-icd-opencl-dev
- run: git submodule sync
- run: git submodule update --init
download-params:
steps:
- restore_cache:
name: Restore parameters cache
keys:
- 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}'
- 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-'
keys:
- 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}'
- 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-'
paths:
- /var/tmp/filecoin-proof-parameters/
- run: ./lotus fetch-params --include-test-params
- save_cache:
name: Save parameters cache
key: 'v15-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}'
key: 'v19-lotus-params-{{ checksum "build/proof-params/parameters.json" }}-{{ checksum "build/paramfetch.go" }}'
paths:
- /var/tmp/filecoin-proof-parameters/
@ -54,8 +66,15 @@ jobs:
- go/mod-download
- run: sudo apt-get update
- run: sudo apt-get install npm
- restore_cache:
name: restore go mod cache
key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }}
- run:
command: make buildall
- store_artifacts:
path: lotus
- store_artifacts:
path: lotus-storage-miner
test:
description: |
@ -95,6 +114,9 @@ jobs:
- install-deps
- prepare
- go/mod-download
- restore_cache:
name: restore go mod cache
key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }}
- run:
command: make deps lotus
no_output_timeout: 30m
@ -123,6 +145,63 @@ jobs:
shell: /bin/bash -eo pipefail
command: |
bash <(curl -s https://codecov.io/bash)
- save_cache:
name: save go mod cache
key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }}
paths:
- "~/go/pkg"
- "~/go/src/github.com"
- "~/go/src/golang.org"
build_macos:
description: build darwin lotus binary
macos:
xcode: "10.0.0"
working_directory: ~/go/src/github.com/filecoin-project/lotus
steps:
- prepare:
linux: false
darwin: true
- run:
name: Install go
command: |
curl -O https://dl.google.com/go/go1.13.4.darwin-amd64.pkg && \
sudo installer -pkg go1.13.4.darwin-amd64.pkg -target /
- run:
name: Install pkg-config
command: HOMEBREW_NO_AUTO_UPDATE=1 brew install pkg-config
- run: go version
- run:
name: Install Rust
command: |
curl https://sh.rustup.rs -sSf | sh -s -- -y
- run:
name: Install jq
command: |
mkdir $HOME/.bin
curl --location https://github.com/stedolan/jq/releases/download/jq-1.6/jq-osx-amd64 --output $HOME/.bin/jq
chmod +x $HOME/.bin/jq
- restore_cache:
name: restore go mod and cargo cache
key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }}
- install-deps
- go/mod-download
- run:
command: make build
no_output_timeout: 30m
- store_artifacts:
path: lotus
- store_artifacts:
path: lotus-storage-miner
- save_cache:
name: save go mod and cargo cache
key: v1-go-deps-{{ arch }}-{{ checksum "~/go/src/github.com/filecoin-project/lotus/go.mod" }}
paths:
- "~/go/pkg"
- "~/go/src/github.com"
- "~/go/src/golang.org"
- "~/.rustup"
- "~/.cargo"
lint: &lint
description: |
@ -177,4 +256,8 @@ workflows:
- test:
codecov-upload: true
- mod-tidy-check
- build-all
- build-all
- build_macos:
filters:
branches:
only: master

View File

@ -2,6 +2,7 @@ package api
import (
"context"
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
@ -263,6 +264,10 @@ type ActiveSync struct {
Stage SyncStateStage
Height uint64
Start time.Time
End time.Time
Message string
}
type SyncState struct {
@ -277,6 +282,7 @@ const (
StagePersistHeaders
StageMessages
StageSyncComplete
StageSyncErrored
)
type MpoolChange int
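With End, Message, and the new StageSyncErrored stage, API consumers can report failed syncs as well as completed ones. A minimal consumer sketch in Go, assuming a previously fetched *api.SyncState (variable names are illustrative):

for i, ss := range state.ActiveSyncs {
	if ss.Stage == api.StageSyncErrored {
		fmt.Printf("sync worker %d failed after %s: %s\n", i, ss.End.Sub(ss.Start), ss.Message)
	}
}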

View File

@ -2,7 +2,7 @@ package api
import (
"context"
"fmt"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
@ -22,29 +22,30 @@ const (
Committing
Proving
SectorNoUpdate = UndefinedSectorState
SealFailed
PreCommitFailed
SealCommitFailed
CommitFailed
FailedUnrecoverable
)
func SectorStateStr(s SectorState) string {
switch s {
case UndefinedSectorState:
return "UndefinedSectorState"
case Empty:
return "Empty"
case Packing:
return "Packing"
case Unsealed:
return "Unsealed"
case PreCommitting:
return "PreCommitting"
case PreCommitted:
return "PreCommitted"
case Committing:
return "Committing"
case Proving:
return "Proving"
}
return fmt.Sprintf("<Unknown %d>", s)
var SectorStates = []string{
UndefinedSectorState: "UndefinedSectorState",
Empty: "Empty",
Packing: "Packing",
Unsealed: "Unsealed",
PreCommitting: "PreCommitting",
PreCommitted: "PreCommitted",
Committing: "Committing",
Proving: "Proving",
SealFailed: "SealFailed",
PreCommitFailed: "PreCommitFailed",
SealCommitFailed: "SealCommitFailed",
CommitFailed: "CommitFailed",
FailedUnrecoverable: "FailedUnrecoverable",
}
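Replacing the switch with a state-indexed slice shortens call sites (api.SectorStates[st.State] below), but unlike the old SectorStateStr it panics on an out-of-range state instead of returning a placeholder. A hypothetical bounds-checked wrapper, if that fallback is still wanted (sectorStateStr is not part of this change):

func sectorStateStr(s SectorState) string {
	if int(s) < len(SectorStates) {
		return SectorStates[s]
	}
	return "<unknown>"
}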
// StorageMiner is a low-level interface to the Filecoin network storage miner node
@ -83,6 +84,7 @@ type SectorInfo struct {
Deals []uint64
Ticket sectorbuilder.SealTicket
Seed sectorbuilder.SealSeed
LastErr string
}
type SealedRef struct {

View File

@ -1,3 +1,2 @@
/ip4/147.75.80.17/tcp/1347/p2p/12D3KooWPWCCqUN3gPEaFAMpAwfh5a6SryBEsFt5R2oK8oW86a4C
/ip6/2604:1380:2000:f400::1/tcp/36137/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ
/ip4/147.75.80.29/tcp/44397/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ
/ip4/147.75.80.29/tcp/1347/p2p/12D3KooWNL1fJPBArhsoqwg2wbXgCDTByMyg4ZGp6HjgWr9bgnaJ

View File

@ -4,9 +4,13 @@ package build
import "os"
var SectorSizes = []uint64{1024}
// Seconds
const BlockDelay = 6
const PropagationDelay = 3
// FallbackPoStDelay is the number of epochs the miner needs to wait after
// ElectionPeriodStart before starting fallback post computation
//

View File

@ -5,6 +5,8 @@ package build
// Seconds
const BlockDelay = 30
const PropagationDelay = 5
// FallbackPoStDelay is the number of epochs the miner needs to wait after
// ElectionPeriodStart before starting fallback post computation
//

View File

@ -23,6 +23,8 @@ import (
"golang.org/x/xerrors"
)
const MaxSectorID = 32 << 30 // 32 billion sectors should be good enough right?
type StorageMinerActor struct{}
type StorageMinerActorState struct {
@ -537,6 +539,9 @@ func SectorIsUnique(ctx context.Context, s types.Storage, sroot cid.Cid, sid uin
}
func AddToSectorSet(ctx context.Context, blks amt.Blocks, ss cid.Cid, sectorID uint64, commR, commD []byte) (cid.Cid, ActorError) {
if sectorID > MaxSectorID {
return cid.Undef, aerrors.Newf(25, "sector ID out of range: %d", sectorID)
}
ssr, err := amt.LoadAMT(blks, ss)
if err != nil {
return cid.Undef, aerrors.HandleExternalError(err, "could not load sector set node")
@ -557,6 +562,10 @@ func AddToSectorSet(ctx context.Context, blks amt.Blocks, ss cid.Cid, sectorID u
}
func GetFromSectorSet(ctx context.Context, s types.Storage, ss cid.Cid, sectorID uint64) (bool, []byte, []byte, ActorError) {
if sectorID > MaxSectorID {
return false, nil, nil, aerrors.Newf(25, "sector ID out of range: %d", sectorID)
}
ssr, err := amt.LoadAMT(types.WrapStorage(s), ss)
if err != nil {
return false, nil, nil, aerrors.HandleExternalError(err, "could not load sector set node")
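For scale: 32 << 30 is 2^35 = 34,359,738,368, so both the write path (AddToSectorSet) and the read path (GetFromSectorSet) now reject sector IDs above roughly 34 billion with actor error code 25 before touching the AMT.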

View File

@ -5,7 +5,7 @@ import (
"io"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)

View File

@ -62,7 +62,7 @@ func (tsc *tipSetCache) add(ts *types.TipSet) error {
func (tsc *tipSetCache) revert(ts *types.TipSet) error {
if tsc.len == 0 {
return xerrors.New("tipSetCache.revert: nothing to revert; cache is empty")
return nil // this can happen, and it's fine
}
if !tsc.cache[tsc.start].Equals(ts) {
@ -92,7 +92,8 @@ func (tsc *tipSetCache) getNonNull(height uint64) (*types.TipSet, error) {
func (tsc *tipSetCache) get(height uint64) (*types.TipSet, error) {
if tsc.len == 0 {
return nil, xerrors.New("tipSetCache.get: cache is empty")
log.Warnf("tipSetCache.get: cache is empty, requesting from storage (h=%d)", height)
return tsc.storage(context.TODO(), height, nil)
}
headH := tsc.cache[tsc.start].Height()

View File

@ -261,13 +261,17 @@ func (cg *ChainGen) nextBlockProof(ctx context.Context, pts *types.TipSet, m add
VRFProof: vrfout,
}
win, eproof, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm})
eproofin, err := IsRoundWinner(ctx, pts, round, m, cg.eppProvs[m], &mca{w: cg.w, sm: cg.sm})
if err != nil {
return nil, nil, xerrors.Errorf("checking round winner failed: %w", err)
}
if !win {
if eproofin == nil {
return nil, tick, nil
}
eproof, err := ComputeProof(ctx, cg.eppProvs[m], eproofin)
if err != nil {
return nil, nil, xerrors.Errorf("computing proof: %w", err)
}
return eproof, tick, nil
}
@ -466,28 +470,35 @@ func (epp *eppProvider) ComputeProof(ctx context.Context, _ sectorbuilder.Sorted
return []byte("valid proof"), nil
}
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (bool, *types.EPostProof, error) {
type ProofInput struct {
sectors sectorbuilder.SortedPublicSectorInfo
hvrf []byte
winners []sectorbuilder.EPostCandidate
vrfout []byte
}
func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner address.Address, epp ElectionPoStProver, a MiningCheckAPI) (*ProofInput, error) {
r, err := a.ChainGetRandomness(ctx, ts.Key(), round-build.EcRandomnessLookback)
if err != nil {
return false, nil, xerrors.Errorf("chain get randomness: %w", err)
return nil, xerrors.Errorf("chain get randomness: %w", err)
}
mworker, err := a.StateMinerWorker(ctx, miner, ts)
if err != nil {
return false, nil, xerrors.Errorf("failed to get miner worker: %w", err)
return nil, xerrors.Errorf("failed to get miner worker: %w", err)
}
vrfout, err := ComputeVRF(ctx, a.WalletSign, mworker, miner, DSepElectionPost, r)
if err != nil {
return false, nil, xerrors.Errorf("failed to compute VRF: %w", err)
return nil, xerrors.Errorf("failed to compute VRF: %w", err)
}
pset, err := a.StateMinerProvingSet(ctx, miner, ts)
if err != nil {
return false, nil, xerrors.Errorf("failed to load proving set for miner: %w", err)
return nil, xerrors.Errorf("failed to load proving set for miner: %w", err)
}
if len(pset) == 0 {
return false, nil, nil
return nil, nil
}
var sinfos []ffi.PublicSectorInfo
@ -504,17 +515,17 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
hvrf := sha256.Sum256(vrfout)
candidates, err := epp.GenerateCandidates(ctx, sectors, hvrf[:])
if err != nil {
return false, nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err)
return nil, xerrors.Errorf("failed to generate electionPoSt candidates: %w", err)
}
pow, err := a.StateMinerPower(ctx, miner, ts)
if err != nil {
return false, nil, xerrors.Errorf("failed to check power: %w", err)
return nil, xerrors.Errorf("failed to check power: %w", err)
}
ssize, err := a.StateMinerSectorSize(ctx, miner, ts)
if err != nil {
return false, nil, xerrors.Errorf("failed to look up miners sector size: %w", err)
return nil, xerrors.Errorf("failed to look up miners sector size: %w", err)
}
var winners []sectorbuilder.EPostCandidate
@ -526,19 +537,28 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
// no winners, sad
if len(winners) == 0 {
return false, nil, nil
return nil, nil
}
proof, err := epp.ComputeProof(ctx, sectors, hvrf[:], winners)
return &ProofInput{
sectors: sectors,
hvrf: hvrf[:],
winners: winners,
vrfout: vrfout,
}, nil
}
func ComputeProof(ctx context.Context, epp ElectionPoStProver, pi *ProofInput) (*types.EPostProof, error) {
proof, err := epp.ComputeProof(ctx, pi.sectors, pi.hvrf, pi.winners)
if err != nil {
return false, nil, xerrors.Errorf("failed to compute snark for election proof: %w", err)
return nil, xerrors.Errorf("failed to compute snark for election proof: %w", err)
}
ept := types.EPostProof{
Proof: proof,
PostRand: vrfout,
PostRand: pi.vrfout,
}
for _, win := range winners {
for _, win := range pi.winners {
ept.Candidates = append(ept.Candidates, types.EPostTicket{
Partial: win.PartialTicket[:],
SectorID: win.SectorID,
@ -546,7 +566,7 @@ func IsRoundWinner(ctx context.Context, ts *types.TipSet, round int64, miner add
})
}
return true, &ept, nil
return &ept, nil
}
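Splitting IsRoundWinner from ComputeProof lets a caller do useful work between learning it won and paying for the SNARK; the miner later in this diff fetches pending messages in exactly that window. A sketch of the resulting call pattern (mineRound is a hypothetical wrapper; imports of gen, types, address, and xerrors are assumed):

func mineRound(ctx context.Context, ts *types.TipSet, round int64, addr address.Address, epp gen.ElectionPoStProver, a gen.MiningCheckAPI) (*types.EPostProof, error) {
	proofin, err := gen.IsRoundWinner(ctx, ts, round, addr, epp, a)
	if err != nil {
		return nil, xerrors.Errorf("checking round winner: %w", err)
	}
	if proofin == nil {
		return nil, nil // not a winner this round
	}
	// gather block contents (e.g. pending messages) here, then:
	return gen.ComputeProof(ctx, epp, proofin)
}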
type SignFunc func(context.Context, address.Address, []byte) (*types.Signature, error)

View File

@ -62,7 +62,7 @@ type MessagePool struct {
pending map[address.Address]*msgSet
pendingCount int
curTsLk sync.Mutex
curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet
api Provider
@ -106,7 +106,7 @@ func (ms *msgSet) add(m *types.SignedMessage) error {
}
type Provider interface {
SubscribeHeadChanges(func(rev, app []*types.TipSet) error)
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
PutMessage(m store.ChainMsg) (cid.Cid, error)
PubSubPublish(string, []byte) error
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
@ -124,8 +124,9 @@ func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
return &mpoolProvider{sm, ps}
}
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
return mpp.sm.ChainStore().GetHeaviestTipSet()
}
func (mpp *mpoolProvider) PutMessage(m store.ChainMsg) (cid.Cid, error) {
@ -173,7 +174,7 @@ func New(api Provider, ds dtypes.MetadataDS) (*MessagePool, error) {
go mp.repubLocal()
api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
mp.curTs = api.SubscribeHeadChanges(func(rev, app []*types.TipSet) error {
err := mp.HeadChange(rev, app)
if err != nil {
log.Errorf("mpool head notif handler error: %+v", err)
@ -257,6 +258,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) error {
}
func (mp *MessagePool) Add(m *types.SignedMessage) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.addTs(m, mp.curTs)
}
func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
// big messages are bad, anti DOS
if m.Size() > 32*1024 {
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
@ -275,7 +282,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error {
return err
}
snonce, err := mp.getStateNonce(m.Message.From)
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %w", err)
}
@ -333,14 +340,17 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
}
func (mp *MessagePool) GetNonce(addr address.Address) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock()
defer mp.lk.Unlock()
return mp.getNonceLocked(addr)
return mp.getNonceLocked(addr, mp.curTs)
}
func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr) // sanity check
func (mp *MessagePool) getNonceLocked(addr address.Address, curTs *types.TipSet) (uint64, error) {
stateNonce, err := mp.getStateNonce(addr, curTs) // sanity check
if err != nil {
return 0, err
}
@ -359,22 +369,9 @@ func (mp *MessagePool) getNonceLocked(addr address.Address) (uint64, error) {
return stateNonce, nil
}
func (mp *MessagePool) setCurTipset(ts *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTs = ts
}
func (mp *MessagePool) getCurTipset() *types.TipSet {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
return mp.curTs
}
func (mp *MessagePool) getStateNonce(addr address.Address) (uint64, error) {
func (mp *MessagePool) getStateNonce(addr address.Address, curTs *types.TipSet) (uint64, error) {
// TODO: this method probably should be cached
curTs := mp.getCurTipset()
act, err := mp.api.StateGetActor(addr, curTs)
if err != nil {
return 0, err
@ -417,13 +414,16 @@ func (mp *MessagePool) getStateBalance(addr address.Address) (types.BigInt, erro
}
func (mp *MessagePool) PushWithNonce(addr address.Address, cb func(uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock()
defer mp.lk.Unlock()
if addr.Protocol() == address.ID {
log.Warnf("Called pushWithNonce with ID address (%s) this might not be handled properly yet", addr)
}
nonce, err := mp.getNonceLocked(addr)
nonce, err := mp.getNonceLocked(addr, mp.curTs)
if err != nil {
return nil, err
}
@ -485,15 +485,19 @@ func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
}
}
func (mp *MessagePool) Pending() []*types.SignedMessage {
func (mp *MessagePool) Pending() ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.lk.Lock()
defer mp.lk.Unlock()
out := make([]*types.SignedMessage, 0)
for a := range mp.pending {
out = append(out, mp.pendingFor(a)...)
}
return out
return out, mp.curTs
}
func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
@ -516,6 +520,8 @@ func (mp *MessagePool) pendingFor(a address.Address) []*types.SignedMessage {
}
func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet) error {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents())
@ -523,27 +529,16 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return err
}
mp.setCurTipset(pts)
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil {
return xerrors.Errorf("failed to get messages for revert block %s(height %d): %w", b.Cid(), b.Height, err)
}
for _, msg := range smsgs {
if err := mp.Add(msg); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
}
msgs, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return err
}
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
if err := mp.Add(smsg); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
} else {
log.Warnf("could not recover signature for bls message %s during a reorg revert", msg.Cid())
}
mp.curTs = pts
for _, msg := range msgs {
if err := mp.addTs(msg, pts); err != nil {
log.Error(err) // TODO: probably lots of spam in multi-block tsets
}
}
}
@ -562,12 +557,38 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
mp.Remove(msg.From, msg.Nonce)
}
}
mp.setCurTipset(ts)
mp.curTs = ts
}
return nil
}
func (mp *MessagePool) MessagesForBlocks(blks []*types.BlockHeader) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0)
for _, b := range blks {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil {
return nil, xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
}
for _, msg := range smsgs {
out = append(out, msg)
}
for _, msg := range bmsgs {
smsg := mp.RecoverSig(msg)
if smsg != nil {
out = append(out, smsg)
} else {
log.Warnf("could not recover signature for bls message %s", msg.Cid())
}
}
}
return out, nil
}
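Pending now returns the tipset the pending set was computed against, so callers can reconcile against their own head instead of racing the pool; note the lock order established above, where curTsLk is always taken before lk. A minimal caller sketch:

pending, mpts := mp.Pending()
// mpts is the pool's head when pending was computed; MpoolPending in
// the full-node API compares it with the requested tipset to avoid
// returning duplicate or already-included messages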
func (mp *MessagePool) RecoverSig(msg *types.Message) *types.SignedMessage {
val, ok := mp.blsSigCache.Get(msg.Cid())
if !ok {

View File

@ -52,8 +52,9 @@ func (tma *testMpoolApi) setBlockMessages(h *types.BlockHeader, msgs ...*types.S
tma.tipsets = append(tma.tipsets, mock.TipSet(h))
}
func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) {
func (tma *testMpoolApi) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
tma.cb = cb
return nil
}
func (tma *testMpoolApi) PutMessage(m store.ChainMsg) (cid.Cid, error) {
@ -216,7 +217,8 @@ func TestRevertMessages(t *testing.T) {
assertNonce(t, mp, sender, 4)
if len(mp.Pending()) != 3 {
p, _ := mp.Pending()
if len(p) != 3 {
t.Fatal("expected three messages in mempool")
}

View File

@ -195,6 +195,7 @@ func (sm *StateManager) computeTipSetState(ctx context.Context, blks []*types.Bl
}
if applied[m.From] != m.Nonce {
log.Infof("skipping message from %s: nonce check failed: exp %d, was %d", m.From, applied[m.From], m.Nonce)
continue
}
applied[m.From]++

View File

@ -290,7 +290,7 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
span.AddAttributes(trace.BoolAttribute("newHead", true))
log.Debugf("New heaviest tipset! %s", ts.Cids())
log.Infof("New heaviest tipset! %s (height=%d)", ts.Cids(), ts.Height())
cs.heaviest = ts
if err := cs.writeHead(ts); err != nil {

View File

@ -144,7 +144,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
bestPweight := syncer.store.GetHeaviestTipSet().Blocks()[0].ParentWeight
targetWeight := fts.TipSet().Blocks()[0].ParentWeight
if targetWeight.LessThan(bestPweight) {
log.Warn("incoming tipset does not appear to be better than our best chain, ignoring for now")
var miners []string
for _, blk := range fts.TipSet().Blocks() {
miners = append(miners, blk.Miner.String())
}
log.Warnf("incoming tipset from %s does not appear to be better than our best chain, ignoring for now", miners)
return
}
@ -538,6 +542,9 @@ func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock) err
if h.Timestamp > uint64(time.Now().Unix()+build.AllowableClockDrift) {
return xerrors.Errorf("block was from the future")
}
if h.Timestamp > uint64(time.Now().Unix()) {
log.Warn("Got block from the future, but within threshold", h.Timestamp, time.Now().Unix())
}
if h.Timestamp < baseTs.MinTimestamp()+(build.BlockDelay*(h.Height-baseTs.Height())) {
log.Warn("timestamp funtimes: ", h.Timestamp, baseTs.MinTimestamp(), h.Height, baseTs.Height())
@ -1112,6 +1119,7 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
headers, err := syncer.collectHeaders(ctx, ts, syncer.store.GetHeaviestTipSet())
if err != nil {
ss.Error(err)
return err
}
@ -1128,14 +1136,18 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
toPersist = append(toPersist, ts.Blocks()...)
}
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
return xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
ss.Error(err)
return err
}
toPersist = nil
ss.SetStage(api.StageMessages)
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
return xerrors.Errorf("collectChain syncMessages: %w", err)
err = xerrors.Errorf("collectChain syncMessages: %w", err)
ss.Error(err)
return err
}
ss.SetStage(api.StageSyncComplete)

View File

@ -71,7 +71,6 @@ func (sm *SyncManager) Stop() {
}
func (sm *SyncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet) {
log.Info("set peer head!", ts.Height(), ts.Cids())
sm.lk.Lock()
defer sm.lk.Unlock()
sm.peerHeads[p] = ts
@ -336,7 +335,6 @@ func (sm *SyncManager) syncWorker(id int) {
log.Info("sync manager worker shutting down")
return
}
log.Info("sync worker go time!", ts.Height(), ts.Cids())
ctx := context.WithValue(context.TODO(), syncStateKey{}, ss)
err := sm.doSync(ctx, ts)

View File

@ -3,6 +3,7 @@ package chain
import (
"fmt"
"sync"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
@ -18,17 +19,22 @@ func SyncStageString(v api.SyncStateStage) string {
return "message sync"
case api.StageSyncComplete:
return "complete"
case api.StageSyncErrored:
return "error"
default:
return fmt.Sprintf("<unknown: %d>", v)
}
}
type SyncerState struct {
lk sync.Mutex
Target *types.TipSet
Base *types.TipSet
Stage api.SyncStateStage
Height uint64
lk sync.Mutex
Target *types.TipSet
Base *types.TipSet
Stage api.SyncStateStage
Height uint64
Message string
Start time.Time
End time.Time
}
func (ss *SyncerState) SetStage(v api.SyncStateStage) {
@ -39,6 +45,9 @@ func (ss *SyncerState) SetStage(v api.SyncStateStage) {
ss.lk.Lock()
defer ss.lk.Unlock()
ss.Stage = v
if v == api.StageSyncComplete {
ss.End = time.Now()
}
}
func (ss *SyncerState) Init(base, target *types.TipSet) {
@ -52,6 +61,9 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {
ss.Base = base
ss.Stage = api.StageHeaders
ss.Height = 0
ss.Message = ""
ss.Start = time.Now()
ss.End = time.Time{}
}
func (ss *SyncerState) SetHeight(h uint64) {
@ -64,13 +76,28 @@ func (ss *SyncerState) SetHeight(h uint64) {
ss.Height = h
}
func (ss *SyncerState) Error(err error) {
if ss == nil {
return
}
ss.lk.Lock()
defer ss.lk.Unlock()
ss.Message = err.Error()
ss.Stage = api.StageSyncErrored
ss.End = time.Now()
}
func (ss *SyncerState) Snapshot() SyncerState {
ss.lk.Lock()
defer ss.lk.Unlock()
return SyncerState{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Message: ss.Message,
Start: ss.Start,
End: ss.End,
}
}

View File

@ -5,7 +5,7 @@ import (
"io"
"math"
"github.com/ipfs/go-cid"
cid "github.com/ipfs/go-cid"
cbg "github.com/whyrusleeping/cbor-gen"
xerrors "golang.org/x/xerrors"
)

View File

@ -1,6 +1,7 @@
package cli
import (
"context"
"fmt"
"time"
@ -25,14 +26,14 @@ var syncStatusCmd = &cli.Command{
Name: "status",
Usage: "check sync status",
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
apic, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := ReqContext(cctx)
state, err := api.SyncState(ctx)
state, err := apic.SyncState(ctx)
if err != nil {
return err
}
@ -42,6 +43,7 @@ var syncStatusCmd = &cli.Command{
fmt.Printf("worker %d:\n", i)
var base, target []cid.Cid
var heightDiff int64
var theight uint64
if ss.Base != nil {
base = ss.Base.Cids()
heightDiff = int64(ss.Base.Height())
@ -49,14 +51,25 @@ var syncStatusCmd = &cli.Command{
if ss.Target != nil {
target = ss.Target.Cids()
heightDiff = int64(ss.Target.Height()) - heightDiff
theight = ss.Target.Height()
} else {
heightDiff = 0
}
fmt.Printf("\tBase:\t%s\n", base)
fmt.Printf("\tTarget:\t%s\n", target)
fmt.Printf("\tTarget:\t%s (%d)\n", target, theight)
fmt.Printf("\tHeight diff:\t%d\n", heightDiff)
fmt.Printf("\tStage: %s\n", chain.SyncStageString(ss.Stage))
fmt.Printf("\tHeight: %d\n", ss.Height)
if ss.End.IsZero() {
if !ss.Start.IsZero() {
fmt.Printf("\tElapsed: %s\n", time.Since(ss.Start))
}
} else {
fmt.Printf("\tElapsed: %s\n", ss.End.Sub(ss.Start))
}
if ss.Stage == api.StageSyncErrored {
fmt.Printf("\tError: %s\n", ss.Message)
}
}
return nil
},
@ -73,48 +86,52 @@ var syncWaitCmd = &cli.Command{
defer closer()
ctx := ReqContext(cctx)
for {
state, err := napi.SyncState(ctx)
if err != nil {
return err
}
head, err := napi.ChainHead(ctx)
if err != nil {
return err
}
working := 0
for i, ss := range state.ActiveSyncs {
switch ss.Stage {
case api.StageSyncComplete:
default:
working = i
case api.StageIdle:
// not complete, not actively working
}
}
ss := state.ActiveSyncs[working]
var target []cid.Cid
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height)
if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
fmt.Println("\nDone!")
return nil
}
select {
case <-ctx.Done():
fmt.Println("\nExit by user")
return nil
case <-time.After(1 * time.Second):
}
}
return SyncWait(ctx, napi)
},
}
func SyncWait(ctx context.Context, napi api.FullNode) error {
for {
state, err := napi.SyncState(ctx)
if err != nil {
return err
}
head, err := napi.ChainHead(ctx)
if err != nil {
return err
}
working := 0
for i, ss := range state.ActiveSyncs {
switch ss.Stage {
case api.StageSyncComplete:
default:
working = i
case api.StageIdle:
// not complete, not actively working
}
}
ss := state.ActiveSyncs[working]
var target []cid.Cid
if ss.Target != nil {
target = ss.Target.Cids()
}
fmt.Printf("\r\x1b[2KWorker %d: Target: %s\tState: %s\tHeight: %d", working, target, chain.SyncStageString(ss.Stage), ss.Height)
if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay {
fmt.Println("\nDone!")
return nil
}
select {
case <-ctx.Done():
fmt.Println("\nExit by user")
return nil
case <-time.After(1 * time.Second):
}
}
}

View File

@ -160,6 +160,26 @@ create table if not exists mpool_messages
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
create table if not exists receipts
(
msg text not null
constraint receipts_messages_cid_fk
references messages,
state text not null
constraint receipts_blocks_parentStateRoot_fk
references blocks (parentStateRoot),
idx int not null,
exit int not null,
gas_used int not null,
return blob,
constraint receipts_pk
primary key (msg, state)
);
create index if not exists receipts_msg_state_index
on receipts (msg, state);
create table if not exists miner_heads
(
head text not null
@ -342,6 +362,34 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return tx.Commit()
}
func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES (?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for c, m := range recs {
if _, err := stmt.Exec(
c.msg.String(),
c.state.String(),
c.idx,
m.ExitCode,
m.GasUsed.String(),
m.Return,
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error {
tx, err := st.db.Begin()
if err != nil {

View File

@ -233,6 +233,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
return
}
log.Infof("Getting parent receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
if err := st.storeReceipts(receipts); err != nil {
log.Error(err)
return
}
log.Infof("Resolving addresses")
for _, message := range msgs {
@ -290,3 +299,39 @@ func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*ty
return messages, inclusions
}
type mrec struct {
msg cid.Cid
state cid.Cid
idx int
}
func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt {
var lk sync.Mutex
out := map[mrec]*types.MessageReceipt{}
par(50, maparr(toSync), func(header *types.BlockHeader) {
recs, err := api.ChainGetParentReceipts(ctx, header.Cid())
if err != nil {
log.Error(err)
return
}
msgs, err := api.ChainGetParentMessages(ctx, header.Cid())
if err != nil {
log.Error(err)
return
}
lk.Lock()
for i, r := range recs {
out[mrec{
msg: msgs[i].Cid,
state: header.ParentStateRoot,
idx: i,
}] = r
}
lk.Unlock()
})
return out
}
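For reference, a hypothetical read-back of the new receipts table through the same database/sql handle (this query is not part of the change; stateRoot is an assumed cid.Cid):

rows, err := st.db.Query(`select msg, idx, exit, gas_used from receipts where state = ?`, stateRoot.String())
if err != nil {
	return err
}
defer rows.Close()
for rows.Next() {
	var msg, gasUsed string
	var idx, exit int
	// msg and idx identify the message; exit and gasUsed come from the receipt
	if err := rows.Scan(&msg, &idx, &exit, &gasUsed); err != nil {
		return err
	}
}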

View File

@ -131,7 +131,7 @@ func sectorsInfo(ctx context.Context, napi api.StorageMiner) (map[string]int, er
return nil, err
}
out[api.SectorStateStr(st.State)]++
out[api.SectorStates[st.State]]++
}
return out, nil

View File

@ -80,6 +80,23 @@ var initCmd = &cli.Command{
return xerrors.Errorf("fetching proof parameters: %w", err)
}
log.Info("Trying to connect to full node RPC")
api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
log.Info("Checking full node sync status")
if !cctx.Bool("genesis-miner") {
if err := lcli.SyncWait(ctx, api); err != nil {
return xerrors.Errorf("sync wait: %w", err)
}
}
log.Info("Checking if repo exists")
repoPath := cctx.String(FlagStorageRepo)
@ -96,15 +113,6 @@ var initCmd = &cli.Command{
return xerrors.Errorf("repo at '%s' is already initialized", cctx.String(FlagStorageRepo))
}
log.Info("Trying to connect to full node RPC")
api, closer, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
log.Info("Checking full node version")
v, err := api.Version(ctx)

View File

@ -41,6 +41,10 @@ var runCmd = &cli.Command{
Usage: "Enable use of GPU for mining operations",
Value: true,
},
&cli.BoolFlag{
Name: "nosync",
Usage: "Don't check full-node sync status",
},
},
Action: func(cctx *cli.Context) error {
if err := build.GetParams(true, false); err != nil {
@ -67,6 +71,14 @@ var runCmd = &cli.Command{
return xerrors.Errorf("lotus-daemon API version doesn't match: local: ", api.Version{APIVersion: build.APIVersion})
}
log.Info("Checking full node sync status")
if !cctx.Bool("nosync") {
if err := lcli.SyncWait(ctx, nodeApi); err != nil {
return xerrors.Errorf("sync wait: %w", err)
}
}
storageRepoPath := cctx.String(FlagStorageRepo)
r, err := repo.NewFS(storageRepoPath)
if err != nil {

View File

@ -61,7 +61,7 @@ var sectorsStatusCmd = &cli.Command{
}
fmt.Printf("SectorID:\t%d\n", status.SectorID)
fmt.Printf("Status:\t%s\n", api.SectorStateStr(status.State))
fmt.Printf("Status:\t%s\n", api.SectorStates[status.State])
fmt.Printf("CommD:\t\t%x\n", status.CommD)
fmt.Printf("CommR:\t\t%x\n", status.CommR)
fmt.Printf("Ticket:\t\t%x\n", status.Ticket.TicketBytes)
@ -70,6 +70,9 @@ var sectorsStatusCmd = &cli.Command{
fmt.Printf("SeedH:\t\t%d\n", status.Seed.BlockHeight)
fmt.Printf("Proof:\t\t%x\n", status.Proof)
fmt.Printf("Deals:\t\t%v\n", status.Deals)
if status.LastErr != "" {
fmt.Printf("Last Error:\t\t%s\n", status.LastErr)
}
return nil
},
}
@ -132,7 +135,7 @@ var sectorsListCmd = &cli.Command{
fmt.Printf("%d: %s\tsSet: %s\tpSet: %s\ttktH: %d\tseedH: %d\tdeals: %v\n",
s,
api.SectorStateStr(st.State),
api.SectorStates[st.State],
yesno(inSSet),
yesno(inPSet),
st.Ticket.BlockHeight,

go.mod
View File

@ -11,7 +11,7 @@ require (
github.com/fatih/color v1.7.0 // indirect
github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7
github.com/filecoin-project/filecoin-ffi v0.0.0-00010101000000-000000000000
github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7
github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed
github.com/gbrlsnchs/jwt/v3 v3.0.0-beta.1
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/google/go-cmp v0.3.1 // indirect

go.sum
View File

@ -78,8 +78,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7 h1:Ags/z6ZubzKonQ9PsY9fO439yGdVg07qpdxfv/AEUno=
github.com/filecoin-project/chain-validation v0.0.0-20191106200742-11986803c0f7/go.mod h1:0/0/QUNqpF/jVzLHFncGeT3NvGPODBhGzQlNgzmoZew=
github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7 h1:lKSMm8Go6qI7+Dk3rWCNIh57wBOqVNJ21re/p7D58gc=
github.com/filecoin-project/go-amt-ipld v0.0.0-20191122035745-59b9dfc0efc7/go.mod h1:lKjJYPg2kwbav5f78i5YA8kGccnZn18IySbpneXvaQs=
github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed h1:Wt4+eF3fda6MKLjK0/zzBOxs5cUwGyucJSAfO4LnX/w=
github.com/filecoin-project/go-amt-ipld v0.0.0-20191203073133-f941215342ed/go.mod h1:KsFPWjF+UUYl6n9A+qbg4bjFgAOneicFZtDH/LQEX2U=
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543 h1:aMJGfgqe1QDhAVwxRg5fjCRF533xHidiKsugk7Vvzug=
github.com/filecoin-project/go-leb128 v0.0.0-20190212224330-8d79a5489543/go.mod h1:mjrHv1cDGJWDlGmC0eDc1E5VJr8DmL9XMUcaFwiuKg8=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=

View File

@ -17,17 +17,21 @@ import (
"golang.org/x/xerrors"
)
const MaxMessagesPerBlock = 4000
var log = logging.Logger("miner")
type waitFunc func(ctx context.Context) error
type waitFunc func(ctx context.Context, baseTime uint64) error
func NewMiner(api api.FullNode, epp gen.ElectionPoStProver) *Miner {
return &Miner{
api: api,
epp: epp,
waitFunc: func(ctx context.Context) error {
waitFunc: func(ctx context.Context, baseTime uint64) error {
// Wait around for half the block time in case other parents come in
time.Sleep(build.BlockDelay * time.Second / 2)
deadline := baseTime + build.PropagationDelay
time.Sleep(time.Until(time.Unix(int64(deadline), 0)))
return nil
},
}
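Worked example: with the mainnet parameters earlier in this diff (BlockDelay = 30s, PropagationDelay = 5s), a miner whose candidate base tipset has MinTimestamp T now sleeps until T+5s before settling on its mining base, instead of sleeping a flat BlockDelay/2 = 15s from whenever the loop happened to run.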
@ -141,8 +145,15 @@ eventLoop:
addrs := m.addresses
m.lk.Unlock()
// Sleep a small amount in order to wait for other blocks to arrive
if err := m.waitFunc(ctx); err != nil {
prebase, err := m.GetBestMiningCandidate(ctx)
if err != nil {
log.Errorf("failed to get best mining candidate: %s", err)
time.Sleep(time.Second * 5)
continue
}
// Wait until propagation delay period after block we plan to mine on
if err := m.waitFunc(ctx, prebase.ts.MinTimestamp()); err != nil {
log.Error(err)
return
}
@ -159,6 +170,8 @@ eventLoop:
}
lastBase = *base
log.Infof("Time delta between now and our mining base: %ds", uint64(time.Now().Unix())-base.ts.MinTimestamp())
blks := make([]*types.BlockMsg, 0)
for _, addr := range addrs {
@ -232,9 +245,8 @@ func (m *Miner) GetBestMiningCandidate(ctx context.Context) (*MiningBase, error)
}
}
return &MiningBase{
ts: bts,
}, nil
m.lastWork = &MiningBase{ts: bts}
return m.lastWork, nil
}
func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.TipSet) (bool, error) {
@ -265,24 +277,34 @@ func (m *Miner) mineOne(ctx context.Context, addr address.Address, base *MiningB
return nil, xerrors.Errorf("scratching ticket failed: %w", err)
}
win, proof, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api)
proofin, err := gen.IsRoundWinner(ctx, base.ts, int64(base.ts.Height()+base.nullRounds+1), addr, m.epp, m.api)
if err != nil {
return nil, xerrors.Errorf("failed to check if we win next round: %w", err)
}
if !win {
if proofin == nil {
base.nullRounds++
return nil, nil
}
b, err := m.createBlock(base, addr, ticket, proof)
// get pending messages early,
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
proof, err := gen.ComputeProof(ctx, m.epp, proofin)
if err != nil {
return nil, xerrors.Errorf("computing election proof: %w", err)
}
b, err := m.createBlock(base, addr, ticket, proof, pending)
if err != nil {
return nil, xerrors.Errorf("failed to create block: %w", err)
}
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height)
dur := time.Now().Sub(start)
log.Infof("Creating block took %s", dur)
dur := time.Since(start)
log.Infow("mined new block", "cid", b.Cid(), "height", b.Header.Height, "took", dur)
if dur > time.Second*build.BlockDelay {
log.Warn("CAUTION: block production took longer than the block delay. Your computer may not be fast enough to keep up")
}
@ -335,13 +357,7 @@ func (m *Miner) computeTicket(ctx context.Context, addr address.Address, base *M
}, nil
}
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof) (*types.BlockMsg, error) {
pending, err := m.api.MpoolPending(context.TODO(), base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to get pending messages: %w", err)
}
func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *types.Ticket, proof *types.EPostProof, pending []*types.SignedMessage) (*types.BlockMsg, error) {
msgs, err := selectMessages(context.TODO(), m.api.StateGetActor, base, pending)
if err != nil {
return nil, xerrors.Errorf("message filtering failed: %w", err)
@ -357,10 +373,21 @@ func (m *Miner) createBlock(base *MiningBase, addr address.Address, ticket *type
type actorLookup func(context.Context, address.Address, *types.TipSet) (*types.Actor, error)
func countFrom(msgs []*types.SignedMessage, from address.Address) (out int) {
for _, msg := range msgs {
if msg.Message.From == from {
out++
}
}
return out
}
func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs []*types.SignedMessage) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, 0, len(msgs))
inclNonces := make(map[address.Address]uint64)
inclBalances := make(map[address.Address]types.BigInt)
inclCount := make(map[address.Address]int)
for _, msg := range msgs {
if msg.Message.To == address.Undef {
log.Warnf("message in mempool had bad 'To' address")
@ -368,12 +395,13 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
}
from := msg.Message.From
act, err := al(ctx, from, base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to check message sender balance: %w", err)
}
if _, ok := inclNonces[from]; !ok {
act, err := al(ctx, from, base.ts)
if err != nil {
return nil, xerrors.Errorf("failed to check message sender balance: %w", err)
}
inclNonces[from] = act.Nonce
inclBalances[from] = act.Balance
}
@ -384,19 +412,23 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs
}
if msg.Message.Nonce > inclNonces[from] {
log.Warnf("message in mempool has too high of a nonce (%d > %d) %s", msg.Message.Nonce, inclNonces[from], msg.Cid())
log.Warnf("message in mempool has too high of a nonce (%d > %d, from %s, inclcount %d) %s (%d pending for orig)", msg.Message.Nonce, inclNonces[from], from, inclCount[from], msg.Cid(), countFrom(msgs, from))
continue
}
if msg.Message.Nonce < inclNonces[from] {
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid())
log.Warnf("message in mempool has already used nonce (%d < %d), from %s, to %s, %s (%d pending for)", msg.Message.Nonce, inclNonces[from], msg.Message.From, msg.Message.To, msg.Cid(), countFrom(msgs, from))
continue
}
inclNonces[from] = msg.Message.Nonce + 1
inclBalances[from] = types.BigSub(inclBalances[from], msg.Message.RequiredFunds())
inclCount[from]++
out = append(out, msg)
if len(out) >= MaxMessagesPerBlock {
break
}
}
return out, nil
}
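Worked example of the nonce gating above: if a sender's on-chain nonce is 5 and the pool holds that sender's messages with nonces 5, 6, 6, and 8, the first 5 and 6 are packed and inclNonces advances to 7; the second 6 is then skipped as already used, and 8 is skipped as too high. Both warnings now also report countFrom(msgs, from) = 4, the sender's total pending count, which makes gaps like the missing 7 easy to spot in logs.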

View File

@ -23,8 +23,8 @@ func NewTestMiner(nextCh <-chan struct{}, addr address.Address) func(api.FullNod
}
}
func chanWaiter(next <-chan struct{}) func(ctx context.Context) error {
return func(ctx context.Context) error {
func chanWaiter(next <-chan struct{}) func(ctx context.Context, _ uint64) error {
return func(ctx context.Context, _ uint64) error {
select {
case <-ctx.Done():
return ctx.Err()

View File

@ -3,12 +3,14 @@ package full
import (
"context"
"github.com/ipfs/go-cid"
"go.uber.org/fx"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -17,13 +19,63 @@ type MpoolAPI struct {
WalletAPI
Chain *store.ChainStore
Mpool *messagepool.MessagePool
}
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {
// TODO: need to make sure we don't return messages that were already included in the referenced chain
// also need to accept ts == nil just fine, assume nil == chain.Head()
return a.Mpool.Pending(), nil
pending, mpts := a.Mpool.Pending()
haveCids := map[cid.Cid]struct{}{}
for _, m := range pending {
haveCids[m.Cid()] = struct{}{}
}
if ts == nil || mpts.Height() > ts.Height() {
return pending, nil
}
for {
if mpts.Height() == ts.Height() {
if mpts.Equals(ts) {
return pending, nil
}
// different blocks in tipsets
have, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for base ts: %w", err)
}
for _, m := range have {
haveCids[m.Cid()] = struct{}{}
}
}
msgs, err := a.Mpool.MessagesForBlocks(ts.Blocks())
if err != nil {
return nil, xerrors.Errorf("getting messages for ts: %w", err)
}
for _, m := range msgs {
if _, ok := haveCids[m.Cid()]; ok {
continue
}
haveCids[m.Cid()] = struct{}{}
pending = append(pending, m)
}
if mpts.Height() >= ts.Height() {
return pending, nil
}
ts, err = a.Chain.LoadTipSet(ts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading parent tipset: %w", err)
}
}
}
func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) error {

View File

@ -26,10 +26,13 @@ func (a *SyncAPI) SyncState(ctx context.Context) (*api.SyncState, error) {
for _, ss := range states {
out.ActiveSyncs = append(out.ActiveSyncs, api.ActiveSync{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Start: ss.Start,
End: ss.End,
Message: ss.Message,
})
}
return out, nil

View File

@ -182,6 +182,8 @@ func (sm *StorageMinerAPI) SectorsStatus(ctx context.Context, sid uint64) (api.S
Deals: deals,
Ticket: info.Ticket.SB(),
Seed: info.Seed.SB(),
LastErr: info.LastErr,
}, nil
}

View File

@ -239,7 +239,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
_, err := w.Write(cbg.CborNull)
return err
}
if _, err := w.Write([]byte{140}); err != nil {
if _, err := w.Write([]byte{141}); err != nil {
return err
}
@ -337,6 +337,13 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.t.LastErr (string) (string)
if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.LastErr)))); err != nil {
return err
}
if _, err := w.Write([]byte(t.LastErr)); err != nil {
return err
}
return nil
}
@ -351,7 +358,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
return fmt.Errorf("cbor input should be of type array")
}
if extra != 12 {
if extra != 13 {
return fmt.Errorf("cbor input had wrong number of fields")
}
@ -552,5 +559,15 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
}
}
// t.t.LastErr (string) (string)
{
sval, err := cbg.ReadString(br)
if err != nil {
return err
}
t.LastErr = string(sval)
}
return nil
}

View File

@ -49,10 +49,17 @@ func (s *fpostScheduler) run(ctx context.Context) {
panic(err)
}
defer s.abortActivePoSt()
// not fine to panic after this point
for {
select {
case changes := <-notifs:
case changes, ok := <-notifs:
if !ok {
log.Warn("fpostScheduler notifs channel closed")
return
}
ctx, span := trace.StartSpan(ctx, "fpostScheduler.headChange")
var lowest, highest *types.TipSet = s.cur, nil
@ -74,6 +81,8 @@ func (s *fpostScheduler) run(ctx context.Context) {
}
span.End()
case <-ctx.Done():
return
}
}
}
@ -102,10 +111,12 @@ func (s *fpostScheduler) update(ctx context.Context, new *types.TipSet) error {
return err
}
if newEPS != s.activeEPS {
s.abortActivePoSt()
if newEPS == s.activeEPS {
return nil
}
s.abortActivePoSt()
if newEPS != Inactive && start {
s.doPost(ctx, newEPS, new)
}
@ -124,7 +135,7 @@ func (s *fpostScheduler) abortActivePoSt() {
log.Warnf("Aborting Fallback PoSt (EPS: %d)", s.activeEPS)
s.activeEPS = 0
s.activeEPS = Inactive
s.abort = nil
}

View File

@ -12,29 +12,24 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) (func(*SectorInfo), error)
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc, next api.SectorState) {
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) {
go func() {
mut, err := cb(ctx, sector)
update := cb(ctx, sector)
if err == nil && next == api.SectorNoUpdate {
return
if update == nil {
return // async
}
select {
case m.sectorUpdated <- sectorUpdate{
newState: next,
id: sector.SectorID,
err: err,
mut: mut,
}:
case m.sectorUpdated <- *update:
case <-m.stop:
}
}()
}
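Handlers now return nil when the state update will arrive asynchronously (handlePreCommitted below registers a ChainAt callback that writes to m.sectorUpdated itself). A sketch of the two handler shapes under the new contract (handleX, handleY, and api.SomeState are illustrative names, not part of this change):

// synchronous: handleSectorUpdate sends the returned update
func (m *Miner) handleX(ctx context.Context, sector SectorInfo) *sectorUpdate {
	return sector.upd().to(api.SomeState).state(func(info *SectorInfo) { /* mutate */ })
}

// asynchronous: deliver the update later, return nil now
func (m *Miner) handleY(ctx context.Context, sector SectorInfo) *sectorUpdate {
	go func() { m.sectorUpdated <- *sector.upd().to(api.SomeState) }()
	return nil
}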
func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
var allocated uint64
@ -45,12 +40,12 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
if allocated > ubytes {
return nil, xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
}
fillerSizes, err := fillersFromRem(ubytes - allocated)
if err != nil {
return nil, err
return sector.upd().fatal(err)
}
if len(fillerSizes) > 0 {
@ -59,27 +54,27 @@ func (m *Miner) finishPacking(ctx context.Context, sector SectorInfo) (func(*Sec
pieces, err := m.storeGarbage(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
if err != nil {
return nil, xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
}
return func(info *SectorInfo) {
return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
info.Pieces = append(info.Pieces, pieces...)
}, nil
})
}
func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Infow("performing sector replication...", "sector", sector.SectorID)
ticket, err := m.tktFn(ctx)
if err != nil {
return nil, err
return sector.upd().fatal(err)
}
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
if err != nil {
return nil, xerrors.Errorf("seal pre commit failed: %w", err)
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
}
return func(info *SectorInfo) {
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
info.CommC = rspco.CommC[:]
info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:]
@ -88,10 +83,11 @@ func (m *Miner) sealPreCommit(ctx context.Context, sector SectorInfo) (func(*Sec
BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:],
}
}, nil
})
}
func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
params := &actors.SectorPreCommitInfo{
SectorNumber: sector.SectorID,
@ -101,7 +97,7 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
}
msg := &types.Message{
@ -117,26 +113,26 @@ func (m *Miner) preCommit(ctx context.Context, sector SectorInfo) (func(*SectorI
log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return nil, xerrors.Errorf("pushing message to mpool: %w", err)
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
}
return func(info *SectorInfo) {
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
mcid := smsg.Cid()
info.PreCommitMessage = &mcid
}, nil
})
}
func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
// would be ideal to just use the events.Called handler, but it wouldn't be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorID)
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil {
return nil, err
return sector.upd().to(api.PreCommitFailed).error(err)
}
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
return nil, err
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("sector precommit failed: exit code %d", mw.Receipt.ExitCode))
}
log.Info("precommit message landed on chain: ", sector.SectorID)
@ -146,19 +142,18 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
if err != nil {
return xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
m.sectorUpdated <- *sector.upd().fatal(err)
return err
}
m.sectorUpdated <- sectorUpdate{
newState: api.Committing,
id: sector.SectorID,
mut: func(info *SectorInfo) {
info.Seed = SealSeed{
BlockHeight: randHeight,
TicketBytes: rand,
}
},
}
m.sectorUpdated <- *sector.upd().to(api.Committing).state(func(info *SectorInfo) {
info.Seed = SealSeed{
BlockHeight: randHeight,
TicketBytes: rand,
}
})
return nil
}, func(ctx context.Context, ts *types.TipSet) error {
@ -169,15 +164,15 @@ func (m *Miner) preCommitted(ctx context.Context, sector SectorInfo) (func(*Sect
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
return nil, nil
return nil
}
func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*SectorInfo), error) {
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
log.Info("scheduling seal proof computation...")
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil {
return nil, xerrors.Errorf("computing seal proof failed: %w", err)
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
}
// TODO: Consider splitting states and persist proof for faster recovery
@ -190,7 +185,7 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return nil, xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
}
msg := &types.Message{
@ -205,24 +200,24 @@ func (m *Miner) committing(ctx context.Context, sector SectorInfo) (func(*Sector
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
log.Error(xerrors.Errorf("pushing message to mpool: %w", err))
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
}
// TODO: Separate state before this wait, so we persist message cid?
mw, err := m.api.StateWaitMsg(ctx, smsg.Cid())
if err != nil {
return nil, xerrors.Errorf("failed to wait for porep inclusion: %w", err)
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, smsg.Cid(), sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, params.Proof)
return nil, xerrors.New("UNHANDLED: submitting sector proof failed")
return sector.upd().fatal(xerrors.New("UNHANDLED: submitting sector proof failed"))
}
return func(info *SectorInfo) {
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
mcid := smsg.Cid()
info.CommitMessage = &mcid
info.Proof = proof
}, nil
})
}

storage/sector_types.go (new file)
View File

@ -0,0 +1,114 @@
package storage
import (
"context"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealTicket) SB() sectorbuilder.SealTicket {
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type SealSeed struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealSeed) SB() sectorbuilder.SealSeed {
out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type Piece struct {
DealID uint64
Size uint64
CommP []byte
}
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
out.Size = p.Size
copy(out.CommP[:], p.CommP)
return out
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
// Packing
Pieces []Piece
// PreCommit
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
Proof []byte
Ticket SealTicket
PreCommitMessage *cid.Cid
// PreCommitted
Seed SealSeed
// Committing
CommitMessage *cid.Cid
// Debug
LastErr string
}
func (t *SectorInfo) upd() *sectorUpdate {
return &sectorUpdate{id: t.SectorID}
}
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.ppi()
}
return out
}
func (t *SectorInfo) deals() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.DealID
}
return out
}
func (t *SectorInfo) existingPieces() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.Size
}
return out
}
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out
}

View File

@ -2,9 +2,9 @@ package storage
import (
"context"
"fmt"
"io"
cid "github.com/ipfs/go-cid"
xerrors "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
@ -12,68 +12,6 @@ import (
"github.com/filecoin-project/lotus/lib/sectorbuilder"
)
type TicketFn func(context.Context) (*sectorbuilder.SealTicket, error)
type SealTicket struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealTicket) SB() sectorbuilder.SealTicket {
out := sectorbuilder.SealTicket{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type SealSeed struct {
BlockHeight uint64
TicketBytes []byte
}
func (t *SealSeed) SB() sectorbuilder.SealSeed {
out := sectorbuilder.SealSeed{BlockHeight: t.BlockHeight}
copy(out.TicketBytes[:], t.TicketBytes)
return out
}
type Piece struct {
DealID uint64
Size uint64
CommP []byte
}
func (p *Piece) ppi() (out sectorbuilder.PublicPieceInfo) {
out.Size = p.Size
copy(out.CommP[:], p.CommP)
return out
}
type SectorInfo struct {
State api.SectorState
SectorID uint64
// Packing
Pieces []Piece
// PreCommit
CommC []byte
CommD []byte
CommR []byte
CommRLast []byte
Proof []byte
Ticket SealTicket
PreCommitMessage *cid.Cid
// PreCommitted
Seed SealSeed
// Committing
CommitMessage *cid.Cid
}
type sectorUpdate struct {
newState api.SectorState
id uint64
@ -81,39 +19,40 @@ type sectorUpdate struct {
mut func(*SectorInfo)
}
func (t *SectorInfo) pieceInfos() []sectorbuilder.PublicPieceInfo {
out := make([]sectorbuilder.PublicPieceInfo, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.ppi()
func (u *sectorUpdate) fatal(err error) *sectorUpdate {
return &sectorUpdate{
newState: api.FailedUnrecoverable,
id: u.id,
err: err,
mut: u.mut,
}
return out
}
func (t *SectorInfo) deals() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.DealID
func (u *sectorUpdate) error(err error) *sectorUpdate {
return &sectorUpdate{
newState: u.newState,
id: u.id,
err: err,
mut: u.mut,
}
return out
}
func (t *SectorInfo) existingPieces() []uint64 {
out := make([]uint64, len(t.Pieces))
for i, piece := range t.Pieces {
out[i] = piece.Size
func (u *sectorUpdate) state(m func(*SectorInfo)) *sectorUpdate {
return &sectorUpdate{
newState: u.newState,
id: u.id,
err: u.err,
mut: m,
}
return out
}
func (t *SectorInfo) rspco() sectorbuilder.RawSealPreCommitOutput {
var out sectorbuilder.RawSealPreCommitOutput
copy(out.CommC[:], t.CommC)
copy(out.CommD[:], t.CommD)
copy(out.CommR[:], t.CommR)
copy(out.CommRLast[:], t.CommRLast)
return out
func (u *sectorUpdate) to(newState api.SectorState) *sectorUpdate {
return &sectorUpdate{
newState: newState,
id: u.id,
err: u.err,
mut: u.mut,
}
}
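Taken together, the builder reads fluently at the call sites in sector_states.go; these three shapes (all taken from the handlers above) cover the whole API:

// success: advance state and mutate the persisted SectorInfo
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
	mcid := smsg.Cid()
	info.PreCommitMessage = &mcid
})

// failure: move to a failure state and record the error (surfaced via LastErr)
return sector.upd().to(api.PreCommitFailed).error(err)

// unrecoverable: jump straight to FailedUnrecoverable
return sector.upd().fatal(err)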
func (m *Miner) sectorStateLoop(ctx context.Context) error {
@ -195,9 +134,7 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) {
}
if err := m.sectors.Begin(sector.SectorID, sector); err != nil {
// We may have re-sent the proposal
log.Errorf("deal tracking failed: %s", err)
m.failSector(sector.SectorID, err)
log.Errorf("sector tracking failed: %s", err)
return
}
@ -214,10 +151,15 @@ func (m *Miner) onSectorIncoming(sector *SectorInfo) {
}
func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
log.Infof("Sector %d updated state to %s", update.id, api.SectorStateStr(update.newState))
log.Infof("Sector %d updated state to %s", update.id, api.SectorStates[update.newState])
var sector SectorInfo
err := m.sectors.Mutate(update.id, func(s *SectorInfo) error {
s.State = update.newState
s.LastErr = ""
if update.err != nil {
s.LastErr = fmt.Sprintf("%+v", update.err)
}
if update.mut != nil {
update.mut(s)
}
@ -225,39 +167,80 @@ func (m *Miner) onSectorUpdated(ctx context.Context, update sectorUpdate) {
return nil
})
if update.err != nil {
log.Errorf("sector %d failed: %s", update.id, update.err)
m.failSector(update.id, update.err)
return
log.Errorf("sector %d failed: %+v", update.id, update.err)
}
if err != nil {
m.failSector(update.id, err)
log.Errorf("sector %d error: %+v", update.id, err)
return
}
/*
* Empty
| |
| v
*<- Packing <- incoming
| |
| v
*<- Unsealed <--> SealFailed
| |
| v
* PreCommitting <--> PreCommitFailed
| | ^
| v |
*<- PreCommitted ------/
| |
| v v--> SealCommitFailed
*<- Committing
| | ^--> CommitFailed
| v
*<- Proving
|
v
FailedUnrecoverable
UndefinedSectorState <- ¯\_(ツ)_/¯
| ^
*---------------------/
*/
switch update.newState {
// Happy path
case api.Packing:
m.handleSectorUpdate(ctx, sector, m.finishPacking, api.Unsealed)
m.handleSectorUpdate(ctx, sector, m.handlePacking)
case api.Unsealed:
m.handleSectorUpdate(ctx, sector, m.sealPreCommit, api.PreCommitting)
m.handleSectorUpdate(ctx, sector, m.handleUnsealed)
case api.PreCommitting:
m.handleSectorUpdate(ctx, sector, m.preCommit, api.PreCommitted)
m.handleSectorUpdate(ctx, sector, m.handlePreCommitting)
case api.PreCommitted:
m.handleSectorUpdate(ctx, sector, m.preCommitted, api.SectorNoUpdate)
m.handleSectorUpdate(ctx, sector, m.handlePreCommitted)
case api.Committing:
m.handleSectorUpdate(ctx, sector, m.committing, api.Proving)
m.handleSectorUpdate(ctx, sector, m.handleCommitting)
case api.Proving:
// TODO: track sector health / expiration
log.Infof("Proving sector %d", update.id)
case api.SectorNoUpdate: // noop
// Handled failure modes
case api.SealFailed:
log.Warnf("sector %d entered unimplemented state 'SealFailed'", update.id)
case api.PreCommitFailed:
log.Warnf("sector %d entered unimplemented state 'PreCommitFailed'", update.id)
case api.SealCommitFailed:
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", update.id)
case api.CommitFailed:
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", update.id)
// Fatal errors
case api.UndefinedSectorState:
log.Error("sector update with undefined state!")
case api.FailedUnrecoverable:
log.Errorf("sector %d failed unrecoverably", update.id)
default:
log.Errorf("unexpected sector update state: %d", update.newState)
}
}
func (m *Miner) failSector(id uint64, err error) {
log.Errorf("sector %d error: %+v", id, err)
}
func (m *Miner) AllocatePiece(size uint64) (sectorID uint64, offset uint64, err error) {
if padreader.PaddedSize(size) != size {
return 0, 0, xerrors.Errorf("cannot allocate unpadded piece")