From cd584e0357b82ec5cad12156fd7a5b65f545a0d0 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Fri, 27 Oct 2023 15:37:34 +0100 Subject: [PATCH] ZSTD and GZIP/DEFLATE streaming support (#16268) * move compression header to compression.h * prototype with zstd compression * updated capabilities * no need for resetting compression * left-over reset function * use ZSTD_compressStream() instead of ZSTD_compressStream2() for backwards compatibility * remove call to LZ4_decoderRingBufferSize() * debug signature failures * fix the buffers of lz4 * fix decoding of zstd * detect compression based on initialization; prefer ZSTD over LZ4 * allow both lz4 and zstd * initialize zstd streams * define missing ZSTD_CLEVEL_DEFAULT * log zero compressed size * debug log * flush compression buffer * add sender compression statistics * removed debugging messages * do not fail if zstd is not available * cleanup and buildinfo * fix max message size, use zstd level 1, add compression ratio reporting * use compression level 1 * fix ratio title * better compression error logs * for backwards compatibility use buffers of COMPRESSION_MAX_CHUNK * switch to default compression level * additional streaming error conditions detection * do not expose compression stats when compression is not enabled * test for the right lz4 functions * moved lz4 and zstd to their own files * add gzip streaming compression * gzip error handling * added unittest for streaming compression * eliminate a copy of the uncompressed data during zstd compression * eliminate not needed zstd allocations * cleanup * decode gzip with Z_SYNC_FLUSH * set the decoding gzip algorithm * user configuration for compression levels and compression algorithms order * fix exclusion of not preferred compressions * remove now obsolete compression define, since gzip is always available * rename compression algorithms order in stream.conf * move common checks in compression.c * cleanup * backwards compatible error checking --- Makefile.am | 
8 + configure.ac | 19 +- daemon/buildinfo.c | 50 ++++- daemon/main.c | 7 + database/rrdhost.c | 15 +- libnetdata/libnetdata.h | 11 +- streaming/compression.c | 403 ++++++++++++++++++++++------------- streaming/compression.h | 171 +++++++++++++++ streaming/compression_gzip.c | 158 ++++++++++++++ streaming/compression_gzip.h | 15 ++ streaming/compression_lz4.c | 143 +++++++++++++ streaming/compression_lz4.h | 19 ++ streaming/compression_zstd.c | 163 ++++++++++++++ streaming/compression_zstd.h | 19 ++ streaming/receiver.c | 122 ++++++++--- streaming/rrdpush.c | 193 +++++++++++------ streaming/rrdpush.h | 148 ++++--------- streaming/sender.c | 100 ++++++--- streaming/stream.conf | 7 +- web/api/web_api_v1.c | 6 +- 20 files changed, 1358 insertions(+), 419 deletions(-) create mode 100644 streaming/compression.h create mode 100644 streaming/compression_gzip.c create mode 100644 streaming/compression_gzip.h create mode 100644 streaming/compression_lz4.c create mode 100644 streaming/compression_lz4.h create mode 100644 streaming/compression_zstd.c create mode 100644 streaming/compression_zstd.h diff --git a/Makefile.am b/Makefile.am index 405fd862a3..3996c91097 100644 --- a/Makefile.am +++ b/Makefile.am @@ -666,6 +666,13 @@ API_PLUGIN_FILES = \ STREAMING_PLUGIN_FILES = \ streaming/rrdpush.c \ streaming/compression.c \ + streaming/compression.h \ + streaming/compression_gzip.c \ + streaming/compression_gzip.h \ + streaming/compression_lz4.c \ + streaming/compression_lz4.h \ + streaming/compression_zstd.c \ + streaming/compression_zstd.h \ streaming/sender.c \ streaming/receiver.c \ streaming/replication.h \ @@ -1143,6 +1150,7 @@ NETDATA_COMMON_LIBS = \ $(OPTIONAL_MQTT_LIBS) \ $(OPTIONAL_UV_LIBS) \ $(OPTIONAL_LZ4_LIBS) \ + $(OPTIONAL_ZSTD_LIBS) \ $(OPTIONAL_DATACHANNEL_LIBS) \ libjudy.a \ $(OPTIONAL_SSL_LIBS) \ diff --git a/configure.ac b/configure.ac index 5db1b9f21c..8a78fbc5a4 100644 --- a/configure.ac +++ b/configure.ac @@ -555,16 +555,28 @@ OPTIONAL_UV_LIBS="${UV_LIBS}" 
AC_CHECK_LIB( [lz4], - [LZ4_initStream], + [LZ4_createStream], [LZ4_LIBS_FAST="-llz4"] ) AC_CHECK_LIB( [lz4], - [LZ4_compress_default], + [LZ4_compress_fast_continue], [LZ4_LIBS="-llz4"] ) +# ----------------------------------------------------------------------------- +# zstd + +AC_CHECK_LIB([zstd], [ZSTD_createCStream, ZSTD_compressStream, ZSTD_decompressStream, ZSTD_createDStream], + [LIBZSTD_FOUND=yes], + [LIBZSTD_FOUND=no]) + +if test "x$LIBZSTD_FOUND" = "xyes"; then + AC_DEFINE([ENABLE_ZSTD], [1], [libzstd usability]) + OPTIONAL_ZSTD_LIBS="-lzstd" +fi + # ----------------------------------------------------------------------------- # zlib @@ -702,7 +714,7 @@ if test "${enable_lz4}" != "no"; then AC_TRY_LINK( [ #include ], [ - LZ4_stream_t* stream = LZ4_initStream(NULL, 0); + LZ4_stream_t* stream = LZ4_createStream(); ], [ enable_lz4="yes"], [ enable_lz4="no" ] @@ -1900,6 +1912,7 @@ AC_SUBST([OPTIONAL_MATH_LIBS]) AC_SUBST([OPTIONAL_DATACHANNEL_LIBS]) AC_SUBST([OPTIONAL_UV_LIBS]) AC_SUBST([OPTIONAL_LZ4_LIBS]) +AC_SUBST([OPTIONAL_ZSTD_LIBS]) AC_SUBST([OPTIONAL_SSL_LIBS]) AC_SUBST([OPTIONAL_JSONC_LIBS]) AC_SUBST([OPTIONAL_YAML_LIBS]) diff --git a/daemon/buildinfo.c b/daemon/buildinfo.c index 4bc1e72a4e..bbe646993e 100644 --- a/daemon/buildinfo.c +++ b/daemon/buildinfo.c @@ -48,6 +48,7 @@ typedef enum __attribute__((packed)) { BIB_FEATURE_CLOUD, BIB_FEATURE_HEALTH, BIB_FEATURE_STREAMING, + BIB_FEATURE_BACKFILLING, BIB_FEATURE_REPLICATION, BIB_FEATURE_STREAMING_COMPRESSION, BIB_FEATURE_CONTEXTS, @@ -66,6 +67,7 @@ typedef enum __attribute__((packed)) { BIB_CONNECTIVITY_NATIVE_HTTPS, BIB_CONNECTIVITY_TLS_HOST_VERIFY, BIB_LIB_LZ4, + BIB_LIB_ZSTD, BIB_LIB_ZLIB, BIB_LIB_JUDY, BIB_LIB_DLIB, @@ -484,6 +486,14 @@ static struct { .json = "streaming", .value = NULL, }, + [BIB_FEATURE_BACKFILLING] = { + .category = BIC_FEATURE, + .type = BIT_BOOLEAN, + .analytics = NULL, + .print = "Back-filling (of higher database tiers)", + .json = "back-filling", + .value = NULL, + }, 
[BIB_FEATURE_REPLICATION] = { .category = BIC_FEATURE, .type = BIT_BOOLEAN, @@ -498,7 +508,7 @@ static struct { .analytics = "Stream Compression", .print = "Streaming and Replication Compression", .json = "stream-compression", - .value = "none", + .value = NULL, }, [BIB_FEATURE_CONTEXTS] = { .category = BIC_FEATURE, @@ -628,6 +638,14 @@ static struct { .json = "lz4", .value = NULL, }, + [BIB_LIB_ZSTD] = { + .category = BIC_LIBS, + .type = BIT_BOOLEAN, + .analytics = NULL, + .print = "ZSTD (fast, lossless compression algorithm)", + .json = "zstd", + .value = NULL, + }, [BIB_LIB_ZLIB] = { .category = BIC_LIBS, .type = BIT_BOOLEAN, @@ -1029,6 +1047,23 @@ static void build_info_set_value(BUILD_INFO_SLOT slot, const char *value) { BUILD_INFO[slot].value = value; } +static void build_info_append_value(BUILD_INFO_SLOT slot, const char *value) { + size_t size = BUILD_INFO[slot].value ? strlen(BUILD_INFO[slot].value) + 1 : 0; + size += strlen(value); + char buf[size + 1]; + + if(BUILD_INFO[slot].value) { + strcpy(buf, BUILD_INFO[slot].value); + strcat(buf, " "); + strcat(buf, value); + } + else + strcpy(buf, value); + + freez((void *)BUILD_INFO[slot].value); + BUILD_INFO[slot].value = strdupz(buf); +} + static void build_info_set_value_strdupz(BUILD_INFO_SLOT slot, const char *value) { if(!value) value = ""; build_info_set_value(slot, strdupz(value)); @@ -1075,14 +1110,18 @@ __attribute__((constructor)) void initialize_build_info(void) { build_info_set_status(BIB_FEATURE_HEALTH, true); build_info_set_status(BIB_FEATURE_STREAMING, true); + build_info_set_status(BIB_FEATURE_BACKFILLING, true); build_info_set_status(BIB_FEATURE_REPLICATION, true); -#ifdef ENABLE_RRDPUSH_COMPRESSION build_info_set_status(BIB_FEATURE_STREAMING_COMPRESSION, true); + +#ifdef ENABLE_ZSTD + build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "zstd"); +#endif #ifdef ENABLE_LZ4 - build_info_set_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4"); -#endif + 
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4"); #endif + build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "gzip"); build_info_set_status(BIB_FEATURE_CONTEXTS, true); build_info_set_status(BIB_FEATURE_TIERING, true); @@ -1117,6 +1156,9 @@ __attribute__((constructor)) void initialize_build_info(void) { #ifdef ENABLE_LZ4 build_info_set_status(BIB_LIB_LZ4, true); #endif +#ifdef ENABLE_ZSTD + build_info_set_status(BIB_LIB_ZSTD, true); +#endif build_info_set_status(BIB_LIB_ZLIB, true); diff --git a/daemon/main.c b/daemon/main.c index 5d25f88b5f..e767b96764 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1337,6 +1337,7 @@ int julytest(void); int pluginsd_parser_unittest(void); void replication_initialize(void); void bearer_tokens_init(void); +int unittest_rrdpush_compressions(void); int main(int argc, char **argv) { // initialize the system clocks @@ -1550,6 +1551,10 @@ int main(int argc, char **argv) { unittest_running = true; return pluginsd_parser_unittest(); } + else if(strcmp(optarg, "rrdpush_compressions_test") == 0) { + unittest_running = true; + return unittest_rrdpush_compressions(); + } else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) { optarg += strlen(createdataset_string); unsigned history_seconds = strtoul(optarg, NULL, 0); @@ -1901,6 +1906,8 @@ int main(int argc, char **argv) { netdata_log_info("Netdata agent version \""VERSION"\" is starting"); ieee754_doubles = is_system_ieee754_double(); + if(!ieee754_doubles) + globally_disabled_capabilities |= STREAM_CAP_IEEE754; aral_judy_init(); diff --git a/database/rrdhost.c b/database/rrdhost.c index 6abd3b8169..5281677087 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -1145,13 +1145,10 @@ static void rrdhost_streaming_sender_structures_init(RRDHOST *host) host->sender->rrdpush_sender_pipe[PIPE_READ] = -1; host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1; host->sender->rrdpush_sender_socket = -1; + host->sender->disabled_capabilities = 
STREAM_CAP_NONE; -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(default_rrdpush_compression_enabled) - host->sender->flags |= SENDER_FLAG_COMPRESSION; - else - host->sender->flags &= ~SENDER_FLAG_COMPRESSION; -#endif + if(!default_rrdpush_compression_enabled) + host->sender->disabled_capabilities |= STREAM_CAP_COMPRESSIONS_AVAILABLE; spinlock_init(&host->sender->spinlock); replication_init_sender(host->sender); @@ -1167,9 +1164,7 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host) rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread cbuffer_free(host->sender->buffer); -#ifdef ENABLE_RRDPUSH_COMPRESSION rrdpush_compressor_destroy(&host->sender->compressor); -#endif replication_cleanup_sender(host->sender); @@ -1885,9 +1880,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) { else s->stream.status = RRDHOST_STREAM_STATUS_ONLINE; -#ifdef ENABLE_RRDPUSH_COMPRESSION - s->stream.compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor.initialized); -#endif + s->stream.compression = host->sender->compressor.initialized; } else { s->stream.status = RRDHOST_STREAM_STATUS_OFFLINE; diff --git a/libnetdata/libnetdata.h b/libnetdata/libnetdata.h index c337b3b5d4..7459cd0ed7 100644 --- a/libnetdata/libnetdata.h +++ b/libnetdata/libnetdata.h @@ -11,10 +11,6 @@ extern "C" { #include #endif -#ifdef ENABLE_LZ4 -#define ENABLE_RRDPUSH_COMPRESSION 1 -#endif - #ifdef ENABLE_OPENSSL #define ENABLE_HTTPS 1 #endif @@ -681,9 +677,10 @@ static inline BITMAPX *bitmapX_create(uint32_t bits) { #define bitmap1024_get_bit(ptr, idx) bitmapX_get_bit((BITMAPX *)ptr, idx) #define bitmap1024_set_bit(ptr, idx, value) bitmapX_set_bit((BITMAPX *)ptr, idx, value) - -#define COMPRESSION_MAX_MSG_SIZE 0x4000 -#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 1024) +#define COMPRESSION_MAX_CHUNK 0x4000 +#define COMPRESSION_MAX_OVERHEAD 128 +#define 
COMPRESSION_MAX_MSG_SIZE (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD - 1) +#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 768) int pluginsd_isspace(char c); int config_isspace(char c); int group_by_label_isspace(char c); diff --git a/streaming/compression.c b/streaming/compression.c index 6d4a128b05..22f0877fd0 100644 --- a/streaming/compression.c +++ b/streaming/compression.c @@ -1,181 +1,286 @@ -#include "rrdpush.h" +// SPDX-License-Identifier: GPL-3.0-or-later -#ifdef ENABLE_RRDPUSH_COMPRESSION -#include "lz4.h" +#include "compression.h" -#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION" +#include "compression_gzip.h" -/* - * Reset compressor state for a new stream - */ -void rrdpush_compressor_reset(struct compressor_state *state) { - if(!state->initialized) { - state->initialized = true; +#ifdef ENABLE_LZ4 +#include "compression_lz4.h" +#endif - state->stream.lz4_stream = LZ4_createStream(); - state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2); - state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size); - state->compression_result_buffer_size = 0; +#ifdef ENABLE_ZSTD +#include "compression_zstd.h" +#endif + +// ---------------------------------------------------------------------------- +// compressor public API + +void rrdpush_compressor_init(struct compressor_state *state) { + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_compressor_init_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_compressor_init_lz4(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_compressor_init_gzip(state); + break; } - LZ4_resetStream_fast(state->stream.lz4_stream); - - state->stream.input_ring_buffer_pos = 0; + simple_ring_buffer_reset(&state->input); + simple_ring_buffer_reset(&state->output); } -/* - * Destroy compressor state and all related data - */ void 
rrdpush_compressor_destroy(struct compressor_state *state) { - if (state->stream.lz4_stream) { - LZ4_freeStream(state->stream.lz4_stream); - state->stream.lz4_stream = NULL; + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_compressor_destroy_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_compressor_destroy_lz4(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_compressor_destroy_gzip(state); + break; } - freez(state->stream.input_ring_buffer); - state->stream.input_ring_buffer = NULL; - - freez(state->compression_result_buffer); - state->compression_result_buffer = NULL; - state->initialized = false; + + simple_ring_buffer_destroy(&state->input); + simple_ring_buffer_destroy(&state->output); } -/* - * Compress the given block of data - * Compressed data will remain in the internal buffer until the next invocation - * Return the size of compressed data block as result and the pointer to internal buffer using the last argument - * or 0 in case of error - */ -size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) { - if(unlikely(!state || !size || !out)) - return 0; +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out) { + size_t ret = 0; - if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) { - netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d", - (long unsigned int)size, COMPRESSION_MAX_MSG_SIZE); + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + ret = rrdpush_compress_zstd(state, data, size, out); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + ret = rrdpush_compress_lz4(state, data, size, out); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + ret = rrdpush_compress_gzip(state, data, size, out); + break; + } + + 
if(unlikely(ret >= COMPRESSION_MAX_CHUNK)) { + netdata_log_error("RRDPUSH_COMPRESS: compressed data is %zu bytes, which is >= than the max chunk size %zu", + ret, COMPRESSION_MAX_CHUNK); return 0; } - size_t max_dst_size = LZ4_COMPRESSBOUND(size); - size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; - - if (!state->compression_result_buffer) { - state->compression_result_buffer = mallocz(data_size); - state->compression_result_buffer_size = data_size; - } - else if(unlikely(state->compression_result_buffer_size < data_size)) { - state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size); - state->compression_result_buffer_size = data_size; - } - - // the ring buffer always has space for LZ4_MAX_MSG_SIZE - memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size); - - // this call needs the last 64K of our previous data - // they are available in the ring buffer - long int compressed_data_size = LZ4_compress_fast_continue( - state->stream.lz4_stream, - state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, - state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE, - (int)size, - (int)max_dst_size, - 1); - - if (compressed_data_size < 0) { - netdata_log_error("Data compression error: %ld", compressed_data_size); - return 0; - } - - // update the next writing position of the ring buffer - state->stream.input_ring_buffer_pos += size; - if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE)) - state->stream.input_ring_buffer_pos = 0; - - // update the signature header - uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; - *(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE; - *out = state->compression_result_buffer; - netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, 
compressed_data_size); - return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; + return ret; } -/* - * Decompress the compressed data in the internal buffer - * Return the size of uncompressed data or 0 for error - */ -size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { - if (unlikely(!state || !compressed_data || !compressed_size)) - return 0; - - if(unlikely(state->stream.read_at != state->stream.write_at)) - fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); - - if (unlikely(state->stream.write_at >= state->stream.size / 2)) { - state->stream.write_at = 0; - state->stream.read_at = 0; - } - - long int decompressed_size = LZ4_decompress_safe_continue( - state->stream.lz4_stream - , compressed_data - , state->stream.buffer + state->stream.write_at - , (int)compressed_size - , (int)(state->stream.size - state->stream.write_at) - ); - - if (unlikely(decompressed_size < 0)) { - netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size); - return 0; - } - - if(unlikely(decompressed_size + state->stream.write_at > state->stream.size)) - fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. 
size: %zu, pos: %zu, added: %ld, " - "exceeding the buffer by %zu" - , state->stream.size - , state->stream.write_at - , decompressed_size - , (size_t)(state->stream.write_at + decompressed_size - state->stream.size) - ); - - state->stream.write_at += decompressed_size; - - // statistics - state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE; - state->total_uncompressed += decompressed_size; - state->packet_count++; - - return decompressed_size; -} - -void rrdpush_decompressor_reset(struct decompressor_state *state) { - if(!state->initialized) { - state->initialized = true; - state->stream.lz4_stream = LZ4_createStreamDecode(); - state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2; - state->stream.buffer = mallocz(state->stream.size); - } - - LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0); - - state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE; - state->stream.write_at = 0; - state->stream.read_at = 0; -} +// ---------------------------------------------------------------------------- +// decompressor public API void rrdpush_decompressor_destroy(struct decompressor_state *state) { if(unlikely(!state->initialized)) return; - if (state->stream.lz4_stream) { - LZ4_freeStreamDecode(state->stream.lz4_stream); - state->stream.lz4_stream = NULL; + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + rrdpush_decompressor_destroy_zstd(state); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_decompressor_destroy_lz4(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_decompressor_destroy_gzip(state); + break; } - freez(state->stream.buffer); - state->stream.buffer = NULL; + simple_ring_buffer_destroy(&state->output); state->initialized = false; } +void rrdpush_decompressor_init(struct decompressor_state *state) { + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + 
rrdpush_decompressor_init_zstd(state); + break; #endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + rrdpush_decompressor_init_lz4(state); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + rrdpush_decompressor_init_gzip(state); + break; + } + + state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE; + simple_ring_buffer_reset(&state->output); +} + +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(state->output.read_pos != state->output.write_pos)) + fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); + + size_t ret = 0; + + switch(state->algorithm) { +#ifdef ENABLE_ZSTD + case COMPRESSION_ALGORITHM_ZSTD: + ret = rrdpush_decompress_zstd(state, compressed_data, compressed_size); + break; +#endif + +#ifdef ENABLE_LZ4 + case COMPRESSION_ALGORITHM_LZ4: + ret = rrdpush_decompress_lz4(state, compressed_data, compressed_size); + break; +#endif + + default: + case COMPRESSION_ALGORITHM_GZIP: + ret = rrdpush_decompress_gzip(state, compressed_data, compressed_size); + break; + } + + // for backwards compatibility we cannot check for COMPRESSION_MAX_MSG_SIZE, + // because old children may send this big payloads. 
+ if(unlikely(ret > COMPRESSION_MAX_CHUNK)) { + netdata_log_error("RRDPUSH_DECOMPRESS: decompressed data is %zu bytes, which is bigger than the max msg size %zu", + ret, COMPRESSION_MAX_CHUNK); + return 0; + } + + return ret; +} + +// ---------------------------------------------------------------------------- +// unit test + +int unittest_rrdpush_compression(compression_algorithm_t algorithm, const char *name) { + fprintf(stderr, "\nTesting streaming compression with %s\n", name); + + struct compressor_state cctx = { + .initialized = false, + .algorithm = algorithm, + }; + struct decompressor_state dctx = { + .initialized = false, + .algorithm = algorithm, + }; + + char txt[COMPRESSION_MAX_MSG_SIZE]; + + rrdpush_compressor_init(&cctx); + rrdpush_decompressor_init(&dctx); + + int errors = 0; + + memset(txt, '=', COMPRESSION_MAX_MSG_SIZE); + + for(int i = 0; i < COMPRESSION_MAX_MSG_SIZE ;i++) { + txt[i] = 'A' + (i % 26); + size_t txt_len = i + 1; + + const char *out; + size_t size = rrdpush_compress(&cctx, txt, txt_len, &out); + + if(size >= COMPRESSION_MAX_CHUNK) { + fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n", + i, size); + errors++; + goto cleanup; + } + else { + size_t dtxt_len = rrdpush_decompress(&dctx, out, size); + char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos]; + + if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n", + i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx) + ); + errors++; + goto cleanup; + } + + if(dtxt_len != txt_len) { + fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n", + i, dtxt_len, txt_len + ); + errors++; + goto cleanup; + } + else { + if(memcmp(txt, dtxt, txt_len) != 0) { + txt[txt_len] = '\0'; + dtxt[txt_len + 5] = '\0'; + + fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data '%s' of length 
%zu\n", + i, dtxt, txt, txt_len); + errors++; + goto cleanup; + } + } + } + + // fill the compressed buffer with garbage + memset((void *)out, 'x', size); + + // here we are supposed to copy the data and advance the position + dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx); + } + +cleanup: + rrdpush_compressor_destroy(&cctx); + rrdpush_decompressor_destroy(&dctx); + + if(errors) + fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors); + else + fprintf(stderr, "Compression with %s: OK\n", name); + + return errors; +} + +int unittest_rrdpush_compressions(void) { + int ret = 0; + + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_GZIP, "GZIP"); + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_LZ4, "LZ4"); + ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_ZSTD, "ZSTD"); + + return ret; +} diff --git a/streaming/compression.h b/streaming/compression.h new file mode 100644 index 0000000000..011e14eadd --- /dev/null +++ b/streaming/compression.h @@ -0,0 +1,171 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdpush.h" + +#ifndef NETDATA_RRDPUSH_COMPRESSION_H +#define NETDATA_RRDPUSH_COMPRESSION_H 1 + +// signature MUST end with a newline + +#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD) +#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)" +#endif + +typedef uint32_t rrdpush_signature_t; +#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) +#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t) + +static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) { + rrdpush_signature_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8; + 
return len | RRDPUSH_COMPRESSION_SIGNATURE; +} + +typedef enum { + COMPRESSION_ALGORITHM_NONE = 0, + COMPRESSION_ALGORITHM_ZSTD, + COMPRESSION_ALGORITHM_LZ4, + COMPRESSION_ALGORITHM_GZIP, + + // terminator + COMPRESSION_ALGORITHM_MAX, +} compression_algorithm_t; + +extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX]; + +// ---------------------------------------------------------------------------- + +typedef struct simple_ring_buffer { + const char *data; + size_t size; + size_t read_pos; + size_t write_pos; +} SIMPLE_RING_BUFFER; + +static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) { + b->read_pos = b->write_pos = 0; +} + +static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) { + if(b->write_pos + size > b->size) { + if(!b->size) + b->size = COMPRESSION_MAX_CHUNK; + else + b->size *= 2; + + if(b->write_pos + size > b->size) + b->size += size; + + b->data = (const char *)reallocz((void *)b->data, b->size); + } +} + +static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) { + simple_ring_buffer_make_room(b, size); + memcpy((void *)(b->data + b->write_pos), data, size); + b->write_pos += size; +} + +static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) { + freez((void *)b->data); + b->data = NULL; + b->read_pos = b->write_pos = b->size = 0; +} + +// ---------------------------------------------------------------------------- + +struct compressor_state { + bool initialized; + compression_algorithm_t algorithm; + + SIMPLE_RING_BUFFER input; + SIMPLE_RING_BUFFER output; + + int level; + void *stream; + + struct { + size_t total_compressed; + size_t total_uncompressed; + size_t total_compressions; + } sender_locked; +}; + +void rrdpush_compressor_init(struct compressor_state *state); +void rrdpush_compressor_destroy(struct compressor_state *state); +size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char 
**out); + +// ---------------------------------------------------------------------------- + +struct decompressor_state { + bool initialized; + compression_algorithm_t algorithm; + size_t signature_size; + + size_t total_compressed; + size_t total_uncompressed; + size_t total_compressions; + + SIMPLE_RING_BUFFER output; + + void *stream; +}; + +void rrdpush_decompressor_destroy(struct decompressor_state *state); +void rrdpush_decompressor_init(struct decompressor_state *state); +size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); + +static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) { + if (unlikely(!data || !data_size)) + return 0; + + if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) + return 0; + + rrdpush_signature_t sign = *(rrdpush_signature_t *)data; + if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) + return 0; + + size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); + return length; +} + +static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { + if(unlikely(state->output.read_pos != state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); + + return rrdpush_decompress_decode_signature(header, header_size); +} + +static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { + if(unlikely(state->output.read_pos > state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return state->output.write_pos - state->output.read_pos; +} + +static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { + if (unlikely(!state || !size || !dst)) + return 0; + + size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); + + 
if(unlikely(!remaining)) + return 0; + + size_t bytes_to_return = size; + if(bytes_to_return > remaining) + bytes_to_return = remaining; + + memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return); + state->output.read_pos += bytes_to_return; + + if(unlikely(state->output.read_pos > state->output.write_pos)) + fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); + + return bytes_to_return; +} + +// ---------------------------------------------------------------------------- + +#endif // NETDATA_RRDPUSH_COMPRESSION_H 1 diff --git a/streaming/compression_gzip.c b/streaming/compression_gzip.c new file mode 100644 index 0000000000..c76c4aad41 --- /dev/null +++ b/streaming/compression_gzip.c @@ -0,0 +1,158 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_gzip.h" +#include + +void rrdpush_compressor_init_gzip(struct compressor_state *state) { + if (!state->initialized) { + state->initialized = true; + + // Initialize deflate stream + z_stream *strm = state->stream = (z_stream *) mallocz(sizeof(z_stream)); + strm->zalloc = Z_NULL; + strm->zfree = Z_NULL; + strm->opaque = Z_NULL; + + if(state->level < Z_BEST_SPEED) + state->level = Z_BEST_SPEED; + + if(state->level > Z_BEST_COMPRESSION) + state->level = Z_BEST_COMPRESSION; + + // int r = deflateInit2(strm, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + int r = deflateInit2(strm, state->level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY); + if (r != Z_OK) { + netdata_log_error("Failed to initialize deflate with error: %d", r); + freez(state->stream); + state->initialized = false; + return; + } + + } +} + +void rrdpush_compressor_destroy_gzip(struct compressor_state *state) { + if (state->stream) { + deflateEnd(state->stream); + free(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out) { + if (unlikely(!state || !size || !out)) + return 0; + + 
simple_ring_buffer_make_room(&state->output, deflateBound(state->stream, size)); + + z_stream *strm = state->stream; + strm->avail_in = (uInt)size; + strm->next_in = (Bytef *)data; + strm->avail_out = (uInt)state->output.size; + strm->next_out = (Bytef *)state->output.data; + + int ret = deflate(strm, Z_SYNC_FLUSH); + if (ret != Z_OK && ret != Z_STREAM_END) { + netdata_log_error("STREAM: deflate() failed with error %d", ret); + return 0; + } + + if(strm->avail_in != 0) { + netdata_log_error("STREAM: deflate() did not use all the input buffer, %u bytes out of %zu remain", + strm->avail_in, size); + return 0; + } + + if(strm->avail_out == 0) { + netdata_log_error("STREAM: deflate() needs a bigger output buffer than the one we provided " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, size); + return 0; + } + + size_t compressed_data_size = state->output.size - strm->avail_out; + + if(compressed_data_size == 0) { + netdata_log_error("STREAM: deflate() did not produce any output " + "(output buffer %zu bytes, compressed payload %zu bytes)", + state->output.size, size); + return 0; + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += compressed_data_size; + + *out = state->output.data; + return compressed_data_size; +} + +void rrdpush_decompressor_init_gzip(struct decompressor_state *state) { + if (!state->initialized) { + state->initialized = true; + + // Initialize inflate stream + z_stream *strm = state->stream = (z_stream *) malloc(sizeof(z_stream)); + strm->zalloc = Z_NULL; + strm->zfree = Z_NULL; + strm->opaque = Z_NULL; + + inflateInit2(strm, 15 + 16); + + simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK); + } +} + +void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state) { + if (state->stream) { + inflateEnd(state->stream); + free(state->stream); + state->stream = NULL; + } +} + +size_t 
rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + z_stream *strm = state->stream; + strm->avail_in = (uInt)compressed_size; + strm->next_in = (Bytef *)compressed_data; + strm->avail_out = (uInt)state->output.size; + strm->next_out = (Bytef *)state->output.data; + + int ret = inflate(strm, Z_SYNC_FLUSH); + if (ret != Z_STREAM_END && ret != Z_OK) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() failed with error %d", ret); + return 0; + } + + if(strm->avail_in != 0) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() did not use all compressed data we provided " + "(compressed payload %zu bytes, remaining to be uncompressed %u)" + , compressed_size, strm->avail_in); + return 0; + } + + if(strm->avail_out == 0) { + netdata_log_error("RRDPUSH DECOMPRESS: inflate() needs a bigger output buffer than the one we provided " + "(compressed payload %zu bytes, output buffer size %zu bytes)" + , compressed_size, state->output.size); + return 0; + } + + size_t decompressed_size = state->output.size - strm->avail_out; + + state->output.read_pos = 0; + state->output.write_pos = decompressed_size; + + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} diff --git a/streaming/compression_gzip.h b/streaming/compression_gzip.h new file mode 100644 index 0000000000..85f34bc6d7 --- /dev/null +++ b/streaming/compression_gzip.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_GZIP_H +#define NETDATA_STREAMING_COMPRESSION_GZIP_H + +void rrdpush_compressor_init_gzip(struct compressor_state 
*state); +void rrdpush_compressor_destroy_gzip(struct compressor_state *state); +size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_gzip(struct decompressor_state *state); +void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state); + +#endif //NETDATA_STREAMING_COMPRESSION_GZIP_H diff --git a/streaming/compression_lz4.c b/streaming/compression_lz4.c new file mode 100644 index 0000000000..f5174134eb --- /dev/null +++ b/streaming/compression_lz4.c @@ -0,0 +1,143 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_lz4.h" + +#ifdef ENABLE_LZ4 +#include "lz4.h" + +// ---------------------------------------------------------------------------- +// compress + +void rrdpush_compressor_init_lz4(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = LZ4_createStream(); + + // LZ4 needs access to the last 64KB of source data + // so, we keep twice the size of each message + simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2); + } +} + +void rrdpush_compressor_destroy_lz4(struct compressor_state *state) { + if (state->stream) { + LZ4_freeStream(state->stream); + state->stream = NULL; + } +} + +/* + * Compress the given block of data + * Compressed data will remain in the internal buffer until the next invocation + * Return the size of compressed data block as result and the pointer to internal buffer using the last argument + * or 0 in case of error + */ +size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) { + if(unlikely(!state || !size || !out)) + return 0; + + // we need to keep the last 64K of our previous source data + // as they were in the ring buffer + + 
simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size)); + + if(state->input.write_pos + size > state->input.size) + // the input buffer cannot fit out data, restart from zero + simple_ring_buffer_reset(&state->input); + + simple_ring_buffer_append_data(&state->input, data, size); + + long int compressed_data_size = LZ4_compress_fast_continue( + state->stream, + state->input.data + state->input.read_pos, + (char *)state->output.data, + (int)(state->input.write_pos - state->input.read_pos), + (int)state->output.size, + state->level); + + if (compressed_data_size <= 0) { + netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld " + "(source is %zu bytes, output buffer can fit %zu bytes)", + compressed_data_size, size, state->output.size); + return 0; + } + + state->input.read_pos = state->input.write_pos; + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += compressed_data_size; + + *out = state->output.data; + return compressed_data_size; +} + +// ---------------------------------------------------------------------------- +// decompress + +void rrdpush_decompressor_init_lz4(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = LZ4_createStreamDecode(); + simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2); + } +} + +void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) { + if (state->stream) { + LZ4_freeStreamDecode(state->stream); + state->stream = NULL; + } +} + +/* + * Decompress the compressed data in the internal buffer + * Return the size of uncompressed data or 0 for error + */ +size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // 
meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size)) + // the input buffer cannot fit out data, restart from zero + simple_ring_buffer_reset(&state->output); + + long int decompressed_size = LZ4_decompress_safe_continue( + state->stream + , compressed_data + , (char *)(state->output.data + state->output.write_pos) + , (int)compressed_size + , (int)(state->output.size - state->output.write_pos) + ); + + if (unlikely(decompressed_size < 0)) { + netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld " + "(compressed chunk is %zu bytes)" + , decompressed_size, compressed_size); + return 0; + } + + if(unlikely(decompressed_size + state->output.write_pos > state->output.size)) + fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer " + "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)" + , state->output.size + , state->output.write_pos + , decompressed_size + , (size_t)(state->output.write_pos + decompressed_size - state->output.size) + ); + + state->output.write_pos += decompressed_size; + + // statistics + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_LZ4 diff --git a/streaming/compression_lz4.h b/streaming/compression_lz4.h new file mode 100644 index 0000000000..69f0fadccf --- /dev/null +++ b/streaming/compression_lz4.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_LZ4_H +#define NETDATA_STREAMING_COMPRESSION_LZ4_H + +#ifdef ENABLE_LZ4 + +void rrdpush_compressor_init_lz4(struct compressor_state *state); +void rrdpush_compressor_destroy_lz4(struct compressor_state *state); +size_t rrdpush_compress_lz4(struct compressor_state *state, 
const char *data, size_t size, const char **out); +size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_lz4(struct decompressor_state *state); +void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state); + +#endif // ENABLE_LZ4 + +#endif //NETDATA_STREAMING_COMPRESSION_LZ4_H diff --git a/streaming/compression_zstd.c b/streaming/compression_zstd.c new file mode 100644 index 0000000000..dabc044f7f --- /dev/null +++ b/streaming/compression_zstd.c @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_zstd.h" + +#ifdef ENABLE_ZSTD +#include + +void rrdpush_compressor_init_zstd(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createCStream(); + + if(state->level < 1) + state->level = 1; + + if(state->level > ZSTD_maxCLevel()) + state->level = ZSTD_maxCLevel(); + + size_t ret = ZSTD_initCStream(state->stream, state->level); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret)); + + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1); + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast); + } +} + +void rrdpush_compressor_destroy_zstd(struct compressor_state *state) { + if(state->stream) { + ZSTD_freeCStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) { + if(unlikely(!state || !size || !out)) + return 0; + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = size, + .src = data, + }; + + size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize()); + simple_ring_buffer_make_room(&state->output, wanted_size); + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .size = state->output.size, + .dst = (void *)state->output.data, + }; 
+ + // compress + size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer); + + // error handling + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) { + netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)", + inBuffer.size, inBuffer.pos); + return 0; + } + + if(outBuffer.pos == 0) { + // ZSTD needs more input to flush the output, so let's flush it manually + ret = ZSTD_flushStream(state->stream, &outBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(outBuffer.pos == 0) { + netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes " + "(source is %zu bytes, output buffer can fit %zu bytes) " + , size, outBuffer.size); + return 0; + } + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += outBuffer.pos; + + // return values + *out = state->output.data; + return outBuffer.pos; +} + +void rrdpush_decompressor_init_zstd(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createDStream(); + + size_t ret = ZSTD_initDStream(state->stream); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret)); + + simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize())); + } +} + +void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) { + if (state->stream) { + ZSTD_freeDStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || 
!compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = compressed_size, + .src = compressed_data, + }; + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .dst = (char *)state->output.data, + .size = state->output.size, + }; + + size_t ret = ZSTD_decompressStream( + state->stream + , &outBuffer + , &inBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) + fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, " + "but %zu bytes of compressed data remain", + inBuffer.pos, inBuffer.size); + + size_t decompressed_size = outBuffer.pos; + + state->output.read_pos = 0; + state->output.write_pos = outBuffer.pos; + + // statistics + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_ZSTD diff --git a/streaming/compression_zstd.h b/streaming/compression_zstd.h new file mode 100644 index 0000000000..bfabbf89d5 --- /dev/null +++ b/streaming/compression_zstd.h @@ -0,0 +1,19 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression.h" + +#ifndef NETDATA_STREAMING_COMPRESSION_ZSTD_H +#define NETDATA_STREAMING_COMPRESSION_ZSTD_H + +#ifdef ENABLE_ZSTD + +void rrdpush_compressor_init_zstd(struct compressor_state *state); +void rrdpush_compressor_destroy_zstd(struct compressor_state *state); +size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out); +size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); +void rrdpush_decompressor_init_zstd(struct decompressor_state *state); +void 
rrdpush_decompressor_destroy_zstd(struct decompressor_state *state); + +#endif // ENABLE_ZSTD + +#endif //NETDATA_STREAMING_COMPRESSION_ZSTD_H diff --git a/streaming/receiver.c b/streaming/receiver.c index 10ef8b7d3f..056ea21954 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -28,9 +28,7 @@ void receiver_state_free(struct receiver_state *rpt) { close(rpt->fd); } -#ifdef ENABLE_RRDPUSH_COMPRESSION rrdpush_decompressor_destroy(&rpt->decompressor); -#endif if(rpt->system_info) rrdhost_system_info_free(rpt->system_info); @@ -92,15 +90,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz return (int)bytes_read; } -static inline bool receiver_read_uncompressed(struct receiver_state *r) { +static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) { + if(code > 0) + return 0; + + switch(code) { + case 0: + // asked to read zero bytes + return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER; + + case -1: + // EOF + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF; + + case -2: + // failed to read + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED; + + case -3: + // timeout + return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT; + + default: + // anything else + return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + } +} + +static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { #ifdef NETDATA_INTERNAL_CHECKS if(r->reader.read_buffer[r->reader.read_len] != '\0') fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); #endif int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1); - if(unlikely(bytes_read <= 0)) + if(unlikely(bytes_read <= 0)) { + *reason = read_stream_error_to_reason(bytes_read); return false; + } worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read); worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, 
(NETDATA_DOUBLE)bytes_read); @@ -111,8 +138,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) { return true; } -#ifdef ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { +static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) { internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0', "%s: read_buffer does not start with zero #2", __FUNCTION__ ); @@ -150,8 +176,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int bytes_read = 0; do { int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read); - if (unlikely(ret <= 0)) + if (unlikely(ret <= 0)) { + *reason = read_stream_error_to_reason(ret); return false; + } bytes_read += ret; } while(unlikely(bytes_read < (int)r->decompressor.signature_size)); @@ -187,7 +215,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { int last_read_bytes = read_stream(r, &compressed[start], remaining); if (unlikely(last_read_bytes <= 0)) { - internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes); + *reason = read_stream_error_to_reason(last_read_bytes); return false; } @@ -217,11 +245,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) { return true; } -#else // !ENABLE_RRDPUSH_COMPRESSION -static inline bool receiver_read_compressed(struct receiver_state *r) { - return receiver_read_uncompressed(r); -} -#endif // ENABLE_RRDPUSH_COMPRESSION /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. 
@@ -323,16 +346,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - bool compressed_connection = false; - -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - compressed_connection = true; - rrdpush_decompressor_reset(&rpt->decompressor); - } - else - rrdpush_decompressor_destroy(&rpt->decompressor); -#endif + bool compressed_connection = rrdpush_decompression_initialize(rpt); buffered_reader_init(&rpt->reader); @@ -340,10 +354,12 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i while(!receiver_should_stop(rpt)) { if(!buffered_reader_next_line(&rpt->reader, buffer)) { - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + + bool have_new_data = compressed_connection ? 
receiver_read_compressed(rpt, &reason) : receiver_read_uncompressed(rpt, &reason); if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false); + receiver_set_exit_reason(rpt, reason, false); break; } @@ -543,6 +559,29 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con } +static void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order) { + rpt->config.compression_priorities[0] = STREAM_CAP_ZSTD; + rpt->config.compression_priorities[1] = STREAM_CAP_LZ4; + rpt->config.compression_priorities[2] = STREAM_CAP_GZIP; + + char *s = strdupz(order); + + char *words[COMPRESSION_ALGORITHM_MAX] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(s, words, COMPRESSION_ALGORITHM_MAX); + for(size_t i = 0; i < num_words ;i++) { + if(strcasecmp(words[i], "zstd") == 0) + rpt->config.compression_priorities[i] = STREAM_CAP_ZSTD; + else if(strcasecmp(words[i], "lz4") == 0) + rpt->config.compression_priorities[i] = STREAM_CAP_LZ4; + else if(strcasecmp(words[i], "gzip") == 0) + rpt->config.compression_priorities[i] = STREAM_CAP_GZIP; + else + rpt->config.compression_priorities[i] = 0; + } + + freez(s); +} + static void rrdpush_receive(struct receiver_state *rpt) { rpt->config.mode = default_rrd_memory_mode; @@ -611,11 +650,15 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); -#ifdef ENABLE_RRDPUSH_COMPRESSION rpt->config.rrdpush_compression = default_rrdpush_compression_enabled; rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); 
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); -#endif // ENABLE_RRDPUSH_COMPRESSION + + if(rpt->config.rrdpush_compression) { + char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", "zstd lz4 gzip"); + order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order); + rrdpush_parse_compression_order(rpt, order); + } (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); @@ -709,12 +752,27 @@ static void rrdpush_receive(struct receiver_state *rpt) snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port); snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port); -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) { - if (!rpt->config.rrdpush_compression) - rpt->capabilities &= ~STREAM_CAP_COMPRESSION; + if (!rpt->config.rrdpush_compression) + rpt->capabilities &= ~STREAM_CAP_COMPRESSIONS_AVAILABLE; + + // select the right compression before sending our capabilities to the child + if(stream_has_more_than_one_capability_of(rpt->capabilities, STREAM_CAP_COMPRESSIONS_AVAILABLE)) { + STREAM_CAPABILITIES compressions = rpt->capabilities & STREAM_CAP_COMPRESSIONS_AVAILABLE; + for(int i = 0; i < COMPRESSION_ALGORITHM_MAX; i++) { + STREAM_CAPABILITIES c = rpt->config.compression_priorities[i]; + + if(!(c & STREAM_CAP_COMPRESSIONS_AVAILABLE)) + continue; + + if(compressions & c) { + STREAM_CAPABILITIES exclude = compressions; + exclude &= ~c; + + rpt->capabilities &= ~exclude; + break; + } + } } -#endif // ENABLE_RRDPUSH_COMPRESSION { // netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 22f7549961..e2d56399b6 100644 --- 
a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -39,9 +39,9 @@ struct config stream_config = { }; unsigned int default_rrdpush_enabled = 0; -#ifdef ENABLE_RRDPUSH_COMPRESSION +STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE; + unsigned int default_rrdpush_compression_enabled = 1; -#endif char *default_rrdpush_destination = NULL; char *default_rrdpush_api_key = NULL; char *default_rrdpush_send_charts_matching = NULL; @@ -67,43 +67,6 @@ static void load_stream_conf() { freez(filename); } -STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { - - // we can have DATA_WITH_ML when INTERPOLATED is available - bool ml_capability = true; - - if(host && sender) { - // we have DATA_WITH_ML capability - // we should remove the DATA_WITH_ML capability if our database does not have anomaly info - // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML - netdata_mutex_lock(&host->receiver_lock); - - if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) - ml_capability = false; - - netdata_mutex_unlock(&host->receiver_lock); - } - - return STREAM_CAP_V1 | - STREAM_CAP_V2 | - STREAM_CAP_VN | - STREAM_CAP_VCAPS | - STREAM_CAP_HLABELS | - STREAM_CAP_CLAIM | - STREAM_CAP_CLABELS | - STREAM_CAP_FUNCTIONS | - STREAM_CAP_REPLICATION | - STREAM_CAP_BINARY | - STREAM_CAP_INTERPOLATED | - STREAM_HAS_COMPRESSION | -#ifdef NETDATA_TEST_DYNCFG - STREAM_CAP_DYNCFG | -#endif - (ieee754_doubles ? STREAM_CAP_IEEE754 : 0) | - (ml_capability ? 
STREAM_CAP_DATA_WITH_ML : 0) | - 0; -} - bool rrdpush_receiver_needs_dbengine() { struct section *co; @@ -145,10 +108,20 @@ int rrdpush_init() { rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s); -#ifdef ENABLE_RRDPUSH_COMPRESSION default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "enable compression", default_rrdpush_compression_enabled); -#endif + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "zstd compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]); + + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number( + &stream_config, CONFIG_SECTION_STREAM, "gzip compression level", + rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]); if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) { netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing."); @@ -921,9 +894,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri struct receiver_state *rpt = callocz(1, sizeof(*rpt)); rpt->last_msg_t = now_monotonic_sec(); - rpt->capabilities = STREAM_CAP_INVALID; rpt->hops = 1; + rpt->capabilities = STREAM_CAP_INVALID; + __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED); __atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED); @@ -1380,11 +1354,15 @@ static struct { { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, 
"DISCONNECTED SHUTDOWN REQUESTED" }, { STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" }, - { STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" }, + {STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" }, { STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" }, { STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" }, { STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" }, + { STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" }, + {STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" }, { 0, NULL }, }; @@ -1405,22 +1383,24 @@ static struct { STREAM_CAPABILITIES cap; const char *str; } capability_names[] = { - { STREAM_CAP_V1, "V1" }, - { STREAM_CAP_V2, "V2" }, - { STREAM_CAP_VN, "VN" }, - { STREAM_CAP_VCAPS, "VCAPS" }, - { STREAM_CAP_HLABELS, "HLABELS" }, - { STREAM_CAP_CLAIM, "CLAIM" }, - { STREAM_CAP_CLABELS, "CLABELS" }, - { STREAM_CAP_COMPRESSION, "COMPRESSION" }, - { STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, - { STREAM_CAP_REPLICATION, "REPLICATION" }, - { STREAM_CAP_BINARY, "BINARY" }, - { STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, - { STREAM_CAP_IEEE754, "IEEE754" }, - { STREAM_CAP_DATA_WITH_ML, "ML" }, - { STREAM_CAP_DYNCFG, "DYN_CFG" }, - { 0 , NULL }, + {STREAM_CAP_V1, "V1" }, + {STREAM_CAP_V2, "V2" }, + {STREAM_CAP_VN, "VN" }, + {STREAM_CAP_VCAPS, "VCAPS" }, + {STREAM_CAP_HLABELS, "HLABELS" }, + {STREAM_CAP_CLAIM, "CLAIM" }, + {STREAM_CAP_CLABELS, "CLABELS" }, + {STREAM_CAP_LZ4, "LZ4" }, + {STREAM_CAP_FUNCTIONS, "FUNCTIONS" }, + 
{STREAM_CAP_REPLICATION, "REPLICATION" }, + {STREAM_CAP_BINARY, "BINARY" }, + {STREAM_CAP_INTERPOLATED, "INTERPOLATED" }, + {STREAM_CAP_IEEE754, "IEEE754" }, + {STREAM_CAP_DATA_WITH_ML, "ML" }, + {STREAM_CAP_DYNCFG, "DYN_CFG" }, + {STREAM_CAP_ZSTD, "ZSTD" }, + {STREAM_CAP_GZIP, "GZIP" }, + {0 , NULL }, }; static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) { @@ -1466,6 +1446,44 @@ void log_sender_capabilities(struct sender_state *s) { buffer_free(wb); } +STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) { + STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities; + + if(host && sender) { + // we have DATA_WITH_ML capability + // we should remove the DATA_WITH_ML capability if our database does not have anomaly info + // this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML + netdata_mutex_lock(&host->receiver_lock); + + if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML)) + disabled_capabilities |= STREAM_CAP_DATA_WITH_ML; + + netdata_mutex_unlock(&host->receiver_lock); + + if(host->sender) + disabled_capabilities |= host->sender->disabled_capabilities; + } + + return (STREAM_CAP_V1 | + STREAM_CAP_V2 | + STREAM_CAP_VN | + STREAM_CAP_VCAPS | + STREAM_CAP_HLABELS | + STREAM_CAP_CLAIM | + STREAM_CAP_CLABELS | + STREAM_CAP_FUNCTIONS | + STREAM_CAP_REPLICATION | + STREAM_CAP_BINARY | + STREAM_CAP_INTERPOLATED | + STREAM_CAP_COMPRESSIONS_AVAILABLE | + #ifdef NETDATA_TEST_DYNCFG + STREAM_CAP_DYNCFG | + #endif + STREAM_CAP_IEEE754 | + STREAM_CAP_DATA_WITH_ML | + 0) & ~disabled_capabilities; +} + STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) { STREAM_CAPABILITIES caps = 0; @@ -1473,7 +1491,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS; else if(version 
<= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM; else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS; - else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION; + else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE; else caps = version; if(caps & STREAM_CAP_VCAPS) @@ -1495,8 +1513,61 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH } int32_t stream_capabilities_to_vn(uint32_t caps) { - if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION; + if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4; if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS; return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM) } +int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX] = { + [COMPRESSION_ALGORITHM_NONE] = 0, + [COMPRESSION_ALGORITHM_ZSTD] = 3, // 1 (faster) - 22 (best compression), + [COMPRESSION_ALGORITHM_LZ4] = 1, // 1 (best compression) - 9 (faster) + [COMPRESSION_ALGORITHM_GZIP] = 1, // 1 (faster) - 9 (best compression) +}; + +bool rrdpush_compression_initialize(struct sender_state *s) { + rrdpush_compressor_destroy(&s->compressor); + + // IMPORTANT + // KEEP THE SAME ORDER IN DECOMPRESSION + + if(stream_has_capability(s, STREAM_CAP_ZSTD)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_ZSTD; + else if(stream_has_capability(s, STREAM_CAP_LZ4)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_LZ4; + else if(stream_has_capability(s, STREAM_CAP_GZIP)) + s->compressor.algorithm = COMPRESSION_ALGORITHM_GZIP; + else + s->compressor.algorithm = COMPRESSION_ALGORITHM_NONE; + + if(s->compressor.algorithm != COMPRESSION_ALGORITHM_NONE) { + s->compressor.level = 
rrdpush_compression_levels[s->compressor.algorithm]; + rrdpush_compressor_init(&s->compressor); + return true; + } + + return false; +} + +bool rrdpush_decompression_initialize(struct receiver_state *rpt) { + rrdpush_decompressor_destroy(&rpt->decompressor); + + // IMPORTANT + // KEEP THE SAME ORDER IN COMPRESSION + + if(stream_has_capability(rpt, STREAM_CAP_ZSTD)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_ZSTD; + else if(stream_has_capability(rpt, STREAM_CAP_LZ4)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_LZ4; + else if(stream_has_capability(rpt, STREAM_CAP_GZIP)) + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_GZIP; + else + rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_NONE; + + if(rpt->decompressor.algorithm != COMPRESSION_ALGORITHM_NONE) { + rrdpush_decompressor_init(&rpt->decompressor); + return true; + } + + return false; +} diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 787e63de12..2b833f1917 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -18,12 +18,14 @@ #define STREAM_OLD_VERSION_CLAIM 3 #define STREAM_OLD_VERSION_CLABELS 4 -#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production +#define STREAM_OLD_VERSION_LZ4 5 // ---------------------------------------------------------------------------- // capabilities negotiation typedef enum { + STREAM_CAP_NONE = 0, + // do not use the first 3 bits // they used to be versions 1, 2 and 3 // before we introduce capabilities @@ -38,7 +40,7 @@ typedef enum { STREAM_CAP_HLABELS = (1 << 7), // host labels supported STREAM_CAP_CLAIM = (1 << 8), // claiming supported STREAM_CAP_CLABELS = (1 << 9), // chart labels supported - STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported + STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported STREAM_CAP_REPLICATION = (1 << 12), // replication supported STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data @@ -46,22 +48,39 @@ 
typedef enum { STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming + STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported + STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set // this must be signed int, so don't use the last bit // needed for negotiating errors between parent and child } STREAM_CAPABILITIES; -#ifdef ENABLE_RRDPUSH_COMPRESSION -#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION +#ifdef ENABLE_LZ4 +#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4 #else -#define STREAM_HAS_COMPRESSION 0 -#endif // ENABLE_RRDPUSH_COMPRESSION +#define STREAM_CAP_LZ4_AVAILABLE 0 +#endif // ENABLE_LZ4 + +#ifdef ENABLE_ZSTD +#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD +#else +#define STREAM_CAP_ZSTD_AVAILABLE 0 +#endif // ENABLE_ZSTD + +#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_GZIP) + +extern STREAM_CAPABILITIES globally_disabled_capabilities; STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender); #define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability)) +static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) { + STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask); + return (common & (common - 1)) != 0 && common != 0; +} + // ---------------------------------------------------------------------------- // stream handshake @@ -101,11 +120,15 @@ typedef enum { STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16, STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17, - STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18, + 
STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19, STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20, STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21, STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22, + STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25, + STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26, } STREAM_HANDSHAKE; @@ -120,100 +143,7 @@ typedef struct { char *kernel_version; } stream_encoded_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION -// signature MUST end with a newline -#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24)) -#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4 - -struct compressor_state { - bool initialized; - char *compression_result_buffer; - size_t compression_result_buffer_size; - struct { - void *lz4_stream; - char *input_ring_buffer; - size_t input_ring_buffer_size; - size_t input_ring_buffer_pos; - } stream; - size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer); - void (*destroy)(struct compressor_state **state); -}; - -void rrdpush_compressor_reset(struct compressor_state *state); -void rrdpush_compressor_destroy(struct compressor_state *state); -size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out); - -struct decompressor_state { - bool initialized; - size_t signature_size; - size_t total_compressed; - size_t total_uncompressed; - size_t packet_count; - struct { - void *lz4_stream; - char *buffer; - size_t size; - size_t write_at; - size_t read_at; - } stream; -}; - -void rrdpush_decompressor_destroy(struct decompressor_state *state); -void rrdpush_decompressor_reset(struct decompressor_state *state); -size_t 
rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size); - -static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) { - if (unlikely(!data || !data_size)) - return 0; - - if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE)) - return 0; - - uint32_t sign = *(uint32_t *)data; - if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE)) - return 0; - - size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7)); - return length; -} - -static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) { - if(unlikely(state->stream.read_at != state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!"); - - return rrdpush_decompress_decode_header(header, header_size); -} - -static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) { - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return state->stream.write_at - state->stream.read_at; -} - -static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) { - if (unlikely(!state || !size || !dst)) - return 0; - - size_t remaining = rrdpush_decompressed_bytes_in_buffer(state); - - if(unlikely(!remaining)) - return 0; - - size_t bytes_to_return = size; - if(bytes_to_return > remaining) - bytes_to_return = remaining; - - memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return); - state->stream.read_at += bytes_to_return; - - if(unlikely(state->stream.read_at > state->stream.write_at)) - fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions"); - - return bytes_to_return; -} -#endif +#include "compression.h" // Thread-local storage // Metric transmission: collector threads 
asynchronously fill the buffer, sender thread uses it. @@ -230,7 +160,6 @@ typedef enum __attribute__((packed)) { typedef enum __attribute__((packed)) { SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct function_payload_state { @@ -263,6 +192,7 @@ struct sender_state { char read_buffer[PLUGINSD_LINE_MAX + 1]; ssize_t read_len; STREAM_CAPABILITIES capabilities; + STREAM_CAPABILITIES disabled_capabilities; size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX]; @@ -274,9 +204,7 @@ struct sender_state { uint16_t hops; -#ifdef ENABLE_RRDPUSH_COMPRESSION struct compressor_state compressor; -#endif // ENABLE_RRDPUSH_COMPRESSION #ifdef ENABLE_HTTPS NETDATA_SSL ssl; // structure used to encrypt the connection @@ -421,6 +349,7 @@ struct receiver_state { time_t rrdpush_replication_step; char *rrdpush_destination; // DONT FREE - it is allocated in appconfig unsigned int rrdpush_compression; + STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX]; } config; #ifdef ENABLE_HTTPS @@ -429,9 +358,7 @@ struct receiver_state { time_t replication_first_time_t; -#ifdef ENABLE_RRDPUSH_COMPRESSION struct decompressor_state decompressor; -#endif // ENABLE_RRDPUSH_COMPRESSION /* struct { uint32_t count; @@ -453,9 +380,7 @@ struct rrdpush_destinations { }; extern unsigned int default_rrdpush_enabled; -#ifdef ENABLE_RRDPUSH_COMPRESSION extern unsigned int default_rrdpush_compression_enabled; -#endif // ENABLE_RRDPUSH_COMPRESSION extern char *default_rrdpush_destination; extern char *default_rrdpush_api_key; extern char *default_rrdpush_send_charts_matching; @@ -514,10 +439,6 @@ int connect_to_one_of_destinations( void rrdpush_signal_sender_to_wake_up(struct sender_state *s); -#ifdef ENABLE_RRDPUSH_COMPRESSION -struct compressor_state *create_compressor(); -#endif // ENABLE_RRDPUSH_COMPRESSION - void 
rrdpush_reset_destinations_postpone_time(RRDHOST *host); const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error); void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key); @@ -784,4 +705,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags); void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name); +bool rrdpush_compression_initialize(struct sender_state *s); +bool rrdpush_decompression_initialize(struct receiver_state *rpt); + #endif //NETDATA_RRDPUSH_H diff --git a/streaming/sender.c b/streaming/sender.c index 71f8750342..b8117d974a 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -20,9 +20,12 @@ #define WORKER_SENDER_JOB_BUFFER_RATIO 15 #define WORKER_SENDER_JOB_BYTES_RECEIVED 16 #define WORKER_SENDER_JOB_BYTES_SENT 17 -#define WORKER_SENDER_JOB_REPLAY_REQUEST 18 -#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19 -#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20 +#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18 +#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19 +#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20 +#define WORKER_SENDER_JOB_REPLAY_REQUEST 21 +#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23 #if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 @@ -66,7 +69,6 @@ BUFFER *sender_start(struct sender_state *s) { static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); -#ifdef ENABLE_RRDPUSH_COMPRESSION /* * In case of stream compression buffer overflow * Inform the user through the error log file and @@ -74,12 +76,35 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host); */ static inline void deactivate_compression(struct sender_state *s) { 
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION); - netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it."); - s->flags &= ~SENDER_FLAG_COMPRESSION; - netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to); + + switch(s->compressor.algorithm) { + case COMPRESSION_ALGORITHM_MAX: + case COMPRESSION_ALGORITHM_NONE: + netdata_log_error("STREAM_COMPRESSION: compression error on 'host:%s' without any compression enabled. Ignoring error.", + rrdhost_hostname(s->host)); + break; + + case COMPRESSION_ALGORITHM_GZIP: + netdata_log_error("STREAM_COMPRESSION: GZIP compression error on 'host:%s'. Disabling GZIP for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_GZIP; + break; + + case COMPRESSION_ALGORITHM_LZ4: + netdata_log_error("STREAM_COMPRESSION: LZ4 compression error on 'host:%s'. Disabling LZ4 for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_LZ4; + break; + + case COMPRESSION_ALGORITHM_ZSTD: + netdata_log_error("STREAM_COMPRESSION: ZSTD compression error on 'host:%s'. 
Disabling ZSTD for this node.", + rrdhost_hostname(s->host)); + s->disabled_capabilities |= STREAM_CAP_ZSTD; + break; + } + rrdpush_sender_thread_close_socket(s->host); } -#endif #define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3 @@ -117,8 +142,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE; } -#ifdef ENABLE_RRDPUSH_COMPRESSION - if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) { + if (s->compressor.initialized) { while(src_len) { size_t size_to_compress = src_len; @@ -143,13 +167,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } } - char *dst; + const char *dst; size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if (!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying", rrdhost_hostname(s->host), s->connected_to); - rrdpush_compressor_reset(&s->compressor); + rrdpush_compression_initialize(s); dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst); if(!dst_len) { netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. 
Deactivating compression", @@ -161,10 +185,25 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) } } - if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len); + +#ifdef NETDATA_INTERNAL_CHECKS + // check if reversing the signature provides the same length + size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature)); + if(decoded_dst_len != dst_len) + fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, " + "compressed payload length %zu bytes, but signature says payload is %zu bytes", + size_to_compress, dst_len, decoded_dst_len); +#endif + + if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature))) s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += dst_len; + else { + if(cbuffer_add_unsafe(s->buffer, dst, dst_len)) + s->flags |= SENDER_FLAG_OVERFLOW; + else + s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature); + } src = src + size_to_compress; src_len -= size_to_compress; @@ -174,12 +213,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) s->flags |= SENDER_FLAG_OVERFLOW; else s->sent_bytes_on_this_connection_per_type[type] += src_len; -#else - if(cbuffer_add_unsafe(s->buffer, src, src_len)) - s->flags |= SENDER_FLAG_OVERFLOW; - else - s->sent_bytes_on_this_connection_per_type[type] += src_len; -#endif replication_recalculate_buffer_used_ratio_unsafe(s); @@ -600,12 +633,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p // reset our capabilities to default s->capabilities = stream_our_capabilities(host, true); -#ifdef ENABLE_RRDPUSH_COMPRESSION - // If we don't want compression, remove it from our capabilities - if(!(s->flags & SENDER_FLAG_COMPRESSION)) - s->capabilities &= ~STREAM_CAP_COMPRESSION; -#endif // ENABLE_RRDPUSH_COMPRESSION - 
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the version negotiation resulted in a high enough version. */ @@ -766,12 +793,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; -#ifdef ENABLE_RRDPUSH_COMPRESSION - if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) - rrdpush_compressor_reset(&s->compressor); - else - rrdpush_compressor_destroy(&s->compressor); -#endif // ENABLE_RRDPUSH_COMPRESSION + rrdpush_compression_initialize(s); log_sender_capabilities(s); @@ -1303,6 +1325,9 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT); worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL); + worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); struct sender_state *s = ptr; @@ -1423,6 +1448,15 @@ void *rrdpush_sender_thread(void *ptr) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } + + if(s->compressor.initialized) { + size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed; + size_t bytes_compressed = s->compressor.sender_locked.total_compressed + 
s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t); + NETDATA_DOUBLE ratio = bytes_uncompressed ? 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed) : 0.0; + worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed); + worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio); + } sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); diff --git a/streaming/stream.conf b/streaming/stream.conf index 94e94cab7e..0841953940 100644 --- a/streaming/stream.conf +++ b/streaming/stream.conf @@ -168,11 +168,14 @@ # Stream Compression # By default it is enabled. # You can control stream compression in this parent agent stream with options: yes | no - #enable compression = yes + enable compression = yes + + # select the order the compression algorithms will be used, when multiple are offered by the child + compression algorithms order = zstd lz4 gzip # Replication # Enable replication for all hosts using this api key. Default: enabled - #enable replication = yes + enable replication = yes # How many seconds to replicate from each child. Default: a day #seconds to replicate = 86400 diff --git a/web/api/web_api_v1.c b/web/api/web_api_v1.c index 6488640177..458b29751f 100644 --- a/web/api/web_api_v1.c +++ b/web/api/web_api_v1.c @@ -1272,12 +1272,8 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb) buffer_json_member_add_boolean(wb, "web-enabled", web_server_mode != WEB_SERVER_MODE_NONE); buffer_json_member_add_boolean(wb, "stream-enabled", default_rrdpush_enabled); -#ifdef ENABLE_RRDPUSH_COMPRESSION buffer_json_member_add_boolean(wb, "stream-compression", - host->sender && stream_has_capability(host->sender, STREAM_CAP_COMPRESSION)); -#else // ! 
ENABLE_RRDPUSH_COMPRESSION - buffer_json_member_add_boolean(wb, "stream-compression", false); -#endif // ENABLE_RRDPUSH_COMPRESSION + host->sender && host->sender->compressor.initialized); #ifdef ENABLE_HTTPS buffer_json_member_add_boolean(wb, "https-enabled", true);