From eaec8e1342fe69e003328f23a9868cb0b8f2ea32 Mon Sep 17 00:00:00 2001 From: Jakub Jirutka Date: Fri, 15 Nov 2024 12:52:51 +0100 Subject: [PATCH] JSON output: remove Kafka output To avoid dependency on Kafka. There's a special version of this plugin named json-kafka that can be used instead and we can split it into a subpackage. --- src/plugins/output/json/CMakeLists.txt | 5 -- src/plugins/output/json/src/Config.cpp | 100 +------------------------ src/plugins/output/json/src/json.cpp | 5 -- 3 files changed, 2 insertions(+), 108 deletions(-) diff --git a/src/plugins/output/json/CMakeLists.txt b/src/plugins/output/json/CMakeLists.txt index 9bedf99..fe4b12c 100644 --- a/src/plugins/output/json/CMakeLists.txt +++ b/src/plugins/output/json/CMakeLists.txt @@ -9,8 +9,6 @@ add_library(json-output MODULE src/Printer.hpp src/File.cpp src/File.hpp - src/Kafka.cpp - src/Kafka.hpp src/Server.cpp src/Server.hpp src/Sender.cpp @@ -21,16 +19,13 @@ add_library(json-output MODULE src/SyslogSocket.hpp ) -find_package(LibRDKafka 0.9.3 REQUIRED) find_package(ZLIB REQUIRED) include_directories( ${ZLIB_INCLUDE_DIRS} # zlib - ${LIBRDKAFKA_INCLUDE_DIRS} # librdkafka ) target_link_libraries(json-output ${ZLIB_LIBRARIES} - ${LIBRDKAFKA_LIBRARIES} ) install( diff --git a/src/plugins/output/json/src/Config.cpp b/src/plugins/output/json/src/Config.cpp index 04434f3..7d1de83 100644 --- a/src/plugins/output/json/src/Config.cpp +++ b/src/plugins/output/json/src/Config.cpp @@ -49,7 +49,6 @@ #include #include #include -#include #include "Config.hpp" @@ -512,29 +511,7 @@ Config::parse_file(fds_xml_ctx_t *file) void Config::parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property) { - std::string key, value; - - const struct fds_xml_cont *content; - while (fds_xml_next(property, &content) != FDS_EOC) { - switch (content->id) { - case KAFKA_PROP_KEY: - assert(content->type == FDS_OPTS_T_STRING); - key = content->ptr_string; - break; - case KAFKA_PROP_VALUE: - assert(content->type == FDS_OPTS_T_STRING); - value = content->ptr_string; - break; - default: - throw std::invalid_argument("Unexpected element within !"); - } - } - - if (key.empty()) { - throw std::invalid_argument("Property key of a output cannot be empty!"); - } - - kafka.properties.emplace(key, value); + throw std::invalid_argument("Use \"json-kafka\" plugin for Kafka outputs!"); } /** @@ -547,80 +524,7 @@ Config::parse_kafka_property(struct cfg_kafka &kafka, fds_xml_ctx_t *property) void Config::parse_kafka(fds_xml_ctx_t *kafka) { - // Prepare default values - struct cfg_kafka output; - output.partition = RD_KAFKA_PARTITION_UA; - output.blocking = false; - output.perf_tuning = true; - - // For partition parser - int32_t value; - char aux; - - const struct fds_xml_cont *content; - while (fds_xml_next(kafka, &content) != FDS_EOC) { - switch (content->id) { - case KAFKA_NAME: - assert(content->type == FDS_OPTS_T_STRING); - output.name = content->ptr_string; - break; - case KAFKA_BROKERS: - assert(content->type == FDS_OPTS_T_STRING); - output.brokers = content->ptr_string; - break; - case KAFKA_TOPIC: - assert(content->type == FDS_OPTS_T_STRING); - output.topic = content->ptr_string; - break; - case KAFKA_PARTION: - assert(content->type == FDS_OPTS_T_STRING); - if (strcasecmp(content->ptr_string, "unassigned") == 0) { - output.partition = RD_KAFKA_PARTITION_UA; - break; - } - - if (sscanf(content->ptr_string, "%" SCNi32 "%c", &value, &aux) != 1 || value < 0) { - throw std::invalid_argument("Invalid partition number of a output!"); - } - output.partition = value; - break; - case KAFKA_BVERSION: - assert(content->type == FDS_OPTS_T_STRING); - output.broker_fallback = content->ptr_string; - break; - case KAFKA_BLOCKING: - assert(content->type == FDS_OPTS_T_BOOL); - output.blocking = content->val_bool; - break; - case KAFKA_PERF_TUN: - assert(content->type == FDS_OPTS_T_BOOL); - output.perf_tuning = content->val_bool; - break; - case KAFKA_PROPERTY: - assert(content->type == FDS_OPTS_T_CONTEXT); - parse_kafka_property(output, content->ptr_ctx); - break; - default: - throw std::invalid_argument("Unexpected element within !"); - } - } - - // Check validity - if (output.brokers.empty()) { - throw std::invalid_argument("List of brokers must be specified!"); - } - if (output.topic.empty()) { - throw std::invalid_argument("Topic of output must be specified!"); - } - if (!output.broker_fallback.empty()) { - // Try to check if version string is valid version (at least expect major + minor version) - int version[4]; - if (parse_version(output.broker_fallback, version) != IPX_OK) { - throw std::invalid_argument("Broker version of a output is not invalid!"); - } - } - - outputs.kafkas.push_back(output); + throw std::invalid_argument("Use \"json-kafka\" plugin for Kafka outputs!"); } std::unique_ptr diff --git a/src/plugins/output/json/src/json.cpp b/src/plugins/output/json/src/json.cpp index 26275b5..c4cb1b8 100644 --- a/src/plugins/output/json/src/json.cpp +++ b/src/plugins/output/json/src/json.cpp @@ -49,7 +49,6 @@ #include "File.hpp" #include "Server.hpp" #include "Sender.hpp" -#include "Kafka.hpp" #include "Syslog.hpp" /** Plugin description */ @@ -104,10 +103,6 @@ outputs_initialize(ipx_ctx_t *ctx, Storage *storage, Config *cfg) storage->output_add(new Sender(send, ctx)); } - for (const auto &kafka : cfg->outputs.kafkas) { - storage->output_add(new Kafka(kafka, ctx)); - } - for (auto &syslog : cfg->outputs.syslogs) { storage->output_add(new Syslog(syslog, ctx)); } -- 2.46.2