#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <ctype.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <pwd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

static char rcsid[] = "$Id: ubicomp_hwp1.c,v 1.1 2002/02/05 07:58:04 surendar Exp $";

/*
 * Identity record of one peer. The local record (myId) is multicast as raw
 * bytes and incoming records are received straight into this struct, so the
 * field order and sizes are part of the wire format and must not change.
 */
typedef struct identification {
    char name[32];          /* Name of the current client */

    // Specify how we can be contacted.
    in_addr_t location;     /* IP address of the client */
    in_port_t port;         /* port where the client is listening */

    // Specify my credentials
    char key[32];           /* My authentication key. Not used for this project */

    // Specify how to talk to us. Not used for this project
    unsigned int type;      /* whether we talk XML, HTTP, Corba protocols */
    unsigned int length;    /* Length of protocol specific data in buf */
} identification_t;

/* Identity of this process, filled in by main() before the threads start */
identification_t myId;

/* Singly linked list node describing one known peer */
typedef struct ids {
    identification_t *id;
    time_t lastTime;        /* Last time we heard from this peer. For garbage collection */
    struct ids *nextptr;
} ids_t;

struct ids *knownId = NULL;                            /* Head of the known-peer list */
pthread_mutex_t idMutex = PTHREAD_MUTEX_INITIALIZER;   /* Lock protecting knownId */

/* Data structures for the key:tuple service */
struct tuples {
    char *key;
    char *value;
    struct tuples *nextptr;
} *tupleSet;

int mySocket;   /* Socket for providing service */

#define MCAST_PORT 6556
#define MCAST_ADDR "239.192.192.239"

/* How often the identification structure is multicast (in secs) */
#define BEAT 1

/*
 * Useful utility function: formats an IPv4 address as dotted-decimal text.
 *
 * in   the address exactly as inet_ntop() expects it for AF_INET, i.e. the
 *      raw 32-bit value in network byte order.
 *
 * Returns a pointer to a static buffer that is overwritten by the next
 * call; the caller must consume the text before calling again, and two
 * threads must not call this concurrently without external locking.
 * If the conversion fails an empty string is returned instead of stale
 * contents from an earlier call.
 */
char *Inet_ntop(unsigned int in)
{
    static char buf[INET_ADDRSTRLEN];

    if (inet_ntop(AF_INET, &in, buf, sizeof(buf)) == NULL)
        buf[0] = '\0';
    return buf;
}

// The sending part of the peer location system.
This thread garbage // collects and sends the local identification structure every BEAT // seconds void *locatePeersSend(void *arg) { struct sockaddr_in s_sa; struct ids *ptr, *prevPtr; struct tm *tm; time_t curTime; int sendfd; int on; /* Create the socket for sending my identification_t */ sendfd = socket(AF_INET, SOCK_DGRAM, 0); bzero((void *) &s_sa, sizeof(s_sa)); s_sa.sin_family = AF_INET; s_sa.sin_port = htons(MCAST_PORT); s_sa.sin_addr.s_addr = inet_addr(MCAST_ADDR); #ifndef sparc on = 1; if (setsockopt(sendfd, IPPROTO_IP, IP_MULTICAST_LOOP, &on, sizeof(on)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot setsockopt for IP_MULTICAST_LOOP\n", __LINE__); exit(1); } #endif /* sparc */ while (1) { curTime = time(NULL); // lock shared data structures pthread_mutex_lock(&idMutex); ptr = knownId; prevPtr = NULL; while(ptr) { tm = localtime(&curTime); if (ptr->lastTime < (curTime - 3*BEAT)) { /* Have we received any packet within the last 3 ticks */ printf("%02d/%02d/%04d %02d:%02d:%d '%s' LEAVE %s:%d\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, ptr->id->name, Inet_ntop(ptr->id->location), ntohs(ptr->id->port)); if (prevPtr == NULL) { free(knownId); knownId = NULL; break; } else { prevPtr->nextptr = ptr->nextptr; free(ptr); } } else { /* We did hear something within 3*BEAT */ printf("%02d/%02d/%04d %02d:%02d:%d '%s' MAINTAIN %s:%d\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, ptr->id->name, Inet_ntop(ptr->id->location), ntohs(ptr->id->port)); } if (ptr == NULL) break; ptr = ptr->nextptr; } //UNLOCK pthread_mutex_unlock(&idMutex); if (sendto(sendfd, &myId, sizeof(myId), 0, (struct sockaddr*) &s_sa, sizeof(s_sa)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot send my identity\n", __LINE__); exit(1); } sleep(BEAT); } } // Thread that receives heart beats from other peers. 
void *locatePeersRecv(void *arg) { struct sockaddr_in r_sa; struct ip_mreq mreq; identification_t id, *tmpid; struct ids *ptr, *tmpids; struct tm *tm; time_t curTime; int recvfd; int on; /* Create the socket for receiving messages from other beacons */ recvfd = socket(AF_INET, SOCK_DGRAM, 0); on = 1; if (setsockopt(recvfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) { fprintf(stderr, "FATAL: Line %d. Setsockopt failed for SO_REUSERADDR\n", __LINE__); exit(1); } /* Join the multicast group */ mreq.imr_multiaddr.s_addr = inet_addr(MCAST_ADDR); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(recvfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { fprintf(stderr, "FATAL: Line %d. Setsockopt failed for IP_ADD_MEMBERSHIP\n", __LINE__); exit(1); } /* Bind to the multicast port */ bzero((void *) &r_sa, sizeof(r_sa)); r_sa.sin_family = AF_INET; r_sa.sin_port = htons (MCAST_PORT); r_sa.sin_addr.s_addr = inet_addr(MCAST_ADDR); if (bind(recvfd, (struct sockaddr *) &r_sa, sizeof(r_sa)) < 0) { fprintf(stderr, "FATAL: Line %d. bind failed\n", __LINE__); exit(1); } while (1) { if(recv(recvfd, &id, sizeof(identification_t), 0) < 0) { fprintf(stderr, "FATAL: Line %d. 
Receiving packet failed\n", __LINE__); exit(1); } // LOCK pthread_mutex_lock(&idMutex); ptr = knownId; while (ptr) { if (((ptr->id)->location == id.location) && ((ptr->id)->port == id.port)) { if (strcmp((ptr->id)->name, id.name) != 0) strcpy((ptr->id)->name, id.name); ptr->lastTime = time(NULL); break; } ptr = ptr->nextptr; } if (ptr == NULL) { /* Not already there, create a new entry */ tmpid = (identification_t *) malloc(sizeof (identification_t)); memcpy(tmpid, &id, sizeof(identification_t)); tmpids = (struct ids *) malloc (sizeof(struct ids)); tmpids->id = tmpid; tmpids->lastTime = time(NULL); tmpids->nextptr = knownId; knownId = tmpids; pthread_mutex_unlock(&idMutex); curTime = time(NULL); tm = localtime(&curTime); printf("%02d/%02d/%04d %02d:%02d:%d '%s' ENTER %s:%d\n", tm->tm_mon+1, tm->tm_mday, 1900+tm->tm_year, tm->tm_hour, tm->tm_min, tm->tm_sec, id.name, Inet_ntop(id.location), ntohs(id.port)); } else pthread_mutex_unlock(&idMutex); // New beacon. See if we need to create/update a route entry } } /* Thread that responds to queries for key:value tuple */ void *serviceRequest(void *_new_fd) { char buffer[1024], *ptr, passwd[132], *pptr, outbuf[32], tmpbuf[32], key[128], value[256]; int i; int new_fd = *((int *)_new_fd); struct tuples *curTuple, *tmpTuple; write(new_fd, "READY\n",6); while(1) { ptr = buffer; while(read(new_fd, ptr, 1) > 0) { if (*ptr == '\r' || *ptr == '\n') { *ptr = '\0'; break; } ptr++; } ptr = buffer; if (*ptr == '\0') continue; // Simple command parser if (strncasecmp(ptr, "OPEN", 4) == 0) { // process open command ptr+=4; while (isspace(*ptr)) ptr++; pptr = passwd; while (isalnum(*ptr)) *pptr++ = *ptr++; sprintf(outbuf, "OK OPEN %X\n----\n", time(NULL)+60); write(new_fd, outbuf, strlen(outbuf)); continue; } if (strncasecmp(ptr, "SET", 3) == 0) { // process set command ptr+=3; while (isspace(*ptr)) ptr++; pptr = tmpbuf; while (isalnum(*ptr)) *pptr++ = *ptr++; sscanf(tmpbuf, "%X", &i); if (time(NULL)>i) write(new_fd, "INFO token expired 
token\n", 25); while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; while (isspace(*ptr)) ptr++; pptr = value; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; tmpTuple = tupleSet; while (tmpTuple) { if (strcmp(tmpTuple->key, key) == 0) { free(tmpTuple->value); tmpTuple->value = strdup(value); break; } tmpTuple = tmpTuple->nextptr; } if (tmpTuple == NULL) { curTuple = (struct tuples *) malloc (sizeof (struct tuples)); curTuple->key = strdup(key); curTuple->value = strdup(value); curTuple->nextptr = tupleSet; tupleSet = curTuple; } sprintf(outbuf, "OK SET %s:%s\n----\n", key, value); write(new_fd, outbuf, strlen(outbuf)); continue; } if (strncasecmp(ptr, "GET", 3) == 0) { // process get command ptr+=3;while (isspace(*ptr)) ptr++; pptr = tmpbuf; while (isalnum(*ptr)) *pptr++ = *ptr++; sscanf(tmpbuf, "%X", &i); if (time(NULL)>i) write(new_fd, "INFO token expired token\n", 25); while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; sprintf(outbuf, "OK GET %s\n", key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = tupleSet; while (tmpTuple) { if (strcmp(tmpTuple->key, key) == 0) { sprintf(outbuf, "%s\n----\n", tmpTuple->value); write(new_fd, outbuf, strlen(outbuf)); break; } tmpTuple = tmpTuple->nextptr; } if (tmpTuple == NULL) { sprintf(outbuf, "ERROR key not found\n----\n"); write(new_fd, outbuf, strlen(outbuf)); } continue; } if (strncasecmp(ptr, "LIST", 4) == 0) { // process list command ptr+=4; while (isspace(*ptr)) ptr++; pptr = tmpbuf; while (isalnum(*ptr)) *pptr++ = *ptr++; sscanf(tmpbuf, "%X", &i); if (time(NULL)>i) write(new_fd, "INFO token expired token\n", 25); while (isspace(*ptr)) ptr++; pptr = key; while (isalnum(*ptr)) *pptr++ = *ptr++; *pptr++ = '\0'; sprintf(outbuf, "OK LIST %s\n", key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = tupleSet; while (tmpTuple) { sprintf(outbuf, "%s\n", tmpTuple->key); write(new_fd, outbuf, strlen(outbuf)); tmpTuple = 
tmpTuple->nextptr; } write(new_fd, "----\n", 5); continue; } if (strncasecmp(ptr, "CLOSE", 5) == 0) { // process close command write(new_fd, "OK CLOSE\n----\n", 14); close(new_fd); pthread_exit(NULL); } write(new_fd, "PARDON?\n----\n", 13); } // Unreached } /* The main function!! */ int main(int argc, char *argv[], char *envp[]) { pthread_t locationThreadSend, locationThreadRecv, serviceRequestThread; struct sockaddr_in myAddr; struct passwd *pw; struct hostent *host; char buf[132]; int new_fd; int len; if ((mySocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a socket to listen for requests\n", __LINE__); exit(1); } bzero((void *) &myAddr, sizeof(myAddr)); myAddr.sin_family = AF_INET; myAddr.sin_addr.s_addr = htonl(INADDR_ANY); myAddr.sin_port = 0; /* Some port */ if (bind(mySocket,(struct sockaddr *) &myAddr, sizeof(myAddr)) < 0) { fprintf(stderr, "Cannot bind socket. Another server running?\n"); exit(1); } listen(mySocket, 5); /* Find the assigned port */ len = sizeof(myAddr); if (getsockname(mySocket, (struct sockaddr *)&myAddr, &len) < 0) { fprintf(stderr, "Getsockname failed\n"); exit(1); } myId.port = myAddr.sin_port; /* Find the host name */ gethostname(buf, sizeof(buf)); if ((host = gethostbyname (buf)) == NULL) { fprintf(stderr, "Cannot find my host entry\n"); exit(1); } bcopy( host->h_addr, &myId.location, 4); pw = getpwuid(getuid()); strncpy(myId.name, pw->pw_name, 30); knownId = NULL; tupleSet = NULL; pthread_mutex_init(&idMutex, NULL); if (pthread_create(&locationThreadSend, NULL, locatePeersSend, NULL) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } if (pthread_create(&locationThreadRecv, NULL, locatePeersRecv, NULL) < 0) { fprintf(stderr, "FATAL: Line %d. 
Cannot create a thread\n", __LINE__); exit(1); } while(1) { new_fd = accept(mySocket, (struct sockaddr *)NULL, NULL); if (pthread_create(&serviceRequestThread, NULL, serviceRequest, &new_fd) < 0) { fprintf(stderr, "FATAL: Line %d. Cannot create a thread\n", __LINE__); exit(1); } } }