@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_ ( NULL ) ,
db_lock_ ( NULL ) ,
shutting_down_ ( NULL ) ,
shutting_down_ ( NULL ) ,
bg_cv_ ( & mutex_ ) ,
bg_cv_ ( & mutex_ ) ,
mem_ ( new MemTable ( internal_comparator_ ) ) ,
mem_ ( NULL ) ,
imm_ ( NULL ) ,
imm_ ( NULL ) ,
logfile_ ( NULL ) ,
logfile_ ( NULL ) ,
logfile_number_ ( 0 ) ,
logfile_number_ ( 0 ) ,
@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
tmp_batch_ ( new WriteBatch ) ,
tmp_batch_ ( new WriteBatch ) ,
bg_compaction_scheduled_ ( false ) ,
bg_compaction_scheduled_ ( false ) ,
manual_compaction_ ( NULL ) {
manual_compaction_ ( NULL ) {
mem_ - > Ref ( ) ;
has_imm_ . Release_Store ( NULL ) ;
has_imm_ . Release_Store ( NULL ) ;
// Reserve ten files or so for other uses and give the rest to TableCache.
// Reserve ten files or so for other uses and give the rest to TableCache.
@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
}
}
}
}
Status DBImpl : : Recover ( VersionEdit * edit ) {
Status DBImpl : : Recover ( VersionEdit * edit , bool * save_manifest ) {
mutex_ . AssertHeld ( ) ;
mutex_ . AssertHeld ( ) ;
// Ignore error from CreateDir since the creation of the DB is
// Ignore error from CreateDir since the creation of the DB is
@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
}
}
}
}
s = versions_ - > Recover ( ) ;
if ( s . ok ( ) ) {
SequenceNumber max_sequence ( 0 ) ;
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_ - > LogNumber ( ) ;
const uint64_t prev_log = versions_ - > PrevLogNumber ( ) ;
std : : vector < std : : string > filenames ;
s = env_ - > GetChildren ( dbname_ , & filenames ) ;
s = versions_ - > Recover ( save_manifest ) ;
if ( ! s . ok ( ) ) {
return s ;
}
SequenceNumber max_sequence ( 0 ) ;
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor).
//
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of leveldb.
const uint64_t min_log = versions_ - > LogNumber ( ) ;
const uint64_t prev_log = versions_ - > PrevLogNumber ( ) ;
std : : vector < std : : string > filenames ;
s = env_ - > GetChildren ( dbname_ , & filenames ) ;
if ( ! s . ok ( ) ) {
return s ;
}
std : : set < uint64_t > expected ;
versions_ - > AddLiveFiles ( & expected ) ;
uint64_t number ;
FileType type ;
std : : vector < uint64_t > logs ;
for ( size_t i = 0 ; i < filenames . size ( ) ; i + + ) {
if ( ParseFileName ( filenames [ i ] , & number , & type ) ) {
expected . erase ( number ) ;
if ( type = = kLogFile & & ( ( number > = min_log ) | | ( number = = prev_log ) ) )
logs . push_back ( number ) ;
}
}
if ( ! expected . empty ( ) ) {
char buf [ 50 ] ;
snprintf ( buf , sizeof ( buf ) , " %d missing files; e.g. " ,
static_cast < int > ( expected . size ( ) ) ) ;
return Status : : Corruption ( buf , TableFileName ( dbname_ , * ( expected . begin ( ) ) ) ) ;
}
// Recover in the order in which the logs were generated
std : : sort ( logs . begin ( ) , logs . end ( ) ) ;
for ( size_t i = 0 ; i < logs . size ( ) ; i + + ) {
s = RecoverLogFile ( logs [ i ] , ( i = = logs . size ( ) - 1 ) , save_manifest , edit ,
& max_sequence ) ;
if ( ! s . ok ( ) ) {
if ( ! s . ok ( ) ) {
return s ;
return s ;
}
}
std : : set < uint64_t > expected ;
versions_ - > AddLiveFiles ( & expected ) ;
uint64_t number ;
FileType type ;
std : : vector < uint64_t > logs ;
for ( size_t i = 0 ; i < filenames . size ( ) ; i + + ) {
if ( ParseFileName ( filenames [ i ] , & number , & type ) ) {
expected . erase ( number ) ;
if ( type = = kLogFile & & ( ( number > = min_log ) | | ( number = = prev_log ) ) )
logs . push_back ( number ) ;
}
}
if ( ! expected . empty ( ) ) {
char buf [ 50 ] ;
snprintf ( buf , sizeof ( buf ) , " %d missing files; e.g. " ,
static_cast < int > ( expected . size ( ) ) ) ;
return Status : : Corruption ( buf , TableFileName ( dbname_ , * ( expected . begin ( ) ) ) ) ;
}
// Recover in the order in which the logs were generated
std : : sort ( logs . begin ( ) , logs . end ( ) ) ;
for ( size_t i = 0 ; i < logs . size ( ) ; i + + ) {
s = RecoverLogFile ( logs [ i ] , edit , & max_sequence ) ;
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_ - > MarkFileNumberUsed ( logs [ i ] ) ;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_ - > MarkFileNumberUsed ( logs [ i ] ) ;
}
if ( s . ok ( ) ) {
if ( versions_ - > LastSequence ( ) < max_sequence ) {
versions_ - > SetLastSequence ( max_sequence ) ;
}
}
if ( versions_ - > LastSequence ( ) < max_sequence ) {
versions_ - > SetLastSequence ( max_sequence ) ;
}
}
return s ;
return Status : : OK ( ) ;
}
}
Status DBImpl : : RecoverLogFile ( uint64_t log_number ,
VersionEdit * edit ,
Status DBImpl : : RecoverLogFile ( uint64_t log_number , bool last_log ,
bool * save_manifest , VersionEdit * edit ,
SequenceNumber * max_sequence ) {
SequenceNumber * max_sequence ) {
struct LogReporter : public log : : Reader : : Reporter {
struct LogReporter : public log : : Reader : : Reporter {
Env * env ;
Env * env ;
@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std : : string scratch ;
std : : string scratch ;
Slice record ;
Slice record ;
WriteBatch batch ;
WriteBatch batch ;
int compactions = 0 ;
MemTable * mem = NULL ;
MemTable * mem = NULL ;
while ( reader . ReadRecord ( & record , & scratch ) & &
while ( reader . ReadRecord ( & record , & scratch ) & &
status . ok ( ) ) {
status . ok ( ) ) {
@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
}
if ( mem - > ApproximateMemoryUsage ( ) > options_ . write_buffer_size ) {
if ( mem - > ApproximateMemoryUsage ( ) > options_ . write_buffer_size ) {
compactions + + ;
* save_manifest = true ;
status = WriteLevel0Table ( mem , edit , NULL ) ;
status = WriteLevel0Table ( mem , edit , NULL ) ;
mem - > Unref ( ) ;
mem = NULL ;
if ( ! status . ok ( ) ) {
if ( ! status . ok ( ) ) {
// Reflect errors immediately so that conditions like full
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
// file-systems cause the DB::Open() to fail.
break ;
break ;
}
}
mem - > Unref ( ) ;
mem = NULL ;
}
}
}
}
if ( status . ok ( ) & & mem ! = NULL ) {
status = WriteLevel0Table ( mem , edit , NULL ) ;
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
delete file ;
// See if we should keep reusing the last log file.
if ( status . ok ( ) & & options_ . reuse_logs & & last_log & & compactions = = 0 ) {
assert ( logfile_ = = NULL ) ;
assert ( log_ = = NULL ) ;
assert ( mem_ = = NULL ) ;
uint64_t lfile_size ;
if ( env_ - > GetFileSize ( fname , & lfile_size ) . ok ( ) & &
env_ - > NewAppendableFile ( fname , & logfile_ ) . ok ( ) ) {
Log ( options_ . info_log , " Reusing old log %s \n " , fname . c_str ( ) ) ;
log_ = new log : : Writer ( logfile_ , lfile_size ) ;
logfile_number_ = log_number ;
if ( mem ! = NULL ) {
mem_ = mem ;
mem = NULL ;
} else {
// mem can be NULL if lognum exists but was empty.
mem_ = new MemTable ( internal_comparator_ ) ;
mem_ - > Ref ( ) ;
}
}
}
if ( mem ! = NULL ) {
// mem did not get reused; compact it.
if ( status . ok ( ) ) {
* save_manifest = true ;
status = WriteLevel0Table ( mem , edit , NULL ) ;
}
mem - > Unref ( ) ;
}
}
if ( mem ! = NULL ) mem - > Unref ( ) ;
delete file ;
return status ;
return status ;
}
}
@ -1449,8 +1479,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
DBImpl * impl = new DBImpl ( options , dbname ) ;
DBImpl * impl = new DBImpl ( options , dbname ) ;
impl - > mutex_ . Lock ( ) ;
impl - > mutex_ . Lock ( ) ;
VersionEdit edit ;
VersionEdit edit ;
Status s = impl - > Recover ( & edit ) ; // Handles create_if_missing, error_if_exists
if ( s . ok ( ) ) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false ;
Status s = impl - > Recover ( & edit , & save_manifest ) ;
if ( s . ok ( ) & & impl - > mem_ = = NULL ) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl - > versions_ - > NewFileNumber ( ) ;
uint64_t new_log_number = impl - > versions_ - > NewFileNumber ( ) ;
WritableFile * lfile ;
WritableFile * lfile ;
s = options . env - > NewWritableFile ( LogFileName ( dbname , new_log_number ) ,
s = options . env - > NewWritableFile ( LogFileName ( dbname , new_log_number ) ,
@ -1460,15 +1493,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl - > logfile_ = lfile ;
impl - > logfile_ = lfile ;
impl - > logfile_number_ = new_log_number ;
impl - > logfile_number_ = new_log_number ;
impl - > log_ = new log : : Writer ( lfile ) ;
impl - > log_ = new log : : Writer ( lfile ) ;
s = impl - > versions_ - > LogAndApply ( & edit , & impl - > mutex_ ) ;
}
if ( s . ok ( ) ) {
impl - > DeleteObsoleteFiles ( ) ;
impl - > MaybeScheduleCompaction ( ) ;
impl - > mem_ = new MemTable ( impl - > internal_comparator_ ) ;
impl - > mem_ - > Ref ( ) ;
}
}
}
}
if ( s . ok ( ) & & save_manifest ) {
edit . SetPrevLogNumber ( 0 ) ; // No older logs needed after recovery.
edit . SetLogNumber ( impl - > logfile_number_ ) ;
s = impl - > versions_ - > LogAndApply ( & edit , & impl - > mutex_ ) ;
}
if ( s . ok ( ) ) {
impl - > DeleteObsoleteFiles ( ) ;
impl - > MaybeScheduleCompaction ( ) ;
}
impl - > mutex_ . Unlock ( ) ;
impl - > mutex_ . Unlock ( ) ;
if ( s . ok ( ) ) {
if ( s . ok ( ) ) {
assert ( impl - > mem_ ! = NULL ) ;
* dbptr = impl ;
* dbptr = impl ;
} else {
} else {
delete impl ;
delete impl ;