// Source: lotus/storage/sector/store.go (272 lines, 6.0 KiB, Go)

2019-08-14 20:27:10 +00:00
package sector
import (
"context"
"fmt"
2019-08-14 20:27:10 +00:00
"io"
"sync"
2019-08-14 21:33:52 +00:00
"github.com/filecoin-project/go-sectorbuilder/sealing_state"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/sectorbuilder"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2019-08-14 20:27:10 +00:00
)
// init registers dealMapping with the CBOR codec so that values of this type
// can be passed to cbor.DumpObject and cbor.DecodeInto.
func init() {
	cbor.RegisterCborType(dealMapping{})
}
2019-08-14 21:33:52 +00:00
var (
	// log is the package-level logger, registered under the "sectorstore" name.
	log = logging.Logger("sectorstore")

	// sectorDealsPrefix is the datastore key prefix under which the
	// per-sector deal mappings are namespaced.
	sectorDealsPrefix = datastore.NewKey("/sectordeals")
)
// dealMapping is the record persisted per sector in the deals datastore.
// It is CBOR-encoded, so field names must stay stable.
type dealMapping struct {
	// DealIDs lists the deals whose pieces were added to the sector.
	DealIDs []uint64
	// Committed is set once the deal IDs have been handed out for commit;
	// no further pieces are accepted for the sector after that.
	Committed bool
}
// TicketFn supplies a seal ticket on demand. Store.SealPreCommit calls it
// once per pre-commit and aborts if it returns an error.
type TicketFn func(ctx context.Context) (*sectorbuilder.SealTicket, error)
2019-10-27 08:56:53 +00:00
2019-08-14 20:27:10 +00:00
// TODO: eventually handle sector storage here instead of in rust-sectorbuilder
type Store struct {
waitingLk sync.Mutex
sb *sectorbuilder.SectorBuilder
2019-10-27 08:56:53 +00:00
tktFn TicketFn
2019-08-14 20:27:10 +00:00
dealsLk sync.Mutex
deals datastore.Datastore
2019-08-14 20:27:10 +00:00
waiting map[uint64]chan struct{}
incoming []chan sectorbuilder.SectorSealingStatus
// TODO: outdated chan
2019-08-14 22:17:27 +00:00
closeCh chan struct{}
2019-08-14 20:27:10 +00:00
}
2019-10-27 08:56:53 +00:00
func NewStore(sb *sectorbuilder.SectorBuilder, ds dtypes.MetadataDS, tktFn TicketFn) *Store {
2019-08-14 20:27:10 +00:00
return &Store{
sb: sb,
tktFn: tktFn,
deals: namespace.Wrap(ds, sectorDealsPrefix),
2019-08-14 20:27:10 +00:00
waiting: map[uint64]chan struct{}{},
2019-08-14 22:17:27 +00:00
closeCh: make(chan struct{}),
2019-08-14 20:27:10 +00:00
}
}
func (s *Store) restartSealing() {
sectors, err := s.sb.GetAllStagedSectors()
if err != nil {
return
}
for _, sid := range sectors {
status, err := s.sb.SealStatus(sid)
if err != nil {
return
}
2019-10-30 18:10:29 +00:00
if status.State != sealing_state.CommittingPaused { // TODO: Also handle PreCommit!
continue
}
log.Infof("Sector %d is in paused state, resuming sealing", sid)
go func() {
// TODO: when we refactor wait-for-seal below, care about this output too
// (see SealSector below)
2019-10-30 18:10:29 +00:00
_, err := s.sb.ResumeSealCommit(sid)
if err != nil {
return
}
}()
}
}
// SectorStatus asks the sector builder for the sealing status of sector sid
// and returns a pointer to a copy of it, or the builder's error.
func (s *Store) SectorStatus(sid uint64) (*sectorbuilder.SectorSealingStatus, error) {
	sealStatus, err := s.sb.SealStatus(sid)
	if err != nil {
		return nil, err
	}

	return &sealStatus, nil
}
2019-10-27 08:56:53 +00:00
func (s *Store) AddPiece(ref string, size uint64, r io.Reader, dealIDs ...uint64) (sectorID uint64, err error) {
2019-09-23 10:50:28 +00:00
sectorID, err = s.sb.AddPiece(ref, size, r)
2019-08-14 20:27:10 +00:00
if err != nil {
return 0, err
}
2019-08-14 21:33:52 +00:00
s.waitingLk.Lock()
2019-08-14 20:27:10 +00:00
_, exists := s.waiting[sectorID]
2019-08-14 21:33:52 +00:00
if !exists { // pieces can share sectors
2019-08-14 20:27:10 +00:00
s.waiting[sectorID] = make(chan struct{})
}
s.waitingLk.Unlock()
s.dealsLk.Lock()
defer s.dealsLk.Unlock()
k := datastore.NewKey(fmt.Sprint(sectorID))
e, err := s.deals.Get(k)
var deals dealMapping
switch err {
case nil:
if err := cbor.DecodeInto(e, &deals); err != nil {
return 0, err
}
if deals.Committed {
return 0, xerrors.Errorf("sector %d already committed", sectorID)
}
fallthrough
case datastore.ErrNotFound:
2019-10-27 08:56:53 +00:00
deals.DealIDs = append(deals.DealIDs, dealIDs...)
d, err := cbor.DumpObject(&deals)
if err != nil {
return 0, err
}
if err := s.deals.Put(k, d); err != nil {
return 0, err
}
default:
return 0, err
}
2019-08-14 21:33:52 +00:00
2019-08-14 20:27:10 +00:00
return sectorID, nil
}
// DealsForCommit returns the deal IDs recorded for sectorID and marks the
// sector's mapping as committed, so that AddPiece refuses further deals for it.
//
// If the mapping is already marked as committed an error is logged, but the
// deal IDs are still returned. If no mapping exists for the sector, an error
// is logged and an empty, non-nil slice is returned with a nil error. Any
// other datastore or CBOR failure is returned to the caller.
func (s *Store) DealsForCommit(sectorID uint64) ([]uint64, error) {
	s.dealsLk.Lock()
	defer s.dealsLk.Unlock()

	k := datastore.NewKey(fmt.Sprint(sectorID))

	e, err := s.deals.Get(k)
	switch err {
	case nil:
		var deals dealMapping
		if err := cbor.DecodeInto(e, &deals); err != nil {
			return nil, err
		}
		if deals.Committed {
			log.Errorf("getting deal IDs for sector %d: sector already marked as committed", sectorID)
		}

		// Persist the committed flag before handing the IDs out.
		deals.Committed = true
		d, err := cbor.DumpObject(&deals)
		if err != nil {
			return nil, err
		}
		if err := s.deals.Put(k, d); err != nil {
			return nil, err
		}

		return deals.DealIDs, nil
	case datastore.ErrNotFound:
		// The format string has two verbs; both the sector ID and the error
		// must be supplied.
		log.Errorf("getting deal IDs for sector %d failed: %s", sectorID, err)
		return []uint64{}, nil
	default:
		return nil, err
	}
}
func (s *Store) SealPreCommit(ctx context.Context, sectorID uint64) error {
2019-10-27 08:56:53 +00:00
tkt, err := s.tktFn(ctx)
if err != nil {
return err
}
// TODO: That's not async, is it?
// - If not then we probably can drop this wait-for-seal hack below
_, err = s.sb.SealPreCommit(sectorID, *tkt)
2019-10-27 08:56:53 +00:00
if err != nil {
return err
}
return nil
}
// SealComputeProof runs the sector builder's commit step for sectorID using a
// seed made of the given block height and randomness, and returns the
// resulting proof bytes.
//
// rand is copied into a 32-byte buffer without a length check: shorter input
// leaves the tail zeroed, longer input is truncated. ctx is currently unused.
func (s *Store) SealComputeProof(ctx context.Context, sectorID uint64, height uint64, rand []byte) ([]byte, error) {
	var ticket [32]byte
	copy(ticket[:], rand)

	seed := sectorbuilder.SealSeed{
		BlockHeight: height,
		TicketBytes: ticket,
	}

	commitOut, err := s.sb.SealCommit(sectorID, seed)
	if err != nil {
		return nil, err
	}

	return commitOut.Proof, nil
}
2019-08-14 20:27:10 +00:00
func (s *Store) WaitSeal(ctx context.Context, sector uint64) (sectorbuilder.SectorSealingStatus, error) {
s.waitingLk.Lock()
2019-08-14 20:27:10 +00:00
watch, ok := s.waiting[sector]
s.waitingLk.Unlock()
2019-08-14 20:27:10 +00:00
if ok {
select {
case <-watch:
case <-ctx.Done():
return sectorbuilder.SectorSealingStatus{}, ctx.Err()
}
}
return s.sb.SealStatus(sector)
}
2019-10-30 18:10:29 +00:00
func (s *Store) Commited() ([]sectorbuilder.SectorSealingStatus, error) {
l, err := s.sb.GetAllStagedSectors()
if err != nil {
return nil, err
}
out := make([]sectorbuilder.SectorSealingStatus, 0)
for _, sid := range l {
status, err := s.sb.SealStatus(sid)
if err != nil {
return nil, err
}
2019-10-30 18:10:29 +00:00
if status.State != sealing_state.Committed {
continue
}
out = append(out, status)
}
return out, nil
}
2019-09-18 03:32:52 +00:00
func (s *Store) RunPoSt(ctx context.Context, sectors []*api.SectorInfo, r []byte, faults []uint64) ([]byte, error) {
2019-09-19 16:17:49 +00:00
sbsi := make([]sectorbuilder.SectorInfo, len(sectors))
for k, sector := range sectors {
var commR [sectorbuilder.CommLen]byte
if copy(commR[:], sector.CommR) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("commR too short, %d bytes", len(sector.CommR))
}
sbsi[k] = sectorbuilder.SectorInfo{
SectorID: sector.SectorID,
CommR: commR,
}
}
ssi := sectorbuilder.NewSortedSectorInfo(sbsi)
var seed [sectorbuilder.CommLen]byte
if copy(seed[:], r) != sectorbuilder.CommLen {
return nil, xerrors.Errorf("random seed too short, %d bytes", len(r))
}
return s.sb.GeneratePoSt(ssi, seed, faults)
}
2019-08-14 20:27:10 +00:00
func (s *Store) Stop() {
2019-08-14 22:17:27 +00:00
close(s.closeCh)
2019-08-14 20:27:10 +00:00
}