Files
GUI/src/device/libdms_mq.cpp

304 lines
8.2 KiB
C++
Raw Normal View History

2023-08-21 14:22:41 +08:00
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <stdarg.h>
#include <sys/msg.h>
2023-09-04 13:35:24 +08:00
#include <errno.h>
2023-08-21 14:22:41 +08:00
#include "dms_mq.h"
2023-09-04 13:35:24 +08:00
2023-08-21 14:22:41 +08:00
// Service identifiers, passed as the `srvid` argument of dmsmq_send() and
// reported through the `srvid` output of dmsmq_recv().
enum{
    USRV_NONE      = 0,
    USRV_SCAN      = 1, // scan service
    USRV_XFR       = 2, // data transfer service
    USRV_INFOCFG   = 3, // information and configuration service
    USRV_CONTROL   = 4, // miscellaneous control and debug service
    USRV_FIRMWARE  = 5, // device upgrade
    USRV_LOGALARM  = 6, // log and alarm service
    USRV_HEARTBEAT = 7, // heartbeat service
    USRV_NET       = 8, // network service, device side only
};
// Action identifiers of the heartbeat service (USRV_HEARTBEAT).
enum{
    ACT_HB_NONE = 0,
    ACT_HB_BEAT = 1, // heartbeat packet
};
// System V message-queue keys. In this file the S2C queue is the one that is
// received from and the C2S queue is the one that is sent to.
#define MSG_S2C (202301)
#define MSG_C2S (202302)

// Size in bytes used for message buffers and as the msgrcv() size limit.
#define MAX_BUFSIZE (4096)

// Packet framing markers: first byte and last byte of every packet.
#define DBG_HEAD (0xDB)
#define DBG_TAIL (0xA1)

// Queue identifiers obtained from msgget(); -1 until a queue has been opened.
int msgid_s2c = -1;
int msgid_c2s = -1;
// Packet framing description, packed to byte alignment so that
// sizeof( prot_dbg_t ) == 6: the 5 header bytes plus the 1 tail byte. That
// size is used as the minimum length of a valid packet.
//
// NOTE: on the wire the payload starts at byte offset 5 and the tail marker
// is the LAST byte of the packet (after the payload), so only `head`,
// `srvid`, `actid` and `len` map directly onto a received buffer; `tail` and
// `data` do not sit at their struct offsets once a payload is present.
//
// push/pop is used instead of `#pragma pack( 0 )` so that the packing in
// effect before this struct is restored exactly, using the documented form.
#pragma pack( push, 1 )
typedef struct{
    uint8_t head;      // start marker, always DBG_HEAD (0xDB)
    uint8_t srvid;     // service ID
    uint8_t actid;     // action ID
    uint16_t len;      // payload length in bytes
    uint8_t tail;      // end marker, always DBG_TAIL (0xA1)
    uint8_t data[ 0 ];
}prot_dbg_t;
#pragma pack( pop )
// Overlay for a System V message as handed to msgsnd()/msgrcv():
// the mandatory leading `long` message type followed by the message text.
typedef struct{
    long    type;       // message type (set to the service ID when sending)
    uint8_t data[ 0 ];  // message text: the framed packet bytes
}msgbuf_t;
// Handle of the heartbeat thread; 0 means the thread has not been created.
pthread_t pid = 0;

// Heartbeat thread body.
// Once per second, bumps a counter and sends its value as a
// USRV_HEARTBEAT / ACT_HB_BEAT message. The send result is ignored.
// The loop never terminates, so the function never returns.
//   arg  unused
void *fn_heart( void *arg )
{
    uint32_t beats = 0;

    while( 1 ){
        ++beats;
        dmsmq_send( USRV_HEARTBEAT, ACT_HB_BEAT, ( uint8_t* )( &beats ), sizeof( beats ) );
        sleep( 1 );
    }
}
// Initialize the DMS message queues.
//
// Creates (or opens) the C2S and S2C System V queues, discards every message
// still pending in either of them, and starts the heartbeat thread if it is
// not running yet.
//
// Returns 0 on success, otherwise one of:
//   MQERR_NOMSGID        a queue could not be created/opened
//   MQERR_MSGCTL_FAILED  the queue state could not be read
//   MQERR_LOSTMSG        a pending message could not be removed
//   MQERR_THDERR         the heartbeat thread could not be created
int dmsmq_init( void )
{
    msgid_c2s = msgget( ( key_t )MSG_C2S, IPC_CREAT | 0600 );
    if( msgid_c2s == -1 ){
        perror( "Msgid C2S create error" );
        return MQERR_NOMSGID;
    }
    printf( "MSGID C2S = %d\n", msgid_c2s );

    msgid_s2c = msgget( ( key_t )MSG_S2C, IPC_CREAT | 0600 );
    if( msgid_s2c == -1 ){
        perror( "Msgid S2C create error" );
        return MQERR_NOMSGID;
    }
    printf( "MSGID S2C = %d\n", msgid_s2c );

    // Scratch message used only to pull stale messages off the queues.
    // msgrcv()'s size argument counts the text only, so the buffer must hold
    // the leading `long` type PLUS MAX_BUFSIZE bytes of text.
    struct{
        long type;
        uint8_t data[ MAX_BUFSIZE ];
    }scratch;

    printf( "Start to clean queue.\n" );
    const int queues[ 2 ] = { msgid_s2c, msgid_c2s };
    for( int idx = 0; idx < 2; idx++ ){
        printf( "Clean msgid %d\n", queues[ idx ] );
        for( ; ; ){
            struct msqid_ds mds;
            // IPC_STAT takes a queue identifier; MSG_STAT would interpret the
            // value as an index into the kernel's queue table instead.
            if( msgctl( queues[ idx ], IPC_STAT, &mds ) < 0 ){
                perror( "Get mq info failed..." );
                return MQERR_MSGCTL_FAILED;
            }
            if( mds.msg_qnum == 0 )
                break;

            // IPC_NOWAIT: never block if another reader emptied the queue
            // after the stat above. MSG_NOERROR: oversized messages are
            // truncated and removed instead of failing with E2BIG forever.
            if( msgrcv( queues[ idx ], &scratch, MAX_BUFSIZE, 0,
                        IPC_NOWAIT | MSG_NOERROR ) == -1 ){
                if( errno == ENOMSG )
                    break;      // queue is already empty
                if( errno == EINTR )
                    continue;   // interrupted by a signal, try again
                perror( "MQ Recv Error" );
                return MQERR_LOSTMSG;
            }
        }
    }

    printf( "Start to create heartbeat thread.\n" );
    if( pid == 0 ){
        // pthread_create() reports failure with a positive error number, and
        // leaves the output handle undefined in that case, so use a local
        // handle and publish it only on success.
        pthread_t tid;
        if( pthread_create( &tid, NULL, fn_heart, NULL ) != 0 ){
            printf( "Thread create failed...\n" );
            return MQERR_THDERR;
        }
        pid = tid;
        printf( "Thread create success...\n" );
    }
    return 0;
}
// Scratch buffer in which hexdump() assembles its output before printing.
char hexbuf[4096];

// Print a printf-style prefix followed by `len` bytes of `data` as
// two-digit upper-case hex values separated by spaces, ending with "\r\n".
//
//   data  bytes to dump (may be NULL only when len <= 0)
//   len   number of bytes to dump; values <= 0 dump nothing
//   str   printf-style format for the prefix, followed by its arguments
//
// Output is assembled in `hexbuf` and written to stdout in chunks whenever
// the buffer is more than three quarters full. A formatted prefix that does
// not fit into `hexbuf` is truncated to the buffer size.
//
// Returns 0 on success, -1 if `str` is NULL or empty, if `data` is NULL
// while len > 0, or if the prefix cannot be formatted.
int hexdump(uint8_t* data, int len, const char* str, ...)
{
    const int cap = (int)sizeof(hexbuf);
    const int flush_mark = cap * 3 / 4;
    va_list args;
    int used;
    int idx;

    if ((str == NULL) || (str[0] == '\0'))
        return -1;
    if ((data == NULL) && (len > 0))
        return -1;

    va_start(args, str);
    used = vsnprintf(hexbuf, sizeof(hexbuf), str, args);
    va_end(args);
    if (used < 0)
        return -1;
    if (used >= cap)
        used = cap - 1;     /* prefix was truncated to fit */

    /* Guarantee room for the next append even after a very long prefix. */
    if (used > flush_mark)
    {
        printf("%s", hexbuf);
        used = 0;
        hexbuf[0] = '\0';
    }

    for (idx = 0; idx < len; idx++)
    {
        /* used <= flush_mark here, so the 3 characters + NUL always fit. */
        used += snprintf(hexbuf + used, (size_t)(cap - used), "%02X ",
                         (unsigned)data[idx]);
        if (used > flush_mark)
        {
            printf("%s", hexbuf);
            used = 0;
            hexbuf[0] = '\0';
        }
    }

    snprintf(hexbuf + used, (size_t)(cap - used), "\r\n");
    printf("%s", hexbuf);
    return 0;
}
//接收DMS的MQ序列阻塞方式。
// srvid 服务ID
// actid 动作ID
// data 数据指针
// 返回值 >= 0 数据长度, < 0 异常信息
int dmsmq_recv( int *srvid, int *actid, uint8_t *data )
{
int ret;
uint8_t tmpbuf[ 4096 ] = { 0x00 };
msgbuf_t *mbuf = ( msgbuf_t* )tmpbuf;
ret = msgrcv( msgid_s2c, mbuf, MAX_BUFSIZE, 0, 0 );
if( ret == -1 ){
printf( "msgrecv error! msgid_s2c = %d\n", msgid_s2c );
perror( "MSGRECV : " );
msgid_s2c = msgget( ( key_t)MSG_S2C, IPC_CREAT | 0600 );
if( msgid_s2c == -1 ){
perror( "Msgid C2S create error" );
return MQERR_NOMSGID;
}
printf( "MSGID S2C = %d\n", msgid_s2c );
return MQERR_LOSTMSG;
}
// hexdump( mbuf->data, ret, "mbuf->data hexdump: " );
if( ret < sizeof( prot_dbg_t ) )
return MQERR_PACKERR;
prot_dbg_t *prot = ( prot_dbg_t* )( mbuf->data );
int head = prot->head;
int tail = ( mbuf->data )[ ret - 1 ];
if( head != DBG_HEAD || tail != DBG_TAIL )
return MQERR_PACKERR;
*srvid = prot->srvid;
*actid = prot->actid;
if( prot->len > 0 )
memcpy( data, mbuf->data + 5, prot->len );
return prot->len;
}
// 发送DMS的MQ序列立即返回。
// srvid 服务ID
// actid 动作ID
// data 发送数据指针
// len 发送数据长度
// 返回值 0 成功,< 0 异常信息
2023-09-04 13:35:24 +08:00
2023-08-21 14:22:41 +08:00
int dmsmq_send( int srvid, int actid, uint8_t *data, int len )
{
uint8_t buf[ 4096 ] = { 0x00 };
int dlen = 0;
msgbuf_t *mbuf = ( msgbuf_t* )buf;
mbuf->type = srvid;
mbuf->data[ 0 ] = DBG_HEAD;
mbuf->data[ 1 ] = srvid;
mbuf->data[ 2 ] = actid;
*( uint16_t* )( mbuf->data + 3 ) = len;
if( len > 0 )
memcpy( mbuf->data + 5, data, len );
( mbuf->data )[ len + 5 ] = DBG_TAIL;
dlen = 5 + len + 1;
2023-09-04 13:35:24 +08:00
2023-08-21 14:22:41 +08:00
struct msqid_ds mds;
if( msgctl( msgid_c2s, MSG_STAT, &mds ) < 0 ){
perror( "Get mq info failed..." );
return MQERR_MSGCTL_FAILED;
}
if( mds.msg_qnum > 2 ){
2023-09-04 13:35:24 +08:00
printf( "MQ Blocked!\n" );
2023-08-21 14:22:41 +08:00
return MQERR_BLOCKED;
}
2023-09-04 13:35:24 +08:00
int trytime = 1;
2023-08-21 14:22:41 +08:00
while( trytime-- >= 0 ){
2023-09-04 13:35:24 +08:00
// printf( "mbuf type : %d, data : %s, dlen = %d\n", mbuf->type, mbuf->data + 5, dlen );
// printf( "mbuf type : %d, dlen = %d [ %d ]\n", mbuf->type, dlen, len );
// hexdump( ( uint8_t* )mbuf, dlen + 4, "msgsnd:" );
2023-08-21 14:22:41 +08:00
if( msgsnd( msgid_c2s, mbuf, dlen, 0 ) == -1 ){
2023-09-04 13:35:24 +08:00
// printf( "[%d / %d ]mbuf type : %d, data : %s, dlen = %d\n", trytime, msgid_c2s, mbuf->type, mbuf->data + 5, dlen );
// printf( "Error no : %d ", errno );
2023-08-21 14:22:41 +08:00
perror( "msgsnd error!" );
msgid_c2s = msgget( ( key_t)MSG_S2C, IPC_CREAT | 0600 );
if( msgid_c2s < 0 ){
perror( "Msgid S2C create error" );
usleep( 10 * 1000 );
continue;
}
2023-09-04 13:35:24 +08:00
printf( "trytime = %d\n", trytime );
continue;
2023-08-21 14:22:41 +08:00
}
break;
}
if( trytime < 0 ){
return MQERR_DISCONNECT;
}
2023-09-04 13:35:24 +08:00
2023-08-21 14:22:41 +08:00
return 0;
}
2023-09-04 13:35:24 +08:00
int dmsmq_sendx( int srvid, int actid, uint8_t *data, int len )
{
uint8_t buf[ 4096 ] = { 0x00 };
int dlen = 0;
msgbuf_t *mbuf = ( msgbuf_t* )buf;
mbuf->type = srvid;
mbuf->data[ 0 ] = DBG_HEAD;
mbuf->data[ 1 ] = srvid;
mbuf->data[ 2 ] = actid;
*( uint16_t* )( mbuf->data + 3 ) = len;
// mbuf->data[ 3 ] = 127;//len % 0x100;
// mbuf->data[ 4 ] = len / 0x100;
if( len > 0 )
memcpy( mbuf->data + 5, data, len );
( mbuf->data )[ len + 5 ] = DBG_TAIL;
dlen = 5 + len + 1;
struct msqid_ds mds;
if( msgctl( msgid_c2s, MSG_STAT, &mds ) < 0 ){
perror( "Get mq info failed..." );
return MQERR_MSGCTL_FAILED;
}
if( mds.msg_qnum > 2 ){
printf( "MQ Blocked!\n" );
return MQERR_BLOCKED;
}
hexdump( ( uint8_t* )mbuf, dlen + 4 + 4, "xmsgsnd:" );
if( msgsnd( msgid_c2s, mbuf, dlen, 0 ) == -1 ){
perror( "dmsmq_sendx: msgsnd error!" );
}else{
printf( "xdmsmq_sendx send msg %d success!\n", len );
}
return 0;
}