Timer: Use zip tree for the timer

Timer: Add callbacks that are executed once at a defined date

Timer: Implement changes in the timer immediately
This commit is contained in:
Julius Pfrommer 2018-12-24 23:46:03 +01:00 committed by Julius Pfrommer
parent 835b83b2f9
commit 4d804b5935
14 changed files with 244 additions and 394 deletions

View File

@ -506,6 +506,7 @@ set(exported_headers ${exported_headers}
${PROJECT_SOURCE_DIR}/include/ua_client_highlevel_async.h)
set(internal_headers ${PROJECT_SOURCE_DIR}/deps/open62541_queue.h
${PROJECT_SOURCE_DIR}/deps/ziptree.h
${PROJECT_SOURCE_DIR}/deps/pcg_basic.h
${PROJECT_SOURCE_DIR}/deps/libc_time.h
${PROJECT_SOURCE_DIR}/deps/base64.h

View File

@ -468,17 +468,35 @@ __UA_Client_AsyncServiceEx(UA_Client *client, const void *request,
UA_UInt32 timeout);
/**
* Repeated Callbacks
* ------------------
* Timed Callbacks
* ---------------
* Repeated callbacks can be attached to a client and will be executed in the
* defined interval. */
typedef void (*UA_ClientCallback)(UA_Client *client, void *data);
/* Add a callback for execution at a specified time. If the indicated time lies
* in the past, then the callback is executed at the next iteration of the
* server's main loop.
*
* @param client The client object.
* @param callback The callback that shall be added.
* @param data Data that is forwarded to the callback.
* @param date The timestamp for the execution time.
* @param callbackId Set to the identifier of the repeated callback . This can
* be used to cancel the callback later on. If the pointer is null, the
* identifier is not set.
* @return Upon success, UA_STATUSCODE_GOOD is returned. An error code
* otherwise. */
UA_StatusCode UA_EXPORT
UA_Client_addTimedCallback(UA_Client *client, UA_ClientCallback callback,
void *data, UA_DateTime date, UA_UInt64 *callbackId);
/* Add a callback for cyclic repetition to the client.
*
* @param client The client object.
* @param callback The callback that shall be added.
* @param data Data that is forwarded to the callback.
* @param interval_ms The callback shall be repeatedly executed with the given
* interval (in ms). The interval must be positive. The first execution
* occurs at now() + interval at the latest.
@ -487,19 +505,21 @@ typedef void (*UA_ClientCallback)(UA_Client *client, void *data);
* identifier is not set.
* @return Upon success, UA_STATUSCODE_GOOD is returned. An error code
* otherwise. */
UA_StatusCode
UA_Client_addRepeatedCallback(UA_Client *client,
UA_ClientCallback callback,
UA_StatusCode UA_EXPORT
UA_Client_addRepeatedCallback(UA_Client *client, UA_ClientCallback callback,
void *data, UA_Double interval_ms,
UA_UInt64 *callbackId);
UA_StatusCode
UA_StatusCode UA_EXPORT
UA_Client_changeRepeatedCallbackInterval(UA_Client *client,
UA_UInt64 callbackId,
UA_Double interval_ms);
UA_StatusCode UA_Client_removeRepeatedCallback(UA_Client *client,
UA_UInt64 callbackId);
void UA_EXPORT
UA_Client_removeCallback(UA_Client *client, UA_UInt64 callbackId);
#define UA_Client_removeRepeatedCallback(client, callbackId) \
UA_Client_removeCallback(client, callbackId)
/**
* .. toctree::

View File

@ -77,14 +77,32 @@ UA_StatusCode UA_EXPORT
UA_Server_run_shutdown(UA_Server *server);
/**
* Repeated Callbacks
* ------------------ */
* Timed Callbacks
* --------------- */
typedef void (*UA_ServerCallback)(UA_Server *server, void *data);
/* Add a callback for execution at a specified time. If the indicated time lies
* in the past, then the callback is executed at the next iteration of the
* server's main loop.
*
* @param server The server object.
* @param callback The callback that shall be added.
* @param data Data that is forwarded to the callback.
* @param date The timestamp for the execution time.
* @param callbackId Set to the identifier of the repeated callback . This can
* be used to cancel the callback later on. If the pointer is null, the
* identifier is not set.
* @return Upon success, UA_STATUSCODE_GOOD is returned. An error code
* otherwise. */
UA_StatusCode UA_EXPORT
UA_Server_addTimedCallback(UA_Server *server, UA_ServerCallback callback,
void *data, UA_DateTime date, UA_UInt64 *callbackId);
/* Add a callback for cyclic repetition to the server.
*
* @param server The server object.
* @param callback The callback that shall be added.
* @param data Data that is forwarded to the callback.
* @param interval_ms The callback shall be repeatedly executed with the given
* interval (in ms). The interval must be positive. The first execution
* occurs at now() + interval at the latest.
@ -101,14 +119,15 @@ UA_StatusCode UA_EXPORT
UA_Server_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId,
UA_Double interval_ms);
/* Remove a repeated callback.
/* Remove a repeated callback. Does nothing if the callback is not found.
*
* @param server The server object.
* @param callbackId The id of the callback that shall be removed.
* @return Upon success, UA_STATUSCODE_GOOD is returned.
* An error code otherwise. */
UA_StatusCode UA_EXPORT
UA_Server_removeRepeatedCallback(UA_Server *server, UA_UInt64 callbackId);
* @param callbackId The id of the callback */
void UA_EXPORT
UA_Server_removeCallback(UA_Server *server, UA_UInt64 callbackId);
#define UA_Server_removeRepeatedCallback(server, callbackId) \
UA_Server_removeCallback(server, callbackId);
/**
* Reading and Writing Node Attributes

View File

@ -631,13 +631,18 @@ UA_Client_sendAsyncRequest(UA_Client *client, const void *request,
responseType, userdata, requestId);
}
UA_StatusCode UA_EXPORT
UA_Client_addTimedCallback(UA_Client *client, UA_ClientCallback callback,
void *data, UA_DateTime date, UA_UInt64 *callbackId) {
return UA_Timer_addTimedCallback(&client->timer, (UA_ApplicationCallback) callback,
client, data, date, callbackId);
}
UA_StatusCode
UA_Client_addRepeatedCallback(UA_Client *client, UA_ClientCallback callback,
void *data, UA_Double interval_ms,
UA_UInt64 *callbackId) {
return UA_Timer_addRepeatedCallback(&client->timer,
(UA_ApplicationCallback) callback, client, data,
interval_ms, callbackId);
void *data, UA_Double interval_ms, UA_UInt64 *callbackId) {
return UA_Timer_addRepeatedCallback(&client->timer, (UA_ApplicationCallback) callback,
client, data, interval_ms, callbackId);
}
UA_StatusCode
@ -647,7 +652,7 @@ UA_Client_changeRepeatedCallbackInterval(UA_Client *client, UA_UInt64 callbackId
interval_ms);
}
UA_StatusCode
UA_Client_removeRepeatedCallback(UA_Client *client, UA_UInt64 callbackId) {
return UA_Timer_removeRepeatedCallback(&client->timer, callbackId);
void
UA_Client_removeCallback(UA_Client *client, UA_UInt64 callbackId) {
UA_Timer_removeCallback(&client->timer, callbackId);
}

View File

@ -158,8 +158,7 @@ UA_Server_removeWriterGroup(UA_Server *server, const UA_NodeId writerGroup){
return UA_STATUSCODE_BADNOTFOUND;
//unregister the publish callback
if(UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId) != UA_STATUSCODE_GOOD)
return UA_STATUSCODE_BADINTERNALERROR;
UA_PubSubManager_removeRepeatedPubSubCallback(server, wg->publishCallbackId);
#ifdef UA_ENABLE_PUBSUB_INFORMATIONMODEL
removeGroupRepresentation(server, wg);
#endif

View File

@ -308,6 +308,7 @@ UA_PubSubManager_delete(UA_Server *server, UA_PubSubManager *pubSubManager) {
/***********************************/
/* PubSub Jobs abstraction */
/***********************************/
UA_StatusCode
UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
void *data, UA_Double interval_ms, UA_UInt64 *callbackId) {
@ -321,9 +322,9 @@ UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 cal
return UA_Timer_changeRepeatedCallbackInterval(&server->timer, callbackId, interval_ms);
}
UA_StatusCode
void
UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId) {
return UA_Timer_removeRepeatedCallback(&server->timer, callbackId);
UA_Timer_removeCallback(&server->timer, callbackId);
}
#endif /* UA_ENABLE_PUBSUB */

View File

@ -41,7 +41,7 @@ UA_PubSubManager_addRepeatedCallback(UA_Server *server, UA_ServerCallback callba
UA_StatusCode
UA_PubSubManager_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId,
UA_Double interval_ms);
UA_StatusCode
void
UA_PubSubManager_removeRepeatedPubSubCallback(UA_Server *server, UA_UInt64 callbackId);
#endif /* UA_ENABLE_PUBSUB */

View File

@ -258,9 +258,17 @@ UA_Server_new(const UA_ServerConfig *config) {
return server;
}
/*****************/
/* Repeated Jobs */
/*****************/
/*******************/
/* Timed Callbacks */
/*******************/
UA_StatusCode
UA_Server_addTimedCallback(UA_Server *server, UA_ServerCallback callback,
void *data, UA_DateTime date, UA_UInt64 *callbackId) {
return UA_Timer_addTimedCallback(&server->timer,
(UA_ApplicationCallback)callback,
server, data, date, callbackId);
}
UA_StatusCode
UA_Server_addRepeatedCallback(UA_Server *server, UA_ServerCallback callback,
@ -278,9 +286,9 @@ UA_Server_changeRepeatedCallbackInterval(UA_Server *server, UA_UInt64 callbackId
interval_ms);
}
UA_StatusCode
UA_Server_removeRepeatedCallback(UA_Server *server, UA_UInt64 callbackId) {
return UA_Timer_removeRepeatedCallback(&server->timer, callbackId);
void
UA_Server_removeCallback(UA_Server *server, UA_UInt64 callbackId) {
UA_Timer_removeCallback(&server->timer, callbackId);
}
UA_StatusCode UA_EXPORT

View File

@ -34,13 +34,7 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
UA_UInt32 requestedMaxKeepAliveCount,
UA_UInt32 maxNotificationsPerPublish, UA_Byte priority) {
/* deregister the callback if required */
UA_StatusCode retval = Subscription_unregisterPublishCallback(server, subscription);
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session,
"Subscription %u | Could not unregister publish callback with error code %s",
subscription->subscriptionId, UA_StatusCode_name(retval));
return retval;
}
Subscription_unregisterPublishCallback(server, subscription);
/* re-parameterize the subscription */
subscription->publishingInterval = requestedPublishingInterval;
@ -61,7 +55,7 @@ setSubscriptionSettings(UA_Server *server, UA_Subscription *subscription,
subscription->notificationsPerPublish = server->config.maxNotificationsPerPublish;
subscription->priority = priority;
retval = Subscription_registerPublishCallback(server, subscription);
UA_StatusCode retval = Subscription_registerPublishCallback(server, subscription);
if(retval != UA_STATUSCODE_GOOD) {
UA_LOG_DEBUG_SESSION(&server->config.logger, subscription->session,
"Subscription %u | Could not register publish callback with error code %s",

View File

@ -565,20 +565,16 @@ Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub) {
return UA_STATUSCODE_GOOD;
}
UA_StatusCode
void
Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub) {
UA_LOG_DEBUG_SESSION(&server->config.logger, sub->session, "Subscription %u | "
"Unregister subscription publishing callback", sub->subscriptionId);
if(!sub->publishCallbackIsRegistered)
return UA_STATUSCODE_GOOD;
UA_StatusCode retval = UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
if(retval != UA_STATUSCODE_GOOD)
return retval;
return;
UA_Server_removeRepeatedCallback(server, sub->publishCallbackId);
sub->publishCallbackIsRegistered = false;
return UA_STATUSCODE_GOOD;
}
/* When the session has publish requests stored but the last subscription is

View File

@ -133,7 +133,7 @@ void UA_MonitoredItem_init(UA_MonitoredItem *mon, UA_Subscription *sub);
void UA_MonitoredItem_delete(UA_Server *server, UA_MonitoredItem *mon);
void UA_MonitoredItem_sampleCallback(UA_Server *server, UA_MonitoredItem *mon);
UA_StatusCode UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
UA_StatusCode UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
void UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon);
/* Remove entries until mon->maxQueueSize is reached. Sets infobits for lost
* data if required. */
@ -212,7 +212,7 @@ struct UA_Subscription {
UA_Subscription * UA_Subscription_new(UA_Session *session, UA_UInt32 subscriptionId);
void UA_Subscription_deleteMembers(UA_Server *server, UA_Subscription *sub);
UA_StatusCode Subscription_registerPublishCallback(UA_Server *server, UA_Subscription *sub);
UA_StatusCode Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub);
void Subscription_unregisterPublishCallback(UA_Server *server, UA_Subscription *sub);
void UA_Subscription_addMonitoredItem(UA_Subscription *sub, UA_MonitoredItem *newMon);
UA_MonitoredItem * UA_Subscription_getMonitoredItem(UA_Subscription *sub, UA_UInt32 monitoredItemId);

View File

@ -373,12 +373,12 @@ UA_MonitoredItem_registerSampleCallback(UA_Server *server, UA_MonitoredItem *mon
return retval;
}
UA_StatusCode
void
UA_MonitoredItem_unregisterSampleCallback(UA_Server *server, UA_MonitoredItem *mon) {
if(!mon->sampleCallbackIsRegistered)
return UA_STATUSCODE_GOOD;
return;
UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
mon->sampleCallbackIsRegistered = false;
return UA_Server_removeRepeatedCallback(server, mon->sampleCallbackId);
}
#endif /* UA_ENABLE_SUBSCRIPTIONS */

View File

@ -2,87 +2,101 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017, 2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
*/
#include "ua_util_internal.h"
#include "ua_timer.h"
/* Only one thread operates on the repeated jobs. This is usually the "main"
* thread with the event loop. All other threads introduce changes via a
* multi-producer single-consumer (MPSC) queue. The queue is based on a design
* by Dmitry Vyukov.
* http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
*
* The RepeatedCallback structure is used both in the sorted list of callbacks
* and in the MPSC changes queue. For the changes queue, we differentiate
* between three cases encoded in the callback pointer.
*
* callback > 0x01: add the new repeated callback to the sorted list
* callback == 0x00: remove the callback with the same id
* callback == 0x01: change the interval of the existing callback */
#define REMOVE_SENTINEL 0x00
#define CHANGE_SENTINEL 0x01
struct UA_TimerCallbackEntry {
SLIST_ENTRY(UA_TimerCallbackEntry) next; /* Next element in the list */
UA_DateTime nextTime; /* The next time when the callbacks
* are to be executed */
struct UA_TimerEntry {
ZIP_ENTRY(UA_TimerEntry) zipfields;
UA_DateTime nextTime; /* The next time when the callback
* is to be executed */
UA_UInt64 interval; /* Interval in 100ns resolution */
UA_UInt64 id; /* Id of the repeated callback */
UA_Boolean repeated; /* Repeated callback? */
UA_ApplicationCallback callback;
void *application;
void *data;
ZIP_ENTRY(UA_TimerEntry) idZipfields;
UA_UInt64 id; /* Id of the entry */
};
/* There may be several entries with the same nextTime in the tree. We give them
* an absolute order by considering the memory address to break ties. Because of
* this, the nextTime property cannot be used to lookup specific entries. */
static enum ZIP_CMP
cmpDateTime(const UA_DateTime *a, const UA_DateTime *b) {
if(*a < *b)
return ZIP_CMP_LESS;
if(*a > *b)
return ZIP_CMP_MORE;
if(a == b)
return ZIP_CMP_EQ;
if(a < b)
return ZIP_CMP_LESS;
return ZIP_CMP_MORE;
}
ZIP_PROTTYPE(UA_TimerZip, UA_TimerEntry, UA_DateTime)
ZIP_IMPL(UA_TimerZip, UA_TimerEntry, zipfields, UA_DateTime, nextTime, cmpDateTime)
/* The identifiers of entries are unique */
static enum ZIP_CMP
cmpId(const UA_UInt64 *a, const UA_UInt64 *b) {
if(*a < *b)
return ZIP_CMP_LESS;
if(*a == *b)
return ZIP_CMP_EQ;
return ZIP_CMP_MORE;
}
ZIP_PROTTYPE(UA_TimerIdZip, UA_TimerEntry, UA_UInt64)
ZIP_IMPL(UA_TimerIdZip, UA_TimerEntry, idZipfields, UA_UInt64, id, cmpId)
void
UA_Timer_init(UA_Timer *t) {
SLIST_INIT(&t->repeatedCallbacks);
t->changes_head = (UA_TimerCallbackEntry*)&t->changes_stub;
t->changes_tail = (UA_TimerCallbackEntry*)&t->changes_stub;
t->changes_stub = NULL;
t->idCounter = 0;
memset(t, 0, sizeof(UA_Timer));
}
static void
enqueueChange(UA_Timer *t, UA_TimerCallbackEntry *tc) {
tc->next.sle_next = NULL;
UA_TimerCallbackEntry *prev = (UA_TimerCallbackEntry*)
UA_atomic_xchg((void * volatile *)&t->changes_head, tc);
/* Nothing can be dequeued while the producer is blocked here */
prev->next.sle_next = tc; /* Once this change is visible in the consumer,
* the node is dequeued in the following
* iteration */
static UA_StatusCode
addCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application, void *data,
UA_DateTime nextTime, UA_UInt64 interval, UA_Boolean repeated,
UA_UInt64 *callbackId) {
/* A callback method needs to be present */
if(!callback)
return UA_STATUSCODE_BADINTERNALERROR;
/* Allocate the repeated callback structure */
UA_TimerEntry *te = (UA_TimerEntry*)UA_malloc(sizeof(UA_TimerEntry));
if(!te)
return UA_STATUSCODE_BADOUTOFMEMORY;
/* Set the repeated callback */
te->interval = (UA_UInt64)interval;
te->id = ++t->idCounter;
te->callback = callback;
te->application = application;
te->data = data;
te->repeated = repeated;
te->nextTime = nextTime;
/* Set the output identifier */
if(callbackId)
*callbackId = te->id;
ZIP_INSERT(UA_TimerZip, &t->root, te, ZIP_FFS32(UA_UInt32_random()));
ZIP_INSERT(UA_TimerIdZip, &t->idRoot, te, ZIP_RANK(te, zipfields));
return UA_STATUSCODE_GOOD;
}
static UA_TimerCallbackEntry *
dequeueChange(UA_Timer *t) {
UA_TimerCallbackEntry *tail = t->changes_tail;
UA_TimerCallbackEntry *next = tail->next.sle_next;
if(tail == (UA_TimerCallbackEntry*)&t->changes_stub) {
if(!next)
return NULL;
t->changes_tail = next;
tail = next;
next = next->next.sle_next;
}
if(next) {
t->changes_tail = next;
return tail;
}
UA_TimerCallbackEntry* head = t->changes_head;
if(tail != head)
return NULL;
enqueueChange(t, (UA_TimerCallbackEntry*)&t->changes_stub);
next = tail->next.sle_next;
if(next) {
t->changes_tail = next;
return tail;
}
return NULL;
UA_StatusCode
UA_Timer_addTimedCallback(UA_Timer *t, UA_ApplicationCallback callback,
void *application, void *data, UA_DateTime date,
UA_UInt64 *callbackId) {
return addCallback(t, callback, application, data, date, 0, false, callbackId);
}
/* Adding repeated callbacks: Add an entry with the "nextTime" timestamp in the
@ -92,70 +106,14 @@ UA_StatusCode
UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback,
void *application, void *data, UA_Double interval_ms,
UA_UInt64 *callbackId) {
/* A callback method needs to be present */
if(!callback)
return UA_STATUSCODE_BADINTERNALERROR;
/* The interval needs to be positive */
if(interval_ms <= 0.0)
return UA_STATUSCODE_BADINTERNALERROR;
/* Allocate the repeated callback structure */
UA_TimerCallbackEntry *tc =
(UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
if(!tc)
return UA_STATUSCODE_BADOUTOFMEMORY;
/* Set the repeated callback */
tc->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */
tc->id = ++t->idCounter;
tc->callback = callback;
tc->application = application;
tc->data = data;
tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
/* Set the output identifier */
if(callbackId)
*callbackId = tc->id;
/* Enqueue the changes in the MPSC queue */
enqueueChange(t, tc);
return UA_STATUSCODE_GOOD;
}
static void
addTimerCallbackEntry(UA_Timer *t, UA_TimerCallbackEntry * UA_RESTRICT tc) {
/* Find the last entry before this callback */
UA_TimerCallbackEntry *tmpTc, *afterTc = NULL;
SLIST_FOREACH(tmpTc, &t->repeatedCallbacks, next) {
if(tmpTc->nextTime >= tc->nextTime)
break;
/* The goal is to have many repeated callbacks with the same repetition
* interval in a "block" in order to reduce linear search for re-entry
* to the sorted list after processing. Allow the first execution to lie
* between "nextTime - 1s" and "nextTime" if this adjustment groups
* callbacks with the same repetition interval.
* Callbacks of a block are added in reversed order. This design allows
* the monitored items of a subscription (if created in a sequence with the
* same publish/sample interval) to be executed before the subscription
* publish the notifications */
if(tmpTc->interval == tc->interval &&
tmpTc->nextTime > (tc->nextTime - UA_DATETIME_SEC)) {
tc->nextTime = tmpTc->nextTime;
break;
}
/* tc is neither in the same interval nor supposed to be executed sooner
* than tmpTc. Update afterTc to push tc further back in the timer list. */
afterTc = tmpTc;
}
/* Add the repeated callback */
if(afterTc)
SLIST_INSERT_AFTER(afterTc, tc, next);
else
SLIST_INSERT_HEAD(&t->repeatedCallbacks, tc, next);
UA_UInt64 interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC);
UA_DateTime nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)interval;
return addCallback(t, callback, application, data, nextTime,
interval, true, callbackId);
}
UA_StatusCode
@ -165,211 +123,74 @@ UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
if(interval_ms <= 0.0)
return UA_STATUSCODE_BADINTERNALERROR;
/* Allocate the repeated callback structure */
UA_TimerCallbackEntry *tc =
(UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
if(!tc)
return UA_STATUSCODE_BADOUTOFMEMORY;
/* Remove from the sorted list */
UA_TimerEntry *te = ZIP_FIND(UA_TimerIdZip, &t->idRoot, &callbackId);
if(!te)
return UA_STATUSCODE_BADNOTFOUND;
/* Set the repeated callback */
tc->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */
tc->id = callbackId;
tc->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)tc->interval;
tc->callback = (UA_ApplicationCallback)CHANGE_SENTINEL;
/* Enqueue the changes in the MPSC queue */
enqueueChange(t, tc);
ZIP_REMOVE(UA_TimerZip, &t->root, te);
te->interval = (UA_UInt64)(interval_ms * UA_DATETIME_MSEC); /* in 100ns resolution */
te->nextTime = UA_DateTime_nowMonotonic() + (UA_DateTime)te->interval;
ZIP_INSERT(UA_TimerZip, &t->root, te, ZIP_RANK(te, zipfields));
return UA_STATUSCODE_GOOD;
}
static void
changeTimerCallbackEntryInterval(UA_Timer *t, UA_UInt64 callbackId,
UA_UInt64 interval, UA_DateTime nextTime) {
/* Remove from the sorted list */
UA_TimerCallbackEntry *tc, *prev = NULL;
SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
if(callbackId == tc->id) {
if(prev)
SLIST_REMOVE_AFTER(prev, next);
else
SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
break;
}
prev = tc;
}
if(!tc)
void
UA_Timer_removeCallback(UA_Timer *t, UA_UInt64 callbackId) {
UA_TimerEntry *te = ZIP_FIND(UA_TimerIdZip, &t->idRoot, &callbackId);
if(!te)
return;
/* Adjust settings */
tc->interval = interval;
tc->nextTime = nextTime;
/* Reinsert at the new position */
addTimerCallbackEntry(t, tc);
}
/* Removing a repeated callback: Add an entry with the "nextTime" timestamp set
* to UA_INT64_MAX. The next iteration picks this up and removes the repated
* callback from the linked list. */
UA_StatusCode
UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
/* Allocate the repeated callback structure */
UA_TimerCallbackEntry *tc =
(UA_TimerCallbackEntry*)UA_malloc(sizeof(UA_TimerCallbackEntry));
if(!tc)
return UA_STATUSCODE_BADOUTOFMEMORY;
/* Set the repeated callback with the sentinel nextTime */
tc->id = callbackId;
tc->callback = (UA_ApplicationCallback)REMOVE_SENTINEL;
/* Enqueue the changes in the MPSC queue */
enqueueChange(t, tc);
return UA_STATUSCODE_GOOD;
}
static void
removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId) {
UA_TimerCallbackEntry *tc, *prev = NULL;
SLIST_FOREACH(tc, &t->repeatedCallbacks, next) {
if(callbackId == tc->id) {
if(prev)
SLIST_REMOVE_AFTER(prev, next);
else
SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
UA_free(tc);
break;
}
prev = tc;
}
}
/* Process the changes that were added to the MPSC queue (by other threads) */
static void
processChanges(UA_Timer *t) {
UA_TimerCallbackEntry *change;
while((change = dequeueChange(t))) {
switch((uintptr_t)change->callback) {
case REMOVE_SENTINEL:
removeRepeatedCallback(t, change->id);
UA_free(change);
break;
case CHANGE_SENTINEL:
changeTimerCallbackEntryInterval(t, change->id, change->interval,
change->nextTime);
UA_free(change);
break;
default:
addTimerCallbackEntry(t, change);
}
}
ZIP_REMOVE(UA_TimerZip, &t->root, te);
ZIP_REMOVE(UA_TimerIdZip, &t->idRoot, te);
UA_free(te);
}
UA_DateTime
UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
UA_TimerExecutionCallback executionCallback,
void *executionApplication) {
/* Insert and remove callbacks */
processChanges(t);
UA_TimerEntry *first;
while((first = ZIP_MIN(UA_TimerZip, &t->root)) &&
first->nextTime <= nowMonotonic) {
ZIP_REMOVE(UA_TimerZip, &t->root, first);
/* Find the last callback to be executed now */
UA_TimerCallbackEntry *firstAfter, *lastNow = NULL;
SLIST_FOREACH(firstAfter, &t->repeatedCallbacks, next) {
if(firstAfter->nextTime > nowMonotonic)
break;
lastNow = firstAfter;
}
/* Reinsert / remove to their new position first. Because the callback
* can interact with the zip tree and expects the same entries in the
* root and idRoot trees. */
/* Nothing to do */
if(!lastNow) {
if(firstAfter)
return firstAfter->nextTime;
return UA_INT64_MAX;
}
/* Put the callbacks that are executed now in a separate list */
UA_TimerCallbackList executedNowList;
executedNowList.slh_first = SLIST_FIRST(&t->repeatedCallbacks);
lastNow->next.sle_next = NULL;
/* Fake entry to represent the first element in the newly-sorted list */
UA_TimerCallbackEntry tmp_first;
tmp_first.nextTime = nowMonotonic - 1; /* never matches for last_dispatched */
tmp_first.next.sle_next = firstAfter;
UA_TimerCallbackEntry *last_dispatched = &tmp_first;
/* Iterate over the list of callbacks to process now */
UA_TimerCallbackEntry *tc;
while((tc = SLIST_FIRST(&executedNowList))) {
/* Remove from the list */
SLIST_REMOVE_HEAD(&executedNowList, next);
/* Dispatch/process callback */
executionCallback(executionApplication, tc->callback,
tc->application, tc->data);
if(!first->repeated) {
ZIP_REMOVE(UA_TimerIdZip, &t->idRoot, first);
executionCallback(executionApplication, first->callback,
first->application, first->data);
UA_free(first);
continue;
}
/* Set the time for the next execution. Prevent an infinite loop by
* forcing the next processing into the next iteration. */
tc->nextTime += (UA_Int64)tc->interval;
if(tc->nextTime < nowMonotonic)
tc->nextTime = nowMonotonic + 1;
/* Find the new position for tc to keep the list sorted */
UA_TimerCallbackEntry *prev_tc;
if(last_dispatched->nextTime == tc->nextTime) {
/* We try to "batch" repeatedCallbacks with the same interval. This
* saves a linear search when the last dispatched entry has the same
* nextTime timestamp as this entry. */
UA_assert(last_dispatched != &tmp_first);
prev_tc = last_dispatched;
} else {
/* Find the position for the next execution by a linear search
* starting at last_dispatched or the first element */
if(last_dispatched->nextTime < tc->nextTime)
prev_tc = last_dispatched;
else
prev_tc = &tmp_first;
while(true) {
UA_TimerCallbackEntry *n = SLIST_NEXT(prev_tc, next);
if(!n || n->nextTime >= tc->nextTime)
break;
prev_tc = n;
}
}
/* Update last_dispatched to make sure batched callbacks are added in the
* same sequence as before they were executed and to save some iterations
* of the linear search for callbacks to be added further back in the list. */
last_dispatched = tc;
/* Add entry to the new position in the sorted list */
SLIST_INSERT_AFTER(prev_tc, tc, next);
first->nextTime += (UA_Int64)first->interval;
if(first->nextTime < nowMonotonic)
first->nextTime = nowMonotonic + 1;
ZIP_INSERT(UA_TimerZip, &t->root, first, ZIP_RANK(first, zipfields));
executionCallback(executionApplication, first->callback,
first->application, first->data);
}
/* Set the entry-point for the newly sorted list */
t->repeatedCallbacks.slh_first = tmp_first.next.sle_next;
/* Return the timestamp of the earliest next callback */
first = ZIP_MIN(UA_TimerZip, &t->root);
return (first) ? first->nextTime : UA_INT64_MAX;
}
/* Re-repeat processAddRemoved since one of the callbacks might have removed
* or added a callback. So we return a correct timeout. */
processChanges(t);
/* Return timestamp of next repetition */
tc = SLIST_FIRST(&t->repeatedCallbacks);
if(!tc)
return UA_INT64_MAX; /* Main-loop has a max timeout / will continue earlier */
return tc->nextTime;
static void
freeEntry(UA_TimerEntry *te, void *data) {
UA_free(te);
}
void
UA_Timer_deleteMembers(UA_Timer *t) {
/* Process changes to empty the MPSC queue */
processChanges(t);
/* Remove repeated callbacks */
UA_TimerCallbackEntry *current;
while((current = SLIST_FIRST(&t->repeatedCallbacks))) {
SLIST_REMOVE_HEAD(&t->repeatedCallbacks, next);
UA_free(current);
}
/* Free all nodes and reset the root */
ZIP_ITER(UA_TimerZip, &t->root, freeEntry, NULL);
ZIP_INIT(&t->root);
}

View File

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright 2017 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017, 2018 (c) Fraunhofer IOSB (Author: Julius Pfrommer)
* Copyright 2017 (c) Stefan Profanter, fortiss GmbH
*/
@ -11,48 +11,37 @@
#include "ua_util_internal.h"
#include "ua_workqueue.h"
#include "ziptree.h"
_UA_BEGIN_DECLS
/* An (event) timer triggers callbacks with a recurring interval. Adding,
* removing and changing repeated callbacks can be done from independent
* threads. Processing the changes and dispatching callbacks must be done by a
* single "mainloop" process.
* Timer callbacks with the same recurring interval are batched into blocks in
* order to reduce linear search for re-entry to the sorted list after processing.
* Callbacks are inserted in reversed order (last callback are put first in the block)
* to allow the monitored items of a subscription (if created in a sequence with the
* same publish/sample interval) to be executed before the subscription publish the
* notifications. When callbacks are entered to the timer list after execution they
* are added in the same order as before execution. */
struct UA_TimerEntry;
typedef struct UA_TimerEntry UA_TimerEntry;
/* Forward declaration */
struct UA_TimerCallbackEntry;
typedef struct UA_TimerCallbackEntry UA_TimerCallbackEntry;
ZIP_HEAD(UA_TimerZip, UA_TimerEntry);
typedef struct UA_TimerZip UA_TimerZip;
/* Linked-list definition */
typedef SLIST_HEAD(UA_TimerCallbackList, UA_TimerCallbackEntry) UA_TimerCallbackList;
ZIP_HEAD(UA_TimerIdZip, UA_TimerEntry);
typedef struct UA_TimerIdZip UA_TimerIdZip;
/* Only for a single thread. Protect by a mutex if required. */
typedef struct {
/* The linked list of callbacks is sorted according to the execution timestamp. */
UA_TimerCallbackList repeatedCallbacks;
/* Changes to the repeated callbacks in a multi-producer single-consumer queue */
UA_TimerCallbackEntry * volatile changes_head;
UA_TimerCallbackEntry *changes_tail;
UA_TimerCallbackEntry *changes_stub;
UA_TimerZip root; /* The root of the time-sorted zip tree */
UA_TimerIdZip idRoot; /* The root of the id-sorted zip tree */
UA_UInt64 idCounter;
} UA_Timer;
/* Initialize the Timer. Not thread-safe. */
void UA_Timer_init(UA_Timer *t);
/* Add a repated callback. Thread-safe, can be used in parallel and in parallel
* with UA_Timer_process. */
UA_StatusCode
UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback, void *application,
void *data, UA_Double interval_ms, UA_UInt64 *callbackId);
UA_Timer_addTimedCallback(UA_Timer *t, UA_ApplicationCallback callback,
void *application, void *data, UA_DateTime date,
UA_UInt64 *callbackId);
UA_StatusCode
UA_Timer_addRepeatedCallback(UA_Timer *t, UA_ApplicationCallback callback,
void *application, void *data, UA_Double interval_ms,
UA_UInt64 *callbackId);
/* Change the callback interval. If this is called from within the callback. The
* adjustment is made during the next _process call. */
@ -60,10 +49,8 @@ UA_StatusCode
UA_Timer_changeRepeatedCallbackInterval(UA_Timer *t, UA_UInt64 callbackId,
UA_Double interval_ms);
/* Remove a repated callback. Thread-safe, can be used in parallel and in
* parallel with UA_Timer_process. */
UA_StatusCode
UA_Timer_removeRepeatedCallback(UA_Timer *t, UA_UInt64 callbackId);
void
UA_Timer_removeCallback(UA_Timer *t, UA_UInt64 callbackId);
/* Process (dispatch) the repeated callbacks that have timed out. Returns the
* timestamp of the next scheduled repeated callback. Not thread-safe.
@ -78,7 +65,6 @@ UA_Timer_process(UA_Timer *t, UA_DateTime nowMonotonic,
UA_TimerExecutionCallback executionCallback,
void *executionApplication);
/* Remove all repeated callbacks. Not thread-safe. */
void UA_Timer_deleteMembers(UA_Timer *t);
_UA_END_DECLS