From bf68c0d2cf83bdc28041608b98e576b69fa7e8d1 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Wed, 28 Jan 2026 17:17:31 +0100 Subject: [PATCH 1/3] MINIFICPP-2849 Implement LMDB based content repository --- .github/workflows/ci.yml | 7 +- CMakeLists.txt | 5 +- CONFIGURE.md | 14 +- cmake/LMDB.cmake | 44 +++ cmake/MiNiFiOptions.cmake | 1 + conf/minifi.properties.in | 3 + extensions/lmdb/CMakeLists.txt | 37 +++ extensions/lmdb/LmdbContentRepository.cpp | 273 ++++++++++++++++ extensions/lmdb/LmdbContentRepository.h | 85 +++++ extensions/lmdb/LmdbStream.cpp | 116 +++++++ extensions/lmdb/LmdbStream.h | 66 ++++ extensions/lmdb/tests/CMakeLists.txt | 34 ++ .../lmdb/tests/LmdbContentRepositoryTests.cpp | 241 ++++++++++++++ .../lmdb/tests/LmdbContentSessionTests.cpp | 301 ++++++++++++++++++ extensions/lmdb/tests/LmdbStreamTests.cpp | 271 ++++++++++++++++ .../tests/ContentSessionTests.cpp | 10 +- libminifi/src/Configuration.cpp | 1 + libminifi/src/core/RepositoryFactory.cpp | 2 + libminifi/test/libtest/unit/TestBase.h | 1 + .../minifi-cpp/properties/Configuration.h | 2 + thirdparty/lmdb/add-cmake-file.patch | 29 ++ 21 files changed, 1533 insertions(+), 10 deletions(-) create mode 100644 cmake/LMDB.cmake create mode 100644 extensions/lmdb/CMakeLists.txt create mode 100644 extensions/lmdb/LmdbContentRepository.cpp create mode 100644 extensions/lmdb/LmdbContentRepository.h create mode 100644 extensions/lmdb/LmdbStream.cpp create mode 100644 extensions/lmdb/LmdbStream.h create mode 100644 extensions/lmdb/tests/CMakeLists.txt create mode 100644 extensions/lmdb/tests/LmdbContentRepositoryTests.cpp create mode 100644 extensions/lmdb/tests/LmdbContentSessionTests.cpp create mode 100644 extensions/lmdb/tests/LmdbStreamTests.cpp create mode 100644 thirdparty/lmdb/add-cmake-file.patch diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8fa667fa87..ed050afe03 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ concurrency: env: DOCKER_CMAKE_FLAGS: -DDOCKER_VERIFY_THREAD=3 -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DCI_BUILD=ON -DENABLE_AWS=ON -DENABLE_KAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON \ -DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_PYTHON_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DENABLE_PROMETHEUS=ON \ - -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DENABLE_LLAMACPP=ON -DDOCKER_BUILD_ONLY=ON -DMINIFI_PERFORMANCE_TESTS=ON + -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DENABLE_LLAMACPP=ON -DENABLE_LMDB=ON -DDOCKER_BUILD_ONLY=ON -DMINIFI_PERFORMANCE_TESTS=ON CCACHE_DIR: ${{ GITHUB.WORKSPACE }}/.ccache jobs: macos_xcode: @@ -41,6 +41,7 @@ jobs: -DENABLE_LIBARCHIVE=ON -DENABLE_LLAMACPP=ON -DENABLE_KAFKA=ON + -DENABLE_LMDB=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_LZMA=ON -DENABLE_MQTT=ON @@ -146,6 +147,7 @@ jobs: -DENABLE_LIBARCHIVE=ON -DENABLE_KAFKA=ON -DENABLE_LLAMACPP=ON + -DENABLE_LMDB=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_LZMA=ON -DENABLE_MQTT=ON @@ -244,10 +246,11 @@ jobs: -DENABLE_EXECUTE_PROCESS=ON -DENABLE_GCP=ON -DENABLE_GRAFANA_LOKI=ON + -DENABLE_KAFKA=ON -DENABLE_KUBERNETES=ON -DENABLE_LIBARCHIVE=ON -DENABLE_LLAMACPP=ON - -DENABLE_KAFKA=ON + -DENABLE_LMDB=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_LZMA=ON -DENABLE_MQTT=ON diff --git a/CMakeLists.txt b/CMakeLists.txt index 5af9c071c7..0b7cf8069d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -742,15 +742,16 @@ endif() cpack_add_component_group(extensions DISPLAY_NAME "Extensions" EXPANDED) set(EXTENSIONS_ENABLED_BY_DEFAULT ( + minifi-archive-extensions minifi-aws minifi-azure minifi-civet-extensions minifi-elasticsearch minifi-gcp minifi-grafana-loki - minifi-archive-extensions - minifi-mqtt-extensions minifi-kafka + minifi-lmdb + minifi-mqtt-extensions minifi-pdh minifi-prometheus minifi-rocksdb-repos diff --git a/CONFIGURE.md b/CONFIGURE.md index 40cb44b7b4..b08b4cdbea 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -625,7 +625,7 @@ The Flow File Repository can be configured with the `nifi.flowfile.repository.cl # in minifi.properties nifi.flowfile.repository.class.name=NoOpRepository # VolatileFlowFileRepository can also be used which is an alias for NoOpRepository -The Content Repository can be configured with the `nifi.content.repository.class.name` property. If not specified, it uses the `DatabaseContentRepository` class by default, which persists the content in a RocksDB database. `DatabaseContentRepository` is also the default value specified in the minifi.properties file. Alternatively it can be configured to use a `VolatileContentRepository` that keeps the state in memory (so the state gets lost upon restart), or the `FileSystemRepository` to keep the state in regular files. +The Content Repository can be configured with the `nifi.content.repository.class.name` property. If not specified, it uses the `DatabaseContentRepository` class by default, which persists the content in a RocksDB database. `DatabaseContentRepository` is also the default value specified in the minifi.properties file. Alternatively it can be configured to use a `VolatileContentRepository` that keeps the state in memory (so the state gets lost upon restart), the `FileSystemRepository` to keep the state in regular files, or `LmdbContentRepository` that uses LMDB database as an alternative to RocksDB. **NOTE:** RocksDB database has a limit of 4GB for the size of a database object. Due to this if you expect to process larger flow files than 4GB you should use the `FileSystemRepository`. The downside of using `FileSystemRepository` is that it does not have the transactional guarantees of the RocksDB repository implementation. @@ -745,6 +745,18 @@ RocksDB options can also be overridden for a specific repository using the `nifi nifi.provenance.repository.rocksdb.options.use_direct_reads=false nifi.state.storage.rocksdb.options.use_direct_io_for_flush_and_compaction=false +### Configuring LMDB content repository + +There is an alternative content repository that can be used: the LMDB database. When `LmdbContentRepository` is set, LMDB is used for storing content, which is a memory-mapped, fast (especially for reads), low-footprint, key-value database. It can be a good alternative on memory-limited edge devices, but each use case should be evaluated separately. The caveats of using this database are the following: + +- Single writer only — concurrent writes are serialized, which can be a bottleneck for write-heavy workloads +- Database file never shrinks automatically — occasional large flow files could keep the database size permanently high on disk and in the reserved virtual address space +- No compression - stores everything uncompressed on disk +- The maximum database size must be set upfront, which is done in the minifi.properties file with the following property, with the current default value set to 10 GB: + + # in minifi.properties + nifi.content.repository.lmdb.max.db.size=10 GB + #### Shared database It is also possible to use a single database to store multiple repositories with the `minifidb://` scheme. diff --git a/cmake/LMDB.cmake b/cmake/LMDB.cmake new file mode 100644 index 0000000000..267350b076 --- /dev/null +++ b/cmake/LMDB.cmake @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +include(FetchContent) + +set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/lmdb/add-cmake-file.patch") +set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")") + +FetchContent_Declare( + lmdb + URL https://github.com/LMDB/lmdb/archive/refs/tags/LMDB_1.0.0-branch.tar.gz + URL_HASH SHA256=8d3e790194e43a72f172f34c442ea4737b2d1433fc0983f2ef70bae999bc2d28 + PATCH_COMMAND "${PC}" + SOURCE_SUBDIR "libraries/liblmdb" + SYSTEM +) + +if (WIN32) + get_directory_property(MINIFI_SAVED_COMPILE_DEFS COMPILE_DEFINITIONS) + remove_definitions(-DWIN32_LEAN_AND_MEAN) +endif() + +FetchContent_MakeAvailable(lmdb) + +if (WIN32) + set_directory_properties(PROPERTIES COMPILE_DEFINITIONS "${MINIFI_SAVED_COMPILE_DEFS}") +endif() + +set(LMDB_INCLUDE_DIR "${lmdb_SOURCE_DIR}/libraries/liblmdb") diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake index 5c9820d419..669aa91702 100644 --- a/cmake/MiNiFiOptions.cmake +++ b/cmake/MiNiFiOptions.cmake @@ -90,6 +90,7 @@ endif() add_minifi_option(ENABLE_ALL "Enables all extensions" OFF) add_minifi_option(ENABLE_CIVET "Enables CivetWeb components." ON) add_minifi_option(ENABLE_ROCKSDB "Enables the RocksDB extension." ON) +add_minifi_option(ENABLE_LMDB "Enables the LMDB extension." ON) add_minifi_option(ENABLE_LIBARCHIVE "Enables the lib archive extensions." ON) add_minifi_option(ENABLE_LZMA "Enables the liblzma build" ON) add_minifi_option(ENABLE_BZIP2 "Enables the bzip2 build" ON) diff --git a/conf/minifi.properties.in b/conf/minifi.properties.in index afcb7f9d9f..722fc1ca71 100644 --- a/conf/minifi.properties.in +++ b/conf/minifi.properties.in @@ -54,6 +54,9 @@ nifi.content.repository.class.name=DatabaseContentRepository # nifi.database.content.repository.rocksdb.compaction.period=2 min # nifi.database.content.repository.optimize.for.small.db.cache.size=8 MB +## Relates to the internal workings of the LMDB backend +# nifi.content.repository.lmdb.max.db.size=10 GB + # setting this value to "0" enables synchronous deletion # nifi.database.content.repository.purge.period = 1 sec diff --git a/extensions/lmdb/CMakeLists.txt b/extensions/lmdb/CMakeLists.txt new file mode 100644 index 0000000000..4281b24da2 --- /dev/null +++ b/extensions/lmdb/CMakeLists.txt @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if (NOT (ENABLE_ALL OR ENABLE_LMDB)) + return() +endif() + +include(LMDB) + +include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) + +file(GLOB SOURCES "*.cpp") + +add_minifi_library(minifi-lmdb SHARED ${SOURCES}) + +target_include_directories(minifi-lmdb PUBLIC ${LMDB_INCLUDE_DIR}) +target_link_libraries(minifi-lmdb PUBLIC lmdb) +target_link_libraries(minifi-lmdb PUBLIC minifi-api minifi-extension-framework) +target_link_libraries(minifi-lmdb PRIVATE $) + +register_extension(minifi-lmdb "LMDB" LMDB "This Enables persistent provenance, flowfile, and content repositories using LMDB" "extensions/lmdb/tests") diff --git a/extensions/lmdb/LmdbContentRepository.cpp b/extensions/lmdb/LmdbContentRepository.cpp new file mode 100644 index 0000000000..a6eeeb45f7 --- /dev/null +++ b/extensions/lmdb/LmdbContentRepository.cpp @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LmdbContentRepository.h" + +#include +#include +#include +#include + +#include "LmdbStream.h" +#include "core/Resource.h" +#include "lmdb.h" +#include "minifi-cpp/Exception.h" +#include "minifi-cpp/utils/gsl.h" +#include "utils/Locations.h" + +namespace org::apache::nifi::minifi::core::repository { + +LmdbContentRepository::Session::Session(std::shared_ptr repository) : BufferedContentSession(std::move(repository)) {} + +void LmdbContentRepository::Session::commit() { + auto lmdb_content_repository = std::dynamic_pointer_cast(repository_); + if (!lmdb_content_repository) { throw Exception(REPOSITORY_EXCEPTION, "Session's repository is not an LmdbContentRepository"); } + + const auto writeResource = [&lmdb_content_repository](const std::shared_ptr& resource_claim, const std::shared_ptr& stream, bool is_append) { + auto outStream = lmdb_content_repository->write(*resource_claim, is_append); + if (outStream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource_claim->getContentFullPath()); } + const auto size = stream->size(); + if (outStream->write(stream->getBuffer()) != size) { + throw Exception(REPOSITORY_EXCEPTION, "Failed to write " + std::string(is_append ? "appended" : "new") + " resource: " + resource_claim->getContentFullPath()); + } + auto lmdb_out_stream = std::dynamic_pointer_cast(outStream); + if (lmdb_out_stream == nullptr) { throw Exception(REPOSITORY_EXCEPTION, "Couldn't cast output stream to LmdbStream for commit: " + resource_claim->getContentFullPath()); } + if (!lmdb_out_stream->commit()) { throw Exception(REPOSITORY_EXCEPTION, "Failed to commit " + std::string(is_append ? "appended" : "new") + " resource: " + resource_claim->getContentFullPath()); } + }; + + for (const auto& resource : managed_resources_) { + writeResource(resource.first, resource.second, false); + } + + for (const auto& resource : append_state_) { + writeResource(resource.first, resource.second.stream, true); + } + + managed_resources_.clear(); + append_state_.clear(); +} + +bool LmdbContentRepository::initialize(const std::shared_ptr& configuration) { + if (const int rc = mdb_env_create(&lmdb_env_)) { + logger_->log_error("Failed to create LMDB environment: {}", mdb_strerror(rc)); + return false; + } + + // Reserve virtual address space for the DB file (max size it can grow to) + const auto max_db_size = configuration->get(Configure::nifi_content_repository_lmdb_max_db_size) | utils::andThen([](auto max_db_size_str) -> std::optional { + if (max_db_size_str.empty()) { return std::nullopt; } + return parsing::parseDataSize(max_db_size_str) | utils::orThrow(fmt::format("{} was set to invalid value: '{}'", Configure::nifi_content_repository_lmdb_max_db_size, max_db_size_str)); + }) | utils::orElse([] { + // Default to 10 GB if the property is not set + return std::make_optional(10ULL * 1024 * 1024 * 1024); + }); + + if (!max_db_size) { + logger_->log_error("Invalid max DB size configuration for LMDB Content Repository"); + mdb_env_close(lmdb_env_); + return false; + } + + logger_->log_info("Setting LMDB max DB size to {} bytes", *max_db_size); + mdb_env_set_mapsize(lmdb_env_, gsl::narrow(*max_db_size)); + + const auto working_dir = utils::getMinifiDir(); + + std::string value; + if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value) && !value.empty()) { + directory_ = value; + } else { + directory_ = (working_dir / "lmdbcontentrepository").string(); + } + + if (std::filesystem::exists(directory_)) { + logger_->log_info("Using existing LMDB Content Repository directory at {}", directory_); + } else { + logger_->log_info("Creating LMDB Content Repository directory at {}", directory_); + if (!std::filesystem::create_directories(directory_)) { + logger_->log_error("Failed to create LMDB Content Repository directory at {}", directory_); + return false; + } + } + + if (const int rc = mdb_env_open(lmdb_env_, directory_.c_str(), MDB_NOTLS, 0664)) { + logger_->log_error("Failed to open LMDB environment: {}", mdb_strerror(rc)); + return false; + } + + MDB_txn* init_txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB transaction during initialize: {}", mdb_strerror(rc)); + mdb_env_close(lmdb_env_); + return false; + } + if (const int rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) { + logger_->log_error("Failed to open LMDB database: {}", mdb_strerror(rc)); + mdb_txn_abort(init_txn); + mdb_env_close(lmdb_env_); + return false; + } + + if (const int rc = mdb_txn_commit(init_txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to commit LMDB transaction during initialize: {}", mdb_strerror(rc)); + mdb_env_close(lmdb_env_); + return false; + } + + return true; +} + +void LmdbContentRepository::start() {} +void LmdbContentRepository::stop() {} + +std::shared_ptr LmdbContentRepository::createSession() { + return std::make_shared(sharedFromThis()); +} + +std::shared_ptr LmdbContentRepository::write(const minifi::ResourceClaim& claim, bool) { + return std::make_shared(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, true); +} + +std::shared_ptr LmdbContentRepository::read(const minifi::ResourceClaim& claim) { + return std::make_shared(claim.getContentFullPath(), lmdb_env_, &lmdb_handle_, false); +} + +bool LmdbContentRepository::exists(const minifi::ResourceClaim& streamId) { + const auto path = streamId.getContentFullPath(); + MDB_val key{path.size(), const_cast(path.data())}; + MDB_val value{}; + + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB read transaction in exists: {}", mdb_strerror(rc)); + return false; + } + auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); + + const auto rc = mdb_get(txn, lmdb_handle_, &key, &value); + if (rc != MDB_SUCCESS && rc != MDB_NOTFOUND) { + logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc)); + } + return rc == MDB_SUCCESS; +} + +bool LmdbContentRepository::removeKey(const std::string& content_path) { + MDB_val key{content_path.size(), const_cast(content_path.data())}; + + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB write transaction in removeKey: {}", mdb_strerror(rc)); + return false; + } + int rc = mdb_del(txn, lmdb_handle_, &key, nullptr); + + if (rc == MDB_SUCCESS) { + if (const int rc = mdb_txn_commit(txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to commit LMDB transaction during delete: {}", mdb_strerror(rc)); + return false; + } + return true; + } else if (rc == MDB_NOTFOUND) { + logger_->log_debug("Key {} not found in LMDB database during delete", content_path); + mdb_txn_abort(txn); + return true; + } else { + logger_->log_error("Failed to delete key '{}' from LMDB database: {}", content_path, mdb_strerror(rc)); + mdb_txn_abort(txn); + return false; + } +} + +void LmdbContentRepository::clearOrphans() { + std::vector keys_to_be_deleted; + + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB read transaction in clearOrphans: {}", mdb_strerror(rc)); + return; + } + + MDB_cursor* cursor = nullptr; + if (const int rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) { + logger_->log_error("Failed to open LMDB cursor in clearOrphans: {}", mdb_strerror(rc)); + mdb_txn_abort(txn); + return; + } + + MDB_val key{}; + MDB_val val{}; + int rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST); + + while (rc == MDB_SUCCESS) { + std::string key_string = std::string(static_cast(key.mv_data), key.mv_size); + + std::lock_guard lock(count_map_mutex_); + auto claim_it = count_map_.find(key_string); + if (claim_it == count_map_.end() || claim_it->second == 0) { + logger_->log_error("Deleting orphan resource {}", key_string); + keys_to_be_deleted.push_back(key_string); + } + rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT); + } + + mdb_cursor_close(cursor); + mdb_txn_abort(txn); + + if (rc != MDB_NOTFOUND) { + logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc)); + return; + } + + std::vector failed_deletions; + for (const auto& key : keys_to_be_deleted) { + auto delete_result = removeKey(key); + if (!delete_result) { + logger_->log_warn("Failed to delete orphan resource {} from LMDB database", key); + failed_deletions.push_back(key); + } + } + + std::lock_guard lock(purge_list_mutex_); + purge_list_.insert(purge_list_.end(), std::make_move_iterator(failed_deletions.begin()), std::make_move_iterator(failed_deletions.end())); +} + +MDB_stat LmdbContentRepository::getDbStat() const { + MDB_stat stat{}; + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB read transaction in getDbStat: {}", mdb_strerror(rc)); + return stat; + } + if (const int rc = mdb_stat(txn, lmdb_handle_, &stat); rc != MDB_SUCCESS) { + logger_->log_error("Failed to read LMDB database stats: {}", mdb_strerror(rc)); + } + mdb_txn_abort(txn); + return stat; +} + +uint64_t LmdbContentRepository::getRepositorySize() const { + const auto stat = getDbStat(); + return stat.ms_psize * (stat.ms_branch_pages + stat.ms_leaf_pages + stat.ms_overflow_pages); +} + +uint64_t LmdbContentRepository::getRepositoryEntryCount() const { + return getDbStat().ms_entries; +} + +REGISTER_RESOURCE_AS(LmdbContentRepository, InternalResource, ("LmdbContentRepository", "lmdbcontentrepository")); + +} // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/lmdb/LmdbContentRepository.h b/extensions/lmdb/LmdbContentRepository.h new file mode 100644 index 0000000000..754b35b8bd --- /dev/null +++ b/extensions/lmdb/LmdbContentRepository.h @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "core/BufferedContentSession.h" +#include "core/ContentRepository.h" +#include "core/ClassName.h" +#include "core/logging/LoggerFactory.h" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "minifi-cpp/utils/Export.h" +#include "minifi-cpp/utils/Id.h" +#include "lmdb.h" + +namespace org::apache::nifi::minifi::core::repository { + +class LmdbContentRepository : public core::ContentRepositoryImpl { + public: + explicit LmdbContentRepository(std::string_view name = className(), const utils::Identifier& uuid = {}) + : core::ContentRepositoryImpl(name, uuid) {} + + ~LmdbContentRepository() override { + stop(); + mdb_dbi_close(lmdb_env_, lmdb_handle_); + mdb_env_close(lmdb_env_); + } + + EXTENSIONAPI static constexpr auto Properties = std::array{}; + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + + class Session : public BufferedContentSession { + public: + explicit Session(std::shared_ptr repository); + + void commit() override; + }; + + std::shared_ptr createSession() override; + bool initialize(const std::shared_ptr& configuration) override; + std::shared_ptr write(const minifi::ResourceClaim& claim, bool append = false) override; + std::shared_ptr read(const minifi::ResourceClaim& claim) override; + + bool close(const minifi::ResourceClaim& claim) override { return remove(claim); } + + bool exists(const minifi::ResourceClaim& streamId) override; + + void clearOrphans() override; + + void start() override; + void stop() override; + + uint64_t getRepositorySize() const override; + uint64_t getRepositoryEntryCount() const override; + + protected: + bool removeKey(const std::string& content_path) override; + + private: + MDB_stat getDbStat() const; + + MDB_env* lmdb_env_{nullptr}; + MDB_dbi lmdb_handle_{}; + std::shared_ptr logger_{logging::LoggerFactory::getLogger()}; +}; + +} // namespace org::apache::nifi::minifi::core::repository diff --git a/extensions/lmdb/LmdbStream.cpp b/extensions/lmdb/LmdbStream.cpp new file mode 100644 index 0000000000..e707da3446 --- /dev/null +++ b/extensions/lmdb/LmdbStream.cpp @@ -0,0 +1,116 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LmdbStream.h" + +#include +#include + +#include "io/validation.h" + +namespace org::apache::nifi::minifi::io { + +LmdbStream::LmdbStream(std::string path, MDB_env* lmdb_env, MDB_dbi* lmdb_handle, bool write_enable) + : BaseStreamImpl(), + path_(std::move(path)), + write_enable_(write_enable), + lmdb_env_(lmdb_env), + lmdb_handle_(lmdb_handle), + exists_(loadValue()) {} + +bool LmdbStream::loadValue() { + MDB_val key{path_.size(), const_cast(path_.data())}; + MDB_val value{}; + + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB read transaction in loadValue: {}", mdb_strerror(rc)); + return false; + } + auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); + + const auto rc = mdb_get(txn, *lmdb_handle_, &key, &value); + if (rc == MDB_SUCCESS) { + value_ = std::string(static_cast(value.mv_data), value.mv_size); + return true; + } else if (rc != MDB_NOTFOUND) { + logger_->log_error("Failed to get value from LMDB database: {}", mdb_strerror(rc)); + } + return false; +} + +void LmdbStream::close() { + commit(); +} + +bool LmdbStream::commit() { + if (!write_enable_ || !dirty_) { return false; } + dirty_ = false; + + MDB_txn* txn = nullptr; + auto rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &txn); + if (rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB transaction in close: {}", mdb_strerror(rc)); + return false; + } + + MDB_val key{path_.size(), const_cast(path_.data())}; + MDB_val val{value_.size(), const_cast(value_.data())}; + rc = mdb_put(txn, *lmdb_handle_, &key, &val, 0); + if (rc != MDB_SUCCESS) { + logger_->log_error("Failed to put value in LMDB database during close: {}", mdb_strerror(rc)); + mdb_txn_abort(txn); + return false; + } + + rc = mdb_txn_commit(txn); + if (rc != MDB_SUCCESS) { + logger_->log_error("Failed to commit LMDB transaction during close: {}", mdb_strerror(rc)); + return false; + } + return true; +} + +void LmdbStream::seek(size_t offset) { + offset_ = offset; +} + +size_t LmdbStream::tell() const { + return offset_; +} + +size_t LmdbStream::write(const uint8_t* value, size_t size) { + if (!write_enable_) { return STREAM_ERROR; } + if (size != 0 && IsNullOrEmpty(value)) { return STREAM_ERROR; } + value_.append(reinterpret_cast(value), size); + dirty_ = true; + return size; +} + +size_t LmdbStream::read(std::span buf) { + if (!exists_) { return STREAM_ERROR; } + if (buf.empty()) { return 0; } + if (offset_ >= value_.size()) { return 0; } + + const auto bytes_to_read = std::min(buf.size(), value_.size() - offset_); + std::memcpy(buf.data(), value_.data() + offset_, bytes_to_read); + offset_ += bytes_to_read; + return bytes_to_read; +} + +} // namespace org::apache::nifi::minifi::io diff --git a/extensions/lmdb/LmdbStream.h b/extensions/lmdb/LmdbStream.h new file mode 100644 index 0000000000..30b72ef147 --- /dev/null +++ b/extensions/lmdb/LmdbStream.h @@ -0,0 +1,66 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include + +#include "core/logging/LoggerFactory.h" +#include "io/BaseStream.h" +#include "lmdb.h" + +namespace org::apache::nifi::minifi::io { + +class LmdbStream : public io::BaseStreamImpl { + public: + explicit LmdbStream(std::string path, MDB_env* lmdb_env, MDB_dbi* lmdb_handle, bool write_enable = false); + + ~LmdbStream() override { close(); } + + void close() final; + void seek(size_t offset) override; + + size_t tell() const override; + + size_t size() const override { return value_.size(); } + + using BaseStream::read; + using BaseStream::write; + + size_t read(std::span buf) override; + size_t write(const uint8_t* value, size_t size) override; + + bool commit(); + + private: + bool loadValue(); + + std::string path_; + bool write_enable_; + std::string value_; + MDB_env* lmdb_env_; + MDB_dbi* lmdb_handle_; + bool exists_; + size_t offset_ = 0; + bool dirty_ = false; + + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(); +}; + +} // namespace org::apache::nifi::minifi::io diff --git a/extensions/lmdb/tests/CMakeLists.txt b/extensions/lmdb/tests/CMakeLists.txt new file mode 100644 index 0000000000..918f946f38 --- /dev/null +++ b/extensions/lmdb/tests/CMakeLists.txt @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +file(GLOB LMDB_UNIT_TESTS "*.cpp") +SET(LMDB_TEST_COUNT 0) +FOREACH(testfile ${LMDB_UNIT_TESTS}) + get_filename_component(testfilename "${testfile}" NAME_WE) + add_minifi_executable("${testfilename}" "${testfile}") + target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/lmdb/") + target_include_directories(${testfilename} SYSTEM BEFORE PRIVATE "${LMDB_THIRDPARTY_ROOT}/include") + createTests("${testfilename}") + target_link_libraries(${testfilename} Catch2WithMain) + target_link_libraries(${testfilename} minifi-lmdb) + target_link_libraries(${testfilename} minifi-standard-processors) + MATH(EXPR LMDB_TEST_COUNT "${LMDB_TEST_COUNT}+1") + add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) +ENDFOREACH() +message("-- Finished building ${LMDB_TEST_COUNT} LMDB related test file(s)...") diff --git a/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp new file mode 100644 index 0000000000..17af0c6fd2 --- /dev/null +++ b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp @@ -0,0 +1,241 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "LmdbContentRepository.h" +#include "ResourceClaim.h" +#include "properties/Configure.h" +#include "unit/Catch.h" +#include "unit/ContentRepositoryDependentTests.h" +#include "unit/TestBase.h" + +namespace org::apache::nifi::minifi::test { + +class LmdbContentRepositoryTests : TestController { + public: + LmdbContentRepositoryTests() { + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, createTempDirectory().string()); + REQUIRE(content_repo_->initialize(configuration)); + } + + protected: + static constexpr std::string_view test_content_ = "well hello there"; + std::shared_ptr content_repo_ = std::make_shared(); + + void writeContent(const minifi::ResourceClaim& claim) { + auto stream = content_repo_->write(claim); + stream->write(as_bytes(std::span(test_content_))); + stream->close(); + } +}; + +TEST_CASE("Invalid or empty dbsize configuration value is set", "[lmdb]") { + TestController controller; + LogTestController::getInstance().setDebug(); + auto db_path = controller.createTempDirectory().string(); + auto configuration = std::make_shared(); + auto content_repo = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, db_path); + SECTION("Invalid value") { + configuration->set(minifi::Configure::nifi_content_repository_lmdb_max_db_size, "invalid"); + REQUIRE_THROWS_WITH(content_repo->initialize(configuration), + "nifi.content.repository.lmdb.max.db.size was set to invalid value: 'invalid', but got GeneralParsingError (Parsing Error:0)"); + } + SECTION("Empty value") { + configuration->set(minifi::Configure::nifi_content_repository_lmdb_max_db_size, ""); + REQUIRE(content_repo->initialize(configuration)); + REQUIRE(LogTestController::getInstance().contains("Setting LMDB max DB size to 10737418240 bytes")); + } +} + +TEST_CASE("Valid dbsize configuration value is set", "[lmdb]") { + TestController controller; + LogTestController::getInstance().setDebug(); + auto configuration = std::make_shared(); + auto content_repo = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, controller.createTempDirectory().string()); + configuration->set(minifi::Configure::nifi_content_repository_lmdb_max_db_size, "100 MB"); + REQUIRE(content_repo->initialize(configuration)); + REQUIRE(LogTestController::getInstance().contains("Setting LMDB max DB size to 104857600 bytes")); +} + +TEST_CASE("Initialize succeeds when target directory already exists", "[lmdb]") { + TestController controller; + auto db_path = controller.createTempDirectory(); + REQUIRE(std::filesystem::exists(db_path)); + auto configuration = std::make_shared(); + auto content_repo = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, db_path.string()); + REQUIRE(content_repo->initialize(configuration)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Key does not exist in empty database", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + REQUIRE_FALSE(content_repo_->exists(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Written value exists", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + writeContent(*claim); + REQUIRE(content_repo_->exists(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "exists returns false for unrelated claim when other content was written", "[lmdb]") { + auto written_claim = std::make_shared(content_repo_); + writeContent(*written_claim); + auto unrelated_claim = std::make_shared(content_repo_); + REQUIRE(content_repo_->exists(*written_claim)); + REQUIRE_FALSE(content_repo_->exists(*unrelated_claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Empty content claim does not exist", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + auto write_stream = content_repo_->write(*claim); + write_stream->close(); + REQUIRE_FALSE(content_repo_->exists(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Multiple claims coexist independently", "[lmdb]") { + auto claim1 = std::make_shared(content_repo_); + auto claim2 = std::make_shared(content_repo_); + auto claim3 = std::make_shared(content_repo_); + writeContent(*claim1); + writeContent(*claim2); + writeContent(*claim3); + REQUIRE(content_repo_->exists(*claim1)); + REQUIRE(content_repo_->exists(*claim2)); + REQUIRE(content_repo_->exists(*claim3)); + REQUIRE(content_repo_->getRepositoryEntryCount() == 3); + REQUIRE(content_repo_->remove(*claim2)); + REQUIRE(content_repo_->exists(*claim1)); + REQUIRE_FALSE(content_repo_->exists(*claim2)); + REQUIRE(content_repo_->exists(*claim3)); + REQUIRE(content_repo_->getRepositoryEntryCount() == 2); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Reading nonexistent claim returns STREAM_ERROR", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + auto read_stream = content_repo_->read(*claim); + std::vector buffer(8); + REQUIRE(minifi::io::isError(read_stream->read(as_writable_bytes(std::span(buffer))))); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Read written value", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + writeContent(*claim); + auto read_stream = content_repo_->read(*claim); + std::vector buffer(test_content_.size()); + auto bytes_read = read_stream->read(as_writable_bytes(std::span(buffer))); + read_stream->close(); + REQUIRE(bytes_read == test_content_.size()); + REQUIRE(std::string_view(reinterpret_cast(buffer.data()), buffer.size()) == test_content_); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Removing a nonexistent claim succeeds", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + REQUIRE_FALSE(content_repo_->exists(*claim)); + REQUIRE(content_repo_->remove(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Removing an existing value", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + writeContent(*claim); + REQUIRE(content_repo_->exists(*claim)); + REQUIRE(content_repo_->remove(*claim)); + REQUIRE_FALSE(content_repo_->exists(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "clearOrphans is a no-op on an empty repository", "[lmdb]") { + REQUIRE(content_repo_->getRepositoryEntryCount() == 0); + content_repo_->clearOrphans(); + REQUIRE(content_repo_->getRepositoryEntryCount() == 0); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Clear orphan values", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + writeContent(*claim); + REQUIRE(content_repo_->exists(*claim)); + content_repo_->reset(); + content_repo_->clearOrphans(); + REQUIRE_FALSE(content_repo_->exists(*claim)); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Empty repository reports zero size and entry count", "[lmdb]") { + REQUIRE(content_repo_->getRepositoryEntryCount() == 0); + REQUIRE(content_repo_->getRepositorySize() == 0); +} + +TEST_CASE_METHOD(LmdbContentRepositoryTests, "Written value updates repository stats", "[lmdb]") { + auto claim = std::make_shared(content_repo_); + writeContent(*claim); + REQUIRE(content_repo_->getRepositoryEntryCount() == 1); + REQUIRE(content_repo_->getRepositorySize() > 0); +} + +TEST_CASE("Content persists across LmdbContentRepository re-initialization", "[lmdb]") { + TestController controller; + auto db_path = controller.createTempDirectory(); + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, db_path.string()); + + std::string claim_path; + static constexpr std::string_view content = "persisted content"; + { + auto content_repo = std::make_shared(); + REQUIRE(content_repo->initialize(configuration)); + auto claim = std::make_shared(content_repo); + claim_path = claim->getContentFullPath(); + auto stream = content_repo->write(*claim); + stream->write(as_bytes(std::span(content))); + stream->close(); + // ensure the content is not deleted on resource claim destruction + content_repo->incrementStreamCount(*claim); + } + + auto reopened_repo = std::make_shared(); + REQUIRE(reopened_repo->initialize(configuration)); + auto reopened_claim = std::make_shared(claim_path, reopened_repo); + REQUIRE(reopened_repo->exists(*reopened_claim)); + auto read_stream = reopened_repo->read(*reopened_claim); + std::vector buffer(content.size()); + REQUIRE(read_stream->read(as_writable_bytes(std::span(buffer))) == content.size()); + REQUIRE(std::string_view(reinterpret_cast(buffer.data()), buffer.size()) == content); +} + +TEST_CASE("ProcessSession::read reads the flowfile from offset to size", "[lmdb]") { + ContentRepositoryDependentTests::testReadOnSmallerClonedFlowFiles(std::make_shared()); +} + +TEST_CASE("ProcessSession::append should append to the flowfile and set its size correctly", "[lmdb]") { + ContentRepositoryDependentTests::testAppendToUnmanagedFlowFile(std::make_shared()); + ContentRepositoryDependentTests::testAppendToManagedFlowFile(std::make_shared()); +} + +TEST_CASE("ProcessSession::read can read zero length flowfiles without crash", "[lmdb]") { + ContentRepositoryDependentTests::testReadFromZeroLengthFlowFile(std::make_shared()); +} + +TEST_CASE("ProcessSession::write can be cancelled", "[lmdb]") { + ContentRepositoryDependentTests::testOkWrite(std::make_shared()); + ContentRepositoryDependentTests::testErrWrite(std::make_shared()); + ContentRepositoryDependentTests::testCancelWrite(std::make_shared()); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/lmdb/tests/LmdbContentSessionTests.cpp b/extensions/lmdb/tests/LmdbContentSessionTests.cpp new file mode 100644 index 0000000000..f504469934 --- /dev/null +++ b/extensions/lmdb/tests/LmdbContentSessionTests.cpp @@ -0,0 +1,301 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include "LmdbContentRepository.h" +#include "catch2/matchers/catch_matchers_string.hpp" +#include "core/repository/VolatileContentRepository.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "utils/ConfigurationUtils.h" + +namespace org::apache::nifi::minifi::test { + +class LmdbContentSessionController : public TestController { + public: + LmdbContentSessionController() : content_repository_(std::make_shared()) { + auto content_repo_path = createTempDirectory(); + auto config = std::make_shared(); + config->set(Configure::nifi_dbcontent_repository_directory_default, content_repo_path.string()); + content_repository_->initialize(config); + } + + ~LmdbContentSessionController() override { log.reset(); } + + LmdbContentSessionController(LmdbContentSessionController&&) = delete; + LmdbContentSessionController(const LmdbContentSessionController&) = delete; + LmdbContentSessionController& operator=(LmdbContentSessionController&&) = delete; + LmdbContentSessionController& operator=(const LmdbContentSessionController&) = delete; + + std::shared_ptr content_repository_; +}; + +namespace { + +std::shared_ptr operator<<(std::shared_ptr stream, const std::string& str) { + REQUIRE(stream->write(reinterpret_cast(str.data()), str.length()) == str.length()); + return stream; +} + +std::shared_ptr operator>>(std::shared_ptr stream, std::string& str) { + str.clear(); + std::array buffer{}; + while (true) { + const auto ret = stream->read(buffer); + REQUIRE_FALSE(io::isError(ret)); + if (ret == 0) { break; } + str += std::string{reinterpret_cast(buffer.data()), ret}; + } + return stream; +} + +void requireNoCreate(const std::shared_ptr&) { + REQUIRE(false); +} + +std::shared_ptr makeCommittedClaim(const std::shared_ptr& content_repository, const std::string& content) { + auto session = content_repository->createSession(); + auto claim = session->create(); + session->write(claim) << content; + session->commit(); + return claim; +} + +} // namespace + +TEST_CASE("Writing a previously committed claim from a new session throws", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + auto session = controller.content_repository_->createSession(); + REQUIRE_THROWS(session->write(old_claim)); +} + +TEST_CASE("Reading a previously committed claim from a new session succeeds", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + auto session = controller.content_repository_->createSession(); + REQUIRE_NOTHROW(session->read(old_claim)); +} + +TEST_CASE("Reading a claim after appending to it in the same session throws", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + auto session = controller.content_repository_->createSession(); + session->append(old_claim, 4, requireNoCreate) << "-addendum"; + REQUIRE_THROWS(session->read(old_claim)); +} + +TEST_CASE("Append with on_copy callback produces a new claim with concatenated content when the append lock is held", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + auto blocking_session = controller.content_repository_->createSession(); + blocking_session->append(old_claim, 4, requireNoCreate) << "-addendum"; + + std::shared_ptr copied_claim; + { + auto other_session = controller.content_repository_->createSession(); + other_session->append(old_claim, 4, [&](auto new_claim) { copied_claim = std::move(new_claim); }) << "-some extra content"; + other_session->commit(); + } + REQUIRE(copied_claim); + + std::string read_content; + read_content.resize(controller.content_repository_->size(*copied_claim)); + controller.content_repository_->read(*copied_claim)->read(as_writable_bytes(std::span(read_content))); + REQUIRE(read_content == "data-some extra content"); +} + +TEST_CASE("Session commits a write to a new claim", "[lmdb]") { + LmdbContentSessionController controller; + + std::shared_ptr claim; + { + auto session = controller.content_repository_->createSession(); + claim = session->create(); + session->write(claim) << "hello content!"; + + std::string buffered_content; + session->read(claim) >> buffered_content; + REQUIRE(buffered_content == "hello content!"); + + session->commit(); + } + + std::string content; + controller.content_repository_->read(*claim) >> content; + REQUIRE(content == "hello content!"); +} + +TEST_CASE("Session commits multiple appends from offsets on a new claim", "[lmdb]") { + LmdbContentSessionController controller; + + std::shared_ptr claim; + { + auto session = controller.content_repository_->createSession(); + claim = session->create(); + session->append(claim, 0, requireNoCreate) << "beginning"; + session->append(claim, 9, requireNoCreate) << "-end"; + session->commit(); + } + + std::string content; + controller.content_repository_->read(*claim) >> content; + REQUIRE(content == "beginning-end"); +} + +TEST_CASE("Session commits a write followed by an append on a new claim", "[lmdb]") { + LmdbContentSessionController controller; + + std::shared_ptr claim; + { + auto session = controller.content_repository_->createSession(); + claim = session->create(); + session->write(claim) << "first"; + session->append(claim, 5, requireNoCreate) << "-last"; + session->commit(); + } + + std::string content; + controller.content_repository_->read(*claim) >> content; + REQUIRE(content == "first-last"); +} + +TEST_CASE("Session commits a write that overwrites an earlier write on the same claim", "[lmdb]") { + LmdbContentSessionController controller; + + std::shared_ptr claim; + { + auto session = controller.content_repository_->createSession(); + claim = session->create(); + session->write(claim) << "beginning"; + session->write(claim) << "overwritten"; + session->commit(); + } + + std::string content; + controller.content_repository_->read(*claim) >> content; + REQUIRE(content == "overwritten"); +} + +TEST_CASE("Session commits an append to a previously committed claim", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + { + auto session = controller.content_repository_->createSession(); + session->append(old_claim, 4, requireNoCreate) << "-addendum"; + session->commit(); + } + + std::string content; + controller.content_repository_->read(*old_claim) >> content; + REQUIRE(content == "data-addendum"); +} + +TEST_CASE("Session rollback discards new claims", "[lmdb]") { + LmdbContentSessionController controller; + std::shared_ptr content_repository = controller.content_repository_; + + std::shared_ptr claim; + { + auto session = content_repository->createSession(); + claim = session->create(); + session->write(claim) << "discarded"; + session->rollback(); + } + REQUIRE_FALSE(content_repository->exists(*claim)); +} + +TEST_CASE("Session rollback leaves a previously committed claim unchanged", "[lmdb]") { + LmdbContentSessionController controller; + auto old_claim = makeCommittedClaim(controller.content_repository_, "data"); + + { + auto session = controller.content_repository_->createSession(); + session->append(old_claim, 4, requireNoCreate) << "-addendum"; + session->rollback(); + } + + std::string content; + controller.content_repository_->read(*old_claim) >> content; + REQUIRE(content == "data"); +} + +TEST_CASE("Buffered session reads buffered content before commit", "[lmdb]") { + LmdbContentSessionController controller; + + auto session = controller.content_repository_->createSession(); + auto claim = session->create(); + session->write(claim) << "uncommitted-data"; + + std::string content; + session->read(claim) >> content; + REQUIRE(content == "uncommitted-data"); + + REQUIRE_FALSE(controller.content_repository_->exists(*claim)); +} + +TEST_CASE("Empty new claim commit does not throw", "[lmdb]") { + LmdbContentSessionController controller; + + auto session = controller.content_repository_->createSession(); + auto claim = session->create(); + REQUIRE_NOTHROW(session->commit()); + + REQUIRE(controller.content_repository_->exists(*claim)); + REQUIRE(controller.content_repository_->size(*claim) == 0); +} + +TEST_CASE("Rollback after a commit on a different session leaves committed content untouched", "[lmdb]") { + LmdbContentSessionController controller; + + std::shared_ptr claim; + { + auto first_session = controller.content_repository_->createSession(); + claim = first_session->create(); + first_session->write(claim) << "committed"; + first_session->commit(); + } + + { + auto rolled_back_session = controller.content_repository_->createSession(); + rolled_back_session->append(claim, 9, requireNoCreate) << "-rolled-back"; + rolled_back_session->rollback(); + } + + std::string content; + controller.content_repository_->read(*claim) >> content; + REQUIRE(content == "committed"); +} + +TEST_CASE("LmdbContentRepository::Session commit throws when underlying repository is not LmdbContentRepository", "[lmdb]") { + auto unrelated_repository = std::make_shared(); + unrelated_repository->initialize(std::make_shared()); + + core::repository::LmdbContentRepository::Session session(unrelated_repository); + REQUIRE_THROWS_WITH(session.commit(), Catch::Matchers::ContainsSubstring("Session's repository is not an LmdbContentRepository")); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/lmdb/tests/LmdbStreamTests.cpp b/extensions/lmdb/tests/LmdbStreamTests.cpp new file mode 100644 index 0000000000..522a1610f1 --- /dev/null +++ b/extensions/lmdb/tests/LmdbStreamTests.cpp @@ -0,0 +1,271 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "LmdbStream.h" +#include "lmdb.h" +#include "unit/Catch.h" +#include "unit/TestBase.h" + +namespace org::apache::nifi::minifi::test { + +class LmdbStreamTest : TestController { + public: + LmdbStreamTest() : db_path_(createTempDirectory().string()) { + if (const int rc = mdb_env_create(&lmdb_env_)) { + throw std::runtime_error("Failed to create LMDB environment: " + std::string(mdb_strerror(rc))); + } + mdb_env_set_mapsize(lmdb_env_, 100ULL * 1024 * 1024); // 100 MB + + if (const int rc = mdb_env_open(lmdb_env_, db_path_.c_str(), MDB_NOTLS, 0664)) { + throw std::runtime_error("Failed to open LMDB environment " + db_path_ + ": " + std::string(mdb_strerror(rc))); + } + + MDB_txn* init_txn = nullptr; + mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); + if (const auto rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) { + mdb_txn_abort(init_txn); + mdb_env_close(lmdb_env_); + throw std::runtime_error("Failed to open LMDB database: " + std::string(mdb_strerror(rc))); + } + mdb_txn_commit(init_txn); + } + + LmdbStreamTest(const LmdbStreamTest&) = delete; + LmdbStreamTest& operator=(const LmdbStreamTest&) = delete; + LmdbStreamTest(LmdbStreamTest&&) = delete; + LmdbStreamTest& operator=(LmdbStreamTest&&) = delete; + + ~LmdbStreamTest() override { + mdb_dbi_close(lmdb_env_, lmdb_handle_); + mdb_env_close(lmdb_env_); + } + + std::optional readValue(const std::string& key) { + MDB_val db_key{key.size(), const_cast(key.data())}; + MDB_val db_value{}; + + MDB_txn* txn = nullptr; + mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); + auto guard = gsl::finally([txn] { mdb_txn_abort(txn); }); + const auto result = mdb_get(txn, lmdb_handle_, &db_key, &db_value); + + if (result == MDB_SUCCESS) { return std::string(static_cast(db_value.mv_data), db_value.mv_size); } + return std::nullopt; + } + + protected: + std::string db_path_; + MDB_env* lmdb_env_{nullptr}; + MDB_dbi lmdb_handle_{}; +}; + +namespace { + +size_t writeString(io::LmdbStream& stream, const std::string& content) { + return stream.write(reinterpret_cast(content.data()), content.size()); +} + +std::string bytesToString(const std::vector& buf, size_t len) { + return {reinterpret_cast(buf.data()), len}; +} + +} // namespace + +TEST_CASE_METHOD(LmdbStreamTest, "Simple write tests") { + std::string content; + SECTION("Non-empty value") { + content = "banana"; + } + SECTION("Empty value") { + content = ""; + } + + { + io::LmdbStream stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(stream, content))); + } + auto val = readValue(db_path_); + REQUIRE(val.has_value()); + CHECK(val->size() == content.size()); + CHECK(*val == content); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Multiple write test") { + std::string content = "banana"; + std::string expected_content; + + { + io::LmdbStream stream(db_path_, lmdb_env_, &lmdb_handle_, true); + for (int i = 0; i < 5; ++i) { + REQUIRE_FALSE(minifi::io::isError(writeString(stream, content))); + expected_content += content; + } + } + auto val = readValue(db_path_); + REQUIRE(val.has_value()); + CHECK(val->size() == expected_content.size()); + CHECK(*val == expected_content); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Simple read tests") { + std::string content; + SECTION("Non-empty value") { + content = "banana"; + } + SECTION("Empty value") { + content = ""; + } + + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, content))); + write_stream.close(); + + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + std::vector buffer(content.size()); + REQUIRE_FALSE(minifi::io::isError(read_stream.read(buffer))); + REQUIRE(bytesToString(buffer, buffer.size()) == content); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Read in chunks") { + std::string content = "banana"; + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, content))); + write_stream.close(); + + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + std::vector buffer(2); + + REQUIRE_FALSE(minifi::io::isError(read_stream.read(buffer))); + REQUIRE(bytesToString(buffer, buffer.size()) == "ba"); + + REQUIRE_FALSE(minifi::io::isError(read_stream.read(buffer))); + REQUIRE(bytesToString(buffer, buffer.size()) == "na"); + + std::vector buffer2(5); + const auto read_result = read_stream.read(buffer2); + REQUIRE_FALSE(minifi::io::isError(read_result)); + REQUIRE(bytesToString(buffer2, read_result) == "na"); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Reading a nonexistent key returns STREAM_ERROR") { + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + std::vector buffer(8); + REQUIRE(minifi::io::isError(read_stream.read(buffer))); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Reading after EOF returns zero") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "abc"))); + write_stream.close(); + + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + std::vector buffer(3); + REQUIRE(read_stream.read(buffer) == 3); + REQUIRE(read_stream.read(buffer) == 0); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Reading into an empty buffer returns zero") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "abc"))); + write_stream.close(); + + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + std::vector empty_buffer; + REQUIRE(read_stream.read(empty_buffer) == 0); +} + +TEST_CASE_METHOD(LmdbStreamTest, "seek and tell control read offset") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "banana"))); + write_stream.close(); + + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + read_stream.seek(2); + REQUIRE(read_stream.tell() == 2); + std::vector buffer(3); + REQUIRE(read_stream.read(buffer) == 3); + REQUIRE(bytesToString(buffer, buffer.size()) == "nan"); + REQUIRE(read_stream.tell() == 5); +} + +TEST_CASE_METHOD(LmdbStreamTest, "size reflects buffered writes before commit") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE(write_stream.size() == 0); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "hello"))); + REQUIRE(write_stream.size() == 5); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "!"))); + REQUIRE(write_stream.size() == 6); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Writing to a read-only stream returns STREAM_ERROR") { + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + REQUIRE(minifi::io::isError(writeString(read_stream, "anything"))); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Writing nullptr with a non-zero length returns STREAM_ERROR") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE(minifi::io::isError(write_stream.write(static_cast(nullptr), 4))); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Writing zero bytes is a no-op without error") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + const std::array dummy{}; + REQUIRE(write_stream.write(dummy.data(), 0) == 0); + REQUIRE(write_stream.size() == 0); +} + +TEST_CASE_METHOD(LmdbStreamTest, "commit on a read-only stream returns false") { + io::LmdbStream read_stream(db_path_, lmdb_env_, &lmdb_handle_, false); + REQUIRE_FALSE(read_stream.commit()); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Repeated commit is a no-op after the first") { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "banana"))); + REQUIRE(write_stream.commit()); + REQUIRE_FALSE(write_stream.commit()); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Destructor commits buffered writes") { + { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "destroyed-write"))); + // Stream goes out of scope here; the destructor must commit + } + auto val = readValue(db_path_); + REQUIRE(val.has_value()); + REQUIRE(*val == "destroyed-write"); +} + +TEST_CASE_METHOD(LmdbStreamTest, "Reopening an existing key in write mode appends to existing value") { + { + io::LmdbStream write_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE_FALSE(minifi::io::isError(writeString(write_stream, "first-"))); + } + + io::LmdbStream reopened_stream(db_path_, lmdb_env_, &lmdb_handle_, true); + REQUIRE(reopened_stream.size() == 6); + REQUIRE_FALSE(minifi::io::isError(writeString(reopened_stream, "second"))); + REQUIRE(reopened_stream.commit()); + + auto val = readValue(db_path_); + REQUIRE(val.has_value()); + REQUIRE(*val == "first-second"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp index f1e5d9d196..43e74a5dc3 100644 --- a/extensions/rocksdb-repos/tests/ContentSessionTests.cpp +++ b/extensions/rocksdb-repos/tests/ContentSessionTests.cpp @@ -34,14 +34,14 @@ template class ContentSessionController : public TestController { public: ContentSessionController() - : contentRepository(std::make_shared()) { + : content_repository_(std::make_shared()) { auto contentRepoPath = createTempDirectory(); auto config = std::make_shared(); config->set(minifi::Configure::nifi_dbcontent_repository_directory_default, contentRepoPath.string()); - contentRepository->initialize(config); + content_repository_->initialize(config); } - ~ContentSessionController() { + ~ContentSessionController() override { log.reset(); } @@ -50,7 +50,7 @@ class ContentSessionController : public TestController { ContentSessionController& operator=(ContentSessionController&&) = delete; ContentSessionController& operator=(const ContentSessionController&) = delete; - std::shared_ptr contentRepository; + std::shared_ptr content_repository_; }; std::shared_ptr operator<<(std::shared_ptr stream, const std::string& str) { @@ -80,7 +80,7 @@ void NO_CREATE(const std::shared_ptr&) { template void test_template() { ContentSessionController controller; - std::shared_ptr contentRepository = controller.contentRepository; + std::shared_ptr contentRepository = controller.content_repository_; std::shared_ptr oldClaim; { diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 17dba361b0..491627d075 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -55,6 +55,7 @@ const std::unordered_map createContentRepository(const std::string& co logger.log_critical("Could not create the configured content repository ({})", configuration_class_name); if (class_name_lc == "databasecontentrepository") { logger.log_error("To use DatabaseContentRepository MiNiFi needs RocksDB extension, please check the extension path configured in minifi.properties"); + } else if (class_name_lc == "lmdbcontentrepository") { + logger.log_error("To use LmdbContentRepository MiNiFi needs LMDB extension, please check the extension path configured in minifi.properties"); } throw std::runtime_error("Support for the provided configuration class could not be found, check logs for more details"); diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h index 9e55571944..d1e87f743c 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h @@ -400,6 +400,7 @@ class TestController { }; TestController(); + virtual ~TestController() = default; std::shared_ptr createPlan(PlanConfig config); diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 44e50ea0f9..e54fe73d2b 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -78,6 +78,8 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_rocksdb_state_storage_read_verify_checksums = "nifi.rocksdb.state.storage.read.verify.checksums"; static constexpr const char *nifi_dbcontent_optimize_for_small_db_cache_size = "nifi.database.content.repository.optimize.for.small.db.cache.size"; + static constexpr const char *nifi_content_repository_lmdb_max_db_size = "nifi.content.repository.lmdb.max.db.size"; + static constexpr const char *nifi_remote_input_secure = "nifi.remote.input.secure"; static constexpr const char *nifi_security_need_ClientAuth = "nifi.security.need.ClientAuth"; static constexpr const char *nifi_sensitive_props_additional_keys = "nifi.sensitive.props.additional.keys"; diff --git a/thirdparty/lmdb/add-cmake-file.patch b/thirdparty/lmdb/add-cmake-file.patch new file mode 100644 index 0000000000..d60e6d112c --- /dev/null +++ b/thirdparty/lmdb/add-cmake-file.patch @@ -0,0 +1,29 @@ +lmdb only comes with a Makefile which only works on Linux so we create our own CMakeLists.txt file to be platform independent + +diff -rupN orig/libraries/liblmdb/CMakeLists.txt patched/libraries/liblmdb/CMakeLists.txt +--- orig/libraries/liblmdb/CMakeLists.txt 2026-06-12 13:02:00.868592927 +0200 ++++ patched/libraries/liblmdb/CMakeLists.txt 2026-06-12 11:51:51.369241534 +0200 +@@ -0,0 +1,23 @@ ++cmake_minimum_required(VERSION 3.25) ++project(lmdb C) ++ ++if(MSVC) ++ add_compile_options(/W3 /WX-) ++else() ++ add_compile_options(-W -Wall -Wno-unused-parameter -Wbad-function-cast -Wuninitialized -O2 -g) ++endif() ++ ++find_package(Threads REQUIRED) ++ ++add_library(lmdb STATIC mdb.c midl.c) ++target_include_directories(lmdb PUBLIC ++ $ ++ $ ++) ++target_link_libraries(lmdb PUBLIC Threads::Threads) ++set_target_properties(lmdb PROPERTIES ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") ++ ++include(GNUInstallDirs) ++ ++install(TARGETS lmdb ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) ++install(FILES lmdb.h DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) From a125e60a78665a35778c38b0c144b9f1b96d08de Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 23 Jun 2026 13:08:31 +0200 Subject: [PATCH 2/3] Fix lmdb lifetime issues --- extensions/lmdb/LmdbContentRepository.cpp | 74 +++++++++++-------- extensions/lmdb/LmdbContentRepository.h | 6 +- .../lmdb/tests/LmdbContentRepositoryTests.cpp | 17 +++++ 3 files changed, 63 insertions(+), 34 deletions(-) diff --git a/extensions/lmdb/LmdbContentRepository.cpp b/extensions/lmdb/LmdbContentRepository.cpp index a6eeeb45f7..c76185ceea 100644 --- a/extensions/lmdb/LmdbContentRepository.cpp +++ b/extensions/lmdb/LmdbContentRepository.cpp @@ -18,9 +18,11 @@ #include "LmdbContentRepository.h" #include +#include #include #include #include +#include #include "LmdbStream.h" #include "core/Resource.h" @@ -79,6 +81,7 @@ bool LmdbContentRepository::initialize(const std::shared_ptr& if (!max_db_size) { logger_->log_error("Invalid max DB size configuration for LMDB Content Repository"); mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } @@ -100,12 +103,16 @@ bool LmdbContentRepository::initialize(const std::shared_ptr& logger_->log_info("Creating LMDB Content Repository directory at {}", directory_); if (!std::filesystem::create_directories(directory_)) { logger_->log_error("Failed to create LMDB Content Repository directory at {}", directory_); + mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } } if (const int rc = mdb_env_open(lmdb_env_, directory_.c_str(), MDB_NOTLS, 0664)) { logger_->log_error("Failed to open LMDB environment: {}", mdb_strerror(rc)); + mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } @@ -113,18 +120,21 @@ bool LmdbContentRepository::initialize(const std::shared_ptr& if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, 0, &init_txn); rc != MDB_SUCCESS) { logger_->log_error("Failed to begin LMDB transaction during initialize: {}", mdb_strerror(rc)); mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } if (const int rc = mdb_dbi_open(init_txn, nullptr, 0, &lmdb_handle_); rc != MDB_SUCCESS) { logger_->log_error("Failed to open LMDB database: {}", mdb_strerror(rc)); mdb_txn_abort(init_txn); mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } if (const int rc = mdb_txn_commit(init_txn); rc != MDB_SUCCESS) { logger_->log_error("Failed to commit LMDB transaction during initialize: {}", mdb_strerror(rc)); mdb_env_close(lmdb_env_); + lmdb_env_ = nullptr; return false; } @@ -195,41 +205,41 @@ bool LmdbContentRepository::removeKey(const std::string& content_path) { void LmdbContentRepository::clearOrphans() { std::vector keys_to_be_deleted; - MDB_txn* txn = nullptr; - if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { - logger_->log_error("Failed to begin LMDB read transaction in clearOrphans: {}", mdb_strerror(rc)); - return; - } - - MDB_cursor* cursor = nullptr; - if (const int rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) { - logger_->log_error("Failed to open LMDB cursor in clearOrphans: {}", mdb_strerror(rc)); - mdb_txn_abort(txn); - return; - } - - MDB_val key{}; - MDB_val val{}; - int rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST); - - while (rc == MDB_SUCCESS) { - std::string key_string = std::string(static_cast(key.mv_data), key.mv_size); - - std::lock_guard lock(count_map_mutex_); - auto claim_it = count_map_.find(key_string); - if (claim_it == count_map_.end() || claim_it->second == 0) { - logger_->log_error("Deleting orphan resource {}", key_string); - keys_to_be_deleted.push_back(key_string); + { + MDB_txn* txn = nullptr; + if (const int rc = mdb_txn_begin(lmdb_env_, nullptr, MDB_RDONLY, &txn); rc != MDB_SUCCESS) { + logger_->log_error("Failed to begin LMDB read transaction in clearOrphans: {}", mdb_strerror(rc)); + return; } - rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT); - } + auto txn_guard = gsl::finally([txn] { mdb_txn_abort(txn); }); - mdb_cursor_close(cursor); - mdb_txn_abort(txn); + MDB_cursor* cursor = nullptr; + if (const int rc = mdb_cursor_open(txn, lmdb_handle_, &cursor); rc != MDB_SUCCESS) { + logger_->log_error("Failed to open LMDB cursor in clearOrphans: {}", mdb_strerror(rc)); + return; + } + auto cursor_guard = gsl::finally([cursor] { mdb_cursor_close(cursor); }); + + MDB_val key{}; + MDB_val val{}; + int rc = mdb_cursor_get(cursor, &key, &val, MDB_FIRST); + + while (rc == MDB_SUCCESS) { + std::string key_string = std::string(static_cast(key.mv_data), key.mv_size); + + std::lock_guard lock(count_map_mutex_); + auto claim_it = count_map_.find(key_string); + if (claim_it == count_map_.end() || claim_it->second == 0) { + logger_->log_debug("Deleting orphan resource {}", key_string); + keys_to_be_deleted.push_back(key_string); + } + rc = mdb_cursor_get(cursor, &key, &val, MDB_NEXT); + } - if (rc != MDB_NOTFOUND) { - logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc)); - return; + if (rc != MDB_NOTFOUND) { + logger_->log_error("Failed to iterate over LMDB database: {}", mdb_strerror(rc)); + return; + } } std::vector failed_deletions; diff --git a/extensions/lmdb/LmdbContentRepository.h b/extensions/lmdb/LmdbContentRepository.h index 754b35b8bd..65d47a4dbb 100644 --- a/extensions/lmdb/LmdbContentRepository.h +++ b/extensions/lmdb/LmdbContentRepository.h @@ -39,8 +39,10 @@ class LmdbContentRepository : public core::ContentRepositoryImpl { ~LmdbContentRepository() override { stop(); - mdb_dbi_close(lmdb_env_, lmdb_handle_); - mdb_env_close(lmdb_env_); + if (lmdb_env_) { + mdb_dbi_close(lmdb_env_, lmdb_handle_); + mdb_env_close(lmdb_env_); + } } EXTENSIONAPI static constexpr auto Properties = std::array{}; diff --git a/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp index 17af0c6fd2..ec9b9c7b03 100644 --- a/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp +++ b/extensions/lmdb/tests/LmdbContentRepositoryTests.cpp @@ -17,6 +17,7 @@ */ #include +#include #include "LmdbContentRepository.h" #include "ResourceClaim.h" @@ -86,6 +87,22 @@ TEST_CASE("Initialize succeeds when target directory already exists", "[lmdb]") REQUIRE(content_repo->initialize(configuration)); } +TEST_CASE("Initialize fails and is safe to destroy when the directory path is a regular file", "[lmdb]") { + TestController controller; + const auto file_path = controller.createTempDirectory() / "not_a_directory"; + { + std::ofstream file(file_path); + file << "this is a regular file, not a directory"; + } + REQUIRE(std::filesystem::is_regular_file(file_path)); + + auto configuration = std::make_shared(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, file_path.string()); + + auto content_repo = std::make_shared(); + REQUIRE_FALSE(content_repo->initialize(configuration)); +} + TEST_CASE_METHOD(LmdbContentRepositoryTests, "Key does not exist in empty database", "[lmdb]") { auto claim = std::make_shared(content_repo_); REQUIRE_FALSE(content_repo_->exists(*claim)); From 52d93b20633060c479678d6ac9d71c21e018b572 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi Date: Tue, 23 Jun 2026 14:20:11 +0200 Subject: [PATCH 3/3] Fix LMDB build on Windows with WIN32_LEAN_AND_MEAN defined --- cmake/LMDB.cmake | 22 ++--- thirdparty/lmdb/fix-windows-symbols.patch | 112 ++++++++++++++++++++++ 2 files changed, 122 insertions(+), 12 deletions(-) create mode 100644 thirdparty/lmdb/fix-windows-symbols.patch diff --git a/cmake/LMDB.cmake b/cmake/LMDB.cmake index 267350b076..65ff449c59 100644 --- a/cmake/LMDB.cmake +++ b/cmake/LMDB.cmake @@ -17,9 +17,16 @@ include(FetchContent) -set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/lmdb/add-cmake-file.patch") -set(PC ${Bash_EXECUTABLE} -c "set -x &&\ - (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")") +set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/lmdb/add-cmake-file.patch") +if (WIN32) + set(PATCH_FILE_2 "${CMAKE_SOURCE_DIR}/thirdparty/lmdb/fix-windows-symbols.patch") + set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\") &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\")") +else() + set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\")") +endif() FetchContent_Declare( lmdb @@ -30,15 +37,6 @@ FetchContent_Declare( SYSTEM ) -if (WIN32) - get_directory_property(MINIFI_SAVED_COMPILE_DEFS COMPILE_DEFINITIONS) - remove_definitions(-DWIN32_LEAN_AND_MEAN) -endif() - FetchContent_MakeAvailable(lmdb) -if (WIN32) - set_directory_properties(PROPERTIES COMPILE_DEFINITIONS "${MINIFI_SAVED_COMPILE_DEFS}") -endif() - set(LMDB_INCLUDE_DIR "${lmdb_SOURCE_DIR}/libraries/liblmdb") diff --git a/thirdparty/lmdb/fix-windows-symbols.patch b/thirdparty/lmdb/fix-windows-symbols.patch new file mode 100644 index 0000000000..76bb2a75c2 --- /dev/null +++ b/thirdparty/lmdb/fix-windows-symbols.patch @@ -0,0 +1,112 @@ +diff --git a/libraries/liblmdb/mdb.c b/libraries/liblmdb/mdb.c +index 5a06d7b..2c35abf 100644 +--- a/libraries/liblmdb/mdb.c ++++ b/libraries/liblmdb/mdb.c +@@ -42,6 +42,7 @@ + #include + #include + #include /* get wcscpy() */ ++#include + + /* We use native NT APIs to setup the memory map, so that we can + * let the DB file grow incrementally instead of always preallocating +@@ -52,31 +53,31 @@ + * NTDLL.DLL at runtime, to avoid buildtime dependencies on any + * NTDLL import libraries. + */ +-typedef NTSTATUS (WINAPI NtCreateSectionFunc) ++typedef NTSTATUS (WINAPI MDB_NtCreateSectionFunc) + (OUT PHANDLE sh, IN ACCESS_MASK acc, + IN void * oa OPTIONAL, + IN PLARGE_INTEGER ms OPTIONAL, + IN ULONG pp, IN ULONG aa, IN HANDLE fh OPTIONAL); + +-static NtCreateSectionFunc *NtCreateSection; ++static MDB_NtCreateSectionFunc *MDB_NtCreateSection; + + typedef enum _SECTION_INHERIT { + ViewShare = 1, + ViewUnmap = 2 + } SECTION_INHERIT; + +-typedef NTSTATUS (WINAPI NtMapViewOfSectionFunc) ++typedef NTSTATUS (WINAPI MDB_NtMapViewOfSectionFunc) + (IN PHANDLE sh, IN HANDLE ph, + IN OUT PVOID *addr, IN ULONG_PTR zbits, + IN SIZE_T cs, IN OUT PLARGE_INTEGER off OPTIONAL, + IN OUT PSIZE_T vs, IN SECTION_INHERIT ih, + IN ULONG at, IN ULONG pp); + +-static NtMapViewOfSectionFunc *NtMapViewOfSection; ++static MDB_NtMapViewOfSectionFunc *MDB_NtMapViewOfSection; + +-typedef NTSTATUS (WINAPI NtCloseFunc)(HANDLE h); ++typedef NTSTATUS (WINAPI MDB_NtCloseFunc)(HANDLE h); + +-static NtCloseFunc *NtClose; ++static MDB_NtCloseFunc *MDB_NtClose; + + /** getpid() returns int; MinGW defines pid_t but MinGW64 typedefs it + * as int64 which is wrong. MSVC doesn't define it at all, so just +@@ -5060,22 +5061,22 @@ mdb_env_map(MDB_env *env, void *addr) + LARGE_INTEGER fsize; + fsize.LowPart = msize & 0xffffffff; + fsize.HighPart = msize >> 16 >> 16; +- rc = NtCreateSection(&mh, access, NULL, &fsize, secprot, SEC_RESERVE, env->me_fd); ++ rc = MDB_NtCreateSection(&mh, access, NULL, &fsize, secprot, SEC_RESERVE, env->me_fd); + #else +- rc = NtCreateSection(&mh, access, NULL, NULL, secprot, SEC_RESERVE, env->me_fd); ++ rc = MDB_NtCreateSection(&mh, access, NULL, NULL, secprot, SEC_RESERVE, env->me_fd); + #endif + if (rc) + return mdb_nt2win32(rc); + map = addr; + if (MDB_REMAPPING(env->me_flags)) + msize = NUM_METAS * env->me_psize; +- rc = NtMapViewOfSection(mh, GetCurrentProcess(), &map, 0, 0, NULL, &msize, ViewUnmap, alloctype, pageprot); ++ rc = MDB_NtMapViewOfSection(mh, GetCurrentProcess(), &map, 0, 0, NULL, &msize, ViewUnmap, alloctype, pageprot); + #if MDB_RPAGE_CACHE + if (MDB_REMAPPING(env->me_flags)) + env->me_fmh = mh; + else + #endif +- NtClose(mh); ++ MDB_NtClose(mh); + if (rc) + return mdb_nt2win32(rc); + env->me_map = map; +@@ -5435,18 +5436,18 @@ mdb_env_open2(MDB_env *env, int prev) + else + env->me_pidquery = PROCESS_QUERY_INFORMATION; + /* Grab functions we need from NTDLL */ +- if (!NtCreateSection) { ++ if (!MDB_NtCreateSection) { + HMODULE h = GetModuleHandleW(L"NTDLL.DLL"); + if (!h) + return MDB_PROBLEM; +- NtClose = (NtCloseFunc *)GetProcAddress(h, "NtClose"); +- if (!NtClose) ++ MDB_NtClose = (MDB_NtCloseFunc *)GetProcAddress(h, "NtClose"); ++ if (!MDB_NtClose) + return MDB_PROBLEM; +- NtMapViewOfSection = (NtMapViewOfSectionFunc *)GetProcAddress(h, "NtMapViewOfSection"); +- if (!NtMapViewOfSection) ++ MDB_NtMapViewOfSection = (MDB_NtMapViewOfSectionFunc *)GetProcAddress(h, "NtMapViewOfSection"); ++ if (!MDB_NtMapViewOfSection) + return MDB_PROBLEM; +- NtCreateSection = (NtCreateSectionFunc *)GetProcAddress(h, "NtCreateSection"); +- if (!NtCreateSection) ++ MDB_NtCreateSection = (MDB_NtCreateSectionFunc *)GetProcAddress(h, "NtCreateSection"); ++ if (!MDB_NtCreateSection) + return MDB_PROBLEM; + } + env->me_ovs = 0; +@@ -6909,7 +6910,7 @@ mdb_rpage_get(MDB_txn *txn, pgno_t pg0, int numpgs, MDB_page **ret) + #define SET_OFF(off,val) off.QuadPart = val + #define MAP(rc,env,addr,len,off) \ + addr = NULL; \ +- rc = NtMapViewOfSection(env->me_fmh, GetCurrentProcess(), &addr, 0, \ ++ rc = MDB_NtMapViewOfSection(env->me_fmh, GetCurrentProcess(), &addr, 0, \ + len, &off, &len, ViewUnmap, (env->me_flags & MDB_RDONLY) ? 0 : MEM_RESERVE, PAGE_READONLY); \ + if (rc) rc = mdb_nt2win32(rc) + #else