sealing: Address review

This commit is contained in:
Łukasz Magiera 2022-09-16 23:45:23 +02:00
parent a05593da5b
commit 859c2606f0
25 changed files with 71 additions and 53 deletions

View File

@ -563,11 +563,11 @@ type RemoteSectorMeta struct {
// Sector urls - lotus will use those for fetching files into local storage
// Required in all states
DataUnsealed *storiface.SectorData
DataUnsealed *storiface.SectorLocation
// Required in PreCommitting and later
DataSealed *storiface.SectorData
DataCache *storiface.SectorData
DataSealed *storiface.SectorLocation
DataCache *storiface.SectorLocation
////////
// SEALING SERVICE HOOKS

View File

@ -49,7 +49,7 @@ type Worker interface {
MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin
DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) //perm:admin
DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) //perm:admin
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) //perm:admin
GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) //perm:admin

View File

@ -345,7 +345,7 @@ func init() {
"Authorization": []string{"Bearer ey.."},
})
addExample(map[storiface.SectorFileType]storiface.SectorData{
addExample(map[storiface.SectorFileType]storiface.SectorLocation{
storiface.FTSealed: {
Local: false,
URL: "https://example.com/sealingservice/sectors/s-f0123-12345",

View File

@ -955,7 +955,7 @@ type WorkerStruct struct {
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"`
DownloadSectorData func(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) `perm:"admin"`
DownloadSectorData func(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) `perm:"admin"`
Enabled func(p0 context.Context) (bool, error) `perm:"admin"`
@ -5551,14 +5551,14 @@ func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 st
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
func (s *WorkerStruct) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
if s.Internal.DownloadSectorData == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.DownloadSectorData(p0, p1, p2, p3)
}
func (s *WorkerStub) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
func (s *WorkerStub) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -104,7 +104,7 @@ func main() {
err = gen.WriteMapEncodersToFile("./storage/sealer/storiface/cbor_gen.go", "storiface",
storiface.CallID{},
storiface.SecDataHttpHeader{},
storiface.SectorData{},
storiface.SectorLocation{},
)
if err != nil {
fmt.Println(err)

View File

@ -113,7 +113,7 @@ func TestSectorImport(t *testing.T) {
})
require.NoError(t, err)
// CRETE THE UNSEALED FILE
// CREATE THE UNSEALED FILE
// create a reader for all-zero (CC) data
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
@ -194,15 +194,15 @@ func TestSectorImport(t *testing.T) {
CommD: &scids.Unsealed,
CommR: &scids.Sealed,
DataUnsealed: &storiface.SectorData{
DataUnsealed: &storiface.SectorLocation{
Local: false,
URL: unsealedURL,
},
DataSealed: &storiface.SectorData{
DataSealed: &storiface.SectorLocation{
Local: false,
URL: sealedURL,
},
DataCache: &storiface.SectorData{
DataCache: &storiface.SectorLocation{
Local: false,
URL: cacheURL,
},

View File

@ -87,7 +87,7 @@ func TestSectorImportAfterPC2(t *testing.T) {
})
require.NoError(t, err)
// CRETE THE UNSEALED FILE
// CREATE THE UNSEALED FILE
// create a reader for all-zero (CC) data
dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))
@ -169,15 +169,15 @@ func TestSectorImportAfterPC2(t *testing.T) {
CommD: &scids.Unsealed,
CommR: &scids.Sealed,
DataUnsealed: &storiface.SectorData{
DataUnsealed: &storiface.SectorLocation{
Local: false,
URL: unsealedURL,
},
DataSealed: &storiface.SectorData{
DataSealed: &storiface.SectorLocation{
Local: false,
URL: sealedURL,
},
DataCache: &storiface.SectorData{
DataCache: &storiface.SectorLocation{
Local: false,
URL: cacheURL,
},

View File

@ -797,7 +797,7 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
Comment: ``,
},
{
Num: "AllowSectorDownload",
Name: "AllowSectorDownload",
Type: "bool",
Comment: ``,

View File

@ -72,6 +72,7 @@ func fetch(ctx context.Context, url, outname string, header http.Header) (rerr e
}
// FetchWithTemp fetches data into a temp 'fetching' directory, then moves the file to destination
// The set of URLs must refer to the same object, if one fails, another one will be tried.
func FetchWithTemp(ctx context.Context, urls []string, dest string, header http.Header) (string, error) {
var merr error
for _, url := range urls {

View File

@ -31,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
cw := cbg.NewCborWriter(w)
if _, err := cw.Write([]byte{184, 39}); err != nil {
if _, err := cw.Write([]byte{184, 38}); err != nil {
return err
}
@ -655,7 +655,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
}
}
// t.RemoteDataUnsealed (storiface.SectorData) (struct)
// t.RemoteDataUnsealed (storiface.SectorLocation) (struct)
if len("RemoteDataUnsealed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataUnsealed\" was too long")
}
@ -671,7 +671,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.RemoteDataSealed (storiface.SectorData) (struct)
// t.RemoteDataSealed (storiface.SectorLocation) (struct)
if len("RemoteDataSealed") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataSealed\" was too long")
}
@ -687,7 +687,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
return err
}
// t.RemoteDataCache (storiface.SectorData) (struct)
// t.RemoteDataCache (storiface.SectorLocation) (struct)
if len("RemoteDataCache") > cbg.MaxLength {
return xerrors.Errorf("Value in field \"RemoteDataCache\" was too long")
}
@ -1488,7 +1488,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
t.TerminatedAt = abi.ChainEpoch(extraI)
}
// t.RemoteDataUnsealed (storiface.SectorData) (struct)
// t.RemoteDataUnsealed (storiface.SectorLocation) (struct)
case "RemoteDataUnsealed":
{
@ -1501,14 +1501,14 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataUnsealed = new(storiface.SectorData)
t.RemoteDataUnsealed = new(storiface.SectorLocation)
if err := t.RemoteDataUnsealed.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataUnsealed pointer: %w", err)
}
}
}
// t.RemoteDataSealed (storiface.SectorData) (struct)
// t.RemoteDataSealed (storiface.SectorLocation) (struct)
case "RemoteDataSealed":
{
@ -1521,14 +1521,14 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataSealed = new(storiface.SectorData)
t.RemoteDataSealed = new(storiface.SectorLocation)
if err := t.RemoteDataSealed.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataSealed pointer: %w", err)
}
}
}
// t.RemoteDataCache (storiface.SectorData) (struct)
// t.RemoteDataCache (storiface.SectorLocation) (struct)
case "RemoteDataCache":
{
@ -1541,7 +1541,7 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
if err := cr.UnreadByte(); err != nil {
return err
}
t.RemoteDataCache = new(storiface.SectorData)
t.RemoteDataCache = new(storiface.SectorLocation)
if err := t.RemoteDataCache.UnmarshalCBOR(cr); err != nil {
return xerrors.Errorf("unmarshaling t.RemoteDataCache pointer: %w", err)
}

View File

@ -91,8 +91,13 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
switch SectorState(meta.State) {
case Proving, Available:
// todo possibly check
info.CommitMessage = meta.CommitMessage
if meta.CommitMessage != nil {
if err := checkMessagePrefix(*meta.CommitMessage); err != nil {
return SectorInfo{}, xerrors.Errorf("commit message prefix: %w", err)
}
info.CommitMessage = meta.CommitMessage
}
fallthrough
case SubmitCommit:
@ -100,10 +105,14 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
return SectorInfo{}, xerrors.Errorf("sector PreCommitDeposit was null")
}
info.PreCommitInfo = meta.PreCommitInfo
info.PreCommitDeposit = *meta.PreCommitDeposit
info.PreCommitMessage = meta.PreCommitMessage
info.PreCommitTipSet = meta.PreCommitTipSet
if info.PreCommitMessage != nil {
if err := checkMessagePrefix(*meta.PreCommitMessage); err != nil {
return SectorInfo{}, xerrors.Errorf("commit message prefix: %w", err)
}
info.PreCommitMessage = meta.PreCommitMessage
}
// check provided seed
if len(meta.SeedValue) != abi.RandomnessLength {
@ -256,9 +265,9 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
}
func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error {
toFetch := map[storiface.SectorFileType]storiface.SectorData{}
toFetch := map[storiface.SectorFileType]storiface.SectorLocation{}
for fileType, data := range map[storiface.SectorFileType]*storiface.SectorData{
for fileType, data := range map[storiface.SectorFileType]*storiface.SectorLocation{
storiface.FTUnsealed: sector.RemoteDataUnsealed,
storiface.FTSealed: sector.RemoteDataSealed,
storiface.FTCache: sector.RemoteDataCache,
@ -285,3 +294,11 @@ func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInf
return ctx.Send(SectorReceived{})
}
func checkMessagePrefix(c cid.Cid) error {
p := c.Prefix()
if p.Version != 1 || p.MhLength != 32 || p.MhType != multihash.BLAKE2B_MIN+31 || p.Codec != cid.DagCBOR {
return xerrors.New("invalid message prefix")
}
return nil
}

View File

@ -94,9 +94,9 @@ type SectorInfo struct {
TerminatedAt abi.ChainEpoch
// Remote import
RemoteDataUnsealed *storiface.SectorData
RemoteDataSealed *storiface.SectorData
RemoteDataCache *storiface.SectorData
RemoteDataUnsealed *storiface.SectorLocation
RemoteDataSealed *storiface.SectorLocation
RemoteDataCache *storiface.SectorLocation
RemoteCommit1Endpoint string
RemoteCommit2Endpoint string
RemoteSealingDoneEndpoint string

View File

@ -1169,7 +1169,7 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error
return xerrors.Errorf("not supported at this layer") // happens in localworker
}
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
var todo storiface.SectorFileType
for fileType := range src {
todo |= fileType

View File

@ -1089,7 +1089,7 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.Sect
return out, waitErr
}
func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -1098,7 +1098,7 @@ func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.Secto
// get a sorted list of sectors files to make a consistent work key from
ents := make([]struct {
T storiface.SectorFileType
S storiface.SectorData
S storiface.SectorLocation
}, 0, len(src))
for fileType, data := range src {
if len(fileType.AllSet()) != 1 {
@ -1109,7 +1109,7 @@ func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.Secto
ents = append(ents, struct {
T storiface.SectorFileType
S storiface.SectorData
S storiface.SectorLocation
}{T: fileType, S: data})
}
sort.Slice(ents, func(i, j int) bool {

View File

@ -517,7 +517,7 @@ func (mgr *SectorMgr) ReleaseSectorKey(ctx context.Context, sector storiface.Sec
return nil
}
func (mgr *SectorMgr) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
func (mgr *SectorMgr) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
return xerrors.Errorf("not supported")
}

View File

@ -67,7 +67,7 @@ type schedTestWorker struct {
ignoreResources bool
}
func (s *schedTestWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
func (s *schedTestWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -282,7 +282,7 @@ func (t *SecDataHttpHeader) UnmarshalCBOR(r io.Reader) (err error) {
return nil
}
func (t *SectorData) MarshalCBOR(w io.Writer) error {
func (t *SectorLocation) MarshalCBOR(w io.Writer) error {
if t == nil {
_, err := w.Write(cbg.CborNull)
return err
@ -360,8 +360,8 @@ func (t *SectorData) MarshalCBOR(w io.Writer) error {
return nil
}
func (t *SectorData) UnmarshalCBOR(r io.Reader) (err error) {
*t = SectorData{}
func (t *SectorLocation) UnmarshalCBOR(r io.Reader) (err error) {
*t = SectorLocation{}
cr := cbg.NewCborReader(r)
@ -380,7 +380,7 @@ func (t *SectorData) UnmarshalCBOR(r io.Reader) (err error) {
}
if extra > cbg.MaxLength {
return fmt.Errorf("SectorData: map struct too large (%d)", extra)
return fmt.Errorf("SectorLocation: map struct too large (%d)", extra)
}
var name string

View File

@ -87,7 +87,7 @@ type Sealer interface {
FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) error
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) error
}
type Unsealer interface {
@ -123,7 +123,7 @@ type Prover interface {
AggregateSealProofs(aggregateInfo proof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error)
}
type SectorData struct {
type SectorLocation struct {
// Local when set to true indicates to lotus that sector data is already
// available locally; When set lotus will skip fetching sector data, and
// only check that sector data exists in sector storage
@ -140,7 +140,7 @@ type SectorData struct {
Headers []SecDataHttpHeader
}
func (sd *SectorData) HttpHeaders() http.Header {
func (sd *SectorLocation) HttpHeaders() http.Header {
out := http.Header{}
for _, header := range sd.Headers {
out[header.Key] = append(out[header.Key], header.Value)

View File

@ -134,7 +134,7 @@ type WorkerCalls interface {
MoveStorage(ctx context.Context, sector SectorRef, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
Fetch(context.Context, SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) (CallID, error)
DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorLocation) (CallID, error)
// sync
GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)

View File

@ -21,7 +21,7 @@ type testExec struct {
apch chan chan apres
}
func (t *testExec) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error {
func (t *testExec) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error {
panic("implement me")
}

View File

@ -588,7 +588,7 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe
})
}
func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) {
func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err