本文共 10378 字,大约阅读时间需要 34 分钟。
消息队列中的消息结构可以由我们自由定义,具备较强的灵活性。通过消息结构可以共享一个队列,进行消息复用。通常定义一个类似如下的消息结构:
/* Maximum number of payload bytes carried by one message. */
#define MSGMAXDATA 1024

/*
 * Typical user-defined message layout for a message queue.
 * The macro used for the array bound now matches the macro that is defined,
 * and the payload is a byte buffer like the mymesg structure used by the
 * example programs.
 */
struct mymsg {
    long msg_len;               /* message length */
    long msg_type;              /* message type */
    char msg_data[MSGMAXDATA];  /* message content */
};
与每条消息相关联的类型字段(msg_type)提供了两个特性:
(1)标识消息,使得多个进程在单个队列上复用消息。
(2)用作优先级字段,允许接收者以不同于先进先出的某个顺序读出各个消息。
例子1:每个应用一个队列,可以在多个客户和单个服务器之间复用消息。使用一个消息队列进行通信,由消息类型标识消息是从客户到服务器,还是服务器到客户。通信模型如下:
按照通信模型编写程序如下:
公共头文件svmsg.h
#ifndef SVMSG_H
#define SVMSG_H

/*
 * svmsg.h - definitions shared by sysv_client.c and sysv_server.c.
 *
 * NOTE(review): the header names after each #include were lost when this
 * listing was published; the eight headers below are reconstructed from the
 * functions the client and server call - confirm against the original file.
 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Guarded in case the platform's headers already provide these names. */
#ifndef MSG_R
#define MSG_R 0400 /* read permission */
#endif
#ifndef MSG_W
#define MSG_W 0200 /* write permission */
#endif

/* Owner read/write plus group read and other read. */
#define SVMSG_MODE (MSG_R | MSG_W | (MSG_R >> 3) | (MSG_R >> 6))
#define MQ_KEY 1234L
#define MSGMAX 1024

/*
 * Message structure.
 * The programs pass &mesg_type as the msgsnd()/msgrcv() buffer, so mesg_type
 * must stay immediately before mesg_data; mesg_len is local bookkeeping and
 * is not part of what is placed on the queue.
 */
struct mymesg {
    long mesg_len;            /* number of bytes used in mesg_data */
    long mesg_type;           /* message type */
    char mesg_data[MSGMAX];   /* message payload */
};

#endif /* SVMSG_H */
客户端程序sysv_client.c
1 #include "svmsg.h" 2 void client(int ,int); 3 4 int main(int argc,char *argv[]) 5 { 6 int msqid; 7 if((msqid = msgget(MQ_KEY,0)) == -1) 8 { 9 perror("megget()");10 exit(-1);11 }12 client(msqid,msqid);13 exit(0);14 }15 16 void client(int readfd,int writefd)17 {18 size_t len;19 ssize_t n;20 char *ptr;21 struct mymesg mesg;22 printf("Send request to server.\n");23 //set pid to message24 snprintf(mesg.mesg_data,MSGMAX,"%ld",(long)getpid());25 len = strlen(mesg.mesg_data);26 mesg.mesg_data[len] = ' '; //blank27 ptr = mesg.mesg_data+len+1;28 printf("Enter filename: ");29 fgets(ptr,MSGMAX-len,stdin);30 len = strlen(mesg.mesg_data);31 if(mesg.mesg_data[len-1] == '\n')32 len--;33 mesg.mesg_len = len;34 mesg.mesg_type = 1;35 printf("mesg_data is :%s len=%ld\n",mesg.mesg_data, mesg.mesg_len);36 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)37 {38 perror("msgsnd() error");39 exit(-1);40 }41 //read from IPC,write to standard output42 mesg.mesg_type = getpid();43 while( (n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0))>0)44 {45 write(STDOUT_FILENO,mesg.mesg_data,n);46 putchar('\n');47 }48 if(n == 0 )49 {50 printf("Read file from server is completed.\n");51 }52 if(n == -1)53 {54 perror("msgrcv() error");55 exit(-1);56 }57 }
服务器程序sysv_server.c
1 #include "svmsg.h" 2 void server(int ,int); 3 int main(int argc,char *argv[]) 4 { 5 int msqid; 6 if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1) 7 { 8 perror("megget()"); 9 exit(-1);10 }11 server(msqid,msqid);12 exit(0);13 }14 15 void server(int readfd,int writefd)16 {17 FILE *fp;18 char *ptr;19 pid_t pid;20 ssize_t n;21 ssize_t len;22 struct mymesg mesg;23 printf("Waiting for client......\n");24 for(; ;)25 {26 mesg.mesg_type = 1;27 if((n = msgrcv(readfd,&(mesg.mesg_type),MSGMAX,mesg.mesg_type,0)) == 0)28 {29 printf("pathname missing.\n");30 continue;31 }32 mesg.mesg_data[n] = '\0';33 printf("Received message from client is: %s\n",mesg.mesg_data);34 if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)35 {36 printf("bogus request: %s\n",mesg.mesg_data);37 continue;38 }39 *ptr++ = 0;40 pid = atoi(mesg.mesg_data);41 mesg.mesg_type = pid;42 //open fiel and read data43 if((fp = fopen(ptr,"r")) == NULL)44 {45 printf("open file failed.sent msg to client\n");46 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));47 mesg.mesg_len = strlen(ptr);48 memmove(mesg.mesg_data,ptr,mesg.mesg_len);49 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)50 {51 perror("msgsnd() error");52 exit(-1);53 }54 }55 else56 {57 printf("open file successed.sent file to client\n");58 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)59 {60 mesg.mesg_len = strlen(mesg.mesg_data);61 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)62 {63 perror("msgsnd() error");64 exit(-1);65 }66 }67 fclose(fp);68 }69 printf("send compelted.\n");70 mesg.mesg_len = 0;71 if(msgsnd(writefd,&(mesg.mesg_type),mesg.mesg_len,0) == -1)72 {73 perror("msgsnd() error");74 exit(-1);75 }76 }77 }
程序测试结果如下所示:
例子2:每个客户一个队列。在例子1的基础上修改:所有客户仍然通过一个公共的消息队列向服务器发送请求,同时给每个客户分配一个私有的消息队列,服务器通过该私有队列对相应的客户进行应答。通信模型如下:
以并发服务器模型编写这个程序,服务器给每个客户fork一个子进程进行处理。程序如下:
公共头文件svmsg.h和公共源文件svmsg.c:
/* svmsg.h file */
#ifndef SVMSG_H
#define SVMSG_H

/*
 * NOTE(review): the header names after each #include were lost when this
 * listing was published; the set below is reconstructed from the functions
 * that svmsg.c, the client and the server call - confirm against the
 * original file.
 */
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Guarded in case the platform's headers already provide these names. */
#ifndef MSG_R
#define MSG_R 0400 /* read permission */
#endif
#ifndef MSG_W
#define MSG_W 0200 /* write permission */
#endif

/* Owner read/write plus group read and other read. */
#define SVMSG_MODE (MSG_R | MSG_W | (MSG_R >> 3) | (MSG_R >> 6))
#define MQ_KEY 1234L
#define MSGMAX 1024

/*
 * Message structure.
 * svmsg.c passes &mesg_type as the msgsnd()/msgrcv() buffer, so mesg_type
 * must stay immediately before mesg_data; mesg_len is local bookkeeping and
 * is not part of what is placed on the queue.
 */
struct mymesg {
    long mesg_len;            /* number of bytes used in mesg_data */
    long mesg_type;           /* message type */
    char mesg_data[MSGMAX];   /* message payload */
};

/* Thin wrappers around msgsnd()/msgrcv(); they return the call's result. */
ssize_t mesg_send(int id, struct mymesg *mptr);
ssize_t mesg_recv(int id, struct mymesg *mptr);

/* Checked wrappers: print a diagnostic and exit on failure. */
void Mesg_send(int id, struct mymesg *mptr);
ssize_t Mesg_recv(int id, struct mymesg *mptr);

#endif /* SVMSG_H */
1 //svmsg.c file 2 #include "svmsg.h" 3 4 ssize_t mesg_send(int id,struct mymesg *mptr) 5 { 6 return (msgsnd(id,&(mptr->mesg_type),mptr->mesg_len,0)); 7 } 8 9 ssize_t mesg_recv(int id,struct mymesg *mptr)10 {11 ssize_t n;12 n = msgrcv(id,&(mptr->mesg_type),MSGMAX,mptr->mesg_type,0);13 mptr->mesg_len = n;14 return n;15 }16 17 void Mesg_send(int id,struct mymesg *mptr)18 {19 if(mesg_send(id,mptr) == -1)20 {21 perror("mesg_send() error");22 exit(-1);23 }24 }25 ssize_t Mesg_recv(int id,struct mymesg *mptr)26 {27 ssize_t n;28 do29 {30 n = mesg_recv(id,mptr);31 }while(n==-1 && errno == EINTR);32 if(n == -1)33 {34 perror("mesg_recv() error");35 exit(-1);36 }37 return n;38 }
客户端程序如下:
1 #include "svmsg.h" 2 3 void client(int ,int); 4 5 int main(int argc,char *argv[]) 6 { 7 int readid,writeid; 8 if((writeid = msgget(MQ_KEY,0)) == -1) 9 {10 perror("megget()");11 exit(-1);12 }13 if((readid = msgget(IPC_PRIVATE,SVMSG_MODE | IPC_CREAT)) == -1)14 {15 perror("megget()");16 exit(-1);17 }18 client(readid,writeid);19 msgctl(readid,IPC_RMID,NULL);20 exit(0);21 }22 23 void client(int readid,int writeid)24 {25 size_t len;26 ssize_t n;27 char *ptr;28 struct mymesg mesg;29 printf("Send request to server.\n");30 //set pid to message31 snprintf(mesg.mesg_data,MSGMAX,"%d",readid);32 len = strlen(mesg.mesg_data);33 mesg.mesg_data[len] = ' '; //blank34 ptr = mesg.mesg_data+len+1;35 printf("Enter filename: ");36 fgets(ptr,MSGMAX-len,stdin);37 len = strlen(mesg.mesg_data);38 if(mesg.mesg_data[len-1] == '\n')39 len--;40 mesg.mesg_len = len;41 mesg.mesg_type = 1;42 printf("mesg_data is :%s\n",mesg.mesg_data);43 Mesg_send(writeid,&mesg);44 printf("Send messge to server successed.\n");45 //read from IPC,write to standard output46 while( (n = Mesg_recv(readid,&mesg))>0)47 {48 write(STDOUT_FILENO,mesg.mesg_data,n);49 putchar('\n');50 }51 if(n == 0 )52 {53 printf("Read file from server is completed.\n");54 }55 }
服务器程序如下:
1 #include "svmsg.h" 2 3 void server(int ,int); 4 void sig_child(int signo); 5 6 int main(int argc,char *argv[]) 7 { 8 int msqid; 9 if((msqid = msgget(MQ_KEY,SVMSG_MODE | IPC_CREAT)) == -1)10 {11 perror("megget()");12 exit(-1);13 }14 server(msqid,msqid);15 exit(0);16 }17 18 void server(int readid,int writeid)19 {20 FILE *fp;21 char *ptr;22 pid_t pid;23 ssize_t n;24 ssize_t len;25 struct mymesg mesg;26 signal(SIGCHLD,sig_child);27 printf("Waiting for client......\n");28 for(; ;)29 {30 mesg.mesg_type = 1;31 if((n = Mesg_recv(readid,&mesg)) == 0)32 {33 printf("pathname missing.\n");34 continue;35 }36 mesg.mesg_data[n] = '\0';37 printf("Received message from client is: %s\n",mesg.mesg_data);38 if ((ptr = strchr(mesg.mesg_data,' ')) == NULL)39 {40 printf("bogus request: %s\n",mesg.mesg_data);41 continue;42 }43 *ptr++ = 0;44 writeid = atoi(mesg.mesg_data);45 if(fork() == 0)46 {47 //open fiel and read data48 if((fp = fopen(ptr,"r")) == NULL)49 {50 printf("open file failed.sent msg to client\n");51 snprintf(mesg.mesg_data+n,sizeof(mesg.mesg_data)-n,": can't open,%s\n",strerror(errno));52 mesg.mesg_len = strlen(ptr);53 memmove(mesg.mesg_data,ptr,mesg.mesg_len);54 Mesg_send(writeid,&mesg);55 }56 else57 {58 printf("open file successed.sent file to client\n");59 while(fgets(mesg.mesg_data,MSGMAX,fp) != NULL)60 {61 mesg.mesg_len = strlen(mesg.mesg_data);62 Mesg_send(writeid,&mesg);63 }64 fclose(fp);65 }66 printf("send compelted.\n");67 mesg.mesg_len = 0;68 Mesg_send(writeid,&mesg);69 }70 }71 }72 73 void sig_child(int signo)74 {75 pid_t pid;76 int stat;77 while ((pid = waitpid(-1,&stat,WNOHANG)) > 0);78 return ;79 }
程序测试结果如下:
转载地址:http://adqta.baihongyu.com/