多线程和条件变量的使用
本程序的作用是,创建两个线程,线程A负责产生数据并存入数据库,线程B创建一个Unix套接字服务端并监听。
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <stddef.h>
#include <ctype.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128
// Unix Socket文件路径
char *socket_path = "server.socket";
// 条件变量和互斥锁
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
// 基本结构体
struct varData{
char name[5];
int value;
int status;
};
// 定义全局变量
struct varData p[2];
// #########################################################
// 信息提示
void error(char *msg)
{
fprintf(stderr, "%s: %s\n", msg, strerror(errno));
exit(1);
}
void info(char *msg)
{
fprintf(stdout,"%s\n",msg);
}
void finish_with_error(MYSQL *con)
{
fprintf(stderr, "%s\n", mysql_error(con));
mysql_close(con);
exit(1);
}
// #########################################################
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// socket发送数据
int sendall(int s, char *buf, int *len)
{
int total = 0;
int bytesleft = *len;
int n;
while(total < *len) {
n = send(s, buf+total, bytesleft, 0);
if (n == -1) { break; }
total += n;
bytesleft -= n;
}
*len = total;
return n==-1?-1:0;
}
#pragma GCC diagnostic pop
// 生成数据
void collectData()
{
int temperature,humidity;
srand((unsigned)time(NULL)); // 根据时间来播种随机数种子
// 生成数据
temperature = rand()%40+10; // 生成10~50的随机数 当做温度
humidity = rand()%70+10; // 生成10~80的随机数当做湿度
pthread_mutex_lock(&plock);
// 字符串赋值到变量使用 strcpy 函数
strcpy(p[0].name, "temp");
p[0].value = temperature;
p[0].status = 1;
strcpy(p[1].name, "humd");
p[1].value = humidity;
p[1].status = 1;
pthread_mutex_unlock(&plock);
}
// 启动MySQL建立连接
MYSQL* startMysql()
{
MYSQL *con = mysql_init(NULL);
if (con == NULL)
{
fprintf(stderr, "%s\n", mysql_error(con));
exit(1);
}
if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
{
finish_with_error(con);
}
return con;
}
// 线程A
// 生成数据并存入数据库
void * processData()
{
MYSQL * con = startMysql();
for(int i=0;i<100;i++)
{
collectData();
pthread_cond_signal(&pready);
char query[MAX_STRING] = {0};
snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p[0].value, p[1].value);
if (mysql_query(con, query))
{
finish_with_error(con);
}
sleep(2);
printf("i is: %d\n",i);
}
mysql_close(con);
mysql_library_end();
return NULL;
}
// 线程B
// 创建Unix套接字
void * someSocket()
{
struct sockaddr_un serun,cliun;
socklen_t cliun_len;
int sockfd,connfd,size;
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd == -1) {
error("Socket creation failed...");
}
else
info("Socket successfully created...");
memset(&serun,0,sizeof(serun));
serun.sun_family = AF_UNIX;
strcpy(serun.sun_path,socket_path);
size = offsetof(struct sockaddr_un,sun_path) + strlen(serun.sun_path);
unlink(socket_path);
if(bind(sockfd,(struct sockaddr *)&serun,size)!=0)
{
error("Socket bind failed...");
}
else
{
info("Socket successfully binded...");
}
if(listen(sockfd,5)!=0)
{
error("Listen failed...");
}
else
{
info("Server listening...");
}
while(TRUE)
{
cliun_len = sizeof(cliun);
connfd = accept(sockfd, (struct sockaddr *)&cliun, &cliun_len);
if (connfd < 0) {
error("Server acccept failed...");
}
else
{
info("Server acccept the client...");
}
// 定义发送内容大小
int needSend = sizeof(p);
char *buffer = malloc(needSend);
for(;;)
{
char recv_data[5];
recv(connfd,recv_data,sizeof(recv_data),0);
if((strncmp(recv_data, "exit", 5)) == 0)
{
printf("get the message from client: %s\n",recv_data);
shutdown(connfd,SHUT_RDWR);
break;
}
if((strncmp(recv_data, "play", 5)) == 0)
{
printf("get the message from client: %s\n",recv_data);
for(int i=0;i<10;i++)
{
// 等待条件变量信号
pthread_mutex_lock(&plock);
pthread_cond_wait(&pready,&plock);
memcpy(buffer,&p,needSend);
pthread_mutex_unlock(&plock);
sendall(connfd,buffer,&needSend);
}
}
}
free(buffer);
close(connfd);
break;
}
close(sockfd);
return NULL;
}
int main(void)
{
pthread_t t0,t1;
if(pthread_create(&t0, NULL,processData,NULL)==-1)
{
error("Can't create thread processData");
}
if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
{
error("Can't create thread someSocket");
}
void *reslut;
if(pthread_join(t0,&reslut)==-1)
{
error("Can't reclaim thread t0");
}
if(pthread_join(t1,&reslut)==-1)
{
error("Can't reclaim thread t1");
}
//销毁条件变量
pthread_cond_destroy(&pready);
return 0;
}编译
gcc -g -std=c11 -O -Wall -Wextra -Werror collect_data_unix_socket.c -o collect_data_unix_socket `mysql_config --cflags --libs` -pthread-g 用于开启Valgrind测试-std=c11使用C11标准编译-O -Wall -Wextra -Werror 将编译时的告警都变成错误提示ysql_config --cflags --libs 使用mysql库-pthread 使用多线程pthread库
评论已关闭