ZSTD and GZIP/DEFLATE streaming support (#16268)

* move compression header to compression.h

* prototype with zstd compression

* updated capabilities

* no need for resetting compression

* left-over reset function

* use ZSTD_compressStream() instead of ZSTD_compressStream2() for backwards compatibility

* remove call to LZ4_decoderRingBufferSize()

* debug signature failures

* fix the buffers of lz4

* fix decoding of zstd

* detect compression based on initialization; prefer ZSTD over LZ4

* allow both lz4 and zstd

* initialize zstd streams

* define missing ZSTD_CLEVEL_DEFAULT

* log zero compressed size

* debug log

* flush compression buffer

* add sender compression statistics

* removed debugging messages

* do not fail if zstd is not available

* cleanup and buildinfo

* fix max message size, use zstd level 1, add compressio ratio reporting

* use compression level 1

* fix ratio title

* better compression error logs

* for backwards compatibility use buffers of COMPRESSION_MAX_CHUNK

* switch to default compression level

* additional streaming error conditions detection

* do not expose compression stats when compression is not enabled

* test for the right lz4 functions

* moved lz4 and zstd to their own files

* add gzip streaming compression

* gzip error handling

* added unittest for streaming compression

* eliminate a copy of the uncompressed data during zstd compression

* eliminate not needed zstd allocations

* cleanup

* decode gzip with Z_SYNC_FLUSH

* set the decoding gzip algorithm

* user configuration for compression levels and compression algorithms order

* fix exclusion of not preferred compressions

* remove now obsolete compression define, since gzip is always available

* rename compression algorithms order in stream.conf

* move common checks in compression.c

* cleanup

* backwards compatible error checking
This commit is contained in:
Costa Tsaousis 2023-10-27 15:37:34 +01:00 committed by GitHub
parent 89978b5b84
commit cd584e0357
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1358 additions and 419 deletions

View File

@ -666,6 +666,13 @@ API_PLUGIN_FILES = \
STREAMING_PLUGIN_FILES = \
streaming/rrdpush.c \
streaming/compression.c \
streaming/compression.h \
streaming/compression_gzip.c \
streaming/compression_gzip.h \
streaming/compression_lz4.c \
streaming/compression_lz4.h \
streaming/compression_zstd.c \
streaming/compression_zstd.h \
streaming/sender.c \
streaming/receiver.c \
streaming/replication.h \
@ -1143,6 +1150,7 @@ NETDATA_COMMON_LIBS = \
$(OPTIONAL_MQTT_LIBS) \
$(OPTIONAL_UV_LIBS) \
$(OPTIONAL_LZ4_LIBS) \
$(OPTIONAL_ZSTD_LIBS) \
$(OPTIONAL_DATACHANNEL_LIBS) \
libjudy.a \
$(OPTIONAL_SSL_LIBS) \

View File

@ -555,16 +555,28 @@ OPTIONAL_UV_LIBS="${UV_LIBS}"
AC_CHECK_LIB(
[lz4],
[LZ4_initStream],
[LZ4_createStream],
[LZ4_LIBS_FAST="-llz4"]
)
AC_CHECK_LIB(
[lz4],
[LZ4_compress_default],
[LZ4_compress_fast_continue],
[LZ4_LIBS="-llz4"]
)
# -----------------------------------------------------------------------------
# zstd
AC_CHECK_LIB([zstd], [ZSTD_createCStream, ZSTD_compressStream, ZSTD_decompressStream, ZSTD_createDStream],
[LIBZSTD_FOUND=yes],
[LIBZSTD_FOUND=no])
if test "x$LIBZSTD_FOUND" = "xyes"; then
AC_DEFINE([ENABLE_ZSTD], [1], [libzstd usability])
OPTIONAL_ZSTD_LIBS="-lzstd"
fi
# -----------------------------------------------------------------------------
# zlib
@ -702,7 +714,7 @@ if test "${enable_lz4}" != "no"; then
AC_TRY_LINK(
[ #include <lz4.h> ],
[
LZ4_stream_t* stream = LZ4_initStream(NULL, 0);
LZ4_stream_t* stream = LZ4_createStream();
],
[ enable_lz4="yes"],
[ enable_lz4="no" ]
@ -1900,6 +1912,7 @@ AC_SUBST([OPTIONAL_MATH_LIBS])
AC_SUBST([OPTIONAL_DATACHANNEL_LIBS])
AC_SUBST([OPTIONAL_UV_LIBS])
AC_SUBST([OPTIONAL_LZ4_LIBS])
AC_SUBST([OPTIONAL_ZSTD_LIBS])
AC_SUBST([OPTIONAL_SSL_LIBS])
AC_SUBST([OPTIONAL_JSONC_LIBS])
AC_SUBST([OPTIONAL_YAML_LIBS])

View File

@ -48,6 +48,7 @@ typedef enum __attribute__((packed)) {
BIB_FEATURE_CLOUD,
BIB_FEATURE_HEALTH,
BIB_FEATURE_STREAMING,
BIB_FEATURE_BACKFILLING,
BIB_FEATURE_REPLICATION,
BIB_FEATURE_STREAMING_COMPRESSION,
BIB_FEATURE_CONTEXTS,
@ -66,6 +67,7 @@ typedef enum __attribute__((packed)) {
BIB_CONNECTIVITY_NATIVE_HTTPS,
BIB_CONNECTIVITY_TLS_HOST_VERIFY,
BIB_LIB_LZ4,
BIB_LIB_ZSTD,
BIB_LIB_ZLIB,
BIB_LIB_JUDY,
BIB_LIB_DLIB,
@ -484,6 +486,14 @@ static struct {
.json = "streaming",
.value = NULL,
},
[BIB_FEATURE_BACKFILLING] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "Back-filling (of higher database tiers)",
.json = "back-filling",
.value = NULL,
},
[BIB_FEATURE_REPLICATION] = {
.category = BIC_FEATURE,
.type = BIT_BOOLEAN,
@ -498,7 +508,7 @@ static struct {
.analytics = "Stream Compression",
.print = "Streaming and Replication Compression",
.json = "stream-compression",
.value = "none",
.value = NULL,
},
[BIB_FEATURE_CONTEXTS] = {
.category = BIC_FEATURE,
@ -628,6 +638,14 @@ static struct {
.json = "lz4",
.value = NULL,
},
[BIB_LIB_ZSTD] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
.analytics = NULL,
.print = "ZSTD (fast, lossless compression algorithm)",
.json = "zstd",
.value = NULL,
},
[BIB_LIB_ZLIB] = {
.category = BIC_LIBS,
.type = BIT_BOOLEAN,
@ -1029,6 +1047,23 @@ static void build_info_set_value(BUILD_INFO_SLOT slot, const char *value) {
BUILD_INFO[slot].value = value;
}
static void build_info_append_value(BUILD_INFO_SLOT slot, const char *value) {
size_t size = BUILD_INFO[slot].value ? strlen(BUILD_INFO[slot].value) + 1 : 0;
size += strlen(value);
char buf[size + 1];
if(BUILD_INFO[slot].value) {
strcpy(buf, BUILD_INFO[slot].value);
strcat(buf, " ");
strcat(buf, value);
}
else
strcpy(buf, value);
freez((void *)BUILD_INFO[slot].value);
BUILD_INFO[slot].value = strdupz(buf);
}
static void build_info_set_value_strdupz(BUILD_INFO_SLOT slot, const char *value) {
if(!value) value = "";
build_info_set_value(slot, strdupz(value));
@ -1075,14 +1110,18 @@ __attribute__((constructor)) void initialize_build_info(void) {
build_info_set_status(BIB_FEATURE_HEALTH, true);
build_info_set_status(BIB_FEATURE_STREAMING, true);
build_info_set_status(BIB_FEATURE_BACKFILLING, true);
build_info_set_status(BIB_FEATURE_REPLICATION, true);
#ifdef ENABLE_RRDPUSH_COMPRESSION
build_info_set_status(BIB_FEATURE_STREAMING_COMPRESSION, true);
#ifdef ENABLE_ZSTD
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "zstd");
#endif
#ifdef ENABLE_LZ4
build_info_set_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");
#endif
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "lz4");
#endif
build_info_append_value(BIB_FEATURE_STREAMING_COMPRESSION, "gzip");
build_info_set_status(BIB_FEATURE_CONTEXTS, true);
build_info_set_status(BIB_FEATURE_TIERING, true);
@ -1117,6 +1156,9 @@ __attribute__((constructor)) void initialize_build_info(void) {
#ifdef ENABLE_LZ4
build_info_set_status(BIB_LIB_LZ4, true);
#endif
#ifdef ENABLE_ZSTD
build_info_set_status(BIB_LIB_ZSTD, true);
#endif
build_info_set_status(BIB_LIB_ZLIB, true);

View File

@ -1337,6 +1337,7 @@ int julytest(void);
int pluginsd_parser_unittest(void);
void replication_initialize(void);
void bearer_tokens_init(void);
int unittest_rrdpush_compressions(void);
int main(int argc, char **argv) {
// initialize the system clocks
@ -1550,6 +1551,10 @@ int main(int argc, char **argv) {
unittest_running = true;
return pluginsd_parser_unittest();
}
else if(strcmp(optarg, "rrdpush_compressions_test") == 0) {
unittest_running = true;
return unittest_rrdpush_compressions();
}
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
optarg += strlen(createdataset_string);
unsigned history_seconds = strtoul(optarg, NULL, 0);
@ -1901,6 +1906,8 @@ int main(int argc, char **argv) {
netdata_log_info("Netdata agent version \""VERSION"\" is starting");
ieee754_doubles = is_system_ieee754_double();
if(!ieee754_doubles)
globally_disabled_capabilities |= STREAM_CAP_IEEE754;
aral_judy_init();

View File

@ -1145,13 +1145,10 @@ static void rrdhost_streaming_sender_structures_init(RRDHOST *host)
host->sender->rrdpush_sender_pipe[PIPE_READ] = -1;
host->sender->rrdpush_sender_pipe[PIPE_WRITE] = -1;
host->sender->rrdpush_sender_socket = -1;
host->sender->disabled_capabilities = STREAM_CAP_NONE;
#ifdef ENABLE_RRDPUSH_COMPRESSION
if(default_rrdpush_compression_enabled)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
else
host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
if(!default_rrdpush_compression_enabled)
host->sender->disabled_capabilities |= STREAM_CAP_COMPRESSIONS_AVAILABLE;
spinlock_init(&host->sender->spinlock);
replication_init_sender(host->sender);
@ -1167,9 +1164,7 @@ static void rrdhost_streaming_sender_structures_free(RRDHOST *host)
rrdpush_sender_thread_stop(host, STREAM_HANDSHAKE_DISCONNECT_HOST_CLEANUP, true); // stop a possibly running thread
cbuffer_free(host->sender->buffer);
#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_compressor_destroy(&host->sender->compressor);
#endif
replication_cleanup_sender(host->sender);
@ -1885,9 +1880,7 @@ void rrdhost_status(RRDHOST *host, time_t now, RRDHOST_STATUS *s) {
else
s->stream.status = RRDHOST_STREAM_STATUS_ONLINE;
#ifdef ENABLE_RRDPUSH_COMPRESSION
s->stream.compression = (stream_has_capability(host->sender, STREAM_CAP_COMPRESSION) && host->sender->compressor.initialized);
#endif
s->stream.compression = host->sender->compressor.initialized;
}
else {
s->stream.status = RRDHOST_STREAM_STATUS_OFFLINE;

View File

@ -11,10 +11,6 @@ extern "C" {
#include <config.h>
#endif
#ifdef ENABLE_LZ4
#define ENABLE_RRDPUSH_COMPRESSION 1
#endif
#ifdef ENABLE_OPENSSL
#define ENABLE_HTTPS 1
#endif
@ -681,9 +677,10 @@ static inline BITMAPX *bitmapX_create(uint32_t bits) {
#define bitmap1024_get_bit(ptr, idx) bitmapX_get_bit((BITMAPX *)ptr, idx)
#define bitmap1024_set_bit(ptr, idx, value) bitmapX_set_bit((BITMAPX *)ptr, idx, value)
#define COMPRESSION_MAX_MSG_SIZE 0x4000
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 1024)
#define COMPRESSION_MAX_CHUNK 0x4000
#define COMPRESSION_MAX_OVERHEAD 128
#define COMPRESSION_MAX_MSG_SIZE (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD - 1)
#define PLUGINSD_LINE_MAX (COMPRESSION_MAX_MSG_SIZE - 768)
int pluginsd_isspace(char c);
int config_isspace(char c);
int group_by_label_isspace(char c);

View File

@ -1,181 +1,286 @@
#include "rrdpush.h"
// SPDX-License-Identifier: GPL-3.0-or-later
#ifdef ENABLE_RRDPUSH_COMPRESSION
#include "lz4.h"
#include "compression.h"
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
#include "compression_gzip.h"
/*
* Reset compressor state for a new stream
*/
void rrdpush_compressor_reset(struct compressor_state *state) {
if(!state->initialized) {
state->initialized = true;
#ifdef ENABLE_LZ4
#include "compression_lz4.h"
#endif
state->stream.lz4_stream = LZ4_createStream();
state->stream.input_ring_buffer_size = LZ4_DECODER_RING_BUFFER_SIZE(COMPRESSION_MAX_MSG_SIZE * 2);
state->stream.input_ring_buffer = callocz(1, state->stream.input_ring_buffer_size);
state->compression_result_buffer_size = 0;
#ifdef ENABLE_ZSTD
#include "compression_zstd.h"
#endif
// ----------------------------------------------------------------------------
// compressor public API
void rrdpush_compressor_init(struct compressor_state *state) {
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
rrdpush_compressor_init_zstd(state);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
rrdpush_compressor_init_lz4(state);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
rrdpush_compressor_init_gzip(state);
break;
}
LZ4_resetStream_fast(state->stream.lz4_stream);
state->stream.input_ring_buffer_pos = 0;
simple_ring_buffer_reset(&state->input);
simple_ring_buffer_reset(&state->output);
}
/*
* Destroy compressor state and all related data
*/
void rrdpush_compressor_destroy(struct compressor_state *state) {
if (state->stream.lz4_stream) {
LZ4_freeStream(state->stream.lz4_stream);
state->stream.lz4_stream = NULL;
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
rrdpush_compressor_destroy_zstd(state);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
rrdpush_compressor_destroy_lz4(state);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
rrdpush_compressor_destroy_gzip(state);
break;
}
freez(state->stream.input_ring_buffer);
state->stream.input_ring_buffer = NULL;
freez(state->compression_result_buffer);
state->compression_result_buffer = NULL;
state->initialized = false;
simple_ring_buffer_destroy(&state->input);
simple_ring_buffer_destroy(&state->output);
}
/*
* Compress the given block of data
* Compressed data will remain in the internal buffer until the next invocation
* Return the size of compressed data block as result and the pointer to internal buffer using the last argument
* or 0 in case of error
*/
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out) {
if(unlikely(!state || !size || !out))
return 0;
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out) {
size_t ret = 0;
if(unlikely(size > COMPRESSION_MAX_MSG_SIZE)) {
netdata_log_error("RRDPUSH COMPRESS: Compression Failed - Message size %lu above compression buffer limit: %d",
(long unsigned int)size, COMPRESSION_MAX_MSG_SIZE);
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
ret = rrdpush_compress_zstd(state, data, size, out);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
ret = rrdpush_compress_lz4(state, data, size, out);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
ret = rrdpush_compress_gzip(state, data, size, out);
break;
}
if(unlikely(ret >= COMPRESSION_MAX_CHUNK)) {
netdata_log_error("RRDPUSH_COMPRESS: compressed data is %zu bytes, which is >= than the max chunk size %zu",
ret, COMPRESSION_MAX_CHUNK);
return 0;
}
size_t max_dst_size = LZ4_COMPRESSBOUND(size);
size_t data_size = max_dst_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
if (!state->compression_result_buffer) {
state->compression_result_buffer = mallocz(data_size);
state->compression_result_buffer_size = data_size;
}
else if(unlikely(state->compression_result_buffer_size < data_size)) {
state->compression_result_buffer = reallocz(state->compression_result_buffer, data_size);
state->compression_result_buffer_size = data_size;
}
// the ring buffer always has space for LZ4_MAX_MSG_SIZE
memcpy(state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos, data, size);
// this call needs the last 64K of our previous data
// they are available in the ring buffer
long int compressed_data_size = LZ4_compress_fast_continue(
state->stream.lz4_stream,
state->stream.input_ring_buffer + state->stream.input_ring_buffer_pos,
state->compression_result_buffer + RRDPUSH_COMPRESSION_SIGNATURE_SIZE,
(int)size,
(int)max_dst_size,
1);
if (compressed_data_size < 0) {
netdata_log_error("Data compression error: %ld", compressed_data_size);
return 0;
}
// update the next writing position of the ring buffer
state->stream.input_ring_buffer_pos += size;
if(unlikely(state->stream.input_ring_buffer_pos >= state->stream.input_ring_buffer_size - COMPRESSION_MAX_MSG_SIZE))
state->stream.input_ring_buffer_pos = 0;
// update the signature header
uint32_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
*(uint32_t *)state->compression_result_buffer = len | RRDPUSH_COMPRESSION_SIGNATURE;
*out = state->compression_result_buffer;
netdata_log_debug(D_STREAM, "%s: Compressed data header: %ld", STREAM_COMPRESSION_MSG, compressed_data_size);
return compressed_data_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
return ret;
}
/*
* Decompress the compressed data in the internal buffer
* Return the size of uncompressed data or 0 for error
*/
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
if(unlikely(state->stream.read_at != state->stream.write_at))
fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
if (unlikely(state->stream.write_at >= state->stream.size / 2)) {
state->stream.write_at = 0;
state->stream.read_at = 0;
}
long int decompressed_size = LZ4_decompress_safe_continue(
state->stream.lz4_stream
, compressed_data
, state->stream.buffer + state->stream.write_at
, (int)compressed_size
, (int)(state->stream.size - state->stream.write_at)
);
if (unlikely(decompressed_size < 0)) {
netdata_log_error("RRDPUSH DECOMPRESS: decompressor returned negative decompressed bytes: %ld", decompressed_size);
return 0;
}
if(unlikely(decompressed_size + state->stream.write_at > state->stream.size))
fatal("RRDPUSH DECOMPRESS: decompressor overflown the stream_buffer. size: %zu, pos: %zu, added: %ld, "
"exceeding the buffer by %zu"
, state->stream.size
, state->stream.write_at
, decompressed_size
, (size_t)(state->stream.write_at + decompressed_size - state->stream.size)
);
state->stream.write_at += decompressed_size;
// statistics
state->total_compressed += compressed_size + RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
state->total_uncompressed += decompressed_size;
state->packet_count++;
return decompressed_size;
}
void rrdpush_decompressor_reset(struct decompressor_state *state) {
if(!state->initialized) {
state->initialized = true;
state->stream.lz4_stream = LZ4_createStreamDecode();
state->stream.size = LZ4_decoderRingBufferSize(COMPRESSION_MAX_MSG_SIZE) * 2;
state->stream.buffer = mallocz(state->stream.size);
}
LZ4_setStreamDecode(state->stream.lz4_stream, NULL, 0);
state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
state->stream.write_at = 0;
state->stream.read_at = 0;
}
// ----------------------------------------------------------------------------
// decompressor public API
void rrdpush_decompressor_destroy(struct decompressor_state *state) {
if(unlikely(!state->initialized))
return;
if (state->stream.lz4_stream) {
LZ4_freeStreamDecode(state->stream.lz4_stream);
state->stream.lz4_stream = NULL;
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
rrdpush_decompressor_destroy_zstd(state);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
rrdpush_decompressor_destroy_lz4(state);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
rrdpush_decompressor_destroy_gzip(state);
break;
}
freez(state->stream.buffer);
state->stream.buffer = NULL;
simple_ring_buffer_destroy(&state->output);
state->initialized = false;
}
void rrdpush_decompressor_init(struct decompressor_state *state) {
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
rrdpush_decompressor_init_zstd(state);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
rrdpush_decompressor_init_lz4(state);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
rrdpush_decompressor_init_gzip(state);
break;
}
state->signature_size = RRDPUSH_COMPRESSION_SIGNATURE_SIZE;
simple_ring_buffer_reset(&state->output);
}
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(state->output.read_pos != state->output.write_pos))
fatal("RRDPUSH_DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
size_t ret = 0;
switch(state->algorithm) {
#ifdef ENABLE_ZSTD
case COMPRESSION_ALGORITHM_ZSTD:
ret = rrdpush_decompress_zstd(state, compressed_data, compressed_size);
break;
#endif
#ifdef ENABLE_LZ4
case COMPRESSION_ALGORITHM_LZ4:
ret = rrdpush_decompress_lz4(state, compressed_data, compressed_size);
break;
#endif
default:
case COMPRESSION_ALGORITHM_GZIP:
ret = rrdpush_decompress_gzip(state, compressed_data, compressed_size);
break;
}
// for backwards compatibility we cannot check for COMPRESSION_MAX_MSG_SIZE,
// because old children may send this big payloads.
if(unlikely(ret > COMPRESSION_MAX_CHUNK)) {
netdata_log_error("RRDPUSH_DECOMPRESS: decompressed data is %zu bytes, which is bigger than the max msg size %zu",
ret, COMPRESSION_MAX_CHUNK);
return 0;
}
return ret;
}
// ----------------------------------------------------------------------------
// unit test
int unittest_rrdpush_compression(compression_algorithm_t algorithm, const char *name) {
fprintf(stderr, "\nTesting streaming compression with %s\n", name);
struct compressor_state cctx = {
.initialized = false,
.algorithm = algorithm,
};
struct decompressor_state dctx = {
.initialized = false,
.algorithm = algorithm,
};
char txt[COMPRESSION_MAX_MSG_SIZE];
rrdpush_compressor_init(&cctx);
rrdpush_decompressor_init(&dctx);
int errors = 0;
memset(txt, '=', COMPRESSION_MAX_MSG_SIZE);
for(int i = 0; i < COMPRESSION_MAX_MSG_SIZE ;i++) {
txt[i] = 'A' + (i % 26);
size_t txt_len = i + 1;
const char *out;
size_t size = rrdpush_compress(&cctx, txt, txt_len, &out);
if(size >= COMPRESSION_MAX_CHUNK) {
fprintf(stderr, "iteration %d: compressed size %zu exceeds max allowed size\n",
i, size);
errors++;
goto cleanup;
}
else {
size_t dtxt_len = rrdpush_decompress(&dctx, out, size);
char *dtxt = (char *) &dctx.output.data[dctx.output.read_pos];
if(rrdpush_decompressed_bytes_in_buffer(&dctx) != dtxt_len) {
fprintf(stderr, "iteration %d: decompressed size %zu does not rrdpush_decompressed_bytes_in_buffer() %zu\n",
i, dtxt_len, rrdpush_decompressed_bytes_in_buffer(&dctx)
);
errors++;
goto cleanup;
}
if(dtxt_len != txt_len) {
fprintf(stderr, "iteration %d: decompressed size %zu does not match original size %zu\n",
i, dtxt_len, txt_len
);
errors++;
goto cleanup;
}
else {
if(memcmp(txt, dtxt, txt_len) != 0) {
txt[txt_len] = '\0';
dtxt[txt_len + 5] = '\0';
fprintf(stderr, "iteration %d: decompressed data '%s' do not match original data '%s' of length %zu\n",
i, dtxt, txt, txt_len);
errors++;
goto cleanup;
}
}
}
// fill the compressed buffer with garbage
memset((void *)out, 'x', size);
// here we are supposed to copy the data and advance the position
dctx.output.read_pos += rrdpush_decompressed_bytes_in_buffer(&dctx);
}
cleanup:
rrdpush_compressor_destroy(&cctx);
rrdpush_decompressor_destroy(&dctx);
if(errors)
fprintf(stderr, "Compression with %s: FAILED (%d errors)\n", name, errors);
else
fprintf(stderr, "Compression with %s: OK\n", name);
return errors;
}
int unittest_rrdpush_compressions(void) {
int ret = 0;
ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_GZIP, "GZIP");
ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_LZ4, "LZ4");
ret += unittest_rrdpush_compression(COMPRESSION_ALGORITHM_ZSTD, "ZSTD");
return ret;
}

171
streaming/compression.h Normal file
View File

@ -0,0 +1,171 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
#ifndef NETDATA_RRDPUSH_COMPRESSION_H
#define NETDATA_RRDPUSH_COMPRESSION_H 1
// signature MUST end with a newline
#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)
#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)"
#endif
typedef uint32_t rrdpush_signature_t;
#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t)
static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) {
rrdpush_signature_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
return len | RRDPUSH_COMPRESSION_SIGNATURE;
}
typedef enum {
COMPRESSION_ALGORITHM_NONE = 0,
COMPRESSION_ALGORITHM_ZSTD,
COMPRESSION_ALGORITHM_LZ4,
COMPRESSION_ALGORITHM_GZIP,
// terminator
COMPRESSION_ALGORITHM_MAX,
} compression_algorithm_t;
extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX];
// ----------------------------------------------------------------------------
typedef struct simple_ring_buffer {
const char *data;
size_t size;
size_t read_pos;
size_t write_pos;
} SIMPLE_RING_BUFFER;
static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) {
b->read_pos = b->write_pos = 0;
}
static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) {
if(b->write_pos + size > b->size) {
if(!b->size)
b->size = COMPRESSION_MAX_CHUNK;
else
b->size *= 2;
if(b->write_pos + size > b->size)
b->size += size;
b->data = (const char *)reallocz((void *)b->data, b->size);
}
}
static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) {
simple_ring_buffer_make_room(b, size);
memcpy((void *)(b->data + b->write_pos), data, size);
b->write_pos += size;
}
static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) {
freez((void *)b->data);
b->data = NULL;
b->read_pos = b->write_pos = b->size = 0;
}
// ----------------------------------------------------------------------------
struct compressor_state {
bool initialized;
compression_algorithm_t algorithm;
SIMPLE_RING_BUFFER input;
SIMPLE_RING_BUFFER output;
int level;
void *stream;
struct {
size_t total_compressed;
size_t total_uncompressed;
size_t total_compressions;
} sender_locked;
};
void rrdpush_compressor_init(struct compressor_state *state);
void rrdpush_compressor_destroy(struct compressor_state *state);
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out);
// ----------------------------------------------------------------------------
struct decompressor_state {
bool initialized;
compression_algorithm_t algorithm;
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t total_compressions;
SIMPLE_RING_BUFFER output;
void *stream;
};
void rrdpush_decompressor_destroy(struct decompressor_state *state);
void rrdpush_decompressor_init(struct decompressor_state *state);
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) {
if (unlikely(!data || !data_size))
return 0;
if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
return 0;
rrdpush_signature_t sign = *(rrdpush_signature_t *)data;
if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
return 0;
size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
return length;
}
static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
if(unlikely(state->output.read_pos != state->output.write_pos))
fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
return rrdpush_decompress_decode_signature(header, header_size);
}
static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
if(unlikely(state->output.read_pos > state->output.write_pos))
fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
return state->output.write_pos - state->output.read_pos;
}
static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
if (unlikely(!state || !size || !dst))
return 0;
size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
if(unlikely(!remaining))
return 0;
size_t bytes_to_return = size;
if(bytes_to_return > remaining)
bytes_to_return = remaining;
memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return);
state->output.read_pos += bytes_to_return;
if(unlikely(state->output.read_pos > state->output.write_pos))
fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
return bytes_to_return;
}
// ----------------------------------------------------------------------------
#endif // NETDATA_RRDPUSH_COMPRESSION_H 1

View File

@ -0,0 +1,158 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression_gzip.h"
#include <zlib.h>
void rrdpush_compressor_init_gzip(struct compressor_state *state) {
if (!state->initialized) {
state->initialized = true;
// Initialize deflate stream
z_stream *strm = state->stream = (z_stream *) mallocz(sizeof(z_stream));
strm->zalloc = Z_NULL;
strm->zfree = Z_NULL;
strm->opaque = Z_NULL;
if(state->level < Z_BEST_SPEED)
state->level = Z_BEST_SPEED;
if(state->level > Z_BEST_COMPRESSION)
state->level = Z_BEST_COMPRESSION;
// int r = deflateInit2(strm, Z_BEST_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
int r = deflateInit2(strm, state->level, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
if (r != Z_OK) {
netdata_log_error("Failed to initialize deflate with error: %d", r);
freez(state->stream);
state->initialized = false;
return;
}
}
}
void rrdpush_compressor_destroy_gzip(struct compressor_state *state) {
if (state->stream) {
deflateEnd(state->stream);
free(state->stream);
state->stream = NULL;
}
}
size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out) {
if (unlikely(!state || !size || !out))
return 0;
simple_ring_buffer_make_room(&state->output, deflateBound(state->stream, size));
z_stream *strm = state->stream;
strm->avail_in = (uInt)size;
strm->next_in = (Bytef *)data;
strm->avail_out = (uInt)state->output.size;
strm->next_out = (Bytef *)state->output.data;
int ret = deflate(strm, Z_SYNC_FLUSH);
if (ret != Z_OK && ret != Z_STREAM_END) {
netdata_log_error("STREAM: deflate() failed with error %d", ret);
return 0;
}
if(strm->avail_in != 0) {
netdata_log_error("STREAM: deflate() did not use all the input buffer, %u bytes out of %zu remain",
strm->avail_in, size);
return 0;
}
if(strm->avail_out == 0) {
netdata_log_error("STREAM: deflate() needs a bigger output buffer than the one we provided "
"(output buffer %zu bytes, compressed payload %zu bytes)",
state->output.size, size);
return 0;
}
size_t compressed_data_size = state->output.size - strm->avail_out;
if(compressed_data_size == 0) {
netdata_log_error("STREAM: deflate() did not produce any output "
"(output buffer %zu bytes, compressed payload %zu bytes)",
state->output.size, size);
return 0;
}
state->sender_locked.total_compressions++;
state->sender_locked.total_uncompressed += size;
state->sender_locked.total_compressed += compressed_data_size;
*out = state->output.data;
return compressed_data_size;
}
void rrdpush_decompressor_init_gzip(struct decompressor_state *state) {
if (!state->initialized) {
state->initialized = true;
// Initialize inflate stream
z_stream *strm = state->stream = (z_stream *) malloc(sizeof(z_stream));
strm->zalloc = Z_NULL;
strm->zfree = Z_NULL;
strm->opaque = Z_NULL;
inflateInit2(strm, 15 + 16);
simple_ring_buffer_make_room(&state->output, COMPRESSION_MAX_CHUNK);
}
}
void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state) {
if (state->stream) {
inflateEnd(state->stream);
free(state->stream);
state->stream = NULL;
}
}
size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
// The state.output ring buffer is always EMPTY at this point,
// meaning that (state->output.read_pos == state->output.write_pos)
// However, THEY ARE NOT ZERO.
z_stream *strm = state->stream;
strm->avail_in = (uInt)compressed_size;
strm->next_in = (Bytef *)compressed_data;
strm->avail_out = (uInt)state->output.size;
strm->next_out = (Bytef *)state->output.data;
int ret = inflate(strm, Z_SYNC_FLUSH);
if (ret != Z_STREAM_END && ret != Z_OK) {
netdata_log_error("RRDPUSH DECOMPRESS: inflate() failed with error %d", ret);
return 0;
}
if(strm->avail_in != 0) {
netdata_log_error("RRDPUSH DECOMPRESS: inflate() did not use all compressed data we provided "
"(compressed payload %zu bytes, remaining to be uncompressed %u)"
, compressed_size, strm->avail_in);
return 0;
}
if(strm->avail_out == 0) {
netdata_log_error("RRDPUSH DECOMPRESS: inflate() needs a bigger output buffer than the one we provided "
"(compressed payload %zu bytes, output buffer size %zu bytes)"
, compressed_size, state->output.size);
return 0;
}
size_t decompressed_size = state->output.size - strm->avail_out;
state->output.read_pos = 0;
state->output.write_pos = decompressed_size;
state->total_compressed += compressed_size;
state->total_uncompressed += decompressed_size;
state->total_compressions++;
return decompressed_size;
}

View File

@ -0,0 +1,15 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression.h"
#ifndef NETDATA_STREAMING_COMPRESSION_GZIP_H
#define NETDATA_STREAMING_COMPRESSION_GZIP_H
void rrdpush_compressor_init_gzip(struct compressor_state *state);
void rrdpush_compressor_destroy_gzip(struct compressor_state *state);
size_t rrdpush_compress_gzip(struct compressor_state *state, const char *data, size_t size, const char **out);
size_t rrdpush_decompress_gzip(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
void rrdpush_decompressor_init_gzip(struct decompressor_state *state);
void rrdpush_decompressor_destroy_gzip(struct decompressor_state *state);
#endif //NETDATA_STREAMING_COMPRESSION_GZIP_H

143
streaming/compression_lz4.c Normal file
View File

@ -0,0 +1,143 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression_lz4.h"
#ifdef ENABLE_LZ4
#include "lz4.h"
// ----------------------------------------------------------------------------
// compress
void rrdpush_compressor_init_lz4(struct compressor_state *state) {
if(!state->initialized) {
state->initialized = true;
state->stream = LZ4_createStream();
// LZ4 needs access to the last 64KB of source data
// so, we keep twice the size of each message
simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2);
}
}
void rrdpush_compressor_destroy_lz4(struct compressor_state *state) {
if (state->stream) {
LZ4_freeStream(state->stream);
state->stream = NULL;
}
}
/*
* Compress the given block of data
* Compressed data will remain in the internal buffer until the next invocation
* Return the size of compressed data block as result and the pointer to internal buffer using the last argument
* or 0 in case of error
*/
size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) {
if(unlikely(!state || !size || !out))
return 0;
// we need to keep the last 64K of our previous source data
// as they were in the ring buffer
simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size));
if(state->input.write_pos + size > state->input.size)
// the input buffer cannot fit out data, restart from zero
simple_ring_buffer_reset(&state->input);
simple_ring_buffer_append_data(&state->input, data, size);
long int compressed_data_size = LZ4_compress_fast_continue(
state->stream,
state->input.data + state->input.read_pos,
(char *)state->output.data,
(int)(state->input.write_pos - state->input.read_pos),
(int)state->output.size,
state->level);
if (compressed_data_size <= 0) {
netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld "
"(source is %zu bytes, output buffer can fit %zu bytes)",
compressed_data_size, size, state->output.size);
return 0;
}
state->input.read_pos = state->input.write_pos;
state->sender_locked.total_compressions++;
state->sender_locked.total_uncompressed += size;
state->sender_locked.total_compressed += compressed_data_size;
*out = state->output.data;
return compressed_data_size;
}
// ----------------------------------------------------------------------------
// decompress
void rrdpush_decompressor_init_lz4(struct decompressor_state *state) {
if(!state->initialized) {
state->initialized = true;
state->stream = LZ4_createStreamDecode();
simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2);
}
}
void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) {
if (state->stream) {
LZ4_freeStreamDecode(state->stream);
state->stream = NULL;
}
}
/*
* Decompress the compressed data in the internal buffer
* Return the size of uncompressed data or 0 for error
*/
size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
// The state.output ring buffer is always EMPTY at this point,
// meaning that (state->output.read_pos == state->output.write_pos)
// However, THEY ARE NOT ZERO.
if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size))
// the input buffer cannot fit out data, restart from zero
simple_ring_buffer_reset(&state->output);
long int decompressed_size = LZ4_decompress_safe_continue(
state->stream
, compressed_data
, (char *)(state->output.data + state->output.write_pos)
, (int)compressed_size
, (int)(state->output.size - state->output.write_pos)
);
if (unlikely(decompressed_size < 0)) {
netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld "
"(compressed chunk is %zu bytes)"
, decompressed_size, compressed_size);
return 0;
}
if(unlikely(decompressed_size + state->output.write_pos > state->output.size))
fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer "
"(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)"
, state->output.size
, state->output.write_pos
, decompressed_size
, (size_t)(state->output.write_pos + decompressed_size - state->output.size)
);
state->output.write_pos += decompressed_size;
// statistics
state->total_compressed += compressed_size;
state->total_uncompressed += decompressed_size;
state->total_compressions++;
return decompressed_size;
}
#endif // ENABLE_LZ4

View File

@ -0,0 +1,19 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression.h"
#ifndef NETDATA_STREAMING_COMPRESSION_LZ4_H
#define NETDATA_STREAMING_COMPRESSION_LZ4_H
#ifdef ENABLE_LZ4
void rrdpush_compressor_init_lz4(struct compressor_state *state);
void rrdpush_compressor_destroy_lz4(struct compressor_state *state);
size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out);
size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
void rrdpush_decompressor_init_lz4(struct decompressor_state *state);
void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state);
#endif // ENABLE_LZ4
#endif //NETDATA_STREAMING_COMPRESSION_LZ4_H

View File

@ -0,0 +1,163 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression_zstd.h"
#ifdef ENABLE_ZSTD
#include <zstd.h>
void rrdpush_compressor_init_zstd(struct compressor_state *state) {
if(!state->initialized) {
state->initialized = true;
state->stream = ZSTD_createCStream();
if(state->level < 1)
state->level = 1;
if(state->level > ZSTD_maxCLevel())
state->level = ZSTD_maxCLevel();
size_t ret = ZSTD_initCStream(state->stream, state->level);
if(ZSTD_isError(ret))
netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
// ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
// ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
}
}
void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
if(state->stream) {
ZSTD_freeCStream(state->stream);
state->stream = NULL;
}
}
size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
if(unlikely(!state || !size || !out))
return 0;
ZSTD_inBuffer inBuffer = {
.pos = 0,
.size = size,
.src = data,
};
size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
simple_ring_buffer_make_room(&state->output, wanted_size);
ZSTD_outBuffer outBuffer = {
.pos = 0,
.size = state->output.size,
.dst = (void *)state->output.data,
};
// compress
size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
// error handling
if(ZSTD_isError(ret)) {
netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
return 0;
}
if(inBuffer.pos < inBuffer.size) {
netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
inBuffer.size, inBuffer.pos);
return 0;
}
if(outBuffer.pos == 0) {
// ZSTD needs more input to flush the output, so let's flush it manually
ret = ZSTD_flushStream(state->stream, &outBuffer);
if(ZSTD_isError(ret)) {
netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
return 0;
}
if(outBuffer.pos == 0) {
netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
"(source is %zu bytes, output buffer can fit %zu bytes) "
, size, outBuffer.size);
return 0;
}
}
state->sender_locked.total_compressions++;
state->sender_locked.total_uncompressed += size;
state->sender_locked.total_compressed += outBuffer.pos;
// return values
*out = state->output.data;
return outBuffer.pos;
}
void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
if(!state->initialized) {
state->initialized = true;
state->stream = ZSTD_createDStream();
size_t ret = ZSTD_initDStream(state->stream);
if(ZSTD_isError(ret))
netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
}
}
void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
if (state->stream) {
ZSTD_freeDStream(state->stream);
state->stream = NULL;
}
}
size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
if (unlikely(!state || !compressed_data || !compressed_size))
return 0;
// The state.output ring buffer is always EMPTY at this point,
// meaning that (state->output.read_pos == state->output.write_pos)
// However, THEY ARE NOT ZERO.
ZSTD_inBuffer inBuffer = {
.pos = 0,
.size = compressed_size,
.src = compressed_data,
};
ZSTD_outBuffer outBuffer = {
.pos = 0,
.dst = (char *)state->output.data,
.size = state->output.size,
};
size_t ret = ZSTD_decompressStream(
state->stream
, &outBuffer
, &inBuffer);
if(ZSTD_isError(ret)) {
netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
return 0;
}
if(inBuffer.pos < inBuffer.size)
fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
"but %zu bytes of compressed data remain",
inBuffer.pos, inBuffer.size);
size_t decompressed_size = outBuffer.pos;
state->output.read_pos = 0;
state->output.write_pos = outBuffer.pos;
// statistics
state->total_compressed += compressed_size;
state->total_uncompressed += decompressed_size;
state->total_compressions++;
return decompressed_size;
}
#endif // ENABLE_ZSTD

View File

@ -0,0 +1,19 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "compression.h"
#ifndef NETDATA_STREAMING_COMPRESSION_ZSTD_H
#define NETDATA_STREAMING_COMPRESSION_ZSTD_H
#ifdef ENABLE_ZSTD
void rrdpush_compressor_init_zstd(struct compressor_state *state);
void rrdpush_compressor_destroy_zstd(struct compressor_state *state);
size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out);
size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
void rrdpush_decompressor_init_zstd(struct decompressor_state *state);
void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state);
#endif // ENABLE_ZSTD
#endif //NETDATA_STREAMING_COMPRESSION_ZSTD_H

View File

@ -28,9 +28,7 @@ void receiver_state_free(struct receiver_state *rpt) {
close(rpt->fd);
}
#ifdef ENABLE_RRDPUSH_COMPRESSION
rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
if(rpt->system_info)
rrdhost_system_info_free(rpt->system_info);
@ -92,15 +90,44 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
return (int)bytes_read;
}
static inline bool receiver_read_uncompressed(struct receiver_state *r) {
static inline STREAM_HANDSHAKE read_stream_error_to_reason(int code) {
if(code > 0)
return 0;
switch(code) {
case 0:
// asked to read zero bytes
return STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER;
case -1:
// EOF
return STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF;
case -2:
// failed to read
return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED;
case -3:
// timeout
return STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT;
default:
// anything else
return STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
}
}
static inline bool receiver_read_uncompressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
#ifdef NETDATA_INTERNAL_CHECKS
if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
if(unlikely(bytes_read <= 0))
if(unlikely(bytes_read <= 0)) {
*reason = read_stream_error_to_reason(bytes_read);
return false;
}
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
@ -111,8 +138,7 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) {
return true;
}
#ifdef ENABLE_RRDPUSH_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HANDSHAKE *reason) {
internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
"%s: read_buffer does not start with zero #2", __FUNCTION__ );
@ -150,8 +176,10 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int bytes_read = 0;
do {
int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
if (unlikely(ret <= 0))
if (unlikely(ret <= 0)) {
*reason = read_stream_error_to_reason(ret);
return false;
}
bytes_read += ret;
} while(unlikely(bytes_read < (int)r->decompressor.signature_size));
@ -187,7 +215,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
int last_read_bytes = read_stream(r, &compressed[start], remaining);
if (unlikely(last_read_bytes <= 0)) {
internal_error(true, "read_stream() failed #2, with code %d", last_read_bytes);
*reason = read_stream_error_to_reason(last_read_bytes);
return false;
}
@ -217,11 +245,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
return true;
}
#else // !ENABLE_RRDPUSH_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
return receiver_read_uncompressed(r);
}
#endif // ENABLE_RRDPUSH_COMPRESSION
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
@ -323,16 +346,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
bool compressed_connection = false;
#ifdef ENABLE_RRDPUSH_COMPRESSION
if(stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
compressed_connection = true;
rrdpush_decompressor_reset(&rpt->decompressor);
}
else
rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
bool compressed_connection = rrdpush_decompression_initialize(rpt);
buffered_reader_init(&rpt->reader);
@ -340,10 +354,12 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
while(!receiver_should_stop(rpt)) {
if(!buffered_reader_next_line(&rpt->reader, buffer)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) : receiver_read_uncompressed(rpt, &reason);
if(unlikely(!have_new_data)) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, false);
receiver_set_exit_reason(rpt, reason, false);
break;
}
@ -543,6 +559,29 @@ void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, con
}
static void rrdpush_parse_compression_order(struct receiver_state *rpt, const char *order) {
rpt->config.compression_priorities[0] = STREAM_CAP_ZSTD;
rpt->config.compression_priorities[1] = STREAM_CAP_LZ4;
rpt->config.compression_priorities[2] = STREAM_CAP_GZIP;
char *s = strdupz(order);
char *words[COMPRESSION_ALGORITHM_MAX] = { NULL };
size_t num_words = quoted_strings_splitter_pluginsd(s, words, COMPRESSION_ALGORITHM_MAX);
for(size_t i = 0; i < num_words ;i++) {
if(strcasecmp(words[i], "zstd") == 0)
rpt->config.compression_priorities[i] = STREAM_CAP_ZSTD;
else if(strcasecmp(words[i], "lz4") == 0)
rpt->config.compression_priorities[i] = STREAM_CAP_LZ4;
else if(strcasecmp(words[i], "gzip") == 0)
rpt->config.compression_priorities[i] = STREAM_CAP_GZIP;
else
rpt->config.compression_priorities[i] = 0;
}
freez(s);
}
static void rrdpush_receive(struct receiver_state *rpt)
{
rpt->config.mode = default_rrd_memory_mode;
@ -611,11 +650,15 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
#ifdef ENABLE_RRDPUSH_COMPRESSION
rpt->config.rrdpush_compression = default_rrdpush_compression_enabled;
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
#endif // ENABLE_RRDPUSH_COMPRESSION
if(rpt->config.rrdpush_compression) {
char *order = appconfig_get(&stream_config, rpt->key, "compression algorithms order", "zstd lz4 gzip");
order = appconfig_get(&stream_config, rpt->machine_guid, "compression algorithms order", order);
rrdpush_parse_compression_order(rpt, order);
}
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
@ -709,12 +752,27 @@ static void rrdpush_receive(struct receiver_state *rpt)
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", rpt->client_ip, rpt->client_port);
#ifdef ENABLE_RRDPUSH_COMPRESSION
if (stream_has_capability(rpt, STREAM_CAP_COMPRESSION)) {
if (!rpt->config.rrdpush_compression)
rpt->capabilities &= ~STREAM_CAP_COMPRESSION;
if (!rpt->config.rrdpush_compression)
rpt->capabilities &= ~STREAM_CAP_COMPRESSIONS_AVAILABLE;
// select the right compression before sending our capabilities to the child
if(stream_has_more_than_one_capability_of(rpt->capabilities, STREAM_CAP_COMPRESSIONS_AVAILABLE)) {
STREAM_CAPABILITIES compressions = rpt->capabilities & STREAM_CAP_COMPRESSIONS_AVAILABLE;
for(int i = 0; i < COMPRESSION_ALGORITHM_MAX; i++) {
STREAM_CAPABILITIES c = rpt->config.compression_priorities[i];
if(!(c & STREAM_CAP_COMPRESSIONS_AVAILABLE))
continue;
if(compressions & c) {
STREAM_CAPABILITIES exclude = compressions;
exclude &= ~c;
rpt->capabilities &= ~exclude;
break;
}
}
}
#endif // ENABLE_RRDPUSH_COMPRESSION
{
// netdata_log_info("STREAM %s [receive from [%s]:%s]: initializing communication...", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);

View File

@ -39,9 +39,9 @@ struct config stream_config = {
};
unsigned int default_rrdpush_enabled = 0;
#ifdef ENABLE_RRDPUSH_COMPRESSION
STREAM_CAPABILITIES globally_disabled_capabilities = STREAM_CAP_NONE;
unsigned int default_rrdpush_compression_enabled = 1;
#endif
char *default_rrdpush_destination = NULL;
char *default_rrdpush_api_key = NULL;
char *default_rrdpush_send_charts_matching = NULL;
@ -67,43 +67,6 @@ static void load_stream_conf() {
freez(filename);
}
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
// we can have DATA_WITH_ML when INTERPOLATED is available
bool ml_capability = true;
if(host && sender) {
// we have DATA_WITH_ML capability
// we should remove the DATA_WITH_ML capability if our database does not have anomaly info
// this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
netdata_mutex_lock(&host->receiver_lock);
if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
ml_capability = false;
netdata_mutex_unlock(&host->receiver_lock);
}
return STREAM_CAP_V1 |
STREAM_CAP_V2 |
STREAM_CAP_VN |
STREAM_CAP_VCAPS |
STREAM_CAP_HLABELS |
STREAM_CAP_CLAIM |
STREAM_CAP_CLABELS |
STREAM_CAP_FUNCTIONS |
STREAM_CAP_REPLICATION |
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_HAS_COMPRESSION |
#ifdef NETDATA_TEST_DYNCFG
STREAM_CAP_DYNCFG |
#endif
(ieee754_doubles ? STREAM_CAP_IEEE754 : 0) |
(ml_capability ? STREAM_CAP_DATA_WITH_ML : 0) |
0;
}
bool rrdpush_receiver_needs_dbengine() {
struct section *co;
@ -145,10 +108,20 @@ int rrdpush_init() {
rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
#ifdef ENABLE_RRDPUSH_COMPRESSION
default_rrdpush_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
"enable compression", default_rrdpush_compression_enabled);
#endif
rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD] = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "zstd compression level",
rrdpush_compression_levels[COMPRESSION_ALGORITHM_ZSTD]);
rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4] = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "lz4 compression acceleration",
rrdpush_compression_levels[COMPRESSION_ALGORITHM_LZ4]);
rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP] = (int)appconfig_get_number(
&stream_config, CONFIG_SECTION_STREAM, "gzip compression level",
rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
@ -921,9 +894,10 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
struct receiver_state *rpt = callocz(1, sizeof(*rpt));
rpt->last_msg_t = now_monotonic_sec();
rpt->capabilities = STREAM_CAP_INVALID;
rpt->hops = 1;
rpt->capabilities = STREAM_CAP_INVALID;
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);
__atomic_add_fetch(&netdata_buffers_statistics.rrdhost_allocations_size, sizeof(struct rrdhost_system_info), __ATOMIC_RELAXED);
@ -1380,11 +1354,15 @@ static struct {
{ STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, "DISCONNECTED SHUTDOWN REQUESTED" },
{ STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, "DISCONNECTED NETDATA EXIT" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT, "DISCONNECTED PARSE ENDED" },
{ STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR, "DISCONNECTED SOCKET READ ERROR" },
{STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR, "DISCONNECTED UNKNOWN SOCKET READ ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, "DISCONNECTED PARSE ERROR" },
{ STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT, "DISCONNECTED RECEIVER LEFT" },
{ STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST, "DISCONNECTED ORPHAN HOST" },
{ STREAM_HANDSHAKE_NON_STREAMABLE_HOST, "NON STREAMABLE HOST" },
{ STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER, "DISCONNECTED NOT SUFFICIENT READ BUFFER" },
{STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF, "DISCONNECTED SOCKET EOF" },
{STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED, "DISCONNECTED SOCKET READ FAILED" },
{STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT, "DISCONNECTED SOCKET READ TIMEOUT" },
{ 0, NULL },
};
@ -1405,22 +1383,24 @@ static struct {
STREAM_CAPABILITIES cap;
const char *str;
} capability_names[] = {
{ STREAM_CAP_V1, "V1" },
{ STREAM_CAP_V2, "V2" },
{ STREAM_CAP_VN, "VN" },
{ STREAM_CAP_VCAPS, "VCAPS" },
{ STREAM_CAP_HLABELS, "HLABELS" },
{ STREAM_CAP_CLAIM, "CLAIM" },
{ STREAM_CAP_CLABELS, "CLABELS" },
{ STREAM_CAP_COMPRESSION, "COMPRESSION" },
{ STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
{ STREAM_CAP_REPLICATION, "REPLICATION" },
{ STREAM_CAP_BINARY, "BINARY" },
{ STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{ STREAM_CAP_IEEE754, "IEEE754" },
{ STREAM_CAP_DATA_WITH_ML, "ML" },
{ STREAM_CAP_DYNCFG, "DYN_CFG" },
{ 0 , NULL },
{STREAM_CAP_V1, "V1" },
{STREAM_CAP_V2, "V2" },
{STREAM_CAP_VN, "VN" },
{STREAM_CAP_VCAPS, "VCAPS" },
{STREAM_CAP_HLABELS, "HLABELS" },
{STREAM_CAP_CLAIM, "CLAIM" },
{STREAM_CAP_CLABELS, "CLABELS" },
{STREAM_CAP_LZ4, "LZ4" },
{STREAM_CAP_FUNCTIONS, "FUNCTIONS" },
{STREAM_CAP_REPLICATION, "REPLICATION" },
{STREAM_CAP_BINARY, "BINARY" },
{STREAM_CAP_INTERPOLATED, "INTERPOLATED" },
{STREAM_CAP_IEEE754, "IEEE754" },
{STREAM_CAP_DATA_WITH_ML, "ML" },
{STREAM_CAP_DYNCFG, "DYN_CFG" },
{STREAM_CAP_ZSTD, "ZSTD" },
{STREAM_CAP_GZIP, "GZIP" },
{0 , NULL },
};
static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
@ -1466,6 +1446,44 @@ void log_sender_capabilities(struct sender_state *s) {
buffer_free(wb);
}
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender) {
STREAM_CAPABILITIES disabled_capabilities = globally_disabled_capabilities;
if(host && sender) {
// we have DATA_WITH_ML capability
// we should remove the DATA_WITH_ML capability if our database does not have anomaly info
// this can happen under these conditions: 1. we don't run ML, and 2. we don't receive ML
netdata_mutex_lock(&host->receiver_lock);
if(!ml_host_running(host) && !stream_has_capability(host->receiver, STREAM_CAP_DATA_WITH_ML))
disabled_capabilities |= STREAM_CAP_DATA_WITH_ML;
netdata_mutex_unlock(&host->receiver_lock);
if(host->sender)
disabled_capabilities |= host->sender->disabled_capabilities;
}
return (STREAM_CAP_V1 |
STREAM_CAP_V2 |
STREAM_CAP_VN |
STREAM_CAP_VCAPS |
STREAM_CAP_HLABELS |
STREAM_CAP_CLAIM |
STREAM_CAP_CLABELS |
STREAM_CAP_FUNCTIONS |
STREAM_CAP_REPLICATION |
STREAM_CAP_BINARY |
STREAM_CAP_INTERPOLATED |
STREAM_CAP_COMPRESSIONS_AVAILABLE |
#ifdef NETDATA_TEST_DYNCFG
STREAM_CAP_DYNCFG |
#endif
STREAM_CAP_IEEE754 |
STREAM_CAP_DATA_WITH_ML |
0) & ~disabled_capabilities;
}
STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDHOST *host, bool sender) {
STREAM_CAPABILITIES caps = 0;
@ -1473,7 +1491,7 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
else if(version < STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_V2 | STREAM_CAP_HLABELS;
else if(version <= STREAM_OLD_VERSION_CLAIM) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM;
else if(version <= STREAM_OLD_VERSION_CLABELS) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS;
else if(version <= STREAM_OLD_VERSION_COMPRESSION) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_HAS_COMPRESSION;
else if(version <= STREAM_OLD_VERSION_LZ4) caps = STREAM_CAP_VN | STREAM_CAP_HLABELS | STREAM_CAP_CLAIM | STREAM_CAP_CLABELS | STREAM_CAP_LZ4_AVAILABLE;
else caps = version;
if(caps & STREAM_CAP_VCAPS)
@ -1495,8 +1513,61 @@ STREAM_CAPABILITIES convert_stream_version_to_capabilities(int32_t version, RRDH
}
int32_t stream_capabilities_to_vn(uint32_t caps) {
if(caps & STREAM_CAP_COMPRESSION) return STREAM_OLD_VERSION_COMPRESSION;
if(caps & STREAM_CAP_LZ4) return STREAM_OLD_VERSION_LZ4;
if(caps & STREAM_CAP_CLABELS) return STREAM_OLD_VERSION_CLABELS;
return STREAM_OLD_VERSION_CLAIM; // if(caps & STREAM_CAP_CLAIM)
}
int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX] = {
[COMPRESSION_ALGORITHM_NONE] = 0,
[COMPRESSION_ALGORITHM_ZSTD] = 3, // 1 (faster) - 22 (best compression),
[COMPRESSION_ALGORITHM_LZ4] = 1, // 1 (best compression) - 9 (faster)
[COMPRESSION_ALGORITHM_GZIP] = 1, // 1 (faster) - 9 (best compression)
};
bool rrdpush_compression_initialize(struct sender_state *s) {
rrdpush_compressor_destroy(&s->compressor);
// IMPORTANT
// KEEP THE SAME ORDER IN DECOMPRESSION
if(stream_has_capability(s, STREAM_CAP_ZSTD))
s->compressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
else if(stream_has_capability(s, STREAM_CAP_LZ4))
s->compressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
else if(stream_has_capability(s, STREAM_CAP_GZIP))
s->compressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
else
s->compressor.algorithm = COMPRESSION_ALGORITHM_NONE;
if(s->compressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
s->compressor.level = rrdpush_compression_levels[s->compressor.algorithm];
rrdpush_compressor_init(&s->compressor);
return true;
}
return false;
}
bool rrdpush_decompression_initialize(struct receiver_state *rpt) {
rrdpush_decompressor_destroy(&rpt->decompressor);
// IMPORTANT
// KEEP THE SAME ORDER IN COMPRESSION
if(stream_has_capability(rpt, STREAM_CAP_ZSTD))
rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_ZSTD;
else if(stream_has_capability(rpt, STREAM_CAP_LZ4))
rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_LZ4;
else if(stream_has_capability(rpt, STREAM_CAP_GZIP))
rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_GZIP;
else
rpt->decompressor.algorithm = COMPRESSION_ALGORITHM_NONE;
if(rpt->decompressor.algorithm != COMPRESSION_ALGORITHM_NONE) {
rrdpush_decompressor_init(&rpt->decompressor);
return true;
}
return false;
}

View File

@ -18,12 +18,14 @@
#define STREAM_OLD_VERSION_CLAIM 3
#define STREAM_OLD_VERSION_CLABELS 4
#define STREAM_OLD_VERSION_COMPRESSION 5 // this is production
#define STREAM_OLD_VERSION_LZ4 5
// ----------------------------------------------------------------------------
// capabilities negotiation
typedef enum {
STREAM_CAP_NONE = 0,
// do not use the first 3 bits
// they used to be versions 1, 2 and 3
// before we introduce capabilities
@ -38,7 +40,7 @@ typedef enum {
STREAM_CAP_HLABELS = (1 << 7), // host labels supported
STREAM_CAP_CLAIM = (1 << 8), // claiming supported
STREAM_CAP_CLABELS = (1 << 9), // chart labels supported
STREAM_CAP_COMPRESSION = (1 << 10), // lz4 compression supported
STREAM_CAP_LZ4 = (1 << 10), // lz4 compression supported
STREAM_CAP_FUNCTIONS = (1 << 11), // plugin functions supported
STREAM_CAP_REPLICATION = (1 << 12), // replication supported
STREAM_CAP_BINARY = (1 << 13), // streaming supports binary data
@ -46,22 +48,39 @@ typedef enum {
STREAM_CAP_IEEE754 = (1 << 15), // streaming supports binary/hex transfer of double values
STREAM_CAP_DATA_WITH_ML = (1 << 16), // streaming supports transferring anomaly bit
STREAM_CAP_DYNCFG = (1 << 17), // dynamic configuration of plugins trough streaming
STREAM_CAP_ZSTD = (1 << 19), // ZSTD compression supported
STREAM_CAP_GZIP = (1 << 20), // GZIP compression supported
STREAM_CAP_INVALID = (1 << 30), // used as an invalid value for capabilities when this is set
// this must be signed int, so don't use the last bit
// needed for negotiating errors between parent and child
} STREAM_CAPABILITIES;
#ifdef ENABLE_RRDPUSH_COMPRESSION
#define STREAM_HAS_COMPRESSION STREAM_CAP_COMPRESSION
#ifdef ENABLE_LZ4
#define STREAM_CAP_LZ4_AVAILABLE STREAM_CAP_LZ4
#else
#define STREAM_HAS_COMPRESSION 0
#endif // ENABLE_RRDPUSH_COMPRESSION
#define STREAM_CAP_LZ4_AVAILABLE 0
#endif // ENABLE_LZ4
#ifdef ENABLE_ZSTD
#define STREAM_CAP_ZSTD_AVAILABLE STREAM_CAP_ZSTD
#else
#define STREAM_CAP_ZSTD_AVAILABLE 0
#endif // ENABLE_ZSTD
#define STREAM_CAP_COMPRESSIONS_AVAILABLE (STREAM_CAP_LZ4_AVAILABLE|STREAM_CAP_ZSTD_AVAILABLE|STREAM_CAP_GZIP)
extern STREAM_CAPABILITIES globally_disabled_capabilities;
STREAM_CAPABILITIES stream_our_capabilities(RRDHOST *host, bool sender);
#define stream_has_capability(rpt, capability) ((rpt) && ((rpt)->capabilities & (capability)) == (capability))
static inline bool stream_has_more_than_one_capability_of(STREAM_CAPABILITIES caps, STREAM_CAPABILITIES mask) {
STREAM_CAPABILITIES common = (STREAM_CAPABILITIES)(caps & mask);
return (common & (common - 1)) != 0 && common != 0;
}
// ----------------------------------------------------------------------------
// stream handshake
@ -101,11 +120,15 @@ typedef enum {
STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN = -15,
STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT = -16,
STREAM_HANDSHAKE_DISCONNECT_PARSER_EXIT = -17,
STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_ERROR = -18,
STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR = -18,
STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED = -19,
STREAM_HANDSHAKE_DISCONNECT_RECEIVER_LEFT = -20,
STREAM_HANDSHAKE_DISCONNECT_ORPHAN_HOST = -21,
STREAM_HANDSHAKE_NON_STREAMABLE_HOST = -22,
STREAM_HANDSHAKE_DISCONNECT_NOT_SUFFICIENT_READ_BUFFER = -23,
STREAM_HANDSHAKE_DISCONNECT_SOCKET_EOF = -24,
STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_FAILED = -25,
STREAM_HANDSHAKE_DISCONNECT_SOCKET_READ_TIMEOUT = -26,
} STREAM_HANDSHAKE;
@ -120,100 +143,7 @@ typedef struct {
char *kernel_version;
} stream_encoded_t;
#ifdef ENABLE_RRDPUSH_COMPRESSION
// signature MUST end with a newline
#define RRDPUSH_COMPRESSION_SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE 4
struct compressor_state {
bool initialized;
char *compression_result_buffer;
size_t compression_result_buffer_size;
struct {
void *lz4_stream;
char *input_ring_buffer;
size_t input_ring_buffer_size;
size_t input_ring_buffer_pos;
} stream;
size_t (*compress)(struct compressor_state *state, const char *data, size_t size, char **buffer);
void (*destroy)(struct compressor_state **state);
};
void rrdpush_compressor_reset(struct compressor_state *state);
void rrdpush_compressor_destroy(struct compressor_state *state);
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, char **out);
struct decompressor_state {
bool initialized;
size_t signature_size;
size_t total_compressed;
size_t total_uncompressed;
size_t packet_count;
struct {
void *lz4_stream;
char *buffer;
size_t size;
size_t write_at;
size_t read_at;
} stream;
};
void rrdpush_decompressor_destroy(struct decompressor_state *state);
void rrdpush_decompressor_reset(struct decompressor_state *state);
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
static inline size_t rrdpush_decompress_decode_header(const char *data, size_t data_size) {
if (unlikely(!data || !data_size))
return 0;
if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
return 0;
uint32_t sign = *(uint32_t *)data;
if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
return 0;
size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
return length;
}
static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
if(unlikely(state->stream.read_at != state->stream.write_at))
fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
return rrdpush_decompress_decode_header(header, header_size);
}
static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
if(unlikely(state->stream.read_at > state->stream.write_at))
fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
return state->stream.write_at - state->stream.read_at;
}
static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
if (unlikely(!state || !size || !dst))
return 0;
size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
if(unlikely(!remaining))
return 0;
size_t bytes_to_return = size;
if(bytes_to_return > remaining)
bytes_to_return = remaining;
memcpy(dst, state->stream.buffer + state->stream.read_at, bytes_to_return);
state->stream.read_at += bytes_to_return;
if(unlikely(state->stream.read_at > state->stream.write_at))
fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
return bytes_to_return;
}
#endif
#include "compression.h"
// Thread-local storage
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
@ -230,7 +160,6 @@ typedef enum __attribute__((packed)) {
typedef enum __attribute__((packed)) {
SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct function_payload_state {
@ -263,6 +192,7 @@ struct sender_state {
char read_buffer[PLUGINSD_LINE_MAX + 1];
ssize_t read_len;
STREAM_CAPABILITIES capabilities;
STREAM_CAPABILITIES disabled_capabilities;
size_t sent_bytes_on_this_connection_per_type[STREAM_TRAFFIC_TYPE_MAX];
@ -274,9 +204,7 @@ struct sender_state {
uint16_t hops;
#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state compressor;
#endif // ENABLE_RRDPUSH_COMPRESSION
#ifdef ENABLE_HTTPS
NETDATA_SSL ssl; // structure used to encrypt the connection
@ -421,6 +349,7 @@ struct receiver_state {
time_t rrdpush_replication_step;
char *rrdpush_destination; // DONT FREE - it is allocated in appconfig
unsigned int rrdpush_compression;
STREAM_CAPABILITIES compression_priorities[COMPRESSION_ALGORITHM_MAX];
} config;
#ifdef ENABLE_HTTPS
@ -429,9 +358,7 @@ struct receiver_state {
time_t replication_first_time_t;
#ifdef ENABLE_RRDPUSH_COMPRESSION
struct decompressor_state decompressor;
#endif // ENABLE_RRDPUSH_COMPRESSION
/*
struct {
uint32_t count;
@ -453,9 +380,7 @@ struct rrdpush_destinations {
};
extern unsigned int default_rrdpush_enabled;
#ifdef ENABLE_RRDPUSH_COMPRESSION
extern unsigned int default_rrdpush_compression_enabled;
#endif // ENABLE_RRDPUSH_COMPRESSION
extern char *default_rrdpush_destination;
extern char *default_rrdpush_api_key;
extern char *default_rrdpush_send_charts_matching;
@ -514,10 +439,6 @@ int connect_to_one_of_destinations(
void rrdpush_signal_sender_to_wake_up(struct sender_state *s);
#ifdef ENABLE_RRDPUSH_COMPRESSION
struct compressor_state *create_compressor();
#endif // ENABLE_RRDPUSH_COMPRESSION
void rrdpush_reset_destinations_postpone_time(RRDHOST *host);
const char *stream_handshake_error_to_string(STREAM_HANDSHAKE handshake_error);
void stream_capabilities_to_json_array(BUFFER *wb, STREAM_CAPABILITIES caps, const char *key);
@ -784,4 +705,7 @@ void rrdpush_send_dyncfg_reg_module(RRDHOST *host, const char *plugin_name, cons
void rrdpush_send_dyncfg_reg_job(RRDHOST *host, const char *plugin_name, const char *module_name, const char *job_name, enum job_type type, uint32_t flags);
void rrdpush_send_dyncfg_reset(RRDHOST *host, const char *plugin_name);
bool rrdpush_compression_initialize(struct sender_state *s);
bool rrdpush_decompression_initialize(struct receiver_state *rpt);
#endif //NETDATA_RRDPUSH_H

View File

@ -20,9 +20,12 @@
#define WORKER_SENDER_JOB_BUFFER_RATIO 15
#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
#define WORKER_SENDER_JOB_BYTES_SENT 17
#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
#define WORKER_SENDER_JOB_FUNCTION_REQUEST 19
#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 20
#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
#define WORKER_SENDER_JOB_REPLAY_REQUEST 21
#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21
@ -66,7 +69,6 @@ BUFFER *sender_start(struct sender_state *s) {
static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
#ifdef ENABLE_RRDPUSH_COMPRESSION
/*
* In case of stream compression buffer overflow
* Inform the user through the error log file and
@ -74,12 +76,35 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host);
*/
static inline void deactivate_compression(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION);
netdata_log_error("STREAM_COMPRESSION: Compression returned error, disabling it.");
s->flags &= ~SENDER_FLAG_COMPRESSION;
netdata_log_error("STREAM %s [send to %s]: Restarting connection without compression.", rrdhost_hostname(s->host), s->connected_to);
switch(s->compressor.algorithm) {
case COMPRESSION_ALGORITHM_MAX:
case COMPRESSION_ALGORITHM_NONE:
netdata_log_error("STREAM_COMPRESSION: compression error on 'host:%s' without any compression enabled. Ignoring error.",
rrdhost_hostname(s->host));
break;
case COMPRESSION_ALGORITHM_GZIP:
netdata_log_error("STREAM_COMPRESSION: GZIP compression error on 'host:%s'. Disabling GZIP for this node.",
rrdhost_hostname(s->host));
s->disabled_capabilities |= STREAM_CAP_GZIP;
break;
case COMPRESSION_ALGORITHM_LZ4:
netdata_log_error("STREAM_COMPRESSION: LZ4 compression error on 'host:%s'. Disabling ZSTD for this node.",
rrdhost_hostname(s->host));
s->disabled_capabilities |= STREAM_CAP_LZ4;
break;
case COMPRESSION_ALGORITHM_ZSTD:
netdata_log_error("STREAM_COMPRESSION: ZSTD compression error on 'host:%s'. Disabling ZSTD for this node.",
rrdhost_hostname(s->host));
s->disabled_capabilities |= STREAM_CAP_ZSTD;
break;
}
rrdpush_sender_thread_close_socket(s->host);
}
#endif
#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
@ -117,8 +142,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
#ifdef ENABLE_RRDPUSH_COMPRESSION
if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor.initialized) {
if (s->compressor.initialized) {
while(src_len) {
size_t size_to_compress = src_len;
@ -143,13 +167,13 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
}
char *dst;
const char *dst;
size_t dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if (!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed. Resetting compressor and re-trying",
rrdhost_hostname(s->host), s->connected_to);
rrdpush_compressor_reset(&s->compressor);
rrdpush_compression_initialize(s);
dst_len = rrdpush_compress(&s->compressor, src, size_to_compress, &dst);
if(!dst_len) {
netdata_log_error("STREAM %s [send to %s]: COMPRESSION failed again. Deactivating compression",
@ -161,10 +185,25 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
}
}
if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
rrdpush_signature_t signature = rrdpush_compress_encode_signature(dst_len);
#ifdef NETDATA_INTERNAL_CHECKS
// check if reversing the signature provides the same length
size_t decoded_dst_len = rrdpush_decompress_decode_signature((const char *)&signature, sizeof(signature));
if(decoded_dst_len != dst_len)
fatal("RRDPUSH COMPRESSION: invalid signature, original payload %zu bytes, "
"compressed payload length %zu bytes, but signature says payload is %zu bytes",
size_to_compress, dst_len, decoded_dst_len);
#endif
if(cbuffer_add_unsafe(s->buffer, (const char *)&signature, sizeof(signature)))
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += dst_len;
else {
if(cbuffer_add_unsafe(s->buffer, dst, dst_len))
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += dst_len + sizeof(signature);
}
src = src + size_to_compress;
src_len -= size_to_compress;
@ -174,12 +213,6 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += src_len;
#else
if(cbuffer_add_unsafe(s->buffer, src, src_len))
s->flags |= SENDER_FLAG_OVERFLOW;
else
s->sent_bytes_on_this_connection_per_type[type] += src_len;
#endif
replication_recalculate_buffer_used_ratio_unsafe(s);
@ -600,12 +633,6 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
// reset our capabilities to default
s->capabilities = stream_our_capabilities(host, true);
#ifdef ENABLE_RRDPUSH_COMPRESSION
// If we don't want compression, remove it from our capabilities
if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
#endif // ENABLE_RRDPUSH_COMPRESSION
/* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the
version negotiation resulted in a high enough version.
*/
@ -766,12 +793,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
#ifdef ENABLE_RRDPUSH_COMPRESSION
if(stream_has_capability(s, STREAM_CAP_COMPRESSION))
rrdpush_compressor_reset(&s->compressor);
else
rrdpush_compressor_destroy(&s->compressor);
#endif // ENABLE_RRDPUSH_COMPRESSION
rrdpush_compression_initialize(s);
log_sender_capabilities(s);
@ -1303,6 +1325,9 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENT);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, "bytes compressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, "bytes uncompressed", "bytes/s", WORKER_METRIC_INCREMENTAL_TOTAL);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
struct sender_state *s = ptr;
@ -1423,6 +1448,15 @@ void *rrdpush_sender_thread(void *ptr) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
if(s->compressor.initialized) {
size_t bytes_uncompressed = s->compressor.sender_locked.total_uncompressed;
size_t bytes_compressed = s->compressor.sender_locked.total_compressed + s->compressor.sender_locked.total_compressions * sizeof(rrdpush_signature_t);
NETDATA_DOUBLE ratio = 100.0 - ((NETDATA_DOUBLE)bytes_compressed * 100.0 / (NETDATA_DOUBLE)bytes_uncompressed);
worker_set_metric(WORKER_SENDER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_uncompressed);
worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSED, (NETDATA_DOUBLE)bytes_compressed);
worker_set_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, ratio);
}
sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);

View File

@ -168,11 +168,14 @@
# Stream Compression
# By default it is enabled.
# You can control stream compression in this parent agent stream with options: yes | no
#enable compression = yes
enable compression = yes
# select the order the compression algorithms will be used, when multiple are offered by the child
compression algorithms order = zstd lz4 gzip
# Replication
# Enable replication for all hosts using this api key. Default: enabled
#enable replication = yes
enable replication = yes
# How many seconds to replicate from each child. Default: a day
#seconds to replicate = 86400

View File

@ -1272,12 +1272,8 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
buffer_json_member_add_boolean(wb, "web-enabled", web_server_mode != WEB_SERVER_MODE_NONE);
buffer_json_member_add_boolean(wb, "stream-enabled", default_rrdpush_enabled);
#ifdef ENABLE_RRDPUSH_COMPRESSION
buffer_json_member_add_boolean(wb, "stream-compression",
host->sender && stream_has_capability(host->sender, STREAM_CAP_COMPRESSION));
#else // ! ENABLE_RRDPUSH_COMPRESSION
buffer_json_member_add_boolean(wb, "stream-compression", false);
#endif // ENABLE_RRDPUSH_COMPRESSION
host->sender && host->sender->compressor.initialized);
#ifdef ENABLE_HTTPS
buffer_json_member_add_boolean(wb, "https-enabled", true);