mirror of
https://github.com/open62541/open62541.git
synced 2025-06-03 04:00:21 +00:00
refactor(pubsub): Remove UA_Server_computeReaderGroupOffsetTable
This commit is contained in:
parent
c96be2a37f
commit
1dc3564c37
@ -190,22 +190,7 @@ int main(int argc, char **argv) {
|
||||
addDataSetReader(server);
|
||||
|
||||
/* Print the Offset Table */
|
||||
printf("ReaderGroup Offset Table\n");
|
||||
UA_PubSubOffsetTable ot;
|
||||
UA_Server_computeReaderGroupOffsetTable(server, readerGroupIdentifier, &ot);
|
||||
for(size_t i = 0; i < ot.offsetsSize; i++) {
|
||||
UA_String out = UA_STRING_NULL;
|
||||
UA_NodeId_print(&ot.offsets[i].component, &out);
|
||||
printf("%u:\tOffset %u\tOffsetType %u\tComponent %.*s\n",
|
||||
(unsigned)i, (unsigned)ot.offsets[i].offset,
|
||||
(unsigned)ot.offsets[i].offsetType,
|
||||
(int)out.length, out.data);
|
||||
UA_String_clear(&out);
|
||||
}
|
||||
|
||||
UA_PubSubOffsetTable_clear(&ot);
|
||||
|
||||
printf("\nDataSetReader Offset Table\n");
|
||||
UA_Server_computeDataSetReaderOffsetTable(server, readerIdentifier, &ot);
|
||||
for(size_t i = 0; i < ot.offsetsSize; i++) {
|
||||
UA_String out = UA_STRING_NULL;
|
||||
|
@ -1015,15 +1015,19 @@ UA_Server_computeWriterGroupOffsetTable(UA_Server *server,
|
||||
const UA_NodeId writerGroupId,
|
||||
UA_PubSubOffsetTable *ot);
|
||||
|
||||
/* Compute the offset table for a ReaderGroup */
|
||||
UA_EXPORT UA_StatusCode UA_THREADSAFE
|
||||
UA_Server_computeReaderGroupOffsetTable(UA_Server *server,
|
||||
const UA_NodeId readerGroupId,
|
||||
UA_PubSubOffsetTable *ot);
|
||||
/**
|
||||
* For ReaderGroups we cannot compute the offset table up front, because it is
|
||||
* not ensured that all Readers end up with their DataSetMessage in the same
|
||||
* NetworkMessage. Furthermore the ReaderGroup might receive messages from
|
||||
* multiple different publishers.
|
||||
*
|
||||
* Instead the offset tables are computed beforehand for each DataSetReader. At
|
||||
* runtime, use UA_NetworkMessage_decodeBinaryHeaders to decode the
|
||||
* NetworkMessage headers. The information therein (e.g. the MessageCount and
|
||||
* and the DataSetWriterIds) can then be used to iterate over the
|
||||
* DataSetMessages in the payload with their respective offset tables. */
|
||||
|
||||
/* Similar to _computeReaderGroupOffsetTable, but only computes the offsets
|
||||
* within the DataSetMessage for one DataSetReader. The offsets begin at zero
|
||||
* for the DataSetMessage. */
|
||||
/* The offsets begin at zero for the DataSetMessage */
|
||||
UA_EXPORT UA_StatusCode UA_THREADSAFE
|
||||
UA_Server_computeDataSetReaderOffsetTable(UA_Server *server,
|
||||
const UA_NodeId dataSetReaderId,
|
||||
|
@ -986,6 +986,9 @@ UA_Server_computeDataSetReaderOffsetTable(UA_Server *server,
|
||||
return res;
|
||||
}
|
||||
|
||||
/* Reset the OffsetTable */
|
||||
memset(ot, 0, sizeof(UA_PubSubOffsetTable));
|
||||
|
||||
/* Prepare the encoding context */
|
||||
UA_DataSetMessage_EncodingMetaData emd;
|
||||
memset(&emd, 0, sizeof(UA_DataSetMessage_EncodingMetaData));
|
||||
|
@ -1186,210 +1186,4 @@ UA_Server_updateReaderGroupConfig(UA_Server *server, const UA_NodeId rgId,
|
||||
return retval;
|
||||
}
|
||||
|
||||
static UA_StatusCode
|
||||
readerGroupGenerateNetworkMessage(UA_ReaderGroup *wg, UA_DataSetReader **dsr,
|
||||
UA_DataSetMessage *dsm, UA_Byte dsmCount,
|
||||
UA_ExtensionObject *messageSettings,
|
||||
UA_NetworkMessage *nm) {
|
||||
if(messageSettings->content.decoded.type != &UA_TYPES[UA_TYPES_UADPDATASETREADERMESSAGEDATATYPE])
|
||||
return UA_STATUSCODE_BADNOTSUPPORTED;
|
||||
|
||||
/* Set the header flags */
|
||||
UA_UadpDataSetReaderMessageDataType *dsrm =
|
||||
(UA_UadpDataSetReaderMessageDataType*)messageSettings->content.decoded.data;
|
||||
nm->publisherIdEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID) != 0;
|
||||
nm->groupHeaderEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER) != 0;
|
||||
nm->groupHeader.writerGroupIdEnabled =
|
||||
((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID) != 0;
|
||||
nm->groupHeader.groupVersionEnabled =
|
||||
((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPVERSION) != 0;
|
||||
nm->groupHeader.networkMessageNumberEnabled =
|
||||
((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_NETWORKMESSAGENUMBER) != 0;
|
||||
nm->groupHeader.sequenceNumberEnabled =
|
||||
((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_SEQUENCENUMBER) != 0;
|
||||
nm->payloadHeaderEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER) != 0;
|
||||
nm->timestampEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_TIMESTAMP) != 0;
|
||||
nm->picosecondsEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_PICOSECONDS) != 0;
|
||||
nm->dataSetClassIdEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_DATASETCLASSID) != 0;
|
||||
nm->promotedFieldsEnabled = ((u64)dsrm->networkMessageContentMask &
|
||||
(u64)UA_UADPNETWORKMESSAGECONTENTMASK_PROMOTEDFIELDS) != 0;
|
||||
|
||||
/* Set the NetworkMessage header */
|
||||
nm->version = 1;
|
||||
nm->networkMessageType = UA_NETWORKMESSAGE_DATASET;
|
||||
nm->publisherId = dsr[0]->config.publisherId;
|
||||
|
||||
/* Set the group header (use default sequence numbers) */
|
||||
nm->groupHeader.networkMessageNumber = 1;
|
||||
nm->groupHeader.sequenceNumber = 1;
|
||||
nm->groupHeader.groupVersion = dsrm->groupVersion;
|
||||
nm->groupHeader.writerGroupId = dsr[0]->config.writerGroupId;
|
||||
|
||||
/* TODO Security Header */
|
||||
|
||||
/* Set the DataSetWriterIds */
|
||||
for(size_t i = 0; i < dsmCount; i++) {
|
||||
UA_DataSetReader *d = dsr[i];
|
||||
nm->dataSetWriterIds[i] = d->config.dataSetWriterId;
|
||||
}
|
||||
|
||||
/* Set the payload information from the dsm */
|
||||
nm->payload.dataSetMessages = dsm;
|
||||
nm->messageCount = dsmCount;
|
||||
|
||||
return UA_STATUSCODE_GOOD;
|
||||
}
|
||||
|
||||
UA_StatusCode
|
||||
UA_Server_computeReaderGroupOffsetTable(UA_Server *server,
|
||||
const UA_NodeId readerGroupId,
|
||||
UA_PubSubOffsetTable *ot) {
|
||||
if(!server || !ot)
|
||||
return UA_STATUSCODE_BADINVALIDARGUMENT;
|
||||
|
||||
lockServer(server);
|
||||
|
||||
/* Get the ReaderGroup */
|
||||
UA_PubSubManager *psm = getPSM(server);
|
||||
UA_ReaderGroup *rg = (psm) ? UA_ReaderGroup_find(psm, readerGroupId) : NULL;
|
||||
if(!rg) {
|
||||
unlockServer(server);
|
||||
return UA_STATUSCODE_BADNOTFOUND;
|
||||
}
|
||||
|
||||
memset(ot, 0, sizeof(UA_PubSubOffsetTable));
|
||||
|
||||
/* Define variables here to allow the goto cleanup later on */
|
||||
size_t msgSize;
|
||||
size_t fieldindex = 0;
|
||||
UA_FieldTargetDataType *tv = NULL;
|
||||
|
||||
/* Prepare the encoding context */
|
||||
PubSubEncodeCtx ctx;
|
||||
memset(&ctx, 0, sizeof(PubSubEncodeCtx));
|
||||
ctx.ot = ot;
|
||||
|
||||
/* Add the encoding metadata for the DataSetMessages */
|
||||
size_t i = 0;
|
||||
UA_DataSetReader *dsr;
|
||||
UA_STACKARRAY(UA_DataSetMessage_EncodingMetaData, emd, rg->readersCount);
|
||||
memset(emd, 0, sizeof(UA_DataSetMessage_EncodingMetaData) * rg->readersCount);
|
||||
ctx.eo.metaData = emd;
|
||||
ctx.eo.metaDataSize = rg->readersCount;
|
||||
LIST_FOREACH(dsr, &rg->readers, listEntry) {
|
||||
emd[i].dataSetWriterId = dsr->config.dataSetWriterId;
|
||||
emd[i].fields = dsr->config.dataSetMetaData.fields;
|
||||
emd[i].fieldsSize = dsr->config.dataSetMetaData.fieldsSize;
|
||||
i++;
|
||||
}
|
||||
|
||||
UA_NetworkMessage networkMessage;
|
||||
memset(&networkMessage, 0, sizeof(UA_NetworkMessage));
|
||||
|
||||
UA_DataSetMessage dsmStore[UA_NETWORKMESSAGE_MAXMESSAGECOUNT];
|
||||
UA_DataSetReader *dsrStore[UA_NETWORKMESSAGE_MAXMESSAGECOUNT];
|
||||
memset(dsmStore, 0, sizeof(UA_DataSetMessage) * UA_NETWORKMESSAGE_MAXMESSAGECOUNT);
|
||||
|
||||
size_t dsmCount = 0;
|
||||
UA_StatusCode res = UA_STATUSCODE_GOOD;
|
||||
LIST_FOREACH(dsr, &rg->readers, listEntry) {
|
||||
if(dsmCount >= UA_NETWORKMESSAGE_MAXMESSAGECOUNT) {
|
||||
res = UA_STATUSCODE_BADENCODINGERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
dsrStore[dsmCount] = dsr;
|
||||
res = UA_DataSetReader_generateDataSetMessage(server, &dsmStore[dsmCount], dsr);
|
||||
dsmCount++;
|
||||
if(res != UA_STATUSCODE_GOOD)
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* Generate the NetworkMessage */
|
||||
dsr = LIST_FIRST(&rg->readers);
|
||||
res = readerGroupGenerateNetworkMessage(rg, dsrStore, dsmStore, (UA_Byte) dsmCount,
|
||||
&dsr->config.messageSettings, &networkMessage);
|
||||
if(res != UA_STATUSCODE_GOOD)
|
||||
goto cleanup;
|
||||
|
||||
/* Compute the message length and generate the old format offset-table (done
|
||||
* inside calcSizeBinary) */
|
||||
msgSize = UA_NetworkMessage_calcSizeBinaryInternal(&ctx, &networkMessage);
|
||||
if(msgSize == 0) {
|
||||
res = UA_STATUSCODE_BADINTERNALERROR;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
/* Create the encoded network message */
|
||||
res = UA_NetworkMessage_encodeBinary(&networkMessage, &ot->networkMessage, &ctx.eo);
|
||||
if(res != UA_STATUSCODE_GOOD)
|
||||
goto cleanup;
|
||||
|
||||
/* Pick up the component NodeIds */
|
||||
dsr = NULL;
|
||||
for(size_t i = 0; i < ot->offsetsSize; i++) {
|
||||
UA_PubSubOffset *o = &ot->offsets[i];
|
||||
switch(o->offsetType) {
|
||||
case UA_PUBSUBOFFSETTYPE_NETWORKMESSAGE_SEQUENCENUMBER:
|
||||
case UA_PUBSUBOFFSETTYPE_NETWORKMESSAGE_TIMESTAMP:
|
||||
case UA_PUBSUBOFFSETTYPE_NETWORKMESSAGE_PICOSECONDS:
|
||||
case UA_PUBSUBOFFSETTYPE_NETWORKMESSAGE_GROUPVERSION:
|
||||
res |= UA_NodeId_copy(&rg->head.identifier, &o->component);
|
||||
break;
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE:
|
||||
dsr = (dsr == NULL) ? LIST_FIRST(&rg->readers) : LIST_NEXT(dsr, listEntry);
|
||||
fieldindex = 0;
|
||||
/* fall through */
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_SEQUENCENUMBER:
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_STATUS:
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_TIMESTAMP:
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETMESSAGE_PICOSECONDS:
|
||||
UA_assert(dsr);
|
||||
res |= UA_NodeId_copy(&dsr->head.identifier, &o->component);
|
||||
break;
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETFIELD_DATAVALUE:
|
||||
UA_assert(dsr);
|
||||
tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
|
||||
res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
|
||||
fieldindex++;
|
||||
break;
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETFIELD_VARIANT:
|
||||
UA_assert(dsr);
|
||||
tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
|
||||
res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
|
||||
fieldindex++;
|
||||
break;
|
||||
case UA_PUBSUBOFFSETTYPE_DATASETFIELD_RAW:
|
||||
UA_assert(dsr);
|
||||
tv = &dsr->config.subscribedDataSet.target.targetVariables[fieldindex];
|
||||
res |= UA_NodeId_copy(&tv->targetNodeId, &o->component);
|
||||
fieldindex++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
/* Clean up and return */
|
||||
for(size_t i = 0; i < dsmCount; i++) {
|
||||
UA_DataSetMessage_clear(&dsmStore[i]);
|
||||
}
|
||||
|
||||
if(res != UA_STATUSCODE_GOOD)
|
||||
UA_PubSubOffsetTable_clear(ot);
|
||||
|
||||
unlockServer(server);
|
||||
return res;
|
||||
}
|
||||
|
||||
#endif /* UA_ENABLE_PUBSUB */
|
||||
|
@ -284,7 +284,7 @@ START_TEST(SubscriberOffsets) {
|
||||
/* Print the Offset Table */
|
||||
UA_PubSubOffsetTable ot;
|
||||
UA_StatusCode res =
|
||||
UA_Server_computeReaderGroupOffsetTable(server, readerGroupIdentifier, &ot);
|
||||
UA_Server_computeDataSetReaderOffsetTable(server, readerIdentifier, &ot);
|
||||
ck_assert_uint_eq(res, UA_STATUSCODE_GOOD);
|
||||
for(size_t i = 0; i < ot.offsetsSize; i++) {
|
||||
UA_String out = UA_STRING_NULL;
|
||||
|
Loading…
Reference in New Issue
Block a user