Browse Source

Seeker now is seperate thread

master
Christos Houtouridis 5 years ago
parent
commit
0f1f9ac088
7 changed files with 127 additions and 50 deletions
  1. +33
    -12
      src/client.c
  2. +1
    -1
      src/client.h
  3. +54
    -11
      src/core.c
  4. +4
    -0
      src/core.h
  5. +8
    -3
      src/listener.c
  6. +14
    -12
      src/main.c
  7. +13
    -11
      src/msg_impl.h

+ 33
- 12
src/client.c View File

@@ -46,7 +46,11 @@ static bool_t ping (devAEM_t dev) {
*/ */
static size_t seeker (void) { static size_t seeker (void) {
size_t cnt =0; // count devices on range size_t cnt =0; // count devices on range
sleep (settings.seekerInterval);
log_debug ("Debug: Pinging devices...\n"); log_debug ("Debug: Pinging devices...\n");
devList_acquire ();
for (int i=0 ; i<AEMLIST_SIZE ; ++i) { for (int i=0 ; i<AEMLIST_SIZE ; ++i) {
if (devList[i].dev == settings.me) {// Don't ping me, I'm not here, go away... if (devList[i].dev == settings.me) {// Don't ping me, I'm not here, go away...
devList[i].onRange = false; devList[i].onRange = false;
@@ -59,11 +63,16 @@ static size_t seeker (void) {
if (!devList[i].begin) if (!devList[i].begin)
devList[i].begin = time(NULL); // first time only devList[i].begin = time(NULL); // first time only
devList[i].end = time(NULL); // every time devList[i].end = time(NULL); // every time
log_debug ("Debug: Device %u found\n", devList[i].dev);
devList_t cDev = devList[i];
devList_release ();
log_debug ("Debug: Device %u found\n", cDev.dev);
devList_acquire ();
} }
else else
devList[i].onRange = false; // Where is everybody? devList[i].onRange = false; // Where is everybody?
} }
statsTimesPrint (devList);
devList_release ();
log_debug ("Debug: %d devices found\n", cnt); log_debug ("Debug: %d devices found\n", cnt);
return cnt; return cnt;
} }
@@ -209,32 +218,31 @@ static void client (void) {
msgList_release (); msgList_release ();
log_debug ("Debug: Message added to msgList at %d\n", at); log_debug ("Debug: Message added to msgList at %d\n", at);
if (!seeker ()) // If we are alone skip the rest
continue;
log_debug ("Debug: Devices found on range\n");
msgList_acquire (); msgList_acquire ();
mIter_t it = msgList_begin (&msgList); // get a message iterator mIter_t it = msgList_begin (&msgList); // get a message iterator
// for each message -> for each recipient // for each message -> for each recipient
for (size_t i=0 ; i<msgList_size(&msgList) ; ++i, msgList_preInc (&it)) { for (size_t i=0 ; i<msgList_size(&msgList) ; ++i, msgList_preInc (&it)) {
for (size_t j=0 ; j<AEMLIST_SIZE ; ++j) { for (size_t j=0 ; j<AEMLIST_SIZE ; ++j) {
// get current dev instance
devList_acquire ();
devList_t currentDev = devList[j];
devList_release ();
// check when to send // check when to send
if (devList[j].onRange // is on range
if (currentDev.onRange // is on range
&& !msgList.m[it].recipients[j] // we haven't send the message to that device && !msgList.m[it].recipients[j] // we haven't send the message to that device
&& msgList.m[it].cMsg.to != settings.me // the message it's not for me && msgList.m[it].cMsg.to != settings.me // the message it's not for me
) { ) {
if (sendMsg (devList[j].dev, &msgList.m[it].cMsg)) {
if (sendMsg (currentDev.dev, &msgList.m[it].cMsg)) {
msgList.m[it].recipients[j] = true; msgList.m[it].recipients[j] = true;
msgList_release (); msgList_release ();
statsUpdateOut (&msg, devList[j].dev);
log_msg_out (&msg, devList[j].dev);
log_debug ("Debug: Send message to device %u succeed\n", devList[j].dev);
statsUpdateOut (&msg, currentDev.dev);
log_msg_out (&msg, currentDev.dev);
log_debug ("Debug: Send message to device %u succeed\n", currentDev.dev);
msgList_acquire (); msgList_acquire ();
} }
else { else {
msgList_release (); msgList_release ();
log_debug ("Debug: Send message to device %u failed\n", devList[j].dev);
log_debug ("Debug: Send message to device %u failed\n", currentDev.dev);
msgList_acquire (); msgList_acquire ();
} }
//^ we try to send the message and mark the transmission on success //^ we try to send the message and mark the transmission on success
@@ -249,6 +257,19 @@ static void client (void) {
return; return;
} }
/*!
* pthread wrapper for \sa seeker()
* @param ptr
*/
void* pthSeeker (void* ptr) {
(void)&ptr; // use parameter
while (true)
seeker ();
exit(1); // we should not be here
return NULL;
}
/*! /*!
* pthread wrapper for \sa client() * pthread wrapper for \sa client()
* @param ptr * @param ptr


+ 1
- 1
src/client.h View File

@@ -10,7 +10,7 @@
#include "core.h" #include "core.h"
#include "msg_impl.h" #include "msg_impl.h"
void* pthSeeker (void* ptr);
void* pthClient (void* ptr); void* pthClient (void* ptr);
#endif /* __client_h__ */ #endif /* __client_h__ */

+ 54
- 11
src/core.c View File

@@ -23,6 +23,7 @@ msgList_t msgList; //!< The message list for our application.
* Local data types * Local data types
*/ */
static pthread_mutex_t lock_msgList; //!< mutex for msgList locking static pthread_mutex_t lock_msgList; //!< mutex for msgList locking
static pthread_mutex_t lock_devList; //!< mutex for devList locking
static pthread_mutex_t lock_stderr; //!< mutex for stderr locking static pthread_mutex_t lock_stderr; //!< mutex for stderr locking
static pthread_mutex_t lock_stdout; //!< mutex for stderr locking static pthread_mutex_t lock_stdout; //!< mutex for stderr locking
static pthread_mutex_t lock_stats; //!< mutex for stats locking static pthread_mutex_t lock_stats; //!< mutex for stats locking
@@ -366,18 +367,26 @@ void msg_init (msg_t* msg) {
//! @} //! @}
//! msgList API
//! devList API
//! @{ //! @{
/*! Macro helper to saturate increased values */
#define _top_saturate(test, apply, value) do { \
if (test >= value) apply = value; \
} while (0)
/*!
* Initialize the devList
* @param msgList Pointer to mesList t initialize
* @return The status of the operation
*/
status_t devList_init (devList_t* devList) {
devAEM_t l[] = AEMLIST;
if (pthread_mutex_init(&lock_devList, NULL) != 0) {
log_error ("Error: mutex init has failed\n");
return MSG_ERROR;
}
memset ((void*)devList, 0, sizeof(devList_t));
for (size_t i =0 ; i<AEMLIST_SIZE ; ++i)
devList[i].dev = l[i];
/*! Macro helper to saturate decreased values */
#define _btm_saturate(test, apply, value) do { \
if (test < value) apply = value; \
while (0)
return MSG_OK;
}
/*! /*!
* Returns an iterator for \sa devList AND \sa msg_t.recipients * Returns an iterator for \sa devList AND \sa msg_t.recipients
@@ -392,6 +401,26 @@ dIter_t devList_getIter (devAEM_t dev) {
return -1; // return end() return -1; // return end()
} }
//! Acquires devList resources
void devList_acquire (void) { pthread_mutex_lock(&lock_devList); }
//! Releases devList resources
void devList_release (void) { pthread_mutex_unlock(&lock_devList); }
//! @}
//! msgList API
//! @{
/*! Macro helper to saturate increased values */
#define _top_saturate(test, apply, value) do { \
if (test >= value) apply = value; \
} while (0)
/*! Macro helper to saturate decreased values */
#define _btm_saturate(test, apply, value) do { \
if (test < value) apply = value; \
while (0)
/*! /*!
* Initialize the msgList * Initialize the msgList
* @param msgList Pointer to mesList t initialize * @param msgList Pointer to mesList t initialize
@@ -643,14 +672,28 @@ status_t statsPrint (stats_t* stats) {
fprintf (fp, "Out direct messages: %d\n", stats->outDirectMsg); fprintf (fp, "Out direct messages: %d\n", stats->outDirectMsg);
fprintf (fp, "Average message size: %g\n", stats->avMsgSize); fprintf (fp, "Average message size: %g\n", stats->avMsgSize);
fprintf (fp, "Average time to me: %g\n", stats->avTimeToMe); fprintf (fp, "Average time to me: %g\n", stats->avTimeToMe);
fclose (fp);
return MSG_OK;
}
/*!
* Device online timing print functionality
* @param devList Pointer to devList to print
* @return The status of the operation
*/
status_t statsTimesPrint (devList_t *devList) {
FILE* fp = fopen ("devices.txt", "w");
if (fp == NULL) {
fclose (fp);
return MSG_ERROR;
}
fprintf (fp, "\n Device timings\n================\n");
for (size_t i =0 ; i<AEMLIST_SIZE ; ++i) { for (size_t i =0 ; i<AEMLIST_SIZE ; ++i) {
fprintf (fp, " Device %u found on %lld, last: %lld\n",
fprintf (fp, "Device %u found on %lld, last: %lld\n",
devList[i].dev, devList[i].begin, devList[i].end); devList[i].dev, devList[i].begin, devList[i].end);
} }
fclose (fp); fclose (fp);
return MSG_OK; return MSG_OK;
} }
//! @} //! @}

+ 4
- 0
src/core.h View File

@@ -29,7 +29,10 @@ bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2);
void msg_init (msg_t* msg); void msg_init (msg_t* msg);
status_t devList_init (devList_t* devList);
dIter_t devList_getIter (devAEM_t dev); dIter_t devList_getIter (devAEM_t dev);
void devList_acquire (void);
void devList_release (void);
mIter_t msgList_preInc (mIter_t* it); mIter_t msgList_preInc (mIter_t* it);
mIter_t msgList_preDec (mIter_t* it); mIter_t msgList_preDec (mIter_t* it);
@@ -47,6 +50,7 @@ void statsUpdateCreate (msg_t* msg);
void statsUpdateIn (msg_t* msg, bool_t dup); void statsUpdateIn (msg_t* msg, bool_t dup);
void statsUpdateOut (msg_t* msg, devAEM_t dev); void statsUpdateOut (msg_t* msg, devAEM_t dev);
status_t statsPrint (stats_t* stats); status_t statsPrint (stats_t* stats);
status_t statsTimesPrint (devList_t *devList);
status_t log_init(void); status_t log_init(void);
void log_msg_in (msg_t* msg); void log_msg_in (msg_t* msg);


+ 8
- 3
src/listener.c View File

@@ -43,14 +43,19 @@ static void listen_handler (devAEM_t dev, char_t* buffer, size_t size) {
// We have a copy // We have a copy
msgList_release (); msgList_release ();
statsUpdateIn (&msg, true); // message process statsUpdateIn (&msg, true); // message process
log_debug("Debug: Duplicate message from: %d\n", msg.sender);
log_debug ("Debug: Duplicate message from: %d\n", msg.sender);
} }
// Processing... // Processing...
devList_acquire();
dIter_t d = devList_getIter (dev);
dIter_t f = devList_getIter (msg.cMsg.from);
devList_release();
msgList_acquire (); msgList_acquire ();
// Do not echo message to sender, he already has it // Do not echo message to sender, he already has it
msgList.m[myCopy].recipients[devList_getIter (dev)] = true;
msgList.m[myCopy].recipients[d] = true;
// don't push back message to creator, he already has it // don't push back message to creator, he already has it
msgList.m[myCopy].recipients[devList_getIter (msg.cMsg.from)] = true;
msgList.m[myCopy].recipients[f] = true;
msgList_release (); msgList_release ();
} }


+ 14
- 12
src/main.c View File

@@ -1,6 +1,6 @@
/*! /*!
* \file main.c * \file main.c
* This is the main file of the RTES final task
* This is the main file of the RTES final task.
* *
* \author Christos Choutouridis AEM:8997 <cchoutou@ece.auth.gr> * \author Christos Choutouridis AEM:8997 <cchoutou@ece.auth.gr>
*/ */
@@ -21,24 +21,23 @@
*/ */
//! @{ //! @{
settings_t settings_init (settings);
devList_t devList_init(devList[AEMLIST_SIZE]);
stats_t stats;
settings_t settings_init (settings); //!< Application settings
devList_t devList[AEMLIST_SIZE]; //!< Device list
stats_t stats; //!< Statistical data
//! @} //! @}
/*! /*!
* CLI short options * CLI short options
*/ */
const char *short_opt = "li:v:p:s:w:th";
const char *short_opt = "v:i:p:s:w:th";
/*! /*!
* CLI long options * CLI long options
*/ */
const struct option long_opt[] = { const struct option long_opt[] = {
{"port", required_argument, NULL, 'l'},
{"interval", required_argument, NULL, 'i'},
{"outlevel", required_argument, NULL, 'v'}, {"outlevel", required_argument, NULL, 'v'},
{"interval", required_argument, NULL, 'i'},
{"pingtimeout",required_argument, NULL, 'p'}, {"pingtimeout",required_argument, NULL, 'p'},
{"sendtimeout",required_argument, NULL, 's'}, {"sendtimeout",required_argument, NULL, 's'},
{"who", required_argument, NULL, 'w'}, {"who", required_argument, NULL, 'w'},
@@ -50,7 +49,7 @@ const struct option long_opt[] = {
/*! /*!
* \brief * \brief
* Parse input argument and fill the kcli_input_t struct * Parse input argument and fill the kcli_input_t struct
* \param in Pointer to \ref kcli_input_t structure to fill
* \param s Pointer to settings_t data to fill
* \param argc The argument count as passed to the main() * \param argc The argument count as passed to the main()
* \param argv Argument array as passed to the main() * \param argv Argument array as passed to the main()
* \return The status of the operation * \return The status of the operation
@@ -65,13 +64,12 @@ int parse_args (settings_t *s, int argc, char const *argv[]) {
case -1: /* no more arguments */ case -1: /* no more arguments */
case 0: /* long options toggles */ case 0: /* long options toggles */
break; break;
case 'l': s->port = atoi(optarg); break;
case 'i': s->msgInterval = atoi (optarg); break;
case 'v': case 'v':
s->outLevel = atoi (optarg); s->outLevel = atoi (optarg);
if (s->outLevel >= OUTLEVEL_2) s->outLevel = OUTLEVEL_2; if (s->outLevel >= OUTLEVEL_2) s->outLevel = OUTLEVEL_2;
if (s->outLevel < OUTLEVEL_0) s->outLevel = OUTLEVEL_0; if (s->outLevel < OUTLEVEL_0) s->outLevel = OUTLEVEL_0;
break; break;
case 'i': s->seekerInterval = atoi (optarg); break;
case 'p': s->pingTimeout = atoi (optarg); break; case 'p': s->pingTimeout = atoi (optarg); break;
case 's': s->sendTimeout.tv_sec = atoi (optarg); break; case 's': s->sendTimeout.tv_sec = atoi (optarg); break;
case 'w': s->me = atoi (optarg); break; case 'w': s->me = atoi (optarg); break;
@@ -89,22 +87,26 @@ int parse_args (settings_t *s, int argc, char const *argv[]) {
} }
int main(int argc, char const *argv[]) {
int main (int argc, char const *argv[]) {
// get command line arguments
parse_args (&settings, argc, argv); parse_args (&settings, argc, argv);
// Initialize all subsystems // Initialize all subsystems
log_init (); log_init ();
stats_init (&stats); stats_init (&stats);
devList_init (devList);
msgList_init (&msgList); msgList_init (&msgList);
// Create threads // Create threads
pthread_t ptL, ptC;
pthread_t ptL, ptS, ptC;
pthread_create (&ptL, NULL, pthListener, NULL); pthread_create (&ptL, NULL, pthListener, NULL);
pthread_create (&ptS, NULL, pthSeeker, NULL);
pthread_create (&ptC, NULL, pthClient, NULL); pthread_create (&ptC, NULL, pthClient, NULL);
// block here // block here
pthread_join (ptL, NULL); pthread_join (ptL, NULL);
pthread_join (ptS, NULL);
pthread_join (ptC, NULL); pthread_join (ptC, NULL);
return 0; return 0;
} }

+ 13
- 11
src/msg_impl.h View File

@@ -21,17 +21,17 @@
*/ */
#define AEMLIST_SIZE (10) #define AEMLIST_SIZE (10)
#define devList_init(l) l = { \
{ 7700, 0, 0, 0}, \
{ 8261, 0, 0, 0}, \
{ 8765, 0, 0, 0}, \
{ 8844, 0, 0, 0}, \
{ 8880, 0, 0, 0}, \
{ 8861, 0, 0, 0}, \
{ 8877, 0, 0, 0}, \
{ 8941, 0, 0, 0}, \
{ 8934, 0, 0, 0}, \
{ 8997, 0, 0, 0} \
#define AEMLIST { \
7700, \
8261, \
8765, \
8844, \
8880, \
8861, \
8877, \
8941, \
8934, \
8997 \
} }
/*! /*!
@@ -217,6 +217,7 @@ typedef enum {
typedef struct { typedef struct {
devAEM_t me; devAEM_t me;
uint16_t port; uint16_t port;
time_t seekerInterval;
time_t msgInterval; time_t msgInterval;
time_t msgRand; time_t msgRand;
outLevel_en outLevel; outLevel_en outLevel;
@@ -230,6 +231,7 @@ extern settings_t settings;
#define settings_init(s) s = { \ #define settings_init(s) s = { \
.me = 8997, \ .me = 8997, \
.port = 2288, \ .port = 2288, \
.seekerInterval = 30, \
.msgInterval = 60, \ .msgInterval = 60, \
.msgRand = 240, \ .msgRand = 240, \
.outLevel = OUTLEVEL_1, \ .outLevel = OUTLEVEL_1, \


Loading…
Cancel
Save