A messenger application for Raspberry Pi Zero for A.U.TH (Real time Embedded systems).
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

487 lines
13 KiB

  1. /*!
  2. * \file core.c
  3. *
  4. * \author Christos Choutouridis AEM:8997 <cchoutou@ece.auth.gr>
  5. */
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include <string.h>
  9. #include <stdarg.h>
  10. #include <pthread.h>
  11. #include "core.h"
//! Mutex locked/unlocked by msgList_acquire() and msgList_release(); initialized in msgList_init()
pthread_mutex_t lock_msgList;
//! Mutex held by log_error() while it writes to stderr; initialized in log_init()
pthread_mutex_t lock_stderr;
//! Mutex held by log_msg_io(), log_msg_new() and log_debug() while they write to stdout; initialized in log_init()
pthread_mutex_t lock_stdout;
//! Mutex held by the statsUpdate*() functions while they modify the statistics; initialized in stats_init()
pthread_mutex_t lock_stats;
  16. //! Helper API
  17. //! @{
  18. #define _ADHOC_SUBNET(A, B, C, D) (((A)<<24) | ((B)<<16) | ((C)<<8) | (D))
  19. devAEM_t addr2devAEM (uint32_t in_addr) {
  20. return (in_addr & 0x000000FF) + ((in_addr >> 8) & 0x000000FF) * 100;
  21. }
  22. in_addr_t devAEM2addr (devAEM_t dev) {
  23. uint32_t add = _ADHOC_SUBNET (ADHOC_NET_A, ADHOC_NET_B, ADHOC_NET_C, ADHOC_NET_D);
  24. add |= (dev % 100) & 0x000000FF;
  25. add |= ((dev / 100) & 0x000000FF) << 8;
  26. return add;
  27. }
  28. devAEM_t ip2AEM (devIP_t* ip) {
  29. return ip->C*100 + ip->D;
  30. }
  31. devIP_t AEM2ip (devAEM_t dev) {
  32. devIP_t ip = {
  33. .A =ADHOC_NET_A, .B=ADHOC_NET_B, .C=dev/100, .D=dev%100
  34. };
  35. return ip;
  36. }
  37. devIP_t addr2ip (in_addr_t in_addr) {
  38. devIP_t ip = {
  39. .A = ADHOC_NET_A,
  40. .B = ADHOC_NET_B,
  41. .C = (in_addr >> 8) & 0x000000FF,
  42. .D = in_addr & 0x000000FF
  43. };
  44. return ip;
  45. }
  46. //! @}
  47. //! log API
  48. //! @{
//! printf-style format used by log_msg_io(): device, from, to, timestamp and the head of the text
static char_t* _frm_msg_io = "dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s\n";
//! printf-style format used by log_msg_new(): from, to, timestamp and the head of the text
static char_t* _frm_msg_new = "new message: from=%d, to=%d, timestamp=%lld, text=%s\n";
//! Element count of the local buffer that holds the logged head of a message text (terminator included)
#define _HEAD_SIZE 25
  52. status_t log_init (void) {
  53. if (pthread_mutex_init(&lock_stderr, NULL) != 0) {
  54. fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
  55. return MSG_ERROR;
  56. }
  57. if (pthread_mutex_init(&lock_stdout, NULL) != 0) {
  58. fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
  59. return MSG_ERROR;
  60. }
  61. return MSG_OK;
  62. }
  63. void log_msg_io (msg_t* msg) {
  64. if (settings.outLevel >= OUTLEVEL_1) {
  65. char_t head[_HEAD_SIZE];
  66. memcpy (head, msg->cMsg.text, (_HEAD_SIZE-1));
  67. head[_HEAD_SIZE-1] = 0;
  68. pthread_mutex_lock(&lock_stdout);
  69. fprintf (stdout, _frm_msg_io,
  70. msg->sender,
  71. cMsg_getFrom (&msg->cMsg),
  72. cMsg_getTo (&msg->cMsg),
  73. cMsg_getTs (&msg->cMsg),
  74. head
  75. );
  76. fflush(stdout);
  77. pthread_mutex_unlock(&lock_stdout);
  78. }
  79. }
  80. void log_msg_new (msg_t* msg) {
  81. if (settings.outLevel >= OUTLEVEL_1) {
  82. char_t head[_HEAD_SIZE];
  83. memcpy (head, msg->cMsg.text, (_HEAD_SIZE-1));
  84. head[_HEAD_SIZE-1] = 0;
  85. pthread_mutex_lock(&lock_stdout);
  86. fprintf (stdout, _frm_msg_new,
  87. cMsg_getFrom (&msg->cMsg),
  88. cMsg_getTo (&msg->cMsg),
  89. cMsg_getTs (&msg->cMsg),
  90. head
  91. );
  92. fflush(stdout);
  93. pthread_mutex_unlock(&lock_stdout);
  94. }
  95. }
  96. void log_debug (const char *fmt, ...) {
  97. if (settings.outLevel >= OUTLEVEL_2) {
  98. va_list ap;
  99. va_start(ap, fmt);
  100. pthread_mutex_lock(&lock_stdout);
  101. vfprintf (stdout, fmt, ap);
  102. fflush(stdout);
  103. pthread_mutex_unlock(&lock_stdout);
  104. va_end(ap);
  105. }
  106. }
  107. void log_error (const char *fmt, ...) {
  108. va_list ap;
  109. va_start(ap, fmt);
  110. pthread_mutex_lock(&lock_stderr);
  111. vfprintf (stderr, fmt, ap);
  112. fflush(stderr);
  113. pthread_mutex_unlock(&lock_stderr);
  114. va_end(ap);
  115. }
  116. //! @}
  117. //! cMsg_t API
  118. //! @{
  119. /*!
  120. * Make a new message
  121. * @param msg Pointer to message to create
  122. */
  123. void cMsg_make (cMsg_t* msg) {
  124. static int msgID =0; // unique msg ID
  125. msg->from = settings.me; // from me
  126. do {
  127. // randomly select recipient device
  128. msg->to = devList[rand() % AEMLIST_SIZE].dev;
  129. } while (msg->to == settings.me);
  130. msg->ts = time(NULL);
  131. // stream the first fields and take the quote text iterator
  132. sprintf (msg->text, "%s #%d", MESSAGE_BODY, msgID++);
  133. }
  134. size_t cMsg_cat (cMsg_t* msg, char_t* buffer) {
  135. return sprintf (buffer, "%d_%d_%lld_%s",
  136. msg->from,
  137. msg->to,
  138. msg->ts,
  139. msg->text
  140. );
  141. }
  142. char_t* _strtok (char_t* str, char_t c, size_t max) {
  143. static char_t* last = NULL;
  144. char_t* ret = str;
  145. // init last
  146. if (str != NULL) last = str;
  147. // loop
  148. for (size_t i=0 ; i<max ; ++i) {
  149. if (*last == c) {
  150. *last++ = 0;
  151. return ret;
  152. }
  153. ++last;
  154. }
  155. return NULL;
  156. }
  157. /*!
  158. * Parse an incoming message
  159. *
  160. * @param cMsg Pointer to cMsg object to store the parsed data
  161. * @param rawMsg Pointer to raw message
  162. * @param size The size f raw message buffer
  163. * @return The status of the operation
  164. * @arg MSG_OK Success
  165. * @arg MSG_ERROR Parse failure, incoming message format error
  166. */
  167. status_t cMsg_parse (cMsg_t* cMsg, char_t* rawMsg, size_t size) {
  168. // Check message integrity
  169. if (size > MSG_TEXT_SIZE)
  170. return MSG_ERROR;
  171. // Parse message
  172. char_t* rest = rawMsg;
  173. char_t* tok[4];
  174. bool_t done = true;
  175. for (size_t i =0; i < 3; ++i) {
  176. tok[i] = _strtok (rest, MSG_DELIMITER, size);
  177. if (tok[i] == NULL) {
  178. done = false;
  179. break;
  180. }
  181. else {
  182. int l = strlen(rest);
  183. rest += l + 1;
  184. size -= l + 1;
  185. }
  186. }
  187. tok[3] = rest;
  188. if (done) {
  189. cMsg->from = atoi (tok[0]);
  190. cMsg->to = atoi (tok[1]);
  191. cMsg->ts = atoi (tok[2]);
  192. strcpy (cMsg->text, tok[3]);
  193. return MSG_OK;
  194. }
  195. return MSG_ERROR;
  196. }
  197. /*! getter for cMsg_t member fromAEM */
  198. uint32_t cMsg_getFrom(cMsg_t* cMsg) { return cMsg->from; }
  199. /*! getter for cMsg_t member toAEM */
  200. uint32_t cMsg_getTo(cMsg_t* cMsg) { return cMsg->to; }
  201. /*! getter for cMsg_t member fromAEM */
  202. uint64_t cMsg_getTs(cMsg_t* cMsg) { return cMsg->ts; }
  203. /*! getter for payload text member */
  204. char_t* cMsg_getText(cMsg_t* cMsg) { return cMsg->text; }
  205. /*!
  206. * Predicate to check core message equality
  207. * @param m1 Pointer to message 1
  208. * @param m2 Pointer to message 2
  209. * @return Equality result (true, false)
  210. */
  211. bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2) {
  212. if (m1->from != m2->from) return false;
  213. if (m1->to != m2->to) return false;
  214. if (m1->ts != m2->ts) return false;
  215. if (strncmp (m1->text, m2->text, sizeof(m1->text)))
  216. return false;
  217. return true;
  218. }
  219. //! @}
  220. /*!
  221. * msg_t API
  222. */
  223. //! @{
  224. void msg_init (msg_t* msg) {
  225. memset ((void*)msg, 0, sizeof(msg));
  226. }
  227. //! @}
/*!
 * The message list object of our application.
 * Defined at file scope with external linkage; the msgList API functions below
 * operate on a pointer to an object of this type.
 */
msgList_t msgList;
  232. //! msgList API
  233. //! @{
  234. /*! Macro helper to saturate increased values */
  235. #define _top_saturate(test, apply, value) do { \
  236. if (test >= value) apply = value; \
  237. } while (0)
  238. /*! Macro helper to saturate decreased values */
  239. #define _btm_saturate(test, apply, value) do { \
  240. if (test < value) apply = value; \
  241. while (0)
  242. dIter_t devList_getIter (devAEM_t dev) {
  243. for (dIter_t i =0 ; i<AEMLIST_SIZE ; ++i) {
  244. if (devList[i].dev == dev)
  245. return i;
  246. }
  247. return -1; // return end()
  248. }
  249. status_t msgList_init (msgList_t* msgList) {
  250. if (pthread_mutex_init(&lock_msgList, NULL) != 0) {
  251. log_error ("Error: mutex init has failed\n");
  252. return MSG_ERROR;
  253. }
  254. memset((void*)msgList, 0, sizeof(msgList_t));
  255. msgList->first =-1;
  256. msgList->last =-1;
  257. srand (time(NULL));
  258. return MSG_OK;
  259. }
  260. /*!
  261. * @brief msgList iterator pre-increment in the msg_t direction
  262. *
  263. * This iterator force a ring buffer behavior. This function takes pointer
  264. * to iterator to alter but return the altered value so it can be directly
  265. * used in expressions
  266. *
  267. * @param it Pointer to iterator to increase
  268. * @return The iterator values
  269. */
  270. mIter_t msgList_preInc (mIter_t* it) {
  271. if (++*it >= MSG_LIST_SIZE) *it = 0;
  272. return *it;
  273. }
  274. /*!
  275. * @brief msgList iterator pre-decrement in the msg_t direction
  276. *
  277. * This iterator force a ring buffer behavior. This function takes pointer
  278. * to iterator to alter but return the altered value so it can be directly
  279. * used in expressions
  280. *
  281. * @param it Pointer to iterator to decrease
  282. * @return The iterator values
  283. */
  284. mIter_t msgList_preDec (mIter_t* it) {
  285. if (--*it < 0) *it = MSG_LIST_SIZE;
  286. return *it;
  287. }
  288. mIter_t msgList_begin (msgList_t* this) {
  289. return this->first;
  290. }
  291. mIter_t msgList_last (msgList_t* this) {
  292. return this->last;
  293. }
  294. size_t msgList_size (msgList_t* this) {
  295. return this->size;
  296. }
  297. /*!
  298. * Searches for a message in the message list.
  299. *
  300. * @param this The msgList object to work with
  301. * @param msg Pointer to message to search
  302. * @return Iterator to message if found, or -1 if not
  303. */
  304. mIter_t msgList_find (msgList_t* this, msg_t* msg) {
  305. mIter_t it =this->last; // get iterator
  306. // search from end to start to find msg, return on success
  307. for (size_t i=0 ; i < this->size ; ++i) {
  308. if (cMsg_equal (&this->m[it].cMsg, &msg->cMsg))
  309. return it;
  310. msgList_preDec(&it);
  311. // We start from the end as we think, its more possible
  312. // to find msg in the recent messages.
  313. }
  314. return (mIter_t)-1; // fail to find
  315. }
  316. /*!
  317. * Add a new message in the message list
  318. *
  319. * @param this The msgList object to work with
  320. * @param msg Pointer to message
  321. * @return Iterator to the added item (last)
  322. */
  323. mIter_t msgList_add (msgList_t* this, msg_t* msg) {
  324. if (this->first == -1) // if its first time init "first" iterator
  325. this->first = 0;
  326. this->m[msgList_preInc(&this->last)] = *msg; // store data *++it = *msg;
  327. _top_saturate(++this->size, this->size, MSG_LIST_SIZE); // count the items with saturation
  328. // if we reacher full capacity, move along first also
  329. if ((this->first == this->last) && (this->size > 1))
  330. msgList_preInc(&this->first);
  331. return this->last; // return the iterator to newly created slot
  332. }
  333. void msgList_acquire () { pthread_mutex_lock (&lock_msgList); }
  334. void msgList_release () { pthread_mutex_unlock (&lock_msgList); }
  335. //! @}
  336. //! Statistics API
  337. //! @{
  338. status_t stats_init (stats_t* s) {
  339. memset ((void*)s, 0, sizeof (stats_t));
  340. if (pthread_mutex_init(&lock_stats, NULL) != 0) {
  341. log_error ("Error: mutex init has failed\n");
  342. return MSG_ERROR;
  343. }
  344. return MSG_OK;
  345. }
  346. void statsUpdateCreate (msg_t* msg) {
  347. pthread_mutex_lock (&lock_stats);
  348. ++stats.totalMsg;
  349. ++stats.myMsg;
  350. // average message size
  351. int32_t saved = stats.totalMsg - stats.duplicateMsg;
  352. if ((saved-1) > 0) {
  353. int32_t l = strlen(msg->cMsg.text);
  354. stats.avMsgSize += l / (fpdata_t)(saved -1);
  355. stats.avMsgSize *= (fpdata_t)(saved-1)/saved;
  356. }
  357. pthread_mutex_unlock (&lock_stats);
  358. }
  359. void statsUpdateIn (msg_t* msg, bool_t dup) {
  360. pthread_mutex_lock (&lock_stats);
  361. bool_t forMe = msg->cMsg.to == settings.me;
  362. stats.totalMsg++;
  363. stats.duplicateMsg += (dup) ? 1:0;
  364. stats.forMeMsg += (forMe) ? 1:0;
  365. stats.inDirectMsg += (forMe && (msg->cMsg.from == msg->sender)) ? 1:0;
  366. // averages
  367. int32_t saved = stats.totalMsg - stats.duplicateMsg;
  368. if ((saved-1) > 0) {
  369. int32_t l = strlen(msg->cMsg.text);
  370. stats.avMsgSize += l / (fpdata_t)(saved -1);
  371. stats.avMsgSize *= (fpdata_t)(saved-1)/saved;
  372. if (settings.trackTime) {
  373. tstamp_t dt = (tstamp_t)time(NULL) - msg->cMsg.ts;
  374. if (dt < 0)
  375. dt = 0;
  376. stats.avTimeToMe += dt / (fpdata_t)(saved -1);
  377. stats.avTimeToMe *= (fpdata_t)(saved-1)/saved;
  378. }
  379. }
  380. pthread_mutex_unlock (&lock_stats);
  381. }
  382. void statsUpdateOut (msg_t* msg, devAEM_t dev) {
  383. pthread_mutex_lock (&lock_stats);
  384. stats.outDirectMsg += (msg->cMsg.to == dev) ? 1:0;
  385. pthread_mutex_unlock (&lock_stats);
  386. }
  387. status_t statsPrint (stats_t* stats) {
  388. FILE* fp = fopen ("statistics.txt", "w");
  389. if (fp == NULL) {
  390. fclose (fp);
  391. return MSG_ERROR;
  392. }
  393. fprintf (fp, "\nStatistics\n");
  394. fprintf (fp, " total messages: %d\n", stats->totalMsg);
  395. fprintf (fp, " duplicate messages: %d\n", stats->duplicateMsg);
  396. fprintf (fp, " messages for me: %d\n", stats->forMeMsg);
  397. fprintf (fp, " messages by me: %d\n",stats->myMsg);
  398. fprintf (fp, " In messages direct for me: %d\n", stats->inDirectMsg);
  399. fprintf (fp, " Out direct messages: %d\n", stats->outDirectMsg);
  400. fprintf (fp, " Average message size: %g\n", stats->avMsgSize);
  401. fprintf (fp, " Average time to me: %g\n", stats->avTimeToMe);
  402. for (size_t i =0 ; i<AEMLIST_SIZE ; ++i) {
  403. fprintf (fp, " Device %u found on %lld, last: %lld\n",
  404. devList[i].dev, devList[i].begin, devList[i].end);
  405. }
  406. fclose (fp);
  407. return MSG_OK;
  408. }
  409. //! @}