2020-03-23 11:40:02 +00:00
package stores
import (
2020-03-23 22:43:38 +00:00
"encoding/json"
2020-03-23 11:40:02 +00:00
"io"
"net/http"
"os"
2021-05-18 07:32:30 +00:00
"strconv"
2020-03-23 11:40:02 +00:00
2021-05-18 07:32:30 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
2020-03-23 11:40:02 +00:00
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
2020-09-30 17:32:19 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-08-17 13:26:18 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/tarutil"
2020-11-16 18:20:18 +00:00
"github.com/filecoin-project/specs-storage/storage"
2020-03-23 11:40:02 +00:00
)
var log = logging . Logger ( "stores" )
2021-05-19 13:50:48 +00:00
var _ partialFileHandler = & DefaultPartialFileHandler { }
// DefaultPartialFileHandler is the default implementation of the partialFileHandler interface.
// This is probably the only implementation we'll ever use because the purpose of the
// interface to is to mock out partial file related functionality during testing.
type DefaultPartialFileHandler struct { }
func ( d * DefaultPartialFileHandler ) OpenPartialFile ( maxPieceSize abi . PaddedPieceSize , path string ) ( * partialfile . PartialFile , error ) {
return partialfile . OpenPartialFile ( maxPieceSize , path )
}
func ( d * DefaultPartialFileHandler ) HasAllocated ( pf * partialfile . PartialFile , offset storiface . UnpaddedByteIndex , size abi . UnpaddedPieceSize ) ( bool , error ) {
return pf . HasAllocated ( offset , size )
}
2020-03-23 11:40:02 +00:00
type FetchHandler struct {
2021-05-19 13:50:48 +00:00
Local Store
PfHandler partialFileHandler
2020-03-23 11:40:02 +00:00
}
func ( handler * FetchHandler ) ServeHTTP ( w http . ResponseWriter , r * http . Request ) { // /remote/
mux := mux . NewRouter ( )
2020-03-23 22:43:38 +00:00
mux . HandleFunc ( "/remote/stat/{id}" , handler . remoteStatFs ) . Methods ( "GET" )
2020-03-23 11:40:02 +00:00
mux . HandleFunc ( "/remote/{type}/{id}" , handler . remoteGetSector ) . Methods ( "GET" )
mux . HandleFunc ( "/remote/{type}/{id}" , handler . remoteDeleteSector ) . Methods ( "DELETE" )
2021-05-18 07:32:30 +00:00
mux . HandleFunc ( "/remote/{type}/{id}/{spt}/allocated/{offset}/{size}" , handler . remoteGetAllocated ) . Methods ( "GET" )
2020-03-23 11:40:02 +00:00
mux . ServeHTTP ( w , r )
}
2020-03-23 22:43:38 +00:00
func ( handler * FetchHandler ) remoteStatFs ( w http . ResponseWriter , r * http . Request ) {
vars := mux . Vars ( r )
id := ID ( vars [ "id" ] )
2020-03-24 20:28:07 +00:00
st , err := handler . Local . FsStat ( r . Context ( ) , id )
2020-03-23 22:43:38 +00:00
switch err {
case errPathNotFound :
w . WriteHeader ( 404 )
return
case nil :
break
default :
w . WriteHeader ( 500 )
log . Errorf ( "%+v" , err )
return
}
if err := json . NewEncoder ( w ) . Encode ( & st ) ; err != nil {
log . Warnf ( "error writing stat response: %+v" , err )
}
}
2021-05-18 11:35:25 +00:00
// remoteGetSector returns the sector file/tared directory byte stream for the sectorID and sector file type sent in the request.
// returns an error if it does NOT have the required sector file/dir.
2020-03-23 11:40:02 +00:00
func ( handler * FetchHandler ) remoteGetSector ( w http . ResponseWriter , r * http . Request ) {
log . Infof ( "SERVE GET %s" , r . URL )
vars := mux . Vars ( r )
2020-09-06 16:54:00 +00:00
id , err := storiface . ParseSectorID ( vars [ "id" ] )
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-28 20:05:51 +00:00
log . Errorf ( "%+v" , err )
2020-03-23 11:40:02 +00:00
w . WriteHeader ( 500 )
return
}
ft , err := ftFromString ( vars [ "type" ] )
if err != nil {
2020-09-28 20:05:51 +00:00
log . Errorf ( "%+v" , err )
2020-03-24 23:37:40 +00:00
w . WriteHeader ( 500 )
2020-03-23 11:40:02 +00:00
return
}
2020-05-08 16:54:06 +00:00
2020-06-04 19:00:16 +00:00
// The caller has a lock on this sector already, no need to get one here
2020-05-08 16:54:06 +00:00
// passing 0 spt because we don't allocate anything
2020-11-04 20:29:08 +00:00
si := storage . SectorRef {
ID : id ,
ProofType : 0 ,
}
paths , _ , err := handler . Local . AcquireSector ( r . Context ( ) , si , ft , storiface . FTNone , storiface . PathStorage , storiface . AcquireMove )
2020-03-23 11:40:02 +00:00
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "AcquireSector: %+v" , err )
2020-03-24 23:37:40 +00:00
w . WriteHeader ( 500 )
2020-03-23 11:40:02 +00:00
return
}
2020-07-06 16:36:44 +00:00
// TODO: reserve local storage here
2020-09-06 16:54:00 +00:00
path := storiface . PathByType ( paths , ft )
2020-03-23 11:40:02 +00:00
if path == "" {
log . Error ( "acquired path was empty" )
w . WriteHeader ( 500 )
return
}
stat , err := os . Stat ( path )
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "os.Stat: %+v" , err )
2020-03-23 11:40:02 +00:00
w . WriteHeader ( 500 )
return
}
if stat . IsDir ( ) {
2021-05-18 07:32:30 +00:00
if _ , has := r . Header [ "Range" ] ; has {
log . Error ( "Range not supported on directories" )
w . WriteHeader ( 500 )
return
}
rd , err := tarutil . TarDirectory ( path )
if err != nil {
log . Errorf ( "%+v" , err )
w . WriteHeader ( 500 )
return
}
2020-03-23 11:40:02 +00:00
w . Header ( ) . Set ( "Content-Type" , "application/x-tar" )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( 200 )
if _ , err := io . CopyBuffer ( w , rd , make ( [ ] byte , CopyBuf ) ) ; err != nil {
log . Errorf ( "%+v" , err )
return
}
2020-03-23 11:40:02 +00:00
} else {
w . Header ( ) . Set ( "Content-Type" , "application/octet-stream" )
2021-05-19 13:50:48 +00:00
w . WriteHeader ( 200 )
2021-05-18 11:35:25 +00:00
// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
2021-05-18 07:32:30 +00:00
http . ServeFile ( w , r , path )
2020-03-23 11:40:02 +00:00
}
2021-05-18 11:35:25 +00:00
log . Debugf ( "served sector file/dir, sectorID=%+v, fileType=%s, path=%s" , id , ft , path )
2021-05-18 07:32:30 +00:00
}
func ( handler * FetchHandler ) remoteDeleteSector ( w http . ResponseWriter , r * http . Request ) {
log . Infof ( "SERVE DELETE %s" , r . URL )
vars := mux . Vars ( r )
id , err := storiface . ParseSectorID ( vars [ "id" ] )
2020-03-23 11:40:02 +00:00
if err != nil {
2020-09-28 20:05:51 +00:00
log . Errorf ( "%+v" , err )
2020-03-23 11:40:02 +00:00
w . WriteHeader ( 500 )
return
}
2021-05-18 07:32:30 +00:00
ft , err := ftFromString ( vars [ "type" ] )
if err != nil {
log . Errorf ( "%+v" , err )
w . WriteHeader ( 500 )
return
2020-12-29 02:07:28 +00:00
}
2020-03-23 11:40:02 +00:00
2021-05-19 13:50:48 +00:00
if err := handler . Local . Remove ( r . Context ( ) , id , ft , false ) ; err != nil {
2020-09-28 20:05:51 +00:00
log . Errorf ( "%+v" , err )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( 500 )
2020-03-23 11:40:02 +00:00
return
}
}
2021-05-18 11:35:25 +00:00
// remoteGetAllocated returns `http.StatusOK` if the worker already has an Unsealed sector file
// containing the Unsealed piece sent in the request.
// returns `http.StatusRequestedRangeNotSatisfiable` otherwise.
2021-05-18 07:32:30 +00:00
func ( handler * FetchHandler ) remoteGetAllocated ( w http . ResponseWriter , r * http . Request ) {
log . Infof ( "SERVE Alloc check %s" , r . URL )
2020-03-23 11:40:02 +00:00
vars := mux . Vars ( r )
2020-09-06 16:54:00 +00:00
id , err := storiface . ParseSectorID ( vars [ "id" ] )
2020-03-23 11:40:02 +00:00
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "parsing sectorID: %+v" , err )
2020-03-23 11:40:02 +00:00
w . WriteHeader ( 500 )
return
}
ft , err := ftFromString ( vars [ "type" ] )
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "ftFromString: %+v" , err )
2020-03-24 23:37:40 +00:00
w . WriteHeader ( 500 )
2020-03-23 11:40:02 +00:00
return
}
2021-05-18 07:32:30 +00:00
if ft != storiface . FTUnsealed {
log . Errorf ( "/allocated only supports unsealed sector files" )
w . WriteHeader ( 500 )
return
}
2020-03-23 11:40:02 +00:00
2021-05-18 07:32:30 +00:00
spti , err := strconv . ParseInt ( vars [ "spt" ] , 10 , 64 )
if err != nil {
log . Errorf ( "parsing spt: %+v" , err )
w . WriteHeader ( 500 )
return
}
spt := abi . RegisteredSealProof ( spti )
ssize , err := spt . SectorSize ( )
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "spt.SectorSize(): %+v" , err )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( 500 )
return
}
offi , err := strconv . ParseInt ( vars [ "offset" ] , 10 , 64 )
if err != nil {
log . Errorf ( "parsing offset: %+v" , err )
w . WriteHeader ( 500 )
return
}
szi , err := strconv . ParseInt ( vars [ "size" ] , 10 , 64 )
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "parsing size: %+v" , err )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( 500 )
return
}
// The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything
si := storage . SectorRef {
ID : id ,
ProofType : 0 ,
}
2021-05-18 11:35:25 +00:00
// get the path of the local Unsealed file for the given sector.
// return error if we do NOT have it.
2021-05-18 07:32:30 +00:00
paths , _ , err := handler . Local . AcquireSector ( r . Context ( ) , si , ft , storiface . FTNone , storiface . PathStorage , storiface . AcquireMove )
if err != nil {
2021-05-19 13:50:48 +00:00
log . Errorf ( "AcquireSector: %+v" , err )
2020-03-23 11:40:02 +00:00
w . WriteHeader ( 500 )
return
}
2021-05-18 07:32:30 +00:00
path := storiface . PathByType ( paths , ft )
if path == "" {
log . Error ( "acquired path was empty" )
w . WriteHeader ( 500 )
return
}
2021-05-18 11:35:25 +00:00
// open the Unsealed file and check if it has the Unsealed sector for the piece at the given offset and size.
2021-05-19 13:50:48 +00:00
pf , err := handler . PfHandler . OpenPartialFile ( abi . PaddedPieceSize ( ssize ) , path )
2021-05-18 07:32:30 +00:00
if err != nil {
log . Error ( "opening partial file: " , err )
w . WriteHeader ( 500 )
return
}
defer func ( ) {
if err := pf . Close ( ) ; err != nil {
2021-05-19 05:47:56 +00:00
log . Error ( "closing partial file: " , err )
2021-05-18 07:32:30 +00:00
}
} ( )
2021-05-19 13:50:48 +00:00
has , err := handler . PfHandler . HasAllocated ( pf , storiface . UnpaddedByteIndex ( offi ) , abi . UnpaddedPieceSize ( szi ) )
2021-05-18 07:32:30 +00:00
if err != nil {
log . Error ( "has allocated: " , err )
w . WriteHeader ( 500 )
return
}
if has {
2021-05-18 11:35:25 +00:00
log . Debugf ( "returning ok: worker has unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d" , id , offi , szi )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( http . StatusOK )
return
}
2021-05-18 11:35:25 +00:00
log . Debugf ( "returning StatusRequestedRangeNotSatisfiable: worker does NOT have unsealed file with unsealed piece, sector:%+v, offset:%d, size:%d" , id , offi , szi )
2021-05-18 07:32:30 +00:00
w . WriteHeader ( http . StatusRequestedRangeNotSatisfiable )
2020-03-23 11:40:02 +00:00
}
2020-09-06 16:54:00 +00:00
func ftFromString ( t string ) ( storiface . SectorFileType , error ) {
2020-03-23 11:40:02 +00:00
switch t {
2020-09-06 16:54:00 +00:00
case storiface . FTUnsealed . String ( ) :
return storiface . FTUnsealed , nil
case storiface . FTSealed . String ( ) :
return storiface . FTSealed , nil
case storiface . FTCache . String ( ) :
return storiface . FTCache , nil
2020-03-23 11:40:02 +00:00
default :
return 0 , xerrors . Errorf ( "unknown sector file type: '%s'" , t )
}
}