|
- /*!
- * \file core.c
- * Core messenger functionality
- * \author Christos Choutouridis AEM:8997 <cchoutou@ece.auth.gr>
- */
-
-
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <stdarg.h>
-
- #include <pthread.h>
-
- #include "core.h"
-
- //! Global data
- //! @{
- msgList_t msgList; //!< The message list for our application.
- //! @}
-
- /*
- * Local data types
- */
- static pthread_mutex_t lock_msgList; //!< mutex for msgList locking
- static pthread_mutex_t lock_stderr; //!< mutex for stderr locking
- static pthread_mutex_t lock_stdout; //!< mutex for stderr locking
- static pthread_mutex_t lock_stats; //!< mutex for stats locking
-
- //! Helper API
- //! @{
-
- //! Macro to create a in_addr_t
- #define _ADHOC_SUBNET(A, B, C, D) (((A)<<24) | ((B)<<16) | ((C)<<8) | (D))
-
- /*!
- * in_addr_t to devAEM_t conversion utility function
- * @param in_addr Internet address (host byte order)
- * @return devAEM_t representation of the address
- * @example
- * @code{.c}
- * devAEM_t dev = addr2devAEM (ntohl(clntAddr.sin_addr.s_addr));
- * @endcode
- */
- devAEM_t addr2devAEM (in_addr_t in_addr) {
- return (in_addr & 0x000000FF) + ((in_addr >> 8) & 0x000000FF) * 100;
- }
-
- /*!
- * devAEM_t to in_addr_t conversion utility function
- * @param dev devAEM_t address
- * @return Internet address (host byte order)
- * @example
- * @code{.c}
- * sockaddr_in_t srvAddr;
- * srvAddr.sin_addr.s_addr = htonl (devAEM2addr (dev));
- * @endcode
- */
- in_addr_t devAEM2addr (devAEM_t dev) {
- uint32_t add = _ADHOC_SUBNET (ADHOC_NET_A, ADHOC_NET_B, ADHOC_NET_C, ADHOC_NET_D);
- add |= (dev % 100) & 0x000000FF;
- add |= ((dev / 100) & 0x000000FF) << 8;
- return add;
- }
-
- /*!
- * DevIP_t to devAEM_t conversion utility function
- * @param ip pointer to devIP_t type address
- * @return devAEM_t representation of the address
- * @note
- * We discard the first 2 fields
- */
- devAEM_t ip2AEM (devIP_t* ip) {
- return ip->C*100 + ip->D;
- }
-
- /*!
- * devAEM_t to DevIP_t conversion utility function
- * @param dev devAEM_t representation of the address
- * @return devIP_t type address
- */
- devIP_t AEM2ip (devAEM_t dev) {
- devIP_t ip = {
- .A =ADHOC_NET_A, .B=ADHOC_NET_B, .C=dev/100, .D=dev%100
- };
- return ip;
- }
- //! @}
-
- /*
- * Log related local data
- */
- static char_t* _frm_msg_in = "In from dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s\n";
- static char_t* _frm_msg_out = "Out to dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s\n";
- static char_t* _frm_msg_new = "New message: from=%d, to=%d, timestamp=%lld, text=%s\n";
-
- //! log API
- //! @{
-
- #define _HEAD_SIZE 25
-
- /*!
- * Initialize log functionality
- * @return The status of the operation
- */
- status_t log_init (void) {
- // Try to initialize pthreads for logging
- if (pthread_mutex_init(&lock_stderr, NULL) != 0) {
- fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
- return MSG_ERROR;
- }
- if (pthread_mutex_init(&lock_stdout, NULL) != 0) {
- fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
- return MSG_ERROR;
- }
- return MSG_OK;
- }
-
- /*!
- * Logs incoming messages
- * @param msg Pointer to msg to log
- * @note
- * This function depends on \sa settings.outLevel
- */
- void log_msg_in (msg_t* msg) {
- if (settings.outLevel >= OUTLEVEL_1) {
- pthread_mutex_lock(&lock_stdout); // lock stdout
- fprintf (stdout, _frm_msg_in, // print and flush
- msg->sender,
- msg->cMsg.from,
- msg->cMsg.to,
- msg->cMsg.ts,
- msg->cMsg.text
- );
- fflush(stdout);
- pthread_mutex_unlock(&lock_stdout); // unlock stdout
- }
- }
-
- /*!
- * Logs outgoing messages
- * @param msg Pointer to msg to log
- * @param dev device that accepts the message
- * @note
- * This function depends on \sa settings.outLevel
- */
- void log_msg_out (msg_t* msg, devAEM_t dev) {
- if (settings.outLevel >= OUTLEVEL_1) {
- pthread_mutex_lock(&lock_stdout); // lock stdout
- fprintf (stdout, _frm_msg_out, // print and flush
- dev,
- msg->cMsg.from,
- msg->cMsg.to,
- msg->cMsg.ts,
- msg->cMsg.text
- );
- fflush(stdout);
- pthread_mutex_unlock(&lock_stdout); // unlock stdout
- }
- }
-
- /*!
- * Logs new messages
- * @param msg Pointer to msg to log
- * @note
- * This function depends on \sa settings.outLevel
- */
- void log_msg_new (msg_t* msg) {
- if (settings.outLevel >= OUTLEVEL_1) {
- pthread_mutex_lock(&lock_stdout); // lock stdout
- fprintf (stdout, _frm_msg_new, // print and flush
- msg->cMsg.from,
- msg->cMsg.to,
- msg->cMsg.ts,
- msg->cMsg.text
- );
- fflush(stdout);
- pthread_mutex_unlock(&lock_stdout); // unlock stdout
- }
- }
-
- /*!
- * Debug message log to stdout
- * Variadic formating print
- * @param fmt Pointer to printf like format string
- */
- void log_debug (const char *fmt, ...) {
- if (settings.outLevel >= OUTLEVEL_2) {
- va_list ap;
- va_start(ap, fmt);
- pthread_mutex_lock(&lock_stdout);
- vfprintf (stdout, fmt, ap);
- fflush(stdout);
- pthread_mutex_unlock(&lock_stdout);
- va_end(ap);
- }
- }
-
- /*!
- * Debug error message log to stderr
- * Variadic formating print
- * @param fmt Pointer to printf like format string
- */
- void log_error (const char *fmt, ...) {
- va_list ap;
- va_start(ap, fmt);
- pthread_mutex_lock(&lock_stderr);
- vfprintf (stderr, fmt, ap);
- fflush(stderr);
- pthread_mutex_unlock(&lock_stderr);
- va_end(ap);
- }
-
- //! @}
-
- //! cMsg_t API
- //! @{
-
-
- /*!
- * Make a new message
- * @param msg Pointer to message to create
- */
- void cMsg_make (cMsg_t* msg) {
- static int msgID =0; // unique msg ID
- msg->from = settings.me; // from me
- do {
- // randomly select recipient device
- msg->to = devList[rand() % AEMLIST_SIZE].dev;
- } while (msg->to == settings.me);
- msg->ts = time(NULL);
-
- // stream the first fields and take the quote text iterator
- sprintf (msg->text, "%s #%d", MESSAGE_BODY, msgID++);
- }
-
- /*!
- * Message concatenation
- * This function synthesize the ascii message for transmission
- * @param msg Pointer to message to serialize
- * @param buffer Pointer to output buffer
- * @return The size of the resulting message
- */
- size_t cMsg_serialize (cMsg_t* msg, char_t* buffer) {
- return sprintf (buffer, "%d_%d_%lld_%s",
- msg->from,
- msg->to,
- msg->ts,
- msg->text
- );
- }
-
- /*!
- * Custom strtok functionality
- * @param str Pointer to string to parse
- * @param max The size of string
- * @param c Single delimiter character (non compatible with strtok())
- * @return Pointer to token (non compatible with strtok())
- * @note
- * This function alters the given string by adding null termination in the places
- * of delimiters
- */
- char_t* _strtok (char_t* str, size_t max, char_t c) {
- static char_t* last = NULL;
- char_t* ret = str;
-
- // init last
- if (str != NULL) last = str;
-
- // loop
- for (size_t i=0 ; i<max ; ++i) {
- if (*last == c) {
- *last++ = 0;
- return ret;
- }
- ++last;
- }
- return NULL;
- }
-
- /*!
- * Parse an incoming message
- *
- * @param cMsg Pointer to cMsg object to store the parsed data
- * @param rawMsg Pointer to raw message
- * @param size The size f raw message buffer
- * @return The status of the operation
- * @arg MSG_OK Success
- * @arg MSG_ERROR Parse failure, incoming message format error
- */
- status_t cMsg_parse (cMsg_t* cMsg, char_t* rawMsg, size_t size) {
-
- // Check message integrity
- if (size > MSG_TEXT_SIZE)
- return MSG_ERROR;
-
- // Parse message
- char_t* rest = rawMsg;
- char_t* tok[4];
- bool_t done = true;
-
- for (size_t i =0; i < 3; ++i) {
- tok[i] = _strtok (rest, size, MSG_DELIMITER);
-
- if (tok[i] == NULL) {
- done = false;
- break;
- }
- else {
- int l = strlen(rest);
- rest += l + 1;
- size -= l + 1;
- }
- }
- tok[3] = rest;
-
- if (done) {
- cMsg->from = atoi (tok[0]);
- cMsg->to = atoi (tok[1]);
- cMsg->ts = atoi (tok[2]);
- strcpy (cMsg->text, tok[3]);
- return MSG_OK;
- }
-
- return MSG_ERROR;
- }
-
- /*! getter for cMsg_t member fromAEM */
- uint32_t cMsg_getFrom(cMsg_t* cMsg) { return cMsg->from; }
- /*! getter for cMsg_t member toAEM */
- uint32_t cMsg_getTo(cMsg_t* cMsg) { return cMsg->to; }
- /*! getter for cMsg_t member fromAEM */
- uint64_t cMsg_getTs(cMsg_t* cMsg) { return cMsg->ts; }
- /*! getter for payload text member */
- char_t* cMsg_getText(cMsg_t* cMsg) { return cMsg->text; }
-
- /*!
- * Predicate to check core message equality
- * @param m1 Pointer to message 1
- * @param m2 Pointer to message 2
- * @return Equality result (true, false)
- */
- bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2) {
- if (m1->from != m2->from) return false;
- if (m1->to != m2->to) return false;
- if (m1->ts != m2->ts) return false;
- if (strncmp (m1->text, m2->text, sizeof(m1->text)))
- return false;
- return true;
- }
- //! @}
-
- /*!
- * mgs_t API
- */
- //! @{
-
- /*!
- * message initialization
- * @param msg
- */
- void msg_init (msg_t* msg) {
- memset ((void*)msg, 0, sizeof(msg_t)); // init to 0
- }
-
- //! @}
-
-
- //! msgList API
- //! @{
-
- /*! Macro helper to saturate increased values */
- #define _top_saturate(test, apply, value) do { \
- if (test >= value) apply = value; \
- } while (0)
-
- /*! Macro helper to saturate decreased values */
- #define _btm_saturate(test, apply, value) do { \
- if (test < value) apply = value; \
- while (0)
-
- /*!
- * Returns an iterator for \sa devList AND \sa msg_t.recipients
- * @param dev The device to search
- * @return The iterator, namely the index to devList array in which is the \p dev
- */
- dIter_t devList_getIter (devAEM_t dev) {
- for (dIter_t i =0 ; i<AEMLIST_SIZE ; ++i) {
- if (devList[i].dev == dev)
- return i;
- }
- return -1; // return end()
- }
-
- /*!
- * Initialize the msgList
- * @param msgList Pointer to mesList t initialize
- * @return The status of the operation
- */
- status_t msgList_init (msgList_t* msgList) {
- if (pthread_mutex_init(&lock_msgList, NULL) != 0) {
- log_error ("Error: mutex init has failed\n");
- return MSG_ERROR;
- }
- memset((void*)msgList, 0, sizeof(msgList_t));
- msgList->first =-1;
- msgList->last =-1;
- srand (time(NULL));
- return MSG_OK;
- }
-
- /*!
- * @brief msgList iterator pre-increment in the msg_t direction
- *
- * This iterator force a ring buffer behavior. This function takes pointer
- * to iterator to alter but return the altered value so it can be directly
- * used in expressions
- *
- * @param it Pointer to iterator to increase
- * @return The iterator values
- */
- mIter_t msgList_preInc (mIter_t* it) {
- if (++*it >= MSG_LIST_SIZE) *it = 0;
- return *it;
- }
-
- /*!
- * @brief msgList iterator pre-decrement in the msg_t direction
- *
- * This iterator force a ring buffer behavior. This function takes pointer
- * to iterator to alter but return the altered value so it can be directly
- * used in expressions
- *
- * @param it Pointer to iterator to decrease
- * @return The iterator values
- */
- mIter_t msgList_preDec (mIter_t* it) {
- if (--*it < 0) *it = MSG_LIST_SIZE;
- return *it;
- }
-
- /*!
- * @param this Pointer to msgList to use
- * @return An iterator to the first message of \sa MSG_LIST_SIZE
- * of msgList.m[]
- * @note
- * As the msgList is a ring buffer we can not have a sensible end() iterator.
- * end() will eventually merge with begin() when the size reaches \sa MSG_LIST_SIZE
- * For that reason in all of our loops through msgList we shall take a begin()
- * iterator and loop using size as sentinel
- * @example
- * @code{.c}
- * mIter_t it = msgList_begin (&msgList); // get a message iterator
- * size_t size= msgList_size(&msgList); // get current msgList size
- * for (size_t i=0 ; i<size ; ++i, msgList_preInc (&it)) { // don't forget to increase iterator
- * // use msgList[it]
- * }
- * @endcode
- */
- mIter_t msgList_begin (msgList_t* this) {
- return this->first;
- }
-
- /*!
- * @param this Pointer to msgList to use
- * @return An iterator to the last inserted message of \sa MSG_LIST_SIZE
- * of msgList.m[]
- */
- mIter_t msgList_last (msgList_t* this) {
- return this->last;
- }
-
- /*!
- * @param this Pointer to msgList to use
- * @return The current used slots of msgList
- * @note
- * As the msgList is a ring buffer we can not have a sensible end() iterator.
- * end() will eventually merge with begin() when the size reaches \sa MSG_LIST_SIZE
- * For that reason in all of our loops through msgList we shall take a begin()
- * iterator and loop using size as sentinel
- * @example
- * @code{.c}
- * mIter_t it = msgList_begin (&msgList); // get a message iterator
- * size_t size= msgList_size(&msgList); // get current msgList size
- * for (size_t i=0 ; i<size ; ++i, msgList_preInc (&it)) { // don't forget to increase iterator
- * // use msgList[it]
- * }
- * @endcode
- */
- size_t msgList_size (msgList_t* this) {
- return this->size;
- }
-
- /*!
- * Searches for a message in the message list.
- *
- * @param this The msgList object to work with
- * @param msg Pointer to message to search
- * @return Iterator to message if found, or -1 if not
- */
- mIter_t msgList_find (msgList_t* this, msg_t* msg) {
- mIter_t it =this->last; // get iterator
- // search from end to start to find msg, return on success
- for (size_t i=0 ; i < this->size ; ++i) {
- if (cMsg_equal (&this->m[it].cMsg, &msg->cMsg))
- return it;
- msgList_preDec(&it);
- // We start from the end as we think, its more possible
- // to find msg in the recent messages.
- }
- return (mIter_t)-1; // fail to find
- }
-
-
-
- /*!
- * Add a new message in the message list
- *
- * @param this The msgList object to work with
- * @param msg Pointer to message
- * @return Iterator to the added item (last)
- */
- mIter_t msgList_add (msgList_t* this, msg_t* msg) {
- if (this->first == -1) // if its first time init "first" iterator
- this->first = 0;
- this->m[msgList_preInc(&this->last)] = *msg; // store data *++it = *msg;
- _top_saturate(++this->size, this->size, MSG_LIST_SIZE); // count the items with saturation
-
- // if we reacher full capacity, move along first also
- if ((this->first == this->last) && (this->size > 1))
- msgList_preInc(&this->first);
- return this->last; // return the iterator to newly created slot
- }
-
-
- //! Acquires msgList resources
- void msgList_acquire () { pthread_mutex_lock (&lock_msgList); }
- //! releases msgList resources
- void msgList_release () { pthread_mutex_unlock (&lock_msgList); }
-
- //! @}
-
- //! Statistics API
- //! @{
-
- /*!
- * Initialize statistics
- * @param s Pointer to \sa stat_t struct to initialize
- * @return The status of the operation
- */
- status_t stats_init (stats_t* s) {
- memset ((void*)s, 0, sizeof (stats_t));
- if (pthread_mutex_init(&lock_stats, NULL) != 0) {
- log_error ("Error: mutex init has failed\n");
- return MSG_ERROR;
- }
- return MSG_OK;
- }
-
- /*!
- * Update statistics for newly created messages
- * @param msg Pointer to msg
- */
- void statsUpdateCreate (msg_t* msg) {
- pthread_mutex_lock (&lock_stats);
-
- ++stats.totalMsg;
- ++stats.myMsg;
-
- // average message size
- int32_t saved = stats.totalMsg - stats.duplicateMsg;
- if ((saved-1) > 0) {
- // Append to average
- int32_t l = strlen(msg->cMsg.text);
- stats.avMsgSize += l / (fpdata_t)(saved -1);
- stats.avMsgSize *= (fpdata_t)(saved-1)/saved;
- }
- pthread_mutex_unlock (&lock_stats);
- }
-
- /*!
- * Update statistics for incoming message
- * @param msg Pointer to incoming message
- * @param dup Flag to indicate if the message was duplicate
- */
- void statsUpdateIn (msg_t* msg, bool_t dup) {
- pthread_mutex_lock (&lock_stats);
-
- bool_t forMe = msg->cMsg.to == settings.me;
- stats.totalMsg++;
- stats.duplicateMsg += (dup) ? 1:0;
- stats.forMeMsg += (forMe) ? 1:0;
- stats.inDirectMsg += (forMe && (msg->cMsg.from == msg->sender)) ? 1:0;
-
- // averages
- int32_t saved = stats.totalMsg - stats.duplicateMsg;
- if ((saved-1) > 0) {
- // Append to message size average
- int32_t l = strlen(msg->cMsg.text);
- stats.avMsgSize += l / (fpdata_t)(saved -1);
- stats.avMsgSize *= (fpdata_t)(saved-1)/saved;
-
- if (settings.trackTime) {
- // append to time to me average
- tstamp_t dt = (tstamp_t)time(NULL) - msg->cMsg.ts;
- if (dt < 0)
- dt = 0;
- stats.avTimeToMe += dt / (fpdata_t)(saved -1);
- stats.avTimeToMe *= (fpdata_t)(saved-1)/saved;
- }
- }
- pthread_mutex_unlock (&lock_stats);
- }
-
- /*!
- * Update statistics for outgoing message
- * @param msg Pointer to message
- * @param dev The recipient device
- */
- void statsUpdateOut (msg_t* msg, devAEM_t dev) {
- pthread_mutex_lock (&lock_stats);
- stats.outDirectMsg += (msg->cMsg.to == dev) ? 1:0;
- pthread_mutex_unlock (&lock_stats);
- }
-
- /*!
- * Statistics print functionality
- * @param stats Pointer to stats to print
- * @return The status of the operation
- */
- status_t statsPrint (stats_t* stats) {
- FILE* fp = fopen ("statistics.txt", "w");
- if (fp == NULL) {
- fclose (fp);
- return MSG_ERROR;
- }
- fprintf (fp, "\n Statistics\n============\n");
- fprintf (fp, "Total messages: %d\n", stats->totalMsg);
- fprintf (fp, "Duplicate messages: %d\n", stats->duplicateMsg);
- fprintf (fp, "Messages for me: %d\n", stats->forMeMsg);
- fprintf (fp, "Messages by me: %d\n",stats->myMsg);
- fprintf (fp, "In messages direct for me: %d\n", stats->inDirectMsg);
- fprintf (fp, "Out direct messages: %d\n", stats->outDirectMsg);
- fprintf (fp, "Average message size: %g\n", stats->avMsgSize);
- fprintf (fp, "Average time to me: %g\n", stats->avTimeToMe);
-
- for (size_t i =0 ; i<AEMLIST_SIZE ; ++i) {
- fprintf (fp, " Device %u found on %lld, last: %lld\n",
- devList[i].dev, devList[i].begin, devList[i].end);
- }
- fclose (fp);
- return MSG_OK;
- }
-
- //! @}
-
|