@ -18,6 +18,7 @@
# include <unistd.h>
# include <unistd.h>
# include <atomic>
# include <atomic>
# include <cstring>
# include <deque>
# include <deque>
# include <limits>
# include <limits>
# include <set>
# include <set>
@ -44,7 +45,7 @@ namespace {
static int open_read_only_file_limit = - 1 ;
static int open_read_only_file_limit = - 1 ;
static int mmap_limit = - 1 ;
static int mmap_limit = - 1 ;
static const size_t kBufSize = 65536 ;
constexpr const size_t kWritableFile Buffer Size = 65536 ;
static Status PosixError ( const std : : string & context , int err_number ) {
static Status PosixError ( const std : : string & context , int err_number ) {
if ( err_number = = ENOENT ) {
if ( err_number = = ENOENT ) {
@ -213,131 +214,165 @@ class PosixMmapReadableFile: public RandomAccessFile {
}
}
} ;
} ;
class PosixWritableFile : public WritableFile {
private :
// buf_[0, pos_-1] contains data to be written to fd_.
std : : string filename_ ;
int fd_ ;
char buf_ [ kBufSize ] ;
size_t pos_ ;
class PosixWritableFile final : public WritableFile {
public :
public :
PosixWritableFile ( const std : : string & fname , int fd )
: filename_ ( fname ) , fd_ ( fd ) , pos_ ( 0 ) { }
PosixWritableFile ( std : : string filename , int fd )
: pos_ ( 0 ) , fd_ ( fd ) , is_manifest_ ( IsManifest ( filename ) ) ,
filename_ ( std : : move ( filename ) ) , dirname_ ( Dirname ( filename_ ) ) { }
~ PosixWritableFile ( ) {
~ PosixWritableFile ( ) override {
if ( fd_ > = 0 ) {
if ( fd_ > = 0 ) {
// Ignoring any potential errors
// Ignoring any potential errors
Close ( ) ;
Close ( ) ;
}
}
}
}
virtual Status Append ( const Slice & data ) {
size_t n = data . size ( ) ;
const char * p = data . data ( ) ;
Status Append ( const Slice & data ) override {
size_t write_size = data . size ( ) ;
const char * write_data = data . data ( ) ;
// Fit as much as possible into buffer.
// Fit as much as possible into buffer.
size_t copy = std : : min ( n , kBuf Size - pos_ ) ;
memcpy ( buf_ + pos_ , p , copy ) ;
p + = copy ;
n - = copy ;
pos_ + = copy ;
if ( n = = 0 ) {
size_t copy_size = std : : min ( write_size , kWritableFileBuffer Size - pos_ ) ;
std : : memcpy( buf_ + pos_ , write_data , copy_size ) ;
write_data + = copy_size ;
write_size - = copy_size ;
pos_ + = copy_size ;
if ( write_size = = 0 ) {
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
// Can't fit in buffer, so need to do at least one write.
// Can't fit in buffer, so need to do at least one write.
Status s = FlushBuffered ( ) ;
if ( ! s . ok ( ) ) {
return s ;
Status status = FlushBuffer ( ) ;
if ( ! status . ok ( ) ) {
return status ;
}
}
// Small writes go to buffer, large writes are written directly.
// Small writes go to buffer, large writes are written directly.
if ( n < kBufSize ) {
memcpy ( buf_ , p , n ) ;
pos_ = n ;
if ( write_size < kWritableFile Buffer Size ) {
std : : memcpy( buf_ , write_data , write_size ) ;
pos_ = write_size ;
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
return WriteRaw ( p , n ) ;
return WriteUnbuffered ( write_data , write_size ) ;
}
}
virtual Status Close ( ) {
Status re sul t = FlushBuffered ( ) ;
const int r = close ( fd_ ) ;
if ( r < 0 & & re sul t. ok ( ) ) {
re sul t = PosixError ( filename_ , errno ) ;
Status Close ( ) override {
Status status = FlushBuffer ( ) ;
const int close_ result = : : close ( fd_ ) ;
if ( close_ result < 0 & & status . ok ( ) ) {
status = PosixError ( filename_ , errno ) ;
}
}
fd_ = - 1 ;
fd_ = - 1 ;
return re sul t;
return status ;
}
}
virtual Status Flush ( ) {
return FlushBuffered ( ) ;
}
Status SyncDirIfManifest ( ) {
const char * f = filename_ . c_str ( ) ;
const char * sep = strrchr ( f , ' / ' ) ;
Slice basename ;
std : : string dir ;
if ( sep = = nullptr ) {
dir = " . " ;
basename = f ;
} else {
dir = std : : string ( f , sep - f ) ;
basename = sep + 1 ;
}
Status s ;
if ( basename . starts_with ( " MANIFEST " ) ) {
int fd = open ( dir . c_str ( ) , O_RDONLY ) ;
if ( fd < 0 ) {
s = PosixError ( dir , errno ) ;
} else {
if ( fsync ( fd ) < 0 ) {
s = PosixError ( dir , errno ) ;
}
close ( fd ) ;
}
}
return s ;
Status Flush ( ) override {
return FlushBuffer ( ) ;
}
}
virtual Status Sync ( ) {
Status Sync ( ) override {
// Ensure new files referred to by the manifest are in the filesystem.
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest ( ) ;
if ( ! s . ok ( ) ) {
return s ;
//
// This needs to happen before the manifest file is flushed to disk, to
// avoid crashing in a state where the manifest refers to files that are not
// yet on disk.
Status status = SyncDirIfManifest ( ) ;
if ( ! status . ok ( ) ) {
return status ;
}
}
s = FlushBuffered ( ) ;
if ( s . ok ( ) ) {
if ( fdatasync ( fd_ ) ! = 0 ) {
s = PosixError ( filename_ , errno ) ;
}
status = FlushBuffer ( ) ;
if ( status . ok ( ) & & : : fdatasync ( fd_ ) ! = 0 ) {
status = PosixError ( filename_ , errno ) ;
}
}
return s ;
return status ;
}
}
private :
private :
Status FlushBuffered ( ) {
Status s = WriteRaw ( buf_ , pos_ ) ;
Status FlushBuffer ( ) {
Status status = WriteUnbuffered ( buf_ , pos_ ) ;
pos_ = 0 ;
pos_ = 0 ;
return s ;
return status ;
}
}
Status WriteRaw ( const char * p , size_t n ) {
while ( n > 0 ) {
ssize_t r = write ( fd_ , p , n ) ;
if ( r < 0 ) {
Status WriteUnbuffered ( const char * data , size_t size ) {
while ( size > 0 ) {
ssize_t w rite_result = : : write ( fd_ , data , size ) ;
if ( w rite_result < 0 ) {
if ( errno = = EINTR ) {
if ( errno = = EINTR ) {
continue ; // Retry
continue ; // Retry
}
}
return PosixError ( filename_ , errno ) ;
return PosixError ( filename_ , errno ) ;
}
}
p + = r ;
n - = r ;
data + = w rite_result ;
size - = w rite_result ;
}
}
return Status : : OK ( ) ;
return Status : : OK ( ) ;
}
}
Status SyncDirIfManifest ( ) {
Status status ;
if ( ! is_manifest_ ) {
return status ;
}
int fd = : : open ( dirname_ . c_str ( ) , O_RDONLY ) ;
if ( fd < 0 ) {
status = PosixError ( dirname_ , errno ) ;
} else {
if ( : : fsync ( fd ) < 0 ) {
status = PosixError ( dirname_ , errno ) ;
}
: : close ( fd ) ;
}
return status ;
}
// Returns the directory name in a path pointing to a file.
//
// Returns "." if the path does not contain any directory separator.
static std : : string Dirname ( const std : : string & filename ) {
std : : string : : size_type separator_pos = filename . rfind ( ' / ' ) ;
if ( separator_pos = = std : : string : : npos ) {
return std : : string ( " . " ) ;
}
// The filename component should not contain a path separator. If it does,
// the splitting was done incorrectly.
assert ( filename . find ( ' / ' , separator_pos + 1 ) = = std : : string : : npos ) ;
return filename . substr ( 0 , separator_pos ) ;
}
// Extracts the file name from a path pointing to a file.
//
// The returned Slice points to |filename|'s data buffer, so it is only valid
// while |filename| is alive and unchanged.
static Slice Basename ( const std : : string & filename ) {
std : : string : : size_type separator_pos = filename . rfind ( ' / ' ) ;
if ( separator_pos = = std : : string : : npos ) {
return Slice ( filename ) ;
}
// The filename component should not contain a path separator. If it does,
// the splitting was done incorrectly.
assert ( filename . find ( ' / ' , separator_pos + 1 ) = = std : : string : : npos ) ;
return Slice ( filename . data ( ) + separator_pos + 1 ,
filename . length ( ) - separator_pos - 1 ) ;
}
// True if the given file is a manifest file.
static bool IsManifest ( const std : : string & filename ) {
return Basename ( filename ) . starts_with ( " MANIFEST " ) ;
}
// buf_[0, pos_ - 1] contains data to be written to fd_.
char buf_ [ kWritableFileBufferSize ] ;
size_t pos_ ;
int fd_ ;
const bool is_manifest_ ; // True if the file's name starts with MANIFEST.
const std : : string filename_ ;
const std : : string dirname_ ; // The directory of filename_.
} ;
} ;
static int LockOrUnlock ( int fd , bool lock ) {
static int LockOrUnlock ( int fd , bool lock ) {