在C语言中使用websocket和json序列化
本程序的作用,创建两个线程,线程A创建Unix套接字客户端,与Unix套接字服务端通信,收发数据,线程B创建websocket服务端,与浏览器通信,收发数据。
#include <stdio.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <errno.h>
#include <arpa/inet.h>
#include <stddef.h>
#include <sys/un.h>
#include <pthread.h>
#include <stdatomic.h> // 原子操作
#include <stdbool.h>
#include <ws.h>
#include "../include/json_transfer.h"
#define TRUE 1
#define FALSE 0
#define PORT 3389
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
char *client_path = "client.socket";
char *server_path = "server.socket";
// 基本结构体
struct varData{
char name[5];
int value;
int status;
};
// 定义全局变量
struct varData p[2];
// 自定义信号变量,用于判断数据是否发送和退出
atomic_int sig;
void onopen(int fd)
{
char *cli;
cli = ws_getaddress(fd);
#ifndef DISABLE_VERBOSE
printf("Connection opened, client: %d | addr: %s\n", fd, cli);
#endif
free(cli);
}
void onclose(int fd)
{
char *cli;
cli = ws_getaddress(fd);
#ifndef DISABLE_VERBOSE
printf("Connection closed, client: %d | addr: %s\n", fd, cli);
#endif
free(cli);
//atomic_store(&sig,0);
}
void onmessage(int fd, const unsigned char *msg, uint64_t size, int type)
{
char *cli;
cli = ws_getaddress(fd);
#ifndef DISABLE_VERBOSE
printf("I receive a message: %s (size: %" PRId64 ", type: %d), from: %s/%d\n",
msg, size, type, cli, fd);
#endif
free(cli);
char recvData[5];
memcpy(recvData, msg, sizeof(recvData));
if((strncmp(recvData, "play", 5)) == 0)
{
atomic_store(&sig,2);
for(int i=0; i<10;i++)
{
pthread_mutex_lock(&plock);
pthread_cond_wait(&pready,&plock);
json_elem_t root[] = {
{p[0].name,JSON_TYPE_INT,0,0,&(p[0].value)},
{p[1].name,JSON_TYPE_INT,0,0,&(p[1].value)},
{NULL,0,0,0,NULL}
};
pthread_mutex_unlock(&plock);
char *json = transfer_data_to_json((json_elem_t *)root);
ws_sendframe_txt(fd,json, false);
free(json);
}
}
if((strncmp(recvData, "exit", 5)) == 0)
{
atomic_store(&sig,1);
ws_stop(fd);
return;
}
}
void error(char *msg)
{
fprintf(stderr, "%s: %s\n", msg, strerror(errno));
exit(1);
}
void info(char *msg)
{
fprintf(stdout,"%s\n",msg);
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
int recvall (int s,char *buf,int *len)
{
int total = 0;
int bytesleft = *len;
int n;
while (total<*len)
{
n= recv(s,buf+total,bytesleft,0);
if(n==-1){break;}
total += n;
bytesleft -=n;
}
*len = total;
return n==-1?-1:0;
}
int sendall(int s, char *buf, int *len)
{
int total = 0;
int bytesleft = *len;
int n;
while(total < *len) {
n = send(s, buf+total, bytesleft, 0);
if (n == -1) { break; }
total += n;
bytesleft -= n;
}
*len = total;
return n==-1?-1:0;
}
#pragma GCC diagnostic pop
// websocket 线程
void * websockets()
{
struct ws_events evs;
evs.onopen = &onopen;
evs.onclose = &onclose;
evs.onmessage = &onmessage;
ws_socket(&evs, 8080, 0); /* Never returns. */
return NULL;
}
// Unix 线程,用于和数据采集进程通信
void * unixSocket()
{
struct sockaddr_un serun,cliun;
int sockfd,len;
if((sockfd = socket(AF_UNIX, SOCK_STREAM,0))<0)
{
error("Client socket error...");
}
memset(&cliun,0,sizeof(cliun));
cliun.sun_family = AF_UNIX;
strcpy(cliun.sun_path,client_path);
len=offsetof(struct sockaddr_un,sun_path)+strlen(cliun.sun_path);
unlink(cliun.sun_path);
if(bind(sockfd,(struct sockaddr *)&cliun,len)<0)
{
error("Bind error...");
}
memset(&serun,0,sizeof(serun));
serun.sun_family= AF_UNIX;
strcpy(serun.sun_path,server_path);
len=offsetof(struct sockaddr_un,sun_path)+strlen(serun.sun_path);
if(connect(sockfd,(struct sockaddr *)&serun,len)<0)
{
error("Connect error...");
}
int needRecv = sizeof(p);
char *buffer = malloc(needRecv);
char send_data[5];
for(;;)
{
if(sig == 2) // 信号为2时发送
{
strcpy(send_data, "play");
send(sockfd, send_data, sizeof(send_data),0);
for(int i = 0; i < 10; i++)
{
recvall(sockfd,buffer, &needRecv);
pthread_mutex_lock(&plock);
memcpy(p, buffer, needRecv);
pthread_mutex_unlock(&plock);
pthread_cond_signal(&pready);
printf("The temp is: %d, humd is: %d\n",p[0].value,p[1].value);
}
// 发送完成后重置信号
atomic_store(&sig,0);
}
else if(sig == 1) // 信号为1时退出
{
strcpy(send_data, "exit");
send(sockfd, send_data, sizeof(send_data),0);
shutdown(sockfd,SHUT_RDWR);
break;
}
}
free(buffer);
close(sockfd);
return NULL;
}
int main(void)
{
pthread_t t0,t1;
if(pthread_create(&t0, NULL,unixSocket,NULL)==-1)
{
error("Can't create thread unixSocket");
}
if(pthread_create(&t1,NULL,websockets,NULL)==-1)
{
error("Can't create thread websockets");
}
void *reslut;
if(pthread_join(t0,&reslut)==-1)
{
error("Can't reclaim thread t0");
}
if(pthread_join(t1,&reslut)==-1)
{
error("Can't reclaim thread t1");
}
//销毁条件变量
pthread_cond_destroy(&pready);
return 0;
}当浏览器端发送play字符串时,sig信号量设置为2,线程A发送play给服务端开始收发数据,同时线程B也将全局变量中的数据发送给浏览器端。
当线程A收发10次数据后,将sig信号量设置为0,等待浏览器再次发送play字符串,避免浏览器关闭后仍在收发数据造成浪费。
当浏览器发送exit字符串时,线程B自动结束并将sig信号量设置为1,线程A检测到sig为1时也自动结束,然后程序结束。
使用valgrind调试未发现内存泄露。
本程序使用到了几个开源库
另外wsServer库本身未包含结束功能,需要使用作者写的一个补丁实现
diff --git a/example/send_receive.c b/example/send_receive.c
index 27f28aa..9784a34 100644
--- a/example/send_receive.c
+++ b/example/send_receive.c
@@ -17,6 +17,7 @@
#include <stdbool.h>
#include <stdio.h>
+#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <ws.h>
@@ -87,6 +88,12 @@ void onmessage(int fd, const unsigned char *msg, uint64_t size, int type)
#endif
free(cli);
+ if (!strcmp((char*)msg, "exit"))
+ {
+ ws_stop(fd);
+ return;
+ }
+
/**
* Mimicks the same frame type received and re-send it again
*
diff --git a/include/ws.h b/include/ws.h
index 5ef66b8..e862e48 100644
--- a/include/ws.h
+++ b/include/ws.h
@@ -261,6 +261,7 @@
extern int ws_get_state(int fd);
extern int ws_close_client(int fd);
extern int ws_socket(struct ws_events *evs, uint16_t port, int thread_loop);
+ extern void ws_stop(int fd);
#ifdef AFL_FUZZ
extern int ws_file(struct ws_events *evs, const char *file);
diff --git a/src/ws.c b/src/ws.c
index 3c0e030..7e19393 100644
--- a/src/ws.c
+++ b/src/ws.c
@@ -114,6 +114,9 @@ struct ws_connection
*/
struct ws_connection client_socks[MAX_CLIENTS];
+int should_close = 0;
+struct ws_accept *accept_data;
+
/**
* @brief WebSocket frame data
*/
@@ -1367,6 +1370,13 @@ closed:
return (vsock);
}
+void ws_stop(int fd)
+{
+ should_close = 1;
+ close_socket(fd);
+ close_socket(accept_data->sock);
+}
+
/**
* @brief Main loop that keeps accepting new connections.
*
@@ -1381,7 +1391,6 @@ closed:
*/
static void *ws_accept(void *data)
{
- struct ws_accept *accept_data; /* Accept thread data. */
struct sockaddr_in client; /* Client. */
pthread_t client_thread; /* Client thread. */
int connection_index; /* Free connection slot. */
@@ -1390,17 +1399,19 @@ static void *ws_accept(void *data)
int i; /* Loop index. */
connection_index = 0;
- accept_data = data;
len = sizeof(struct sockaddr_in);
- while (1)
+ while (!should_close)
{
/* Accept. */
new_sock =
accept(accept_data->sock, (struct sockaddr *)&client, (socklen_t *)&len);
if (new_sock < 0)
- panic("Error on accepting connections..");
+ {
+ fprintf(stderr, "Error on accepting connections..");
+ goto out;
+ }
/* Adds client socket to socks list. */
pthread_mutex_lock(&mutex);
@@ -1435,6 +1446,7 @@ static void *ws_accept(void *data)
else
close_socket(new_sock);
}
+out:
free(data);
return (data);
}
@@ -1459,7 +1471,6 @@ static void *ws_accept(void *data)
*/
int ws_socket(struct ws_events *evs, uint16_t port, int thread_loop)
{
- struct ws_accept *accept_data; /* Accept thread data. */
struct sockaddr_in server; /* Server. */
pthread_t accept_thread; /* Accept thread. */
int reuse; /* Socket option. */命名为niceclose.patch
使用时
# Clone a fresh wsServer
$ git clone https://github.com/Theldus/wsServer.git
$ cd wsServer/
# Download the patch
$ wget https://git.io/JE7ms -O niceclose.patch
# Apply it
$ git apply niceclose.patch
# Build everything and run
$ make
$ ./example/send_receive此补丁只能在只有一个浏览器客户端时使用。不要在生产环境上使用。
评论已关闭