本程序创建两个线程:线程A创建Unix套接字客户端,与Unix套接字服务端通信并收发数据;线程B创建websocket服务端,与浏览器通信并收发数据。

#include <stdio.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <errno.h>
#include <arpa/inet.h> 
#include <stddef.h>
#include <sys/un.h>
#include <pthread.h>
#include <stdatomic.h> // 原子操作
#include <stdbool.h>
#include <ws.h>
#include "../include/json_transfer.h"

/* Boolean constants and a port number. None of the three is referenced by
 * the code visible in this file (the WebSocket server listens on 8080). */
#define TRUE 1
#define FALSE 0
#define PORT 3389

/* pready is signalled by unixSocket() each time it has copied a new record
 * into p, and waited on by onmessage(); plock is the mutex that both
 * threads hold while touching p. */
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;

/* Filesystem paths of the Unix-domain sockets: client_path is the address
 * this process binds to, server_path is the address it connects to. */
char *client_path = "client.socket";
char *server_path = "server.socket";

// Basic record structure.
// unixSocket() fills these records by copying raw bytes received from the
// Unix socket, so the in-memory layout must match what the peer sends.
struct varData{
    char name[5];   // used as the JSON key in onmessage()
    int value;      // serialised as a JSON integer in onmessage()
    int status;     // not referenced by the code visible in this file
};

// Global data shared between the two threads: written by unixSocket(),
// read by onmessage(); accesses are guarded by plock.
struct varData p[2];

// Custom signal variable used to decide whether to transfer data or exit.
// onmessage() stores 2 on "play" and 1 on "exit"; unixSocket() polls it and
// stores 0 again after a transfer. As a file-scope object it starts at 0.
atomic_int sig;

/**
 * WebSocket callback run when a client connection is opened.
 *
 * Looks up the peer address for logging (unless DISABLE_VERBOSE is defined)
 * and releases the string returned by ws_getaddress().
 *
 * @param fd descriptor identifying the client connection.
 */
void onopen(int fd)
{
    char *peer_addr = ws_getaddress(fd);

#ifndef DISABLE_VERBOSE
    printf("Connection opened, client: %d | addr: %s\n", fd, peer_addr);
#endif

    free(peer_addr);
}

/**
 * WebSocket callback run when a client connection is closed.
 *
 * Logs the peer address (unless DISABLE_VERBOSE is defined) and releases
 * the string returned by ws_getaddress(). The shared sig variable is
 * deliberately left untouched here.
 *
 * @param fd descriptor identifying the client connection.
 */
void onclose(int fd)
{
    char *peer_addr = ws_getaddress(fd);

#ifndef DISABLE_VERBOSE
    printf("Connection closed, client: %d | addr: %s\n", fd, peer_addr);
#endif

    free(peer_addr);
}

/**
 * WebSocket callback run for every message received from a client.
 *
 * Two commands are recognised, matched against the first five bytes of the
 * payload (the command text plus its terminating NUL):
 *  - "play": stores 2 in sig, then ten times waits on pready for
 *            unixSocket() to publish a new record in p, serialises
 *            p[0] and p[1] to JSON and sends it back as a text frame.
 *  - "exit": stores 1 in sig and stops the WebSocket server with ws_stop().
 * Any other payload is only logged.
 *
 * @param fd   descriptor identifying the client connection.
 * @param msg  message payload.
 * @param size payload length in bytes.
 * @param type frame type; only used for logging here.
 */
void onmessage(int fd, const unsigned char *msg, uint64_t size, int type)
{
    char *cli;
    cli = ws_getaddress(fd);
#ifndef DISABLE_VERBOSE
    /* size is unsigned, so it must be printed with PRIu64. */
    printf("I receive a message: %s (size: %" PRIu64 ", type: %d), from: %s/%d\n",
        msg, size, type, cli, fd);
#endif
    free(cli);

    /* Copy no more than the payload actually holds; the zero fill supplies
     * the terminator for payloads shorter than the buffer, so the
     * comparison below never depends on bytes beyond the message. */
    char recvData[5] = {0};
    size_t ncopy = sizeof(recvData);
    if (size < sizeof(recvData))
    {
        ncopy = (size_t)size;
    }
    memcpy(recvData, msg, ncopy);

    if (strncmp(recvData, "play", sizeof(recvData)) == 0)
    {
        atomic_store(&sig, 2);
        for (int i = 0; i < 10; i++)
        {
            pthread_mutex_lock(&plock);
            /* NOTE(review): this wait has no predicate, so a signal sent
             * before this thread starts waiting is lost and spurious
             * wakeups are not filtered. Fixing that needs a shared flag set
             * by unixSocket(). */
            pthread_cond_wait(&pready, &plock);
            json_elem_t root[] = {
                {p[0].name, JSON_TYPE_INT, 0, 0, &(p[0].value)},
                {p[1].name, JSON_TYPE_INT, 0, 0, &(p[1].value)},
                {NULL, 0, 0, 0, NULL}
            };
            /* root only holds pointers into p, so serialise while plock is
             * still held; otherwise unixSocket() could overwrite p while
             * the JSON text is being produced. */
            char *json = transfer_data_to_json((json_elem_t *)root);
            pthread_mutex_unlock(&plock);

            /* Presumably NULL signals a serialisation failure; skip the
             * frame rather than hand a NULL string to the sender. */
            if (json != NULL)
            {
                ws_sendframe_txt(fd, json, false);
                free(json);
            }
        }
    }
    else if (strncmp(recvData, "exit", sizeof(recvData)) == 0)
    {
        atomic_store(&sig, 1);
        ws_stop(fd);
        return;
    }
}

/**
 * Reports a fatal error and terminates the process.
 *
 * Writes "<msg>: <description of errno>" to stderr and exits with status 1,
 * so it never returns to the caller.
 *
 * @param msg text printed in front of the errno description.
 */
void error(char *msg)
{
    const char *reason = strerror(errno);

    fprintf(stderr, "%s: %s\n", msg, reason);
    exit(1);
}

/**
 * Prints an informational message followed by a newline on stdout.
 *
 * @param msg text to print.
 */
void info(char *msg)
{
    printf("%s\n", msg);
}

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
/**
 * Receives exactly *len bytes from socket s into buf, looping over short
 * reads.
 *
 * @param s   connected socket descriptor.
 * @param buf destination buffer holding at least *len bytes.
 * @param len in: number of bytes wanted; out: number of bytes actually
 *            received (smaller than requested only when -1 is returned).
 * @return 0 when all requested bytes were received (including a request
 *         for zero bytes); -1 when recv() failed or the peer closed the
 *         connection first. errno is only meaningful for a recv() failure.
 */
int recvall (int s,char *buf,int *len)
{
    const int wanted = *len;
    int total = 0;
    int rc = 0;

    while (total < wanted)
    {
        ssize_t n = recv(s, buf + total, (size_t)(wanted - total), 0);
        if (n == -1)
        {
            if (errno == EINTR)
            {
                continue;   /* interrupted by a signal: just retry */
            }
            rc = -1;
            break;
        }
        if (n == 0)
        {
            /* Orderly shutdown by the peer: no more data will ever arrive,
             * so stop instead of spinning on zero-byte reads. */
            rc = -1;
            break;
        }
        total += (int)n;
    }

    *len = total;
    return rc;
}

/**
 * Sends exactly *len bytes from buf over socket s, looping over partial
 * writes.
 *
 * @param s   connected socket descriptor.
 * @param buf data to send; at least *len bytes.
 * @param len in: number of bytes to send; out: number of bytes actually
 *            sent (smaller than requested only when -1 is returned).
 * @return 0 when everything was sent (including a request for zero bytes);
 *         -1 when send() failed or made no progress.
 */
int sendall(int s, char *buf, int *len)
{
    const int wanted = *len;
    int total = 0;
    int rc = 0;

    while (total < wanted)
    {
        ssize_t n = send(s, buf + total, (size_t)(wanted - total), 0);
        if (n == -1)
        {
            if (errno == EINTR)
            {
                continue;   /* interrupted by a signal: just retry */
            }
            rc = -1;
            break;
        }
        if (n == 0)
        {
            /* No progress on a non-empty request: give up rather than
             * loop forever. */
            rc = -1;
            break;
        }
        total += (int)n;
    }

    *len = total;

    return rc;
}
#pragma GCC diagnostic pop

// websocket 线程
void * websockets()
{
    struct ws_events evs;
    evs.onopen    = &onopen;
    evs.onclose   = &onclose;
    evs.onmessage = &onmessage;
    ws_socket(&evs, 8080, 0); /* Never returns. */

    return NULL;
}

//  Unix 线程,用于和数据采集进程通信
void * unixSocket()
{
    struct sockaddr_un serun,cliun;
    int sockfd,len;
    if((sockfd = socket(AF_UNIX, SOCK_STREAM,0))<0)
    {
        error("Client socket error...");
    }

    memset(&cliun,0,sizeof(cliun));
    cliun.sun_family = AF_UNIX;
    strcpy(cliun.sun_path,client_path);
    len=offsetof(struct sockaddr_un,sun_path)+strlen(cliun.sun_path);
    unlink(cliun.sun_path);
    if(bind(sockfd,(struct sockaddr *)&cliun,len)<0)
    {
        error("Bind error...");
    }
    memset(&serun,0,sizeof(serun));
    serun.sun_family= AF_UNIX;
    strcpy(serun.sun_path,server_path);
    len=offsetof(struct sockaddr_un,sun_path)+strlen(serun.sun_path);
    if(connect(sockfd,(struct sockaddr *)&serun,len)<0)
    {
        error("Connect error...");
    }

    int needRecv = sizeof(p);
    char *buffer = malloc(needRecv);
    char send_data[5];

    for(;;)
    {
        if(sig == 2)    // 信号为2时发送
        {            
            strcpy(send_data, "play");
            send(sockfd, send_data, sizeof(send_data),0);
            for(int i = 0; i < 10; i++)
            {
                recvall(sockfd,buffer, &needRecv);
                pthread_mutex_lock(&plock);
                memcpy(p, buffer, needRecv);
                pthread_mutex_unlock(&plock);
                pthread_cond_signal(&pready);
                printf("The temp is: %d, humd is: %d\n",p[0].value,p[1].value);
            }
            // 发送完成后重置信号
            atomic_store(&sig,0);
        }
        else if(sig == 1)   // 信号为1时退出
        {
            strcpy(send_data, "exit");
            send(sockfd, send_data, sizeof(send_data),0);
            shutdown(sockfd,SHUT_RDWR);
            break;
        }
    }     

    free(buffer);
    close(sockfd);
    return NULL;

}

int main(void)
{

    pthread_t t0,t1;

    if(pthread_create(&t0, NULL,unixSocket,NULL)==-1)
    {
        error("Can't create thread unixSocket");
    }
    if(pthread_create(&t1,NULL,websockets,NULL)==-1)
    {
         error("Can't create thread websockets");
    }

    void *reslut;
    if(pthread_join(t0,&reslut)==-1)
    {
        error("Can't reclaim thread t0");
    }
    if(pthread_join(t1,&reslut)==-1)
    {
        error("Can't reclaim thread t1");
    }

    //销毁条件变量
    pthread_cond_destroy(&pready);

    return 0;
}

当浏览器端发送play字符串时,sig信号量设置为2,线程A发送play给服务端开始收发数据,同时线程B也将全局变量中的数据发送给浏览器端。

当线程A收发10次数据后,将sig信号量设置为0,等待浏览器再次发送play字符串,避免浏览器关闭后仍在收发数据造成浪费。

当浏览器发送exit字符串时,线程B将sig设置为1并调用ws_stop停止websocket服务;线程A检测到sig为1后向服务端发送exit并退出,随后程序结束。

使用valgrind调试未发现内存泄露。

本程序使用到了几个开源库

wsServer

json_transfer

另外wsServer库本身未包含结束功能,需要使用作者写的一个补丁实现

diff --git a/example/send_receive.c b/example/send_receive.c
index 27f28aa..9784a34 100644
--- a/example/send_receive.c
+++ b/example/send_receive.c
@@ -17,6 +17,7 @@
 
 #include <stdbool.h>
 #include <stdio.h>
+#include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <ws.h>
@@ -87,6 +88,12 @@ void onmessage(int fd, const unsigned char *msg, uint64_t size, int type)
 #endif
     free(cli);
 
+    if (!strcmp((char*)msg, "exit"))
+    {
+        ws_stop(fd);
+        return;
+    }
+
     /**
      * Mimicks the same frame type received and re-send it again
      *
diff --git a/include/ws.h b/include/ws.h
index 5ef66b8..e862e48 100644
--- a/include/ws.h
+++ b/include/ws.h
@@ -261,6 +261,7 @@
     extern int ws_get_state(int fd);
     extern int ws_close_client(int fd);
     extern int ws_socket(struct ws_events *evs, uint16_t port, int thread_loop);
+    extern void ws_stop(int fd);
 
 #ifdef AFL_FUZZ
     extern int ws_file(struct ws_events *evs, const char *file);
diff --git a/src/ws.c b/src/ws.c
index 3c0e030..7e19393 100644
--- a/src/ws.c
+++ b/src/ws.c
@@ -114,6 +114,9 @@ struct ws_connection
  */
 struct ws_connection client_socks[MAX_CLIENTS];
 
+int should_close = 0;
+struct ws_accept *accept_data;
+
 /**
  * @brief WebSocket frame data
  */
@@ -1367,6 +1370,13 @@ closed:
     return (vsock);
 }
 
+void ws_stop(int fd)
+{
+    should_close = 1;
+    close_socket(fd);
+    close_socket(accept_data->sock);
+}
+
 /**
  * @brief Main loop that keeps accepting new connections.
  *
@@ -1381,7 +1391,6 @@ closed:
  */
 static void *ws_accept(void *data)
 {
-    struct ws_accept *accept_data; /* Accept thread data.    */
     struct sockaddr_in client;     /* Client.                */
     pthread_t client_thread;       /* Client thread.         */
     int connection_index;          /* Free connection slot.  */
@@ -1390,17 +1399,19 @@ static void *ws_accept(void *data)
     int i;                         /* Loop index.            */
 
     connection_index = 0;
-    accept_data      = data;
     len              = sizeof(struct sockaddr_in);
 
-    while (1)
+    while (!should_close)
     {
         /* Accept. */
         new_sock =
             accept(accept_data->sock, (struct sockaddr *)&client, (socklen_t *)&len);
 
         if (new_sock < 0)
-            panic("Error on accepting connections..");
+        {
+            fprintf(stderr, "Error on accepting connections..");
+            goto out;
+        }
 
         /* Adds client socket to socks list. */
         pthread_mutex_lock(&mutex);
@@ -1435,6 +1446,7 @@ static void *ws_accept(void *data)
         else
             close_socket(new_sock);
     }
+out:
     free(data);
     return (data);
 }
@@ -1459,7 +1471,6 @@ static void *ws_accept(void *data)
  */
 int ws_socket(struct ws_events *evs, uint16_t port, int thread_loop)
 {
-    struct ws_accept *accept_data; /* Accept thread data.    */
     struct sockaddr_in server;     /* Server.                */
     pthread_t accept_thread;       /* Accept thread.         */
     int reuse;                     /* Socket option.         */

命名为niceclose.patch

使用时

# Clone a fresh wsServer
$ git clone https://github.com/Theldus/wsServer.git
$ cd wsServer/

# Download the patch
$ wget https://git.io/JE7ms -O niceclose.patch

# Apply it
$ git apply niceclose.patch

# Build everything and run
$ make
$ ./example/send_receive

此补丁只能在只有一个浏览器客户端时使用。不要在生产环境上使用。

标签: none

评论已关闭