diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e1115f8b..3af37cf5d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -506,6 +506,7 @@ set(exported_headers ${exported_headers} ${PROJECT_SOURCE_DIR}/include/ua_client_highlevel_async.h) set(internal_headers ${PROJECT_SOURCE_DIR}/deps/open62541_queue.h + ${PROJECT_SOURCE_DIR}/deps/ziptree.h ${PROJECT_SOURCE_DIR}/deps/pcg_basic.h ${PROJECT_SOURCE_DIR}/deps/libc_time.h ${PROJECT_SOURCE_DIR}/deps/base64.h diff --git a/include/ua_client.h b/include/ua_client.h index fb8239686..2dbdb3309 100644 --- a/include/ua_client.h +++ b/include/ua_client.h @@ -468,17 +468,35 @@ __UA_Client_AsyncServiceEx(UA_Client *client, const void *request, UA_UInt32 timeout); /** - * Repeated Callbacks - * ------------------ + * Timed Callbacks + * --------------- * Repeated callbacks can be attached to a client and will be executed in the * defined interval. */ typedef void (*UA_ClientCallback)(UA_Client *client, void *data); +/* Add a callback for execution at a specified time. If the indicated time lies + * in the past, then the callback is executed at the next iteration of the + * server's main loop. + * + * @param client The client object. + * @param callback The callback that shall be added. + * @param data Data that is forwarded to the callback. + * @param date The timestamp for the execution time. + * @param callbackId Set to the identifier of the repeated callback . This can + * be used to cancel the callback later on. If the pointer is null, the + * identifier is not set. + * @return Upon success, UA_STATUSCODE_GOOD is returned. An error code + * otherwise. */ +UA_StatusCode UA_EXPORT +UA_Client_addTimedCallback(UA_Client *client, UA_ClientCallback callback, + void *data, UA_DateTime date, UA_UInt64 *callbackId); + /* Add a callback for cyclic repetition to the client. * * @param client The client object. * @param callback The callback that shall be added. + * @param data Data that is forwarded to the callback. * @param interval_ms The callback shall be repeatedly executed with the given * interval (in ms). The interval must be positive. The first execution * occurs at now() + interval at the latest. @@ -487,19 +505,21 @@ typedef void (*UA_ClientCallback)(UA_Client *client, void *data); * identifier is not set. * @return Upon success, UA_STATUSCODE_GOOD is returned. An error code * otherwise. */ -UA_StatusCode -UA_Client_addRepeatedCallback(UA_Client *client, - UA_ClientCallback callback, +UA_StatusCode UA_EXPORT +UA_Client_addRepeatedCallback(UA_Client *client, UA_ClientCallback callback, void *data, UA_Double interval_ms, UA_UInt64 *callbackId); -UA_StatusCode +UA_StatusCode UA_EXPORT UA_Client_changeRepeatedCallbackInterval(UA_Client *client, UA_UInt64 callbackId, UA_Double interval_ms); -UA_StatusCode UA_Client_removeRepeatedCallback(UA_Client *client, - UA_UInt64 callbackId); +void UA_EXPORT +UA_Client_removeCallback(UA_Client *client, UA_UInt64 callbackId); + +#define UA_Client_removeRepeatedCallback(client, callbackId) \ + UA_Client_removeCallback(client, callbackId) /** * .. toctree:: diff --git a/include/ua_server.h b/include/ua_server.h index afa270706..366870497 100644 --- a/include/ua_server.h +++ b/include/ua_server.h @@ -77,14 +77,32 @@ UA_StatusCode UA_EXPORT UA_Server_run_shutdown(UA_Server *server); /** - * Repeated Callbacks - * ------------------ */ + * Timed Callbacks + * --------------- */ typedef void (*UA_ServerCallback)(UA_Server *server, void *data); +/* Add a callback for execution at a specified time. If the indicated time lies + * in the past, then the callback is executed at the next iteration of the + * server's main loop. + * + * @param server The server object. + * @param callback The callback that shall be added. + * @param data Data that is forwarded to the callback. + * @param date The timestamp for the execution time. + * @param callbackId Set to the identifier of the repeated callback . This can + * be used to cancel the callback later on. If the pointer is null, the + * identifier is not set. + * @return Upon success, UA_STATUSCODE_GOOD is returned. An error code + * otherwise. */ +UA_StatusCode UA_EXPORT +UA_Server_addTimedCallback(UA_Server *server, UA_ServerCallback callback, + void *data, UA_DateTime date, UA_UInt64 *callbackId); + /* Add a callback for cyclic repetition to the server. * * @param server The server object. * @param callback The callback that shall be added. + * @param data Data that is forwarded to the callback. * @param interval_ms The callback shall be repeatedly executed with the given * interval (in ms). The interval must be positive. The first execution * occurs at now() + interval at the latest. @@ -101,14 +119,15 @@ UA_StatusCode UA_EXPORT UA_Server_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId, UA_Double interval_ms); -/* Remove a repeated callback. +/* Remove a repeated callback. Does nothing if the callback is not found. * * @param server The server object. - * @param callbackId The id of the callback that shall be removed. - * @return Upon success, UA_STATUSCODE_GOOD is returned. - * An error code otherwise. */ -UA_StatusCode UA_EXPORT -UA_Server_removeRepeatedCallback(UA_Server *server, UA_UInt64 callbackId); + * @param callbackId The id of the callback */ +void UA_EXPORT +UA_Server_removeCallback(UA_Server *server, UA_UInt64 callbackId); + +#define UA_Server_removeRepeatedCallback(server, callbackId) \ + UA_Server_removeCallback(server, callbackId); /** * Reading and Writing Node Attributes diff --git a/src/client/ua_client.c b/src/client/ua_client.c index 0f8b7a49a..c6e61370f 100644 --- a/src/client/ua_client.c +++ b/src/client/ua_client.c @@ -631,13 +631,18 @@ UA_Client_sendAsyncRequest(UA_Client *client, const void *request, responseType, userdata, requestId); } +UA_StatusCode UA_EXPORT +UA_Client_addTimedCallback(UA_Client *client, UA_ClientCallback callback, + void *data, UA_DateTime date, UA_UInt64 *callbackId) { + return UA_Timer_addTimedCallback(&client->timer, (UA_ApplicationCallback) callback, + client, data, date, callbackId); +} + UA_StatusCode UA_Client_addRepeatedCallback(UA_Client *client, UA_ClientCallback callback, - void *data, UA_Double interval_ms, - UA_UInt64 *callbackId) { - return UA_Timer_addRepeatedCallback(&client->timer, - (UA_ApplicationCallback) callback, client, data, - interval_ms, callbackId); + void *data, UA_Double interval_ms, UA_UInt64 *callbackId) { + return UA_Timer_addRepeatedCallback(&client->timer, (UA_ApplicationCallback) callback, + client, data, interval_ms, callbackId); } UA_StatusCode @@ -647,7 +652,7 @@ UA_Client_changeRepeatedCallbackInterval(UA_Client *client, UA_UInt64 callbackId interval_ms); } -UA_StatusCode -UA_Client_removeRepeatedCallback(UA_Client *client, UA_UInt64 callbackId) { - return UA_Timer_removeRepeatedCallback(&client->timer, callbackId); +void +UA_Client_removeCallback(UA_Client *client, UA_UInt64 callbackId) { + UA_Timer_removeCallback(&client->timer, callbackId); } diff --git a/src/pubsub/ua_pubsub.c b/src/pubsub/ua_pubsub.c index d300e5a32..81ad1a682 100644 --- a/src/pubsub/ua_pubsub.c +++ b/src/pubsub/ua_pubsub.c @@ -158,8 +158,7 @@ UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){ return UA_STATUSCODE_BADNOTFOUND; //unregister the publish callback - if(UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId) != UA_STATUSCODE_GOOD) - return UA_STATUSCODE_BADINTERNALERROR; + UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId); #ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL removeGroupRepresentation(server, wg); #endif diff --git a/src/pubsub/ua_pubsub_manager.c b/src/pubsub/ua_pubsub_manager.c index 2c0867dca..94ebdf302 100644 --- a/src/pubsub/ua_pubsub_manager.c +++ b/src/pubsub/ua_pubsub_manager.c @@ -308,6 +308,7 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) { /***********************************/ /* PubSub Jobs abstraction */ /***********************************/ + UA_StatusCode UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback, void *data, UA_Double interval_ms, UA_UInt64 *callbackId) { @@ -321,9 +322,9 @@ UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 cal return UA_Timer_changeRepeatedCallbackInterval(&server->timer, callbackId, interval_ms); } -UA_StatusCode +void UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) { - return UA_Timer_removeRepeatedCallback(&server->timer, callbackId); + UA_Timer_removeCallback(&server->timer, callbackId); } #endif /* UA_ENABLE_PUBSUB */ diff --git a/src/pubsub/ua_pubsub_manager.h b/src/pubsub/ua_pubsub_manager.h index 9bf86293e..2aa7b9f4b 100644 --- a/src/pubsub/ua_pubsub_manager.h +++ b/src/pubsub/ua_pubsub_manager.h @@ -41,7 +41,7 @@ UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callba UA_StatusCode UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId, UA_Double interval_ms); -UA_StatusCode +void UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId); #endif /* UA_ENABLE_PUBSUB */ diff --git a/src/server/ua_server.c b/src/server/ua_server.c index 778063452..d0f9ef631 100644 --- a/src/server/ua_server.c +++ b/src/server/ua_server.c @@ -258,9 +258,17 @@ UA_Server_new(const UA_ServerConfig *config) { return server; } -/*****************/ -/* Repeated Jobs */ -/*****************/ +/*******************/ +/* Timed Callbacks */ +/*******************/ + +UA_StatusCode +UA_Server_addTimedCallback(UA_Server *server, UA_ServerCallback callback, + void *data, UA_DateTime date, UA_UInt64 *callbackId) { + return UA_Timer_addTimedCallback(&server->timer, + (UA_ApplicationCallback)callback, + server, data, date, callbackId); +} UA_StatusCode UA_Server_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback, @@ -278,9 +286,9 @@ UA_Server_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId interval_ms); } -UA_StatusCode -UA_Server_removeRepeatedCallback(UA_Server *server, UA_UInt64 callbackId) { - return UA_Timer_removeRepeatedCallback(&server->timer, callbackId); +void +UA_Server_removeCallback(UA_Server *server, UA_UInt64 callbackId) { + UA_Timer_removeCallback(&server->timer, callbackId); } UA_StatusCode UA_EXPORT diff --git a/src/server/ua_services_subscription.c b/src/server/ua_services_subscription.c index 5ed1d1928..11369704d 100644 --- a/src/server/ua_services_subscription.c +++ b/src/server/ua_services_subscription.c @@ -34,13 +34,7 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription, UA_UInt32 requestedMaxKeepAliveCount, UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) { /* deregister the callback if required */ - UA_StatusCode retval = Subscription_unregisterPublishCallback(server, subscription); - if(retval != UA_STATUSCODE_GOOD) { - UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session, - "Subscription %u | Could not unregister publish callback with error code %s", - subscription->subscriptionId, UA_StatusCode_name(retval)); - return retval; - } + Subscription_unregisterPublishCallback(server, subscription); /* re-parameterize the subscription */ subscription->publishingInterval = requestedPublishingInterval; @@ -61,7 +55,7 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription, subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish; subscription->priority = priority; - retval = Subscription_registerPublishCallback(server, subscription); + UA_StatusCode retval = Subscription_registerPublishCallback(server, subscription); if(retval != UA_STATUSCODE_GOOD) { UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session, "Subscription %u | Could not register publish callback with error code %s", diff --git a/src/server/ua_subscription.c b/src/server/ua_subscription.c index 5dc7b4113..9aa9af4a4 100644 --- a/src/server/ua_subscription.c +++ b/src/server/ua_subscription.c @@ -565,20 +565,16 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) { return UA_STATUSCODE_GOOD; } -UA_StatusCode +void Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) { UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, "Subscription %u | " "Unregister subscription publishing callback", sub->subscriptionId); if(!sub->publishCallbackIsRegistered) - return UA_STATUSCODE_GOOD; - - UA_StatusCode retval = UA_Server_removeRepeatedCallback(server, sub->publishCallbackId); - if(retval != UA_STATUSCODE_GOOD) - return retval; + return; + UA_Server_removeRepeatedCallback(server, sub->publishCallbackId); sub->publishCallbackIsRegistered = false; - return UA_STATUSCODE_GOOD; } /* When the session has publish requests stored but the last subscription is diff --git a/src/server/ua_subscription.h b/src/server/ua_subscription.h index 66caf72f3..eafef3d66 100644 --- a/src/server/ua_subscription.h +++ b/src/server/ua_subscription.h @@ -133,7 +133,7 @@ void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub); void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *mon); void UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *mon); UA_StatusCode UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon); -UA_StatusCode UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon); +void UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon); /* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost * data if required. */ @@ -212,7 +212,7 @@ struct UA_Subscription { UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId); void UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub); UA_StatusCode Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub); -UA_StatusCode Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub); +void Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub); void UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon); UA_MonitoredItem * UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId); diff --git a/src/server/ua_subscription_monitoreditem.c b/src/server/ua_subscription_monitoreditem.c index ad174cf7e..c4fa0e796 100644 --- a/src/server/ua_subscription_monitoreditem.c +++ b/src/server/ua_subscription_monitoreditem.c @@ -373,12 +373,12 @@ UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon return retval; } -UA_StatusCode +void UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) { if(!mon->sampleCallbackIsRegistered) - return UA_STATUSCODE_GOOD; + return; + UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId); mon->sampleCallbackIsRegistered = false; - return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId); } #endif /* UA_ENABLE_SUBSCRIPTIONS */ diff --git a/src/ua_timer.c b/src/ua_timer.c index 3e99335b6..2956ddc17 100644 --- a/src/ua_timer.c +++ b/src/ua_timer.c @@ -2,87 +2,101 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * - * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer) + * Copyright 2017, 2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) * Copyright 2017 (c) Stefan Profanter, fortiss GmbH */ #include "ua_util_internal.h" #include "ua_timer.h" -/* Only one thread operates on the repeated jobs. This is usually the "main" - * thread with the event loop. All other threads introduce changes via a - * multi-producer single-consumer (MPSC) queue. The queue is based on a design - * by Dmitry Vyukov. - * http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue - * - * The RepeatedCallback structure is used both in the sorted list of callbacks - * and in the MPSC changes queue. For the changes queue, we differentiate - * between three cases encoded in the callback pointer. - * - * callback > 0x01: add the new repeated callback to the sorted list - * callback == 0x00: remove the callback with the same id - * callback == 0x01: change the interval of the existing callback */ - -#define REMOVE_SENTINEL 0x00 -#define CHANGE_SENTINEL 0x01 - -struct UA_TimerCallbackEntry { - SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */ - UA_DateTime nextTime; /* The next time when the callbacks - * are to be executed */ +struct UA_TimerEntry { + ZIP_ENTRY(UA_TimerEntry) zipfields; + UA_DateTime nextTime; /* The next time when the callback + * is to be executed */ UA_UInt64 interval; /* Interval in 100ns resolution */ - UA_UInt64 id; /* Id of the repeated callback */ + UA_Boolean repeated; /* Repeated callback? */ UA_ApplicationCallback callback; void *application; void *data; + + ZIP_ENTRY(UA_TimerEntry) idZipfields; + UA_UInt64 id; /* Id of the entry */ }; +/* There may be several entries with the same nextTime in the tree. We give them + * an absolute order by considering the memory address to break ties. Because of + * this, the nextTime property cannot be used to lookup specific entries. */ +static enum ZIP_CMP +cmpDateTime(const UA_DateTime *a, const UA_DateTime *b) { + if(*a < *b) + return ZIP_CMP_LESS; + if(*a > *b) + return ZIP_CMP_MORE; + if(a == b) + return ZIP_CMP_EQ; + if(a < b) + return ZIP_CMP_LESS; + return ZIP_CMP_MORE; +} + +ZIP_PROTTYPE(UA_TimerZip, UA_TimerEntry, UA_DateTime) +ZIP_IMPL(UA_TimerZip, UA_TimerEntry, zipfields, UA_DateTime, nextTime, cmpDateTime) + +/* The identifiers of entries are unique */ +static enum ZIP_CMP +cmpId(const UA_UInt64 *a, const UA_UInt64 *b) { + if(*a < *b) + return ZIP_CMP_LESS; + if(*a == *b) + return ZIP_CMP_EQ; + return ZIP_CMP_MORE; +} + +ZIP_PROTTYPE(UA_TimerIdZip, UA_TimerEntry, UA_UInt64) +ZIP_IMPL(UA_TimerIdZip, UA_TimerEntry, idZipfields, UA_UInt64, id, cmpId) + void UA_Timer_init(UA_Timer *t) { - SLIST_INIT(&t->repeatedCallbacks); - t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub; - t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub; - t->changes_stub = NULL; - t->idCounter = 0; + memset(t, 0, sizeof(UA_Timer)); } -static void -enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) { - tc->next.sle_next = NULL; - UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*) - UA_atomic_xchg((void * volatile *)&t->changes_head, tc); - /* Nothing can be dequeued while the producer is blocked here */ - prev->next.sle_next = tc; /* Once this change is visible in the consumer, - * the node is dequeued in the following - * iteration */ +static UA_StatusCode +addCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application, void *data, + UA_DateTime nextTime, UA_UInt64 interval, UA_Boolean repeated, + UA_UInt64 *callbackId) { + /* A callback method needs to be present */ + if(!callback) + return UA_STATUSCODE_BADINTERNALERROR; + + /* Allocate the repeated callback structure */ + UA_TimerEntry *te = (UA_TimerEntry*)UA_malloc(sizeof(UA_TimerEntry)); + if(!te) + return UA_STATUSCODE_BADOUTOFMEMORY; + + /* Set the repeated callback */ + te->interval = (UA_UInt64)interval; + te->id = ++t->idCounter; + te->callback = callback; + te->application = application; + te->data = data; + te->repeated = repeated; + te->nextTime = nextTime; + + /* Set the output identifier */ + if(callbackId) + *callbackId = te->id; + + ZIP_INSERT(UA_TimerZip, &t->root, te, ZIP_FFS32(UA_UInt32_random())); + ZIP_INSERT(UA_TimerIdZip, &t->idRoot, te, ZIP_RANK(te, zipfields)); + return UA_STATUSCODE_GOOD; } -static UA_TimerCallbackEntry * -dequeueChange(UA_Timer *t) { - UA_TimerCallbackEntry *tail = t->changes_tail; - UA_TimerCallbackEntry *next = tail->next.sle_next; - if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) { - if(!next) - return NULL; - t->changes_tail = next; - tail = next; - next = next->next.sle_next; - } - if(next) { - t->changes_tail = next; - return tail; - } - UA_TimerCallbackEntry* head = t->changes_head; - if(tail != head) - return NULL; - enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub); - next = tail->next.sle_next; - if(next) { - t->changes_tail = next; - return tail; - } - return NULL; +UA_StatusCode +UA_Timer_addTimedCallback(UA_Timer *t, UA_ApplicationCallback callback, + void *application, void *data, UA_DateTime date, + UA_UInt64 *callbackId) { + return addCallback(t, callback, application, data, date, 0, false, callbackId); } /* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the @@ -92,70 +106,14 @@ UA_StatusCode UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application, void *data, UA_Double interval_ms, UA_UInt64 *callbackId) { - /* A callback method needs to be present */ - if(!callback) - return UA_STATUSCODE_BADINTERNALERROR; - /* The interval needs to be positive */ if(interval_ms <= 0.0) return UA_STATUSCODE_BADINTERNALERROR; - /* Allocate the repeated callback structure */ - UA_TimerCallbackEntry *tc = - (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry)); - if(!tc) - return UA_STATUSCODE_BADOUTOFMEMORY; - - /* Set the repeated callback */ - tc->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */ - tc->id = ++t->idCounter; - tc->callback = callback; - tc->application = application; - tc->data = data; - tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval; - - /* Set the output identifier */ - if(callbackId) - *callbackId = tc->id; - - /* Enqueue the changes in the MPSC queue */ - enqueueChange(t, tc); - return UA_STATUSCODE_GOOD; -} - -static void -addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) { - /* Find the last entry before this callback */ - UA_TimerCallbackEntry *tmpTc, *afterTc = NULL; - SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) { - if(tmpTc->nextTime >= tc->nextTime) - break; - - /* The goal is to have many repeated callbacks with the same repetition - * interval in a "block" in order to reduce linear search for re-entry - * to the sorted list after processing. Allow the first execution to lie - * between "nextTime - 1s" and "nextTime" if this adjustment groups - * callbacks with the same repetition interval. - * Callbacks of a block are added in reversed order. This design allows - * the monitored items of a subscription (if created in a sequence with the - * same publish/sample interval) to be executed before the subscription - * publish the notifications */ - if(tmpTc->interval == tc->interval && - tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC)) { - tc->nextTime = tmpTc->nextTime; - break; - } - - /* tc is neither in the same interval nor supposed to be executed sooner - * than tmpTc. Update afterTc to push tc further back in the timer list. */ - afterTc = tmpTc; - } - - /* Add the repeated callback */ - if(afterTc) - SLIST_INSERT_AFTER(afterTc, tc, next); - else - SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next); + UA_UInt64 interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); + UA_DateTime nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)interval; + return addCallback(t, callback, application, data, nextTime, + interval, true, callbackId); } UA_StatusCode @@ -165,211 +123,74 @@ UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId, if(interval_ms <= 0.0) return UA_STATUSCODE_BADINTERNALERROR; - /* Allocate the repeated callback structure */ - UA_TimerCallbackEntry *tc = - (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry)); - if(!tc) - return UA_STATUSCODE_BADOUTOFMEMORY; + /* Remove from the sorted list */ + UA_TimerEntry *te = ZIP_FIND(UA_TimerIdZip, &t->idRoot, &callbackId); + if(!te) + return UA_STATUSCODE_BADNOTFOUND; /* Set the repeated callback */ - tc->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */ - tc->id = callbackId; - tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval; - tc->callback = (UA_ApplicationCallback)CHANGE_SENTINEL; - - /* Enqueue the changes in the MPSC queue */ - enqueueChange(t, tc); + ZIP_REMOVE(UA_TimerZip, &t->root, te); + te->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */ + te->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)te->interval; + ZIP_INSERT(UA_TimerZip, &t->root, te, ZIP_RANK(te, zipfields)); return UA_STATUSCODE_GOOD; } -static void -changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId, - UA_UInt64 interval, UA_DateTime nextTime) { - /* Remove from the sorted list */ - UA_TimerCallbackEntry *tc, *prev = NULL; - SLIST_FOREACH(tc, &t->repeatedCallbacks, next) { - if(callbackId == tc->id) { - if(prev) - SLIST_REMOVE_AFTER(prev, next); - else - SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next); - break; - } - prev = tc; - } - if(!tc) +void +UA_Timer_removeCallback(UA_Timer *t, UA_UInt64 callbackId) { + UA_TimerEntry *te = ZIP_FIND(UA_TimerIdZip, &t->idRoot, &callbackId); + if(!te) return; - /* Adjust settings */ - tc->interval = interval; - tc->nextTime = nextTime; - - /* Reinsert at the new position */ - addTimerCallbackEntry(t, tc); -} - -/* Removing a repeated callback: Add an entry with the "nextTime" timestamp set - * to UA_INT64_MAX. The next iteration picks this up and removes the repated - * callback from the linked list. */ -UA_StatusCode -UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) { - /* Allocate the repeated callback structure */ - UA_TimerCallbackEntry *tc = - (UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry)); - if(!tc) - return UA_STATUSCODE_BADOUTOFMEMORY; - - /* Set the repeated callback with the sentinel nextTime */ - tc->id = callbackId; - tc->callback = (UA_ApplicationCallback)REMOVE_SENTINEL; - - /* Enqueue the changes in the MPSC queue */ - enqueueChange(t, tc); - return UA_STATUSCODE_GOOD; -} - -static void -removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) { - UA_TimerCallbackEntry *tc, *prev = NULL; - SLIST_FOREACH(tc, &t->repeatedCallbacks, next) { - if(callbackId == tc->id) { - if(prev) - SLIST_REMOVE_AFTER(prev, next); - else - SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next); - UA_free(tc); - break; - } - prev = tc; - } -} - -/* Process the changes that were added to the MPSC queue (by other threads) */ -static void -processChanges(UA_Timer *t) { - UA_TimerCallbackEntry *change; - while((change = dequeueChange(t))) { - switch((uintptr_t)change->callback) { - case REMOVE_SENTINEL: - removeRepeatedCallback(t, change->id); - UA_free(change); - break; - case CHANGE_SENTINEL: - changeTimerCallbackEntryInterval(t, change->id, change->interval, - change->nextTime); - UA_free(change); - break; - default: - addTimerCallbackEntry(t, change); - } - } + ZIP_REMOVE(UA_TimerZip, &t->root, te); + ZIP_REMOVE(UA_TimerIdZip, &t->idRoot, te); + UA_free(te); } UA_DateTime UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic, UA_TimerExecutionCallback executionCallback, void *executionApplication) { - /* Insert and remove callbacks */ - processChanges(t); + UA_TimerEntry *first; + while((first = ZIP_MIN(UA_TimerZip, &t->root)) && + first->nextTime <= nowMonotonic) { + ZIP_REMOVE(UA_TimerZip, &t->root, first); - /* Find the last callback to be executed now */ - UA_TimerCallbackEntry *firstAfter, *lastNow = NULL; - SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) { - if(firstAfter->nextTime > nowMonotonic) - break; - lastNow = firstAfter; - } + /* Reinsert / remove to their new position first. Because the callback + * can interact with the zip tree and expects the same entries in the + * root and idRoot trees. */ - /* Nothing to do */ - if(!lastNow) { - if(firstAfter) - return firstAfter->nextTime; - return UA_INT64_MAX; - } - - /* Put the callbacks that are executed now in a separate list */ - UA_TimerCallbackList executedNowList; - executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks); - lastNow->next.sle_next = NULL; - - /* Fake entry to represent the first element in the newly-sorted list */ - UA_TimerCallbackEntry tmp_first; - tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */ - tmp_first.next.sle_next = firstAfter; - UA_TimerCallbackEntry *last_dispatched = &tmp_first; - - /* Iterate over the list of callbacks to process now */ - UA_TimerCallbackEntry *tc; - while((tc = SLIST_FIRST(&executedNowList))) { - /* Remove from the list */ - SLIST_REMOVE_HEAD(&executedNowList, next); - - /* Dispatch/process callback */ - executionCallback(executionApplication, tc->callback, - tc->application, tc->data); + if(!first->repeated) { + ZIP_REMOVE(UA_TimerIdZip, &t->idRoot, first); + executionCallback(executionApplication, first->callback, + first->application, first->data); + UA_free(first); + continue; + } /* Set the time for the next execution. Prevent an infinite loop by * forcing the next processing into the next iteration. */ - tc->nextTime += (UA_Int64)tc->interval; - if(tc->nextTime < nowMonotonic) - tc->nextTime = nowMonotonic + 1; - - /* Find the new position for tc to keep the list sorted */ - UA_TimerCallbackEntry *prev_tc; - if(last_dispatched->nextTime == tc->nextTime) { - /* We try to "batch" repeatedCallbacks with the same interval. This - * saves a linear search when the last dispatched entry has the same - * nextTime timestamp as this entry. */ - UA_assert(last_dispatched != &tmp_first); - prev_tc = last_dispatched; - } else { - /* Find the position for the next execution by a linear search - * starting at last_dispatched or the first element */ - if(last_dispatched->nextTime < tc->nextTime) - prev_tc = last_dispatched; - else - prev_tc = &tmp_first; - - while(true) { - UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next); - if(!n || n->nextTime >= tc->nextTime) - break; - prev_tc = n; - } - } - - /* Update last_dispatched to make sure batched callbacks are added in the - * same sequence as before they were executed and to save some iterations - * of the linear search for callbacks to be added further back in the list. */ - last_dispatched = tc; - - /* Add entry to the new position in the sorted list */ - SLIST_INSERT_AFTER(prev_tc, tc, next); + first->nextTime += (UA_Int64)first->interval; + if(first->nextTime < nowMonotonic) + first->nextTime = nowMonotonic + 1; + ZIP_INSERT(UA_TimerZip, &t->root, first, ZIP_RANK(first, zipfields)); + executionCallback(executionApplication, first->callback, + first->application, first->data); } - /* Set the entry-point for the newly sorted list */ - t->repeatedCallbacks.slh_first = tmp_first.next.sle_next; + /* Return the timestamp of the earliest next callback */ + first = ZIP_MIN(UA_TimerZip, &t->root); + return (first) ? first->nextTime : UA_INT64_MAX; +} - /* Re-repeat processAddRemoved since one of the callbacks might have removed - * or added a callback. So we return a correct timeout. */ - processChanges(t); - - /* Return timestamp of next repetition */ - tc = SLIST_FIRST(&t->repeatedCallbacks); - if(!tc) - return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */ - return tc->nextTime; +static void +freeEntry(UA_TimerEntry *te, void *data) { + UA_free(te); } void UA_Timer_deleteMembers(UA_Timer *t) { - /* Process changes to empty the MPSC queue */ - processChanges(t); - - /* Remove repeated callbacks */ - UA_TimerCallbackEntry *current; - while((current = SLIST_FIRST(&t->repeatedCallbacks))) { - SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next); - UA_free(current); - } + /* Free all nodes and reset the root */ + ZIP_ITER(UA_TimerZip, &t->root, freeEntry, NULL); + ZIP_INIT(&t->root); } diff --git a/src/ua_timer.h b/src/ua_timer.h index edbe0fde5..220405132 100644 --- a/src/ua_timer.h +++ b/src/ua_timer.h @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. * - * Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer) + * Copyright 2017, 2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer) * Copyright 2017 (c) Stefan Profanter, fortiss GmbH */ @@ -11,48 +11,37 @@ #include "ua_util_internal.h" #include "ua_workqueue.h" +#include "ziptree.h" _UA_BEGIN_DECLS -/* An (event) timer triggers callbacks with a recurring interval. Adding, - * removing and changing repeated callbacks can be done from independent - * threads. Processing the changes and dispatching callbacks must be done by a - * single "mainloop" process. - * Timer callbacks with the same recurring interval are batched into blocks in - * order to reduce linear search for re-entry to the sorted list after processing. - * Callbacks are inserted in reversed order (last callback are put first in the block) - * to allow the monitored items of a subscription (if created in a sequence with the - * same publish/sample interval) to be executed before the subscription publish the - * notifications. When callbacks are entered to the timer list after execution they - * are added in the same order as before execution. */ +struct UA_TimerEntry; +typedef struct UA_TimerEntry UA_TimerEntry; -/* Forward declaration */ -struct UA_TimerCallbackEntry; -typedef struct UA_TimerCallbackEntry UA_TimerCallbackEntry; +ZIP_HEAD(UA_TimerZip, UA_TimerEntry); +typedef struct UA_TimerZip UA_TimerZip; -/* Linked-list definition */ -typedef SLIST_HEAD(UA_TimerCallbackList, UA_TimerCallbackEntry) UA_TimerCallbackList; +ZIP_HEAD(UA_TimerIdZip, UA_TimerEntry); +typedef struct UA_TimerIdZip UA_TimerIdZip; +/* Only for a single thread. Protect by a mutex if required. */ typedef struct { - /* The linked list of callbacks is sorted according to the execution timestamp. */ - UA_TimerCallbackList repeatedCallbacks; - - /* Changes to the repeated callbacks in a multi-producer single-consumer queue */ - UA_TimerCallbackEntry * volatile changes_head; - UA_TimerCallbackEntry *changes_tail; - UA_TimerCallbackEntry *changes_stub; - + UA_TimerZip root; /* The root of the time-sorted zip tree */ + UA_TimerIdZip idRoot; /* The root of the id-sorted zip tree */ UA_UInt64 idCounter; } UA_Timer; -/* Initialize the Timer. Not thread-safe. */ void UA_Timer_init(UA_Timer *t); -/* Add a repated callback. Thread-safe, can be used in parallel and in parallel - * with UA_Timer_process. */ UA_StatusCode -UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application, - void *data, UA_Double interval_ms, UA_UInt64 *callbackId); +UA_Timer_addTimedCallback(UA_Timer *t, UA_ApplicationCallback callback, + void *application, void *data, UA_DateTime date, + UA_UInt64 *callbackId); + +UA_StatusCode +UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback, + void *application, void *data, UA_Double interval_ms, + UA_UInt64 *callbackId); /* Change the callback interval. If this is called from within the callback. The * adjustment is made during the next _process call. */ @@ -60,10 +49,8 @@ UA_StatusCode UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId, UA_Double interval_ms); -/* Remove a repated callback. Thread-safe, can be used in parallel and in - * parallel with UA_Timer_process. */ -UA_StatusCode -UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId); +void +UA_Timer_removeCallback(UA_Timer *t, UA_UInt64 callbackId); /* Process (dispatch) the repeated callbacks that have timed out. Returns the * timestamp of the next scheduled repeated callback. Not thread-safe. @@ -78,7 +65,6 @@ UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic, UA_TimerExecutionCallback executionCallback, void *executionApplication); -/* Remove all repeated callbacks. Not thread-safe. */ void UA_Timer_deleteMembers(UA_Timer *t); _UA_END_DECLS