Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tree/ntuple/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ endif()
if(curl)
set(ROOTNTuple_EXTRA_HEADERS ${ROOTNTuple_EXTRA_HEADERS} ROOT/RPageStorageS3.hxx)
target_sources(ROOTNTuple PRIVATE src/RPageStorageS3.cxx)
target_compile_definitions(ROOTNTuple PRIVATE R__ENABLE_S3)
target_link_libraries(ROOTNTuple PRIVATE nlohmann_json::nlohmann_json)
target_link_libraries(ROOTNTuple PRIVATE RCurlHttp)
endif()

if(MSVC)
Expand Down
71 changes: 68 additions & 3 deletions tree/ntuple/inc/ROOT/RPageStorageS3.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@

#include <ROOT/RError.hxx>
#include <ROOT/RNTuple.hxx>
#include <ROOT/RPageStorage.hxx>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>

namespace ROOT {
namespace Experimental {
Expand All @@ -43,9 +47,10 @@ struct RNTupleAnchorS3 {
std::uint16_t fVersionMajor = RNTuple::kVersionMajor;
std::uint16_t fVersionMinor = RNTuple::kVersionMinor;
std::uint16_t fVersionPatch = RNTuple::kVersionPatch;
/// Pattern for resolving object IDs to full S3 URLs.
/// ${baseurl} is replaced with the anchor URL, ${objid} with the numeric object ID.
std::string fUrlTemplate;
/// Pattern for resolving object IDs to full S3 URLs. ${baseurl} is replaced with the anchor URL,
/// ${objid} with the numeric object ID. Defaults to the scheme this writer uses; the reader
/// overrides it from the stored anchor.
std::string fUrlTemplate = "${baseurl}/${objid}";
/// Object ID and byte offset of the compressed header within the S3 object
std::uint64_t fHeaderObjId = 0;
std::uint64_t fHeaderOffset = 0;
Expand All @@ -67,6 +72,66 @@ struct RNTupleAnchorS3 {
static RResult<RNTupleAnchorS3> CreateFromJSON(const std::string &json);
};

/// \brief Parsed components of an S3-scheme URI
struct RS3Url {
std::string fHttpUrl; ///< The full HTTP(S) URL (the s3 scheme prefix translated to http/https)
std::string fScheme; ///< "http" or "https"
};

/// \brief Translate an S3-scheme URI into its HTTP equivalent.
///
/// Accepts `s3+http://host/bucket/path`, `s3+https://host/bucket/path`, and `s3://bucket/path`
/// (the bare form defaults to the AWS endpoint). Throws RException on an unrecognized scheme.
RS3Url ParseS3Url(std::string_view uri);

// clang-format off
/**
\class ROOT::Experimental::Internal::RPageSinkS3
\ingroup NTuple
\brief Storage provider that writes ntuple pages into S3-compatible object storage.
Currently implements Mode B (one sealed page per S3 object, kTypeObject64 locators).
Mode A (multiple packed pages per object, kTypeMulti locators) will be added separately.
\warning The S3 backend is experimental and under active development.
*/
// clang-format on
class RPageSinkS3 : public ROOT::Internal::RPagePersistentSink {
private:
/// HTTP base URL for this ntuple (derived from the s3 scheme URI); never has a trailing slash
std::string fBaseUrl;
/// Object ID counter; incremented for each object written. Atomic to match the DAOS pattern and
/// prepare for a future parallel CommitSealedPageVImpl.
std::atomic<std::uint64_t> fObjectId{0};
/// Tracks the number of bytes committed to the current cluster (reset in StageClusterImpl)
std::uint64_t fNBytesCurrentCluster{0};
/// Anchor metadata populated during the write path and uploaded last in CommitDatasetImpl
RNTupleAnchorS3 fAnchor;

/// Resolve a numeric object ID to its full HTTP URL
std::string ObjectUrl(std::uint64_t objId) const;
/// Upload raw bytes to the given S3 URL via an HTTP PUT request
void PutObject(const std::string &url, const unsigned char *data, std::size_t size);

protected:
using RPagePersistentSink::InitImpl;
void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
RNTupleLocator
CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
std::uint64_t StageClusterImpl() final;
RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final;
using RPagePersistentSink::CommitDatasetImpl;
ROOT::Internal::RNTupleLink CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final;

public:
RPageSinkS3(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options);
~RPageSinkS3() override;

std::unique_ptr<ROOT::Internal::RPageSink>
CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final;
}; // class RPageSinkS3

} // namespace Internal
} // namespace Experimental
} // namespace ROOT
Expand Down
14 changes: 14 additions & 0 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#ifdef R__ENABLE_DAOS
#include <ROOT/RPageStorageDaos.hxx>
#endif
#ifdef R__ENABLE_S3
#include <ROOT/RPageStorageS3.hxx>
#endif

#include <Compression.h>
#include <TError.h>
Expand Down Expand Up @@ -190,6 +193,9 @@ ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_vie
throw RException(R__FAIL("This RNTuple build does not support DAOS."));
#endif

if (location.find("s3://") == 0 || location.find("s3+http://") == 0 || location.find("s3+https://") == 0)
throw RException(R__FAIL("S3 read support is not yet implemented."));

return std::make_unique<ROOT::Internal::RPageSourceFile>(ntupleName, location, options);
}

Expand Down Expand Up @@ -904,6 +910,14 @@ ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::st
#endif
}

if (location.find("s3://") == 0 || location.find("s3+http://") == 0 || location.find("s3+https://") == 0) {
#ifdef R__ENABLE_S3
return std::make_unique<ROOT::Experimental::Internal::RPageSinkS3>(ntupleName, location, options);
#else
throw RException(R__FAIL("This RNTuple build does not support S3."));
#endif
}

// Otherwise assume that the user wants us to create a file.
return std::make_unique<ROOT::Internal::RPageSinkFile>(ntupleName, location, options);
}
Expand Down
223 changes: 223 additions & 0 deletions tree/ntuple/src/RPageStorageS3.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,26 @@

#include <ROOT/RPageStorageS3.hxx>

#include <ROOT/RColumn.hxx>
#include <ROOT/RCurlConnection.hxx>
#include <ROOT/RLogger.hxx>
#include <ROOT/RNTupleTypes.hxx>
#include <ROOT/RNTupleUtils.hxx>
#include <ROOT/RNTupleZip.hxx>
#include <ROOT/RPage.hxx>

#include <nlohmann/json.hpp>

#include <cctype>
#include <cstring>
#include <mutex>
#include <string>
#include <utility>

namespace {
using ROOT::Internal::MakeUninitArray;
using ROOT::Internal::RNTupleCompressor;
} // anonymous namespace

/// Field-by-field equality check across all 14 anchor members.
/// Used to verify round-trip correctness in tests.
Expand Down Expand Up @@ -97,3 +114,209 @@ ROOT::Experimental::Internal::RNTupleAnchorS3::CreateFromJSON(const std::string

return anchor;
}

////////////////////////////////////////////////////////////////////////////////
// S3 URI parsing

ROOT::Experimental::Internal::RS3Url ROOT::Experimental::Internal::ParseS3Url(std::string_view uri)
{
const std::string uriStr(uri);

// The base URL is a plain bucket/path prefix (ObjectUrl() appends "/<id>") and S3 authentication
// comes from the environment via SigV4, not from the URL. Reject embedded userinfo, query strings,
// and fragments rather than silently mishandling them.
if (uriStr.find_first_of("@?#") != std::string::npos)
throw ROOT::RException(
R__FAIL("S3 URI must not contain userinfo ('@'), a query ('?') or a fragment ('#'): " + uriStr));

// URI schemes are case-insensitive (RFC 3986); match on a lower-cased copy of the scheme, but keep
// the remainder verbatim because bucket names and object keys are case-sensitive.
std::string schemeLower;
for (std::size_t i = 0; i < uriStr.size() && i < std::strlen("s3+https://"); ++i)
schemeLower.push_back(static_cast<char>(std::tolower(static_cast<unsigned char>(uriStr[i]))));

RS3Url result;
std::size_t schemeLen = 0;
if (schemeLower.rfind("s3+http://", 0) == 0) {
result.fScheme = "http";
schemeLen = std::strlen("s3+http://");
result.fHttpUrl = "http://" + uriStr.substr(schemeLen);
} else if (schemeLower.rfind("s3+https://", 0) == 0) {
result.fScheme = "https";
schemeLen = std::strlen("s3+https://");
result.fHttpUrl = "https://" + uriStr.substr(schemeLen);
} else if (schemeLower.rfind("s3://", 0) == 0) {
// The bare s3:// form is a convenience shortcut for AWS path-style addressing on the global
// (us-east-1) endpoint. For other regions or providers, use the explicit s3+http(s):// form
// with the full host, e.g. s3+https://s3.eu-west-1.amazonaws.com/bucket/path.
result.fScheme = "https";
schemeLen = std::strlen("s3://");
result.fHttpUrl = "https://s3.amazonaws.com/" + uriStr.substr(schemeLen);
} else {
throw ROOT::RException(R__FAIL("invalid S3 URI (expected s3://, s3+http:// or s3+https://): " + uriStr));
}

// There must be a host (s3+http(s)://) or bucket (s3://) after the scheme.
if (uriStr.size() == schemeLen)
throw ROOT::RException(R__FAIL("S3 URI has no host or bucket: " + uriStr));

// Drop trailing slashes so ObjectUrl() never produces "//" in an object key and the anchor key
// (the base URL itself) is not left ending in '/'.
while (!result.fHttpUrl.empty() && result.fHttpUrl.back() == '/')
result.fHttpUrl.pop_back();

return result;
}

////////////////////////////////////////////////////////////////////////////////
// RPageSinkS3

ROOT::Experimental::Internal::RPageSinkS3::RPageSinkS3(std::string_view ntupleName, std::string_view uri,
const ROOT::RNTupleWriteOptions &options)
: RPagePersistentSink(ntupleName, options), fBaseUrl(ParseS3Url(uri).fHttpUrl)
{
static std::once_flag once;
std::call_once(once, []() {
R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "The S3 backend is experimental and still under development. "
<< "Do not store real data with this version of RNTuple!";
});
EnableDefaultMetrics("RPageSinkS3");
}

ROOT::Experimental::Internal::RPageSinkS3::~RPageSinkS3() = default;

std::string ROOT::Experimental::Internal::RPageSinkS3::ObjectUrl(std::uint64_t objId) const
{
return fBaseUrl + "/" + std::to_string(objId);
}

void ROOT::Experimental::Internal::RPageSinkS3::PutObject(const std::string &url, const unsigned char *data,
std::size_t size)
{
// one PUT per object on a fresh connection.
ROOT::Internal::RCurlConnection conn(url);
conn.SetCredentialsFromEnvironment();
auto status = conn.SendPutReq(data, size);
if (!status)
throw ROOT::RException(R__FAIL("S3 PUT failed for " + url + ": " + status.fStatusMsg));
}

void ROOT::Experimental::Internal::RPageSinkS3::InitImpl(unsigned char *serializedHeader, std::uint32_t length)
{
// Unlike the DAOS backend there is no container/object-class setup here: the bucket is assumed to
// exist, so InitImpl only compresses and uploads the header (object 0).
// fAnchor.fUrlTemplate keeps its default ("${baseurl}/${objid}"), mirroring the version fields.

auto zipBuffer = MakeUninitArray<unsigned char>(length);
auto szZipHeader =
RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get());

const auto headerObjId = fObjectId.fetch_add(1);
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
PutObject(ObjectUrl(headerObjId), zipBuffer.get(), szZipHeader);
}

fAnchor.fHeaderObjId = headerObjId;
fAnchor.fHeaderOffset = 0;
fAnchor.fNBytesHeader = szZipHeader;
fAnchor.fLenHeader = length;
}

ROOT::RNTupleLocator ROOT::Experimental::Internal::RPageSinkS3::CommitPageImpl(ColumnHandle_t columnHandle,
const ROOT::Internal::RPage &page)
{
auto element = columnHandle.fColumn->GetElement();
RPageStorage::RSealedPage sealedPage;
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip);
sealedPage = SealPage(page, *element);
}

fCounters->fSzZip.Add(page.GetNBytes());
return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage);
}

ROOT::RNTupleLocator
ROOT::Experimental::Internal::RPageSinkS3::CommitSealedPageImpl(ROOT::DescriptorId_t,
const RPageStorage::RSealedPage &sealedPage)
{
// Mode B: one S3 object per sealed page, located by a kTypeObject64 locator
const auto pageObjId = fObjectId.fetch_add(1);
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
PutObject(ObjectUrl(pageObjId), reinterpret_cast<const unsigned char *>(sealedPage.GetBuffer()),
sealedPage.GetBufferSize());
}

RNTupleLocator result;
result.SetType(RNTupleLocator::kTypeObject64);
result.SetNBytesOnStorage(sealedPage.GetDataSize());
result.SetPosition(ROOT::RNTupleLocatorObject64{pageObjId});
fCounters->fNPageCommitted.Inc();
fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize());
fNBytesCurrentCluster += sealedPage.GetBufferSize();
return result;
}

std::uint64_t ROOT::Experimental::Internal::RPageSinkS3::StageClusterImpl()
{
return std::exchange(fNBytesCurrentCluster, 0);
}

ROOT::RNTupleLocator
ROOT::Experimental::Internal::RPageSinkS3::CommitClusterGroupImpl(unsigned char *serializedPageList,
std::uint32_t length)
{
auto bufPageListZip = MakeUninitArray<unsigned char>(length);
auto szPageListZip =
RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get());

const auto objId = fObjectId.fetch_add(1);
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
PutObject(ObjectUrl(objId), bufPageListZip.get(), szPageListZip);
}

RNTupleLocator result;
result.SetType(RNTupleLocator::kTypeObject64);
result.SetNBytesOnStorage(szPageListZip);
result.SetPosition(ROOT::RNTupleLocatorObject64{objId});
fCounters->fSzWritePayload.Add(static_cast<std::int64_t>(szPageListZip));
return result;
}

ROOT::Internal::RNTupleLink
ROOT::Experimental::Internal::RPageSinkS3::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length)
{
auto bufFooterZip = MakeUninitArray<unsigned char>(length);
auto szFooterZip =
RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get());

const auto footerObjId = fObjectId.fetch_add(1);
{
Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite);
PutObject(ObjectUrl(footerObjId), bufFooterZip.get(), szFooterZip);
}

fAnchor.fFooterObjId = footerObjId;
fAnchor.fFooterOffset = 0;
fAnchor.fNBytesFooter = szFooterZip;
fAnchor.fLenFooter = length;

// Upload the anchor LAST: once it exists at the base URL, a reader can assume the whole ntuple
// is complete. Never upload it before all other objects are in place.
const auto anchorJson = fAnchor.ToJSON();
PutObject(fBaseUrl, reinterpret_cast<const unsigned char *>(anchorJson.data()), anchorJson.size());

// An S3 ntuple is self-locating: its anchor always lives at the base URL, so there is no anchor
// link to hand back here (unlike the file backend, which reports where it embedded the anchor).
return {};
}

std::unique_ptr<ROOT::Internal::RPageSink>
ROOT::Experimental::Internal::RPageSinkS3::CloneAsHidden(std::string_view /*name*/,
const ROOT::RNTupleWriteOptions & /*opts*/) const
{
throw ROOT::RException(R__FAIL("cloning an S3 sink is not implemented yet"));
}
2 changes: 1 addition & 1 deletion tree/ntuple/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ if(daos OR daos_mock)
endif()

if(curl)
ROOT_ADD_GTEST(ntuple_storage_s3 ntuple_storage_s3.cxx LIBRARIES ROOTNTuple)
ROOT_ADD_GTEST(ntuple_storage_s3 ntuple_storage_s3.cxx LIBRARIES ROOTNTuple Net)
endif()


Expand Down
Loading