open62541/examples/pubsub/server_pubsub_subscribe_custom_monitoring.c
Julius Pfrommer 5bf6f0c3e7 refactor(pubsub): Remove the old direct-access API
The public Offset Table API should be used for realtime operations on
the NetworkMessages instead.
2025-01-19 22:33:50 +01:00

245 lines
10 KiB
C

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
* See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */
/**
* This example shows how to be notified when a Reader does not receive messages
* in the configured timeout. */
#include <open62541/plugin/log_stdout.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#include <open62541/server_config_default.h>
#include <stdio.h>
#include <stdlib.h>
UA_NodeId connectionIdentifier;
UA_NodeId readerGroupIdentifier;
UA_NodeId readerIdentifier;
UA_DataSetReaderConfig readerConfig;
static void
addPubSubConnection(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl) {
UA_PubSubConnectionConfig connectionConfig;
memset (&connectionConfig, 0, sizeof(UA_PubSubConnectionConfig));
connectionConfig.name = UA_STRING("UDPMC Connection 1");
connectionConfig.transportProfileUri = *transportProfile;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
connectionConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT32;
connectionConfig.publisherId.id.uint32 = UA_UInt32_random();
UA_Server_addPubSubConnection (server, &connectionConfig, &connectionIdentifier);
}
static void
addReaderGroup(UA_Server *server) {
UA_ReaderGroupConfig readerGroupConfig;
memset (&readerGroupConfig, 0, sizeof(UA_ReaderGroupConfig));
readerGroupConfig.name = UA_STRING("ReaderGroup1");
UA_Server_addReaderGroup(server, connectionIdentifier, &readerGroupConfig,
&readerGroupIdentifier);
UA_Server_enableReaderGroup(server, readerGroupIdentifier);
}
static void
addDataSetReader(UA_Server *server, UA_Double messageReceiveTimeout) {
memset(&readerConfig, 0, sizeof(UA_DataSetReaderConfig));
readerConfig.name = UA_STRING("DataSet Reader 1");
/* Parameters to filter which DataSetMessage has to be processed
* by the DataSetReader */
/* The following parameters are used to show that the data published by
* tutorial_pubsub_publish.c is being subscribed and is being updated in
* the information model */
UA_UInt16 publisherIdentifier = 2234;
readerConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
readerConfig.publisherId.id.uint16 = publisherIdentifier;
readerConfig.writerGroupId = 100;
readerConfig.dataSetWriterId = 62541;
readerConfig.messageReceiveTimeout = messageReceiveTimeout;
/* Setting up Meta data configuration in DataSetReader */
UA_DataSetMetaDataType *pMetaData = &readerConfig.dataSetMetaData;
pMetaData->name = UA_STRING ("DataSet 1");
/* Static definition of number of fields size to 4 to create four different
* targetVariables of distinct datatype Currently the publisher sends only
* DateTime data type */
pMetaData->fieldsSize = 4;
pMetaData->fields = (UA_FieldMetaData*)UA_Array_new (pMetaData->fieldsSize,
&UA_TYPES[UA_TYPES_FIELDMETADATA]);
/* DateTime DataType */
UA_FieldMetaData_init (&pMetaData->fields[0]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_DATETIME].typeId,
&pMetaData->fields[0].dataType);
pMetaData->fields[0].builtInType = UA_NS0ID_DATETIME;
pMetaData->fields[0].name = UA_STRING ("DateTime");
pMetaData->fields[0].valueRank = -1; /* scalar */
/* Int32 DataType */
UA_FieldMetaData_init (&pMetaData->fields[1]);
UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT32].typeId,
&pMetaData->fields[1].dataType);
pMetaData->fields[1].builtInType = UA_NS0ID_INT32;
pMetaData->fields[1].name = UA_STRING ("Int32");
pMetaData->fields[1].valueRank = -1; /* scalar */
/* Int64 DataType */
UA_FieldMetaData_init (&pMetaData->fields[2]);
UA_NodeId_copy(&UA_TYPES[UA_TYPES_INT64].typeId,
&pMetaData->fields[2].dataType);
pMetaData->fields[2].builtInType = UA_NS0ID_INT64;
pMetaData->fields[2].name = UA_STRING ("Int64");
pMetaData->fields[2].valueRank = -1; /* scalar */
/* Boolean DataType */
UA_FieldMetaData_init (&pMetaData->fields[3]);
UA_NodeId_copy (&UA_TYPES[UA_TYPES_BOOLEAN].typeId,
&pMetaData->fields[3].dataType);
pMetaData->fields[3].builtInType = UA_NS0ID_BOOLEAN;
pMetaData->fields[3].name = UA_STRING ("BoolToggle");
pMetaData->fields[3].valueRank = -1; /* scalar */
UA_Server_addDataSetReader(server, readerGroupIdentifier,
&readerConfig, &readerIdentifier);
}
/* Set SubscribedDataSet type to TargetVariables data type
* Add subscribedvariables to the DataSetReader */
static void
addSubscribedVariables(UA_Server *server, UA_NodeId dataSetReaderId) {
UA_NodeId folderId;
UA_String folderName = readerConfig.dataSetMetaData.name;
UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
UA_QualifiedName folderBrowseName;
oAttr.displayName.locale = UA_STRING ("en-US");
oAttr.displayName.text = folderName;
folderBrowseName.namespaceIndex = 1;
folderBrowseName.name = folderName;
UA_Server_addObjectNode(server, UA_NODEID_NULL, UA_NS0ID(OBJECTSFOLDER),
UA_NS0ID(ORGANIZES), folderBrowseName,
UA_NS0ID(BASEOBJECTTYPE), oAttr, NULL, &folderId);
/* Create the TargetVariables with respect to DataSetMetaData fields */
UA_FieldTargetDataType *targetVars = (UA_FieldTargetDataType*)
UA_calloc(readerConfig.dataSetMetaData.fieldsSize, sizeof(UA_FieldTargetDataType));
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++) {
/* Variable to subscribe data */
UA_VariableAttributes vAttr = UA_VariableAttributes_default;
UA_LocalizedText_copy(&readerConfig.dataSetMetaData.fields[i].description,
&vAttr.description);
vAttr.displayName.locale = UA_STRING("en-US");
vAttr.displayName.text = readerConfig.dataSetMetaData.fields[i].name;
vAttr.dataType = readerConfig.dataSetMetaData.fields[i].dataType;
UA_NodeId newNode;
UA_Server_addVariableNode(server, UA_NODEID_NUMERIC(1, (UA_UInt32)i + 50000),
folderId, UA_NS0ID(HASCOMPONENT),
UA_QUALIFIEDNAME(1, (char *)readerConfig.dataSetMetaData.fields[i].name.data),
UA_NS0ID(BASEDATAVARIABLETYPE), vAttr, NULL, &newNode);
/* For creating Targetvariables */
targetVars[i].attributeId = UA_ATTRIBUTEID_VALUE;
targetVars[i].targetNodeId = newNode;
}
UA_Server_DataSetReader_createTargetVariables(server, dataSetReaderId,
readerConfig.dataSetMetaData.fieldsSize, targetVars);
for(size_t i = 0; i < readerConfig.dataSetMetaData.fieldsSize; i++)
UA_FieldTargetDataType_clear(&targetVars[i]);
UA_free(targetVars);
UA_free(readerConfig.dataSetMetaData.fields);
}
/* Get notified about specific PubSub state changes (including timeouts) */
static void
pubsubStateChangeCallback(UA_Server *server,
const UA_NodeId pubsubComponentId,
UA_PubSubState state,
UA_StatusCode code) {
if(!UA_NodeId_equal(&pubsubComponentId, &readerIdentifier))
return;
UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
"State of the Reader changed to '%i' with StatusCode %s",
state, UA_StatusCode_name(code));
}
static void
run(UA_String *transportProfile, UA_NetworkAddressUrlDataType *networkAddressUrl,
UA_Double messageReceiveTimeout) {
/* Return value initialized to Status Good */
UA_Server *server = UA_Server_new();
UA_ServerConfig *config = UA_Server_getConfig(server);
/* Provide a callback to get notifications of specific PubSub state changes
* or timeouts (e.g. subscriber MessageReceiveTimeout) */
config->pubSubConfig.stateChangeCallback = pubsubStateChangeCallback;
addPubSubConnection(server, transportProfile, networkAddressUrl);
addReaderGroup(server);
addDataSetReader(server, messageReceiveTimeout);
addSubscribedVariables(server, readerIdentifier);
UA_Server_enableAllPubSubComponents(server);
UA_Server_runUntilInterrupt(server);
UA_Server_delete(server);
}
static void
usage(char *progname) {
printf("usage: %s [uri [eth-interface]] [-messageReceiveTimeout <timeout_ms>]\n", progname);
}
int main(int argc, char **argv) {
UA_String transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
UA_NetworkAddressUrlDataType networkAddressUrl =
{UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
UA_Double messageReceiveTimeout = 0.0; /* Default: disabled */
for(int i = 1; i < argc; i++) {
if(strcmp(argv[i], "-h") == 0 ||
strcmp(argv[i], "--help") == 0) {
usage(argv[0]);
return EXIT_SUCCESS;
}
if(strncmp(argv[i], "opc.udp://", 10) == 0) {
networkAddressUrl.url = UA_STRING(argv[i]);
continue;
}
if(strncmp(argv[i], "opc.eth://", 10) == 0) {
if(i + 1 >= argc) {
printf("Error: ETH URI needs an interface name\n");
return EXIT_FAILURE;
}
transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
networkAddressUrl.url = UA_STRING(argv[i]);
networkAddressUrl.networkInterface = UA_STRING(argv[i+1]);
i++;
continue;
}
if(strcmp(argv[argc-1], "-messageReceiveTimeout") == 0) {
if(i + 1 >= argc) {
printf("Error: Timeout value not given\n");
return EXIT_FAILURE;
}
messageReceiveTimeout = strtod(argv[i+1], NULL);
i++;
continue;
}
}
run(&transportProfile, &networkAddressUrl, messageReceiveTimeout);
return 0;
}