上个版本: 只是用到ctypes进行传输, 这次将python服务端更改为C++服务端,方便后续维护.
本文实现功能: python传输图片给C++, C++接受图片后对图片进行处理,并将结果返回给python客户端, pass image from python to C++
C++ 服务端
.h文件
注意文中的model
// .h
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
using namespace cv;
class ModelManager;
class ServerManager
{
private:
int m_port;
char *m_addr;
cv::VideoCapture m_cap;
int m_server;
int m_accept; // client conn
public:
bool initialization(const int &port, const cv::VideoCapture &cap, char *addr = nullptr);
bool initialization(const int &port, char *addr = nullptr);
bool build_connect();
bool acceptClient();
void error_print(const char *ptr);
bool free_connect();
bool send_data_frame(ModelManager& model);
bool receive_data_frame(cv::Mat &frame, ModelManager& model);
};
.cpp文件
#include "ServerManager.h"
#include "ModelManager.h"
#define BUFFER_SIZE 65538
void ServerManager::error_print(const char * ptr) {
perror(ptr);
exit(EXIT_FAILURE);
}
bool ServerManager::initialization(const int& port, const cv::VideoCapture& cap, char* addr){
m_port = htons(port);
m_addr = addr;
m_cap = cap;
return true;
}
bool ServerManager::initialization(const int& port, char* addr){
m_port = htons(port);
m_addr = addr;
return true;
}
bool ServerManager::build_connect() {
struct sockaddr_in server_addr;
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = m_addr?inet_addr(m_addr):INADDR_ANY;
server_addr.sin_port = m_port;
// create socket
m_server = socket(AF_INET, SOCK_STREAM, 0);
if(m_server decode;
// int bytes_received = 0;
// do
// {
// int nBytes = recv(m_accept, buf, data_size - bytes_received, 0);
// for (int i = 0; i decode(data_size, 0);
int index = 0;
int bytes_received = 0;
int count = data_size;
while (count > 0)// if count >= 0, dead loop
{
int iRet = recv(m_accept, recv_char, count, 0);
if (index >= data_size) index = data_size;
memcpy(&decode[index], recv_char , iRet);
index += iRet;
if (!iRet) { return -1; }
count -= iRet;
}
// decode message
frame = imdecode(decode, cv::IMREAD_COLOR);
// push into Model's queueMat
model.mtxQueueImg.lock();
model.queueMat.push(frame);
model.mtxQueueImg.unlock();
return true;
}
bool ServerManager::free_connect() {
m_cap.release();
close(m_accept);
close(m_server);
return true;
}
C++ model部分代码
.h文件
#pragma once
#include "CV_Classify.h"
#include "CV_Detect.h"
#include "ServerManager.h"
#include
#include
#include
#include // usleep
#include
#include "cJSON.h"
#include
using namespace std;
using namespace cv;
class ModelManager{
public:
Detect objdetect;
Classify objclassify;
std::mutex mtxQueueDet; // mutex for detect queue
std::mutex mtxQueueImg; // mutex for image queue
std::mutex mtxQueueCls; // mutex for classify queue
std::queue queueMat;
std::queue queueDetOut;// Detect queue
std::queue queueClsOut;// Classify queue
bool DetectFlag = true;
bool ClassifyFlag = true;
bool empty_flag = false;
friend class ServerManager;
public:
void initDetectModel() ;
void initClassifyModel() ;
void DetectImg();
void ClassifyImg();
void getClsResult(ObjClassifyOutput &output);
// ObjClassifyOutput getClsResult();
char* createJson();
};
.cpp文件
部分有删减,createJson可参考使用,利用json来传递值
#include "ModelManager.h"
void ModelManager::initDetectModel()
{
std::string config_path = "DetectConfig.yaml";
objdetect.Init(config_path, 1);
}
void ModelManager::initClassifyModel()
{
std::string config_path = "ClassiflyConfig.yaml";
objclassify.Init(config_path, 1);
}
void ModelManager::DetectImg()
{
DetectInput detect_input;
DetectOutput detect_output;
cv::Mat frame;
size_t mm = 0;
while(1)
{
if (queueMat.empty())
{
if(!DetectFlag)
{
break;
}
usleep(2000);
continue;
}
// get image from queueMat
mtxQueueImg.lock();
frame = queueMat.front();
queueMat.pop();
mtxQueueImg.unlock();
// run model
objdetect.Run(detect_input, detect_output);
// push detect result into queueDetOut
mtxQueueDet.lock();
queueDetOut.push(detect_output);
// cout
C++ 服务端运行
#include
#include
#include
#define PORT 8080
void recvServer(ServerManager& s, ModelManager& model){
int idx = 0;
while (true){
// auto start = std::chrono::steady_clock::now();
cv::Mat frame;
s.receive_data_frame(frame, model);
// cal time cost
// auto end = std::chrono::steady_clock::now();
// std::chrono::duration elapsed = end - start;
// std::cout
python客户端
import json
import socket
import struct
import time
from multiprocessing import JoinableQueue
from threading import Thread
import os
from natsort import ns, natsorted
host = '192.168.0.2' # '192.168.0.2' 'localhost'
port = 8080
def img_encode(img_path):
img = cv2.imread(img_path)
# img = cv2.resize(img, (500, 500), interpolation=cv2.INTER_CUBIC)
img_param = [95] # 图片压缩率0-100
_, img = cv2.imencode('.jpg', img, img_param)
img = img.tobytes()
return img
def img_product(img_queue, path, path_mode='image'):
if path_mode == 'image':
image = img_encode(path)
img_queue.put(image)
elif path_mode == 'dir':
dir_list = os.listdir(path)
files = natsorted(dir_list, alg=ns.PATH) # 顺序读取文件名
for filename in files:
img_path = path + '/' + filename
image = img_encode(img_path)
img_queue.put(image)
img_queue.put('E')
img_queue.join()
def server_consumer(img_queue):
while True:
start = int(round(time.time() * 1000))
# 1. get img from queue
img_obj = img_queue.get()
img_queue.task_done()
# get end signal
if img_obj[0] == 'E':
client.close()
break
# 2. send package(img_bytes_size, img_bytes)
pack_size = struct.pack("i", len(img_obj))
client.send(pack_size + img_obj)
end = int(round(time.time() * 1000))
data = client.recv(65536)
json_str = data.decode('utf8', 'ignore').strip(b'x00'.decode())
results = json.loads(json_str)
end = int(round(time.time() * 1000))
end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
print('send and recv cost time: ', (end - start))
print(results)
if __name__ == '__main__':
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
img_dir = 'data'
one_img = './data/image.jpg'
mode = 'dir'
img_jq = JoinableQueue()
producer = Thread(target=img_product, args=(img_jq, img_dir, mode,))
consumer = Thread(target=server_consumer, args=(img_jq,))
producer.daemon = True # set daemon but not set join()
producer.start()
consumer.start()
# producer.join() // 让生产者先关闭,防止close错误
consumer.join()
总结
- 其实这个项目真正做完感觉还是挺简单, 就是对socket通信不太熟悉, 以及传图片没做过.
- 实际上传图片只需要读取图片后,imencode,然后tobytes,最后发送size和data即可.而接受端只需要拼接数组,然后imdecode即可.
- 另外传输结果的话利用json传输可以让结果可读性可高, 传输也比较方便, 当时copy别人的发送代码, 没有细看,导致使用memcpy让json格式乱码,导致无法解码json.
- 如果你感觉接收端没问题,一定要看看发送端.
- 之后的新任务就是视频传输利用rtsp流,敬请期待
参考博客
- use memcpy in receive_frame function if you want,对应github地址
- a pure client and server code to create a simple demo
-
如果用ctrl+z中断,可能导致address in use,使用bg/fg
- 用户可以使用fg/bg操作继续前台或后台的任务,fg命令重新启动前台被中断的任务,bg命令把被中断的任务放在后台执行
- zmq send img
- 优化imdecode速度,本代码未使用
- 视频传输跟图片传输差不多
- Linux c++获取本地毫秒级精确时间
- 如果不想用json,可以试试struct
- 利用json来传输分类结果
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
问题背景: 客户在一套集群上配置了两个服务名,两套业务分别在两个节点进行业务处理 但每当集群发生重启问题,则会导致服务名的漂移,需要手动将业务切换回去 操作: 切换命令如下 srvctl relocate service -d DB_NAME -s SERVI…