Skip to content

Commit

Permalink
Merge pull request #75 from hendrik-cliqz/fix-spark-issues
Browse files Browse the repository at this point in the history
Fix spark issues and some code cleanup
  • Loading branch information
hendrikmuhs committed Mar 8, 2016
2 parents b33f77a + 0dfdc54 commit ff09545
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 401 deletions.
20 changes: 14 additions & 6 deletions keyvi/src/cpp/dictionary/fsa/automata.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#define AUTOMATA_H_

#include <sys/mman.h>
#include <boost/lexical_cast.hpp>
#include <boost/filesystem.hpp>
#include <boost/interprocess/file_mapping.hpp>
#include <boost/interprocess/mapped_region.hpp>
Expand Down Expand Up @@ -70,14 +71,18 @@ final {
sparse_array_properties_ = internal::SerializationUtils::ReadJsonRecord(
in_stream);

compact_size_ = sparse_array_properties_.get<uint32_t>("version", 1) == 2;
compact_size_ = boost::lexical_cast<uint32_t> (sparse_array_properties_.get<std::string>("version")) == 2;
size_t bucket_size = compact_size_ ? sizeof(uint16_t) : sizeof(uint32_t);

// get start state and number of keys
start_state_ = boost::lexical_cast<uint64_t> (automata_properties_.get<std::string>("start_state"));
number_of_keys_ = boost::lexical_cast<uint64_t> (automata_properties_.get<std::string>("number_of_keys"));

size_t offset = in_stream.tellg();

file_mapping_ = new boost::interprocess::file_mapping(
filename, boost::interprocess::read_only);
size_t array_size = sparse_array_properties_.get<size_t>("size");
size_t array_size = boost::lexical_cast<size_t>(sparse_array_properties_.get<std::string>("size"));

in_stream.seekg(offset + array_size + bucket_size * array_size - 1);

Expand Down Expand Up @@ -119,8 +124,9 @@ final {

// initialize value store
internal::value_store_t value_store_type =
static_cast<internal::value_store_t>(automata_properties_.get<int>(
"value_store_type"));
static_cast<internal::value_store_t>(
boost::lexical_cast<int> (automata_properties_.get<std::string>(
"value_store_type")));
value_store_reader_ = internal::ValueStoreFactory::MakeReader(value_store_type, in_stream, file_mapping_);

in_stream.close();
Expand All @@ -143,11 +149,11 @@ final {
* @return index of root state.
*/
// Accessor for the index of the automaton's root (start) state.
// NOTE(review): two return statements appear back to back below. This looks
// like the removed and the added line of a diff rendered together; as written
// only the first return is reachable. Confirm which one the real file keeps.
uint64_t GetStartState() const {
return automata_properties_.get<uint64_t>("start_state");
return start_state_;
}

// Accessor for the "number_of_keys" value of the automaton.
// NOTE(review): two return statements appear back to back below. This looks
// like the removed and the added line of a diff rendered together; as written
// only the first return is reachable. Confirm which one the real file keeps.
uint64_t GetNumberOfKeys() const {
return automata_properties_.get<uint64_t>("number_of_keys");
return number_of_keys_;
}

uint64_t TryWalkTransition(uint64_t starting_state, unsigned char c) const {
Expand Down Expand Up @@ -469,6 +475,8 @@ final {
uint32_t* transitions_;
uint16_t* transitions_compact_;
bool compact_size_;
uint64_t start_state_;
uint64_t number_of_keys_;

inline uint64_t ResolvePointer(uint64_t starting_state, unsigned char c) const {
if (!compact_size_) {
Expand Down
2 changes: 1 addition & 1 deletion keyvi/src/cpp/dictionary/fsa/internal/json_value_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class JsonValueStoreReader final: public IValueStoreReader {
internal::SerializationUtils::ReadJsonRecord(stream);

size_t offset = stream.tellg();
size_t strings_size = properties_.get<size_t>("size");
size_t strings_size = boost::lexical_cast<size_t> (properties_.get<std::string>("size"));

// check for file truncation
if (strings_size > 0) {
Expand Down
273 changes: 3 additions & 270 deletions keyvi/src/cpp/dictionary/fsa/internal/json_value_store_deprecated.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#define RAPIDJSON_SSE42
#endif

#include <boost/lexical_cast.hpp>

#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
Expand All @@ -59,275 +61,6 @@ namespace dictionary {
namespace fsa {
namespace internal {

/**
* Value store where the value consists of a single integer.
*/
class JsonValueStoreDeprecated final : public IValueStoreWriter {
public:

struct RawPointer
final {
public:
RawPointer()
: RawPointer(0, 0, 0) {
}

RawPointer(uint64_t offset, int hashcode, size_t length)
: offset_(offset),
hashcode_(hashcode),
length_(length) {

if (length > USHRT_MAX) {
length_ = USHRT_MAX;
}

}

int GetHashcode() const {
return hashcode_;
}

uint64_t GetOffset() const {
return offset_;
}

ushort GetLength() const {
return length_;
}

int GetCookie() const {
return cookie_;
}

void SetCookie(int value) {
cookie_ = static_cast<ushort>(value);
}

bool IsEmpty() const {
return offset_ == 0 && hashcode_ == 0 && length_ == 0;
}

bool operator==(const RawPointer& l) {
return offset_ == l.offset_;
}

static size_t GetMaxCookieSize() {
return MaxCookieSize;
}

private:
static const size_t MaxCookieSize = 0xFFFF;

uint64_t offset_;
int32_t hashcode_;
ushort length_;
ushort cookie_ = 0;

};

template<class PersistenceT>
struct RawPointerForCompare
final
{
public:
RawPointerForCompare(const std::string& value,
PersistenceT* persistence)
: value_(value),
persistence_(persistence) {
hashcode_ = std::hash<value_t>()(value);
length_ = value.size();
}

int GetHashcode() const {
return hashcode_;
}

bool operator==(const RawPointer& l) const {
// First filter - check if hash code is the same
if (l.GetHashcode() != hashcode_) {
return false;
}
TRACE("check");
size_t length_l = l.GetLength();

if (length_l < USHRT_MAX && length_l != length_) {
return false;
}

TRACE("Compare values at data level length: %d %d", l.GetOffset(), value_.size());

return persistence_->Compare(l.GetOffset(), (void*) value_.data(), value_.size());
}

private:
std::string value_;
PersistenceT* persistence_;
int32_t hashcode_;
size_t length_;
};

typedef std::string value_t;
static const uint64_t no_value = 0;
static const bool inner_weight = false;

JsonValueStoreDeprecated(const vs_param_t& parameters,
size_t memory_limit = 104857600)
: IValueStoreWriter(parameters), hash_(memory_limit) {
temporary_directory_ = parameters_[TEMPORARY_PATH_KEY];
temporary_directory_ /= boost::filesystem::unique_path(
"dictionary-fsa-json_value_store-%%%%-%%%%-%%%%-%%%%");
boost::filesystem::create_directory(temporary_directory_);

size_t external_memory_chunk_size = 1073741824;

values_extern_ = new MemoryMapManager(external_memory_chunk_size,
temporary_directory_,
"json_values_filebuffer");
}

~JsonValueStoreDeprecated() {
delete values_extern_;
boost::filesystem::remove_all(temporary_directory_);
}

JsonValueStoreDeprecated() = delete;
JsonValueStoreDeprecated& operator=(JsonValueStoreDeprecated const&) = delete;
JsonValueStoreDeprecated(const JsonValueStoreDeprecated& that) = delete;

/**
* Simple implementation of a value store for json values:
* todo: performance improvements?
*/
uint64_t GetValue(const value_t& value, bool& no_minimization) {
std::string packed_value;
msgpack_buffer_.clear();

++number_of_values_;

rapidjson::Document json_document;
json_document.Parse(value.c_str());

if (!json_document.HasParseError()) {
TRACE("Got json");
msgpack::pack(&msgpack_buffer_, json_document);
} else {
TRACE("Got a normal string");
msgpack::pack(&msgpack_buffer_, value);
}

// zlib compression
if (msgpack_buffer_.size() > 32) {
packed_value = compress_string(msgpack_buffer_.data(), msgpack_buffer_.size());
} else {
util::encodeVarint(msgpack_buffer_.size(), packed_value);
packed_value.append(msgpack_buffer_.data(), msgpack_buffer_.size());
}

TRACE("Packed value: %s", packed_value.c_str());

const RawPointerForCompare<MemoryMapManager> stp(packed_value,
values_extern_);
const RawPointer p = hash_.Get(stp);

if (!p.IsEmpty()) {
// found the same value again, minimize
TRACE("Minimized value");
return p.GetOffset();
} // else persist string value

no_minimization = true;
TRACE("New unique value");
++number_of_unique_values_;

uint64_t pt = static_cast<uint64_t>(values_buffer_size_);

values_extern_->Append(values_buffer_size_, (void*)packed_value.data(), packed_value.size());
values_buffer_size_ += packed_value.size();

TRACE("add value to hash at %d, length %d", pt, packed_value.size());
hash_.Add(RawPointer(pt, stp.GetHashcode(), packed_value.size()));

return pt;
}

uint32_t GetWeightValue(value_t value) const {
return 0;
}

value_store_t GetValueStoreType() const {
return JSON_VALUE_STORE_DEPRECATED;
}

void Write(std::ostream& stream) {

boost::property_tree::ptree pt;
pt.put("size", std::to_string(values_buffer_size_));
pt.put("values", std::to_string(number_of_values_));
pt.put("unique_values", std::to_string(number_of_unique_values_));

internal::SerializationUtils::WriteJsonRecord(stream, pt);
TRACE("Wrote JSON header, stream at %d", stream.tellp());

values_extern_->Write(stream, values_buffer_size_);
}

private:
MemoryMapManager* values_extern_;

LeastRecentlyUsedGenerationsCache<RawPointer> hash_;
msgpack::sbuffer msgpack_buffer_;
char zlib_buffer_[32768];
size_t number_of_values_ = 0;
size_t number_of_unique_values_ = 0;
size_t values_buffer_size_ = 0;
boost::filesystem::path temporary_directory_;

/** Compress a STL string using zlib with given compression level and return
* the binary data. */
std::string compress_string(const char* data, size_t data_size,
int compressionlevel = Z_BEST_COMPRESSION)
{
z_stream zs; // z_stream is zlib's control structure
memset(&zs, 0, sizeof(zs));

if (deflateInit(&zs, compressionlevel) != Z_OK)
throw(std::runtime_error("deflateInit failed while compressing."));

zs.next_in = (Bytef*)data;
zs.avail_in = data_size; // set the z_stream's input

int ret;
std::string outstring = " ";

// retrieve the compressed bytes blockwise
do {
zs.next_out = reinterpret_cast<Bytef*>(zlib_buffer_);
zs.avail_out = sizeof(zlib_buffer_);

ret = deflate(&zs, Z_FINISH);

if (outstring.size() - 1 < zs.total_out) {
// append the block to the output string
outstring.append(zlib_buffer_,
zs.total_out - outstring.size() + 1);
}
} while (ret == Z_OK);

deflateEnd(&zs);

if (ret != Z_STREAM_END) { // an error occurred that was not EOF
std::ostringstream oss;
oss << "Exception during zlib compression: (" << ret << ") " << zs.msg;
throw(std::runtime_error(oss.str()));
}

std::string size_prefix;
util::encodeVarint(outstring.size(), size_prefix);

return size_prefix + outstring;
}
};

class JsonValueStoreDeprecatedReader final: public IValueStoreReader {
public:
using IValueStoreReader::IValueStoreReader;
Expand All @@ -343,7 +76,7 @@ class JsonValueStoreDeprecated final : public IValueStoreWriter {
internal::SerializationUtils::ReadJsonRecord(stream);

size_t offset = stream.tellg();
size_t strings_size = properties_.get<size_t>("size");
size_t strings_size = boost::lexical_cast<size_t>(properties_.get<std::string>("size"));

// check for file truncation
if (strings_size > 0) {
Expand Down
Loading

0 comments on commit ff09545

Please sign in to comment.