Quellcode durchsuchen

Testing version

master
Christos Houtouridis vor 4 Jahren
Ursprung
Commit
c9f13c07c8
10 geänderte Dateien mit 695 neuen und 595 gelöschten Zeilen
  1. +4
    -2
      eclipse_wifi/.cproject
  2. +2
    -2
      eclipse_wifi/.settings/language.settings.xml
  3. +184
    -86
      src/client.c
  4. +2
    -1
      src/client.h
  5. +226
    -173
      src/core.c
  6. +31
    -15
      src/core.h
  7. +79
    -55
      src/listener.c
  8. +1
    -2
      src/listener.h
  9. +85
    -215
      src/main.c
  10. +81
    -44
      src/msg_impl.h

+ 4
- 2
eclipse_wifi/.cproject Datei anzeigen

@@ -26,6 +26,7 @@
<option defaultValue="gnu.c.debugging.level.max" id="gnu.c.compiler.option.debugging.level.552014651" name="Debug Level" superClass="gnu.c.compiler.option.debugging.level" useByScannerDiscovery="false" valueType="enumerated"/>
<option id="gnu.c.compiler.option.dialect.std.575197221" name="Language standard" superClass="gnu.c.compiler.option.dialect.std" useByScannerDiscovery="true" value="gnu.c.compiler.dialect.c11" valueType="enumerated"/>
<option id="gnu.c.compiler.option.misc.other.1069586865" name="Other flags" superClass="gnu.c.compiler.option.misc.other" useByScannerDiscovery="false" value="-c -fmessage-length=0" valueType="string"/>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="true" id="gnu.c.compiler.option.preprocessor.def.symbols.846393473" name="Defined symbols (-D)" superClass="gnu.c.compiler.option.preprocessor.def.symbols" useByScannerDiscovery="false" valueType="definedSymbols"/>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.1843280584" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
<tool id="cdt.managedbuild.tool.gnu.cross.cpp.compiler.1306697816" name="Cross G++ Compiler" superClass="cdt.managedbuild.tool.gnu.cross.cpp.compiler">
@@ -33,7 +34,7 @@
<option defaultValue="gnu.cpp.compiler.debugging.level.max" id="gnu.cpp.compiler.option.debugging.level.419430772" name="Debug Level" superClass="gnu.cpp.compiler.option.debugging.level" useByScannerDiscovery="false" valueType="enumerated"/>
</tool>
<tool id="cdt.managedbuild.tool.gnu.cross.c.linker.2025263637" name="Cross GCC Linker" superClass="cdt.managedbuild.tool.gnu.cross.c.linker">
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.link.option.libs.1839870799" superClass="gnu.c.link.option.libs" useByScannerDiscovery="false" valueType="libs">
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.link.option.libs.1839870799" name="Libraries (-l)" superClass="gnu.c.link.option.libs" useByScannerDiscovery="false" valueType="libs">
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="m"/>
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="pthread"/>
</option>
@@ -81,6 +82,7 @@
<option defaultValue="gnu.c.debugging.level.none" id="gnu.c.compiler.option.debugging.level.1580896722" name="Debug Level" superClass="gnu.c.compiler.option.debugging.level" useByScannerDiscovery="false" valueType="enumerated"/>
<option id="gnu.c.compiler.option.dialect.std.190352368" name="Language standard" superClass="gnu.c.compiler.option.dialect.std" useByScannerDiscovery="true" value="gnu.c.compiler.dialect.c11" valueType="enumerated"/>
<option id="gnu.c.compiler.option.misc.other.274003114" name="Other flags" superClass="gnu.c.compiler.option.misc.other" useByScannerDiscovery="false" value="-c -fmessage-length=0" valueType="string"/>
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="true" id="gnu.c.compiler.option.preprocessor.def.symbols.1351750507" name="Defined symbols (-D)" superClass="gnu.c.compiler.option.preprocessor.def.symbols" useByScannerDiscovery="false" valueType="definedSymbols"/>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.1909710884" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>
<tool id="cdt.managedbuild.tool.gnu.cross.cpp.compiler.976316654" name="Cross G++ Compiler" superClass="cdt.managedbuild.tool.gnu.cross.cpp.compiler">
@@ -88,7 +90,7 @@
<option defaultValue="gnu.cpp.compiler.debugging.level.none" id="gnu.cpp.compiler.option.debugging.level.1210383027" name="Debug Level" superClass="gnu.cpp.compiler.option.debugging.level" useByScannerDiscovery="false" valueType="enumerated"/>
</tool>
<tool id="cdt.managedbuild.tool.gnu.cross.c.linker.1156911677" name="Cross GCC Linker" superClass="cdt.managedbuild.tool.gnu.cross.c.linker">
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.link.option.libs.1529695977" superClass="gnu.c.link.option.libs" valueType="libs">
<option IS_BUILTIN_EMPTY="false" IS_VALUE_EMPTY="false" id="gnu.c.link.option.libs.1529695977" name="Libraries (-l)" superClass="gnu.c.link.option.libs" valueType="libs">
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="m"/>
<listOptionValue builtIn="false" srcPrefixMapping="" srcRootPath="" value="pthread"/>
</option>


+ 2
- 2
eclipse_wifi/.settings/language.settings.xml Datei anzeigen

@@ -5,7 +5,7 @@
<provider copy-of="extension" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider"/>
<provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
<provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
<provider class="org.eclipse.cdt.internal.build.crossgcc.CrossGCCBuiltinSpecsDetector" console="false" env-hash="-1387128168067808926" id="org.eclipse.cdt.build.crossgcc.CrossGCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT Cross GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<provider class="org.eclipse.cdt.internal.build.crossgcc.CrossGCCBuiltinSpecsDetector" console="false" env-hash="755220398587321820" id="org.eclipse.cdt.build.crossgcc.CrossGCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT Cross GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<language-scope id="org.eclipse.cdt.core.gcc"/>
<language-scope id="org.eclipse.cdt.core.g++"/>
</provider>
@@ -16,7 +16,7 @@
<provider copy-of="extension" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider"/>
<provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
<provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
<provider class="org.eclipse.cdt.internal.build.crossgcc.CrossGCCBuiltinSpecsDetector" console="false" env-hash="-1387128168067808926" id="org.eclipse.cdt.build.crossgcc.CrossGCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT Cross GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<provider class="org.eclipse.cdt.internal.build.crossgcc.CrossGCCBuiltinSpecsDetector" console="false" env-hash="755220398587321820" id="org.eclipse.cdt.build.crossgcc.CrossGCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT Cross GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
<language-scope id="org.eclipse.cdt.core.gcc"/>
<language-scope id="org.eclipse.cdt.core.g++"/>
</provider>


+ 184
- 86
src/client.c Datei anzeigen

@@ -4,108 +4,206 @@
* \author Christos Choutouridis AEM:8997 <cchoutou@ece.auth.gr>
*/
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/ip_icmp.h>
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/select.h>
#include <fcntl.h>
#include <errno.h>
#include "client.h"
// Calculating the Check Sum
static unsigned short _checksum(void *b, int len) {
unsigned short *buf = b;
unsigned int sum=0;
unsigned short result;
for ( sum = 0; len > 1; len -= 2 )
sum += *buf++;
if ( len == 1 )
sum += *(unsigned char*)buf;
sum = (sum >> 16) + (sum & 0xFFFF);
sum += (sum >> 16);
result = ~sum;
return result;
static bool_t ping (devAEM_t dev) {
char_t cmd[72];
devIP_t ip = AEM2ip (dev);
// ask host to ping and make a little pre-process before take the answer
sprintf (cmd,
"test $(ping -c1 -w%d %u.%u.%u.%u| grep received |cut -d' ' -f4) = 1",
settings.pingTimeout, ip.A, ip.B, ip.C, ip.D
);
return (system(cmd) == 0) ? true:false;
}
static size_t seeker (void) {
size_t cnt =0; // count devices on range
log_debug ("Debug: Pinging devices...\n");
for (int i=0 ; i<AEMLIST_SIZE ; ++i) {
if (devList[i].dev == settings.me) {// Don't ping me, I'm not here, go away...
devList[i].dev = false;
continue;
}
if (ping (devList[i].dev)) { // Noc noc....
devList[i].onRange = true; // Who's there?
++cnt; // No one, bye bye!
log_debug ("Debug: Device %u found\n", devList[i].dev);
}
else
devList[i].onRange = false; // Where is everybody?
}
log_debug ("Debug: %d devices found\n", cnt);
return cnt;
}
static void _mk_ping_pkt (ping_pkt_t* pkt) {
memset ((void*)&pkt->hdr, 0, sizeof(pkt->hdr));
//memset ((void*)&pkt->msg, '0', sizeof(pkt->msg));
pkt->msg[PING_MSG_S -1] = 0;
static bool_t sendMsg (devAEM_t dev, cMsg_t* msg) {
int sock;
sockaddr_in_t srvAddr;
timeval_t timeout = settings.sendTimeout;
long arg;
bool_t ret = true; // have faith
// Make address
memset(&srvAddr, 0, sizeof (srvAddr));
srvAddr.sin_family= AF_INET;
srvAddr.sin_port= htons (settings.port);
srvAddr.sin_addr.s_addr = htonl (devAEM2addr (dev));
devIP_t ip = AEM2ip (dev);
// Create socket for sending
if ((sock = socket (PF_INET, SOCK_STREAM, 0)) == -1)
return false;
log_debug ("Debug: Socket for sending to %u.%u.%u.%u created\n", ip.A, ip.B, ip.C, ip.D);
do {
// Set non-blocking connect
if((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
ret = false;
log_debug ("Debug: Reading socket's file status failed\n");
break;
}
if(fcntl(sock, F_SETFL, arg | O_NONBLOCK) < 0) {
ret = false;
log_debug ("Debug: Set socket to non-blocking mode failed\n");
break;
}
log_debug ("Debug: Socket switched to non-blocking mode\n");
// Trying to connect with timeout
log_debug ("Debug: Try to connect with timeout\n");
if (connect (sock, (sockaddr_t*)&srvAddr, sizeof(srvAddr)) == -1) {
if (errno == EINPROGRESS) {
fd_set myset;
FD_ZERO (&myset);
FD_SET (sock, &myset);
if (select (sock+1, NULL, &myset, NULL, &timeout) <= 0) {
ret = false;
log_debug ("Debug: Connection failed in select()\n");
break;
}
// Socket selected for write
int valopt;
socklen_t lon = sizeof(int);
if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (void*)(&valopt), &lon) < 0) {
ret = false;
log_debug ("Debug: Reading socket's options failed\n");
break;
}
// Check the value returned...
if (valopt) {
ret = false;
log_debug ("Debug: Delayed connection failed\n");
break;
}
}
else {
ret = false;
log_debug ("Debug: Connection failed in select()\n");
break;
}
}
log_debug ("Debug: Connection to %u.%u.%u.%u succeed\n", ip.A, ip.B, ip.C, ip.D);
// Set to blocking mode again...
if( (arg = fcntl(sock, F_GETFL, NULL)) < 0) {
ret = false;
log_debug ("Debug: Reading socket's file status failed\n");
break;
}
arg &= (~O_NONBLOCK);
if( fcntl(sock, F_SETFL, arg) < 0) {
ret = false;
log_debug ("Debug: Set socket back to blocking mode failed\n");
break;
}
log_debug ("Debug: Socket switched to blocking mode\n");
// send the data with timeout
if (setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO, (void*)&timeout, sizeof(timeout)) == -1) {
ret = false;
log_debug ("Debug: Setting send timeout failed\n");
break;
}
log_debug ("Debug: Setting send timeout succeed\n");
if (send (sock, msg->text, strlen(msg->text), MSG_CONFIRM) == -1) {
ret = false;
log_debug ("Debug: Sending failed\n");
break;
}
log_debug ("Debug: Sending succeed\n");
} while (0);
pkt->hdr.type = ICMP_ECHO;
pkt->hdr.un.echo.id = getpid();
pkt->hdr.un.echo.sequence = 0;
pkt->hdr.checksum = _checksum (&pkt, sizeof(pkt));
close (sock);
log_debug ("Debug: Closing socket\n");
return ret;
}
bool ping (device_t* dev) {
static int sockfd =-1;
int ttl_val =64;
struct timeval tv_out = { 1, 0 };
static status_t client (void) {
msg_t msg; // new message buffer
while (true) {
sleep (settings.msgInterval); // Idle until the time comes
memset ((void*)&msg, 0, sizeof (msg)); // create a new message
cMsg_make (&msg.cMsg);
msg.sender = settings.me;
log_debug ("Debug: Message created for %d\n", msg.cMsg.to);
statsUpdateCreate (&msg);
msgList_acquire (); // try to lock resources
mIter_t at = msgList_add (&msgList, &msg); // Add message to msgList
log_debug ("Debug: Message added to msgList at %d\n", at);
if (sockfd == -1 && dev) {
// create socket and set options at ip to TTL and value to 64 and timeout of recv
if ((sockfd = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)) == -1) {
log_error ("Error: Can not create raw socket for icmp\n");
return false;
if (!seeker ()) { // If we are alone skip the rest
msgList_release (); // but unlock resources first
continue;
}
#ifndef NO_DEBUG
log_debug("Debug: Raw socket for icmp created\n");
#endif
int ret = setsockopt(sockfd, SOL_IP, IP_TTL, &ttl_val, sizeof(ttl_val));
ret |= setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv_out, sizeof(tv_out));
if (ret == -1) {
log_error ("Error: Can not set options to socket\n");
close (sockfd);
sockfd =-1;
return false;
log_debug ("Debug: Devices found on range\n");
mIter_t it = msgList_begin (&msgList); // get a message iterator
// begin with old messages first
// for each message -> for each recipient
for (size_t i=0 ; i<msgList_size(&msgList) ; ++i, msgList_preInc (&it)) {
for (size_t j=0 ; j<AEMLIST_SIZE ; ++j) {
// check when to send
if (devList[j].onRange // is on range
&& !msgList.m[it].recipients[j] // we haven't send the message to that device
&& msgList.m[it].cMsg.to != settings.me // the message it's not for me
) {
if (sendMsg (devList[j].dev, &msgList.m[it].cMsg)) {
msgList.m[it].recipients[j] = true;
statsUpdateOut (&msg, devList[j].dev);
log_debug ("Debug: Send message to device %u succeed\n", devList[j].dev);
}
else {
log_debug ("Debug: Send message to device %u failed\n", devList[j].dev);
}
//^ we try to send the message and mark the transmission on success
// if send fail, don't worry it may succeed the next time.
}
}
}
#ifndef NO_DEBUG
log_debug("Debug: Raw socket options set\n");
#endif
msgList_release (); // Unlock resources
}
return MSG_ERROR;
}
if (!dev) {
// Close socket
close (sockfd);
sockfd =-1;
#ifndef NO_DEBUG
log_debug ("Debug: Raw socket closed\n");
#endif
return true;
}
else {
// ping packet structure
ping_pkt_t pkt;
_mk_ping_pkt (&pkt);
int st;
//send packet
struct sockaddr_in ping_addr = {
.sin_family = AF_INET,
.sin_addr = { htonl (device2addr (dev)) }
};
if ((st=sendto (sockfd, &pkt, sizeof(pkt), 0, (struct sockaddr*)&ping_addr, sizeof(ping_addr))) <= 0)
return false;
#ifndef NO_DEBUG
log_debug ("Debug: Echo send to %s\n", inet_ntoa(ping_addr.sin_addr));
#endif
//receive packet
struct sockaddr_in r_addr;
socklen_t addr_len =sizeof(r_addr);
if ((st=recvfrom (sockfd, &pkt, sizeof(pkt), 0, (struct sockaddr*)&r_addr, &addr_len)) <= 0)
return false;
#ifndef NO_DEBUG
log_debug ("Debug: Echo received from %s\n", inet_ntoa(ping_addr.sin_addr));
#endif
return (pkt.hdr.type == 69 && pkt.hdr.code == 0) ? true: false;
}
return false;
void* pthClient (void* ptr) {
(void)&ptr; // use parameter
if (client () == MSG_ERROR)
exit(1); // we should not be here, client never returns
return NULL;
}

+ 2
- 1
src/client.h Datei anzeigen

@@ -10,6 +10,7 @@
#include "core.h"
#include "msg_impl.h"
bool ping (device_t* dev);
void* pthClient (void* ptr);
#endif /* __client_h__ */

+ 226
- 173
src/core.c Datei anzeigen

@@ -9,61 +9,158 @@
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>
#include <pthread.h>
#include "core.h"
pthread_mutex_t lock_msgList;
pthread_mutex_t lock_stderr;
pthread_mutex_t lock_stdout;
pthread_mutex_t lock_stats;
//! Helper API
//! @{
#define _ADHOC_SUBNET(A, B, C, D) (((A)<<24) | ((B)<<16) | ((C)<<8) | (D))
device_t addr2device (uint32_t in_addr) {
device_t dev = {
.id = (in_addr & 0x000000FF) + ((in_addr >> 8) & 0x000000FF) * 100,
.next = NULL
};
return dev;
devAEM_t addr2devAEM (uint32_t in_addr) {
return (in_addr & 0x000000FF) + ((in_addr >> 8) & 0x000000FF) * 100;
}
uint32_t device2addr (const device_t* dev) {
uint32_t add = _adhoc_subnet;
add |= (dev->id % 100) & 0x000000FF;
add |= ((dev->id / 100) & 0x000000FF) << 8;
in_addr_t devAEM2addr (devAEM_t dev) {
uint32_t add = _ADHOC_SUBNET (ADHOC_NET_A, ADHOC_NET_B, ADHOC_NET_C, ADHOC_NET_D);
add |= (dev % 100) & 0x000000FF;
add |= ((dev / 100) & 0x000000FF) << 8;
return add;
}
device_t ip2device (devIP_t* ip) {
device_t dev = {
.id = ip->C*100 + ip->D,
.next = NULL
};
return dev;
devAEM_t ip2AEM (devIP_t* ip) {
return ip->C*100 + ip->D;
}
devIP_t device2ip (const device_t* dev) {
devIP_t AEM2ip (devAEM_t dev) {
devIP_t ip = {
.A=0, .B=0, .C=dev->id/100, .D=dev->id%100
.A =ADHOC_NET_A, .B=ADHOC_NET_B, .C=dev/100, .D=dev%100
};
return ip;
}
devIP_t addr2ip (uint32_t in_addr) {
devIP_t addr2ip (in_addr_t in_addr) {
devIP_t ip = {
.A=0, .B=0,
.C=(in_addr >> 8) & 0x000000FF,
.D=in_addr & 0x000000FF
.A = ADHOC_NET_A,
.B = ADHOC_NET_B,
.C = (in_addr >> 8) & 0x000000FF,
.D = in_addr & 0x000000FF
};
return ip;
}
//! @}
//! log API
//! @{
static char_t* _frm_msg_io = "dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s";
static char_t* _frm_msg_new = "new message: from=%d, to=%d, timestamp=%lld, text=%s";
status_t log_init (void) {
if (pthread_mutex_init(&lock_stderr, NULL) != 0) {
fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
return MSG_ERROR;
}
if (pthread_mutex_init(&lock_stdout, NULL) != 0) {
fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
return MSG_ERROR;
}
return MSG_OK;
}
void log_msg_io (msg_t* msg) {
if (settings.outLevel >= OUTLEVEL_1) {
char_t head[16];
strncpy (head, cMsg_getText(&msg->cMsg), sizeof(head));
head[16] = 0;
pthread_mutex_lock(&lock_stdout);
fprintf (stdout, _frm_msg_io,
msg->sender,
cMsg_getFrom (&msg->cMsg),
cMsg_getTo (&msg->cMsg),
cMsg_getTs (&msg->cMsg),
head
);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
}
}
void log_msg_new (msg_t* msg) {
if (settings.outLevel >= OUTLEVEL_1) {
char_t head[16];
strncpy (head, cMsg_getText(&msg->cMsg), sizeof(head));
head[16] = 0;
pthread_mutex_lock(&lock_stdout);
fprintf (stdout, _frm_msg_new,
cMsg_getFrom (&msg->cMsg),
cMsg_getTo (&msg->cMsg),
cMsg_getTs (&msg->cMsg),
head
);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
}
}
void log_debug (const char *fmt, ...) {
if (settings.outLevel >= OUTLEVEL_2) {
va_list ap;
va_start(ap, fmt);
pthread_mutex_lock(&lock_stdout);
vfprintf (stdout, fmt, ap);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
va_end(ap);
}
}
void log_error (const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
pthread_mutex_lock(&lock_stderr);
vfprintf (stderr, fmt, ap);
fflush(stderr);
pthread_mutex_unlock(&lock_stderr);
va_end(ap);
}
//! @}
//! cMsg_t API
//! @{
/*!
* Make a new message
* @param msg Pointer to message to create
*/
void cMsg_make (cMsg_t* msg) {
static int msgID =0; // unique msg ID
msg->from = settings.me; // from me
msg->to = devList[rand()%AEMLIST_SIZE].dev; // randomly select device
msg->ts = time(NULL);
// stream the first fields and take the quote text iterator
msg->text_it = sprintf (msg->text, "%d_%d_%lld_",
msg->from,
msg->to,
msg->ts
);
// stream out the quote with a unique ID
sprintf (&msg->text[msg->text_it], MESSAGE_BODY" #%d", msgID++);
}
/*!
* Parse an incoming message
*
@@ -87,13 +184,13 @@ status_t cMsg_parse (cMsg_t* cMsg, char_t* rawMsg, size_t size) {
return MSG_ERROR;
// Get message
strcpy(cMsg->msg, rawMsg);
strcpy(cMsg->text, rawMsg);
// Parse message
char_t del[2] = {MSG_DELIMITER, '\0'};
char_t* tok;
tok = strtok (cMsg->msg, del);
tok = strtok (cMsg->text, del);
cMsg->from = atoi (tok);
tok = strtok(NULL, del);
@@ -103,7 +200,7 @@ status_t cMsg_parse (cMsg_t* cMsg, char_t* rawMsg, size_t size) {
cMsg->ts = atoll (tok);
tok = strtok(NULL, del);
cMsg->text = tok - cMsg->msg;
cMsg->text_it = tok - cMsg->text;
return MSG_OK;
}
@@ -115,7 +212,7 @@ uint32_t cMsg_getTo(cMsg_t* cMsg) { return cMsg->to; }
/*! getter for cMsg_t member fromAEM */
uint64_t cMsg_getTs(cMsg_t* cMsg) { return cMsg->ts; }
/*! getter for payload text member */
char_t* cMsg_getText(cMsg_t* cMsg) { return (char_t*)& cMsg->msg[cMsg->text]; }
char_t* cMsg_getText(cMsg_t* cMsg) { return (char_t*)& cMsg->text[cMsg->text_it]; }
/*!
* Predicate to check core message equality
@@ -123,16 +220,28 @@ char_t* cMsg_getText(cMsg_t* cMsg) { return (char_t*)& cMsg->msg[cMsg->text]; }
* @param m2 Pointer to message 2
* @return Equality result (true, false)
*/
bool cMsg_equal (cMsg_t* m1, cMsg_t* m2) {
bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2) {
if (m1->from != m2->from) return false;
if (m1->to != m2->to) return false;
if (m1->ts != m2->ts) return false;
if (strncmp (cMsg_getText(m1), cMsg_getText(m2), sizeof(m1->msg)))
if (strncmp (cMsg_getText(m1), cMsg_getText(m2), sizeof(m1->text)))
return false;
return true;
}
//! @}
/*!
* mgs_t API
*/
//! @{
void msg_init (msg_t* msg) {
memset ((void*)msg, 0, sizeof(msg_t));
}
//! @}
/*!
* Create a message list for our application.
*/
@@ -152,6 +261,27 @@ msgList_t msgList;
if (test < value) apply = value; \
while (0)
dIter_t devList_getIter (devAEM_t dev) {
for (dIter_t i =0 ; i<AEMLIST_SIZE ; ++i) {
if (devList[i].dev == dev)
return i;
}
return -1; // return end()
}
status_t msgList_init (msgList_t* msgList) {
if (pthread_mutex_init(&lock_msgList, NULL) != 0) {
log_error ("Error: mutex init has failed\n");
return MSG_ERROR;
}
memset((void*)msgList, 0, sizeof(msgList_t));
msgList->first =-1;
msgList->last =-1;
srand (time(NULL));
return MSG_OK;
}
/*!
* @brief msgList iterator pre-increment in the msg_t direction
*
@@ -162,7 +292,7 @@ while (0)
* @param it Pointer to iterator to increase
* @return The iterator values
*/
static mIter_t _preInc(mIter_t* it) {
mIter_t msgList_preInc (mIter_t* it) {
if (++*it >= MSG_LIST_SIZE) *it = 0;
return *it;
}
@@ -177,82 +307,21 @@ static mIter_t _preInc(mIter_t* it) {
* @param it Pointer to iterator to decrease
* @return The iterator values
*/
static mIter_t _preDec(mIter_t* it) {
mIter_t msgList_preDec (mIter_t* it) {
if (--*it < 0) *it = MSG_LIST_SIZE;
return *it;
}
/*!
* @brief Access devices in the device_t direction.
*
* This function returns the first device on the recipient list if there is one.
*
* @param this The msgList object to work with
* @param it The iterator value [msg_t direction]
* @return Pointer to first device on the list, or NULL if the list is empty
*/
static device_t* devList_get (msgList_t* this, mIter_t it) {
device_t* d = this->m[it].recipients;
return (d) ? d->next : NULL;
mIter_t msgList_begin (msgList_t* this) {
return this->first;
}
/*!
* Iterate through device list.
*
* This function return the next device on the recipient list if there is one.
* [device direction]
*
* @param d Pointer to current device
* @return Pointer to next device on the list, or NULL if the list is empty
*/
//static device_t* devList_getNext (device_t* d) {
// return (d) ? d->next : NULL;
//}
/*!
* @brief Adds a new device on the recipients list
*
* @param this The msgList object to work with
* @param it The iterator value [msg_t direction]
* @param dev Pointer to device to add [device direction]
* @return The status of the operation
* @arg MSG_OK Success
* @arg MSG_ERROR memory allocation error
*/
status_t devList_add (msgList_t* this, mIter_t it, device_t* dev) {
pthread_mutex_lock (&lock_msgList);
device_t* last = devList_get (this, it);
while (last && last->next)
last = last->next;
last = (device_t*)malloc (sizeof(device_t));
*last = *dev;
pthread_mutex_unlock (&lock_msgList);
return (last) ? MSG_OK : MSG_ERROR;
mIter_t msgList_last (msgList_t* this) {
return this->last;
}
/*!
* Frees up all allocated memory in a device list.
*
* @param this The msgList object to work with
* @param it The iterator value [msg_t direction]
*/
static void devList_free (msgList_t* this, mIter_t it) {
device_t* last = devList_get (this, it);
while (last) {
device_t* next = last->next;
free(last);
last = next;
}
}
status_t msgList_init (msgList_t* msgList) {
if (pthread_mutex_init(&lock_msgList, NULL) != 0) {
fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
return MSG_ERROR;
}
memset((void*)msgList, 0, sizeof(msgList_t));
msgList->last =-1;
return MSG_OK;
size_t msgList_size (msgList_t* this) {
return this->size;
}
/*!
@@ -263,113 +332,97 @@ status_t msgList_init (msgList_t* msgList) {
* @return Iterator to message if found, or -1 if not
*/
mIter_t msgList_find (msgList_t* this, msg_t* msg) {
pthread_mutex_lock (&lock_msgList);
mIter_t it =this->last; // get iterator
mIter_t it =this->last; // get iterator
// search from end to start to find msg, return on success
for (size_t i=0 ; i < this->size ; ++i) {
if (cMsg_equal (&this->m[it].cMsg, &msg->cMsg)) {
pthread_mutex_unlock (&lock_msgList);
if (cMsg_equal (&this->m[it].cMsg, &msg->cMsg))
return it;
}
_preDec(&it);
msgList_preDec(&it);
// We start from the end as we think, its more possible
// to find msg in the recent messages.
}
pthread_mutex_unlock (&lock_msgList);
return (mIter_t)-1; // fail to find
}
/*!
* Add a new message in the message list
*
* @param this The msgList object to work with
* @param msg Pointer to message
* @return Iterator to the added item (last)
*/
void msgList_add (msgList_t* this, msg_t* msg) {
pthread_mutex_lock (&lock_msgList);
devList_free (this, _preInc(&this->last)); // free up possible previous recipients list
this->m[this->last] = *msg; // store data
pthread_mutex_unlock (&lock_msgList);
_top_saturate(++this->size, this->size, MSG_LIST_SIZE); // count the items
mIter_t msgList_add (msgList_t* this, msg_t* msg) {
if (this->first == -1) // if its first time init "first" iterator
this->first = 0;
this->m[msgList_preInc(&this->last)] = *msg; // store data *++it = *msg;
_top_saturate(++this->size, this->size, MSG_LIST_SIZE); // count the items with saturation
// if we reacher full capacity, move along first also
if ((this->first == this->last) && (this->size > 1))
msgList_preInc(&this->first);
return this->last; // return the iterator to newly created slot
}
//! @}
static char_t* _frm_msg_io = "dev=%d, message: from=%d, to=%d, timestamp=%lld, text=%s";
static char_t* _frm_msg_new = "new message: from=%d, to=%d, timestamp=%lld, text=%s";
void msgList_acquire () { pthread_mutex_lock (&lock_msgList); }
void msgList_release () { pthread_mutex_unlock (&lock_msgList); }
pthread_mutex_t lock_stderr;
pthread_mutex_t lock_stdout;
//! @}
//! Statistics API
//! @{
status_t log_init (void) {
if (pthread_mutex_init(&lock_stderr, NULL) != 0) {
fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
status_t stats_init (stats_t* s) {
memset ((void*)s, 0, sizeof (stats_t));
if (pthread_mutex_init(&lock_stats, NULL) != 0) {
log_error ("Error: mutex init has failed\n");
return MSG_ERROR;
}
if (pthread_mutex_init(&lock_stdout, NULL) != 0) {
fprintf (stderr, "Error %s: mutex init has failed\n", __FUNCTION__ );
return MSG_ERROR;
}
return MSG_OK;
}
void log_msg_io (msg_t* msg) {
if (settings.outLevel >= OUTLEVEL_1) {
char_t head[16];
strncpy (head, cMsg_getText(&msg->cMsg), sizeof(head));
head[16] = 0;
pthread_mutex_lock(&lock_stdout);
fprintf (stdout, _frm_msg_io,
msg->sender.id,
cMsg_getFrom (&msg->cMsg),
cMsg_getTo (&msg->cMsg),
cMsg_getTs (&msg->cMsg),
head
);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
}
}
void statsUpdateCreate (msg_t* msg) {
pthread_mutex_lock (&lock_stats);
void log_msg_new (msg_t* msg) {
if (settings.outLevel >= OUTLEVEL_1) {
char_t head[16];
strncpy (head, cMsg_getText(&msg->cMsg), sizeof(head));
head[16] = 0;
pthread_mutex_lock(&lock_stdout);
fprintf (stdout, _frm_msg_new,
cMsg_getFrom (&msg->cMsg),
cMsg_getTo (&msg->cMsg),
cMsg_getTs (&msg->cMsg),
head
);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
}
++stats.totalMsg;
++stats.myMsg;
// average message size
uint32_t saved = stats.totalMsg - stats.duplicateMsg;
stats.avMsgSize += strlen(cMsg_getText(&msg->cMsg)) / (saved -1);
stats.avMsgSize *= (saved-1)/saved;
pthread_mutex_unlock (&lock_stats);
}
void log_debug (const char *fmt, ...) {
if (settings.outLevel >= OUTLEVEL_2) {
va_list ap;
va_start(ap, fmt);
pthread_mutex_lock(&lock_stdout);
vfprintf (stdout, fmt, ap);
fflush(stdout);
pthread_mutex_unlock(&lock_stdout);
va_end(ap);
void statsUpdateIn (msg_t* msg, bool_t dup) {
pthread_mutex_lock (&lock_stats);
bool_t forMe;
stats.totalMsg++;
stats.duplicateMsg += (dup) ? 1:0;
stats.forMeMsg += ((forMe = msg->cMsg.to == settings.me)) ? 1:0;
stats.inDirectMsg += (forMe && (msg->cMsg.from == msg->sender)) ? 1:0;
// averages
uint32_t saved = stats.totalMsg - stats.duplicateMsg;
stats.avMsgSize += strlen(cMsg_getText(&msg->cMsg)) / (saved -1);
stats.avMsgSize *= (saved-1)/saved;
if (settings.trackTime) {
stats.avTimeToMe += ((tstamp_t)time(NULL) - msg->cMsg.ts) / (saved -1);
stats.avTimeToMe *= (saved-1)/saved;
}
pthread_mutex_unlock (&lock_stats);
}
void log_error (const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
pthread_mutex_lock(&lock_stderr);
vfprintf (stderr, fmt, ap);
fflush(stderr);
pthread_mutex_unlock(&lock_stderr);
va_end(ap);
void statsUpdateOut (msg_t* msg, devAEM_t dev) {
pthread_mutex_lock (&lock_stats);
stats.outDirectMsg += (msg->cMsg.to == dev) ? 1:0;
pthread_mutex_unlock (&lock_stats);
}
//! @}

+ 31
- 15
src/core.h Datei anzeigen

@@ -7,38 +7,54 @@
#ifndef __core__
#define __core__
#include <netinet/in.h>
#include "msg_impl.h"
extern msgList_t msgList;
device_t addr2device (uint32_t in_addr);
uint32_t device2addr (const device_t* dev);
device_t ip2device (devIP_t* ip);
devIP_t device2ip (const device_t* dev);
devIP_t addr2ip (uint32_t in_addr);
devAEM_t addr2devAEM (uint32_t in_addr);
in_addr_t devAEM2addr (devAEM_t dev);
devAEM_t ip2AEM (devIP_t* ip);
devIP_t AEM2ip (devAEM_t dev);
devIP_t addr2ip (in_addr_t in_addr);
void cMsg_make (cMsg_t* msg);
status_t cMsg_parse (cMsg_t* cMsg, char_t* rawMsg, size_t size);
uint32_t cMsg_getFrom (cMsg_t* cMsg);
uint32_t cMsg_getTo (cMsg_t* cMsg);
uint64_t cMsg_getTs (cMsg_t* cMsg);
char_t* cMsg_getText (cMsg_t* cMsg);
bool cMsg_equal (cMsg_t* m1, cMsg_t* m2);
bool_t cMsg_equal (cMsg_t* m1, cMsg_t* m2);
void msg_init (msg_t* msg);
//device_t* devList_get (msgList_t* this, mIter_t it);
//device_t* devList_getNext (device_t* d);
status_t devList_add (msgList_t* this, mIter_t it, device_t* dev);
//void devList_free (msgList_t* this, mIter_t it);
dIter_t devList_getIter (devAEM_t dev);
mIter_t msgList_preInc (mIter_t* it);
mIter_t msgList_preDec (mIter_t* it);
status_t msgList_init (msgList_t* msgList);
mIter_t msgList_begin (msgList_t* this);
mIter_t msgList_last (msgList_t* this);
size_t msgList_size (msgList_t* this);
mIter_t msgList_find (msgList_t* this, msg_t* msg);
void msgList_add (msgList_t* this, msg_t* msg);
mIter_t msgList_add (msgList_t* this, msg_t* msg);
void msgList_acquire ();
void msgList_release ();
status_t stats_init (stats_t* s);
void statsUpdateCreate (msg_t* msg);
void statsUpdateIn (msg_t* msg, bool_t dup);
void statsUpdateOut (msg_t* msg, devAEM_t dev);
status_t log_init (void);
void log_msg_io (msg_t* msg);
void log_msg_new (msg_t* msg);
void log_debug (const char *fmt, ...);
void log_error (const char *fmt, ...);
void log_msg_io (msg_t* msg);
void log_msg_new (msg_t* msg);
void log_debug (const char *fmt, ...);
void log_error (const char *fmt, ...);
#endif /* __core__ */

+ 79
- 55
src/listener.c Datei anzeigen

@@ -10,103 +10,127 @@
#include <unistd.h>
#include <string.h>
#include <stdio.h>
//#include <pthread.h>
#include <stdlib.h>
#include "listener.h"
static void srv_action (device_t* d, char_t* buffer, size_t size) {
msg_t msg = { .sender =*d, .recipients =NULL };
cMsg_parse (&msg.cMsg, buffer, size);
char_t rx_buffer[4*MSG_TEXT_SIZE]; //!< receive buffer
mIter_t myCopy = msgList_find (&msgList, &msg);
/*!
* Incoming message handling
* @param d Pointer to sender/client's device
* @param buffer Buffer to message
* @param size Size of message
*/
static void listen_handler (devAEM_t dev, char_t* buffer, size_t size) {
msg_t msg;
msg_init (&msg);
cMsg_parse (&msg.cMsg, buffer, size); // parse the message
log_debug("Debug: Message parsed\n");
msgList_acquire ();
mIter_t myCopy = msgList_find (&msgList, &msg); // try to find message in msgList
if (myCopy == -1) {
// I don't have a copy after all
// We don't have a copy, accept and store it
msgList_add (&msgList, &msg);
log_msg_io (&msg); // semaphore
statsUpdateIn (&msg, false); // message process
log_msg_io (&msg); // log
}
else {
// We have a copy
// Do not forward a duplicate message to sender, he already has it
devList_add (&msgList, myCopy, &msg.sender);
#ifndef NO_DEBUG
log_debug("Debug: Duplicate message from: %d\n", msg.sender.id);
#endif
msgList.m[myCopy].recipients[devList_getIter (dev)] = true;
statsUpdateIn (&msg, true); // message process
log_debug("Debug: Duplicate message from: %d\n", msg.sender);
}
msgList_release ();
}
status_t listener() {
/*!
* Listener. This function normally never returns. If it does it is to
* report unrecoverable error.
* @return Error creating server side socket
*/
static status_t listener(void) {
int srvSock;
struct sockaddr_in srvAddPort;
char_t buffer[256];
sockaddr_in_t srvAddPort;
// Create socket
memset(&srvAddPort, 0, sizeof(srvAddPort));
srvAddPort.sin_family= AF_INET;
srvAddPort.sin_port= htons(settings.port);
srvAddPort.sin_addr.s_addr = htonl(INADDR_ANY);
// try to create tcp socket
if ((srvSock = socket (PF_INET, SOCK_STREAM, 0)) == -1) {
log_error ("Error: Can not create socket\n");
return MSG_ERROR;
goto _list_error1;
}
#ifndef NO_DEBUG
log_debug("Debug: Socket for listening created\n");
#endif
if(bind(srvSock, (struct sockaddr *) &srvAddPort, sizeof(srvAddPort)) == -1) {
log_debug("Debug: Socket for listening created\n");
// Try to bind socket
if(bind(srvSock, (sockaddr_t *) &srvAddPort, sizeof(srvAddPort)) == -1) {
log_error ("Error: Can not bind socket to port %d\n", settings.port);
close (srvSock);
return MSG_ERROR;
goto _list_error0;
}
// try to setup listener. We use DEVICE_LIST_SIZE for pending request
if (listen (srvSock, DEVICE_LIST_SIZE) == -1) {
log_error ("Error: Can not enable socket\n");
close (srvSock);
return MSG_ERROR;
goto _list_error0;
}
#ifndef NO_DEBUG
log_debug("Debug: Listening on [0.0.0.0], port %d\n", settings.port);
#endif
log_debug("Debug: Listening on [0.0.0.0], port %d\n", settings.port);
while (1) {
struct sockaddr_in clntAddr;
size_t clntLen= sizeof(clntAddr);
sockaddr_in_t clntAddr;
socklen_t clntLen= sizeof(clntAddr);
int clntSock;
int rcvBytes;
if ((clntSock = accept (srvSock, (struct sockaddr *)&clntAddr, (socklen_t*)&clntLen)) == -1) {
close (srvSock);
// block in accept and get client's connection socket
if ((clntSock = accept (srvSock, (sockaddr_t *)&clntAddr, &clntLen)) == -1) {
continue;
}
#ifndef NO_DEBUG
devIP_t ip = addr2ip(ntohl(clntAddr.sin_addr.s_addr));
log_debug ("Debug: Connection from %u.%u.%u.%u port:%u received\n",
ip.A, ip.B, ip.C, ip.D, clntAddr.sin_port
);
#endif
if ((rcvBytes = recv(clntSock, buffer, sizeof(buffer), 0)) < 0) {
// make a device from clients address
devAEM_t dev = addr2devAEM (ntohl(clntAddr.sin_addr.s_addr));
devIP_t ip = AEM2ip (dev);
log_debug (
"Debug: Connection from %u.%u.%u.%u port:%u received\n",
ip.A, ip.B, ip.C, ip.D, clntAddr.sin_port
);
// Block in receive, after the connection is accepted
if ((rcvBytes = recv(clntSock, rx_buffer, sizeof(rx_buffer), 0)) < 0) {
log_error ("Error: Fail to receive from socket\n");
close (srvSock);
close (clntSock);
continue;
}
buffer[rcvBytes] = '\0';
close(clntSock);
device_t dev = addr2device (ntohl(clntAddr.sin_addr.s_addr));
#ifndef NO_DEBUG
log_debug ("Debug: Message from %u.%u.%u.%u port:%u received\n");
#endif
srv_action (&dev, buffer, rcvBytes);
rx_buffer[rcvBytes] = '\0'; // add null termination to buffer
close(clntSock); // close the client socket
// call listen handler for the device and message
log_debug (
"Debug: Message from %u.%u.%u.%u port:%u received\n",
ip.A, ip.B, ip.C, ip.D, clntAddr.sin_port
);
listen_handler (dev, rx_buffer, rcvBytes);
}
close(srvSock);
#ifndef NO_DEBUG
log_debug ("Debug: Socket for listening closed\n");
#endif
return MSG_OK;
// Local error return handling
_list_error0:
close(srvSock); // server socket clean up
log_debug ("Debug: Socket for listening closed\n");
_list_error1:
return MSG_ERROR; // report back
}
void* thListener(void* ptr) {
/*!
* pthread listener wrapper
* @param ptr Not used
*/
void* pthListener(void* ptr) {
(void)&ptr; // use parameter
listener();
if (listener() == MSG_ERROR)
exit(1); // this is a deal breaker sorry.
return NULL;
}

+ 1
- 2
src/listener.h Datei anzeigen

@@ -11,7 +11,6 @@
#include "core.h"
#include "msg_impl.h"
status_t listener();
void* thListener(void* ptr);
void* pthListener(void* ptr);
#endif /* __listener_h__ */

+ 85
- 215
src/main.c Datei anzeigen

@@ -1,231 +1,101 @@
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include "listener.h"
#include "client.h"
#include "msg_impl.h"
settings_t settings_init (settings);
//status_t client (char_t* ip) {
// int sockid;
// struct sockaddr_in srvAddPort;
//// struct sockaddr_in clntAddr;
//// char buffer[256];
//
// memset(&srvAddPort, 0, sizeof(srvAddPort));
// srvAddPort.sin_family= AF_INET;
// srvAddPort.sin_port= htons(2288);
// srvAddPort.sin_addr.s_addr = inet_addr(ip);
//
// if ((sockid = socket (PF_INET, SOCK_STREAM, 0)) == -1)
// return MSG_ERROR;
//
// if (connect(sockid, (struct sockaddr*)&srvAddPort, sizeof(srvAddPort)) == -1) {
// close (sockid);
// return MSG_ERROR;
// }
//
// if (send(sockid, sms, strlen(sms), MSG_CONFIRM) == -1) {
// close(sockid);
// return MSG_ERROR;
// }
//
// close(sockid);
// return MSG_OK;
//}
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/ip_icmp.h>
#include <stdio.h>
#include <unistd.h>
//// make a ping request
//void send_ping(int ping_sockfd, struct sockaddr_in *ping_addr,
// char *ping_dom, char *ping_ip, char *rev_host)
//{
// int ttl_val=64, msg_count=0, i, addr_len, flag=1,
// msg_received_count=0;
//
// struct ping_pkt pckt;
// struct sockaddr_in r_addr;
// struct timespec time_start, time_end, tfs, tfe;
// long double rtt_msec=0, total_msec=0;
// struct timeval tv_out;
// int failure_cnt= 0;
// int cnt;
//
// tv_out.tv_sec = RECV_TIMEOUT;
// tv_out.tv_usec = 0;
//
// clock_gettime(CLOCK_MONOTONIC, &tfs);
//
//
// // set socket options at ip to TTL and value to 64,
// // change to what you want by setting ttl_val
// if (setsockopt(ping_sockfd, SOL_IP, IP_TTL,
// &ttl_val, sizeof(ttl_val)) != 0)
// {
// printf("\nSetting socket options to TTL failed!\n");
// return;
// }
//
// else
// {
// printf("\nSocket set to TTL..\n");
// }
//
// // setting timeout of recv setting
// setsockopt(ping_sockfd, SOL_SOCKET, SO_RCVTIMEO,
// (const char*)&tv_out, sizeof tv_out);
//
// // send icmp packet in an infinite loop
// while(pingloop)
// {
// // flag is whether packet was sent or not
// flag=1;
//
// //filling packet
// bzero(&pckt, sizeof(pckt));
//
// pckt.hdr.type = ICMP_ECHO;
// pckt.hdr.un.echo.id = getpid();
//
// for ( i = 0; i < sizeof(pckt.msg)-1; i++ )
// pckt.msg[i] = i+'0';
//
// pckt.msg[i] = 0;
// pckt.hdr.un.echo.sequence = msg_count++;
// pckt.hdr.checksum = checksum(&pckt, sizeof(pckt));
//
//
// usleep(PING_SLEEP_RATE);
//
// //send packet
// clock_gettime(CLOCK_MONOTONIC, &time_start);
// if ( sendto(ping_sockfd, &pckt, sizeof(pckt), 0,
// (struct sockaddr*) ping_addr,
// sizeof(*ping_addr)) <= 0)
// {
// printf("\nPacket Sending Failed!\n");
// flag=0;
// }
//
// //receive packet
// addr_len=sizeof(r_addr);
//REC:
// cnt = recvfrom(ping_sockfd, &pckt, sizeof(pckt), 0,
// (struct sockaddr*)&r_addr, &addr_len);
// if ( cnt <= 0 )
// {
// printf("\nPacket receive failed!\n");
// failure_cnt++;
// if(failure_cnt > 5){
// break;
// }
// }
//
// else
// {
// clock_gettime(CLOCK_MONOTONIC, &time_end);
//
// double timeElapsed = ((double)(time_end.tv_nsec -
// time_start.tv_nsec))/1000000.0;
// rtt_msec = (time_end.tv_sec-
// time_start.tv_sec) * 1000.0
// + timeElapsed;
//
// // if packet was not sent, don't receive
// if(flag)
// {
// struct icmp* icmp_hdr;
// if (cnt >= 76) {
// struct iphdr *iphdr = (struct iphdr *) &pckt;
// /* skip ip hdr */
// icmp_hdr = (struct icmp *) (((char* )&pckt) + (iphdr->ihl << 2));
// }
// if(icmp_hdr->icmp_type == ICMP_ECHO){
// goto REC;
// }
//
// if(!(icmp_hdr->icmp_type !=ICMP_ECHOREPLY) )
// {
// printf("Error..Packet received with ICMP"
// "type %d code %d\n",
// icmp_hdr->icmp_type, icmp_hdr->icmp_code);
// }
// else
// {
// printf("%d bytes from %s (h: %s)"
// "(%s) msg_seq=%d ttl=%d "
// "rtt = %Lf ms.\n",
// PING_PKT_S, ping_dom, rev_host,
// ping_ip, msg_count,
// ttl_val, rtt_msec);
//
// msg_received_count++;
// }
// }
// }
// }
// clock_gettime(CLOCK_MONOTONIC, &tfe);
// double timeElapsed = ((double)(tfe.tv_nsec -
// tfs.tv_nsec))/1000000.0;
//
// total_msec = (tfe.tv_sec-tfs.tv_sec)*1000.0+
// timeElapsed;
//
// printf("\n===%s ping statistics===\n", ping_ip);
// printf("\n%d packets sent, %d packets received, %f percent "
// "packet loss. Total time: %Lf ms.\n\n",
// msg_count, msg_received_count,
// ((msg_count - msg_received_count)/msg_count) * 100.0,
// total_msec);
//}
/*!
* Global data
*/
//! @{
settings_t settings_init (settings);
devList_t devList_init (devList[AEMLIST_SIZE]);
stats_t stats;
//! @}
/*!
* CLI short options
*/
const char *short_opt = "nd:i:l:p:s:w:th";
/*!
* CLI long options
*/
const struct option long_opt[] = {
{"port", required_argument, NULL, 'n'},
{"duration", required_argument, NULL, 'd'},
{"interval", required_argument, NULL, 'i'},
{"outlevel", required_argument, NULL, 'l'},
{"pingtimeout",required_argument, NULL, 'p'},
{"sendtimeout",required_argument, NULL, 's'},
{"who", required_argument, NULL, 'w'},
{"tracktime", no_argument, NULL, 't'},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0}
};
/*!
* \brief
* Parse input argument and fill the kcli_input_t struct
* \param in Pointer to \ref kcli_input_t structure to fill
* \param argc The argument count as passed to the main()
* \param argv Argument array as passed to the main()
* \return The status of the operation
* \arg 0 Success
* \arg 1 Fail
*/
int parse_args (settings_t *s, int argc, char const *argv[]) {
int c;
while ((c = getopt_long (argc, (char *const *)argv, short_opt, long_opt, NULL)) != -1) {
switch (c) {
case -1: /* no more arguments */
case 0: /* long options toggles */
break;
case 'n': s->port = atoi(optarg); break;
case 'd': s->duration = atoi (optarg); break;
case 'i': s->msgInterval = atoi (optarg); break;
case 'l':
s->outLevel = atoi (optarg);
if (s->outLevel >= OUTLEVEL_2) s->outLevel = OUTLEVEL_2;
if (s->outLevel < OUTLEVEL_0) s->outLevel = OUTLEVEL_0;
break;
case 'p': s->pingTimeout = atoi (optarg); break;
case 's': s->sendTimeout.tv_sec = atoi (optarg); break;
case 'w': s->me = atoi (optarg); break;
case 't': s->trackTime = true; break;
case 'h': printf ("This will be the help text\n");
case ':':
default:
case '?':
fprintf (stderr, "%s: invalid option -- %c\n", argv[0], c);
fprintf (stderr, "Try `%s --help' for more information.\n", argv[0]);
exit(1);
}
}
return 0;
}
int main(int argc, char const *argv[]) {
pthread_t tL;
pthread_create( &tL, NULL, thListener, NULL);
parse_args (&settings, argc, argv);
log_init ();
stats_init (&stats);
msgList_init (&msgList);
bool b =true;
while (b) {
device_t d = { 0, NULL };
d.id = 43;
if (ping (&d)) printf ("found: 10.0.0.43\n");
else printf ("not found: 10.0.0.43\n");
d.id = 8997;
if (ping (&d)) printf ("found: 10.0.89.97\n");
else printf ("not found 10.0.89.97\n");
d.id = 7;
if (ping (&d)) printf ("found: 10.0.0.7\n");
else printf ("not found: 10.0.0.7\n");
d.id = 1;
if (ping (&d)) printf ("found: 10.0.0.1\n");
else printf ("not found: 10.0.0.1\n");
ping (NULL);
sleep (10);
}
// Create threads
pthread_t ptL, ptC;
pthread_create (&ptL, NULL, pthListener, NULL);
pthread_create (&ptC, NULL, pthClient, NULL);
while (b) { }
pthread_join( tL, NULL);
// block here
pthread_join (ptL, NULL);
pthread_join (ptC, NULL);
return 0;
}

+ 81
- 44
src/msg_impl.h Datei anzeigen

@@ -12,32 +12,48 @@
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
#include <sys/time.h>
#include <netinet/ip_icmp.h>
#include <netinet/in.h>
/*!
* AEM list
*/
#define AEMLIST_SIZE (9)
#define devList_init(l) l = { \
{ 10, 0}, \
{ 43, 0}, \
{ 7200, 0}, \
{ 7300, 0}, \
{ 8000, 0}, \
{ 8765, 0}, \
{ 8844, 0}, \
{ 8855, 0}, \
{ 8997, 0} \
}
/*!
* General options
*/
//! @{
#define LISTENER_PORT 2288 //!< The server port
#define MSG_TEXT_SIZE 256 //!< Maximum size of each message
#define MSG_LIST_SIZE 2000 //!< Maximum size of message history buffer
#define DEVICE_LIST_SIZE 100 //!< Maximum size of the device list
#define MSG_DELIMITER '_' //!< Message delimiter
#define ADHOC_NET_A 10 //!< [A.B.C.D]
#define ADHOC_NET_B 0
#define ADHOC_NET_A 192 //!< [A.B.C.D]
#define ADHOC_NET_B 168
#define ADHOC_NET_C 0
#define ADHOC_NET_D 0
//#define NO_DEBUG 1
#define MESSAGE_BODY "The ships hung in the sky in much the same way that bricks don't!"
//! @}
/*!
* Helper macros
*/
#define _MK_ADHOC_SUBNET(A, B, C, D) (((A)<<24) | ((B)<<16) | ((C)<<8) | (D))
#define _adhoc_subnet _MK_ADHOC_SUBNET(ADHOC_NET_A, ADHOC_NET_B, ADHOC_NET_C, ADHOC_NET_D)
/*!
* Messenger types
*/
@@ -54,28 +70,32 @@ typedef enum {
typedef bool bool_t; //!< Boolean type
typedef char char_t; //!< Application wide character type
typedef int32_t iter_t; //!< General iterator type
typedef uint32_t aem_t; //!< AEM data type
typedef int64_t tstamp_t; //!< UNIX time in 64 bit wide signed integer
typedef aem_t devAEM_t; //!< device as AEM type
/*!
* IP wrapper type
* device as IP type
*/
typedef struct {
uint16_t A, B, C, D;
}devIP_t;
typedef double fpdata_t; //!< Select floating point data type for the application
// Syntactic sugar types
typedef struct sockaddr_in sockaddr_in_t; //!< internet socket address type definition
typedef struct sockaddr sockaddr_t; //!< general socket address type definition
typedef struct timeval timeval_t;
/*!
* A RTES node device representation
* \note
* Objects of this type are also acting as fwd list nodes.
* AEM list for our mesh network
*/
typedef struct device {
aem_t id; //!< AEM of the device
struct device* next; //!< link to the next linked device on the chain
} device_t;
typedef device_t* devList_t; //!< device list alias
typedef struct {
devAEM_t dev;
bool_t onRange;
} devList_t;
extern devList_t devList[];
/*!
* \brief
@@ -91,11 +111,11 @@ typedef device_t* devList_t; //!< device list alias
* \sa cMsg_getText() used as text getter
*/
typedef struct {
aem_t from; //!< sender's AEM
aem_t to; //!< destination AEM
devAEM_t from; //!< sender's AEM
devAEM_t to; //!< destination AEM
tstamp_t ts; //!< UNIX timestamp compatible
size_t text; //!< text offset
char_t msg[MSG_TEXT_SIZE]; //!< The actual message stream
size_t text_it; //!< text offset
char_t text[MSG_TEXT_SIZE]; //!< The actual message stream
} cMsg_t;
@@ -106,12 +126,15 @@ typedef struct {
* This type
*/
typedef struct {
device_t sender; //!< The sender's device
devList_t recipients; //!< List of all devices the message has reach
cMsg_t cMsg; //!< actual message payload
devAEM_t sender; //!< The sender's device
bool_t recipients[AEMLIST_SIZE]; //!< List of all devices the message has reached.
//! Used as pair mapped in devList array, so each slot here corresponds in
//! the same AEM in devList.
cMsg_t cMsg; //!< actual message payload
} msg_t;
typedef int32_t mIter_t; //!< message list iterator type
typedef iter_t mIter_t; //!< message list iterator type
typedef iter_t dIter_t; //!< device list iterator type
/*!
* \brief Message list
@@ -135,9 +158,10 @@ typedef int32_t mIter_t; //!< message list iterator type
* Layout example:
*
* msgList.m
* [ 0 ] --> [devx] --> [devy] -- >0
* | [ 1 ] --> [devy] -- > 0
* time | [ 2 ] --> 0
* dev1 dev2 dev3 ...
* [ 0 ] [ ] [x] [ ] <-- x marks "message has been send"
* | [ 1 ] [x] [x] [ ] <-- x marks "message has been send"
* time | [ 2 ]
* [*1] | [ 3 ] ...
* \|/
* ...
@@ -149,25 +173,13 @@ typedef int32_t mIter_t; //!< message list iterator type
*/
typedef struct {
msg_t m[MSG_LIST_SIZE]; //!< The actual data representation
mIter_t first; //!< A ring buffer iterator for begin()
mIter_t last; //!< A ring buffer iterator marking the last item on .m
size_t size;
} msgList_t;
//! @}
/*!
* Client types
*/
//! @{
#define PING_PKT_S (64)
#define PING_MSG_S (PING_PKT_S - sizeof(struct icmphdr))
typedef struct {
struct icmphdr hdr;
char msg[PING_MSG_S];
}ping_pkt_t;
//! @}
@@ -176,6 +188,23 @@ typedef struct {
*/
//! @{
/*!
* Statistical data type
*/
typedef struct {
uint32_t totalMsg; //!< Total messages processed (both incoming and created)
uint32_t duplicateMsg; //!< Incoming duplicate messages
uint32_t forMeMsg; //!< Incoming messages for me
uint32_t myMsg; //!< Messages created by me
uint32_t inDirectMsg; //!< Incoming messages created by the sender for me
uint32_t outDirectMsg; //!< Outgoing messages from me for the recipient
fpdata_t avMsgSize; //!< average message payload size
time_t avTimeToMe; //!< average time to me
} stats_t;
extern stats_t stats;
typedef enum {
OUTLEVEL_0, //!< Output only results [default]
OUTLEVEL_1, //!< Output results and every message also
@@ -184,19 +213,27 @@ typedef enum {
typedef struct {
devAEM_t me;
uint16_t port;
time_t duration;
time_t msgInterval;
outLevel_en outLevel;
time_t pingTimeout;
timeval_t sendTimeout;
bool_t trackTime;
}settings_t;
extern settings_t settings;
#define settings_init(s) s = { \
.me = 8997, \
.port = 2288, \
.duration = 7200, \
.msgInterval = 60, \
.outLevel = OUTLEVEL_1 \
.msgInterval = 2, \
.outLevel = OUTLEVEL_2, \
.pingTimeout = 1, \
.sendTimeout = {5, 0}, \
.trackTime = false \
}
//! @}


Laden…
Abbrechen
Speichern