feat(el): cached address udp (#5355)

* feat(el): cached addresses for udp eventloop

Instead of using connect + send use sendto and cache
the destination address for each opened connection.

* fix(el): choose correct error wrapper

* fix(el): win32 use size_t not socklen

* docs(el): add netif param & active passiv distinct

* build: add new subproj to idea structure

* Use ai_addrlen for memcpy of address information

Co-authored-by: Julius Pfrommer <jpfr@users.noreply.github.com>
This commit is contained in:
JanSurft 2022-10-06 22:42:00 +02:00 committed by GitHub
parent eb1abac236
commit 90068f2811
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 40 deletions

View File

@ -3,6 +3,7 @@
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
<mapping directory="$PROJECT_DIR$/deps/mdnsd" vcs="Git" />
<mapping directory="$PROJECT_DIR$/deps/mqtt-c" vcs="Git" />
<mapping directory="$PROJECT_DIR$/deps/ua-nodeset" vcs="Git" />
</component>
</project>

View File

@ -671,7 +671,7 @@ TCP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
return UA_STATUSCODE_GOOD;
shutdown:
UA_LOG_SOCKET_ERRNO_GAI_WRAP(
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_ERROR(cm->eventSource.eventLoop->logger, UA_LOGCATEGORY_NETWORK,
"TCP %u\t| Send failed with error %s",
(unsigned)connectionId, errno_str));

View File

@ -30,6 +30,12 @@
/* A registered file descriptor with an additional method pointer */
typedef struct {
UA_RegisteredFD fd;
struct sockaddr_storage sendAddr;
#ifdef _WIN32
size_t sendAddrLength;
#else
socklen_t sendAddrLength;
#endif
UA_ConnectionManager_connectionCallback connectionCallback;
} UDP_FD;
@ -127,7 +133,7 @@ getNetworkInterfaceFromParams(size_t paramsSize, const UA_KeyValuePair *params,
/* Prepare the networkinterface string */
const UA_String *networkInterface = (const UA_String*)
UA_KeyValueMap_getScalar(params, paramsSize,
UA_QUALIFIEDNAME(0, "networkInterface"),
UA_QUALIFIEDNAME(0, "network-interface"),
&UA_TYPES[UA_TYPES_STRING]);
if(!networkInterface) {
UA_LOG_DEBUG(logger, UA_LOGCATEGORY_NETWORK,
@ -290,7 +296,8 @@ setConnectionConfig(UA_FD socket, const UA_KeyValuePair *connectionProperties,
UA_String hostnameParam = UA_STRING("hostname");
UA_String portParam = UA_STRING("port");
UA_String listenHostnamesParam = UA_STRING("listen-hostnames");
UA_String listenPortParam= UA_STRING("listen-port");
UA_String listenPortParam = UA_STRING("listen-port");
UA_String networkInterfaceParam = UA_STRING("network-interface");
#ifdef __linux__
UA_String socketPriorityParam = UA_STRING("sockpriority");
@ -330,8 +337,9 @@ setConnectionConfig(UA_FD socket, const UA_KeyValuePair *connectionProperties,
} else if (UA_String_equal(&prop->key.name, &hostnameParam) ||
UA_String_equal(&prop->key.name, &portParam) ||
UA_String_equal(&prop->key.name, &listenHostnamesParam) ||
UA_String_equal(&prop->key.name, &listenPortParam)) {
/* ignore, required args are handled elsewhere explicitly */
UA_String_equal(&prop->key.name, &listenPortParam) ||
UA_String_equal(&prop->key.name, &networkInterfaceParam)) {
/* ignore, required args are handled elsewhere explicitly */
} else {
UA_LOG_WARNING(logger, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation. Unknown connection parameter: '%.*s'.",
@ -377,6 +385,8 @@ setupSendMulticastIPv4(UA_FD socket, struct sockaddr_in *addr, size_t paramsSize
memcpy(interfaceAsChar, netif.data, netif.length);
interfaceAsChar[netif.length] = 0;
UA_String_clear(&netif);
if(UA_inet_pton(AF_INET, interfaceAsChar, &ipMulticastRequest.ipv4.imr_interface) <= 0) {
UA_LOG_ERROR(logger, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. "
@ -416,6 +426,8 @@ setupListenMulticastIPv4(UA_FD socket, size_t paramsSize, const UA_KeyValuePair
memcpy(interfaceAsChar, netif.data, netif.length);
interfaceAsChar[netif.length] = 0;
UA_String_clear(&netif);
if(UA_inet_pton(AF_INET, interfaceAsChar, &ipMulticastRequest.ipv4.imr_interface) <= 0) {
UA_LOG_ERROR(logger, UA_LOGCATEGORY_SERVER,
"PubSub Connection creation problem. "
@ -680,7 +692,7 @@ UDP_connectionSocketCallback(UA_ConnectionManager *cm, UA_RegisteredFD *rfd,
"UDP %u\t| recv signaled the socket was shutdown (%s)",
(unsigned)rfd->fd, errno_str));
UDP_close(ucm, rfd);
UA_free(rfd);
UA_free((UDP_FD*)rfd);
return;
}
@ -950,7 +962,6 @@ UDP_shutdownConnection(UA_ConnectionManager *cm, uintptr_t connectionId) {
return UA_STATUSCODE_GOOD;
}
static UA_StatusCode
UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
size_t paramsSize, const UA_KeyValuePair *params,
@ -965,6 +976,7 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
/* Send the full buffer. This may require several calls to send */
size_t nWritten = 0;
UDP_FD *ufd = (UDP_FD *) UDP_findRegisteredFD((UDPConnectionManager *)cm, connectionId);
do {
ssize_t n = 0;
do {
@ -972,9 +984,9 @@ UDP_sendWithConnection(UA_ConnectionManager *cm, uintptr_t connectionId,
UA_LOGCATEGORY_NETWORK,
"UDP %u\t| Attempting to send", (unsigned)connectionId);
size_t bytes_to_send = buf->length - nWritten;
n = UA_send((UA_FD)connectionId,
n = UA_sendto((UA_FD)connectionId,
(const char*)buf->data + nWritten,
bytes_to_send, flags);
bytes_to_send, flags, (struct sockaddr*) &ufd->sendAddr, ufd->sendAddrLength);
if(n < 0) {
/* An error we cannot recover from? */
if(UA_ERRNO != UA_INTERRUPTED &&
@ -1051,7 +1063,7 @@ checkForSendMulticastAndConfigure(size_t paramsSize, const UA_KeyValuePair *para
static UA_StatusCode
registerSocketAndDestinationForSend(size_t paramsSize, const UA_KeyValuePair *params,
const char *hostname, struct addrinfo *info,
int error, UA_FD *sock, const UA_Logger *logger) {
int error, UDP_FD * ufd, UA_FD *sock, const UA_Logger *logger) {
UA_FD newSock = socket(info->ai_family, info->ai_socktype, info->ai_protocol);
*sock = newSock;
if(newSock == UA_INVALID_FD) {
@ -1075,18 +1087,9 @@ registerSocketAndDestinationForSend(size_t paramsSize, const UA_KeyValuePair *pa
"UDP\t| Configuring send multicast failed");
return UA_STATUSCODE_BADINTERNALERROR;
}
/* Non-blocking connect */
error = UA_connect(newSock, info->ai_addr, info->ai_addrlen);
if(error != 0 &&
UA_ERRNO != UA_INPROGRESS &&
UA_ERRNO != UA_WOULDBLOCK) {
UA_LOG_SOCKET_ERRNO_WRAP(
UA_LOG_WARNING(logger, UA_LOGCATEGORY_NETWORK,
"UDP\t| Connecting the socket to %s failed (%s)",
hostname, errno_str));
UA_close(newSock);
return UA_STATUSCODE_BADDISCONNECT;
}
memcpy(&ufd->sendAddr, info->ai_addr, info->ai_addrlen);
ufd->sendAddrLength = info->ai_addrlen;
return res;
}
@ -1114,25 +1117,26 @@ UDP_openSendConnection(UA_ConnectionManager *cm,
}
UA_LOG_DEBUG(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"UDP\t| Open a connection to \"%s\" on port %s", hostname, portStr);
/* Create a socket and register the destination address from the provided parameters */
UA_FD newSock = UA_INVALID_FD;
UA_StatusCode res =
registerSocketAndDestinationForSend(paramsSize, params, hostname, info,
error, &newSock, el->eventLoop.logger);
UA_freeaddrinfo(info);
if(res != UA_STATUSCODE_GOOD)
return res;
/* Allocate the UA_RegisteredFD */
UDP_FD *newudpfd = (UDP_FD*)UA_calloc(1, sizeof(UDP_FD));
if(!newudpfd) {
UA_LOG_WARNING(el->eventLoop.logger, UA_LOGCATEGORY_NETWORK,
"UDP %u\t| Error allocating memory for the socket, closing",
(unsigned)newSock);
UA_close(newSock);
"UDP\t| Error allocating memory for the socket, closing");
return UA_STATUSCODE_BADOUTOFMEMORY;
}
/* Create a socket and register the destination address from the provided parameters */
UA_FD newSock = UA_INVALID_FD;
UA_StatusCode res =
registerSocketAndDestinationForSend(paramsSize, params, hostname, info,
error, newudpfd, &newSock, el->eventLoop.logger);
UA_freeaddrinfo(info);
if(res != UA_STATUSCODE_GOOD) {
UA_free(newudpfd);
return res;
}
newudpfd->fd.fd = newSock;
newudpfd->fd.es = &cm->eventSource;
newudpfd->fd.callback = (UA_FDCallback)UDP_connectionSocketCallback;

View File

@ -466,17 +466,24 @@ UA_ConnectionManager_new_POSIX_TCP(const UA_String eventSourceName);
* effect.
*
* Configuration Parameters:
* - 0:listen-port [uint16]: Port to listen for new connections (default: do not
* listen on any port).
* - 0:listen-hostnames [string | string array]: Hostnames of the devices to
* listen on (default: listen on
* all devices).
* - 0:recv-bufsize [uint32]: Size of the buffer that is allocated for receiving
* messages (default 64kB).
*
* Open Connection Parameters:
* - 0:hostname [string]: Hostname (or IPv4/v6 address) to connect to (required).
* - 0:port [uint16]: Port of the target host (required).
*
* - Active Connection
* - 0:hostname [string]: Hostname (or IPv4/v6 address) to connect to (required).
* - 0:port [uint16]: Port of the target host (required).
* - Passive Connection
* - 0:listen-port [uint16]: Port to listen for new connections (default: do not
* listen on any port).
* - 0:listen-hostnames [string | string array]: Hostnames of the devices to
* listen on (default: listen on
* all devices).
*
* - 0:network-interface [string]: Network interface to listen on or send through when using
* multicast addresses
* - 0:ttl [uint32]: Multicast time to live, (optional, default: 1 - meaning multicast is
* available only to the local subnet).
* - 0:loopback [boolean]: Whether or not to use multicast loopback, enabling