lotus/storage/pipeline/numassign.go

345 lines
9.1 KiB
Go
Raw Normal View History

package sealing
2022-08-18 14:37:22 +00:00
import (
"bytes"
"context"
"encoding/binary"
"regexp"
"github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-bitfield"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
2022-08-18 14:37:22 +00:00
)
// Datastore locations used by the sector number assigner.
var (
	// StorageCounterDSPrefix is the key under which the legacy sector
	// counter is stored.
	StorageCounterDSPrefix = "/storage/nextid"
	// SectorBitfieldsDSPrefix is the prefix of the keys holding the sector
	// number bitfields.
	SectorBitfieldsDSPrefix = "/storage/sectorsids/"
	// SectorReservationsDSPrefix is the prefix under which individual named
	// reservations are stored.
	SectorReservationsDSPrefix = "/storage/sectorsids/reserved/"

	// Keys of the bitfields tracking allocated and reserved sector numbers.
	allocatedSectorsKey = datastore.NewKey(SectorBitfieldsDSPrefix + "allocated")
	reservedSectorsKey  = datastore.NewKey(SectorBitfieldsDSPrefix + "reserved")
)
// loadBitField reads the CBOR-encoded bitfield stored under name.
//
// It returns (nil, nil) when the datastore has no value for name, so callers
// must nil-check the result before dereferencing it. Any other datastore
// error, and any failure to decode the stored bytes, is returned unchanged.
func (m *Sealing) loadBitField(ctx context.Context, name datastore.Key) (*bitfield.BitField, error) {
	raw, err := m.ds.Get(ctx, name)
	if err != nil {
		// Match the sentinel through wrapping rather than by identity, so a
		// not-found error that was wrapped on its way up still means "absent".
		if xerrors.Is(err, datastore.ErrNotFound) {
			return nil, nil
		}
		return nil, err
	}

	var bf bitfield.BitField
	if err := bf.UnmarshalCBOR(bytes.NewReader(raw)); err != nil {
		return nil, err
	}
	return &bf, nil
}
// saveBitField CBOR-encodes bf and writes the encoded bytes to the datastore
// under name. Encoding and datastore errors are returned unchanged.
func (m *Sealing) saveBitField(ctx context.Context, name datastore.Key, bf bitfield.BitField) error {
	encoded := new(bytes.Buffer)
	if err := bf.MarshalCBOR(encoded); err != nil {
		return err
	}

	return m.ds.Put(ctx, name, encoded.Bytes())
}
// NextSectorNumber picks the next free sector number and records it as
// allocated.
//
// While holding m.sclk it computes the assigner metadata, sets the chosen
// number in the allocated bitfield, persists that bitfield, and then writes
// the same number to the legacy counter key. On any error it returns 0 and
// the error.
func (m *Sealing) NextSectorNumber(ctx context.Context) (abi.SectorNumber, error) {
	m.sclk.Lock()
	defer m.sclk.Unlock()

	meta, err := m.numAssignerMetaLocked(ctx)
	if err != nil {
		return 0, err
	}
	next := meta.Next

	// mark the number as used and persist the updated bitfield
	meta.Allocated.Set(uint64(next))
	if err := m.saveBitField(ctx, allocatedSectorsKey, meta.Allocated); err != nil {
		return 0, err
	}

	// keep the legacy counter up to date so that things keep working in case
	// of a miner downgrade
	legacy := make([]byte, binary.MaxVarintLen64)
	written := binary.PutUvarint(legacy, uint64(next))
	if err := m.ds.Put(ctx, datastore.NewKey(StorageCounterDSPrefix), legacy[:written]); err != nil {
		return 0, err
	}

	return next, nil
}
// NumAssignerMeta returns the current sector number assigner state. The
// state is computed while holding m.sclk.
func (m *Sealing) NumAssignerMeta(ctx context.Context) (api.NumAssignerMeta, error) {
	m.sclk.Lock()
	defer m.sclk.Unlock()

	meta, err := m.numAssignerMetaLocked(ctx)
	return meta, err
}
func (m *Sealing) numAssignerMetaLocked(ctx context.Context) (api.NumAssignerMeta, error) {
// load user-reserved and allocated bitfields
reserved, err := m.loadBitField(ctx, reservedSectorsKey)
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("loading reserved sectors bitfield: %w", err)
}
allocated, err := m.loadBitField(ctx, allocatedSectorsKey)
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("loading allocated sectors bitfield: %w", err)
}
// if the allocated bitfield doesn't exist, crate it from the legacy sector counter
if allocated == nil {
var i uint64
{
curBytes, err := m.ds.Get(ctx, datastore.NewKey(StorageCounterDSPrefix))
if err != nil {
return api.NumAssignerMeta{}, err
}
cur, _ := binary.Uvarint(curBytes)
i = cur + 1
}
rl := &rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{
{
Val: true,
Len: i,
},
}}
bf, err := bitfield.NewFromIter(rl)
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("bitfield from iter: %w", err)
}
allocated = &bf
}
// if there are no user reservations, use an empty bitfield
if reserved == nil {
emptyBf := bitfield.New()
reserved = &emptyBf
}
// todo union with miner allocated nums
inuse, err := bitfield.MergeBitFields(*reserved, *allocated)
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("merge reserved/allocated: %w", err)
}
stateAllocated, err := m.Api.StateMinerAllocated(ctx, m.maddr, types.EmptyTSK)
if err != nil || stateAllocated == nil {
return api.NumAssignerMeta{}, xerrors.Errorf("getting state-allocated sector numbers: %w", err)
}
inuse, err = bitfield.MergeBitFields(inuse, *stateAllocated)
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("merge inuse/stateAllocated: %w", err)
}
2022-08-18 14:37:22 +00:00
// find first available sector number
iri, err := inuse.RunIterator()
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("inuse run iter: %w", err)
}
var firstFree abi.SectorNumber
for iri.HasNext() {
r, err := iri.NextRun()
if err != nil {
return api.NumAssignerMeta{}, xerrors.Errorf("next run: %w", err)
}
if !r.Val {
break
}
firstFree += abi.SectorNumber(r.Len)
}
return api.NumAssignerMeta{
Reserved: *reserved,
Allocated: *allocated,
Next: firstFree,
}, nil
}
// NumReservations returns a list of named sector reservations
2022-08-18 14:37:22 +00:00
func (m *Sealing) NumReservations(ctx context.Context) (map[string]bitfield.BitField, error) {
res, err := m.ds.Query(ctx, dsq.Query{Prefix: SectorReservationsDSPrefix})
if err != nil {
return nil, xerrors.Errorf("query reservations: %w", err)
}
defer res.Close() //nolint:errcheck
out := map[string]bitfield.BitField{}
for {
res, ok := res.NextSync()
if !ok {
break
}
if res.Error != nil {
return nil, xerrors.Errorf("res error: %w", err)
}
b := bitfield.New()
if err := b.UnmarshalCBOR(bytes.NewReader(res.Value)); err != nil {
return nil, err
}
out[datastore.NewKey(res.Key).BaseNamespace()] = b
}
return out, nil
}
// NumReserve creates a new sector reservation
2022-08-18 14:37:22 +00:00
func (m *Sealing) NumReserve(ctx context.Context, name string, reserving bitfield.BitField, force bool) error {
m.sclk.Lock()
defer m.sclk.Unlock()
rk, err := reservationKey(name)
if err != nil {
return err
}
// check if there's an existing reservation with this name
cur := bitfield.New()
curRes, err := m.ds.Get(ctx, rk)
if err == nil {
log.Warnw("reservation with this name already exists", "name", name)
if !force {
return xerrors.Errorf("reservation with this name already exists")
}
if err := cur.UnmarshalCBOR(bytes.NewReader(curRes)); err != nil {
return xerrors.Errorf("unmarshaling existing reservation: %w", err)
}
2022-08-22 20:20:40 +00:00
} else if err != datastore.ErrNotFound {
2022-08-18 14:37:22 +00:00
return xerrors.Errorf("checking if reservation exists: %w", err)
}
// load the global reserved bitfield and subtract current reservation if we're overwriting
nm, err := m.numAssignerMetaLocked(ctx)
if err != nil {
return err
}
allReserved := nm.Reserved
allReserved, err = bitfield.SubtractBitField(allReserved, cur)
if err != nil {
return xerrors.Errorf("allReserved - cur: %w", err)
}
// check if the reservation is colliding with any other reservation
coliding, err := bitfield.IntersectBitField(allReserved, reserving)
if err != nil {
return xerrors.Errorf("intersect all / reserving: %w", err)
}
empty, err := coliding.IsEmpty()
if err != nil {
return xerrors.Errorf("colliding.empty: %w", err)
}
if !empty {
log.Warnw("new reservation is colliding with another existing reservation", "name", name)
if !force {
return xerrors.Errorf("new reservation is colliding with another existing reservation")
}
}
// check if the reservation is colliding with allocated sectors
coliding, err = bitfield.IntersectBitField(nm.Allocated, reserving)
if err != nil {
return xerrors.Errorf("intersect all / reserving: %w", err)
}
empty, err = coliding.IsEmpty()
if err != nil {
return xerrors.Errorf("colliding.empty: %w", err)
}
if !empty {
log.Warnw("new reservation is colliding with allocated sector numbers", "name", name)
if !force {
return xerrors.Errorf("new reservation is colliding with allocated sector numbers")
}
}
// write the reservation
newReserved, err := bitfield.MergeBitFields(allReserved, reserving)
if err != nil {
return xerrors.Errorf("merge allReserved+reserving: %w", err)
}
if err := m.saveBitField(ctx, rk, reserving); err != nil {
return xerrors.Errorf("saving reservation: %w", err)
}
if err := m.saveBitField(ctx, reservedSectorsKey, newReserved); err != nil {
return xerrors.Errorf("save reserved nums: %w", err)
}
return nil
}
// NumFree removes a named sector reservation
2022-08-18 14:37:22 +00:00
func (m *Sealing) NumFree(ctx context.Context, name string) error {
rk, err := reservationKey(name)
if err != nil {
return err
}
res, err := m.loadBitField(ctx, rk)
if err != nil {
return xerrors.Errorf("loading reservation: %w", err)
}
if res == nil {
return xerrors.Errorf("reservation with this name doesn't exist")
}
allRes, err := m.loadBitField(ctx, reservedSectorsKey)
if err != nil {
return xerrors.Errorf("leading all reservations: %w", err)
}
if allRes == nil {
return xerrors.Errorf("all reservations bitfield not found")
}
newAll, err := bitfield.SubtractBitField(*allRes, *res)
if err != nil {
return xerrors.Errorf("allRes-res: %w", err)
}
if err := m.saveBitField(ctx, reservedSectorsKey, newAll); err != nil {
return xerrors.Errorf("saving reservations bitfield: %w", err)
}
if err := m.ds.Delete(ctx, rk); err != nil {
return xerrors.Errorf("deleting reservation: %w", err)
}
return nil
}
func reservationKey(name string) (datastore.Key, error) {
2022-08-22 20:20:40 +00:00
ok, err := regexp.Match("^[a-zA-Z0-9_\\-]+$", []byte(name))
2022-08-18 14:37:22 +00:00
if err != nil {
return datastore.Key{}, err
}
if !ok {
return datastore.Key{}, xerrors.Errorf("reservation name contained disallowed characters (allowed: a-z, A-Z, 0-9, '-', '_')")
}
return datastore.KeyWithNamespaces([]string{SectorReservationsDSPrefix, name}), nil
}