diff --git a/include/datadog/environment.h b/include/datadog/environment.h index f2846b37..06b416fc 100644 --- a/include/datadog/environment.h +++ b/include/datadog/environment.h @@ -69,6 +69,7 @@ namespace environment { MACRO(DD_VERSION, STRING, "") \ MACRO(DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED, BOOLEAN, true) \ MACRO(DD_TELEMETRY_HEARTBEAT_INTERVAL, DECIMAL, 10) \ + MACRO(DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL, DECIMAL, 86400.0) \ MACRO(DD_TELEMETRY_METRICS_ENABLED, BOOLEAN, true) \ MACRO(DD_TELEMETRY_METRICS_INTERVAL_SECONDS, DECIMAL, 60) \ MACRO(DD_TELEMETRY_DEBUG, BOOLEAN, false) \ @@ -95,7 +96,7 @@ enum Variable { DD_LIST_ENVIRONMENT_VARIABLES(WITH_COMMA) }; #define QUOTED_WITH_COMMA(ARG, TYPE, DEFAULT_VALUE) \ WITH_COMMA(QUOTED(ARG), TYPE, DEFAULT_VALUE) -inline const char *const variable_names[] = { +inline const char* const variable_names[] = { DD_LIST_ENVIRONMENT_VARIABLES(QUOTED_WITH_COMMA)}; #undef QUOTED diff --git a/include/datadog/telemetry/configuration.h b/include/datadog/telemetry/configuration.h index 51693f26..7ab83e84 100644 --- a/include/datadog/telemetry/configuration.h +++ b/include/datadog/telemetry/configuration.h @@ -29,6 +29,10 @@ struct Configuration { // Interval at which the heartbeat payload will be sent. // Can be overriden by `DD_TELEMETRY_HEARTBEAT_INTERVAL` environment variable. tracing::Optional heartbeat_interval_seconds; + // Interval at which the extended heartbeat payload will be sent. + // Can be overridden by `DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL` environment + // variable. Default: 86400 seconds (24 hours). + tracing::Optional extended_heartbeat_interval_seconds; // `integration_name` is the name of the product integrating this library. // Example: "nginx", "envoy" or "istio". 
tracing::Optional integration_name; @@ -52,6 +56,7 @@ struct FinalizedConfiguration { bool report_logs; std::chrono::steady_clock::duration metrics_interval; std::chrono::steady_clock::duration heartbeat_interval; + std::chrono::steady_clock::duration extended_heartbeat_interval; std::string integration_name; std::string integration_version; std::vector products; diff --git a/src/datadog/telemetry/configuration.cpp b/src/datadog/telemetry/configuration.cpp index cc8d2e85..0ad2dcef 100644 --- a/src/datadog/telemetry/configuration.cpp +++ b/src/datadog/telemetry/configuration.cpp @@ -48,6 +48,15 @@ tracing::Expected load_telemetry_env_config() { env_cfg.heartbeat_interval_seconds = *maybe_value; } + if (auto extended_heartbeat_interval_seconds = + lookup(environment::DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL)) { + auto maybe_value = parse_double(*extended_heartbeat_interval_seconds); + if (auto error = maybe_value.if_error()) { + return *error; + } + env_cfg.extended_heartbeat_interval_seconds = *maybe_value; + } + return env_cfg; } @@ -112,6 +121,19 @@ tracing::Expected finalize_config( std::chrono::duration_cast( std::chrono::duration(heartbeat_interval.second)); + // extended_heartbeat_interval_seconds + auto extended_heartbeat_interval = + pick(env_config->extended_heartbeat_interval_seconds, + user_config.extended_heartbeat_interval_seconds, 86400.); + if (extended_heartbeat_interval.second <= 0) { + return Error{ + Error::Code::OUT_OF_RANGE_INTEGER, + "Telemetry extended heartbeat interval must be a positive value"}; + } + result.extended_heartbeat_interval = + std::chrono::duration_cast( + std::chrono::duration(extended_heartbeat_interval.second)); + // integration_name std::tie(origin, result.integration_name) = pick(env_config->integration_name, user_config.integration_name, diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index d9464dd7..4617fafd 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ 
b/src/datadog/telemetry/telemetry_impl.cpp @@ -227,6 +227,11 @@ void Telemetry::schedule_tasks() { tasks_.emplace_back(scheduler_->schedule_recurring_event( config_.metrics_interval, [this]() mutable { capture_metrics(); })); } + + tasks_.emplace_back(scheduler_->schedule_recurring_event( + config_.extended_heartbeat_interval, [this]() { + send_payload("app-extended-heartbeat", extended_heartbeat_payload()); + })); } Telemetry::~Telemetry() { @@ -251,6 +256,7 @@ Telemetry::Telemetry(Telemetry&& rhs) distributions_(std::move(rhs.distributions_)), seq_id_(rhs.seq_id_), config_seq_ids_(rhs.config_seq_ids_), + all_configurations_(rhs.all_configurations_), host_info_(rhs.host_info_) { cancel_tasks(rhs.tasks_); schedule_tasks(); @@ -274,6 +280,7 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(distributions_, rhs.distributions_); std::swap(seq_id_, rhs.seq_id_); std::swap(config_seq_ids_, rhs.config_seq_ids_); + std::swap(all_configurations_, rhs.all_configurations_); std::swap(host_info_, rhs.host_info_); schedule_tasks(); } @@ -678,6 +685,24 @@ std::string Telemetry::app_started_payload() { return batch.dump(); } +std::string Telemetry::extended_heartbeat_payload() { + auto configuration_json = nlohmann::json::array(); + + for (const auto& [name, config_metadata] : all_configurations_) { + configuration_json.emplace_back( + serialize_configuration_field(config_metadata, config_seq_ids_[name])); + } + + auto extended_hb_msg = nlohmann::json{ + {"request_type", "app-extended-heartbeat"}, + {"payload", nlohmann::json{{"configuration", configuration_json}}}, + }; + + auto batch = generate_telemetry_body("message-batch"); + batch["payload"] = nlohmann::json::array({std::move(extended_hb_msg)}); + return batch.dump(); +} + nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) { std::time_t tracer_time = std::chrono::duration_cast( clock_().wall.time_since_epoch()) @@ -711,13 +736,8 @@ nlohmann::json 
Telemetry::generate_telemetry_body(std::string request_type) { }); } -nlohmann::json Telemetry::generate_configuration_field( - const ConfigMetadata& config_metadata) { - // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can - // detect between non set fields. - config_seq_ids_[config_metadata.name] += 1; - auto seq_id = config_seq_ids_[config_metadata.name]; - +nlohmann::json Telemetry::serialize_configuration_field( + const ConfigMetadata& config_metadata, std::size_t seq_id) { auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, {"value", config_metadata.value}, {"seq_id", seq_id}}; @@ -749,6 +769,16 @@ nlohmann::json Telemetry::generate_configuration_field( return j; } +nlohmann::json Telemetry::generate_configuration_field( + const ConfigMetadata& config_metadata) { + // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can + // detect between non set fields. + config_seq_ids_[config_metadata.name] += 1; + all_configurations_[config_metadata.name] = config_metadata; + return serialize_configuration_field(config_metadata, + config_seq_ids_[config_metadata.name]); +} + void Telemetry::capture_configuration_change( const std::vector& new_configuration) { configuration_snapshot_.insert(configuration_snapshot_.begin(), diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 7c92db3b..d916846c 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -66,6 +66,9 @@ class Telemetry final { uint64_t seq_id_ = 0; // Track sequence id per configuration field std::unordered_map config_seq_ids_; + // Track the latest reported value for each configuration field + std::unordered_map + all_configurations_; tracing::HostInfo host_info_; @@ -143,6 +146,8 @@ class Telemetry final { tracing::Optional stacktrace = tracing::nullopt); nlohmann::json generate_telemetry_body(std::string request_type); + nlohmann::json serialize_configuration_field( + 
const tracing::ConfigMetadata& config_metadata, std::size_t seq_id); nlohmann::json generate_configuration_field( const tracing::ConfigMetadata& config_metadata); @@ -152,6 +157,9 @@ class Telemetry final { // Constructs a messsage-batch containing `app-heartbeat`, and if metrics // have been modified, a `generate-metrics` message. std::string heartbeat_and_telemetry(); + // Constructs a message-batch containing `app-extended-heartbeat` with the + // full configuration payload. + std::string extended_heartbeat_payload(); // Constructs a message-batch containing `app-closing`, and if metrics have // been modified, a `generate-metrics` message. std::string app_closing_payload(); diff --git a/supported-configurations.json b/supported-configurations.json index 4ee3558a..6f5e3d79 100644 --- a/supported-configurations.json +++ b/supported-configurations.json @@ -119,6 +119,13 @@ "type": "BOOLEAN" } ], + "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL": [ + { + "default": "86400", + "implementation": "A", + "type": "DECIMAL" + } + ], "DD_TELEMETRY_HEARTBEAT_INTERVAL": [ { "default": "10", diff --git a/test/telemetry/test_configuration.cpp b/test/telemetry/test_configuration.cpp index 24373e38..8b242e67 100644 --- a/test/telemetry/test_configuration.cpp +++ b/test/telemetry/test_configuration.cpp @@ -21,6 +21,7 @@ TELEMETRY_CONFIGURATION_TEST("defaults") { CHECK(cfg->report_metrics == true); CHECK(cfg->metrics_interval == 60s); CHECK(cfg->heartbeat_interval == 10s); + CHECK(cfg->extended_heartbeat_interval == 86400s); CHECK(cfg->install_id.has_value() == false); CHECK(cfg->install_type.has_value() == false); CHECK(cfg->install_time.has_value() == false); @@ -33,6 +34,7 @@ TELEMETRY_CONFIGURATION_TEST("code override") { cfg.report_metrics = false; cfg.metrics_interval_seconds = 1; cfg.heartbeat_interval_seconds = 2; + cfg.extended_heartbeat_interval_seconds = 3600; cfg.integration_name = "test"; cfg.integration_version = "2024.10.28"; @@ -44,6 +46,7 @@ 
TELEMETRY_CONFIGURATION_TEST("code override") { CHECK(final_cfg->report_metrics == false); CHECK(final_cfg->metrics_interval == 1s); CHECK(final_cfg->heartbeat_interval == 2s); + CHECK(final_cfg->extended_heartbeat_interval == 3600s); CHECK(final_cfg->integration_name == "test"); CHECK(final_cfg->integration_version == "2024.10.28"); } @@ -113,6 +116,14 @@ TELEMETRY_CONFIGURATION_TEST("environment environment override") { REQUIRE(final_cfg); CHECK(final_cfg->heartbeat_interval == 42s); } + + SECTION("Override extended heartbeat interval") { + cfg.extended_heartbeat_interval_seconds = 99999; + ddtest::EnvGuard env("DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL", "120"); + auto final_cfg = telemetry::finalize_config(cfg); + REQUIRE(final_cfg); + CHECK(final_cfg->extended_heartbeat_interval == 120s); + } } TELEMETRY_CONFIGURATION_TEST("validation") { @@ -147,6 +158,22 @@ TELEMETRY_CONFIGURATION_TEST("validation") { REQUIRE(!final_cfg); } } + + SECTION("extended heartbeat interval validation") { + SECTION("code override") { + telemetry::Configuration cfg; + cfg.extended_heartbeat_interval_seconds = -100; + + auto final_cfg = telemetry::finalize_config(cfg); + REQUIRE(!final_cfg); + } + + SECTION("environment variable override") { + ddtest::EnvGuard env("DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL", "-1"); + auto final_cfg = telemetry::finalize_config(); + REQUIRE(!final_cfg); + } + } } TELEMETRY_CONFIGURATION_TEST("installation infos are used when available") { diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 83590773..e05c867c 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -46,19 +46,27 @@ struct FakeEventScheduler : public EventScheduler { size_t count_tasks = 0; std::function heartbeat_callback = nullptr; std::function metrics_callback = nullptr; + std::function extended_heartbeat_callback = nullptr; Optional heartbeat_interval; Optional metrics_interval; + Optional 
extended_heartbeat_interval; bool cancelled = false; // NOTE: White box testing. This is a limitation of the event scheduler API. + // Tasks are registered in order: heartbeat (0), metrics (1, if enabled), + // extended heartbeat (last). Cancel schedule_recurring_event(std::chrono::steady_clock::duration interval, std::function callback) override { if (count_tasks == 0) { heartbeat_callback = callback; heartbeat_interval = interval; - } else if (count_tasks == 1) { + } else if (interval <= std::chrono::minutes(1)) { + // Metrics interval is <= 60s; extended heartbeat is much larger. metrics_callback = callback; metrics_interval = interval; + } else { + extended_heartbeat_callback = callback; + extended_heartbeat_interval = interval; } count_tasks++; return [this]() { cancelled = true; }; @@ -74,6 +82,11 @@ struct FakeEventScheduler : public EventScheduler { metrics_callback(); } + void trigger_extended_heartbeat() { + assert(extended_heartbeat_callback != nullptr); + extended_heartbeat_callback(); + } + std::string config() const override { return nlohmann::json::object({{"type", "FakeEventScheduler"}}).dump(); } @@ -391,6 +404,105 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { REQUIRE(find_payload(message_batch["payload"], "app-heartbeat")); } + SECTION("generates an extended heartbeat with configuration") { + client->clear(); + + Product product; + product.name = Product::Name::tracing; + product.enabled = true; + product.version = tracer_version; + product.configurations = + std::unordered_map>{ + {ConfigName::SERVICE_NAME, + {ConfigMetadata(ConfigName::SERVICE_NAME, "my-service", + ConfigMetadata::Origin::CODE)}}, + }; + + Configuration cfg; + cfg.products.emplace_back(std::move(product)); + + auto scheduler2 = std::make_shared(); + Telemetry telemetry2{*finalize_config(cfg), + tracer_signature, + logger, + client, + scheduler2, + *url}; + + client->clear(); + scheduler2->trigger_extended_heartbeat(); + + auto message_batch = 
nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch)); + + auto ext_hb = + find_payload(message_batch["payload"], "app-extended-heartbeat"); + REQUIRE(ext_hb.has_value()); + + auto& config_payload = (*ext_hb)["payload"]["configuration"]; + REQUIRE(config_payload.is_array()); + REQUIRE(config_payload.size() == 1); + CHECK(config_payload[0]["name"] == "service"); + CHECK(config_payload[0]["value"] == "my-service"); + CHECK(config_payload[0]["origin"] == "code"); + } + + SECTION( + "extended heartbeat reflects runtime configuration changes (remote " + "config)") { + client->clear(); + + Product product; + product.name = Product::Name::tracing; + product.enabled = true; + product.version = tracer_version; + product.configurations = + std::unordered_map>{ + {ConfigName::SERVICE_NAME, + {ConfigMetadata(ConfigName::SERVICE_NAME, "my-service", + ConfigMetadata::Origin::CODE)}}, + }; + + Configuration cfg; + cfg.products.emplace_back(std::move(product)); + + auto scheduler2 = std::make_shared(); + Telemetry telemetry2{*finalize_config(cfg), + tracer_signature, + logger, + client, + scheduler2, + *url}; + + // Simulate a remote config update overriding SERVICE_NAME + telemetry2.capture_configuration_change( + {{ConfigName::SERVICE_NAME, "rc-service", + ConfigMetadata::Origin::REMOTE_CONFIG}}); + telemetry2.send_configuration_change(); + + client->clear(); + scheduler2->trigger_extended_heartbeat(); + + auto payload = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(payload)); + + auto ext_hb = find_payload(payload["payload"], "app-extended-heartbeat"); + REQUIRE(ext_hb.has_value()); + + auto& configuration = (*ext_hb)["payload"]["configuration"]; + REQUIRE(configuration.is_array()); + + bool found = false; + for (const auto& entry : configuration) { + if (entry["name"] == "service") { + found = true; + CHECK(entry["value"] == "rc-service"); + CHECK(entry["origin"] == "remote_config"); + } + } + 
CHECK(found); + } + SECTION("metrics reporting") { SECTION("counters are correctly serialized in generate-metrics payload") { client->clear();