mirror of
https://github.com/open62541/open62541.git
synced 2025-06-03 04:00:21 +00:00
simplfy closing connections in the network layer
sockets are only closed in the main loop. if a worker wants to close a connection, it is shutdown. this is then picked up by the select-call in the next main loop iteration. when a connection is closed, two jobs are returned. one to immediately detach the securechannel and a delayed job that eventually frees the connection memory.
This commit is contained in:
parent
66caa20e93
commit
6ff67cf40c
@ -57,21 +57,30 @@ static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf,
|
||||
return UA_STATUSCODE_GOOD;
|
||||
}
|
||||
|
||||
static void socket_close(UA_Connection *connection) {
|
||||
connection->state = UA_CONNECTION_CLOSED;
|
||||
shutdown(connection->sockfd,2);
|
||||
CLOSESOCKET(connection->sockfd);
|
||||
}
|
||||
|
||||
static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeout) {
|
||||
response->data = malloc(connection->localConf.recvBufferSize);
|
||||
if(!response->data)
|
||||
return UA_STATUSCODE_BADOUTOFMEMORY;
|
||||
if(!response->data) {
|
||||
UA_ByteString_init(response);
|
||||
return UA_STATUSCODE_GOOD; /* not enough memory retry */
|
||||
}
|
||||
struct timeval tmptv = {0, timeout * 1000};
|
||||
if(0 != setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv, sizeof(struct timeval))){
|
||||
free(response->data);
|
||||
UA_ByteString_init(response);
|
||||
socket_close(connection);
|
||||
return UA_STATUSCODE_BADINTERNALERROR;
|
||||
}
|
||||
int ret = recv(connection->sockfd, (char*)response->data, connection->localConf.recvBufferSize, 0);
|
||||
if(ret == 0) {
|
||||
free(response->data);
|
||||
UA_ByteString_init(response);
|
||||
connection->close(connection);
|
||||
connection->state = UA_CONNECTION_CLOSED;
|
||||
socket_close(connection);
|
||||
return UA_CONNECTION_CLOSED; /* ret == 0 -> server has closed the connection */
|
||||
} else if(ret < 0) {
|
||||
free(response->data);
|
||||
@ -79,12 +88,11 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
|
||||
#ifdef _WIN32
|
||||
if(WSAGetLastError() == WSAEINTR || WSAGetLastError() == WSAEWOULDBLOCK) {
|
||||
#else
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
#endif
|
||||
return UA_STATUSCODE_GOOD; /* retry */
|
||||
} else {
|
||||
connection->close(connection);
|
||||
connection->state = UA_CONNECTION_CLOSED;
|
||||
socket_close(connection);
|
||||
return UA_STATUSCODE_BADCONNECTIONCLOSED;
|
||||
}
|
||||
}
|
||||
@ -93,12 +101,6 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
|
||||
return UA_STATUSCODE_GOOD;
|
||||
}
|
||||
|
||||
static void socket_close(UA_Connection *connection) {
|
||||
connection->state = UA_CONNECTION_CLOSED;
|
||||
shutdown(connection->sockfd,2);
|
||||
CLOSESOCKET(connection->sockfd);
|
||||
}
|
||||
|
||||
static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
|
||||
#ifdef _WIN32
|
||||
u_long iMode = 1;
|
||||
@ -112,6 +114,11 @@ static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
|
||||
return UA_STATUSCODE_GOOD;
|
||||
}
|
||||
|
||||
static void FreeConnectionCallback(UA_Server *server, void *ptr) {
|
||||
UA_Connection_deleteMembers((UA_Connection*)ptr);
|
||||
free(ptr);
|
||||
}
|
||||
|
||||
/***************************/
|
||||
/* Server NetworkLayer TCP */
|
||||
/***************************/
|
||||
@ -167,12 +174,6 @@ typedef struct {
|
||||
UA_Connection *connection;
|
||||
UA_Int32 sockfd;
|
||||
} *mappings;
|
||||
|
||||
/* to-be-deleted connections */
|
||||
struct DeleteList {
|
||||
struct DeleteList *next;
|
||||
UA_Connection *connection;
|
||||
} *deletes;
|
||||
} ServerNetworkLayerTCP;
|
||||
|
||||
static UA_StatusCode ServerNetworkLayerGetBuffer(UA_Connection *connection, UA_ByteString *buf) {
|
||||
@ -221,23 +222,8 @@ static void ServerNetworkLayerTCP_closeConnection(UA_Connection *connection) {
|
||||
return;
|
||||
connection->state = UA_CONNECTION_CLOSED;
|
||||
#endif
|
||||
socket_close(connection);
|
||||
ServerNetworkLayerTCP *layer = (ServerNetworkLayerTCP*)connection->handle;
|
||||
struct DeleteList *d = malloc(sizeof(struct DeleteList));
|
||||
if(!d){
|
||||
return;
|
||||
}
|
||||
d->connection = connection;
|
||||
#ifdef UA_MULTITHREADING
|
||||
while(1) {
|
||||
d->next = layer->deletes;
|
||||
if(uatomic_cmpxchg(&layer->deletes, d->next, d) == d->next)
|
||||
break;
|
||||
}
|
||||
#else
|
||||
d->next = layer->deletes;
|
||||
layer->deletes = d;
|
||||
#endif
|
||||
shutdown(connection->sockfd, 2); /* only shut down here. this triggers the select, where the socket
|
||||
is closed in the main thread */
|
||||
}
|
||||
|
||||
/* call only from the single networking thread */
|
||||
@ -305,48 +291,19 @@ static UA_StatusCode ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_L
|
||||
return UA_STATUSCODE_GOOD;
|
||||
}
|
||||
|
||||
/* delayed callback that frees old connections */
|
||||
static void freeConnections(UA_Server *server, struct DeleteList *d) {
|
||||
while(d) {
|
||||
UA_Connection_deleteMembers(d->connection);
|
||||
free(d->connection);
|
||||
struct DeleteList *old = d;
|
||||
d = d->next;
|
||||
free(old);
|
||||
}
|
||||
}
|
||||
|
||||
/* remove the closed sockets from the mappings array */
|
||||
static void removeMappings(ServerNetworkLayerTCP *layer, struct DeleteList *d) {
|
||||
while(d) {
|
||||
size_t i = 0;
|
||||
for(; i < layer->mappingsSize; i++) {
|
||||
if(layer->mappings[i].sockfd == d->connection->sockfd)
|
||||
break;
|
||||
}
|
||||
if(i >= layer->mappingsSize)
|
||||
continue;
|
||||
layer->mappingsSize--;
|
||||
layer->mappings[i] = layer->mappings[layer->mappingsSize];
|
||||
d = d->next;
|
||||
}
|
||||
}
|
||||
|
||||
static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout) {
|
||||
ServerNetworkLayerTCP *layer = nl->handle;
|
||||
/* remove the deleted sockets from the array */
|
||||
struct DeleteList *deletes;
|
||||
#ifdef UA_MULTITHREADING
|
||||
deletes = uatomic_xchg(&layer->deletes, NULL);
|
||||
#else
|
||||
deletes = layer->deletes;
|
||||
layer->deletes = NULL;
|
||||
#endif
|
||||
removeMappings(layer, deletes);
|
||||
|
||||
setFDSet(layer);
|
||||
struct timeval tmptv = {0, timeout};
|
||||
UA_Int32 resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
|
||||
UA_Int32 resultsize;
|
||||
repeat_select:
|
||||
resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
|
||||
if(resultsize < 0) {
|
||||
if(errno == EINTR)
|
||||
goto repeat_select;
|
||||
*jobs = NULL;
|
||||
return resultsize;
|
||||
}
|
||||
|
||||
/* accept new connections (can only be a single one) */
|
||||
if(FD_ISSET(layer->serversockfd, &layer->fdset)) {
|
||||
@ -356,40 +313,18 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
|
||||
int newsockfd = accept(layer->serversockfd, (struct sockaddr *) &cli_addr, &cli_len);
|
||||
int i = 1;
|
||||
setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&i, sizeof(i));
|
||||
if (newsockfd >= 0) {
|
||||
if(newsockfd >= 0) {
|
||||
socket_set_nonblocking(newsockfd);
|
||||
ServerNetworkLayerTCP_add(layer, newsockfd);
|
||||
}
|
||||
}
|
||||
|
||||
if(!deletes && resultsize <= 0) {
|
||||
*jobs = NULL;
|
||||
/* alloc enough space for a cleanup-connection and free-connection job per resulted socket */
|
||||
if(resultsize == 0)
|
||||
return 0;
|
||||
UA_Job *js = malloc(sizeof(UA_Job) * resultsize * 2);
|
||||
if(!js)
|
||||
return 0;
|
||||
}
|
||||
if(resultsize < 0)
|
||||
resultsize = 0;
|
||||
UA_Int32 deletesJob = 0;
|
||||
if(deletes)
|
||||
deletesJob = 1;
|
||||
|
||||
UA_Job *items = malloc(sizeof(UA_Job) * (resultsize + deletesJob));
|
||||
if(deletes && !items) {
|
||||
/* abort. reattach the deletes so that they get deleted eventually. */
|
||||
#ifdef UA_MULTITHREADING
|
||||
struct DeleteList *last_delete;
|
||||
while(deletes) {
|
||||
last_delete = deletes;
|
||||
deletes = deletes->next;
|
||||
}
|
||||
while(1) {
|
||||
last_delete->next = layer->deletes;
|
||||
if(uatomic_cmpxchg(&layer->deletes, last_delete->next, deletes) == last_delete->next)
|
||||
break;
|
||||
}
|
||||
#else
|
||||
layer->deletes = deletes;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* read from established sockets */
|
||||
UA_Int32 j = 0;
|
||||
@ -400,63 +335,52 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
|
||||
if(socket_recv(layer->mappings[i].connection, &buf, 0) == UA_STATUSCODE_GOOD) {
|
||||
if(!buf.data)
|
||||
continue;
|
||||
items[j].type = UA_JOBTYPE_BINARYMESSAGE;
|
||||
items[j].job.binaryMessage.message = buf;
|
||||
items[j].job.binaryMessage.connection = layer->mappings[i].connection;
|
||||
buf.data = NULL;
|
||||
js[j].type = UA_JOBTYPE_BINARYMESSAGE;
|
||||
js[j].job.binaryMessage.message = buf;
|
||||
js[j].job.binaryMessage.connection = layer->mappings[i].connection;
|
||||
} else {
|
||||
items[j].type = UA_JOBTYPE_CLOSECONNECTION;
|
||||
items[j].job.closeConnection = layer->mappings[i].connection;
|
||||
UA_Connection *c = layer->mappings[i].connection;
|
||||
/* the socket is already closed */
|
||||
js[j].type = UA_JOBTYPE_DETACHCONNECTION;
|
||||
js[j].job.closeConnection = layer->mappings[i].connection;
|
||||
layer->mappings[i] = layer->mappings[layer->mappingsSize-1];
|
||||
layer->mappingsSize--;
|
||||
j++;
|
||||
i--; // iterate over the same index again
|
||||
js[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
|
||||
js[j].job.methodCall.method = FreeConnectionCallback;
|
||||
js[j].job.methodCall.data = c;
|
||||
}
|
||||
j++;
|
||||
}
|
||||
|
||||
/* add the delayed job that frees the connections */
|
||||
if(deletes) {
|
||||
items[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
|
||||
items[j].job.methodCall.data = deletes;
|
||||
items[j].job.methodCall.method = (void (*)(UA_Server *server, void *data))freeConnections;
|
||||
j++;
|
||||
}
|
||||
|
||||
/* free the array if there is no job */
|
||||
if(j == 0) {
|
||||
free(items);
|
||||
*jobs = NULL;
|
||||
} else
|
||||
*jobs = items;
|
||||
*jobs = js;
|
||||
return j;
|
||||
}
|
||||
|
||||
static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **jobs) {
|
||||
ServerNetworkLayerTCP *layer = nl->handle;
|
||||
struct DeleteList *deletes;
|
||||
#ifdef UA_MULTITHREADING
|
||||
deletes = uatomic_xchg(&layer->deletes, NULL);
|
||||
#else
|
||||
deletes = layer->deletes;
|
||||
layer->deletes = NULL;
|
||||
#endif
|
||||
removeMappings(layer, deletes);
|
||||
UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize);
|
||||
UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize * 2);
|
||||
if(!items)
|
||||
return 0;
|
||||
for(size_t i = 0; i < layer->mappingsSize; i++) {
|
||||
items[i].type = UA_JOBTYPE_CLOSECONNECTION;
|
||||
items[i].job.closeConnection = layer->mappings[i].connection;
|
||||
socket_close(layer->mappings[i].connection);
|
||||
items[i*2].type = UA_JOBTYPE_DETACHCONNECTION;
|
||||
items[i*2].job.closeConnection = layer->mappings[i].connection;
|
||||
items[(i*2)+1].type = UA_JOBTYPE_DELAYEDMETHODCALL;
|
||||
items[(i*2)+1].job.methodCall.method = FreeConnectionCallback;
|
||||
items[(i*2)+1].job.methodCall.data = layer->mappings[i].connection;
|
||||
}
|
||||
#ifdef _WIN32
|
||||
WSACleanup();
|
||||
#endif
|
||||
*jobs = items;
|
||||
return layer->mappingsSize;
|
||||
return layer->mappingsSize*2;
|
||||
}
|
||||
|
||||
/* run only when the server is stopped */
|
||||
static void ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
|
||||
ServerNetworkLayerTCP *layer = nl->handle;
|
||||
removeMappings(layer, layer->deletes);
|
||||
freeConnections(NULL, layer->deletes);
|
||||
#ifndef UA_MULTITHREADING
|
||||
UA_ByteString_deleteMembers(&layer->buffer);
|
||||
#endif
|
||||
@ -484,7 +408,6 @@ UA_ServerNetworkLayer ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UIn
|
||||
layer->mappingsSize = 0;
|
||||
layer->mappings = NULL;
|
||||
layer->port = port;
|
||||
layer->deletes = NULL;
|
||||
char hostname[256];
|
||||
gethostname(hostname, 255);
|
||||
UA_String_copyprintf("opc.tcp://%s:%d", &nl.discoveryUrl, hostname, port);
|
||||
|
@ -146,7 +146,7 @@ UA_Server_AddMonodirectionalReference(UA_Server *server, UA_NodeId sourceNodeId,
|
||||
typedef struct {
|
||||
enum {
|
||||
UA_JOBTYPE_NOTHING,
|
||||
UA_JOBTYPE_CLOSECONNECTION,
|
||||
UA_JOBTYPE_DETACHCONNECTION,
|
||||
UA_JOBTYPE_BINARYMESSAGE,
|
||||
UA_JOBTYPE_METHODCALL,
|
||||
UA_JOBTYPE_DELAYEDMETHODCALL,
|
||||
|
@ -35,9 +35,8 @@ static void processJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
|
||||
UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
|
||||
&job->job.binaryMessage.message);
|
||||
break;
|
||||
case UA_JOBTYPE_CLOSECONNECTION:
|
||||
case UA_JOBTYPE_DETACHCONNECTION:
|
||||
UA_Connection_detachSecureChannel(job->job.closeConnection);
|
||||
job->job.closeConnection->close(job->job.closeConnection);
|
||||
break;
|
||||
case UA_JOBTYPE_METHODCALL:
|
||||
case UA_JOBTYPE_DELAYEDMETHODCALL:
|
||||
|
Loading…
Reference in New Issue
Block a user