mirror of
https://github.com/open62541/open62541.git
synced 2025-06-03 04:00:21 +00:00
fix(pubsub): Use the external value source in the Reader
This commit is contained in:
parent
42fee7b6ea
commit
06da5d07bd
@ -775,8 +775,8 @@ DataSetReader_processRaw(UA_Server *server, UA_ReaderGroup *rg,
|
||||
msg->data.keyFrameData.fieldCount = (UA_UInt16)
|
||||
dsr->config.dataSetMetaData.fieldsSize;
|
||||
|
||||
/* Start iteration from beginning of rawFields buffer */
|
||||
size_t offset = 0;
|
||||
/* start iteration from beginning of rawFields buffer */
|
||||
msg->data.keyFrameData.rawFields.length = 0;
|
||||
for(size_t i = 0; i < dsr->config.dataSetMetaData.fieldsSize; i++) {
|
||||
/* TODO The datatype reference should be part of the internal
|
||||
@ -791,10 +791,12 @@ DataSetReader_processRaw(UA_Server *server, UA_ReaderGroup *rg,
|
||||
&offset, value, type, NULL);
|
||||
if(dsr->config.dataSetMetaData.fields[i].maxStringLength != 0) {
|
||||
if(type->typeKind == UA_DATATYPEKIND_STRING ||
|
||||
type->typeKind == UA_DATATYPEKIND_BYTESTRING) {
|
||||
type->typeKind == UA_DATATYPEKIND_BYTESTRING) {
|
||||
UA_ByteString *bs = (UA_ByteString *) value;
|
||||
//check if length < maxStringLength, The types ByteString and String are equal in their base definition
|
||||
size_t lengthDifference = dsr->config.dataSetMetaData.fields[i].maxStringLength - bs->length;
|
||||
/* Check if length < maxStringLength, The types ByteString and
|
||||
* String are equal in their base definition */
|
||||
size_t lengthDifference =
|
||||
dsr->config.dataSetMetaData.fields[i].maxStringLength - bs->length;
|
||||
offset += lengthDifference;
|
||||
}
|
||||
}
|
||||
@ -808,25 +810,16 @@ DataSetReader_processRaw(UA_Server *server, UA_ReaderGroup *rg,
|
||||
UA_FieldTargetVariable *tv =
|
||||
&dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i];
|
||||
|
||||
if(rg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
|
||||
if (tv->beforeWrite) {
|
||||
void *pData = (**tv->externalDataValue).value.data;
|
||||
(**tv->externalDataValue).value.data = value; // set raw data as "preview"
|
||||
tv->beforeWrite(server,
|
||||
&dsr->identifier,
|
||||
&dsr->linkedReaderGroup,
|
||||
&dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i].targetVariable.targetNodeId,
|
||||
dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i].targetVariableContext,
|
||||
tv->externalDataValue);
|
||||
(**tv->externalDataValue).value.data = pData; // restore previous data pointer
|
||||
}
|
||||
memcpy((**tv->externalDataValue).value.data, value, type->memSize);
|
||||
if(tv->afterWrite)
|
||||
tv->afterWrite(server, &dsr->identifier,
|
||||
&dsr->linkedReaderGroup,
|
||||
if(tv->beforeWrite || tv->externalDataValue) {
|
||||
if(tv->beforeWrite)
|
||||
tv->beforeWrite(server, &dsr->identifier, &dsr->linkedReaderGroup,
|
||||
&tv->targetVariable.targetNodeId,
|
||||
tv->targetVariableContext,
|
||||
tv->externalDataValue);
|
||||
tv->targetVariableContext, tv->externalDataValue);
|
||||
memcpy((*tv->externalDataValue)->value.data, value, type->memSize);
|
||||
if(tv->afterWrite)
|
||||
tv->afterWrite(server, &dsr->identifier, &dsr->linkedReaderGroup,
|
||||
&tv->targetVariable.targetNodeId,
|
||||
tv->targetVariableContext, tv->externalDataValue);
|
||||
continue; /* No dynamic allocation for fixed-size msg, no need to _clear */
|
||||
}
|
||||
|
||||
@ -847,151 +840,118 @@ DataSetReader_processRaw(UA_Server *server, UA_ReaderGroup *rg,
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
DataSetReader_processFixedSize(UA_Server *server, UA_ReaderGroup *rg,
|
||||
UA_DataSetReader *dsr, UA_DataSetMessage *msg,
|
||||
size_t fieldCount) {
|
||||
for(size_t i = 0; i < fieldCount; i++) {
|
||||
if(!msg->data.keyFrameData.dataSetFields[i].hasValue)
|
||||
continue;
|
||||
|
||||
UA_FieldTargetVariable *tv =
|
||||
&dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i];
|
||||
if(tv->targetVariable.attributeId != UA_ATTRIBUTEID_VALUE)
|
||||
continue;
|
||||
|
||||
if(msg->data.keyFrameData.dataSetFields[i].value.type !=
|
||||
(*tv->externalDataValue)->value.type) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr,
|
||||
"Mismatching type");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tv->beforeWrite) {
|
||||
UA_DataValue *tmp = &msg->data.keyFrameData.dataSetFields[i];
|
||||
tv->beforeWrite(server,
|
||||
&dsr->identifier,
|
||||
&dsr->linkedReaderGroup,
|
||||
&dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i].targetVariable.targetNodeId,
|
||||
dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i].targetVariableContext,
|
||||
&tmp);
|
||||
}
|
||||
if(UA_LIKELY(tv->externalDataValue != NULL)) {
|
||||
memcpy((**tv->externalDataValue).value.data,
|
||||
msg->data.keyFrameData.dataSetFields[i].value.data,
|
||||
msg->data.keyFrameData.dataSetFields[i].value.type->memSize);
|
||||
}
|
||||
if(tv->afterWrite)
|
||||
tv->afterWrite(server, &dsr->identifier, &dsr->linkedReaderGroup,
|
||||
&tv->targetVariable.targetNodeId,
|
||||
tv->targetVariableContext, tv->externalDataValue);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
UA_DataSetReader_process(UA_Server *server, UA_ReaderGroup *rg,
|
||||
UA_DataSetReader *dsr, UA_DataSetMessage *msg) {
|
||||
if(!dsr || !rg || !msg || !server)
|
||||
return;
|
||||
|
||||
/* Check the metadata, to see if this reader is configured for a heartbeat */
|
||||
if(dsr->config.dataSetMetaData.fieldsSize == 0 &&
|
||||
dsr->config.dataSetMetaData.configurationVersion.majorVersion == 0 &&
|
||||
dsr->config.dataSetMetaData.configurationVersion.minorVersion == 0) {
|
||||
/* Expecting a heartbeat, check the message */
|
||||
if(msg->header.dataSetMessageType != UA_DATASETMESSAGE_DATAKEYFRAME ||
|
||||
msg->header.configVersionMajorVersion != 0 ||
|
||||
msg->header.configVersionMinorVersion != 0 ||
|
||||
msg->data.keyFrameData.fieldCount != 0) {
|
||||
UA_LOG_INFO_READER(server->config.logging, dsr,
|
||||
"This DSR expects heartbeat, but the received "
|
||||
"message doesn't seem to be so.");
|
||||
}
|
||||
UA_LOG_DEBUG_READER(server->config.logging, dsr, "Received a network message");
|
||||
|
||||
#ifdef UA_ENABLE_PUBSUB_MONITORING
|
||||
UA_DataSetReader_checkMessageReceiveTimeout(server, dsr);
|
||||
UA_DataSetReader_checkMessageReceiveTimeout(server, dsr);
|
||||
#endif
|
||||
dsr->lastHeartbeatReceived = UA_DateTime_nowMonotonic();
|
||||
|
||||
if(dsr->state != UA_PUBSUBSTATE_OPERATIONAL &&
|
||||
dsr->state != UA_PUBSUBSTATE_PREOPERATIONAL) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr,
|
||||
"Received a network message but not operational");
|
||||
return;
|
||||
}
|
||||
|
||||
UA_LOG_DEBUG_READER(server->config.logging, dsr,
|
||||
"DataSetReader '%.*s': received a network message",
|
||||
(int)dsr->config.name.length, dsr->config.name.data);
|
||||
|
||||
if(!msg->header.dataSetMessageValid) {
|
||||
UA_LOG_INFO_READER(server->config.logging, dsr,
|
||||
"DataSetMessage is discarded: message is not valid");
|
||||
/* To Do check ConfigurationVersion */
|
||||
/* if(msg->header.configVersionMajorVersionEnabled) {
|
||||
* if(msg->header.configVersionMajorVersion !=
|
||||
* dsr->config.dataSetMetaData.configurationVersion.majorVersion) {
|
||||
* UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
|
||||
* "DataSetMessage is discarded: ConfigurationVersion "
|
||||
* "MajorVersion does not match");
|
||||
* return;
|
||||
* }
|
||||
* } */
|
||||
return;
|
||||
}
|
||||
|
||||
/* TODO: Check ConfigurationVersion */
|
||||
/* if(msg->header.configVersionMajorVersionEnabled) {
|
||||
* if(msg->header.configVersionMajorVersion !=
|
||||
* dsr->config.dataSetMetaData.configurationVersion.majorVersion) {
|
||||
* UA_LOG_WARNING(server->config.logging, UA_LOGCATEGORY_SERVER,
|
||||
* "DataSetMessage is discarded: ConfigurationVersion "
|
||||
* "MajorVersion does not match");
|
||||
* return;
|
||||
* }
|
||||
* } */
|
||||
|
||||
if(msg->header.dataSetMessageType != UA_DATASETMESSAGE_DATAKEYFRAME) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr,
|
||||
"DataSetMessage is discarded: Only keyframes are supported");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Process message with raw encoding (realtime and non-realtime) */
|
||||
/* Process message with raw encoding. We have no field-count information for
|
||||
* the message. */
|
||||
if(msg->header.fieldEncoding == UA_FIELDENCODING_RAWDATA) {
|
||||
DataSetReader_processRaw(server, rg, dsr, msg);
|
||||
#ifdef UA_ENABLE_PUBSUB_MONITORING
|
||||
UA_DataSetReader_checkMessageReceiveTimeout(server, dsr);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check and adjust the field count
|
||||
* TODO Throw an error if non-matching? */
|
||||
/* Received a heartbeat with no fields */
|
||||
if(msg->data.keyFrameData.fieldCount == 0) {
|
||||
dsr->lastHeartbeatReceived = UA_DateTime_nowMonotonic();
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check whether the field count matches the configuration */
|
||||
size_t fieldCount = msg->data.keyFrameData.fieldCount;
|
||||
if(dsr->config.dataSetMetaData.fieldsSize < fieldCount)
|
||||
fieldCount = dsr->config.dataSetMetaData.fieldsSize;
|
||||
|
||||
if(dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize < fieldCount)
|
||||
fieldCount = dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize;
|
||||
|
||||
/* Process message with fixed size fields (realtime capable) */
|
||||
if(rg->config.rtLevel == UA_PUBSUB_RT_FIXED_SIZE) {
|
||||
DataSetReader_processFixedSize(server, rg, dsr, msg, fieldCount);
|
||||
#ifdef UA_ENABLE_PUBSUB_MONITORING
|
||||
UA_DataSetReader_checkMessageReceiveTimeout(server, dsr);
|
||||
#endif
|
||||
if(dsr->config.dataSetMetaData.fieldsSize != fieldCount) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr,
|
||||
"Number of fields does not match the DataSetMetaData configuration");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Write the message fields via the write service (non realtime) */
|
||||
if(dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariablesSize != fieldCount) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr,
|
||||
"Number of fields does not match the TargetVariables configuration");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Write the message fields. RT has the external data value configured. */
|
||||
UA_StatusCode res = UA_STATUSCODE_GOOD;
|
||||
for(size_t i = 0; i < fieldCount; i++) {
|
||||
if(!msg->data.keyFrameData.dataSetFields[i].hasValue)
|
||||
UA_DataValue *field = &msg->data.keyFrameData.dataSetFields[i];
|
||||
if(!field->hasValue)
|
||||
continue;
|
||||
|
||||
UA_FieldTargetVariable *tv =
|
||||
&dsr->config.subscribedDataSet.subscribedDataSetTarget.targetVariables[i];
|
||||
|
||||
/* RT-path: write directly into the target memory */
|
||||
if(tv->externalDataValue) {
|
||||
if(field->value.type != (*tv->externalDataValue)->value.type) {
|
||||
UA_LOG_WARNING_READER(server->config.logging, dsr, "Mismatching type");
|
||||
continue;
|
||||
}
|
||||
|
||||
if(tv->beforeWrite)
|
||||
tv->beforeWrite(server, &dsr->identifier, &dsr->linkedReaderGroup,
|
||||
&tv->targetVariable.targetNodeId,
|
||||
tv->targetVariableContext, tv->externalDataValue);
|
||||
memcpy((*tv->externalDataValue)->value.data,
|
||||
field->value.data, field->value.type->memSize);
|
||||
if(tv->afterWrite)
|
||||
tv->afterWrite(server, &dsr->identifier, &dsr->linkedReaderGroup,
|
||||
&tv->targetVariable.targetNodeId,
|
||||
tv->targetVariableContext, tv->externalDataValue);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Write via the Write-Service */
|
||||
UA_WriteValue writeVal;
|
||||
UA_WriteValue_init(&writeVal);
|
||||
writeVal.attributeId = tv->targetVariable.attributeId;
|
||||
writeVal.indexRange = tv->targetVariable.receiverIndexRange;
|
||||
writeVal.nodeId = tv->targetVariable.targetNodeId;
|
||||
writeVal.value = msg->data.keyFrameData.dataSetFields[i];
|
||||
writeVal.value = *field;
|
||||
Operation_Write(server, &server->adminSession, NULL, &writeVal, &res);
|
||||
if(res != UA_STATUSCODE_GOOD)
|
||||
UA_LOG_INFO_READER(server->config.logging, dsr,
|
||||
"Error writing KeyFrame field %u: %s",
|
||||
(unsigned)i, UA_StatusCode_name(res));
|
||||
}
|
||||
|
||||
#ifdef UA_ENABLE_PUBSUB_MONITORING
|
||||
UA_DataSetReader_checkMessageReceiveTimeout(server, dsr);
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef UA_ENABLE_PUBSUB_MONITORING
|
||||
|
Loading…
Reference in New Issue
Block a user