fix(pubsub): Integrate the mqtt-c library as a Git submodule in 1.3

This commit is contained in:
Noel Graf 2022-06-14 09:38:16 +02:00 committed by Julius Pfrommer
parent 3ea73d0438
commit b30001429d
18 changed files with 563 additions and 4023 deletions

4
.gitmodules vendored
View File

@ -5,3 +5,7 @@
path = deps/ua-nodeset
url = https://github.com/OPCFoundation/UA-Nodeset
branch = v1.04
[submodule "deps/mqtt-c"]
path = deps/mqtt-c
url = https://github.com/LiamBindle/MQTT-C.git
branch = master

View File

@ -21,6 +21,7 @@ include(GNUInstallDirs)
# Set when installed via make install
set(open62541_TOOLS_DIR ${PROJECT_SOURCE_DIR}/tools)
set(open62541_NODESET_DIR ${PROJECT_SOURCE_DIR}/deps/ua-nodeset)
set(open62541_MQTT_DIR ${PROJECT_SOURCE_DIR}/deps/mqtt-c)
include(macros_internal)
include(macros_public)
@ -416,29 +417,49 @@ endif()
option(UA_ENABLE_PUBSUB_MQTT "Enable publish/subscribe with mqtt (experimental)" OFF)
mark_as_advanced(UA_ENABLE_PUBSUB_MQTT)
if(UA_ENABLE_PUBSUB_MQTT)
# Warn if run in Windows
if(WIN32)
MESSAGE(WARNING "Multithreading is enabled in MQTT plugin. This feature is under development and marked as EXPERIMENTAL")
if(NOT UA_FILE_MQTT)
set(UA_FILE_MQTT ${open62541_MQTT_DIR}/src/mqtt.c)
endif()
if(NOT UA_ENABLE_PUBSUB)
message(FATAL_ERROR "Mqtt cannot be used with disabled PubSub function.")
if(NOT EXISTS "${UA_FILE_MQTT}")
message(FATAL_ERROR "File ${UA_FILE_MQTT} not found.You probably need to initialize the git submodule for deps/mqtt-c.Therefore execute the following command \ngit submodule update --init --recursive")
else()
# Warn if run in Windows
if(WIN32)
MESSAGE(WARNING "Multithreading is enabled in MQTT plugin. This feature is under development and marked as EXPERIMENTAL")
endif()
if(NOT UA_ENABLE_PUBSUB)
message(FATAL_ERROR "Mqtt cannot be used with disabled PubSub function.")
endif()
configure_file(${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt.h "${PROJECT_BINARY_DIR}/src_generated/mqtt.h" COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt_pal.h "${PROJECT_BINARY_DIR}/src_generated/mqtt_pal.h" COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/plugins/mqtt/templates/posix_sockets.h "${PROJECT_BINARY_DIR}/src_generated/posix_sockets.h" COPYONLY)
endif()
endif()
option(UA_ENABLE_MQTT_TLS_OPENSSL "Enable TLS support for publish/subscribe with mqtt (use OpenSSL)" OFF)
mark_as_advanced(UA_ENABLE_MQTT_TLS_OPENSSL)
option(UA_ENABLE_MQTT_TLS_MBEDTLS "Enable TLS support for publish/subscribe with mqtt (use MBEDTLS)" OFF)
mark_as_advanced(UA_ENABLE_MQTT_TLS_MBEDTLS)
option(UA_ENABLE_MQTT_TLS "Enable TLS support for publish/subscribe with mqtt" OFF)
mark_as_advanced(UA_ENABLE_MQTT_TLS)
if(UA_ENABLE_MQTT_TLS)
if(NOT UA_ENABLE_PUBSUB_MQTT)
message(FATAL_ERROR "Mqtt with TLS cannot be used without UA_ENABLE_PUBSUB_MQTT.")
endif()
if(NOT UA_ENABLE_PUBSUB_MQTT)
message(FATAL_ERROR "Mqtt with TLS cannot be used without UA_ENABLE_PUBSUB_MQTT.")
endif()
if(NOT UA_ENABLE_ENCRYPTION)
message(FATAL_ERROR "Mqtt with TLS cannot be used without UA_ENABLE_ENCRYPTION.")
elseif(UA_ENABLE_ENCRYPTION STREQUAL "OPENSSL")
set(UA_ENABLE_MQTT_TLS_OPENSSL ON)
endif()
if(UA_ENABLE_MQTT_TLS_OPENSSL)
set(UA_ENABLE_MQTT_TLS ON)
add_definitions(-DMQTT_USE_BIO)
configure_file(${PROJECT_SOURCE_DIR}/plugins/mqtt/templates/openssl_sockets.h "${PROJECT_BINARY_DIR}/src_generated/openssl_sockets.h" COPYONLY)
elseif(UA_ENABLE_ENCRYPTION STREQUAL "MBEDTLS")
set(UA_ENABLE_MQTT_TLS_MBEDTLS ON)
add_definitions(-DMQTT_USE_MBEDTLS)
configure_file(${PROJECT_SOURCE_DIR}/plugins/mqtt/templates/mbedtls_sockets.h "${PROJECT_BINARY_DIR}/src_generated/mbedtls_sockets.h" COPYONLY)
else()
message(FATAL_ERROR "UA_ENABLE_ENCRYPTION is set to an undefined value")
endif()
endif()
option(UA_ENABLE_STATUSCODE_DESCRIPTIONS "Enable conversion of StatusCode to human-readable error message" ON)
@ -1076,14 +1097,14 @@ if(UA_ENABLE_PUBSUB)
if(WIN32)
MESSAGE(WARNING "Multithreading is enabled in MQTT plugin. This feature is under development and marked as EXPERIMENTAL")
else()
list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt_pal.h
${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.h
${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.h
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.h)
list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_pal.c
${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.c
${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.c
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.c)
list(APPEND default_plugin_headers ${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt_pal.h
${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt.h
${PROJECT_SOURCE_DIR}/plugins/include/open62541/plugin/pubsub_mqtt.h
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt-c_adapter.h)
list(APPEND default_plugin_sources ${PROJECT_SOURCE_DIR}/deps/mqtt-c/src/mqtt_pal.c
${PROJECT_SOURCE_DIR}/deps/mqtt-c/src/mqtt.c
${PROJECT_SOURCE_DIR}/plugins/ua_pubsub_mqtt.c
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt-c_adapter.c)
endif()
endif()
if(UA_ENABLE_MALLOC_SINGLETON AND UA_ENABLE_PUBSUB_BUFMALLOC)

1
deps/mqtt-c vendored Submodule

@ -0,0 +1 @@
Subproject commit f69ce1e7fd54f3b1834c9c9137ce0ec5d703cb4d

1846
deps/mqtt-c/mqtt.c vendored

File diff suppressed because it is too large Load Diff

1590
deps/mqtt-c/mqtt.h vendored

File diff suppressed because it is too large Load Diff

115
deps/mqtt-c/mqtt_pal.h vendored
View File

@ -1,115 +0,0 @@
#ifndef __MQTT_PAL_H__
#define __MQTT_PAL_H__
/**
* Copyright (c) 2019 Kalycito Infotech Private Limited
* @file
* @brief Includes/supports the types/calls required by the MQTT-C client.
*
* @note This is the \em only file included in mqtt.h, and mqtt.c. It is therefore
* responsible for including/supporting all the required types and calls.
*
* @defgroup pal Platform abstraction layer
* @brief Documentation of the types and calls required to port MQTT-C to a new platform.
*
* mqtt_pal.h is the \em only header file included in mqtt.c. Therefore, to port MQTT-C to a
* new platform the following types, functions, constants, and macros must be defined in
* mqtt_pal.h:
* - Types:
* - \c size_t, \c ssize_t
* - \c uint8_t, \c uint16_t, \c uint32_t
* - \c va_list
* - \c mqtt_pal_time_t : return type of \c MQTT_PAL_TIME()
* - \c mqtt_pal_mutex_t : type of the argument that is passed to \c MQTT_PAL_MUTEX_LOCK and
* \c MQTT_PAL_MUTEX_RELEASE
* - Functions:
* - \c memcpy, \c strlen
* - \c va_start, \c va_arg, \c va_end
* - Constants:
* - \c INT_MIN
*
* Additionally, three macro's are required:
* - \c MQTT_PAL_HTONS(s) : host-to-network endian conversion for uint16_t.
* - \c MQTT_PAL_NTOHS(s) : network-to-host endian conversion for uint16_t.
* - \c MQTT_PAL_TIME() : returns [type: \c mqtt_pal_time_t] current time in seconds.
* - \c MQTT_PAL_MUTEX_LOCK(mtx_pointer) : macro that locks the mutex pointed to by \c mtx_pointer.
* - \c MQTT_PAL_MUTEX_RELEASE(mtx_pointer) : macro that unlocks the mutex pointed to by
* \c mtx_pointer.
*
* Lastly, \ref mqtt_pal_sendall and \ref mqtt_pal_recvall, must be implemented in mqtt_pal.c
* for sending and receiving data using the platforms socket calls.
*/
#include <open62541/types.h>
/* UNIX-like platform support */
#ifdef __unix__
#include <limits.h>
#include <string.h>
#include <stdarg.h>
#include <time.h>
#include <arpa/inet.h>
#include <pthread.h>
/*#ifdef MQTT_USE_BIO
#include <openssl/bio.h>
typedef BIO* mqtt_pal_socket_handle;
#else
typedef int mqtt_pal_socket_handle;
#endif*/
#endif
#ifdef __MINGW32__
#include <pthread.h>
#endif
#define MQTT_PAL_HTONS(s) htons(s)
#define MQTT_PAL_NTOHS(s) ntohs(s)
#define MQTT_PAL_TIME() time(NULL)
typedef time_t mqtt_pal_time_t;
typedef pthread_mutex_t mqtt_pal_mutex_t;
#define MQTT_PAL_MUTEX_INIT(mtx_ptr) pthread_mutex_init(mtx_ptr, NULL)
#define MQTT_PAL_MUTEX_LOCK(mtx_ptr) pthread_mutex_lock(mtx_ptr)
#define MQTT_PAL_MUTEX_UNLOCK(mtx_ptr) pthread_mutex_unlock(mtx_ptr)
struct my_custom_socket_handle {
void* client;
void* connection;
#ifdef UA_ENABLE_MQTT_TLS
void* tls;
#endif
uint16_t timeout;
};
typedef struct my_custom_socket_handle* mqtt_pal_socket_handle;
/**
* @brief Sends all the bytes in a buffer.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the first byte in the buffer to send.
* @param[in] len The number of bytes to send (starting at \p buf).
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes sent if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags);
/**
* @brief Non-blocking receive all the byte available.
* @ingroup pal
*
* @param[in] fd The file-descriptor (or handle) of the socket.
* @param[in] buf A pointer to the receive buffer.
* @param[in] bufsz The max number of bytes that can be put into \p buf.
* @param[in] flags Flags which are passed to the underlying socket.
*
* @returns The number of bytes received if successful, an \ref MQTTErrors otherwise.
*/
ssize_t mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags);
#endif /* __MQTT_PAL_H__ */

View File

@ -44,6 +44,7 @@
#cmakedefine UA_ENABLE_PUBSUB_MQTT
#cmakedefine UA_ENABLE_MQTT_TLS
#cmakedefine UA_ENABLE_MQTT_TLS_OPENSSL
#cmakedefine UA_ENABLE_MQTT_TLS_MBEDTLS
#cmakedefine UA_ENABLE_ENCRYPTION_MBEDTLS
#cmakedefine UA_ENABLE_TPM2_SECURITY
#cmakedefine UA_ENABLE_ENCRYPTION_OPENSSL

View File

@ -12,11 +12,13 @@
extern "C" {
#endif
#include "open62541/plugin/pubsub.h"
#include "open62541/network_tcp.h"
#include <open62541/plugin/pubsub.h>
#include <open62541/network_tcp.h>
#ifdef UA_ENABLE_MQTT_TLS
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL)
#include <openssl/ssl.h>
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
#include <mqtt_pal.h>
#endif
/* mqtt network layer specific internal data */
@ -28,8 +30,12 @@ typedef struct {
uint8_t *mqttRecvBuffer;
UA_String *mqttClientId;
UA_Connection *connection;
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
SSL *ssl;
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL)
BIO *sockfd;
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
mqtt_pal_socket_handle sockfd;
#else
int sockfd;
#endif
void * mqttClient;
void (*callback)(UA_ByteString *encodedBuffer, UA_ByteString *topic);

View File

@ -0,0 +1,159 @@
#if !defined(__MBEDTLS_SOCKET_TEMPLATE_H__)
#define __MBEDTLS_SOCKET_TEMPLATE_H__
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <mbedtls/error.h>
#include <mbedtls/entropy.h>
#include <mbedtls/ctr_drbg.h>
#include <mbedtls/net_sockets.h>
#include <mbedtls/ssl.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/util.h>
#if !defined(MBEDTLS_NET_POLL_READ)
/* compat for older mbedtls */
#define MBEDTLS_NET_POLL_READ 1
#define MBEDTLS_NET_POLL_WRITE 1
int
mbedtls_net_poll(mbedtls_net_context * ctx, uint32_t rw, uint32_t timeout)
{
/* XXX this is not ideal but good enough for an example */
usleep(300);
return 1;
}
#endif
struct mbedtls_context {
mbedtls_net_context net_ctx;
mbedtls_ssl_context ssl_ctx;
mbedtls_ssl_config ssl_conf;
mbedtls_x509_crt ca_crt;
mbedtls_entropy_context entropy;
mbedtls_ctr_drbg_context ctr_drbg;
};
UA_StatusCode failed(const char *fn, int rv);
UA_StatusCode cert_verify_failed(uint32_t rv);
UA_StatusCode open_nb_socket(struct mbedtls_context *ctx,
const char *hostname,
const char *port,
const char *ca_file);
UA_StatusCode failed(const char *fn, int rv) {
char buf[100];
mbedtls_strerror(rv, buf, sizeof(buf));
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: %s failed with %x %s", fn, -rv, buf);
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_StatusCode cert_verify_failed(uint32_t rv) {
char buf[512];
mbedtls_x509_crt_verify_info(buf, sizeof(buf), "\t", rv);
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Certificate verification failed (%x) %s", rv, buf);
return UA_STATUSCODE_BADINTERNALERROR;
}
/*
A template for opening a non-blocking mbed TLS connection.
*/
UA_StatusCode open_nb_socket(struct mbedtls_context *ctx,
const char *hostname,
const char *port,
const char *ca_file) {
const unsigned char *additional = (const unsigned char *)"MQTT-C";
size_t additional_len = 6;
int rv;
mbedtls_net_context *net_ctx = &ctx->net_ctx;
mbedtls_ssl_context *ssl_ctx = &ctx->ssl_ctx;
mbedtls_ssl_config *ssl_conf = &ctx->ssl_conf;
mbedtls_x509_crt *ca_crt = &ctx->ca_crt;
mbedtls_entropy_context *entropy = &ctx->entropy;
mbedtls_ctr_drbg_context *ctr_drbg = &ctx->ctr_drbg;
mbedtls_entropy_init(entropy);
mbedtls_ctr_drbg_init(ctr_drbg);
rv = mbedtls_ctr_drbg_seed(ctr_drbg, mbedtls_entropy_func, entropy,
additional, additional_len);
if (rv != 0) {
return failed("mbedtls_ctr_drbg_seed", rv);
}
mbedtls_x509_crt_init(ca_crt);
rv = mbedtls_x509_crt_parse_file(ca_crt, ca_file);
if (rv != 0) {
return failed("mbedtls_x509_crt_parse_file", rv);
}
mbedtls_ssl_config_init(ssl_conf);
rv = mbedtls_ssl_config_defaults(ssl_conf, MBEDTLS_SSL_IS_CLIENT,
MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT);
if (rv != 0) {
return failed("mbedtls_ssl_config_defaults", rv);
}
mbedtls_ssl_conf_ca_chain(ssl_conf, ca_crt, NULL);
mbedtls_ssl_conf_authmode(ssl_conf, MBEDTLS_SSL_VERIFY_OPTIONAL);
mbedtls_ssl_conf_rng(ssl_conf, mbedtls_ctr_drbg_random, ctr_drbg);
mbedtls_net_init(net_ctx);
rv = mbedtls_net_connect(net_ctx, hostname, port, MBEDTLS_NET_PROTO_TCP);
if (rv != 0) {
return failed("mbedtls_net_connect", rv);
}
rv = mbedtls_net_set_nonblock(net_ctx);
if (rv != 0) {
return failed("mbedtls_net_set_nonblock", rv);
}
mbedtls_ssl_init(ssl_ctx);
rv = mbedtls_ssl_setup(ssl_ctx, ssl_conf);
if (rv != 0) {
return failed("mbedtls_ssl_setup", rv);
}
rv = mbedtls_ssl_set_hostname(ssl_ctx, hostname);
if (rv != 0) {
return failed("mbedtls_ssl_set_hostname", rv);
}
mbedtls_ssl_set_bio(ssl_ctx, net_ctx,
mbedtls_net_send, mbedtls_net_recv, NULL);
for (;;) {
rv = mbedtls_ssl_handshake(ssl_ctx);
uint32_t want = 0;
if (rv == MBEDTLS_ERR_SSL_WANT_READ) {
want |= MBEDTLS_NET_POLL_READ;
} else if (rv == MBEDTLS_ERR_SSL_WANT_WRITE) {
want |= MBEDTLS_NET_POLL_WRITE;
} else {
break;
}
rv = mbedtls_net_poll(net_ctx, want, (uint32_t)-1);
if (rv < 0) {
return failed("mbedtls_net_poll", rv);
}
}
if (rv != 0) {
return failed("mbedtls_ssl_handshake", rv);
}
uint32_t result = mbedtls_ssl_get_verify_result(ssl_ctx);
if (result != 0) {
if (result == (uint32_t)-1) {
return failed("mbedtls_ssl_get_verify_result", (int)result);
} else {
cert_verify_failed(result);
}
}
return UA_STATUSCODE_GOOD;
}
#endif

View File

@ -0,0 +1,102 @@
#if !defined(__OPENSSL_SOCKET_TEMPLATE_H__)
#define __OPENSSL_SOCKET_TEMPLATE_H__
#include <openssl/bio.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <string.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/util.h>
/*
A template for opening a non-blocking OpenSSL connection.
*/
UA_StatusCode open_nb_socket(BIO** bio,
SSL_CTX** ssl_ctx,
const char* addr,
const char* port,
const char* ca_file,
const char* ca_path,
const char* cert_file,
const char* key_file);
UA_StatusCode open_nb_socket(BIO** bio,
SSL_CTX** ssl_ctx,
const char* addr,
const char* port,
const char* ca_file,
const char* ca_path,
const char* cert_file,
const char* key_file)
{
*ssl_ctx = SSL_CTX_new(SSLv23_client_method());
SSL* ssl;
/* load certificate */
if (!SSL_CTX_load_verify_locations(*ssl_ctx, ca_file, ca_path)) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Failed to load ca certificate");
return UA_STATUSCODE_BADINTERNALERROR;
}
if (cert_file && key_file)
{
if (!SSL_CTX_use_certificate_file(*ssl_ctx, cert_file, SSL_FILETYPE_PEM))
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Failed to load client certificate");
return UA_STATUSCODE_BADINTERNALERROR;
}
if (!SSL_CTX_use_PrivateKey_file(*ssl_ctx, key_file, SSL_FILETYPE_PEM))
{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Failed to load client key");
return UA_STATUSCODE_BADINTERNALERROR;
}
}
/* open BIO socket */
char * addr_copy = (char*)malloc(strlen(addr) + 1);
strcpy(addr_copy,addr);
char * port_copy = (char*)malloc(strlen(port) + 1);
strcpy(port_copy,port);
*bio = BIO_new_ssl_connect(*ssl_ctx);
BIO_get_ssl(*bio, &ssl);
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY);
BIO_set_conn_hostname(*bio, addr_copy);
BIO_set_nbio(*bio, 1);
BIO_set_conn_port(*bio, port_copy);
free(addr_copy);
free(port_copy);
/* wait for connect with 10 second timeout */
int start_time = (int)time(NULL);
int do_connect_rv = (int)BIO_do_connect(*bio);
while(do_connect_rv <= 0 && BIO_should_retry(*bio) && (int)time(NULL) - start_time < 10) {
do_connect_rv = (int)BIO_do_connect(*bio);
}
if (do_connect_rv <= 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Failed to open socket: %s", ERR_reason_error_string(ERR_get_error()));
BIO_free_all(*bio);
SSL_CTX_free(*ssl_ctx);
*bio = NULL;
*ssl_ctx=NULL;
return UA_STATUSCODE_BADINTERNALERROR;
}
/* verify certificate */
if (SSL_get_verify_result(ssl) != X509_V_OK) {
/* Handle the failed verification */
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: x509 certificate verification failed");
return UA_STATUSCODE_BADINTERNALERROR;
}
return UA_STATUSCODE_GOOD;
}
#endif

View File

@ -0,0 +1,86 @@
#if !defined(__POSIX_SOCKET_TEMPLATE_H__)
#define __POSIX_SOCKET_TEMPLATE_H__
#include <stdio.h>
#include <sys/types.h>
#if !defined(WIN32)
#include <sys/socket.h>
#include <netdb.h>
#else
#include <ws2tcpip.h>
#endif
#if defined(__VMS)
#include <ioctl.h>
#endif
#include <fcntl.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/util.h>
/*
A template for opening a non-blocking POSIX socket.
*/
UA_StatusCode open_nb_socket(int* sockfd, const char* addr, const char* port);
UA_StatusCode open_nb_socket(int* sockfd, const char* addr, const char* port) {
struct addrinfo hints = {0};
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
*sockfd = -1;
int rv;
struct addrinfo *p, *servinfo;
/* get address information */
rv = getaddrinfo(addr, port, &hints, &servinfo);
if(rv != 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"MQTT PubSub: Failed to open socket (getaddrinfo): %s", gai_strerror(rv));
return UA_STATUSCODE_BADINTERNALERROR;
}
/* open the first possible socket */
for(p = servinfo; p != NULL; p = p->ai_next) {
*sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (*sockfd == -1) continue;
/* connect to server */
rv = connect(*sockfd, p->ai_addr, p->ai_addrlen);
if(rv == -1) {
close(*sockfd);
*sockfd = -1;
continue;
}
break;
}
/* free servinfo */
freeaddrinfo(servinfo);
/* make non-blocking */
#if !defined(WIN32)
if (*sockfd != -1) fcntl(*sockfd, F_SETFL, fcntl(*sockfd, F_GETFL) | O_NONBLOCK);
#else
if (*sockfd != INVALID_SOCKET) {
int iMode = 1;
ioctlsocket(*sockfd, FIONBIO, &iMode);
}
#endif
#if defined(__VMS)
/*
OpenVMS only partially implements fcntl. It works on file descriptors
but silently fails on socket descriptors. So we need to fall back on
to the older ioctl system to set non-blocking IO
*/
int on = 1;
if (*sockfd != -1) ioctl(*sockfd, FIONBIO, &on);
#endif
/* return the new socket fd */
if(*sockfd == -1)
return UA_STATUSCODE_BADINTERNALERROR;
else
return UA_STATUSCODE_GOOD;
}
#endif

View File

@ -6,28 +6,39 @@
* Copyright (c) 2020 basysKom GmbH
*/
#include "ua_mqtt_adapter.h"
#include "../../deps/mqtt-c/mqtt.h"
#include "open62541/plugin/log_stdout.h"
#include "open62541/util.h"
#include "ua_mqtt-c_adapter.h"
#include <mqtt.h>
#include <open62541/plugin/log_stdout.h>
#include <open62541/util.h>
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
#include <openssl/ssl.h>
#include <openssl/err.h>
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL)
#include <openssl_sockets.h>
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
#include <mbedtls_sockets.h>
#else
#include <posix_sockets.h>
#endif
#include <time.h>
/* forward decl for callback */
void
publish_callback(void**, struct mqtt_response_publish*);
void freeTLS(UA_PubSubChannelDataMQTT *data) {
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
if (!data->ssl)
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL)
if (!data->sockfd)
return;
SSL_shutdown(data->ssl);
SSL_CTX_free(SSL_get_SSL_CTX(data->ssl));
SSL_free(data->ssl);
data->ssl = NULL;
BIO_free_all(data->sockfd);
//SSL_CTX_free(SSL_get_SSL_CTX(data->ssl));
data->sockfd = NULL;
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
if (!data->sockfd)
return;
mbedtls_ssl_free(data->sockfd);
data->sockfd = NULL;
#else
close(data->sockfd);
#endif
}
@ -37,7 +48,7 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
return UA_STATUSCODE_BADINVALIDARGUMENT;
}
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) // Extend condition when mbedTLS support is added
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) || defined(UA_ENABLE_MQTT_TLS_MBEDTLS) // Extend condition when mbedTLS support is added
if ((channelData->mqttClientCertPath.length && !channelData->mqttClientKeyPath.length) ||
(channelData->mqttClientKeyPath.length && !channelData->mqttClientCertPath.length)) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
@ -52,8 +63,7 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
}
#endif
/* Get address and replace mqtt with tcp
* because we use a tcp UA_ClientConnectionTCP for mqtt */
/* Get address and remove 'opc.mqtt://' from the address */
UA_NetworkAddressUrlDataType address = channelData->address;
UA_String hostname, path;
@ -64,59 +74,30 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
return UA_STATUSCODE_BADINVALIDARGUMENT;
}
/* Build the url, replace mqtt with tcp */
UA_STACKARRAY(UA_Byte, addressAsChar, 10 + (sizeof(char) * path.length));
memcpy((char*)addressAsChar, "opc.tcp://", 10);
memcpy((char*)&addressAsChar[10],(char*)path.data, path.length);
address.url.data = addressAsChar;
address.url.length = 10 + (sizeof(char) * path.length);
/* Build the url and port*/
char rest[512];
memcpy(rest, path.data, path.length);
rest[path.length] = '\0';
char *rest2 = rest;
const char *addr = strtok_r(rest2, ":", &rest2);
const char *port = strtok_r(rest2, ":", &rest2);
/* check if buffers are correct */
if(!(channelData->mqttRecvBufferSize > 0 && channelData->mqttRecvBuffer != NULL
&& channelData->mqttSendBufferSize > 0 && channelData->mqttSendBuffer != NULL)){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "MQTT PubSub Connection creation failed. No Mqtt buffer allocated.");
return UA_STATUSCODE_BADARGUMENTSMISSING;
}
/* Config with default parameters */
UA_ConnectionConfig conf;
memset(&conf, 0, sizeof(UA_ConnectionConfig));
conf.protocolVersion = 0;
conf.sendBufferSize = channelData->mqttSendBufferSize;
conf.recvBufferSize = channelData->mqttRecvBufferSize;
conf.localMaxMessageSize = 1000;
conf.remoteMaxMessageSize = 1000;
conf.localMaxChunkCount = 1;
conf.remoteMaxChunkCount = 1;
/* Create TCP connection: open the blocking TCP socket (connecting to the broker) */
UA_Connection connection = UA_ClientConnectionTCP_init(conf, address.url,
1000, UA_Log_Stdout);
UA_ClientConnectionTCP_poll(&connection, 1000, UA_Log_Stdout);
if(connection.state != UA_CONNECTIONSTATE_ESTABLISHED &&
connection.state != UA_CONNECTIONSTATE_OPENING){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK,
"PubSub MQTT: Connection creation failed. Tcp connection failed!");
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
/* save connection */
channelData->connection = (UA_Connection*)UA_calloc(1, sizeof(UA_Connection));
if(!channelData->connection){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(channelData->connection, &connection, sizeof(UA_Connection));
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) // Extend condition when mbedTLS support is added
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) || defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL)
BIO *sockfd = NULL;
SSL_CTX *ctx;
#else
struct mbedtls_context ctx;
mqtt_pal_socket_handle sockfd;
#endif
if (channelData->mqttUseTLS) {
char *mqttCaFilePath = NULL;
if (channelData->mqttCaFilePath.length > 0) {
if(channelData->mqttCaFilePath.length > 0) {
/* Convert tls certificate path UA_String to char* null terminated */
mqttCaFilePath = (char*)calloc(1,channelData->mqttCaFilePath.length + 1);
if(!mqttCaFilePath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory.");
mqttCaFilePath = (char *)calloc(1, channelData->mqttCaFilePath.length + 1);
if(!mqttCaFilePath) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub MQTT: Connection creation failed. Out of memory.");
// TODO: There are several places in the existing code where channelData->connection is freed but not set to NULL
// The call to disconnectMqtt by the function calling this function in case of error causes heap-use-after-free
// Where should channelData->connection be cleaned up?
@ -126,11 +107,12 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
}
char *mqttCaPath = NULL;
if (channelData->mqttCaPath.length > 0) {
if(channelData->mqttCaPath.length > 0) {
/* Convert tls CA path UA_String to char* null terminated */
mqttCaPath = (char*)calloc(1,channelData->mqttCaPath.length + 1);
if(!mqttCaPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (mqttCAPath).");
mqttCaPath = (char *)calloc(1, channelData->mqttCaPath.length + 1);
if(!mqttCaPath) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub MQTT: Connection creation failed. Out of memory (mqttCAPath).");
UA_free(mqttCaFilePath);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
@ -138,11 +120,12 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
}
char *mqttClientCertPath = NULL;
if (channelData->mqttClientCertPath.length > 0) {
if(channelData->mqttClientCertPath.length > 0) {
/* Convert tls client cert path UA_String to char* null terminated */
mqttClientCertPath = (char*)calloc(1,channelData->mqttClientCertPath.length + 1);
if(!mqttClientCertPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (mqttClientCertPath).");
mqttClientCertPath = (char *)calloc(1, channelData->mqttClientCertPath.length + 1);
if(!mqttClientCertPath) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub MQTT: Connection creation failed. Out of memory (mqttClientCertPath).");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
return UA_STATUSCODE_BADOUTOFMEMORY;
@ -151,11 +134,12 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
}
char *mqttClientKeyPath = NULL;
if (channelData->mqttClientKeyPath.length > 0) {
if(channelData->mqttClientKeyPath.length > 0) {
/* Convert tls client key path UA_String to char* null terminated */
mqttClientKeyPath = (char*)calloc(1,channelData->mqttClientKeyPath.length + 1);
if(!mqttClientKeyPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (mqttClientKeyPath).");
mqttClientKeyPath = (char *)calloc(1, channelData->mqttClientKeyPath.length + 1);
if(!mqttClientKeyPath) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub MQTT: Connection creation failed. Out of memory (mqttClientKeyPath).");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
UA_free(mqttClientCertPath);
@ -164,122 +148,62 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
memcpy(mqttClientKeyPath, channelData->mqttClientKeyPath.data, channelData->mqttClientKeyPath.length);
}
#endif
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
SSL_library_init();
OpenSSL_add_all_algorithms();
SSL_load_error_strings();
ERR_load_crypto_strings();
// OpenSSL 1.0 doesn't have TLS_client_method(), use SSLv23 and forbid SSLv2 and SSLv3
SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
if (!ctx) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (SSL_CTX_new).");
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) // Extend condition when mbedTLS support is added
/* open the non-blocking TCP socket (connecting to the broker) */
UA_StatusCode rv = open_nb_socket(&sockfd, &ctx, addr, port, mqttCaFilePath, NULL, NULL, NULL);;
if(rv != UA_STATUSCODE_GOOD){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed.");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return UA_STATUSCODE_BADOUTOFMEMORY;
return rv;
}
int result = 0;
if (mqttCaFilePath || mqttCaPath)
result = SSL_CTX_load_verify_locations(ctx, mqttCaFilePath, mqttCaPath);
else
result = SSL_CTX_set_default_verify_paths(ctx);
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
if (!result) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: TLS initialization failed.");
SSL_CTX_free(ctx);
if (!ctx) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed.");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return UA_STATUSCODE_BADSECURITYCHECKSFAILED;
}
SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER, NULL);
if (mqttClientKeyPath && mqttClientCertPath) {
result = SSL_CTX_use_certificate_file(ctx, mqttClientCertPath, SSL_FILETYPE_PEM);
if (result != 1) {
result = SSL_CTX_use_certificate_file(ctx, mqttClientCertPath, SSL_FILETYPE_ASN1);
}
if (SSL_get_error(channelData->ssl, result) != SSL_ERROR_NONE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Failed to load client certificate.");
SSL_CTX_free(ctx);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
result = SSL_CTX_use_PrivateKey_file(ctx, mqttClientKeyPath, SSL_FILETYPE_PEM);
if (result != 1) {
result = SSL_CTX_use_PrivateKey_file(ctx, mqttClientKeyPath, SSL_FILETYPE_ASN1);
}
if (SSL_get_error(channelData->ssl, result) != SSL_ERROR_NONE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Failed to load client private key.");
SSL_CTX_free(ctx);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
}
channelData->ssl = SSL_new(ctx);
if (!channelData->ssl) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (SSL_new).");
SSL_CTX_free(ctx);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
SSL_set_fd(channelData->ssl, connection.sockfd);
int connectError = SSL_ERROR_NONE;
do {
result = SSL_connect(channelData->ssl);
connectError = SSL_get_error(channelData->ssl, result);
} while (connectError == SSL_ERROR_WANT_READ);
if (connectError != SSL_ERROR_NONE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: TLS connect failed");
const unsigned long err = ERR_get_error();
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "SSL_connect error code: %lu %s", err, ERR_error_string(err, NULL));
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Error description: %s", ERR_reason_error_string(err));
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
long verifyResult = SSL_get_verify_result(channelData->ssl);
if (verifyResult != SSL_ERROR_NONE) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: TLS certificate verification failed with result %ld.", verifyResult);
freeTLS(channelData);
return UA_STATUSCODE_BADSECURITYCHECKSFAILED;
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: TLS connection successfully opened.");
#endif
}
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) // Extend condition when mbedTLS support is added
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
/* open the non-blocking TCP socket (connecting to the broker) */
UA_StatusCode rv = open_nb_socket(&ctx,addr,port,mqttCaFilePath);
if(rv != UA_STATUSCODE_GOOD){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed.");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return rv;
}
sockfd = &ctx.ssl_ctx;
if (sockfd == NULL) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed.");
UA_free(mqttCaFilePath);
UA_free(mqttCaPath);
UA_free(mqttClientCertPath);
UA_free(mqttClientKeyPath);
return UA_STATUSCODE_BADINTERNALERROR;
}
}
#else
int sockfd = -1;
/* open the non-blocking TCP socket (connecting to the broker) */
UA_StatusCode rv = open_nb_socket(&sockfd,addr, port);
if(rv != UA_STATUSCODE_GOOD){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed.");
return rv;
}
#endif
/* Set socket to nonblocking!*/
UA_socket_set_nonblocking(connection.sockfd);
/* calloc mqtt_client */
struct mqtt_client* client = (struct mqtt_client*)UA_calloc(1, sizeof(struct mqtt_client));
if(!client){
@ -294,25 +218,11 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
/* save reference */
channelData->mqttClient = client;
/* create custom sockethandle */
struct my_custom_socket_handle* handle =
(struct my_custom_socket_handle*)UA_calloc(1, sizeof(struct my_custom_socket_handle));
if(!handle){
freeTLS(channelData);
UA_free(channelData->connection);
UA_free(client);
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: Connection creation failed. Out of memory (client socket).");
return UA_STATUSCODE_BADOUTOFMEMORY;
}
handle->client = client;
handle->connection = channelData->connection;
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
handle->tls = channelData->ssl;
#endif
/* save socket fd*/
channelData->sockfd = sockfd;
/* init mqtt client struct with buffers and callback */
enum MQTTErrors mqttErr = mqtt_init(client, handle, channelData->mqttSendBuffer, channelData->mqttSendBufferSize,
enum MQTTErrors mqttErr = mqtt_init(client, sockfd, channelData->mqttSendBuffer, channelData->mqttSendBufferSize,
channelData->mqttRecvBuffer, channelData->mqttRecvBufferSize, publish_callback);
if(mqttErr != MQTT_OK){
freeTLS(channelData);
@ -371,103 +281,12 @@ connectMqtt(UA_PubSubChannelDataMQTT* channelData){
memcpy(password, channelData->mqttPassword.data, channelData->mqttPassword.length);
}
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
char *caFilePath = NULL;
if (channelData->mqttCaFilePath.length > 0) {
/* Convert caFilePath UA_String to char* null terminated */
caFilePath = (char*)calloc(1, channelData->mqttCaFilePath.length + 1);
if(!caFilePath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "connectMqtt() : PubSub MQTT: Connection creation failed. Out of memory.");
freeTLS(channelData);
UA_free(channelData->connection);
UA_free(client);
UA_free(clientId);
UA_free(username);
UA_free(password);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(caFilePath, channelData->mqttCaFilePath.data, channelData->mqttCaFilePath.length);
}
char *caPath = NULL;
if (channelData->mqttCaPath.length > 0) {
/* Convert caPath UA_String to char* null terminated */
caPath = (char*)calloc(1, channelData->mqttCaPath.length + 1);
if(!caPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "connectMqtt() : PubSub MQTT: Connection creation failed. Out of memory.");
freeTLS(channelData);
UA_free(channelData->connection);
UA_free(client);
UA_free(clientId);
UA_free(username);
UA_free(password);
UA_free(caFilePath);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(caPath, channelData->mqttCaPath.data, channelData->mqttCaPath.length);
}
char *clientCertPath = NULL;
if (channelData->mqttClientCertPath.length > 0) {
/* Convert ClientCertPath UA_String to char* null terminated */
clientCertPath = (char*)calloc(1, channelData->mqttClientCertPath.length + 1);
if (!clientCertPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "connectMqtt() : PubSub MQTT: Connection creation failed. Out of memory.");
freeTLS(channelData);
UA_free(channelData->connection);
UA_free(client);
UA_free(clientId);
UA_free(username);
UA_free(password);
UA_free(caFilePath);
UA_free(caPath);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(clientCertPath, channelData->mqttClientCertPath.data, channelData->mqttClientCertPath.length);
}
char *clientKeyPath = NULL;
if (channelData->mqttClientKeyPath.length > 0) {
/* Convert ClientKeyPath UA_String to char* null terminated */
clientKeyPath = (char*)calloc(1, channelData->mqttClientKeyPath.length + 1);
if (!clientKeyPath){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "connectMqtt() : PubSub MQTT: Connection creation failed. Out of memory.");
freeTLS(channelData);
UA_free(channelData->connection);
UA_free(client);
UA_free(clientId);
UA_free(username);
UA_free(password);
UA_free(caFilePath);
UA_free(caPath);
UA_free(clientCertPath);
return UA_STATUSCODE_BADOUTOFMEMORY;
}
memcpy(clientKeyPath, channelData->mqttClientKeyPath.data, channelData->mqttClientKeyPath.length);
}
UA_Boolean useTLS;
useTLS = channelData->mqttUseTLS;
#endif
/* Connect mqtt with socket fd of networktcp */
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, username, password,
caFilePath, caPath, clientCertPath, clientKeyPath, useTLS, 0, client->keep_alive);
UA_free(clientId);
UA_free(username);
UA_free(password);
UA_free(caFilePath);
UA_free(caPath);
UA_free(clientCertPath);
UA_free(clientKeyPath);
#else
/* Connect mqtt with socket */
mqttErr = mqtt_connect(client, clientId, NULL, NULL, 0, username, password,
0, client->keep_alive);
UA_free(clientId);
UA_free(username);
UA_free(password);
#endif
if(mqttErr != MQTT_OK){
freeTLS(channelData);
@ -492,18 +311,13 @@ disconnectMqtt(UA_PubSubChannelDataMQTT* channelData){
if(client){
mqtt_disconnect(client);
yieldMqtt(channelData, 10);
UA_free(client->socketfd);
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL //mbedTLS condition is missing
BIO_free_all(client->socketfd);
#endif
}
freeTLS(channelData);
if(channelData->connection != NULL){
channelData->connection->close(channelData->connection);
channelData->connection->free(channelData->connection);
UA_free(channelData->connection);
channelData->connection = NULL;
}
UA_free(channelData->mqttRecvBuffer);
channelData->mqttRecvBuffer = NULL;
UA_free(channelData->mqttSendBuffer);
@ -571,29 +385,28 @@ subscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, UA_Byte qo
UA_StatusCode
unSubscribeMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic){
return UA_STATUSCODE_BADNOTIMPLEMENTED;
if(channelData == NULL || topic.length == 0){
return UA_STATUSCODE_BADINVALIDARGUMENT;
}
struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
UA_STACKARRAY(char, topicStr, sizeof(char) * topic.length +1);
memcpy(topicStr, topic.data, topic.length);
topicStr[topic.length] = '\0';
enum MQTTErrors mqttErr = mqtt_unsubscribe(client, topicStr);
if(mqttErr != MQTT_OK){
const char* errorStr = mqtt_error_str(mqttErr);
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: subscribe: %s", errorStr);
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
return UA_STATUSCODE_GOOD;
}
UA_StatusCode
yieldMqtt(UA_PubSubChannelDataMQTT* channelData, UA_UInt16 timeout){
if(channelData == NULL || timeout == 0){
return UA_STATUSCODE_BADINVALIDARGUMENT;
}
UA_Connection *connection = channelData->connection;
if(!connection) {
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
if(connection->state != UA_CONNECTIONSTATE_ESTABLISHED &&
connection->state != UA_CONNECTIONSTATE_OPENING) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK,
"PubSub MQTT: yield: Tcp Connection not established!");
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
struct mqtt_client* client = (struct mqtt_client*)channelData->mqttClient;
client->socketfd->timeout = timeout;
enum MQTTErrors error = mqtt_sync(client);
if(error == MQTT_OK){
@ -646,16 +459,17 @@ publishMqtt(UA_PubSubChannelDataMQTT* channelData, UA_String topic, const UA_Byt
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_NETWORK, "PubSub MQTT: publish: Bad Qos Level.");
return UA_STATUSCODE_BADINVALIDARGUMENT;
}
mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags);
if (client->error != MQTT_OK) {
if(client->error == MQTT_ERROR_SEND_BUFFER_IS_FULL){
enum MQTTErrors mqttErr = mqtt_publish(client, topicChar, buf->data, buf->length, (uint8_t)flags);
if(mqttErr != MQTT_OK){
if(mqttErr == MQTT_ERROR_SEND_BUFFER_IS_FULL) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: Send buffer is full. "
"Possible reasons: send buffer is to small, "
"sending to fast, broker not responding.");
}else{
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(client->error));
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub MQTT: publish: %s", mqtt_error_str(mqttErr));
}
return UA_STATUSCODE_BADCONNECTIONREJECTED;
return UA_STATUSCODE_BADCOMMUNICATIONERROR;
}
return UA_STATUSCODE_GOOD;
}

View File

@ -13,7 +13,7 @@
extern "C" {
#endif
#include "ua_network_pubsub_mqtt.h"
#include <open62541/plugin/pubsub_mqtt.h>
void freeTLS(UA_PubSubChannelDataMQTT *data);

View File

@ -1,108 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright (c) 2018 Fraunhofer IOSB (Author: Lukas Meling)
* Copyright (c) 2019 Kalycito Infotech Private Limited
* Copyright (c) 2020 basysKom GmbH
*/
#include "../../deps/mqtt-c/mqtt.h"
#include <open62541/network_tcp.h>
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
#include <openssl/ssl.h>
#endif
ssize_t
mqtt_pal_sendall(mqtt_pal_socket_handle fd, const void* buf, size_t len, int flags) {
#ifdef UA_ENABLE_MQTT_TLS
if (fd->tls) {
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
SSL *ssl = (SSL*) fd->tls;
int written = 0;
const UA_Byte *buffer = (const UA_Byte *) buf;
while (written < (int) len) {
int rv = SSL_write(ssl, buffer + written, (int) len - written);
if (rv > 0) {
written += rv;
} else {
const int error = SSL_get_error(ssl, rv);
switch (error) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
case SSL_ERROR_WANT_ACCEPT:
case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_X509_LOOKUP:
return written;
default:
return MQTT_ERROR_SOCKET_ERROR;
}
}
}
return written;
#endif
}
#endif
UA_Connection *connection = (UA_Connection*) fd->connection;
UA_ByteString sendBuffer;
sendBuffer.data = (UA_Byte*)UA_malloc(len);
sendBuffer.length = len;
memcpy(sendBuffer.data, buf, len);
UA_StatusCode ret = connection->send(connection, &sendBuffer);
if(ret != UA_STATUSCODE_GOOD)
return -1;
return (ssize_t)len;
}
ssize_t
mqtt_pal_recvall(mqtt_pal_socket_handle fd, void* buf, size_t bufsz, int flags) {
#ifdef UA_ENABLE_MQTT_TLS
if (fd->tls) {
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL
SSL *ssl = (SSL*) fd->tls;
int read = 0;
UA_Byte * buffer = (UA_Byte *) buf;
do {
int rv = SSL_read(ssl, buffer + read, (int) bufsz - read);
if (rv > 0) {
read += rv;
} else {
const int error = SSL_get_error(ssl, rv);
switch (error) {
case SSL_ERROR_WANT_READ:
case SSL_ERROR_WANT_WRITE:
case SSL_ERROR_WANT_ACCEPT:
case SSL_ERROR_WANT_CONNECT:
case SSL_ERROR_WANT_X509_LOOKUP:
return read;
default:
return MQTT_ERROR_SOCKET_ERROR;
}
}
} while (SSL_pending(ssl) && read < (int) bufsz);
return read;
#endif
}
#endif
UA_Connection *connection = (UA_Connection*)fd->connection;
UA_ByteString inBuffer;
inBuffer.data = (UA_Byte*)buf;
inBuffer.length = bufsz;
UA_StatusCode ret = connection->recv(connection, &inBuffer, fd->timeout);
if(ret == UA_STATUSCODE_GOOD ) {
return (ssize_t)inBuffer.length;
} else if(ret == UA_STATUSCODE_GOODNONCRITICALTIMEOUT) {
return 0;
} else {
return -1; //error case, no free necessary
}
}

View File

@ -11,13 +11,13 @@
* Currently only ua_mqtt_adapter.c implements this
* interface and maps them to the specific "MQTT-C" library functions.
* Another mqtt lib could be used.
* "ua_mqtt_pal.c" forwards the network calls (send/recv) to UA_Connection (TCP).
* "mqtt_pal.c" forwards the network calls (send/recv) to UA_Connection (TCP).
*/
#include <open62541/server_pubsub.h>
#include <open62541/util.h>
#include "mqtt/ua_mqtt_adapter.h"
#include "mqtt/ua_mqtt-c_adapter.h"
#include "open62541/plugin/log_stdout.h"
static UA_StatusCode
@ -67,8 +67,12 @@ UA_PubSubChannelMQTT_open(const UA_PubSubConnectionConfig *connectionConfig) {
/* set default values */
UA_String mqttClientId = UA_STRING("open62541_pub");
memcpy(channelDataMQTT, &(UA_PubSubChannelDataMQTT){address, 2000, 2000, NULL, NULL, &mqttClientId, NULL,
#ifdef UA_ENABLE_MQTT_TLS_OPENSSL // Initialize the "ssl" member
#if defined(UA_ENABLE_MQTT_TLS_OPENSSL) // Initialize the "ssl" member
NULL,
#elif defined(UA_ENABLE_MQTT_TLS_MBEDTLS)
NULL,
#else
-1,
#endif
NULL, NULL,
UA_STRING_NULL, UA_STRING_NULL, UA_STRING_NULL, UA_STRING_NULL,

View File

@ -840,18 +840,18 @@ UA_NetworkMessage_decodePayload(const UA_ByteString *src, size_t *offset, UA_Net
UA_CHECK_MEM(dst->payload.dataSetPayload.dataSetMessages,
return UA_STATUSCODE_BADOUTOFMEMORY);
for(UA_Byte i = 0; i < count; i++) {
if(&dst->payload.dataSetPayload.sizes[i] == NULL){
rv = UA_DataSetMessage_decodeBinary(src, offset,
&(dst->payload.dataSetPayload.dataSetMessages[i]),
0);
} else {
if(count == 1)
rv = UA_DataSetMessage_decodeBinary(src, offset,
&(dst->payload.dataSetPayload.dataSetMessages[0]),
0);
else {
for(UA_Byte i = 0; i < count; i++) {
rv = UA_DataSetMessage_decodeBinary(src, offset,
&(dst->payload.dataSetPayload.dataSetMessages[i]),
dst->payload.dataSetPayload.sizes[i]);
}
UA_CHECK_STATUS(rv, return rv);
}
UA_CHECK_STATUS(rv, return rv);
return UA_STATUSCODE_GOOD;

View File

@ -130,14 +130,14 @@ if(UA_ENABLE_PUBSUB)
endif()
if(UA_ENABLE_PUBSUB_MQTT)
list(APPEND test_plugin_sources ${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt_pal.h
${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.h
${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.h
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.h
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_pal.c
${PROJECT_SOURCE_DIR}/deps/mqtt-c/mqtt.c
${PROJECT_SOURCE_DIR}/plugins/ua_network_pubsub_mqtt.c
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt_adapter.c)
list(APPEND test_plugin_sources ${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt_pal.h
${PROJECT_SOURCE_DIR}/deps/mqtt-c/include/mqtt.h
${PROJECT_SOURCE_DIR}/plugins/include/open62541/plugin/pubsub_mqtt.h
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt-c_adapter.h
${PROJECT_SOURCE_DIR}/deps/mqtt-c/src/mqtt.c
${PROJECT_SOURCE_DIR}/deps/mqtt-c/src/mqtt_pal.c
${PROJECT_SOURCE_DIR}/plugins/ua_pubsub_mqtt.c
${PROJECT_SOURCE_DIR}/plugins/mqtt/ua_mqtt-c_adapter.c)
endif()
endif()

View File

@ -10,13 +10,14 @@
#include <open62541/server.h>
#include <open62541/server_config_default.h>
#include "ua_network_pubsub_mqtt.h"
#include <open62541/plugin/pubsub_mqtt.h>
#include "ua_server_internal.h"
#include <check.h>
//#define TEST_MQTT_SERVER "opc.mqtt://test.mosquitto.org:1883/"
#define TEST_MQTT_SERVER "opc.mqtt://localhost:1883/"
//#define TEST_MQTT_SERVER "opc.mqtt://test.mosquitto.org:1883"
#define TEST_MQTT_SERVER "opc.mqtt://localhost:1883"
UA_Server *server = NULL;
UA_ServerConfig *config = NULL;