Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

#include <rclcpp/serialized_message.hpp>

Expand Down Expand Up @@ -93,6 +94,7 @@ class Ros2TopicDataProvider final : public TopicDataProvider {
std::size_t graph_events_received;
std::size_t concurrent_cold_waits;
std::size_t cold_wait_cap;
std::size_t unsupported_type_count;
};

Ros2TopicDataProvider(std::shared_ptr<ros2_common::Ros2SubscriptionExecutor> exec,
Expand Down Expand Up @@ -166,6 +168,14 @@ class Ros2TopicDataProvider final : public TopicDataProvider {
std::shared_ptr<ros2_common::Ros2SubscriptionExecutor> exec_;
std::shared_ptr<ros2_medkit_serialization::JsonSerializer> serializer_;

// Message types whose package is not installed: warned once, then skipped on
// subsequent samples instead of re-attempting deserialize per message.
// Bounded by kMaxUnsupportedTypes so a stack advertising many distinct
// unknown types cannot grow the cache without limit.
static constexpr std::size_t kMaxUnsupportedTypes = 4096;
mutable std::mutex unsupported_types_mtx_;
std::unordered_set<std::string> unsupported_types_;

std::atomic<bool> shutdown_{false};

// Shared alive flag for graph callback: registered with the executor so the
Expand Down
28 changes: 26 additions & 2 deletions src/ros2_medkit_gateway/src/data/ros2_topic_data_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,34 @@ tl::expected<TopicSampleResult, ErrorInfo> Ros2TopicDataProvider::sample(const s
result.has_data = false;
return result;
}
{
std::lock_guard<std::mutex> lk(unsupported_types_mtx_);
if (unsupported_types_.count(type_name) != 0) {
result.has_data = false;
return result;
}
}
try {
result.data = serializer_->deserialize(type_name, serialized_copy);
result.has_data = true;
if (msg_ns != 0) {
result.timestamp_ns = msg_ns;
}
} catch (const ros2_medkit_serialization::TypeNotFoundError & e) {
RCLCPP_WARN(exec_->node()->get_logger(), "Unknown type '%s' for topic '%s': %s", type_name.c_str(), topic.c_str(),
e.what());
bool first_seen = false;
{
std::lock_guard<std::mutex> lk(unsupported_types_mtx_);
// Cap the cache: past the bound stop tracking (and stop warning) rather
// than grow memory without limit on a graph with many unknown types.
if (unsupported_types_.size() < kMaxUnsupportedTypes) {
first_seen = unsupported_types_.insert(type_name).second;
}
}
if (first_seen) {
RCLCPP_WARN(exec_->node()->get_logger(),
"Unknown type '%s' for topic '%s': %s (skipping further samples of this type)", type_name.c_str(),
topic.c_str(), e.what());
}
Comment thread
mfaferek93 marked this conversation as resolved.
result.has_data = false;
Comment thread
mfaferek93 marked this conversation as resolved.
} catch (const ros2_medkit_serialization::SerializationError & e) {
RCLCPP_WARN(exec_->node()->get_logger(), "Deserialize failed on '%s': %s", topic.c_str(), e.what());
Expand Down Expand Up @@ -933,6 +952,7 @@ nlohmann::json Ros2TopicDataProvider::x_medkit_stats() const {
{"graph_events_received", p.graph_events_received},
{"concurrent_cold_waits", p.concurrent_cold_waits},
{"cold_wait_cap", p.cold_wait_cap},
{"unsupported_type_count", p.unsupported_type_count},
};
if (exec_) {
const auto e = exec_->stats();
Expand Down Expand Up @@ -968,6 +988,10 @@ Ros2TopicDataProvider::PoolStats Ros2TopicDataProvider::stats() const {
s.graph_events_received = graph_events_received_.load();
s.concurrent_cold_waits = concurrent_cold_waits_.load();
s.cold_wait_cap = cfg_.cold_wait_cap;
{
std::lock_guard<std::mutex> lk(unsupported_types_mtx_);
s.unsupported_type_count = unsupported_types_.size();
}
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ TEST_F(Ros2TopicDataProviderTest, ConstructedProviderHasEmptyStats) {
EXPECT_EQ(s.pool_hits, 0u);
EXPECT_EQ(s.pool_misses, 0u);
EXPECT_GT(s.pool_cap, 0u);
EXPECT_EQ(s.unsupported_type_count, 0u);
}

// @verifies REQ_INTEROP_018
Expand Down
Loading