irpas技术客

【MQTT】使用MQTT上报温度阿里云_夜里生花_mqtt 数据上报

网络 4863

MQTT上报温度到阿里云 前言iniparser配置文件cJSONsqlite3数据库流程图配置信息发布端代码实现运行结果

前言

在上几篇文章中我们用MQTT.fx模拟客户端实现了与阿里云物联网平台的双向通信,接下来我们自己动手编程使用mosquitto库实现一个发布端。

iniparser配置文件

iniparser介绍 与阿里云进行通信时,我们需要如下一些信息,例如在使用MQTT.fx时: 阿里云配置和MQTT.fx使用 在上篇文章中阿里云配置文件另外之前发布和订阅的topic也要记下来: 这些都是我们在项目中要用到的配置信息,具体应用在:

#客户端id clientid: struct mosquitto *mosquitto_new( const char * id, bool clean_session, void * obj ) #连接阿里云的端口,主机名 brokeraddress and brokerport: int mosquitto_connect( struct mosquitto * mosq, const char * host, int port, int keepalive ) #连接阿里云账号和密码 username and password: int mosquitto_username_pw_set(struct mosquitto * mosq, const char *username, const char *password ) #订阅的主题 topic: int mosquitto_publish( struct mosquitto * mosq, int * mid, const char * topic, int payloadlen, const void * payload, int qos, bool retain )

当然可以直接写死在代码里,但是这样一来,代码复用性不高,换一个设备那么就要改一次源码来更改设备配置信息。MQTT主机使用可以看这篇博客.

所以我们写一个.ini文件,这样当我们更换设备的时候只需要修改这个.ini文件中的信息即可。然后在代码中使用iniparser库中API。

cJSON

JSON(JavaScript Object Notation, JS 对象简谱) 是一种轻量级的数据交换格式。它基于 ECMAScript (欧洲计算机协会制定的js规范)的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言。 易于人阅读和编写,同时也易于机器解析和生成,并有效地提升网络传输效率。

阿里云上payload must be json format 负载(消息)必须为cJSON格式。 因为使用的阿里云提供的标准物模型,所以消息格式应该遵循阿里云的规范。这就是我们要使用cJSON的原因。

要是已其他格式发送,阿里云收到了消息,但是因为参数不匹配,无法对消息进行解析,也就无法获取到温度消息 学习cJSON可以参考博客

sqlite3数据库

SQLite是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。它是一个零配置的数据库,这意味着与其他数据库不一样,您不需要在系统中配置。 就像其他数据库,SQLite 引擎不是一个独立的进程,可以按应用程序需求进行静态或动态连接。SQLite 直接访问其存储文件。

sqlite3数据库优点

1.不需要一个单独的服务器进程或操作的系统(无服务器的)。 2.SQLite 不需要配置,这意味着不需要安装或管理。 3. 一个完整的 SQLite数据库是存储在一个单一的跨平台的磁盘文件。 4. SQLite 是非常小的,是轻量级的,完全配置时小于 400KiB,省略可选功能配置时小于250KiB。 5.SQLite 是自给自足的,这意味着不需要任何外部的依赖。 6 SQLite 事务是完全兼容ACID 的,允许从多个进程或线程安全访问。 7.SQLite 支持 SQL92(SQL2)标准的大多数查询语言的功能。 8. SQLite 使用ANSI-C 编写的,并提供了简单和易于使用的 API。 9. SQLite 可在 UNIX(Linux, Mac OS-X, Android,iOS)和 Windows(Win32, WinCE, WinRT)中运行。

sqlite3库函数

流程图

配置信息 #include "iniparser.h" #include "mqtt_conf.h" #include <stdio.h> #include <string.h> /* 载入设置参数 */ int get_config(char *path,mqtt_user_data *mqtt,int mode) { dictionary *ini = NULL; const char *brokeraddress; int brokerport; const char *username; const char *password; const char *clientid; const char *topic; int Qos; const char *method; const char *time; const char *jsonid; const char *identifier; const char *version; if(!path || !mqtt) { printf("invalid_argument: %s\n",__FUNCTION__); return -1; } ini = iniparser_load(path); if(!ini) { printf("iniparser_load failed!\n"); return -1; } brokeraddress = iniparser_getstring(ini,"address:host",DEFAULT_BROKER_ADDRESS); brokerport= iniparser_getint(ini,"address:port",DEFAULT_BROKER_PORT); username = iniparser_getstring(ini,"user_name_pwd:username",DEFAULT_USERNAME); password = iniparser_getstring(ini,"user_name_pwd:pwd",DEFAULT_PASSWORD); clientid = iniparser_getstring(ini,"clientid:id",DEFAULT_CLIENTID); identifier = iniparser_getstring(ini,"ali_json:identifier",DEFAULT_JSONID); if(mode == SUB) { topic = iniparser_getstring(ini,"sub_topic:topic",DEFAULT_SUB_TOPIC); } else if(mode == PUB) { topic = iniparser_getstring(ini,"pub_topic:topic",DEFAULT_PUB_TOPIC); method = iniparser_getstring(ini,"ali_json:method",DEFAULT_METHOD); //time = iniparser_getstring(ini,"ali_time:time",DEFAULT_TIME); //jsonid = iniparser_getstring(ini,"ali_json:id",DEFAULT_JSONID); version = iniparser_getstring(ini,"ali_json:version",DEFAULT_VERSION); } else if(mode != SUB && mode != PUB) { printf("invalid_argument : %s mode!\n",__FUNCTION__); return -1; } strncpy(mqtt->brokeraddress,brokeraddress,SIZE); mqtt->brokerport = brokerport; mqtt->Qos = Qos; strncpy(mqtt->username,username,SIZE); strncpy(mqtt->password,password,SIZE); strncpy(mqtt->clientid,clientid,SIZE); strncpy(mqtt->topic,topic,SIZE); strncpy(mqtt->identifier,identifier,SIZE); if(mode == PUB) { strncpy(mqtt->method,method,SIZE); //strncpy(mqtt->time,time,SIZE); //strncpy(mqtt->jsonid,jsonid,SIZE); strncpy(mqtt->version,version,SIZE); } iniparser_freedict(ini); return 0; } #ifndef MQTT_CONF_H #define MQTT_CONF_H #define SIZE 1024 #define KEEP_ALIVE 60 enum{ SUB , PUB }; typedef struct mqtt_user_data { char brokeraddress[SIZE] ; int brokerport; char username[SIZE]; char password[SIZE]; char clientid[SIZE]; char topic[SIZE]; int Qos; char method[SIZE] ; char time[SIZE]; char jsonid[SIZE] ; char identifier[SIZE] ; char version[SIZE] ; }mqtt_user_data; int set_config(char *path,char *host,int port, char *clientid,char *uname,char *pwd,char *topic); int get_config(char *path, mqtt_user_data *mqtt, int mode); #endif [address] host = iot-06z00i99uliom9a.mqtt.iothub.aliyuncs.com port = 1883 [user_name_pwd] username = ds18b20&gr71iZ9S48c pwd = 2a59fde4b9765b403df5bfe7ce63b06cfb79e7d57cb373a0dfa501f79623cc83 [clientid] id = gr71iZ9S48c.ds18b20|securemode=2,signmethod=hmacsha256,timestamp=2524608000000| [pub_topic] topic = /sys/gr71iZ9S48c/ds18b20/thing/event/property/post [sub_topic] topic = /sys/gr71iZ9S48c/ds18b20/thing/service/property/set [ali_json] method = thing.service.property.set id = ip identifier = CurrentTemperature version = 1.0.0 [KEEP_ALIVE] alive = 60 [ali_Qos] Qos = 0 发布端代码实现 #include <stdio.h> #include <errno.h> #include <unistd.h> #include <libgen.h> #include <getopt.h> #include <string.h> #include <mosquitto.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <dirent.h> #include <signal.h> #include <time.h> #include "mqtt_conf.h" #include "cJSON.h" #include "dictionary.h" #include "iniparser.h" #include "ifconfig.h" #include "db_sqlite3.h" #define PROG_VERSION "1.0.0" #define INI_PATH "./mqtt_aliyun.ini" static int g_stop = 0; /* 调用信号 */ void sig_handler(int sig_num) { if(sig_num == SIGUSR1) g_stop = 1; } int db_init(sqlite3 *db,char *zErrMsg); int db_table(char *zErrMsg, sqlite3 *db); int db_store(sqlite3 *db,char *zErrMsg); int get_temperature(float *temper); int get_time (char *datetime,int len); static inline void print_usage(char *progname); char *create_json(void *obj); void connect_callback(struct mosquitto *mosq,void *obj,int rc); int main (int argc, char **argv) { char *progname = basename(argv[0]); int brokerport = 1883; char *brokeraddress = NULL; char *clientid = NULL; char *username = NULL; char *password = NULL; char *topic = NULL; struct mosquitto *mosq = NULL; int daemon_run = 0; int opt = -1; int rv; int log_fd; struct mqtt_user_data mqtt; struct option options[] = { {"daemon",no_argument,NULL,'d'}, {"topic", required_argument,NULL,'t'}, {"brokeraddress", required_argument,NULL,'H'}, {"clientid", required_argument, NULL, 'i'}, {"brokerport",required_argument,NULL,'p'}, {"help",no_argument,NULL,'h'}, {"username",required_argument,NULL,'u'}, {"password",required_argument,NULL,'P'}, {NULL,0,NULL,0} }; while((opt = getopt_long(argc,argv,"dhp:t:i:u:P:H:",options,NULL)) != -1) { switch(opt) { case 't': topic = optarg; break; case 'i': clientid = optarg; break; case 'H': brokeraddress = optarg; break; case 'u': username = optarg; break; case 'P': password = optarg; break; case 'd': daemon_run = 1; break; case 'p': brokerport = atoi(optarg); break; case 'h': print_usage(argv[0]); return 0; default: break; } } /* 创建日志 */ if(daemon_run) { printf("program %s running in backgrund\n", progname); if( (log_fd = open("client.log", O_CREAT|O_RDWR, 0666)) < 0) { printf("open() failed:%s\n", strerror(errno)) ; return -2; } dup2(log_fd, STDOUT_FILENO); dup2(log_fd, STDERR_FILENO); daemon(1,1); } /* 安装信号 */ signal(SIGUSR1,sig_handler); /* 载入配置文件 */ memset(&mqtt,0,sizeof(mqtt)); rv = get_config(INI_PATH,&mqtt,PUB); /* MQTT 初始化 */ mosquitto_lib_init(); /* 创建MQTT 客户端 */ mosq = mosquitto_new(mqtt.clientid,true,(void *)&mqtt); if(!mosq) { printf("mosquitto_new() failed: %s\n",strerror(errno)); goto cleanup; return -1; } /* 设置账号密码 */ if(mosquitto_username_pw_set(mosq,mqtt.username,mqtt.password) != MOSQ_ERR_SUCCESS) { printf("mosquitto_username_pw_set failed: %s\n",strerror(errno)); goto cleanup; } printf("Create mosquitto successfully!\n"); /* 回调函数 */ mosquitto_connect_callback_set(mosq,connect_callback); while(!g_stop) { /* 连接MQTT服务器,ip,端口,时间 */ if(mosquitto_connect(mosq, mqtt.brokeraddress,mqtt.brokerport,KEEP_ALIVE) != MOSQ_ERR_SUCCESS) { printf("mosquitto_connect() failed: %s\n",strerror(errno)); goto cleanup; } printf("connect successfully\n"); /*无阻塞 断线连接 */ mosquitto_loop_forever(mosq,-1,1); sleep(3); } /* 释放客户端,清除 */ cleanup: close(log_fd); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; } /* 帮助信息 */ void print_usage(char *progname) { printf("%s usage:\n",progname); printf("Example: %s -h ${brokeraddress} -p ${brokerport} -i ${clientid} -u ${username} -p ${password} -t${topic} -h ${help} -d ${daemon}\n",progname); printf("-h(--host): sepcify brokeraddress.\n"); printf("-p(--port): sepcify brokerport.\n"); printf("-h(--Help): print this help information.\n"); printf("-d(--daemon): set program running on background.\n"); printf("-i(--clientid): sepcify the clientid.\n"); printf("-u(--username): sepcify username of the client.\n"); printf("-p(--password): sepcify password of the username.\n"); printf("-t(--topic): sepcify topic of the client.\n"); printf("-d(--daemon): running in backgrund.\n"); } /* cJSON打包数据 */ void connect_callback(struct mosquitto *mosq,void *obj,int rc) { char ipaddr[16]; char *interface="eth0"; char datetime[64]; cJSON *root; cJSON *item; char *msg; struct mqtt_user_data *mqtt; char buf[2048]; printf("Connection successful cJSON call packaging\n"); float temper = 0.00000000; if(get_temperature(&temper) < 0) { printf("get_temperature failed.\n"); return; } if(get_time(datetime,sizeof(datetime))<0) { printf("get_time failure\n"); return ; } memset(ipaddr,0,sizeof(ipaddr)); if(get_ipaddr(interface,ipaddr,sizeof(ipaddr))<0) { printf("ERROR:get ip address failure\n"); return ; } snprintf(buf,sizeof(buf),"%s/%s/%f",ipaddr,datetime,temper); sqlite_tem(buf); printf("Data transfer database successfully!\n"); printf("\n"); root = cJSON_CreateObject(); item = cJSON_CreateObject(); if(!obj) { printf("invalid_argument in %s\n",__FUNCTION__); return; } mqtt = (mqtt_user_data *)obj; cJSON_AddItemToObject(root,"method",cJSON_CreateString(mqtt->method)); cJSON_AddItemToObject(root,"id",cJSON_CreateString(ipaddr)); cJSON_AddItemToObject(root,"time",cJSON_CreateString(datetime)); cJSON_AddItemToObject(root,"params",item); cJSON_AddItemToObject(item,"CurrentTemperature",cJSON_CreateNumber(temper)); cJSON_AddItemToObject(root,"version",cJSON_CreateString(mqtt->version)); msg = cJSON_Print(root); printf("%s\n",msg); if(!rc) { if(mosquitto_publish(mosq,NULL,mqtt->topic,strlen(msg),msg,mqtt->Qos,NULL) != MOSQ_ERR_SUCCESS) { printf("mosquitto_publish failed: %s\n",strerror(errno)); return; } } mosquitto_disconnect(mosq); } /* 获取温度 */ int get_temperature(float *temper) { char w1_path[64] = "/sys/bus/w1/devices/"; char chip[32]; char buf[128]; DIR *dirp; struct dirent *direntp; int fd = -1; char *ptr; int found = 0; if( !temper ) { return -1; } if((dirp = opendir(w1_path)) == NULL) { printf("Opendir '%s' error: %s\n", w1_path, strerror(errno)); return -2; } while((direntp = readdir(dirp)) != NULL) { if(strstr(direntp->d_name,"28-")) { strncpy(chip, direntp->d_name, sizeof(chip)); found = 1; break; } } closedir(dirp); if( !found ) { printf("Can not find ds18b20 in '%s'\n", w1_path); return -3; } strncat(w1_path, chip, sizeof(w1_path)-strlen(w1_path)); strncat(w1_path, "/w1_slave", sizeof(w1_path)-strlen(w1_path)); if( (fd=open(w1_path, O_RDONLY)) < 0 ) { printf("open %s error: %s\n", w1_path, strerror(errno)); return -4; } if(read(fd, buf, sizeof(buf)) < 0) { printf("read %s error: %s\n", w1_path, strerror(errno)); return -5; } ptr = strstr(buf, "t="); if( !ptr ) { printf("ERROR: Can not get temperature\n"); return -6; } *temper = atof(ptr+=2)/1000; close(fd); return 0; } /* 获取时间 */ int get_time(char *datime,int len) { time_t t; struct tm *p; time(&t); p = gmtime(&t); snprintf(datime, 32, "%d-%d-%d %d:%d:%d", 1900+p->tm_year,1+p->tm_mon, p->tm_mday, (p->tm_hour + 8), p->tm_min, p->tm_sec); return 0 ; } 运行结果

可以看到温度在实时更新(我设置睡眠时间短,所有没有显示,温度变化).

项目源码地址


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #MQTT #数据上报 # #iniparser配置文件