From 55e94d874099ce6287c0c646add31d57fb86ffd6 Mon Sep 17 00:00:00 2001 From: JasMehta08 Date: Wed, 17 Jun 2026 11:34:37 +0530 Subject: [PATCH] [ntuple] Add RPageSinkS3 for the basic S3 write path --- tree/ntuple/CMakeLists.txt | 2 + tree/ntuple/inc/ROOT/RPageStorageS3.hxx | 71 ++++++- tree/ntuple/src/RPageStorage.cxx | 14 ++ tree/ntuple/src/RPageStorageS3.cxx | 223 ++++++++++++++++++++ tree/ntuple/test/CMakeLists.txt | 2 +- tree/ntuple/test/ntuple_storage_s3.cxx | 261 ++++++++++++++++++++++++ 6 files changed, 569 insertions(+), 4 deletions(-) diff --git a/tree/ntuple/CMakeLists.txt b/tree/ntuple/CMakeLists.txt index 24ab0556adb13..12393c79708e1 100644 --- a/tree/ntuple/CMakeLists.txt +++ b/tree/ntuple/CMakeLists.txt @@ -125,7 +125,9 @@ endif() if(curl) set(ROOTNTuple_EXTRA_HEADERS ${ROOTNTuple_EXTRA_HEADERS} ROOT/RPageStorageS3.hxx) target_sources(ROOTNTuple PRIVATE src/RPageStorageS3.cxx) + target_compile_definitions(ROOTNTuple PRIVATE R__ENABLE_S3) target_link_libraries(ROOTNTuple PRIVATE nlohmann_json::nlohmann_json) + target_link_libraries(ROOTNTuple PRIVATE RCurlHttp) endif() if(MSVC) diff --git a/tree/ntuple/inc/ROOT/RPageStorageS3.hxx b/tree/ntuple/inc/ROOT/RPageStorageS3.hxx index 33632c11c3d07..24bbef59c67bf 100644 --- a/tree/ntuple/inc/ROOT/RPageStorageS3.hxx +++ b/tree/ntuple/inc/ROOT/RPageStorageS3.hxx @@ -15,9 +15,13 @@ #include #include +#include +#include #include +#include #include +#include namespace ROOT { namespace Experimental { @@ -43,9 +47,10 @@ struct RNTupleAnchorS3 { std::uint16_t fVersionMajor = RNTuple::kVersionMajor; std::uint16_t fVersionMinor = RNTuple::kVersionMinor; std::uint16_t fVersionPatch = RNTuple::kVersionPatch; - /// Pattern for resolving object IDs to full S3 URLs. - /// ${baseurl} is replaced with the anchor URL, ${objid} with the numeric object ID. - std::string fUrlTemplate; + /// Pattern for resolving object IDs to full S3 URLs. ${baseurl} is replaced with the anchor URL, + /// ${objid} with the numeric object ID. Defaults to the scheme this writer uses; the reader + /// overrides it from the stored anchor. + std::string fUrlTemplate = "${baseurl}/${objid}"; /// Object ID and byte offset of the compressed header within the S3 object std::uint64_t fHeaderObjId = 0; std::uint64_t fHeaderOffset = 0; @@ -67,6 +72,66 @@ struct RNTupleAnchorS3 { static RResult CreateFromJSON(const std::string &json); }; +/// \brief Parsed components of an S3-scheme URI +struct RS3Url { + std::string fHttpUrl; ///< The full HTTP(S) URL (the s3 scheme prefix translated to http/https) + std::string fScheme; ///< "http" or "https" +}; + +/// \brief Translate an S3-scheme URI into its HTTP equivalent. +/// +/// Accepts `s3+http://host/bucket/path`, `s3+https://host/bucket/path`, and `s3://bucket/path` +/// (the bare form defaults to the AWS endpoint). Throws RException on an unrecognized scheme. +RS3Url ParseS3Url(std::string_view uri); + +// clang-format off +/** +\class ROOT::Experimental::Internal::RPageSinkS3 +\ingroup NTuple +\brief Storage provider that writes ntuple pages into S3-compatible object storage. + +Currently implements Mode B (one sealed page per S3 object, kTypeObject64 locators). +Mode A (multiple packed pages per object, kTypeMulti locators) will be added separately. + +\warning The S3 backend is experimental and under active development. +*/ +// clang-format on +class RPageSinkS3 : public ROOT::Internal::RPagePersistentSink { +private: + /// HTTP base URL for this ntuple (derived from the s3 scheme URI); never has a trailing slash + std::string fBaseUrl; + /// Object ID counter; incremented for each object written. Atomic to match the DAOS pattern and + /// prepare for a future parallel CommitSealedPageVImpl. + std::atomic fObjectId{0}; + /// Tracks the number of bytes committed to the current cluster (reset in StageClusterImpl) + std::uint64_t fNBytesCurrentCluster{0}; + /// Anchor metadata populated during the write path and uploaded last in CommitDatasetImpl + RNTupleAnchorS3 fAnchor; + + /// Resolve a numeric object ID to its full HTTP URL + std::string ObjectUrl(std::uint64_t objId) const; + /// Upload raw bytes to the given S3 URL via an HTTP PUT request + void PutObject(const std::string &url, const unsigned char *data, std::size_t size); + +protected: + using RPagePersistentSink::InitImpl; + void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final; + RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final; + RNTupleLocator + CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final; + std::uint64_t StageClusterImpl() final; + RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final; + using RPagePersistentSink::CommitDatasetImpl; + ROOT::Internal::RNTupleLink CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final; + +public: + RPageSinkS3(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options); + ~RPageSinkS3() override; + + std::unique_ptr + CloneAsHidden(std::string_view name, const ROOT::RNTupleWriteOptions &opts) const final; +}; // class RPageSinkS3 + } // namespace Internal } // namespace Experimental } // namespace ROOT diff --git a/tree/ntuple/src/RPageStorage.cxx b/tree/ntuple/src/RPageStorage.cxx index ee3269b2be9a6..3ca37b1dc20fa 100644 --- a/tree/ntuple/src/RPageStorage.cxx +++ b/tree/ntuple/src/RPageStorage.cxx @@ -27,6 +27,9 @@ #ifdef R__ENABLE_DAOS #include #endif +#ifdef R__ENABLE_S3 +#include +#endif #include #include @@ -190,6 +193,9 @@ ROOT::Internal::RPageSource::Create(std::string_view ntupleName, std::string_vie throw RException(R__FAIL("This RNTuple build does not support DAOS.")); #endif + if (location.find("s3://") == 0 || location.find("s3+http://") == 0 || location.find("s3+https://") == 0) + throw RException(R__FAIL("S3 read support is not yet implemented.")); + return std::make_unique(ntupleName, location, options); } @@ -904,6 +910,14 @@ ROOT::Internal::RPagePersistentSink::Create(std::string_view ntupleName, std::st #endif } + if (location.find("s3://") == 0 || location.find("s3+http://") == 0 || location.find("s3+https://") == 0) { +#ifdef R__ENABLE_S3 + return std::make_unique(ntupleName, location, options); +#else + throw RException(R__FAIL("This RNTuple build does not support S3.")); +#endif + } + // Otherwise assume that the user wants us to create a file. return std::make_unique(ntupleName, location, options); } diff --git a/tree/ntuple/src/RPageStorageS3.cxx b/tree/ntuple/src/RPageStorageS3.cxx index 0817eecc56a7c..21e84fbaf95f5 100644 --- a/tree/ntuple/src/RPageStorageS3.cxx +++ b/tree/ntuple/src/RPageStorageS3.cxx @@ -12,9 +12,26 @@ #include +#include +#include +#include +#include +#include +#include +#include + #include +#include +#include +#include #include +#include + +namespace { +using ROOT::Internal::MakeUninitArray; +using ROOT::Internal::RNTupleCompressor; +} // anonymous namespace /// Field-by-field equality check across all 14 anchor members. /// Used to verify round-trip correctness in tests. @@ -97,3 +114,209 @@ ROOT::Experimental::Internal::RNTupleAnchorS3::CreateFromJSON(const std::string return anchor; } + +//////////////////////////////////////////////////////////////////////////////// +// S3 URI parsing + +ROOT::Experimental::Internal::RS3Url ROOT::Experimental::Internal::ParseS3Url(std::string_view uri) +{ + const std::string uriStr(uri); + + // The base URL is a plain bucket/path prefix (ObjectUrl() appends "/") and S3 authentication + // comes from the environment via SigV4, not from the URL. Reject embedded userinfo, query strings, + // and fragments rather than silently mishandling them. + if (uriStr.find_first_of("@?#") != std::string::npos) + throw ROOT::RException( + R__FAIL("S3 URI must not contain userinfo ('@'), a query ('?') or a fragment ('#'): " + uriStr)); + + // URI schemes are case-insensitive (RFC 3986); match on a lower-cased copy of the scheme, but keep + // the remainder verbatim because bucket names and object keys are case-sensitive. + std::string schemeLower; + for (std::size_t i = 0; i < uriStr.size() && i < std::strlen("s3+https://"); ++i) + schemeLower.push_back(static_cast(std::tolower(static_cast(uriStr[i])))); + + RS3Url result; + std::size_t schemeLen = 0; + if (schemeLower.rfind("s3+http://", 0) == 0) { + result.fScheme = "http"; + schemeLen = std::strlen("s3+http://"); + result.fHttpUrl = "http://" + uriStr.substr(schemeLen); + } else if (schemeLower.rfind("s3+https://", 0) == 0) { + result.fScheme = "https"; + schemeLen = std::strlen("s3+https://"); + result.fHttpUrl = "https://" + uriStr.substr(schemeLen); + } else if (schemeLower.rfind("s3://", 0) == 0) { + // The bare s3:// form is a convenience shortcut for AWS path-style addressing on the global + // (us-east-1) endpoint. For other regions or providers, use the explicit s3+http(s):// form + // with the full host, e.g. s3+https://s3.eu-west-1.amazonaws.com/bucket/path. + result.fScheme = "https"; + schemeLen = std::strlen("s3://"); + result.fHttpUrl = "https://s3.amazonaws.com/" + uriStr.substr(schemeLen); + } else { + throw ROOT::RException(R__FAIL("invalid S3 URI (expected s3://, s3+http:// or s3+https://): " + uriStr)); + } + + // There must be a host (s3+http(s)://) or bucket (s3://) after the scheme. + if (uriStr.size() == schemeLen) + throw ROOT::RException(R__FAIL("S3 URI has no host or bucket: " + uriStr)); + + // Drop trailing slashes so ObjectUrl() never produces "//" in an object key and the anchor key + // (the base URL itself) is not left ending in '/'. + while (!result.fHttpUrl.empty() && result.fHttpUrl.back() == '/') + result.fHttpUrl.pop_back(); + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// +// RPageSinkS3 + +ROOT::Experimental::Internal::RPageSinkS3::RPageSinkS3(std::string_view ntupleName, std::string_view uri, + const ROOT::RNTupleWriteOptions &options) + : RPagePersistentSink(ntupleName, options), fBaseUrl(ParseS3Url(uri).fHttpUrl) +{ + static std::once_flag once; + std::call_once(once, []() { + R__LOG_WARNING(ROOT::Internal::NTupleLog()) << "The S3 backend is experimental and still under development. " + << "Do not store real data with this version of RNTuple!"; + }); + EnableDefaultMetrics("RPageSinkS3"); +} + +ROOT::Experimental::Internal::RPageSinkS3::~RPageSinkS3() = default; + +std::string ROOT::Experimental::Internal::RPageSinkS3::ObjectUrl(std::uint64_t objId) const +{ + return fBaseUrl + "/" + std::to_string(objId); +} + +void ROOT::Experimental::Internal::RPageSinkS3::PutObject(const std::string &url, const unsigned char *data, + std::size_t size) +{ + // one PUT per object on a fresh connection. + ROOT::Internal::RCurlConnection conn(url); + conn.SetCredentialsFromEnvironment(); + auto status = conn.SendPutReq(data, size); + if (!status) + throw ROOT::RException(R__FAIL("S3 PUT failed for " + url + ": " + status.fStatusMsg)); +} + +void ROOT::Experimental::Internal::RPageSinkS3::InitImpl(unsigned char *serializedHeader, std::uint32_t length) +{ + // Unlike the DAOS backend there is no container/object-class setup here: the bucket is assumed to + // exist, so InitImpl only compresses and uploads the header (object 0). + // fAnchor.fUrlTemplate keeps its default ("${baseurl}/${objid}"), mirroring the version fields. + + auto zipBuffer = MakeUninitArray(length); + auto szZipHeader = + RNTupleCompressor::Zip(serializedHeader, length, GetWriteOptions().GetCompression(), zipBuffer.get()); + + const auto headerObjId = fObjectId.fetch_add(1); + { + Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite); + PutObject(ObjectUrl(headerObjId), zipBuffer.get(), szZipHeader); + } + + fAnchor.fHeaderObjId = headerObjId; + fAnchor.fHeaderOffset = 0; + fAnchor.fNBytesHeader = szZipHeader; + fAnchor.fLenHeader = length; +} + +ROOT::RNTupleLocator ROOT::Experimental::Internal::RPageSinkS3::CommitPageImpl(ColumnHandle_t columnHandle, + const ROOT::Internal::RPage &page) +{ + auto element = columnHandle.fColumn->GetElement(); + RPageStorage::RSealedPage sealedPage; + { + Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallZip, fCounters->fTimeCpuZip); + sealedPage = SealPage(page, *element); + } + + fCounters->fSzZip.Add(page.GetNBytes()); + return CommitSealedPageImpl(columnHandle.fPhysicalId, sealedPage); +} + +ROOT::RNTupleLocator +ROOT::Experimental::Internal::RPageSinkS3::CommitSealedPageImpl(ROOT::DescriptorId_t, + const RPageStorage::RSealedPage &sealedPage) +{ + // Mode B: one S3 object per sealed page, located by a kTypeObject64 locator + const auto pageObjId = fObjectId.fetch_add(1); + { + Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite); + PutObject(ObjectUrl(pageObjId), reinterpret_cast(sealedPage.GetBuffer()), + sealedPage.GetBufferSize()); + } + + RNTupleLocator result; + result.SetType(RNTupleLocator::kTypeObject64); + result.SetNBytesOnStorage(sealedPage.GetDataSize()); + result.SetPosition(ROOT::RNTupleLocatorObject64{pageObjId}); + fCounters->fNPageCommitted.Inc(); + fCounters->fSzWritePayload.Add(sealedPage.GetBufferSize()); + fNBytesCurrentCluster += sealedPage.GetBufferSize(); + return result; +} + +std::uint64_t ROOT::Experimental::Internal::RPageSinkS3::StageClusterImpl() +{ + return std::exchange(fNBytesCurrentCluster, 0); +} + +ROOT::RNTupleLocator +ROOT::Experimental::Internal::RPageSinkS3::CommitClusterGroupImpl(unsigned char *serializedPageList, + std::uint32_t length) +{ + auto bufPageListZip = MakeUninitArray(length); + auto szPageListZip = + RNTupleCompressor::Zip(serializedPageList, length, GetWriteOptions().GetCompression(), bufPageListZip.get()); + + const auto objId = fObjectId.fetch_add(1); + { + Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite); + PutObject(ObjectUrl(objId), bufPageListZip.get(), szPageListZip); + } + + RNTupleLocator result; + result.SetType(RNTupleLocator::kTypeObject64); + result.SetNBytesOnStorage(szPageListZip); + result.SetPosition(ROOT::RNTupleLocatorObject64{objId}); + fCounters->fSzWritePayload.Add(static_cast(szPageListZip)); + return result; +} + +ROOT::Internal::RNTupleLink +ROOT::Experimental::Internal::RPageSinkS3::CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) +{ + auto bufFooterZip = MakeUninitArray(length); + auto szFooterZip = + RNTupleCompressor::Zip(serializedFooter, length, GetWriteOptions().GetCompression(), bufFooterZip.get()); + + const auto footerObjId = fObjectId.fetch_add(1); + { + Detail::RNTupleAtomicTimer timer(fCounters->fTimeWallWrite, fCounters->fTimeCpuWrite); + PutObject(ObjectUrl(footerObjId), bufFooterZip.get(), szFooterZip); + } + + fAnchor.fFooterObjId = footerObjId; + fAnchor.fFooterOffset = 0; + fAnchor.fNBytesFooter = szFooterZip; + fAnchor.fLenFooter = length; + + // Upload the anchor LAST: once it exists at the base URL, a reader can assume the whole ntuple + // is complete. Never upload it before all other objects are in place. + const auto anchorJson = fAnchor.ToJSON(); + PutObject(fBaseUrl, reinterpret_cast(anchorJson.data()), anchorJson.size()); + + // An S3 ntuple is self-locating: its anchor always lives at the base URL, so there is no anchor + // link to hand back here (unlike the file backend, which reports where it embedded the anchor). + return {}; +} + +std::unique_ptr +ROOT::Experimental::Internal::RPageSinkS3::CloneAsHidden(std::string_view /*name*/, + const ROOT::RNTupleWriteOptions & /*opts*/) const +{ + throw ROOT::RException(R__FAIL("cloning an S3 sink is not implemented yet")); +} diff --git a/tree/ntuple/test/CMakeLists.txt b/tree/ntuple/test/CMakeLists.txt index 96b080a35f1ee..4fd5fadfed5f8 100644 --- a/tree/ntuple/test/CMakeLists.txt +++ b/tree/ntuple/test/CMakeLists.txt @@ -177,7 +177,7 @@ if(daos OR daos_mock) endif() if(curl) - ROOT_ADD_GTEST(ntuple_storage_s3 ntuple_storage_s3.cxx LIBRARIES ROOTNTuple) + ROOT_ADD_GTEST(ntuple_storage_s3 ntuple_storage_s3.cxx LIBRARIES ROOTNTuple Net) endif() diff --git a/tree/ntuple/test/ntuple_storage_s3.cxx b/tree/ntuple/test/ntuple_storage_s3.cxx index e145ef9f6eee0..88a9c32a35605 100644 --- a/tree/ntuple/test/ntuple_storage_s3.cxx +++ b/tree/ntuple/test/ntuple_storage_s3.cxx @@ -5,6 +5,17 @@ #include "ntuple_test.hxx" #include +#include + +#include "TServerSocket.h" +#include "TSocket.h" + +#include +#include +#include +#include +#include +#include using RNTupleAnchorS3 = ROOT::Experimental::Internal::RNTupleAnchorS3; @@ -349,3 +360,253 @@ TEST(RNTupleAnchorS3, MaxUint64Values) EXPECT_EQ(UINT64_MAX, parsed.fNBytesFooter); EXPECT_EQ(UINT64_MAX, parsed.fLenFooter); } + +// ==================== ParseS3Url Tests ==================== + +using ROOT::Experimental::Internal::ParseS3Url; + +TEST(RPageSinkS3, ParseS3UrlHttp) +{ + auto url = ParseS3Url("s3+http://localhost:9000/mybucket/path"); + EXPECT_EQ("http://localhost:9000/mybucket/path", url.fHttpUrl); + EXPECT_EQ("http", url.fScheme); +} + +TEST(RPageSinkS3, ParseS3UrlHttps) +{ + auto url = ParseS3Url("s3+https://s3.amazonaws.com/mybucket/path"); + EXPECT_EQ("https://s3.amazonaws.com/mybucket/path", url.fHttpUrl); + EXPECT_EQ("https", url.fScheme); +} + +TEST(RPageSinkS3, ParseS3UrlDefault) +{ + auto url = ParseS3Url("s3://mybucket/path"); + EXPECT_EQ("https://s3.amazonaws.com/mybucket/path", url.fHttpUrl); + EXPECT_EQ("https", url.fScheme); +} + +TEST(RPageSinkS3, ParseS3UrlInvalid) +{ + EXPECT_THROW(ParseS3Url("http://example.com"), ROOT::RException); + EXPECT_THROW(ParseS3Url("daos://pool/container"), ROOT::RException); + EXPECT_THROW(ParseS3Url(""), ROOT::RException); +} + +TEST(RPageSinkS3, ParseS3UrlTrailingSlash) +{ + // A trailing slash must not leak into object keys (ObjectUrl appends "/") or the anchor key. + EXPECT_EQ("http://localhost:9000/bucket/path", ParseS3Url("s3+http://localhost:9000/bucket/path/").fHttpUrl); + EXPECT_EQ("https://s3.amazonaws.com/bucket", ParseS3Url("s3://bucket/").fHttpUrl); +} + +TEST(RPageSinkS3, ParseS3UrlCaseInsensitiveScheme) +{ + // The scheme is matched case-insensitively; the host/bucket/key case is preserved verbatim. + EXPECT_EQ("http://Host:9000/MyBucket/Path", ParseS3Url("S3+HTTP://Host:9000/MyBucket/Path").fHttpUrl); + EXPECT_EQ("https://s3.amazonaws.com/MyBucket/path", ParseS3Url("S3://MyBucket/path").fHttpUrl); +} + +TEST(RPageSinkS3, ParseS3UrlAwsAndCeph) +{ + // AWS (any region, path-style or virtual-hosted) and Ceph/MinIO endpoints all work through the + // explicit s3+https:// form: the user supplies the full host, which is passed through verbatim. + EXPECT_EQ("https://s3.eu-west-1.amazonaws.com/bucket/data", // AWS path-style, regional + ParseS3Url("s3+https://s3.eu-west-1.amazonaws.com/bucket/data").fHttpUrl); + EXPECT_EQ("https://bucket.s3.eu-west-1.amazonaws.com/data", // AWS virtual-hosted style + ParseS3Url("s3+https://bucket.s3.eu-west-1.amazonaws.com/data").fHttpUrl); + EXPECT_EQ("https://s3.cern.ch/bucket/data", // Ceph RGW (CERN) + ParseS3Url("s3+https://s3.cern.ch/bucket/data").fHttpUrl); +} + +TEST(RPageSinkS3, ParseS3UrlRejectsUnsupportedComponents) +{ + EXPECT_THROW(ParseS3Url("s3+https://KEY:SECRET@host/bucket/path"), ROOT::RException); // userinfo + EXPECT_THROW(ParseS3Url("s3+http://host/bucket/path?versionId=1"), ROOT::RException); // query + EXPECT_THROW(ParseS3Url("s3+http://host/bucket/path#section"), ROOT::RException); // fragment + EXPECT_THROW(ParseS3Url("s3://"), ROOT::RException); // no bucket + EXPECT_THROW(ParseS3Url("s3+http://"), ROOT::RException); // no host +} + +// ==================== RPageSinkS3 Wire-Level Tests (mock HTTP server) ==================== + +// These tests stand up a loopback TServerSocket and point an RPageSinkS3 at it, so the exact HTTP +// PUT requests the write path emits can be inspected with no live S3 service (they always run in +// CI). The mock-server idiom mirrors net/curl/test/curl_connection.cxx. +namespace { + +/// Read one HTTP request (request line + headers + body) from an accepted socket, reply with the +/// given status (e.g. "200 OK"), and return the request-target (the path from the request line). +std::string ServeOneRequest(TSocket *sock, const char *status, std::string &headers, std::string &body) +{ + headers.clear(); + body.clear(); + + // Read up to and including the end-of-headers marker, byte by byte. + const char *eof = "\r\n\r\n"; + const std::size_t eofLen = std::strlen(eof); + std::size_t nextInEof = 0; + char c; + while (sock->RecvRaw(&c, 1) > 0) { + headers.push_back(c); + if (c == eof[nextInEof]) { + if (++nextInEof == eofLen) + break; + } else { + nextInEof = 0; + } + } + + std::string lower(headers); + std::transform(lower.begin(), lower.end(), lower.begin(), [](unsigned char ch) { return std::tolower(ch); }); + + // libcurl uploads with "Expect: 100-continue"; acknowledge before reading the body. + if (lower.find("expect: 100-continue") != std::string::npos) { + const char *cont = "HTTP/1.1 100 Continue\r\n\r\n"; + sock->SendRaw(cont, std::strlen(cont)); + } + + std::size_t contentLength = 0; + if (auto pos = lower.find("content-length: "); pos != std::string::npos) { + auto valStart = pos + std::strlen("content-length: "); + auto valEnd = lower.find("\r\n", valStart); + contentLength = std::stoul(lower.substr(valStart, valEnd - valStart)); + } + if (contentLength > 0) { + body.resize(contentLength); + sock->RecvRaw(&body[0], contentLength); + } + + const std::string response = std::string("HTTP/1.1 ") + status + "\r\nContent-Length: 0\r\n\r\n"; + sock->SendRaw(response.data(), response.size()); + + // The request line is "PUT /target HTTP/1.1"; return the middle token. + std::string target; + if (auto sp1 = headers.find(' '); sp1 != std::string::npos) { + if (auto sp2 = headers.find(' ', sp1 + 1); sp2 != std::string::npos) + target = headers.substr(sp1 + 1, sp2 - sp1 - 1); + } + return target; +} + +} // anonymous namespace + +TEST(RPageSinkS3Wire, WriteIssuesExpectedPuts) +{ + TServerSocket server(0, false, TServerSocket::kDefaultBacklog, -1, ESocketBindOption::kInaddrLoopback); + const std::string host = server.GetLocalInetAddress().GetHostAddress(); + const std::string basePath = "/wirebucket/wiretest"; + const std::string uri = "s3+http://" + host + ":" + std::to_string(server.GetLocalPort()) + basePath; + + // Dummy credentials so curl signs every PUT (SigV4 Authorization header). Deliberately leave + // S3_ENDPOINT/S3_BUCKET unset so the credential-gated MinIO tests above still skip. + setenv("S3_ACCESS_KEY", "dummykey", 1); + setenv("S3_SECRET_KEY", "dummysecret", 1); + setenv("S3_REGION", "us-east-1", 1); + + struct Request { + std::string fPath; + std::string fHeaders; + std::string fBody; + }; + std::vector requests; + + // The connection-per-PUT sink opens one connection per object; serve them on a background thread + // until the anchor (the request whose target is exactly the base path) arrives last. + std::thread serverThread([&] { + for (;;) { + TSocket *sock = server.Accept(); + if (!sock || sock == reinterpret_cast(-1)) + break; + Request req; + req.fPath = ServeOneRequest(sock, "200 OK", req.fHeaders, req.fBody); + sock->Close(); + requests.push_back(std::move(req)); + if (requests.back().fPath == basePath) + break; + } + }); + + { + // The sink ctor emits a one-time (std::call_once) experimental warning; allow it. It is + // optional because it only fires on the first sink construction in the whole process. + ROOT::TestSupport::CheckDiagsRAII diags; + diags.optionalDiag(kWarning, "[ROOT.NTuple]", "experimental", /*matchFullMessage=*/false); + + auto model = ROOT::RNTupleModel::Create(); + auto fldValue = model->MakeField("value"); + auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "wire", uri); + for (int i = 0; i < 20; ++i) { + *fldValue = i; + writer->Fill(); + } + } // writer destroyed here -> footer + anchor PUTs + + serverThread.join(); + + unsetenv("S3_ACCESS_KEY"); + unsetenv("S3_SECRET_KEY"); + unsetenv("S3_REGION"); + + // At minimum: header, one page, page list, footer, anchor. + ASSERT_GE(requests.size(), 5u); + + for (const auto &req : requests) { + // Every object is uploaded with a SigV4-signed HTTP PUT. + EXPECT_EQ(0u, req.fHeaders.find("PUT ")) << req.fHeaders.substr(0, 32); + std::string lower(req.fHeaders); + std::transform(lower.begin(), lower.end(), lower.begin(), [](unsigned char ch) { return std::tolower(ch); }); + EXPECT_NE(std::string::npos, lower.find("authorization: aws4-hmac-sha256")) + << "no SigV4 Authorization header on " << req.fPath; + } + + // Object 0 is the header, written first. + EXPECT_EQ(basePath + "/0", requests.front().fPath); + // Every request but the last targets a data object at /; the anchor is last, at . + for (std::size_t i = 0; i + 1 < requests.size(); ++i) + EXPECT_EQ(0u, requests[i].fPath.rfind(basePath + "/", 0)) << "unexpected object key " << requests[i].fPath; + EXPECT_EQ(basePath, requests.back().fPath); + // The anchor body is the JSON document the reader bootstraps from. + EXPECT_NE(std::string::npos, requests.back().fBody.find("\"footerObjId\"")); + EXPECT_NE(std::string::npos, requests.back().fBody.find("\"urlTemplate\"")); +} + +TEST(RPageSinkS3Wire, PutErrorThrows) +{ + TServerSocket server(0, false, TServerSocket::kDefaultBacklog, -1, ESocketBindOption::kInaddrLoopback); + const std::string host = server.GetLocalInetAddress().GetHostAddress(); + const std::string uri = "s3+http://" + host + ":" + std::to_string(server.GetLocalPort()) + "/wirebucket/wireerr"; + + setenv("S3_ACCESS_KEY", "dummykey", 1); + setenv("S3_SECRET_KEY", "dummysecret", 1); + setenv("S3_REGION", "us-east-1", 1); + + // Reject the first upload (the header, written during writer construction) with 403. + std::thread serverThread([&] { + TSocket *sock = server.Accept(); + if (sock && sock != reinterpret_cast(-1)) { + std::string headers, body; + ServeOneRequest(sock, "403 Forbidden", headers, body); + sock->Close(); + } + }); + + // Allow the one-time experimental warning the sink ctor may emit (optional; see above). + ROOT::TestSupport::CheckDiagsRAII diags; + diags.optionalDiag(kWarning, "[ROOT.NTuple]", "experimental", /*matchFullMessage=*/false); + + // The header PUT fails, so RPageSinkS3::PutObject throws out of writer construction. + EXPECT_THROW( + { + auto model = ROOT::RNTupleModel::Create(); + model->MakeField("value"); + auto writer = ROOT::RNTupleWriter::Recreate(std::move(model), "wire", uri); + }, + ROOT::RException); + + serverThread.join(); + + unsetenv("S3_ACCESS_KEY"); + unsetenv("S3_SECRET_KEY"); + unsetenv("S3_REGION"); +}