/*! * \file core.c * Core messenger functionality * \author Christos Choutouridis AEM:8997 */ #include #include #include #include #include #include "core.h" //! Global data //! @{ msgList_t msgList; //!< The message list for our application. //! @} /* * Local data types */ static pthread_mutex_t lock_msgList; //!< mutex for msgList locking static pthread_mutex_t lock_devList; //!< mutex for devList locking static pthread_mutex_t lock_stderr; //!< mutex for stderr locking static pthread_mutex_t lock_stdout; //!< mutex for stderr locking static pthread_mutex_t lock_stats; //!< mutex for stats locking //! Helper API //! @{ //! Macro to create a in_addr_t #define _ADHOC_SUBNET(A, B, C, D) (((A)<<24) | ((B)<<16) | ((C)<<8) | (D)) /*! * in_addr_t to devAEM_t conversion utility function * @param in_addr Internet address (host byte order) * @return devAEM_t representation of the address * @note * for example: * @code * devAEM_t dev = addr2devAEM (ntohl(clntAddr.sin_addr.s_addr)); * @endcode */ devAEM_t addr2devAEM (in_addr_t in_addr) { return (in_addr & 0x000000FF) + ((in_addr >> 8) & 0x000000FF) * 100; } /*! * devAEM_t to in_addr_t conversion utility function * @param dev devAEM_t address * @return Internet address (host byte order) * @note * for example: * @code * sockaddr_in_t srvAddr; * srvAddr.sin_addr.s_addr = htonl (devAEM2addr (dev)); * @endcode */ in_addr_t devAEM2addr (devAEM_t dev) { uint32_t add = _ADHOC_SUBNET (ADHOC_NET_A, ADHOC_NET_B, ADHOC_NET_C, ADHOC_NET_D); add |= (dev % 100) & 0x000000FF; add |= ((dev / 100) & 0x000000FF) << 8; return add; } /*! * DevIP_t to devAEM_t conversion utility function * @param ip pointer to devIP_t type address * @return devAEM_t representation of the address * @note * We discard the first 2 fields */ devAEM_t ip2AEM (devIP_t* ip) { return ip->C*100 + ip->D; } /*! 
* devAEM_t to DevIP_t conversion utility function * @param dev devAEM_t representation of the address * @return devIP_t type address */ devIP_t AEM2ip (devAEM_t dev) { devIP_t ip = { .A =ADHOC_NET_A, .B=ADHOC_NET_B, .C=dev/100, .D=dev%100 }; return ip; } //! @} /* * Log related local data */ static char_t* _frm_msg_in = "In from dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s\n"; static char_t* _frm_msg_out = "Out to dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s\n"; static char_t* _frm_msg_new = "New message: from=%d, to=%d, timestamp=%lld, text=%s\n"; //! log API //! @{ #define _HEAD_SIZE 25 /*! * Initialize log functionality * @return The status of the operation */ status_t log_init (void) { // Try to initialize pthreads for logging if (pthread_mutex_init(&lock_stderr, NULL) != 0) { fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ ); return MSG_ERROR; } if (pthread_mutex_init(&lock_stdout, NULL) != 0) { fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ ); return MSG_ERROR; } return MSG_OK; } /*! * Logs incoming messages * @param msg Pointer to msg to log * @note * This function depends on settings.outLevel */ void log_msg_in (msg_t* msg) { if (settings.outLevel >= OUTLEVEL_1) { pthread_mutex_lock(&lock_stdout); // lock stdout fprintf (stdout, _frm_msg_in, // print and flush msg->sender, msg->cMsg.from, msg->cMsg.to, msg->cMsg.ts, msg->cMsg.text ); fflush(stdout); pthread_mutex_unlock(&lock_stdout); // unlock stdout } } /*! * Logs outgoing messages * @param msg Pointer to msg to log * @param dev device that accepts the message * @note * This function depends on settings.outLevel */ void log_msg_out (msg_t* msg, devAEM_t dev) { if (settings.outLevel >= OUTLEVEL_1) { pthread_mutex_lock(&lock_stdout); // lock stdout fprintf (stdout, _frm_msg_out, // print and flush dev, msg->cMsg.from, msg->cMsg.to, msg->cMsg.ts, msg->cMsg.text ); fflush(stdout); pthread_mutex_unlock(&lock_stdout); // unlock stdout } } /*! 
* Logs new messages * @param msg Pointer to msg to log * @note * This function depends on settings.outLevel */ void log_msg_new (msg_t* msg) { if (settings.outLevel >= OUTLEVEL_1) { pthread_mutex_lock(&lock_stdout); // lock stdout fprintf (stdout, _frm_msg_new, // print and flush msg->cMsg.from, msg->cMsg.to, msg->cMsg.ts, msg->cMsg.text ); fflush(stdout); pthread_mutex_unlock(&lock_stdout); // unlock stdout } } /*! * Debug message log to stdout * Variadic formating print * @param fmt Pointer to printf like format string */ void log_debug (const char *fmt, ...) { if (settings.outLevel >= OUTLEVEL_2) { va_list ap; va_start(ap, fmt); pthread_mutex_lock(&lock_stdout); vfprintf (stdout, fmt, ap); fflush(stdout); pthread_mutex_unlock(&lock_stdout); va_end(ap); } } /*! * Debug error message log to stderr * Variadic formating print * @param fmt Pointer to printf like format string */ void log_error (const char *fmt, ...) { va_list ap; va_start(ap, fmt); pthread_mutex_lock(&lock_stderr); vfprintf (stderr, fmt, ap); fflush(stderr); pthread_mutex_unlock(&lock_stderr); va_end(ap); } //! @} //! cMsg_t API //! @{ /*! * Make a new message * @param msg Pointer to message to create */ void cMsg_make (cMsg_t* msg) { static int msgID =0; // unique msg ID msg->from = settings.me; // from me do { // randomly select recipient device msg->to = devList[rand() % AEMLIST_SIZE].dev; } while (msg->to == settings.me); msg->ts = time(NULL); // stream the first fields and take the quote text iterator sprintf (msg->text, "%s #%d", MESSAGE_BODY, msgID++); } /*! * Message concatenation * This function synthesize the ascii message for transmission * @param msg Pointer to message to serialize * @param buffer Pointer to output buffer * @return The size of the resulting message */ size_t cMsg_serialize (cMsg_t* msg, char_t* buffer) { return sprintf (buffer, "%d_%d_%lld_%s", msg->from, msg->to, msg->ts, msg->text ); } /*! 
* Custom strtok functionality * @param str Pointer to string to parse * @param max The size of string * @param c Single delimiter character (non compatible with strtok()) * @return Pointer to token (non compatible with strtok()) * @note * This function alters the given string by adding null termination in the places * of delimiters */ char_t* _strtok (char_t* str, size_t max, char_t c) { static char_t* last = NULL; char_t* ret = str; // init last if (str != NULL) last = str; // loop for (size_t i=0 ; i MSG_TEXT_SIZE) return MSG_ERROR; // Parse message char_t* rest = rawMsg; char_t* tok[4]; bool_t done = true; for (size_t i =0; i < 3; ++i) { tok[i] = _strtok (rest, size, MSG_DELIMITER); if (tok[i] == NULL) { done = false; break; } else { int l = strlen(rest); rest += l + 1; size -= l + 1; } } tok[3] = rest; if (done) { cMsg->from = atoi (tok[0]); cMsg->to = atoi (tok[1]); cMsg->ts = atoi (tok[2]); strcpy (cMsg->text, tok[3]); return MSG_OK; } return MSG_ERROR; } /*! getter for cMsg_t member fromAEM */ uint32_t cMsg_getFrom(cMsg_t* cMsg) { return cMsg->from; } /*! getter for cMsg_t member toAEM */ uint32_t cMsg_getTo(cMsg_t* cMsg) { return cMsg->to; } /*! getter for cMsg_t member fromAEM */ uint64_t cMsg_getTs(cMsg_t* cMsg) { return cMsg->ts; } /*! getter for payload text member */ char_t* cMsg_getText(cMsg_t* cMsg) { return cMsg->text; } /*! * Predicate to check core message equality * @param m1 Pointer to message 1 * @param m2 Pointer to message 2 * @return Equality result (true, false) */ bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2) { if (m1->from != m2->from) return false; if (m1->to != m2->to) return false; if (m1->ts != m2->ts) return false; if (strncmp (m1->text, m2->text, strlen(m1->text))) return false; return true; } //! @} /*! * mgs_t API */ //! @{ /*! * message initialization * @param msg */ void msg_init (msg_t* msg) { memset ((void*)msg, 0, sizeof(msg_t)); // init to 0 } //! @} //! devList API //! @{ /*! 
* Initialize the devList * @param msgList Pointer to mesList t initialize * @return The status of the operation */ status_t devList_init (devList_t* devList) { devAEM_t l[] = AEMLIST; if (pthread_mutex_init(&lock_devList, NULL) != 0) { log_error ("Error: mutex init has failed\n"); return MSG_ERROR; } memset ((void*)devList, 0, sizeof(devList_t)); for (size_t i =0 ; i= value) apply = value; \ } while (0) /*! Macro helper to saturate decreased values */ #define _btm_saturate(test, apply, value) do { \ if (test < value) apply = value; \ while (0) /*! * Initialize the msgList * @param msgList Pointer to mesList t initialize * @return The status of the operation */ status_t msgList_init (msgList_t* msgList) { if (pthread_mutex_init(&lock_msgList, NULL) != 0) { log_error ("Error: mutex init has failed\n"); return MSG_ERROR; } memset((void*)msgList, 0, sizeof(msgList_t)); msgList->first =-1; msgList->last =-1; srand (time(NULL)); return MSG_OK; } /*! * @brief msgList iterator pre-increment in the msg_t direction * * This iterator force a ring buffer behavior. This function takes pointer * to iterator to alter but return the altered value so it can be directly * used in expressions * * @param it Pointer to iterator to increase * @return The iterator values */ mIter_t msgList_preInc (mIter_t* it) { if (++*it >= MSG_LIST_SIZE) *it = 0; return *it; } /*! * @brief msgList iterator pre-decrement in the msg_t direction * * This iterator force a ring buffer behavior. This function takes pointer * to iterator to alter but return the altered value so it can be directly * used in expressions * * @param it Pointer to iterator to decrease * @return The iterator values */ mIter_t msgList_preDec (mIter_t* it) { if (--*it < 0) *it = MSG_LIST_SIZE-1; return *it; } /*! * @param this Pointer to msgList to use * @return An iterator to the first message of MSG_LIST_SIZE * of msgList.m[] * @note * As the msgList is a ring buffer we can not have a sensible end() iterator. 
* end() will eventually merge with begin() when the size reaches MSG_LIST_SIZE
 *    For that reason in all of our loops through msgList we shall take a begin()
 *    iterator and loop using size as sentinel for example:
 * @code
 * mIter_t it = msgList_begin (&msgList);    // get a message iterator
 * size_t size= msgList_size(&msgList);      // get current msgList size
 * for (size_t i=0 ; ifirst; }
 *
 * NOTE(review): text is missing from this copy right after "for (size_t i=0 ; i"
 * above: the end of the example, the end of this comment and the head of the
 * function that returns the "first" member are not visible here. Restore them
 * from the original file.

/*!
 * @param   this  Pointer to msgList to use
 * @return        An iterator to the last inserted message of MSG_LIST_SIZE
 *                of msgList.m[] (the "last" member is returned as is)
 */
mIter_t msgList_last (msgList_t* this) {
   return this->last;
}

/*!
 * @param   this  Pointer to msgList to use
 * @return        The current used slots of msgList
 * @note
 *    As the msgList is a ring buffer we can not have a sensible end() iterator.
 *    end() will eventually merge with begin() when the size reaches MSG_LIST_SIZE
 *    For that reason in all of our loops through msgList we shall take a begin()
 *    iterator and loop using size as sentinel
 * @code
 * mIter_t it = msgList_begin (&msgList);    // get a message iterator
 * size_t size= msgList_size(&msgList);      // get current msgList size
 * for (size_t i=0 ; isize; }
 *
 * NOTE(review): text is missing from this copy right after "for (size_t i=0 ; i"
 * above: the end of the example, the end of this comment and the head of the
 * function that returns the "size" member are not visible here. Restore them
 * from the original file.

/*!
 * Searches for a message in the message list.
 *
 * The search walks backwards from the last inserted slot and visits at
 * most "size" slots. Messages are compared with cMsg_equal() on their
 * cMsg part only.
 *
 * @param   this  The msgList object to work with
 * @param   msg   Pointer to message to search
 * @return        Iterator to message if found, or -1 if not
 */
mIter_t msgList_find (msgList_t* this, msg_t* msg) {
   mIter_t it =this->last;  // get iterator
   // search from end to start to find msg, return on success
   for (size_t i=0 ; i < this->size ; ++i) {
      if (cMsg_equal (&this->m[it].cMsg, &msg->cMsg))
         return it;
      msgList_preDec(&it);
      // We start from the end as we think, its more possible
      // to find msg in the recent messages.
   }
   return (mIter_t)-1;  // fail to find
}

/*!
* Add a new message in the message list * * @param this The msgList object to work with * @param msg Pointer to message * @return Iterator to the added item (last) */ mIter_t msgList_add (msgList_t* this, msg_t* msg) { if (this->first == -1) // if its first time init "first" iterator this->first = 0; this->m[msgList_preInc(&this->last)] = *msg; // store data *++it = *msg; _top_saturate(++this->size, this->size, MSG_LIST_SIZE); // count the items with saturation // if we reacher full capacity, move along first also if ((this->first == this->last) && (this->size > 1)) msgList_preInc(&this->first); return this->last; // return the iterator to newly created slot } //! Acquires msgList resources void msgList_acquire () { pthread_mutex_lock (&lock_msgList); } //! releases msgList resources void msgList_release () { pthread_mutex_unlock (&lock_msgList); } //! @} //! Statistics API //! @{ /*! * Initialize statistics * @param s Pointer to stat_t struct to initialize * @return The status of the operation */ status_t stats_init (stats_t* s) { memset ((void*)s, 0, sizeof (stats_t)); if (pthread_mutex_init(&lock_stats, NULL) != 0) { log_error ("Error: mutex init has failed\n"); return MSG_ERROR; } return MSG_OK; } /*! * Update statistics for newly created messages * @param msg Pointer to msg */ void statsUpdateCreate (msg_t* msg) { pthread_mutex_lock (&lock_stats); ++stats.totalMsg; ++stats.myMsg; // average message size int32_t saved = stats.totalMsg - stats.duplicateMsg; if ((saved-1) > 0) { // Append to average int32_t l = strlen(msg->cMsg.text); stats.avMsgSize += l / (fpdata_t)(saved -1); stats.avMsgSize *= (fpdata_t)(saved-1)/saved; } pthread_mutex_unlock (&lock_stats); } /*! 
* Update statistics for incoming message * @param msg Pointer to incoming message * @param dup Flag to indicate if the message was duplicate */ void statsUpdateIn (msg_t* msg, bool_t dup) { pthread_mutex_lock (&lock_stats); bool_t forMe = msg->cMsg.to == settings.me; stats.totalMsg++; stats.duplicateMsg += (dup) ? 1:0; stats.forMeMsg += (forMe) ? 1:0; stats.inDirectMsg += (forMe && (msg->cMsg.from == msg->sender)) ? 1:0; // averages int32_t saved = stats.totalMsg - stats.duplicateMsg; if ((saved-1) > 0) { // Append to message size average int32_t l = strlen(msg->cMsg.text); stats.avMsgSize += l / (fpdata_t)(saved -1); stats.avMsgSize *= (fpdata_t)(saved-1)/saved; if (settings.trackTime) { // append to time to me average tstamp_t dt = (tstamp_t)time(NULL) - msg->cMsg.ts; if (dt < 0) dt = 0; stats.avTimeToMe += dt / (fpdata_t)(saved -1); stats.avTimeToMe *= (fpdata_t)(saved-1)/saved; } } pthread_mutex_unlock (&lock_stats); } /*! * Update statistics for outgoing message * @param msg Pointer to message * @param dev The recipient device */ void statsUpdateOut (msg_t* msg, devAEM_t dev) { pthread_mutex_lock (&lock_stats); stats.outDirectMsg += (msg->cMsg.to == dev) ? 1:0; pthread_mutex_unlock (&lock_stats); } /*! 
* Statistics print functionality
 *
 * Writes the counters and averages to "statistics.txt" (the file is
 * truncated on every call).
 *
 * @param   stats Pointer to stats to print
 * @return        The status of the operation
 *    @arg  MSG_OK      The file was written and closed
 *    @arg  MSG_ERROR   The file could not be opened or closed
 */
status_t statsPrint (stats_t* stats) {
   FILE* fp = fopen ("statistics.txt", "w");
   if (fp == NULL)
      return MSG_ERROR;    // nothing was opened, so there is nothing to close

   fprintf (fp, "\n Statistics\n============\n");
   fprintf (fp, "Total messages: %d\n", stats->totalMsg);
   fprintf (fp, "Duplicate messages: %d\n", stats->duplicateMsg);
   fprintf (fp, "Messages for me: %d\n", stats->forMeMsg);
   fprintf (fp, "Messages by me: %d\n",stats->myMsg);
   fprintf (fp, "In messages direct for me: %d\n", stats->inDirectMsg);
   fprintf (fp, "Out direct messages: %d\n", stats->outDirectMsg);
   fprintf (fp, "Average message size: %g\n", stats->avMsgSize);
   fprintf (fp, "Average time to me: %g\n", stats->avTimeToMe);

   // fclose flushes the buffered output; a failure here means lost data
   return (fclose (fp) == 0) ? MSG_OK : MSG_ERROR;
}

/*!
 * Device online timing print functionality
 *
 * Writes to "devices.txt" (the file is truncated on every call).
 *
 * @param   devList  Pointer to devList to print
 * @return           The status of the operation
 *    @arg  MSG_ERROR   The file could not be opened
 */
status_t statsTimesPrint (devList_t *devList) {
   FILE* fp = fopen ("devices.txt", "w");
   if (fp == NULL)
      return MSG_ERROR;    // nothing was opened, so there is nothing to close

   fprintf (fp, "\n Device timings\n================\n");
   // NOTE(review): this copy of the file is cut off inside the loop header
   // below; the rest of the function is not visible here.
   for (size_t i =0 ; i