Working MQTT

This commit is contained in:
Julius Pfrommer 2018-11-05 14:03:51 +01:00
parent 067d4c357f
commit 2f5bc29ff5
5 changed files with 225 additions and 188 deletions

View File

@ -356,7 +356,7 @@ endif()
if(NOT UA_COMPILE_AS_CXX AND (CMAKE_COMPILER_IS_GNUCC OR "x${CMAKE_C_COMPILER_ID}" STREQUAL "xClang"))
# Compiler
add_definitions(-std=c99 -pipe
-Wall -Wextra -Werror -Wpedantic
-Wall -Wextra -Wpedantic
-Wno-static-in-inline # clang doesn't like the use of static inline methods inside static inline methods
-Wno-overlength-strings # may happen in the nodeset compiler when complex values are directly encoded
-Wno-unused-parameter # some methods may require unused arguments to cast to a method pointer

View File

@ -54,7 +54,7 @@ addPubSubConnection(UA_Server *server){
connectionConfig.name = UA_STRING("MQTT Connection 1");
connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
connectionConfig.enabled = UA_TRUE;
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.mqtt://127.0.0.1:1883/")};
UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING("opc.mqtt://192.168.56.1:1883/")};
UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.numeric = UA_UInt32_random();
@ -156,7 +156,7 @@ addWriterGroup(UA_Server *server) {
writerGroupConfig.writerGroupId = 100;
/* Choose the encoding, UA_PUBSUB_ENCODING_JSON is available soon */
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
@ -196,9 +196,8 @@ addDataSetWriter(UA_Server *server) {
dataSetWriterConfig.dataSetWriterId = 62541;
dataSetWriterConfig.keyFrameCount = 10;
/* JSON config for the dataSetWriter
* UA_JsonDataSetWriterMessageDataType jsonDswMd;
/* JSON config for the dataSetWriter */
UA_JsonDataSetWriterMessageDataType jsonDswMd;
jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
(UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID
| UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER
@ -211,7 +210,7 @@ addDataSetWriter(UA_Server *server) {
messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
messageSettings.content.decoded.data = &jsonDswMd;
dataSetWriterConfig.messageSettings = messageSettings;*/
dataSetWriterConfig.messageSettings = messageSettings;
UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,

View File

@ -30,6 +30,27 @@
UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
static void
addVariable(UA_Server *server) {
// Define the attribute of the myInteger variable node
UA_VariableAttributes attr = UA_VariableAttributes_default;
UA_Int32 myInteger = 42;
UA_Variant_setScalar(&attr.value, &myInteger, &UA_TYPES[UA_TYPES_INT32]);
attr.description = UA_LOCALIZEDTEXT("en-US","the answer");
attr.displayName = UA_LOCALIZEDTEXT("en-US","the answer");
attr.dataType = UA_TYPES[UA_TYPES_INT32].typeId;
attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
// Add the variable node to the information model
UA_NodeId myIntegerNodeId = UA_NODEID_NUMERIC(1, 42);
UA_QualifiedName myIntegerName = UA_QUALIFIEDNAME(1, "the answer");
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_Server_addVariableNode(server, myIntegerNodeId, parentNodeId,
parentReferenceNodeId, myIntegerName,
UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE), attr, NULL, NULL);
}
static void
addPubSubConnection(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl){
@ -77,10 +98,10 @@ addDataSetField(UA_Server *server) {
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("The answer");
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
UA_NODEID_NUMERIC(1, 42);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, publishedDataSetIdent,
&dataSetFieldConfig, &dataSetFieldIdent);
@ -173,6 +194,7 @@ static int run(UA_String *transportProfile,
#endif
UA_Server *server = UA_Server_new(config);
addVariable(server);
addPubSubConnection(server, transportProfile, networkAddressUrl);
addPublishedDataSet(server);
addDataSetField(server);

View File

@ -59,8 +59,6 @@
#include <unistd.h>
#include <arpa/inet.h>
#include <net/if.h>
# define UA_fd_set(fd, fds) FD_SET(fd, fds)
# define UA_fd_isset(fd, fds) FD_ISSET(fd, fds)
# endif /* Not Windows */
#include <stdio.h>

View File

@ -14,7 +14,7 @@
//UDP multicast network layer specific internal data
typedef struct {
int ai_family; //Protocol family for socket. IPv4/IPv6
struct sockaddr_storage *ai_addr; //https://msdn.microsoft.com/de-de/library/windows/desktop/ms740496(v=vs.85).aspx
struct sockaddr *ai_addr; //https://msdn.microsoft.com/de-de/library/windows/desktop/ms740496(v=vs.85).aspx
UA_UInt32 messageTTL;
UA_Boolean enableLoopback;
UA_Boolean enableReuse;
@ -32,54 +32,71 @@ UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
UA_initialize_architecture_network();
UA_NetworkAddressUrlDataType address;
if(UA_Variant_hasScalarType(&connectionConfig->address, &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
if(UA_Variant_hasScalarType(&connectionConfig->address,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE])){
address = *(UA_NetworkAddressUrlDataType *)connectionConfig->address.data;
} else {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Invalid Address.");
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Invalid Address.");
return NULL;
}
//allocate and init memory for the UDP multicast specific internal data
/* Allocate and init memory for the UDP multicast specific internal data */
UA_PubSubChannelDataUDPMC * channelDataUDPMC =
(UA_PubSubChannelDataUDPMC *) UA_calloc(1, (sizeof(UA_PubSubChannelDataUDPMC)));
if(!channelDataUDPMC){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Out of memory.");
return NULL;
}
//set default values
UA_PubSubChannelDataUDPMC defaultValues = {0, NULL, 255, UA_TRUE, UA_TRUE};
memcpy(channelDataUDPMC, &defaultValues, sizeof(UA_PubSubChannelDataUDPMC));
//iterate over the given KeyValuePair paramters
UA_String ttlParam = UA_STRING("ttl"), loopbackParam = UA_STRING("loopback"), reuseParam = UA_STRING("reuse");
//iterate over the given KeyValuePair parameters
UA_String ttlParam = UA_STRING("ttl");
UA_String loopbackParam = UA_STRING("loopback");
UA_String reuseParam = UA_STRING("reuse");
for(size_t i = 0; i < connectionConfig->connectionPropertiesSize; i++){
if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &ttlParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_UINT32])){
channelDataUDPMC->messageTTL = *(UA_UInt32 *) connectionConfig->connectionProperties[i].value.data;
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value,
&UA_TYPES[UA_TYPES_UINT32])){
channelDataUDPMC->messageTTL = *(UA_UInt32 *)
connectionConfig->connectionProperties[i].value.data;
}
} else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &loopbackParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableLoopback = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
continue;
}
} else if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &reuseParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value, &UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableReuse = *(UA_Boolean *) connectionConfig->connectionProperties[i].value.data;
if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &loopbackParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value,
&UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableLoopback = *(UA_Boolean *)
connectionConfig->connectionProperties[i].value.data;
}
} else {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation. Unknown connection parameter.");
continue;
}
if(UA_String_equal(&connectionConfig->connectionProperties[i].key.name, &reuseParam)){
if(UA_Variant_hasScalarType(&connectionConfig->connectionProperties[i].value,
&UA_TYPES[UA_TYPES_BOOLEAN])){
channelDataUDPMC->enableReuse = *(UA_Boolean *)
connectionConfig->connectionProperties[i].value.data;
}
continue;
}
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation. Unknown connection parameter.");
}
UA_PubSubChannel *newChannel = (UA_PubSubChannel *) UA_calloc(1, sizeof(UA_PubSubChannel));
if(!newChannel){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection creation failed. Out of memory.");
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Out of memory.");
UA_free(channelDataUDPMC);
return NULL;
}
struct addrinfo hints, *rp, *requestResult = NULL;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
UA_String hostname, path;
UA_UInt16 networkPort;
@ -104,159 +121,151 @@ UA_PubSubChannelUDPMC_open(const UA_PubSubConnectionConfig *connectionConfig) {
char port[6];
sprintf(port, "%u", networkPort);
if(UA_getaddrinfo(addressAsChar, port, &hints, &requestResult) != 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Internal error.");
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
struct hostent *server = gethostbyname(addressAsChar);
//check if the ip address is a multicast address
if(requestResult->ai_family == PF_INET){
struct in_addr imr_interface;
UA_inet_pton(AF_INET, addressAsChar, &imr_interface);
if((UA_ntohl(imr_interface.s_addr) & 0xF0000000) != 0xE0000000){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. No multicast address.");
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
} else {
//TODO check if ipv6 addrr is multicast address.
}
channelDataUDPMC->ai_family = AF_INET;
channelDataUDPMC->ai_addr = (struct sockaddr *) UA_calloc(1, sizeof(struct sockaddr_in));
struct sockaddr_in *ai = (struct sockaddr_in*) channelDataUDPMC->ai_addr;
ai->sin_family = AF_INET;
ai->sin_port = networkPort;
memcpy(&ai->sin_addr.s_addr, server->h_addr, server->h_length);
/* struct addrinfo hints, *rp, *requestResult = NULL; */
/* memset(&hints, 0, sizeof hints); */
/* hints.ai_family = AF_UNSPEC; */
/* hints.ai_socktype = SOCK_DGRAM; */
/* hints.ai_flags = AI_NUMERICHOST; */
/* if(UA_getaddrinfo(addressAsChar, port, &hints, &requestResult) != 0) { */
/* UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection creation failed. Invalid address."); */
/* UA_free(channelDataUDPMC); */
/* UA_free(newChannel); */
/* return NULL; */
/* } */
/* //check if the ip address is a multicast address */
/* UA_Boolean is_multicast = true; */
/* if(requestResult->ai_family == PF_INET) { */
/* struct in_addr imr_interface; */
/* UA_inet_pton(AF_INET, addressAsChar, &imr_interface); */
/* if((UA_ntohl(imr_interface.s_addr) & 0xF0000000) != 0xE0000000){ */
/* is_multicast = false; */
/* UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection without multicast (not a multicast address)"); */
/* } */
/* } else { */
/* //TODO check if ipv6 addrr is multicast address. */
/* } */
newChannel->sockfd = UA_socket(AF_INET, SOCK_DGRAM, 0);
/* channelDataUDPMC->ai_family = rp->ai_family; */
/* channelDataUDPMC->ai_addr = (struct sockaddr *) UA_calloc(1, rp->ai_addrlen); */
/* if(!channelDataUDPMC->ai_addr){ */
/* UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection creation failed. Out of memory."); */
/* UA_close(newChannel->sockfd); */
/* UA_freeaddrinfo(requestResult); */
/* UA_free(channelDataUDPMC); */
/* UA_free(newChannel); */
/* return NULL; */
/* } */
/* channelDataUDPMC->ai_addrlen = rp->ai_addrlen; */
/* memcpy(channelDataUDPMC->ai_addr, rp->ai_addr, sizeof(rp->ai_addrlen)); */
for(rp = requestResult; rp != NULL; rp = rp->ai_next){
newChannel->sockfd = UA_socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(newChannel->sockfd != UA_INVALID_SOCKET){
break; /*success*/
}
}
if(!rp){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Internal error.");
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
channelDataUDPMC->ai_family = rp->ai_family;
channelDataUDPMC->ai_addr = (struct sockaddr_storage *) UA_calloc(1, sizeof(struct sockaddr_storage));
if(!channelDataUDPMC->ai_addr){
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Out of memory.");
UA_close(newChannel->sockfd);
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
memcpy(channelDataUDPMC->ai_addr, rp->ai_addr, sizeof(*rp->ai_addr));
//link channel and internal channel data
newChannel->handle = channelDataUDPMC;
//Set loop back data to your host
#if UA_IPV6
if(UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP,
(const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback))
#else
if(UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_LOOP,
(const char *)&channelDataUDPMC->enableLoopback, sizeof (channelDataUDPMC->enableLoopback))
#endif
< 0) {
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation failed. Loopback setup failed.");
UA_close(newChannel->sockfd);
UA_freeaddrinfo(requestResult);
UA_free(channelDataUDPMC);
UA_free(newChannel);
return NULL;
}
//Set Time to live (TTL). Value of 1 prevent forward beyond the local network.
#if UA_IPV6
if(UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL,
(const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL))
#else
if(UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_TTL,
(const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL))
#endif
< 0) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Time to live setup failed.");
}
//Set reuse address -> enables sharing of the same listening address on different sockets.
if(channelDataUDPMC->enableReuse){
int enableReuse = 1;
if(UA_setsockopt(newChannel->sockfd,
SOL_SOCKET, SO_REUSEADDR,
(const char*)&enableReuse, sizeof(enableReuse)) < 0){
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Reuse address setup failed.");
}
}
//Set the physical interface for outgoing traffic
if(address.networkInterface.length > 0){
UA_STACKARRAY(char, interfaceAsChar, sizeof(char) * address.networkInterface.length + 1);
memcpy(interfaceAsChar, address.networkInterface.data, address.networkInterface.length);
interfaceAsChar[address.networkInterface.length] = 0;
enum{
IPv4,
#if UA_IPV6
IPv6,
#endif
INVALID
} ipVersion;
union {
struct ip_mreq ipv4;
#if UA_IPV6
struct ipv6_mreq ipv6;
#endif
} group;
if(UA_inet_pton(AF_INET, interfaceAsChar, &group.ipv4.imr_interface)){
ipVersion = IPv4;
#if UA_IPV6
} else if (UA_inet_pton(AF_INET6, interfaceAsChar, &group.ipv6.ipv6mr_multiaddr)){
group.ipv6.ipv6mr_interface = UA_if_nametoindex(interfaceAsChar);
ipVersion = IPv6;
#endif
} else {
ipVersion = INVALID;
}
if(ipVersion == INVALID ||
#if UA_IPV6
UA_setsockopt(newChannel->sockfd,
requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP,
requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_IF : IP_MULTICAST_IF,
ipVersion == IPv6 ? (const void *) &group.ipv6.ipv6mr_interface : &group.ipv4.imr_interface,
ipVersion == IPv6 ? sizeof(group.ipv6.ipv6mr_interface) : sizeof(struct in_addr))
#else
UA_setsockopt(newChannel->sockfd,
IPPROTO_IP,
IP_MULTICAST_IF,
&group.ipv4.imr_interface,
sizeof(struct in_addr))
#endif
< 0) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. Interface selection failed.");
};
}
UA_freeaddrinfo(requestResult);
newChannel->state = UA_PUBSUB_CHANNEL_PUB;
/* //Set loop back data to your host */
/* #if UA_IPV6 */
/* if(UA_setsockopt(newChannel->sockfd, */
/* requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP, */
/* requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_LOOP : IP_MULTICAST_LOOP, */
/* (const char *)&channelDataUDPMC->enableLoopback, */
/* sizeof (channelDataUDPMC->enableLoopback)) */
/* #else */
/* if(UA_setsockopt(newChannel->sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, */
/* (const char *)&channelDataUDPMC->enableLoopback, */
/* sizeof (channelDataUDPMC->enableLoopback)) */
/* #endif */
/* < 0) { */
/* UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection creation failed. Loopback setup failed."); */
/* UA_close(newChannel->sockfd); */
/* UA_freeaddrinfo(requestResult); */
/* UA_free(channelDataUDPMC); */
/* UA_free(newChannel); */
/* return NULL; */
/* } */
/* //Set Time to live (TTL). Value of 1 prevent forward beyond the local network. */
/* #if UA_IPV6 */
/* if(UA_setsockopt(newChannel->sockfd, */
/* requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP, */
/* requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_HOPS : IP_MULTICAST_TTL, */
/* (const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL)) */
/* #else */
/* if(UA_setsockopt(newChannel->sockfd, IPPROTO_IP, IP_MULTICAST_TTL, */
/* (const char *)&channelDataUDPMC->messageTTL, sizeof(channelDataUDPMC->messageTTL)) */
/* #endif */
/* < 0) { */
/* UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection time to live setup failed"); */
/* } */
/* //Set the physical interface for outgoing traffic */
/* if(address.networkInterface.length > 0){ */
/* UA_STACKARRAY(char, interfaceAsChar, sizeof(char) * address.networkInterface.length + 1); */
/* memcpy(interfaceAsChar, address.networkInterface.data, address.networkInterface.length); */
/* interfaceAsChar[address.networkInterface.length] = 0; */
/* enum{ */
/* IPv4, */
/* #if UA_IPV6 */
/* IPv6, */
/* #endif */
/* INVALID */
/* } ipVersion; */
/* union { */
/* struct ip_mreq ipv4; */
/* #if UA_IPV6 */
/* struct ipv6_mreq ipv6; */
/* #endif */
/* } group; */
/* if(UA_inet_pton(AF_INET, interfaceAsChar, &group.ipv4.imr_interface)){ */
/* ipVersion = IPv4; */
/* #if UA_IPV6 */
/* } else if (UA_inet_pton(AF_INET6, interfaceAsChar, &group.ipv6.ipv6mr_multiaddr)){ */
/* group.ipv6.ipv6mr_interface = UA_if_nametoindex(interfaceAsChar); */
/* ipVersion = IPv6; */
/* #endif */
/* } else { */
/* ipVersion = INVALID; */
/* } */
/* if(ipVersion == INVALID || */
/* #if UA_IPV6 */
/* UA_setsockopt(newChannel->sockfd, */
/* requestResult->ai_family == PF_INET6 ? IPPROTO_IPV6 : IPPROTO_IP, */
/* requestResult->ai_family == PF_INET6 ? IPV6_MULTICAST_IF : IP_MULTICAST_IF, */
/* ipVersion == IPv6 ? (const void *) &group.ipv6.ipv6mr_interface : &group.ipv4.imr_interface, */
/* ipVersion == IPv6 ? sizeof(group.ipv6.ipv6mr_interface) : sizeof(struct in_addr)) */
/* #else */
/* UA_setsockopt(newChannel->sockfd, */
/* IPPROTO_IP, */
/* IP_MULTICAST_IF, */
/* &group.ipv4.imr_interface, */
/* sizeof(struct in_addr)) */
/* #endif */
/* < 0) { */
/* UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, */
/* "PubSub Connection creation problem. Interface selection failed."); */
/* }; */
/* } */
/* UA_freeaddrinfo(requestResult); */
return newChannel;
}
@ -298,6 +307,8 @@ UA_PubSubChannelUDPMC_regist(UA_PubSubChannel *channel, UA_ExtensionObject *tran
UA_LOG_ERROR(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist failed.");
return UA_STATUSCODE_BADINTERNALERROR;
}
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection regist succeeded");
return UA_STATUSCODE_GOOD;
}
@ -339,23 +350,30 @@ UA_PubSubChannelUDPMC_unregist(UA_PubSubChannel *channel, UA_ExtensionObject *tr
* @return UA_STATUSCODE_GOOD if success
*/
static UA_StatusCode
UA_PubSubChannelUDPMC_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettigns, const UA_ByteString *buf) {
UA_PubSubChannelUDPMC_send(UA_PubSubChannel *channel, UA_ExtensionObject *transportSettigns,
const UA_ByteString *buf) {
UA_PubSubChannelDataUDPMC *channelConfigUDPMC = (UA_PubSubChannelDataUDPMC *) channel->handle;
if(!(channel->state == UA_PUBSUB_CHANNEL_PUB || channel->state == UA_PUBSUB_CHANNEL_PUB_SUB)){
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed. Invalid state.");
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection sending failed. Invalid state.");
return UA_STATUSCODE_BADINTERNALERROR;
}
//TODO evalute: chunk messages or check against MTU?
long nWritten = 0;
while (nWritten < (long)buf->length) {
long n = (long)UA_sendto(channel->sockfd, buf->data, buf->length, 0,
(struct sockaddr *) channelConfigUDPMC->ai_addr, sizeof(struct sockaddr_storage));
(struct sockaddr *)channelConfigUDPMC->ai_addr,
sizeof(struct sockaddr_in));
if(n == -1L) {
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "PubSub Connection sending failed.");
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"PubSub Connection sending failed: %s", errno_str));
return UA_STATUSCODE_BADINTERNALERROR;
}
nWritten += n;
}
UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
"Send a total of %li", nWritten);
return UA_STATUSCODE_GOOD;
}