#include #include #include #include #include #include #include #include #include #include #include static char rcsid[] = "$Id: networks_hwp1.c,v 1.1 2002/02/05 07:58:18 surendar Exp $"; typedef struct identification { char name[32]; /* Name of the current client */ // Specify how we can be contacted. in_addr_t location; /* IP address of the client */ in_port_t port; /* port where the client is listening */ // Internal data structures in_port_t rtt_port; struct timeval tv; } identification_t; identification_t myId; typedef struct ids { identification_t *id; time_t lastTime; /* Last time we heard from this peer. For garbage collection */ struct ids *nextptr; } ids_t; struct ids *knownId = NULL; pthread_mutex_t idMutex = PTHREAD_MUTEX_INITIALIZER; /* Lock */ /* Data structures for the key:tuple service */ struct tuples { char *key; char *value; struct tuples *nextptr; } *tupleSet; int mySocket; /* Socket for providing service */ /* Routing table to maintain RTT */ struct rttable { int count; in_addr_t host; int rtt; struct rttable *nextptr; } *routes; pthread_mutex_t rtMutex = PTHREAD_MUTEX_INITIALIZER; #define MCAST_PORT 5665 #define MCAST_ADDR "" /* How often the identification structure is multicast (in secs) */ #define BEAT 1 /* Useful utility function */ char *Inet_ntop(unsigned int in) { static char buf[1024]; inet_ntop(AF_INET, &in, buf, 1024); return buf; } // The sending part of the peer location system. This thread garbage // collects and sends the local identification structure every BEAT // seconds void *locatePeersSend(void *arg) { struct sockaddr_in s_sa; struct ids *ptr, *prevPtr; struct tm *tm; struct rttable *tmpRttable; time_t curTime; int sendfd; int on; /* Create the socket for sending my identification_t */ sendfd = socket(AF_INET, SOCK_DGRAM, 0); bzero((void *) &s_sa, sizeof(s_sa)); s_sa.sin_family = AF_INET; s_sa.sin_port = htons(MCAST_PORT); s_sa.sin_addr.s_addr = inet_addr(MCAST_ADDR); #ifndef sparc /* Don't want to receive my own identification_t */ on = 0; if (setsockopt(sendfd, IPPROTO_IP, IP_MULTICAST_LOOP, &on, sizeof(on)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot setsockopt for IP_MULTICAST_LOOP\n", __LINE__); exit(1); } #endif /* sparc */ while (1) { curTime = time(NULL); // lock shared data structures pthread_mutex_lock(&idMutex); ptr = knownId; prevPtr = NULL; while(ptr) { tm = localtime(&curTime); if (ptr->lastTime < (curTime - 3*BEAT)) { /* Have we received any packet within the last 3 ticks */ pthread_mutex_lock(&rtMutex); tmpRttable = routes; while(tmpRttable) { if (tmpRttable->host == ptr->id->location) { tmpRttable->count--; break; } else tmpRttable = tmpRttable->nextptr; } printf("%02d/%02d/%04d %02d:%02d:%d '%s' LEAVE %s:%d %d microsec [%d]\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, ptr->id->name, Inet_ntop(ptr->id->location), ntohs(ptr->id->port), (tmpRttable) ? tmpRttable->rtt : -1, (tmpRttable) ? (tmpRttable->count) : 0); pthread_mutex_unlock(&rtMutex); if (prevPtr == NULL) { free(knownId); knownId = NULL; break; } else { prevPtr->nextptr = ptr->nextptr; free(ptr); } } else { /* We did hear something within 3*BEAT */ pthread_mutex_lock(&rtMutex); tmpRttable = routes; while(tmpRttable) { if (tmpRttable->host == ptr->id->location) break; tmpRttable = tmpRttable->nextptr; } printf("%02d/%02d/%04d %02d:%02d:%d '%s' MAINTAIN %s:%d %d microsec [%d]\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, ptr->id->name, Inet_ntop(ptr->id->location), ntohs(ptr->id->port), tmpRttable->rtt, tmpRttable->count); pthread_mutex_unlock(&rtMutex); } if (ptr == NULL) break; ptr = ptr->nextptr; } //UNLOCK pthread_mutex_unlock(&idMutex); gettimeofday(&(myId.tv), NULL); // microuptime(&(myId.tv)); if (sendto(sendfd, &myId, sizeof(myId), 0, (struct sockaddr*) &s_sa, sizeof(s_sa)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot send my identity\n", __LINE__); exit(1); } sleep(BEAT); } } // Thread that receives heart beats from other peers. Once we receive // a multicast message, we send a ack back (which can also be used to // measure RTT delay void *locatePeersRecv(void *arg) { struct sockaddr_in r_sa, rtt_addr; struct ip_mreq mreq; identification_t id, *tmpid; struct ids *ptr, *tmpids; struct tm *tm; struct rttable *tmpRttable; time_t curTime; int recvfd, sendfd; int on; /* Create the socket for receiving messages from other beacons and sending the ACK back (for RTT calculation) */ recvfd = socket(AF_INET, SOCK_DGRAM, 0); sendfd = socket(AF_INET, SOCK_DGRAM, 0); on = 1; if (setsockopt(recvfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { fprintf(stderr, "FATAL: Line %d. Setsockopt failed for SO_REUSERADDR\n", __LINE__); exit(1); } /* Join the multicast group */ mreq.imr_multiaddr.s_addr = inet_addr(MCAST_ADDR); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(recvfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { fprintf(stderr, "FATAL: Line %d. Setsockopt failed for IP_ADD_MEMBERSHIP\n", __LINE__); exit(1); } /* Bind to the multicast port */ bzero((void *) &r_sa, sizeof(r_sa)); r_sa.sin_family = AF_INET; r_sa.sin_port = htons (MCAST_PORT); r_sa.sin_addr.s_addr = inet_addr(MCAST_ADDR); if (bind(recvfd, (struct sockaddr *) &r_sa, sizeof(r_sa)) < 0) { fprintf(stderr, "FATAL: Line %d. bind failed\n", __LINE__); exit(1); } while (1) { if(recv(recvfd, &id, sizeof(identification_t), 0) < 0) { fprintf(stderr, "FATAL: Line %d. Receiving packet failed\n", __LINE__); exit(1); } if (id.rtt_port) { // Send RTT probe back bzero((void *) &rtt_addr, sizeof(rtt_addr)); rtt_addr.sin_family = AF_INET; bcopy(&(id.location), (void *) &(rtt_addr.sin_addr.s_addr), sizeof(struct sockaddr_in)); rtt_addr.sin_port = (id.rtt_port); if (sendto(sendfd, &id, sizeof(id), 0, (struct sockaddr*) &rtt_addr, sizeof(rtt_addr)) < 0) { fprintf(stderr, "FATAL: Line %d. Sending RTT probe failed\n", __LINE__); exit(1); } } // LOCK pthread_mutex_lock(&idMutex); ptr = knownId; while (ptr) { if (((ptr->id)->location == id.location) && ((ptr->id)->port == id.port)) { if (strcmp((ptr->id)->name, id.name) != 0) strcpy((ptr->id)->name, id.name); ptr->lastTime = time(NULL); break; } ptr = ptr->nextptr; } if (ptr == NULL) { /* Not already there, create a new entry */ tmpid = (identification_t *) malloc(sizeof (identification_t)); memcpy(tmpid, &id, sizeof(identification_t)); tmpids = (struct ids *) malloc (sizeof(struct ids)); tmpids->id = tmpid; tmpids->lastTime = time(NULL); tmpids->nextptr = knownId; knownId = tmpids; pthread_mutex_unlock(&idMutex); curTime = time(NULL); tm = localtime(&curTime); pthread_mutex_lock(&rtMutex); tmpRttable = routes; while(tmpRttable) { if (tmpRttable->host == id.location) break; tmpRttable = tmpRttable->nextptr; } printf("%02d/%02d/%04d %02d:%02d:%d '%s' ENTER %s:%d %d microsec [%d]\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, id.name, Inet_ntop(id.location), ntohs(id.port), (tmpRttable) ? tmpRttable->rtt : -1, (tmpRttable) ? (tmpRttable->count) : 0); pthread_mutex_unlock(&rtMutex); } else pthread_mutex_unlock(&idMutex); // New beacon. See if we need to create/update a route entry if (ptr == NULL) { pthread_mutex_lock(&rtMutex); tmpRttable = routes; while(tmpRttable) { if (tmpRttable->host == id.location) { tmpRttable->count++; break; } else tmpRttable = tmpRttable->nextptr; } if (tmpRttable == NULL) { /* New entry */ tmpRttable = (struct rttable *)malloc (sizeof( struct rttable)); tmpRttable->count=1; tmpRttable->host=id.location; tmpRttable->rtt=0; tmpRttable->nextptr = routes; routes = tmpRttable; } pthread_mutex_unlock(&rtMutex); } } } /* Thread that responds to queries for key:value tuple */ void *serviceRequest(void *_new_fd) { char buffer[1024], *ptr, *pptr, outbuf[32], key[128], value[256]; int new_fd = *((int *)_new_fd); struct tuples *curTuple, *tmpTuple; write(new_fd, "READY\n",6); while(1) { ptr = buffer; while(read(new_fd, ptr, 1) > 0) { if (*ptr == '\r' || *ptr == '\n') { *ptr = '\0'; break; } ptr++; } ptr = buffer; if (*ptr == '\0') continue; // Simple command parser if (strncasecmp(ptr, "SET", 3) == 0) { // process set command ptr+=3; while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; while (isspace(*ptr)) ptr++; pptr = value; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; tmpTuple = tupleSet; while (tmpTuple) { if (strcmp(tmpTuple->key, key) == 0) { free(tmpTuple->value); tmpTuple->value = strdup(value); break; } tmpTuple = tmpTuple->nextptr; } if (tmpTuple == NULL) { curTuple = (struct tuples *) malloc (sizeof (struct tuples)); curTuple->key = strdup(key); curTuple->value = strdup(value); curTuple->nextptr = tupleSet; tupleSet = curTuple; } sprintf(outbuf, "OK SET %s:%s\n----\n", key, value); write(new_fd, outbuf, strlen(outbuf)); continue; } if (strncasecmp(ptr, "GET", 3) == 0) { // process get command ptr+=3;while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; sprintf(outbuf, "OK GET %s\n", key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = tupleSet; while (tmpTuple) { if (strcmp(tmpTuple->key, key) == 0) { sprintf(outbuf, "%s\n----\n", tmpTuple->value); write(new_fd, outbuf, strlen(outbuf)); break; } tmpTuple = tmpTuple->nextptr; } if (tmpTuple == NULL) { sprintf(outbuf, "ERROR key not found\n----\n"); write(new_fd, outbuf, strlen(outbuf)); } continue; } if (strncasecmp(ptr, "LIST", 4) == 0) { // process list command ptr+=4; while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; sprintf(outbuf, "OK LIST %s\n", key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = tupleSet; while (tmpTuple) { sprintf(outbuf, "%s\n", tmpTuple->key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = tmpTuple->nextptr; } write(new_fd, "----\n", 5); continue; } write(new_fd, "PARDON?\n----\n", 13); } // Unreached } /* Thread that maintains the route trip time, sort of like a routing table */ void *RTT(void *arg) { struct sockaddr_in myAddr, rttPeer; identification_t id; struct timeval curTime; struct rttable *tmpRttable; int rttfd; int len, rttPeerLen; rttfd = socket(AF_INET, SOCK_DGRAM, 0); bzero((void *) &myAddr, sizeof(myAddr)); myAddr.sin_family = AF_INET; myAddr.sin_addr.s_addr = htonl(INADDR_ANY); myAddr.sin_port = 0; /* Some port */ if (bind(rttfd,(struct sockaddr *) &myAddr, sizeof(myAddr)) < 0) { fprintf(stderr, "Cannot bind socket. Another server running?\n"); exit(1); } listen(rttfd, 5); /* Find the assigned port */ len = sizeof(myAddr); if (getsockname(rttfd, (struct sockaddr *)&myAddr, &len) < 0) { fprintf(stderr, "Getsockname failed\n"); exit(1); } myId.rtt_port = myAddr.sin_port; while (1) { rttPeerLen = sizeof(rttPeer); if(recvfrom(rttfd, &id, sizeof(identification_t), 0, (struct sockaddr *)&rttPeer, &rttPeerLen) > 0) { gettimeofday(&curTime, NULL); pthread_mutex_lock(&rtMutex); tmpRttable = routes; while (tmpRttable) { if (tmpRttable->host == rttPeer.sin_addr.s_addr) { tmpRttable->rtt = (((curTime.tv_sec)%100)*1000000 + curTime.tv_usec) - (((id.tv.tv_sec)%100)*1000000+id.tv.tv_usec); break; } tmpRttable = tmpRttable->nextptr; } pthread_mutex_unlock(&rtMutex); } else { fprintf(stderr, "FATAL: Line %d. Receiving packet failed\n", __LINE__); exit(1); } } } /* The main function!! */ int main(int argc, char *argv[], char *envp[]) { pthread_t locationThreadSend, locationThreadRecv, serviceRequestThread, RTTThread; struct sockaddr_in myAddr; struct passwd *pw; struct hostent *host; char buf[132]; int new_fd; int len; if ((mySocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a socket to listen for requests\n", __LINE__); exit(1); } bzero((void *) &myAddr, sizeof(myAddr)); myAddr.sin_family = AF_INET; myAddr.sin_addr.s_addr = htonl(INADDR_ANY); myAddr.sin_port = 0; /* Some port */ if (bind(mySocket,(struct sockaddr *) &myAddr, sizeof(myAddr)) < 0) { fprintf(stderr, "Cannot bind socket. Another server running?\n"); exit(1); } listen(mySocket, 5); /* Find the assigned port */ len = sizeof(myAddr); if (getsockname(mySocket, (struct sockaddr *)&myAddr, &len) < 0) { fprintf(stderr, "Getsockname failed\n"); exit(1); } myId.port = myAddr.sin_port; /* Find the host name */ gethostname(buf, sizeof(buf)); if ((host = gethostbyname (buf)) == NULL) { fprintf(stderr, "Cannot find my host entry\n"); exit(1); } bcopy( host->h_addr, &myId.location, 4); pw = getpwuid(getuid()); strncpy(myId.name, pw->pw_name, 30); knownId = NULL; tupleSet = NULL; pthread_mutex_init(&idMutex, NULL); pthread_mutex_init(&rtMutex, NULL); if (pthread_create(&locationThreadSend, NULL, locatePeersSend, NULL) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } if (pthread_create(&locationThreadRecv, NULL, locatePeersRecv, NULL) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } if (pthread_create(&RTTThread, NULL, RTT, NULL) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } while(1) { new_fd = accept(mySocket, (struct sockaddr *)NULL, NULL); if (pthread_create(&serviceRequestThread, NULL, serviceRequest, &new_fd) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } } }