Improve ACLK according to results of the smoke-test. (#8358)

* Cleaning up the ACLK part 2 (#8187)

* Initial proxy support implementation (#8146)

* Implement the ACLK Challenge-Response Authentication (#8317)

Co-authored-by: Timo <6674623+underhood@users.noreply.github.com>
This commit is contained in:
Andrew Moss 2020-03-10 12:21:32 +01:00 committed by GitHub
parent 3e272d1cff
commit 4acc880bab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1286 additions and 811 deletions

View File

@ -472,6 +472,8 @@ CLAIM_PLUGIN_FILES = \
$(NULL) $(NULL)
ACLK_PLUGIN_FILES = \ ACLK_PLUGIN_FILES = \
aclk/aclk_common.c \
aclk/aclk_common.h \
aclk/agent_cloud_link.c \ aclk/agent_cloud_link.c \
aclk/agent_cloud_link.h \ aclk/agent_cloud_link.h \
aclk/mqtt.c \ aclk/mqtt.c \

35
aclk/aclk_common.c Normal file
View File

@ -0,0 +1,35 @@
#include "aclk_common.h"
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
} supported_proxy_types[] = {
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
while( supported_proxy_types[i].url_str ) {
if(!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
return supported_proxy_types[i].type;
i++;
}
return PROXY_TYPE_UNKNOWN;
}
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
{
if(!string)
return PROXY_TYPE_UNKNOWN;
while(*string == 0x20)
string++;
if(!*string)
return PROXY_TYPE_UNKNOWN;
return aclk_find_proxy(string);
}

20
aclk/aclk_common.h Normal file
View File

@ -0,0 +1,20 @@
#ifndef ACLK_COMMON_H
#define ACLK_COMMON_H
#include "libnetdata/libnetdata.h"
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
PROXY_TYPE_HTTP,
PROXY_DISABLED,
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
#endif //ACLK_COMMON_H

View File

@ -1,6 +1,8 @@
#include "aclk_lws_wss_client.h" #include "aclk_lws_wss_client.h"
#include "libnetdata/libnetdata.h" #include "libnetdata/libnetdata.h"
#include "../daemon/common.h"
#include "aclk_common.h"
static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
@ -14,6 +16,8 @@ struct lws_wss_packet_buffer {
struct lws_wss_packet_buffer *next; struct lws_wss_packet_buffer *next;
}; };
static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size) static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
{ {
struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer)); struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
@ -69,27 +73,22 @@ static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer
*list = NULL; *list = NULL;
} }
static inline void aclk_lws_wss_clear_io_buffers(struct aclk_lws_wss_engine_instance *inst) static inline void aclk_lws_wss_clear_io_buffers()
{ {
aclk_lws_mutex_lock(&inst->read_buf_mutex); aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
_aclk_lws_wss_read_buffer_clear(inst->read_ringbuffer); _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer);
aclk_lws_mutex_unlock(&inst->read_buf_mutex); aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
aclk_lws_mutex_lock(&inst->write_buf_mutex); aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
_aclk_lws_wss_write_buffer_clear(&inst->write_buffer_head); _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head);
aclk_lws_mutex_unlock(&inst->write_buf_mutex); aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
} }
static const struct lws_protocols protocols[] = { static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback,
{ sizeof(struct aclk_lws_wss_perconnect_data), 0, 0, 0, 0 },
"aclk-wss", { NULL, NULL, 0, 0, 0, 0, 0 } };
aclk_lws_wss_callback,
sizeof(struct aclk_lws_wss_perconnect_data),
0, 0, 0, 0
},
{ NULL, NULL, 0, 0, 0, 0, 0 }
};
static void aclk_lws_wss_log_divert(int level, const char *line) { static void aclk_lws_wss_log_divert(int level, const char *line)
{
switch (level) { switch (level) {
case LLL_ERR: case LLL_ERR:
error("Libwebsockets Error: %s", line); error("Libwebsockets Error: %s", line);
@ -102,89 +101,218 @@ static void aclk_lws_wss_log_divert(int level, const char *line) {
} }
} }
struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port) { static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
{
static int lws_logging_initialized = 0; static int lws_logging_initialized = 0;
struct lws_context_creation_info info; struct lws_context_creation_info info;
struct aclk_lws_wss_engine_instance *inst;
if (unlikely(!lws_logging_initialized)) { if (unlikely(!lws_logging_initialized)) {
lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert); lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
lws_logging_initialized = 1; lws_logging_initialized = 1;
} }
if(!callbacks || !target_hostname) if (!target_hostname)
return NULL; return 1;
inst = callocz(1, sizeof(struct aclk_lws_wss_engine_instance)); engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
inst->host = target_hostname; engine_instance->host = target_hostname;
inst->port = target_port; engine_instance->port = target_port;
memset(&info, 0, sizeof(struct lws_context_creation_info)); memset(&info, 0, sizeof(struct lws_context_creation_info));
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT; info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.port = CONTEXT_PORT_NO_LISTEN; info.port = CONTEXT_PORT_NO_LISTEN;
info.protocols = protocols; info.protocols = protocols;
info.user = inst;
inst->lws_context = lws_create_context(&info); engine_instance->lws_context = lws_create_context(&info);
if(!inst->lws_context) if (!engine_instance->lws_context)
goto failure_cleanup_2; goto failure_cleanup_2;
inst->callbacks = *callbacks; aclk_lws_mutex_init(&engine_instance->write_buf_mutex);
aclk_lws_mutex_init(&engine_instance->read_buf_mutex);
aclk_lws_mutex_init(&inst->write_buf_mutex); engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
aclk_lws_mutex_init(&inst->read_buf_mutex); if (!engine_instance->read_ringbuffer)
inst->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
if(!inst->read_ringbuffer)
goto failure_cleanup; goto failure_cleanup;
return inst; return 0;
failure_cleanup: failure_cleanup:
lws_context_destroy(inst->lws_context); lws_context_destroy(engine_instance->lws_context);
failure_cleanup_2: failure_cleanup_2:
freez(inst); freez(engine_instance);
return NULL; return 1;
} }
void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst) { void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance *engine_instance)
lws_context_destroy(inst->lws_context); {
inst->lws_context = NULL; if (engine_instance == NULL)
inst->lws_wsi = NULL; return;
lws_context_destroy(engine_instance->lws_context);
engine_instance->lws_context = NULL;
engine_instance->lws_wsi = NULL;
aclk_lws_wss_clear_io_buffers(inst); aclk_lws_wss_clear_io_buffers(engine_instance);
#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED #ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
pthread_mutex_destroy(&inst->write_buf_mutex); pthread_mutex_destroy(&engine_instance->write_buf_mutex);
pthread_mutex_destroy(&inst->read_buf_mutex); pthread_mutex_destroy(&engine_instance->read_buf_mutex);
#endif #endif
} }
void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst){ static int _aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks)
struct lws_client_connect_info i; {
char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if(inst->lws_wsi) { if(!proxy)
error("Already Connected. Only one connection supported at a time."); return -1;
return;
proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if(!*proxy)
return -1;
return lws_set_socks(vhost, proxy);
} }
// helper function to censor user&password
// for logging purposes
static void safe_log_proxy_censor(char *proxy) {
size_t length = strlen(proxy);
char *auth = proxy+length-1;
char *cur;
while( (auth >= proxy) && (*auth != '@') )
auth--;
//if not found or @ is first char do nothing
if(auth<=proxy)
return;
cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if(!cur)
cur = proxy;
else
cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
while(cur < auth) {
*cur='X';
cur++;
}
}
static inline void safe_log_proxy_error(char *str, const char *proxy) {
char *log = strdupz(proxy);
safe_log_proxy_censor(log);
error("%s Provided Value:\"%s\"", str, log);
freez(log);
}
static inline int check_socks_enviroment(const char **proxy) {
char *tmp = getenv("socks_proxy");
if(!tmp)
return 1;
if(aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error("Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", tmp);
return 1;
}
static const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type) {
const char *proxy = config_get(CONFIG_SECTION_ACLK, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
*type = PROXY_DISABLED;
if(strcmp(proxy, "none") == 0)
return proxy;
if(strcmp(proxy, ACLK_PROXY_ENV) == 0) {
if(check_socks_enviroment(&proxy) == 0)
*type = PROXY_TYPE_SOCKS5;
return proxy;
}
*type = aclk_verify_proxy(proxy);
if(*type == PROXY_TYPE_UNKNOWN) {
*type = PROXY_DISABLED;
safe_log_proxy_error("Config var \"" ACLK_PROXY_CONFIG_VAR "\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".", proxy);
}
return proxy;
}
// Return code indicates if connection attempt has started async.
int aclk_lws_wss_connect(char *host, int port)
{
struct lws_client_connect_info i;
struct lws_vhost *vhost;
static const char *proxy = NULL;
static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
char *log;
if (!engine_instance) {
return aclk_lws_wss_client_init(host, port);
// PROTOCOL_INIT callback will call again.
}
if(proxy_type == PROXY_NOT_SET)
proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
if (engine_instance->lws_wsi) {
error("Already Connected. Only one connection supported at a time.");
return 0;
}
// from LWS docu:
// If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is
// created; you're expected to create your own vhosts afterwards using
// lws_create_vhost(). Otherwise a vhost named "default" is also created
// using the information in the vhost-related members, for compatibility.
vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default");
if(!vhost)
fatal("Could not find the default LWS vhost.");
memset(&i, 0, sizeof(i)); memset(&i, 0, sizeof(i));
i.context = inst->lws_context; i.context = engine_instance->lws_context;
i.port = inst->port; i.port = engine_instance->port;
i.address = inst->host; i.address = engine_instance->host;
i.path = "/mqtt"; i.path = "/mqtt";
i.host = inst->host; i.host = engine_instance->host;
i.protocol = "mqtt"; i.protocol = "mqtt";
switch (proxy_type) {
case PROXY_DISABLED:
lws_set_socks(vhost, ":");
lws_set_proxy(vhost, ":");
break;
case PROXY_TYPE_SOCKS5:
log = strdupz(proxy);
safe_log_proxy_censor(log);
info("Connecting using SOCKS5 proxy:\"%s\"", log);
freez(log);
if(_aclk_wss_set_socks(vhost, proxy))
error("LWS failed to accept socks proxy.");
break;
default:
error("The proxy could not be set. Unknown proxy type.");
}
#ifdef ACLK_SSL_ALLOW_SELF_SIGNED #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK; i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
info("Disabling SSL certificate checks");
#else #else
i.ssl_connection = LCCSCF_USE_SSL; i.ssl_connection = LCCSCF_USE_SSL;
#endif #endif
lws_client_connect_via_info(&i); lws_client_connect_via_info(&i);
return 0;
} }
static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data, size_t len) { static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len)
{
if (lws_ring_insert(buffer, data, len) != len) { if (lws_ring_insert(buffer, data, len) != len) {
error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding."); error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
return 0; return 0;
@ -194,8 +322,7 @@ static inline int received_data_to_ringbuff(struct lws_ring *buffer, void* data,
static const char *aclk_lws_callback_name(enum lws_callback_reasons reason) static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
{ {
switch(reason) switch (reason) {
{
case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_CLIENT_WRITEABLE:
return "LWS_CALLBACK_CLIENT_WRITEABLE"; return "LWS_CALLBACK_CLIENT_WRITEABLE";
case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_CLIENT_RECEIVE:
@ -216,61 +343,54 @@ static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
return "LWS_CALLBACK_WSI_DESTROY"; return "LWS_CALLBACK_WSI_DESTROY";
case LWS_CALLBACK_CLIENT_ESTABLISHED: case LWS_CALLBACK_CLIENT_ESTABLISHED:
return "LWS_CALLBACK_CLIENT_ESTABLISHED"; return "LWS_CALLBACK_CLIENT_ESTABLISHED";
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
default: default:
// Not using an internal buffer here for thread-safety with unknown calling context. // Not using an internal buffer here for thread-safety with unknown calling context.
error("Unknown LWS callback %u", reason); error("Unknown LWS callback %u", reason);
return "unknown"; return "unknown";
} }
} }
static int static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
{ {
UNUSED(user); UNUSED(user);
struct aclk_lws_wss_engine_instance *inst = lws_context_user(lws_get_context(wsi));
struct lws_wss_packet_buffer *data; struct lws_wss_packet_buffer *data;
int retval = 0; int retval = 0;
if( !inst ) {
error("Callback received without any aclk_lws_wss_engine_instance!");
return -1;
}
// Callback servicing is forced when we are closed from above. // Callback servicing is forced when we are closed from above.
if( inst->upstream_reconnect_request ) { if (engine_instance->upstream_reconnect_request) {
error("Closing lws connectino due to libmosquitto error."); error("Closing lws connectino due to libmosquitto error.");
char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection."; char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
lws_close_reason(wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char*)upstream_connection_error, strlen(upstream_connection_error)); lws_close_reason(
wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error,
strlen(upstream_connection_error));
retval = -1; retval = -1;
inst->upstream_reconnect_request = 0; engine_instance->upstream_reconnect_request = 0;
} }
// Don't log to info - volume is proportional to message flow on ACLK. // Don't log to info - volume is proportional to message flow on ACLK.
switch (reason) { switch (reason) {
case LWS_CALLBACK_CLIENT_WRITEABLE: case LWS_CALLBACK_CLIENT_WRITEABLE:
aclk_lws_mutex_lock(&inst->write_buf_mutex); aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
data = lws_wss_packet_buffer_pop(&inst->write_buffer_head); data = lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
if (likely(data)) { if (likely(data)) {
lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY); lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
lws_wss_packet_buffer_free(data); lws_wss_packet_buffer_free(data);
if(inst->write_buffer_head) if (engine_instance->write_buffer_head)
lws_callback_on_writable(inst->lws_wsi); lws_callback_on_writable(engine_instance->lws_wsi);
} }
aclk_lws_mutex_unlock(&inst->write_buf_mutex); aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
return retval; return retval;
case LWS_CALLBACK_CLIENT_RECEIVE: case LWS_CALLBACK_CLIENT_RECEIVE:
aclk_lws_mutex_lock(&inst->read_buf_mutex); aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
if(!received_data_to_ringbuff(inst->read_ringbuffer, in, len)) if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len))
retval = 1; retval = 1;
aclk_lws_mutex_unlock(&inst->read_buf_mutex); aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
if(likely(inst->callbacks.data_rcvd_callback))
// to future myself -> do not call this while read lock is active as it will eventually // to future myself -> do not call this while read lock is active as it will eventually
// want to acquire same lock later in aclk_lws_wss_client_read() function // want to acquire same lock later in aclk_lws_wss_client_read() function
inst->callbacks.data_rcvd_callback(); aclk_lws_connection_data_received();
else
inst->data_to_read = 1; //to inform logic above there is reason to call mosquitto_loop_read
return retval; return retval;
case LWS_CALLBACK_WSI_CREATE: case LWS_CALLBACK_WSI_CREATE:
@ -279,74 +399,73 @@ aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason,
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
case LWS_CALLBACK_GET_THREAD_ID: // ? case LWS_CALLBACK_GET_THREAD_ID: // ?
case LWS_CALLBACK_EVENT_WAIT_CANCELLED: case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
// Expected and safe to ignore. // Expected and safe to ignore.
debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason));
return retval; return retval;
default: default:
// Pass to next switch, this case removes compiler warnings. // Pass to next switch, this case removes compiler warnings.
break; break;
} }
// Log to info - volume is proportional to connection attempts. // Log to info - volume is proportional to connection attempts.
info("Processing callback %s", aclk_lws_callback_name(reason)); info("Processing callback %s", aclk_lws_callback_name(reason));
switch (reason) { switch (reason) {
case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_PROTOCOL_INIT:
aclk_lws_wss_connect(inst); // Makes the outgoing connection aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection
break; break;
case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
if (inst->lws_wsi != NULL && inst->lws_wsi != wsi) if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi)
error("Multiple connections on same WSI? %p vs %p", inst->lws_wsi, wsi); error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi);
inst->lws_wsi = wsi; engine_instance->lws_wsi = wsi;
break; break;
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
error("Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", inst->host, inst->port, (in ? (char*)in : "not given")); error(
"Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host,
engine_instance->port, (in ? (char *)in : "not given"));
// Fall-through // Fall-through
case LWS_CALLBACK_CLIENT_CLOSED: case LWS_CALLBACK_CLIENT_CLOSED:
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
inst->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback
if (inst->callbacks.connection_closed) aclk_lws_connection_closed();
inst->callbacks.connection_closed();
return -1; // the callback response is ignored, hope the above remains true return -1; // the callback response is ignored, hope the above remains true
case LWS_CALLBACK_WSI_DESTROY: case LWS_CALLBACK_WSI_DESTROY:
aclk_lws_wss_clear_io_buffers(inst); aclk_lws_wss_clear_io_buffers(engine_instance);
inst->lws_wsi = NULL; engine_instance->lws_wsi = NULL;
inst->websocket_connection_up = 0; engine_instance->websocket_connection_up = 0;
if (inst->callbacks.connection_closed) aclk_lws_connection_closed();
inst->callbacks.connection_closed();
break; break;
case LWS_CALLBACK_CLIENT_ESTABLISHED: case LWS_CALLBACK_CLIENT_ESTABLISHED:
inst->websocket_connection_up = 1; engine_instance->websocket_connection_up = 1;
if(inst->callbacks.connection_established_callback) aclk_lws_connection_established(engine_instance->host, engine_instance->port);
inst->callbacks.connection_established_callback();
break; break;
default: default:
error("Unexecpted callback from libwebsockets %s",aclk_lws_callback_name(reason)); error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason));
break; break;
} }
return retval; //0-OK, other connection should be closed! return retval; //0-OK, other connection should be closed!
} }
int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) int aclk_lws_wss_client_write(void *buf, size_t count)
{ {
if(inst && inst->lws_wsi && inst->websocket_connection_up) if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
{ aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
aclk_lws_mutex_lock(&inst->write_buf_mutex); lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
lws_wss_packet_buffer_append(&inst->write_buffer_head, lws_wss_packet_buffer_new(buf, count)); aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
aclk_lws_mutex_unlock(&inst->write_buf_mutex);
lws_callback_on_writable(inst->lws_wsi); lws_callback_on_writable(engine_instance->lws_wsi);
return count; return count;
} }
return 0; return 0;
} }
int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count) int aclk_lws_wss_client_read(void *buf, size_t count)
{ {
size_t data_to_be_read = count; size_t data_to_be_read = count;
aclk_lws_mutex_lock(&inst->read_buf_mutex); aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
size_t readable_byte_count = lws_ring_get_count_waiting_elements(inst->read_ringbuffer, NULL); size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
if (unlikely(readable_byte_count == 0)) { if (unlikely(readable_byte_count == 0)) {
errno = EAGAIN; errno = EAGAIN;
data_to_be_read = -1; data_to_be_read = -1;
@ -356,27 +475,31 @@ int aclk_lws_wss_client_read(struct aclk_lws_wss_engine_instance *inst, void *bu
if (readable_byte_count < data_to_be_read) if (readable_byte_count < data_to_be_read)
data_to_be_read = readable_byte_count; data_to_be_read = readable_byte_count;
data_to_be_read = lws_ring_consume(inst->read_ringbuffer, NULL, buf, data_to_be_read); data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read);
if (data_to_be_read == readable_byte_count) if (data_to_be_read == readable_byte_count)
inst->data_to_read = 0; engine_instance->data_to_read = 0;
abort: abort:
aclk_lws_mutex_unlock(&inst->read_buf_mutex); aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
return data_to_be_read; return data_to_be_read;
} }
int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst) void aclk_lws_wss_service_loop()
{ {
return lws_service(inst->lws_context, 0); if (engine_instance)
lws_service(engine_instance->lws_context, 0);
} }
// in case the MQTT connection disconnect while lws transport is still operational // in case the MQTT connection disconnect while lws transport is still operational
// we should drop connection and reconnect // we should drop connection and reconnect
// this function should be called when that happens to notify lws of that situation // this function should be called when that happens to notify lws of that situation
void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst) void aclk_lws_wss_mqtt_layer_disconect_notif()
{ {
if(inst->lws_wsi && inst->websocket_connection_up) { if (!engine_instance)
inst->upstream_reconnect_request = 1; return;
lws_callback_on_writable(inst->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written. if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
engine_instance->upstream_reconnect_request = 1;
lws_callback_on_writable(
engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
} }
} }

View File

@ -5,8 +5,6 @@
#include "libnetdata/libnetdata.h" #include "libnetdata/libnetdata.h"
#define ACLK_LWS_WSS_RECONNECT_TIMEOUT 5
// This is as define because ideally the ACLK at high level // This is as define because ideally the ACLK at high level
// can do mosqitto writes and reads only from one thread // can do mosqitto writes and reads only from one thread
// which is cleaner implementation IMHO // which is cleaner implementation IMHO
@ -14,7 +12,7 @@
// is simpler // is simpler
#define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1 #define ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED 1
#define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES 128*1024 #define ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES (128 * 1024)
#ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED #ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
#define aclk_lws_mutex_init(x) netdata_mutex_init(x) #define aclk_lws_mutex_init(x) netdata_mutex_init(x)
@ -37,7 +35,7 @@ struct lws_wss_packet_buffer;
struct aclk_lws_wss_engine_instance { struct aclk_lws_wss_engine_instance {
//target host/port for connection //target host/port for connection
const char *host; char *host;
int port; int port;
//internal data //internal data
@ -52,8 +50,6 @@ struct aclk_lws_wss_engine_instance {
struct lws_wss_packet_buffer *write_buffer_head; struct lws_wss_packet_buffer *write_buffer_head;
struct lws_ring *read_ringbuffer; struct lws_ring *read_ringbuffer;
struct aclk_lws_wss_engine_callbacks callbacks;
//flags to be readed by engine user //flags to be readed by engine user
int websocket_connection_up; int websocket_connection_up;
@ -63,15 +59,20 @@ struct aclk_lws_wss_engine_instance {
int upstream_reconnect_request; int upstream_reconnect_request;
}; };
struct aclk_lws_wss_engine_instance* aclk_lws_wss_client_init (const struct aclk_lws_wss_engine_callbacks *callbacks, const char *target_hostname, int target_port); void aclk_lws_wss_client_destroy();
void aclk_lws_wss_client_destroy(struct aclk_lws_wss_engine_instance* inst);
void aclk_lws_wss_connect(struct aclk_lws_wss_engine_instance *inst); int aclk_lws_wss_connect(char *host, int port);
int aclk_lws_wss_client_write(struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); int aclk_lws_wss_client_write(void *buf, size_t count);
int aclk_lws_wss_client_read (struct aclk_lws_wss_engine_instance *inst, void *buf, size_t count); int aclk_lws_wss_client_read(void *buf, size_t count);
int aclk_lws_wss_service_loop(struct aclk_lws_wss_engine_instance *inst); void aclk_lws_wss_service_loop();
void aclk_lws_wss_mqtt_layer_disconect_notif();
// Notifications inside the layer above
void aclk_lws_connection_established();
void aclk_lws_connection_data_received();
void aclk_lws_connection_closed();
void aclk_lws_wss_mqtt_layer_disconect_notif(struct aclk_lws_wss_engine_instance *inst);
#endif #endif

File diff suppressed because it is too large Load Diff

View File

@ -8,11 +8,6 @@
#define ACLK_VERSION 1 #define ACLK_VERSION 1
#define ACLK_THREAD_NAME "ACLK_Query" #define ACLK_THREAD_NAME "ACLK_Query"
#define ACLK_JSON_IN_MSGID "msg-id"
#define ACLK_JSON_IN_TYPE "type"
#define ACLK_JSON_IN_VERSION "version"
#define ACLK_JSON_IN_TOPIC "callback-topic"
#define ACLK_JSON_IN_URL "payload"
#define ACLK_CHART_TOPIC "chart" #define ACLK_CHART_TOPIC "chart"
#define ACLK_ALARMS_TOPIC "alarms" #define ACLK_ALARMS_TOPIC "alarms"
#define ACLK_METADATA_TOPIC "meta" #define ACLK_METADATA_TOPIC "meta"
@ -34,8 +29,6 @@
#define ACLK_DEFAULT_PORT 9002 #define ACLK_DEFAULT_PORT 9002
#define ACLK_DEFAULT_HOST "localhost" #define ACLK_DEFAULT_HOST "localhost"
#define CONFIG_SECTION_ACLK "agent_cloud_link"
struct aclk_request { struct aclk_request {
char *type_id; char *type_id;
char *msg_id; char *msg_id;
@ -44,7 +37,6 @@ struct aclk_request {
int version; int version;
}; };
typedef enum aclk_cmd { typedef enum aclk_cmd {
ACLK_CMD_CLOUD, ACLK_CMD_CLOUD,
ACLK_CMD_ONCONNECT, ACLK_CMD_ONCONNECT,
@ -56,24 +48,29 @@ typedef enum aclk_cmd {
ACLK_CMD_MAX ACLK_CMD_MAX
} ACLK_CMD; } ACLK_CMD;
typedef enum aclk_init_action { typedef enum aclk_metadata_state {
ACLK_INIT, ACLK_METADATA_REQUIRED,
ACLK_REINIT ACLK_METADATA_CMD_QUEUED,
} ACLK_INIT_ACTION; ACLK_METADATA_SENT
} ACLK_METADATA_STATE;
typedef enum agent_state {
AGENT_INITIALIZING,
AGENT_STABLE
} AGENT_STATE;
typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
void *aclk_main(void *ptr); void *aclk_main(void *ptr);
#define NETDATA_ACLK_HOOK \ #define NETDATA_ACLK_HOOK \
{ \ { .name = "ACLK_Main", \
.name = "ACLK_Main", \
.config_section = NULL, \ .config_section = NULL, \
.config_name = NULL, \ .config_name = NULL, \
.enabled = 1, \ .enabled = 1, \
.thread = NULL, \ .thread = NULL, \
.init_routine = NULL, \ .init_routine = NULL, \
.start_routine = aclk_main \ .start_routine = aclk_main },
},
extern int aclk_send_message(char *sub_topic, char *message, char *msg_id); extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
@ -83,23 +80,21 @@ extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
extern char *is_agent_claimed(void); extern char *is_agent_claimed(void);
char *create_uuid(); char *create_uuid();
// callbacks for agent cloud link // callbacks for agent cloud link
int aclk_subscribe(char *topic, int qos); int aclk_subscribe(char *topic, int qos);
void aclk_shutdown(); void aclk_shutdown();
int cloud_to_agent_parse(JSON_ENTRY *e); int cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect(void *conn); void aclk_disconnect();
void aclk_connect(void *conn); void aclk_connect();
int aclk_send_metadata(); int aclk_send_metadata();
int aclk_send_info_metadata(); int aclk_send_info_metadata();
int aclk_wait_for_initialization(); int aclk_wait_for_initialization();
char *create_publish_base_topic(); char *create_publish_base_topic();
void aclk_try_to_connect();
int aclk_send_single_chart(char *host, char *chart); int aclk_send_single_chart(char *host, char *chart);
int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd); int aclk_queue_query(char *token, char *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
struct aclk_query *aclk_query_find(char *token, char *data, char *msg_id, struct aclk_query *
char *query, ACLK_CMD cmd, struct aclk_query **last_query); aclk_query_find(char *token, char *data, char *msg_id, char *query, ACLK_CMD cmd, struct aclk_query **last_query);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd); int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae); int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_create_header(BUFFER *dest, char *type, char *msg_id); void aclk_create_header(BUFFER *dest, char *type, char *msg_id);
@ -116,5 +111,4 @@ extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST
void aclk_single_update_enable(); void aclk_single_update_enable();
void aclk_single_update_disable(); void aclk_single_update_disable();
#endif //NETDATA_AGENT_CLOUD_LINK_H #endif //NETDATA_AGENT_CLOUD_LINK_H

View File

@ -5,62 +5,14 @@
#include "mqtt.h" #include "mqtt.h"
#include "aclk_lws_wss_client.h" #include "aclk_lws_wss_client.h"
void (*_on_connect)(void *ptr) = NULL;
void (*_on_disconnect)(void *ptr) = NULL;
#ifndef ENABLE_ACLK
inline const char *_link_strerror(int rc)
{
UNUSED(rc);
return "no error";
}
int _link_event_loop(int timeout)
{
UNUSED(timeout);
return 0;
}
int _link_send_message(char *topic, char *message, int *mid)
{
UNUSED(topic);
UNUSED(message);
UNUSED(mid);
return 0;
}
int _link_subscribe(char *topic, int qos)
{
UNUSED(topic);
UNUSED(qos);
return 0;
}
void _link_shutdown()
{
return;
}
int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *))
{
UNUSED(aclk_hostname);
UNUSED(aclk_port);
UNUSED(on_connect);
UNUSED(on_disconnect);
return 0;
}
#else
struct mosquitto *mosq = NULL;
// Get a string description of the error
inline const char *_link_strerror(int rc) inline const char *_link_strerror(int rc)
{ {
return mosquitto_strerror(rc); return mosquitto_strerror(rc);
} }
static struct mosquitto *mosq = NULL;
void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{ {
UNUSED(mosq); UNUSED(mosq);
@ -69,12 +21,6 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu
aclk_handle_cloud_request(msg->payload); aclk_handle_cloud_request(msg->payload);
} }
// This is not define because in future we might want to try plain
// MQTT as fallback ?
// e.g. try 1st MQTT-WSS, 2nd MQTT plain, 3rd https fallback...
int mqtt_over_websockets = 1;
struct aclk_lws_wss_engine_instance *lws_engine_instance = NULL;
void publish_callback(struct mosquitto *mosq, void *obj, int rc) void publish_callback(struct mosquitto *mosq, void *obj, int rc)
{ {
UNUSED(mosq); UNUSED(mosq);
@ -87,29 +33,26 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
void connect_callback(struct mosquitto *mosq, void *obj, int rc) void connect_callback(struct mosquitto *mosq, void *obj, int rc)
{ {
UNUSED(mosq);
UNUSED(obj); UNUSED(obj);
UNUSED(rc); UNUSED(rc);
info("Connection to cloud estabilished"); info("Connection to cloud estabilished");
aclk_connect();
aclk_mqtt_connected = 1;
_on_connect((void *)mosq);
return; return;
} }
void disconnect_callback(struct mosquitto *mosq, void *obj, int rc) void disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{ {
UNUSED(mosq);
UNUSED(obj); UNUSED(obj);
UNUSED(rc); UNUSED(rc);
info("Connection to cloud failed"); info("Connection to cloud failed");
aclk_disconnect();
aclk_mqtt_connected = 0; aclk_lws_wss_mqtt_layer_disconect_notif();
_on_disconnect((void *)mosq);
if (mqtt_over_websockets && lws_engine_instance)
aclk_lws_wss_mqtt_layer_disconect_notif(lws_engine_instance);
return; return;
} }
@ -126,24 +69,25 @@ void _show_mqtt_info()
size_t _mqtt_external_write_hook(void *buf, size_t count) size_t _mqtt_external_write_hook(void *buf, size_t count)
{ {
return aclk_lws_wss_client_write(lws_engine_instance, buf, count); return aclk_lws_wss_client_write(buf, count);
} }
size_t _mqtt_external_read_hook(void *buf, size_t count) size_t _mqtt_external_read_hook(void *buf, size_t count)
{ {
return aclk_lws_wss_client_read(lws_engine_instance, buf, count); return aclk_lws_wss_client_read(buf, count);
} }
int _mqtt_lib_init(void (*on_connect)(void *), void (*on_disconnect)(void *)) int _mqtt_lib_init()
{ {
int rc; int rc;
int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version; //int libmosq_major, libmosq_minor, libmosq_revision, libmosq_version;
/* Commenting out now as it is unused - do not delete, this is needed for the on-prem version.
char *ca_crt; char *ca_crt;
char *server_crt; char *server_crt;
char *server_key; char *server_key;
// show library info so can have it in the logfile // show library info so can have it in the logfile
libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision); //libmosq_version = mosquitto_lib_version(&libmosq_major, &libmosq_minor, &libmosq_revision);
ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*"); ca_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link cert", "*");
server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*"); server_crt = config_get(CONFIG_SECTION_ACLK, "agent cloud link server cert", "*");
server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*"); server_key = config_get(CONFIG_SECTION_ACLK, "agent cloud link server key", "*");
@ -162,6 +106,7 @@ int _mqtt_lib_init(void (*on_connect)(void *), void (*on_disconnect)(void *))
freez(server_key); freez(server_key);
server_key = NULL; server_key = NULL;
} }
*/
// info( // info(
// "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor, // "Detected libmosquitto library version %d, %d.%d.%d", libmosq_version, libmosq_major, libmosq_minor,
@ -172,24 +117,28 @@ int _mqtt_lib_init(void (*on_connect)(void *), void (*on_disconnect)(void *))
error("Failed to initialize MQTT (libmosquitto library)"); error("Failed to initialize MQTT (libmosquitto library)");
return 1; return 1;
} }
return 0;
}
mosq = mosquitto_new("anon", true, NULL); static int _mqtt_create_connection(char *username, char *password)
{
if (mosq != NULL)
mosquitto_destroy(mosq);
mosq = mosquitto_new(username, true, NULL);
if (unlikely(!mosq)) { if (unlikely(!mosq)) {
mosquitto_lib_cleanup(); mosquitto_lib_cleanup();
error("MQTT new structure -- %s", mosquitto_strerror(errno)); error("MQTT new structure -- %s", mosquitto_strerror(errno));
return 1; return MOSQ_ERR_UNKNOWN;
} }
_on_connect = on_connect;
_on_disconnect = on_disconnect;
mosquitto_connect_callback_set(mosq, connect_callback); mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_disconnect_callback_set(mosq, disconnect_callback); mosquitto_disconnect_callback_set(mosq, disconnect_callback);
mosquitto_publish_callback_set(mosq, publish_callback); mosquitto_publish_callback_set(mosq, publish_callback);
mosquitto_username_pw_set(mosq, NULL, NULL); info("Using challenge-response: %s / %s", username, password);
mosquitto_username_pw_set(mosq, username, password);
rc = mosquitto_threaded_set(mosq, 1); int rc = mosquitto_threaded_set(mosq, 1);
if (unlikely(rc != MOSQ_ERR_SUCCESS)) if (unlikely(rc != MOSQ_ERR_SUCCESS))
error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc)); error("Failed to tune the thread model for libmoquitto (%s)", mosquitto_strerror(rc));
@ -202,19 +151,10 @@ int _mqtt_lib_init(void (*on_connect)(void *), void (*on_disconnect)(void *))
info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc)); info("MQTT in flight messages set to 1 -- %s", mosquitto_strerror(rc));
#endif #endif
if (!mqtt_over_websockets) {
rc = mosquitto_reconnect_delay_set(mosq, ACLK_RECONNECT_DELAY, ACLK_MAX_BACKOFF_DELAY, 1);
if (unlikely(rc != MOSQ_ERR_SUCCESS))
error("Failed to set the reconnect delay (%d) (%s)", rc, mosquitto_strerror(rc));
mosquitto_tls_set(mosq, ca_crt, NULL, server_crt, server_key, NULL);
}
return rc; return rc;
} }
int _link_mqtt_connect(char *aclk_hostname, int aclk_port) static int _link_mqtt_connect(char *aclk_hostname, int aclk_port)
{ {
int rc; int rc;
@ -234,9 +174,6 @@ static inline void _link_mosquitto_write()
{ {
int rc; int rc;
if (!mqtt_over_websockets)
return;
rc = mosquitto_loop_misc(mosq); rc = mosquitto_loop_misc(mosq);
if (unlikely(rc != MOSQ_ERR_SUCCESS)) if (unlikely(rc != MOSQ_ERR_SUCCESS))
debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc)); debug(D_ACLK, "ACLK: failure during mosquitto_loop_misc %s", mosquitto_strerror(rc));
@ -248,15 +185,13 @@ static inline void _link_mosquitto_write()
} }
} }
void aclk_lws_connect_notif_callback() void aclk_lws_connection_established(char *hostname, int port)
{ {
//the connection is done by LWS so this parameters dont matter _link_mqtt_connect(hostname, port); // Parameters only used for logging, lower layer connected.
//ig MQTT over LWS is used
_link_mqtt_connect(aclk_hostname, aclk_port);
_link_mosquitto_write(); _link_mosquitto_write();
} }
void aclk_lws_data_received_callback() void aclk_lws_connection_data_received()
{ {
int rc = mosquitto_loop_read(mosq, 1); int rc = mosquitto_loop_read(mosq, 1);
if (rc != MOSQ_ERR_SUCCESS) if (rc != MOSQ_ERR_SUCCESS)
@ -268,85 +203,31 @@ void aclk_lws_connection_closed()
aclk_disconnect(NULL); aclk_disconnect(NULL);
} }
static const struct aclk_lws_wss_engine_callbacks aclk_lws_engine_callbacks = {
.connection_established_callback = aclk_lws_connect_notif_callback,
.data_rcvd_callback = aclk_lws_data_received_callback,
.data_writable_callback = NULL,
.connection_closed = aclk_lws_connection_closed
};
int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)) int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password)
{ {
int rc; if(aclk_lws_wss_connect(aclk_hostname, aclk_port))
return MOSQ_ERR_UNKNOWN;
aclk_lws_wss_service_loop();
if (mqtt_over_websockets) { int rc = _mqtt_create_connection(username, password);
// we will connect when WebSocket connection is up
// based on callback
if (!lws_engine_instance)
lws_engine_instance = aclk_lws_wss_client_init(&aclk_lws_engine_callbacks, aclk_hostname, aclk_port);
else
aclk_lws_wss_connect(lws_engine_instance);
aclk_lws_wss_service_loop(lws_engine_instance);
}
rc = _mqtt_lib_init(on_connect, on_disconnect);
if (rc!= MOSQ_ERR_SUCCESS) if (rc!= MOSQ_ERR_SUCCESS)
return rc; return rc;
if (mqtt_over_websockets) {
mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook); mosquitto_external_callbacks_set(mosq, _mqtt_external_write_hook, _mqtt_external_read_hook);
if (!lws_engine_instance)
return 1;
else
return MOSQ_ERR_SUCCESS;
} else {
// if direct mqtt connection is used
// connect immediatelly
return _link_mqtt_connect(aclk_hostname, aclk_port);
}
}
static inline int _link_event_loop_wss()
{
if (unlikely(!lws_engine_instance)) {
return MOSQ_ERR_SUCCESS;
}
if (lws_engine_instance && lws_engine_instance->websocket_connection_up)
_link_mosquitto_write();
aclk_lws_wss_service_loop(lws_engine_instance);
// this is because if use LWS we don't want
// mqtt to reconnect by itself
return MOSQ_ERR_SUCCESS;
}
static inline int _link_event_loop_plain_mqtt(int timeout)
{
int rc;
rc = mosquitto_loop(mosq, timeout, 1);
if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
errno = 0;
error("Loop error code %d (%s)", rc, mosquitto_strerror(rc));
rc = mosquitto_reconnect(mosq);
if (unlikely(rc != MOSQ_ERR_SUCCESS)) {
error("Reconnect loop error code %d (%s)", rc, mosquitto_strerror(rc));
}
// TBD: Using delay
sleep_usec(USEC_PER_SEC * 10);
}
return rc; return rc;
} }
int _link_event_loop(int timeout) inline int _link_event_loop()
{ {
if (mqtt_over_websockets)
return _link_event_loop_wss();
return _link_event_loop_plain_mqtt(timeout); // TODO: Check if we need to flush undelivered messages from libmosquitto on new connection attempts (QoS=1).
_link_mosquitto_write();
aclk_lws_wss_service_loop();
// this is because if use LWS we don't want
// mqtt to reconnect by itself
return MOSQ_ERR_SUCCESS;
} }
void _link_shutdown() void _link_shutdown()
@ -366,12 +247,7 @@ void _link_shutdown()
mosquitto_destroy(mosq); mosquitto_destroy(mosq);
mosq = NULL; mosq = NULL;
if (lws_engine_instance) { aclk_lws_wss_client_destroy();
aclk_lws_wss_client_destroy(lws_engine_instance);
lws_engine_instance = NULL;
}
return;
} }
int _link_subscribe(char *topic, int qos) int _link_subscribe(char *topic, int qos)
@ -391,7 +267,6 @@ int _link_subscribe(char *topic, int qos)
} }
_link_mosquitto_write(); _link_mosquitto_write();
return 0; return 0;
} }
@ -418,9 +293,6 @@ int _link_send_message(char *topic, char *message, int *mid)
errno = 0; errno = 0;
error("MQTT message failed : %s", mosquitto_strerror(rc)); error("MQTT message failed : %s", mosquitto_strerror(rc));
} }
_link_mosquitto_write(); _link_mosquitto_write();
return rc; return rc;
} }
#endif

View File

@ -8,18 +8,15 @@
#endif #endif
void _show_mqtt_info(); void _show_mqtt_info();
int _link_event_loop(int timeout); int _link_event_loop();
void _link_shutdown(); void _link_shutdown();
int _link_lib_init(char *aclk_hostname, int aclk_port, void (*on_connect)(void *), void (*on_disconnect)(void *)); int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username, char *password);
//int _link_lib_init();
int _mqtt_lib_init();
int _link_subscribe(char *topic, int qos); int _link_subscribe(char *topic, int qos);
int _link_send_message(char *topic, char *message, int *mid); int _link_send_message(char *topic, char *message, int *mid);
const char *_link_strerror(int rc); const char *_link_strerror(int rc);
int aclk_handle_cloud_request(char *); int aclk_handle_cloud_request(char *);
extern int aclk_connection_initialized;
extern int aclk_mqtt_connected;
extern char *aclk_hostname;
extern int aclk_port;
#endif //NETDATA_MQTT_H #endif //NETDATA_MQTT_H

View File

@ -19,7 +19,7 @@ RUN pacman --noconfirm --needed -S autoconf \
pkgconfig \ pkgconfig \
python \ python \
libvirt \ libvirt \
libwebsockets \ cmake \
valgrind valgrind
ARG ACLK=no ARG ACLK=no

View File

@ -19,7 +19,7 @@ RUN pacman --noconfirm --needed -S autoconf \
pkgconfig \ pkgconfig \
python \ python \
libvirt \ libvirt \
libwebsockets cmake
ARG ACLK=no ARG ACLK=no
ARG EXTRA_CFLAGS ARG EXTRA_CFLAGS

View File

@ -7,13 +7,13 @@ services:
args: args:
- DISTRO=arch - DISTRO=arch
- VERSION=extras - VERSION=extras
image: arch_current_dev:latest image: arch_extras_dev:latest
command: > command: >
sh -c "echo -n 00000000-0000-0000-0000-000000000000 >/etc/netdata/claim.d/claimed_id && sh -c "echo -n 00000000-0000-0000-0000-000000000000 >/etc/netdata/claim.d/claimed_id &&
echo '[agent_cloud_link]' >>/etc/netdata/netdata.conf && echo '[agent_cloud_link]' >>/etc/netdata/netdata.conf &&
echo ' agent cloud link hostname = vernemq' >>/etc/netdata/netdata.conf && echo ' agent cloud link hostname = vernemq' >>/etc/netdata/netdata.conf &&
echo ' agent cloud link port = 9002' >>/etc/netdata/netdata.conf && echo ' agent cloud link port = 9002' >>/etc/netdata/netdata.conf &&
/usr/sbin/valgrind --leak-check=full /usr/sbin/netdata -D" /usr/sbin/valgrind --leak-check=full /usr/sbin/netdata -D -W debug_flags=0x200000000"
ports: ports:
- 20000:19999 - 20000:19999

View File

@ -36,8 +36,10 @@ extern struct registry registry;
/* rrd_init() must have been called before this function */ /* rrd_init() must have been called before this function */
void claim_agent(char *claiming_arguments) void claim_agent(char *claiming_arguments)
{ {
#ifndef ENABLE_ACLK
info("The claiming feature is under development and still subject to change before the next release"); info("The claiming feature is under development and still subject to change before the next release");
return; return;
#endif
int exit_code; int exit_code;
pid_t command_pid; pid_t command_pid;

View File

@ -93,6 +93,7 @@
#define CONFIG_SECTION_STREAM "stream" #define CONFIG_SECTION_STREAM "stream"
#define CONFIG_SECTION_EXPORTING "exporting:global" #define CONFIG_SECTION_EXPORTING "exporting:global"
#define CONFIG_SECTION_HOST_LABEL "host labels" #define CONFIG_SECTION_HOST_LABEL "host labels"
#define CONFIG_SECTION_ACLK "agent_cloud_link"
#define EXPORTING_CONF "exporting.conf" #define EXPORTING_CONF "exporting.conf"
// these are used to limit the configuration names and values lengths // these are used to limit the configuration names and values lengths

View File

@ -84,7 +84,7 @@ void security_openssl_common_options(SSL_CTX *ctx) {
* *
* @return It returns the context on success or NULL otherwise * @return It returns the context on success or NULL otherwise
*/ */
static SSL_CTX * security_initialize_openssl_client() { SSL_CTX * security_initialize_openssl_client() {
SSL_CTX *ctx; SSL_CTX *ctx;
#if OPENSSL_VERSION_NUMBER < 0x10100000L #if OPENSSL_VERSION_NUMBER < 0x10100000L
ctx = SSL_CTX_new(SSLv23_client_method()); ctx = SSL_CTX_new(SSLv23_client_method());

View File

@ -41,6 +41,7 @@ void security_clean_openssl();
void security_start_ssl(int selector); void security_start_ssl(int selector);
int security_process_accept(SSL *ssl,int msg); int security_process_accept(SSL *ssl,int msg);
int security_test_certificate(SSL *ssl); int security_test_certificate(SSL *ssl);
SSL_CTX * security_initialize_openssl_client();
# endif //ENABLE_HTTPS # endif //ENABLE_HTTPS
#endif //NETDATA_SECURITY_H #endif //NETDATA_SECURITY_H

View File

@ -607,7 +607,7 @@ static inline int connect_to_unix(const char *path, struct timeval *timeout) {
// service the service name or port to connect to // service the service name or port to connect to
// timeout the timeout for establishing a connection // timeout the timeout for establishing a connection
static inline int connect_to_this_ip46(int protocol, int socktype, const char *host, uint32_t scope_id, const char *service, struct timeval *timeout) { int connect_to_this_ip46(int protocol, int socktype, const char *host, uint32_t scope_id, const char *service, struct timeval *timeout) {
struct addrinfo hints; struct addrinfo hints;
struct addrinfo *ai_head = NULL, *ai = NULL; struct addrinfo *ai_head = NULL, *ai = NULL;

View File

@ -56,6 +56,7 @@ extern void listen_sockets_close(LISTEN_SOCKETS *sockets);
extern int connect_to_this(const char *definition, int default_port, struct timeval *timeout); extern int connect_to_this(const char *definition, int default_port, struct timeval *timeout);
extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size); extern int connect_to_one_of(const char *destination, int default_port, struct timeval *timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size);
int connect_to_this_ip46(int protocol, int socktype, const char *host, uint32_t scope_id, const char *service, struct timeval *timeout);
#ifdef ENABLE_HTTPS #ifdef ENABLE_HTTPS
extern ssize_t recv_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout); extern ssize_t recv_timeout(struct netdata_ssl *ssl,int sockfd, void *buf, size_t len, int flags, int timeout);