/* * LinkCommunication.c * * Created by Stephen Norum. * Copyright 2007 Stanford Linear Accelerator Center. All rights reserved. * */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mpsCommon.h" #include "mpsConstantsAndTypes.h" #include "mpsLinkProtocol.h" #include "mpsLinkProtocolV2.h" #include "mpsLinkCommunication.h" #include "privateMpsError.h" /* Local global variables */ static int linkNodeSocket = -1; static epicsMutexId mpsMsgRcvdCallbackMutex = 0; static MPSLP_MSG_RCVD_CALLBACK msgRcvdCallback = NULL; static void *msgRcvdCallbackContext = NULL; static epicsUInt8 receiveTimeoutCallback = FALSE; static epicsUInt8 receiveInvalidMessageCallback = FALSE; enum commonErrorsEnum { mutex_not_created = 0, invalid_protocol_version = 1, invalid_message_type = 2, null_data_pointer = 3, null_header_pointer = 4, commonErrorsCount }; static char * commonErrors[commonErrorsCount] = { "mutex not created", "invalid protocol version", "invalid message type", "data pointer points to NULL", "header pointer points to NULL" }; static const char *linkProcessorIpString = "192.168.0.1"; static uint32_t linkProcessorIp = -1; #if 0 #pragma mark - #pragma mark Callback Function #endif /**************************************************************************************** * Callback Function Accessors ***************************************************************************************/ /* Note: These functions are not available inside callback function */ int MPSLPSetMpsMsgRcvdCallback(MPSLP_MSG_RCVD_CALLBACK function, void *context) { if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); msgRcvdCallback = function; msgRcvdCallbackContext = context; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return MPS_SUCCESS; } int MPSLPGetMpsMsgRcvdCallback(MPSLP_MSG_RCVD_CALLBACK *function, void **context) { if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); *function = msgRcvdCallback; *context = msgRcvdCallbackContext; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return MPS_SUCCESS; } int MPSLPSetReceiveTimeoutCallback(epicsUInt8 state) { if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); receiveTimeoutCallback = state; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return MPS_SUCCESS; } epicsUInt8 MPSLPGetReceiveTimeoutCallback() { epicsUInt8 state; if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); state = receiveTimeoutCallback; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return state; } int MPSLPSetReceiveInvalidMessageCallback(epicsUInt8 state) { if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); receiveInvalidMessageCallback= state; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return MPS_SUCCESS; } epicsUInt8 MPSLPGetReceiveInvalidMessageCallback() { epicsUInt8 state; if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } epicsMutexLock(mpsMsgRcvdCallbackMutex); state = receiveInvalidMessageCallback; epicsMutexUnlock(mpsMsgRcvdCallbackMutex); return state; } #if 0 #pragma mark - #pragma mark Message Parsing #endif /**************************************************************************************** * Message Parsing ***************************************************************************************/ /* Returns whether or not a packet contains a valid message. * Sets message->header, data, protocolVersion, and type. */ int MPSLPParseHeader(MPSLPMessage *message) { epicsUInt8 valid = FALSE; /* Initialize message's data */ message->data = NULL; message->protocolVersion = MPSLinkProtocolVersionInvalid; message->type = MPSMessageTypeInvalid; if (message->header == NULL) { mpsSetLastErrorString(commonErrors[null_header_pointer]); return MPS_ERROR; } /* Check protocol version validity. * Only accept the current MLP version, version 3, as valid. */ message->protocolVersion = LINK_MESSAGE_PROTOCOL_VERSION(message->header); switch (message->protocolVersion) { case MPSLinkProtocolVersion1: case MPSLinkProtocolVersion2: valid = FALSE; break; case MPSLinkProtocolVersion3: valid = TRUE; break; default: valid = FALSE; break; } if (!(valid)) { mpsSetLastErrorString(commonErrors[invalid_protocol_version]); return MPS_ERROR; } /* Check message type validity. * Devices that are not Link Nodes and Link Processors * should only receive synchronization and permit messages. */ message->type = LINK_MESSAGE_TYPE(message->header); switch (message->type) { case MPSLinkProcessorMessageTypeSynchronization: case MPSLinkProcessorMessageTypePermit: valid = TRUE; break; default: valid = FALSE; break; } if (!(valid)) { mpsSetLastErrorString(commonErrors[invalid_message_type]); return MPS_ERROR; } message->data = LINK_MESSAGE_DATA_SECTION_POINTER(message->header); return MPS_SUCCESS; } int MPSLPParsePermitMessage(MPSLPMessagePermit *message) { int i; epicsUInt8 *permitArray; epicsUInt8 *maxRateArray; epicsInt8 *data = message->message.data; if (message->message.type != MPSLinkProcessorMessageTypePermit) { mpsSetLastErrorString(commonErrors[invalid_message_type]); return MPS_ERROR; } if (data == NULL) { mpsSetLastErrorString(commonErrors[null_data_pointer]); return MPS_ERROR; } permitArray = PERMIT_MITIGATION_PERMIT_POINTER(data); maxRateArray = PERMIT_MAX_RATE_AFTER_DEVICE_POINTER(data); message->timestamp.secPastEpoch = ntohl(PERMIT_TIMESTAMP_SECPASTEPOCH(data)); message->timestamp.nsec = ntohl(PERMIT_TIMESTAMP_NSEC_PULSEID(data)); message->timeslot = PERMIT_TIMESLOT(data); message->destination = PERMIT_DESTINATION(data); message->estimatedBeamRate = PERMIT_ESTIMATED_RATE(data); message->outputCard = PERMIT_OUTPUT_CARD_CONTROL(data); for (i = 0; i < MPSMitigationDeviceCount; i++) { message->beamPermitted[i] = permitArray[i]; message->rateAfterDevice[i] = maxRateArray[i]; } return MPS_SUCCESS; } int MPSLPParseSynchronizationMessage(MPSLPMessageSync *message) { epicsInt8 *data = message->message.data; if (message->message.type != MPSLinkProcessorMessageTypeSynchronization) { mpsSetLastErrorString(commonErrors[invalid_message_type]); return MPS_ERROR; } if (data == NULL) { mpsSetLastErrorString(commonErrors[null_data_pointer]); return MPS_ERROR; } message->timestamp.secPastEpoch = ntohl(LINK_SYNC_TIMESTAMP_SECPASTEPOCH(data)); message->timestamp.nsec = ntohl(LINK_SYNC_TIMESTAMP_NSEC_PULSEID(data)); message->timeslot = LINK_SYNC_TIMESLOT(data); return MPS_SUCCESS; } int MPSLPParseMessage(MPSLPMessage *message) { int status; /* Call to MPSLPParseHeader is likely a duplicate, but allows function * to be called with fewer restrictions on message's contents. */ status = MPSLPParseHeader(message); if (status != MPS_SUCCESS) { return status; } switch (message->type) { case MPSLinkProcessorMessageTypePermit: status = MPSLPParsePermitMessage((MPSLPMessagePermit *) message); break; case MPSLinkProcessorMessageTypeSynchronization: status = MPSLPParseSynchronizationMessage((MPSLPMessageSync *) message); break; default: mpsSetLastErrorString(commonErrors[invalid_message_type]); status = MPS_ERROR; break; } return status; } #if 0 #pragma mark - #pragma mark Message Sending #endif /**************************************************************************************** * Message Sending ***************************************************************************************/ int MPSMessageSendToLinkProcessor(const void *data, int length) { int err; err = udpCommSend(linkNodeSocket, (void *) data, length); return err; } int MPSMessageSendTestMessage() { int err; unsigned char data[2] = {0x01, 0xFF}; err = MPSMessageSendToLinkProcessor((void *) data, sizeof(data)/sizeof(data[0])); return err; } #if 0 #pragma mark - #pragma mark Message Receiving #endif /**************************************************************************************** * Message Receiving ***************************************************************************************/ static void mpsMessageReceiverThread(void *arg) { static int status = MPS_ERROR; static UdpCommPkt packet = NULL; static MPSLPMessageTypeAny message; static MPSLPMessage *stdmsg; stdmsg = &(message.standard); while(1) { /* Receive the data (blocking with timeout) */ packet = udpCommRecvFrom(linkNodeSocket, MPSLPReceiverTimeoutInMs, (uint32_t *) &(stdmsg->senderIp), (uint16_t *) &(stdmsg->senderPort)); stdmsg->senderIp = ntohl(stdmsg->senderIp); stdmsg->senderPort = ntohs(stdmsg->senderPort); epicsMutexLock(mpsMsgRcvdCallbackMutex); if (packet == NULL) { /* Haven't received a packet in over MPSLPReceiverTimeoutInMs ms */ if ((receiveTimeoutCallback) && (msgRcvdCallback != NULL)) { msgRcvdCallback(MPSLPMsgRcvdCallbackStatusTimeout, NULL, msgRcvdCallbackContext); } goto finish_msg_receiver; } /* Determine if the message is valid */ stdmsg->header = udpCommBufPtr(packet); status = MPSLPParseHeader(stdmsg); if (status != MPS_SUCCESS) { if ((receiveInvalidMessageCallback) && (msgRcvdCallback != NULL)) { msgRcvdCallback(MPSLPMsgRcvdCallbackStatusInvalidMessage, stdmsg, msgRcvdCallbackContext); } goto finish_msg_receiver; } /* Valid Message */ if (msgRcvdCallback != NULL) { msgRcvdCallback(MPSLPMsgRcvdCallbackStatusValidMessage, stdmsg, msgRcvdCallbackContext); } finish_msg_receiver: epicsMutexUnlock(mpsMsgRcvdCallbackMutex); if (packet != NULL) { udpCommFreePacket(packet); } } /* Should never reach this point */ cantProceed("mpsMessageReceiverThread: left infinite loop"); } #if 0 #pragma mark - #pragma mark Initialization #endif /**************************************************************************************** * Initialization ***************************************************************************************/ int MPSLPInitializeCommunication() { static int hasInitialized = FALSE; int err; if (hasInitialized) { return MPS_SUCCESS; } err = mpsInitializeError(); if (err != MPS_SUCCESS) { return err; } mpsMsgRcvdCallbackMutex = epicsMutexCreate(); if (mpsMsgRcvdCallbackMutex == 0) { mpsSetLastErrorString(commonErrors[mutex_not_created]); return MPS_ERROR; } /* Create socket for link node communication */ linkNodeSocket = udpCommSocket(MPSLinkProtocolPort); if (linkNodeSocket < 0) { mpsSetLastErrorString("could not create linkNodeSocket"); return linkNodeSocket; } /* Setup messages to be sent to and received from Link Processor */ linkProcessorIp = inet_addr(linkProcessorIpString); err = udpCommConnect(linkNodeSocket, linkProcessorIp, MPSLinkProtocolPort); if (err != 0) { mpsSetLastErrorString("could not connect socket to Link Processor"); return MPS_ERROR; } hasInitialized = TRUE; return MPS_SUCCESS; } int MPSLPStartReceiveThread(unsigned int receiverEpicsThreadPriority) { int err; static int hasInitialized = FALSE; if (hasInitialized) { return MPS_SUCCESS; } /* Start thread to receive data from Link Nodes */ err = (int) epicsThreadCreate("mpsCommMessageReceiverThread", receiverEpicsThreadPriority, epicsThreadGetStackSize(epicsThreadStackMedium), (EPICSTHREADFUNC) mpsMessageReceiverThread, NULL); if (err == 0) { mpsSetLastErrorString("could not create mpsCommMessageReceiverThread"); return MPS_ERROR_THREAD_CREATION_FAILED; } hasInitialized = TRUE; return MPS_SUCCESS; } #if 0 #pragma mark - #pragma mark EPICS Function Export #endif /**************************************************************************************** * EPICS Function Export ***************************************************************************************/ epicsRegisterFunction(MPSLPSetMpsMsgRcvdCallback); epicsRegisterFunction(MPSLPGetMpsMsgRcvdCallback); epicsRegisterFunction(MPSLPSetReceiveTimeoutCallback); epicsRegisterFunction(MPSLPGetReceiveTimeoutCallback); epicsRegisterFunction(MPSLPSetReceiveInvalidMessageCallback); epicsRegisterFunction(MPSLPGetReceiveInvalidMessageCallback); epicsRegisterFunction(MPSLPParseHeader); epicsRegisterFunction(MPSLPParsePermitMessage); epicsRegisterFunction(MPSLPParseSynchronizationMessage); epicsRegisterFunction(MPSLPParseMessage); epicsRegisterFunction(MPSLPInitializeCommunication);