Skip to content

Commit

Permalink
Merge pull request #67 from hendrik-cliqz/register_value
Browse files Browse the repository at this point in the history
refactoring: do value packing before sorting
  • Loading branch information
hendrikmuhs committed Jan 21, 2016
2 parents dcaabc4 + 004ef8d commit f5d2b0a
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 230 deletions.
146 changes: 37 additions & 109 deletions keyvi/src/cpp/dictionary/dictionary_compiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,32 +45,31 @@ namespace dictionary {
* structure for internal processing
* Note: Not using std::pair because it did not compile with Tpie
*/
template<typename value_t>
struct key_value_pair {
key_value_pair() : key(), value() {
}

key_value_pair(const std::string& k, const value_t v): key(k), value(v) {}
key_value_pair(const std::string& k, const fsa::ValueHandle& v): key(k), value(v) {}

bool operator<(const key_value_pair kv) const {
return key < kv.key;
}

std::string key;
value_t value;
fsa::ValueHandle value;
};

/**
* Tpie serialization and deserialization for sorting.
*/
template<typename Dst, typename v>
void serialize(Dst & d, const keyvi::dictionary::key_value_pair<v> & pt) {
template<typename Dst>
void serialize(Dst & d, const keyvi::dictionary::key_value_pair & pt) {
using tpie::serialize;
serialize(d, pt.key);
serialize(d, pt.value);
}
template<typename Src, typename v>
void unserialize(Src & s, keyvi::dictionary::key_value_pair<v> & pt) {
template<typename Src>
void unserialize(Src & s, keyvi::dictionary::key_value_pair & pt) {
using tpie::unserialize;
unserialize(s, pt.key);
unserialize(s, pt.value);
Expand All @@ -83,7 +82,7 @@ template<class PersistenceT, class ValueStoreT = fsa::internal::NullValueStore>
class DictionaryCompiler
final {
typedef const fsa::internal::IValueStoreWriter::vs_param_t compiler_param_t;
typedef key_value_pair<typename ValueStoreT::value_t> key_value_t;
typedef key_value_pair key_value_t;
typedef std::function<void (size_t , size_t, void*)> callback_t;

public:
Expand All @@ -104,12 +103,15 @@ class DictionaryCompiler
sorter_.set_available_memory(memory_limit);
sorter_.begin();

generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint32_t, int32_t>(memory_limit, params);
value_store_= new ValueStoreT(params);
}

~DictionaryCompiler(){
if (generator_) {
delete generator_;
} else {
// if generator was not created we have to delete the value store ourselves
delete value_store_;
}
}

Expand All @@ -118,7 +120,8 @@ class DictionaryCompiler

void Add(const std::string& input_key, typename ValueStoreT::value_t value =
ValueStoreT::no_value) {
sorter_.push(key_value_t(input_key, value));

sorter_.push(key_value_t(input_key, RegisterValue(value)));

size_of_keys_ += input_key.size();
}
Expand All @@ -133,9 +136,9 @@ class DictionaryCompiler

/**
* Do the final compilation
* todo: implement some sort of progress callback
*/
void Compile(callback_t progress_callback = nullptr, void* user_data = nullptr) {
value_store_->CloseFeeding();
sorter_.end();
sorter_.merge_runs();
CreateGenerator();
Expand Down Expand Up @@ -165,99 +168,6 @@ class DictionaryCompiler
generator_->CloseFeeding();
}

/**
* Compile a partition
*
* @param maximum_partition_size
* @param stream
* @param maximum_partition_depth
* @return
*/
bool CompileNext(size_t maximum_partition_size, std::ostream& stream, int maximum_partition_depth = 2,
callback_t progress_callback = nullptr, void* user_data = nullptr)
{
if (!sort_finalized_){
CreateGenerator();

sorter_.end();
sorter_.merge_runs();
sort_finalized_ = true;

number_of_items_ = sorter_.item_count();
callback_trigger_ = 1+(number_of_items_-1)/100;
if (callback_trigger_ > 100000) {
callback_trigger_ = 100000;
}
}

size_t partition_conservative_size = (maximum_partition_size / 10) * 9;

TRACE("Compile next partition");
{
int i = 0;

//size_t number_of_items = sorter_.item_count();

while (sorter_.can_pull()) {
key_value_t key_value = sorter_.pull();

generator_->Add(key_value.key, key_value.value);
++i;
++added_key_values_;

if (progress_callback && (added_key_values_ % callback_trigger_ == 0)){
progress_callback(added_key_values_, number_of_items_, user_data);
}

if (i % 1000 == 0 && generator_->GetFsaSize() > partition_conservative_size){
// todo: continue feeding until we found a good point to persist the partition

TRACE("finish partition: find good partition end");
std::string last_key (key_value.key);
bool make_new_partition = false;

while (sorter_.can_pull()) {
key_value = sorter_.pull();

if (fsa::get_common_prefix_length(last_key.c_str(), key_value.key.c_str()) < maximum_partition_depth) {
make_new_partition = true;
break;
}

generator_->Add(key_value.key, key_value.value);
++added_key_values_;

if (progress_callback && (added_key_values_ % callback_trigger_ == 0)){
progress_callback(added_key_values_, number_of_items_, user_data);
}

last_key = key_value.key;
}
TRACE("finish partition: finalize partition");

generator_->CloseFeeding();
generator_->Write(stream);

// handle case where we do not have to start a new partition
if (!make_new_partition){
return false;
}

generator_->Reset();
generator_->Add(key_value.key, key_value.value);

return true;
}
}
}

// finalize the last partition
generator_->CloseFeeding();
generator_->Write(stream);

return false;
}

/**
* Set a custom manifest to be embedded into the index file.
*
Expand Down Expand Up @@ -297,7 +207,7 @@ class DictionaryCompiler
tpie::serialization_sorter<key_value_t> sorter_;
size_t memory_limit_;
compiler_param_t params_;

ValueStoreT* value_store_;
fsa::GeneratorAdapterInterface<PersistenceT, ValueStoreT>* generator_ = nullptr;
bool sort_finalized_ = false;
size_t added_key_values_ = 0;
Expand All @@ -309,7 +219,25 @@ class DictionaryCompiler

void CreateGenerator();

/**
* Register a value before inserting the key(for optimization purposes).
*
* @param value The Value
* @return a handle that later needs to be passed to Add()
*/
fsa::ValueHandle RegisterValue(typename ValueStoreT::value_t value =
ValueStoreT::no_value){

fsa::ValueHandle handle;
handle.no_minimization = false;

handle.value_idx = value_store_->GetValue(value, handle.no_minimization);

// if inner weights are used update them
handle.weight = value_store_->GetWeightValue(value);

return handle;
}
};

/**
Expand All @@ -323,15 +251,15 @@ inline void DictionaryCompiler<PersistenceT, ValueStoreT>::CreateGenerator()
// todo: find good parameters for auto-guessing this
if (size_of_keys_ > UINT32_MAX){
if (memory_limit_ > 0x280000000UL /* 10 GB */) {
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint64_t, int64_t>(memory_limit_, params_);
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint64_t, int64_t>(memory_limit_, params_, value_store_);
} else {
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint64_t, int32_t>(memory_limit_, params_);
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint64_t, int32_t>(memory_limit_, params_, value_store_);
}
} else {
if (memory_limit_ > 0x140000000UL) /* 5GB */ {
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint32_t, int64_t>(memory_limit_, params_);
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint32_t, int64_t>(memory_limit_, params_, value_store_);
} else {
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint32_t, int32_t>(memory_limit_, params_);
generator_ = new fsa::GeneratorAdapter<PersistenceT, ValueStoreT, uint32_t, int32_t>(memory_limit_, params_, value_store_);
}
}

Expand Down
Loading

0 comments on commit f5d2b0a

Please sign in to comment.