本程序的作用是,创建两个线程,线程A负责产生数据并存入数据库,线程B创建一个Unix套接字服务端并监听。

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <stddef.h>
#include <ctype.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128

// Unix Socket文件路径
char *socket_path = "server.socket";

// 条件变量和互斥锁
pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;

// 基本结构体
struct varData{
    char name[5];
    int value;
    int status;
};

// 定义全局变量
struct varData p[2];

// #########################################################
// 信息提示
void error(char *msg)
{
    fprintf(stderr, "%s: %s\n", msg, strerror(errno));
    exit(1);
}

void info(char *msg)
{
    fprintf(stdout,"%s\n",msg);
}

void finish_with_error(MYSQL *con)
{
  fprintf(stderr, "%s\n", mysql_error(con));
  mysql_close(con);
  exit(1);
}
// #########################################################

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// socket发送数据
int sendall(int s, char *buf, int *len)
{
    int total = 0;
    int bytesleft = *len;
    int n;

    while(total < *len) {
        n = send(s, buf+total, bytesleft, 0);
        if (n == -1) { break; }
        total += n;
        bytesleft -= n;
    }

    *len = total;

    return n==-1?-1:0;
} 
#pragma GCC diagnostic pop

// 生成数据
void collectData()
{
    int temperature,humidity;
    srand((unsigned)time(NULL));    // 根据时间来播种随机数种子
    // 生成数据
    temperature = rand()%40+10;     // 生成10~50的随机数 当做温度
    humidity = rand()%70+10;        // 生成10~80的随机数当做湿度 
    pthread_mutex_lock(&plock);
    // 字符串赋值到变量使用 strcpy 函数
    strcpy(p[0].name, "temp");
    p[0].value = temperature;
    p[0].status = 1;
    strcpy(p[1].name, "humd");
    p[1].value = humidity;
    p[1].status = 1;
    pthread_mutex_unlock(&plock);
}

// 启动MySQL建立连接
MYSQL* startMysql()
{
    MYSQL *con = mysql_init(NULL);

    if (con == NULL)
    {        
        fprintf(stderr, "%s\n", mysql_error(con));
        exit(1);
    }

    if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
    {
        finish_with_error(con);
    }

    return con;
}
// 线程A
// 生成数据并存入数据库
void * processData()
{
    MYSQL * con = startMysql();

    for(int i=0;i<100;i++)
    {        
        collectData();        
        pthread_cond_signal(&pready);
        char query[MAX_STRING] = {0};
        snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p[0].value, p[1].value);

        if (mysql_query(con, query)) 
        {
            finish_with_error(con);
        }
        sleep(2);
        printf("i is: %d\n",i);
    } 

    mysql_close(con); 
    mysql_library_end();
    return NULL;
}

// 线程B
// 创建Unix套接字
void * someSocket()
{
    struct sockaddr_un serun,cliun;
    socklen_t cliun_len;
    int sockfd,connfd,size;

    sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        error("Socket creation failed...");
    }
    else
        info("Socket successfully created...");

    memset(&serun,0,sizeof(serun));

    serun.sun_family = AF_UNIX;
    strcpy(serun.sun_path,socket_path);
    size = offsetof(struct sockaddr_un,sun_path) + strlen(serun.sun_path);
    unlink(socket_path);
    if(bind(sockfd,(struct sockaddr *)&serun,size)!=0)
    {
        error("Socket bind failed...");
    }
    else
    {
        info("Socket successfully binded...");
    }

    if(listen(sockfd,5)!=0)
    {
        error("Listen failed...");
    }
    else
    {
        info("Server listening...");
    }

    while(TRUE)
    {
        cliun_len = sizeof(cliun);
        connfd = accept(sockfd, (struct sockaddr *)&cliun, &cliun_len);
        if (connfd < 0) {
            error("Server acccept failed...");
        }
        else
        {
            info("Server acccept the client...");
        }            

        // 定义发送内容大小
        int needSend = sizeof(p);   
        char *buffer = malloc(needSend);

        for(;;)
        {        
            char recv_data[5];
            recv(connfd,recv_data,sizeof(recv_data),0);
            if((strncmp(recv_data, "exit", 5)) == 0)
            {
                printf("get the message from client: %s\n",recv_data);
                shutdown(connfd,SHUT_RDWR);
                break;
            }
            if((strncmp(recv_data, "play", 5)) == 0)
            {
                printf("get the message from client: %s\n",recv_data);
                for(int i=0;i<10;i++)
                {
                    // 等待条件变量信号
                    pthread_mutex_lock(&plock);
                    pthread_cond_wait(&pready,&plock);                    
                    memcpy(buffer,&p,needSend);
                    pthread_mutex_unlock(&plock);
                    sendall(connfd,buffer,&needSend);
                }
            }
        }

        free(buffer);
        close(connfd);        
        break;
    }

    close(sockfd);
    return NULL;
}

int main(void)
{
    pthread_t t0,t1;

    if(pthread_create(&t0, NULL,processData,NULL)==-1)
    {
        error("Can't create thread processData");
    }
    if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
    {
         error("Can't create thread someSocket");
    }

    void *reslut;
    if(pthread_join(t0,&reslut)==-1)
    {
        error("Can't reclaim thread t0");
    }
    if(pthread_join(t1,&reslut)==-1)
    {
        error("Can't reclaim thread t1");
    }
    //销毁条件变量
    pthread_cond_destroy(&pready);
    return 0;
}

编译

gcc -g -std=c11 -O -Wall -Wextra -Werror collect_data_unix_socket.c -o collect_data_unix_socket `mysql_config --cflags --libs` -pthread

-g 用于开启Valgrind测试
-std=c11使用C11标准编译
-O -Wall -Wextra -Werror 将编译时的告警都变成错误提示
ysql_config --cflags --libs 使用mysql
-pthread 使用多线程pthread

标签: none

评论已关闭