simplfy closing connections in the network layer

sockets are only closed in the main loop. if a worker wants to close a connection, it is shutdown.
this is then picked up by the select-call in the next main loop iteration.
when a connection is closed, two jobs are returned. one to immediately detach the securechannel and a delayed job that eventually frees the connection memory.
This commit is contained in:
Julius Pfrommer 2015-07-01 20:48:51 +02:00
parent 66caa20e93
commit 6ff67cf40c
3 changed files with 62 additions and 140 deletions

View File

@ -57,21 +57,30 @@ static UA_StatusCode socket_write(UA_Connection *connection, UA_ByteString *buf,
return UA_STATUSCODE_GOOD;
}
static void socket_close(UA_Connection *connection) {
connection->state = UA_CONNECTION_CLOSED;
shutdown(connection->sockfd,2);
CLOSESOCKET(connection->sockfd);
}
static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *response, UA_UInt32 timeout) {
response->data = malloc(connection->localConf.recvBufferSize);
if(!response->data)
return UA_STATUSCODE_BADOUTOFMEMORY;
if(!response->data) {
UA_ByteString_init(response);
return UA_STATUSCODE_GOOD; /* not enough memory retry */
}
struct timeval tmptv = {0, timeout * 1000};
if(0 != setsockopt(connection->sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tmptv, sizeof(struct timeval))){
free(response->data);
UA_ByteString_init(response);
socket_close(connection);
return UA_STATUSCODE_BADINTERNALERROR;
}
int ret = recv(connection->sockfd, (char*)response->data, connection->localConf.recvBufferSize, 0);
if(ret == 0) {
free(response->data);
UA_ByteString_init(response);
connection->close(connection);
connection->state = UA_CONNECTION_CLOSED;
socket_close(connection);
return UA_CONNECTION_CLOSED; /* ret == 0 -> server has closed the connection */
} else if(ret < 0) {
free(response->data);
@ -79,12 +88,11 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
#ifdef _WIN32
if(WSAGetLastError() == WSAEINTR || WSAGetLastError() == WSAEWOULDBLOCK) {
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
#endif
return UA_STATUSCODE_GOOD; /* retry */
} else {
connection->close(connection);
connection->state = UA_CONNECTION_CLOSED;
socket_close(connection);
return UA_STATUSCODE_BADCONNECTIONCLOSED;
}
}
@ -93,12 +101,6 @@ static UA_StatusCode socket_recv(UA_Connection *connection, UA_ByteString *respo
return UA_STATUSCODE_GOOD;
}
static void socket_close(UA_Connection *connection) {
connection->state = UA_CONNECTION_CLOSED;
shutdown(connection->sockfd,2);
CLOSESOCKET(connection->sockfd);
}
static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
#ifdef _WIN32
u_long iMode = 1;
@ -112,6 +114,11 @@ static UA_StatusCode socket_set_nonblocking(UA_Int32 sockfd) {
return UA_STATUSCODE_GOOD;
}
static void FreeConnectionCallback(UA_Server *server, void *ptr) {
UA_Connection_deleteMembers((UA_Connection*)ptr);
free(ptr);
}
/***************************/
/* Server NetworkLayer TCP */
/***************************/
@ -167,12 +174,6 @@ typedef struct {
UA_Connection *connection;
UA_Int32 sockfd;
} *mappings;
/* to-be-deleted connections */
struct DeleteList {
struct DeleteList *next;
UA_Connection *connection;
} *deletes;
} ServerNetworkLayerTCP;
static UA_StatusCode ServerNetworkLayerGetBuffer(UA_Connection *connection, UA_ByteString *buf) {
@ -221,23 +222,8 @@ static void ServerNetworkLayerTCP_closeConnection(UA_Connection *connection) {
return;
connection->state = UA_CONNECTION_CLOSED;
#endif
socket_close(connection);
ServerNetworkLayerTCP *layer = (ServerNetworkLayerTCP*)connection->handle;
struct DeleteList *d = malloc(sizeof(struct DeleteList));
if(!d){
return;
}
d->connection = connection;
#ifdef UA_MULTITHREADING
while(1) {
d->next = layer->deletes;
if(uatomic_cmpxchg(&layer->deletes, d->next, d) == d->next)
break;
}
#else
d->next = layer->deletes;
layer->deletes = d;
#endif
shutdown(connection->sockfd, 2); /* only shut down here. this triggers the select, where the socket
is closed in the main thread */
}
/* call only from the single networking thread */
@ -305,48 +291,19 @@ static UA_StatusCode ServerNetworkLayerTCP_start(UA_ServerNetworkLayer *nl, UA_L
return UA_STATUSCODE_GOOD;
}
/* delayed callback that frees old connections */
static void freeConnections(UA_Server *server, struct DeleteList *d) {
while(d) {
UA_Connection_deleteMembers(d->connection);
free(d->connection);
struct DeleteList *old = d;
d = d->next;
free(old);
}
}
/* remove the closed sockets from the mappings array */
static void removeMappings(ServerNetworkLayerTCP *layer, struct DeleteList *d) {
while(d) {
size_t i = 0;
for(; i < layer->mappingsSize; i++) {
if(layer->mappings[i].sockfd == d->connection->sockfd)
break;
}
if(i >= layer->mappingsSize)
continue;
layer->mappingsSize--;
layer->mappings[i] = layer->mappings[layer->mappingsSize];
d = d->next;
}
}
static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job **jobs, UA_UInt16 timeout) {
ServerNetworkLayerTCP *layer = nl->handle;
/* remove the deleted sockets from the array */
struct DeleteList *deletes;
#ifdef UA_MULTITHREADING
deletes = uatomic_xchg(&layer->deletes, NULL);
#else
deletes = layer->deletes;
layer->deletes = NULL;
#endif
removeMappings(layer, deletes);
setFDSet(layer);
struct timeval tmptv = {0, timeout};
UA_Int32 resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
UA_Int32 resultsize;
repeat_select:
resultsize = select(layer->highestfd+1, &layer->fdset, NULL, NULL, &tmptv);
if(resultsize < 0) {
if(errno == EINTR)
goto repeat_select;
*jobs = NULL;
return resultsize;
}
/* accept new connections (can only be a single one) */
if(FD_ISSET(layer->serversockfd, &layer->fdset)) {
@ -356,40 +313,18 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
int newsockfd = accept(layer->serversockfd, (struct sockaddr *) &cli_addr, &cli_len);
int i = 1;
setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (void *)&i, sizeof(i));
if (newsockfd >= 0) {
if(newsockfd >= 0) {
socket_set_nonblocking(newsockfd);
ServerNetworkLayerTCP_add(layer, newsockfd);
}
}
if(!deletes && resultsize <= 0) {
*jobs = NULL;
/* alloc enough space for a cleanup-connection and free-connection job per resulted socket */
if(resultsize == 0)
return 0;
UA_Job *js = malloc(sizeof(UA_Job) * resultsize * 2);
if(!js)
return 0;
}
if(resultsize < 0)
resultsize = 0;
UA_Int32 deletesJob = 0;
if(deletes)
deletesJob = 1;
UA_Job *items = malloc(sizeof(UA_Job) * (resultsize + deletesJob));
if(deletes && !items) {
/* abort. reattach the deletes so that they get deleted eventually. */
#ifdef UA_MULTITHREADING
struct DeleteList *last_delete;
while(deletes) {
last_delete = deletes;
deletes = deletes->next;
}
while(1) {
last_delete->next = layer->deletes;
if(uatomic_cmpxchg(&layer->deletes, last_delete->next, deletes) == last_delete->next)
break;
}
#else
layer->deletes = deletes;
#endif
}
/* read from established sockets */
UA_Int32 j = 0;
@ -400,63 +335,52 @@ static UA_Int32 ServerNetworkLayerTCP_getJobs(UA_ServerNetworkLayer *nl, UA_Job
if(socket_recv(layer->mappings[i].connection, &buf, 0) == UA_STATUSCODE_GOOD) {
if(!buf.data)
continue;
items[j].type = UA_JOBTYPE_BINARYMESSAGE;
items[j].job.binaryMessage.message = buf;
items[j].job.binaryMessage.connection = layer->mappings[i].connection;
buf.data = NULL;
js[j].type = UA_JOBTYPE_BINARYMESSAGE;
js[j].job.binaryMessage.message = buf;
js[j].job.binaryMessage.connection = layer->mappings[i].connection;
} else {
items[j].type = UA_JOBTYPE_CLOSECONNECTION;
items[j].job.closeConnection = layer->mappings[i].connection;
UA_Connection *c = layer->mappings[i].connection;
/* the socket is already closed */
js[j].type = UA_JOBTYPE_DETACHCONNECTION;
js[j].job.closeConnection = layer->mappings[i].connection;
layer->mappings[i] = layer->mappings[layer->mappingsSize-1];
layer->mappingsSize--;
j++;
i--; // iterate over the same index again
js[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
js[j].job.methodCall.method = FreeConnectionCallback;
js[j].job.methodCall.data = c;
}
j++;
}
/* add the delayed job that frees the connections */
if(deletes) {
items[j].type = UA_JOBTYPE_DELAYEDMETHODCALL;
items[j].job.methodCall.data = deletes;
items[j].job.methodCall.method = (void (*)(UA_Server *server, void *data))freeConnections;
j++;
}
/* free the array if there is no job */
if(j == 0) {
free(items);
*jobs = NULL;
} else
*jobs = items;
*jobs = js;
return j;
}
static UA_Int32 ServerNetworkLayerTCP_stop(UA_ServerNetworkLayer *nl, UA_Job **jobs) {
ServerNetworkLayerTCP *layer = nl->handle;
struct DeleteList *deletes;
#ifdef UA_MULTITHREADING
deletes = uatomic_xchg(&layer->deletes, NULL);
#else
deletes = layer->deletes;
layer->deletes = NULL;
#endif
removeMappings(layer, deletes);
UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize);
UA_Job *items = malloc(sizeof(UA_Job) * layer->mappingsSize * 2);
if(!items)
return 0;
for(size_t i = 0; i < layer->mappingsSize; i++) {
items[i].type = UA_JOBTYPE_CLOSECONNECTION;
items[i].job.closeConnection = layer->mappings[i].connection;
socket_close(layer->mappings[i].connection);
items[i*2].type = UA_JOBTYPE_DETACHCONNECTION;
items[i*2].job.closeConnection = layer->mappings[i].connection;
items[(i*2)+1].type = UA_JOBTYPE_DELAYEDMETHODCALL;
items[(i*2)+1].job.methodCall.method = FreeConnectionCallback;
items[(i*2)+1].job.methodCall.data = layer->mappings[i].connection;
}
#ifdef _WIN32
WSACleanup();
#endif
*jobs = items;
return layer->mappingsSize;
return layer->mappingsSize*2;
}
/* run only when the server is stopped */
static void ServerNetworkLayerTCP_deleteMembers(UA_ServerNetworkLayer *nl) {
ServerNetworkLayerTCP *layer = nl->handle;
removeMappings(layer, layer->deletes);
freeConnections(NULL, layer->deletes);
#ifndef UA_MULTITHREADING
UA_ByteString_deleteMembers(&layer->buffer);
#endif
@ -484,7 +408,6 @@ UA_ServerNetworkLayer ServerNetworkLayerTCP_new(UA_ConnectionConfig conf, UA_UIn
layer->mappingsSize = 0;
layer->mappings = NULL;
layer->port = port;
layer->deletes = NULL;
char hostname[256];
gethostname(hostname, 255);
UA_String_copyprintf("opc.tcp://%s:%d", &nl.discoveryUrl, hostname, port);

View File

@ -146,7 +146,7 @@ UA_Server_AddMonodirectionalReference(UA_Server *server, UA_NodeId sourceNodeId,
typedef struct {
enum {
UA_JOBTYPE_NOTHING,
UA_JOBTYPE_CLOSECONNECTION,
UA_JOBTYPE_DETACHCONNECTION,
UA_JOBTYPE_BINARYMESSAGE,
UA_JOBTYPE_METHODCALL,
UA_JOBTYPE_DELAYEDMETHODCALL,

View File

@ -35,9 +35,8 @@ static void processJobs(UA_Server *server, UA_Job *jobs, size_t jobsSize) {
UA_Server_processBinaryMessage(server, job->job.binaryMessage.connection,
&job->job.binaryMessage.message);
break;
case UA_JOBTYPE_CLOSECONNECTION:
case UA_JOBTYPE_DETACHCONNECTION:
UA_Connection_detachSecureChannel(job->job.closeConnection);
job->job.closeConnection->close(job->job.closeConnection);
break;
case UA_JOBTYPE_METHODCALL:
case UA_JOBTYPE_DELAYEDMETHODCALL: