在C++很多框架中都有异步事件处理机制,这导致我们在看源码时经常很疑惑,难以理解,而其中包含的编程套路可能是一些成熟的技术,只是我们不熟悉,比如WebRTC中类似于Qt的信号槽机制,线程事件处理, 或者使用系统异步IO等等,如果看不懂这些套路,理解代码会很难,本篇博客来尝使用用C++线程池实现一种异步事件处理机制。
异步事件处理机制的基本实现
C++可以使用std::future和std::promise来实现异步操作。然而,为了实现一个异步事件绑定的框架,我们需要更复杂的设计。下面是一个简单的例子,说明如何实现一个异步事件处理器。
首先,定义一个事件处理器类,该类将接收并处理事件:
class EventHandler {
public:
virtual ~EventHandler() = default;
virtual void handleEvent(int eventID) = 0;
};
然后,我们需要创建一个事件分发器,它将异步地调用事件处理器:
/*
事件注册,分发
*/
#pragma once
#include "EventHandler.hpp"
#include
#include
#include
#include
#include
class EventDispatcher {
public:
// 注册事件处理器
void registerHandler(int eventID, std::shared_ptrEventHandler> handler) {
handlers[eventID] = handler;
}
// 异步事件分发函数
void postEvent(int eventID) {
auto it = handlers.find(eventID);
if (it != handlers.end()) {
std::thread eventThread(&EventDispatcher::dispatchEvent, this, it->second, eventID);
eventThread.detach();
}
}
private:
// 事件分发函数
void dispatchEvent(std::shared_ptrEventHandler> handler, int eventID) {
handler->handleEvent(eventID);
}
private:
std::mapint, std::shared_ptrEventHandler>> handlers; // 存储事件,int 事件id, std::shared_ptr 事件处理器
};
在这个例子中,EventDispatcher类的postEvent方法接收一个事件ID,并在新线程中调用相应的事件处理器。这样做可以实现事件的异步处理。
然后,你可以创建一个或多个处理器类,比如下面的打印事件处理器PrintEventHandler ,它实现EventHandler接口,
/*
具体的事件处理器
*/
#include "EventHandler.hpp"
#include
using namespace std;
class PrintEventHandler : public EventHandler {
public:
void handleEvent(int eventID) override {
std::cout "Handling event " eventID std::endl;
}
};
然后再main函数中进行注册:
/*
C++异步事件框架demo01
*/
#include
#include
#include
#include
#include "EventDispatcher.hpp"
#include "PrintEventHandler.hpp"
int main() {
EventDispatcher dispatcher;
std::shared_ptrEventHandler> printHandler = std::make_sharedPrintEventHandler>();
dispatcher.registerHandler(1, printHandler);
dispatcher.postEvent(1);
// Sleep main thread to let the event thread finish.
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
运行结果:
Handling event 1
代码组织如下,有兴趣的可以自行编写实现:
cmake脚本
#[[
编译方法
cmake -S . -B build
cd build
make
./demo01
]]
cmake_minimum_required(VERSION 3.20)
project(demo01)
set(INCLUDE_PATH1 "./")
# 添加头文件目录
include_directories(
${INCLUDE_PATH1}
)
# 添加子目录src
aux_source_directory("./" SRC)
add_executable(demo01 ${SRC})
这个实现是非常基础的,并没有考虑到线程安全问题和异常处理等等。在实际的项目中,你需要更复杂的设计,并使用更高级的并发编程技术,如线程池、任务队列、互斥锁等等。
添加线程池、任务队列
如果想要更复杂的设计,包括线程池、任务队列、互斥锁等,你可以考虑使用以下的设计。下面的例子使用了C++17的std::async和std::future来实现线程池和任务队列。
首先,我们需要一个线程安全的任务队列:
#pragma once
#include
#include
#include
template typename T>
class ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
ThreadSafeQueue(const ThreadSafeQueueT> &) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueueT>服务器托管网 &) = delete;
void push(T value) {
std::lock_guardstd::mutex> lock(mMutex);
mQueue.push(std::move(value));
mCondition.notify_one();
}
bool try_pop(T& value) {
std::lock_guardstd::mutex> lock(mMutex);
if (mQueue.empty()) {
return false;
}
value = std::move(mQueue.front());
mQueue.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lockstd::mutex> lock(mMutex);
mCondition.wait(lock, [this](){ return !mQueue.empty(); });
value = std::move(mQueue.front());
mQueue.pop();
}
private:
std::queueT> mQueue;
std::mutex mMutex;
std::condition_variable mCondition;
};
然后,我们需要一个线程池来处理这些任务:
#pragma once
#include "ThreadSafeQueue.hpp"
#include
#include
class ThreadPool {
public:
ThreadPool(size_t numThreads) {
start(numThreads);
}
~ThreadPool() {
stop();
}
templatetypename T>
void enqueue(T task) {
mTasks.push(std::make_sharedstd::packaged_taskvoid()>>(std::move(task)));
}
private:
std::vectorstd::thread> mThreads;
ThreadSafeQueuestd::shared_ptrstd::packaged_taskvoid()>>> mTasks;
std::atomicbool> mContinue { true };
void start(size_t numThreads) {
for (auto i = 0u; i numThreads; ++i) {
mThreads.emplace_back([=] {
while (mContinue) {
std::shared_ptrstd::packaged_taskvoid()>> task;
if (mTasks.try_pop(task)) {
(*task)();
} else {
std::this_thread::yield();
}
}
});
}
}
void stop() noexcept {
mContinue = false;
for (auto &thread : mThreads) {
thread.join();
}
}
};
然后,我们可以在EventDispatcher中使用线程池来异步地处理事件:
#pragma once
#include "ThreadPool.hpp"
#include
#include
#include
#include "EventHandler.hpp"
class EventDispatcher {
public:
EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}
void registerHandler(int eventID, std::shared_ptrEventHandler> handler) {
std::lock_guardstd::mutex> lock(mMutex);
mHandlers[eventID] = handler;
}
void postEvent(int eventID) {
std::lock_guardstd::mutex> lock(mMutex);
auto it = mHandlers.find(eventID);
if (it != mHandlers.end()) {
mThreadPool.enqueue([handler = it->second, eventID]() {
handler->handleEvent(eventID);
});
}
}
private:
std::mapint, std::shared_ptrEventHandler>> mHandlers;
ThreadPool mThreadPool;
std::mutex mMutex;
};
这个实现现在是线程安全的,并且使用了线程池来处理事件。对于异常处理,你可以在EventHandler的handleEvent方法内部进行。
添加异常处理
如何在EventDispatcher中处理异常?
在EventDispatcher
中处理异常,我们可以在任务的执行过程中增加一个try/catch
块,捕获并处理可能出现的异常。下面是一个示例:
class EventDispatcher {
public:
EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}
void registerHandler(int eventID, std::shared_ptr handler) {
std::lock_guard lock(mMutex);
mHandlers[eventID] = handler;
}
void postEvent(int eventID) {
std::lock_guard lock(mMutex);
auto it = mHandlers.find(eventID);
if (it != mHandlers.end()) {
mThreadPool.enqueue([handler = it->second, eventID]() {
try {
handler->handleEvent(eventID);
} catch (const std::exception &ex) {
std::cerr > mHandlers;
ThreadPool mThreadPool;
std::mutex mMutex;
};
在这个示例中,如果handleEvent
方法抛出了一个异常,那么我们将在控制台上打印出一个错误消息。具体如何处理这些异常,取决于你的应用程序的特定需求。
请注意,这种方法仅处理了在handleEvent
方法中抛出的异常。如果在其他地方(如ThreadPool
的构造函数或enqueue
方法)抛出异常,你可能需要额外的代码来处理这些情况。
注册多个事件
在我们的事件分发器设计中,注册多个事件是非常简单的。你只需要创建不同的事件处理器,然后使用不同的事件ID将它们注册到事件分发器中。以下是一个示例:
class PrintEventHandler : public EventHandler {
public:
PrintEventHandler(int id) : id(id) {}
void handleEvent(int eventID) override {
std::cout handler1 = std::make_shared(1);
dispatcher.registerHandler(1, handler1);
std::shared_ptr handler2 = std::make_shared(2);
dispatcher.registerHandler(2, handler2);
dispatcher.postEvent(1);
dispatcher.postEvent(2);
// Sleep main thread to let the event threads finish
std::this_thread::sleep_for(std::chrono::seconds(1));
return 0;
}
在这个示例中,我们创建了两个PrintEventHandler
对象,每个对象都有一个唯一的ID。然后,我们使用不同的事件ID将这两个处理器注册到EventDispatcher
中。当我们发布事件时,相应的处理器将会被调用。
这就是如何在我们的异步事件绑定框架中注册多个事件的方法。
如何取消注册
要从事件分发器中取消注册一个事件处理器,你可以在EventDispatcher
类中添加一个新的方法,例如unregisterHandler
,用于从处理器映射中删除指定的事件ID。下面是如何实现这个函数的例子:
class EventDispatcher {
public:
EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}
void registerHandler(int eventID, std::shared_ptr handler) {
std::lock_guard lock(mMutex);
mHandlers[eventID] = handler;
}
void unregisterHandler(int eventID) {
std::lock_guard lock(mMutex);
mHandlers.erase(eventID);
}
void postEvent(int eventID) {
std::lock_guard lock(mMutex);
auto it = mHandlers.find(eventID);
if (it != mHandlers.end()) {
mThreadPool.enqueue([handler = it->second, eventID]() {
try {
handler->handleEvent(eventID);
} catch (const std::exception &ex) {
std::cerr > mHandlers;
ThreadPool mThreadPool;
std::mutex mMutex;
};
这个unregisterHandler
函数将删除与给定事件ID关联的事件处理器。请注意,这个函数并不会停止已经在处理该事件的任何线程。如果你想要取消正在进行的事件处理,你可能需要一个更复杂的设计,例如使用std::future
和std::promise
来控制线程的执行。
如何停止正在进行的事件
要停止正在进行的事件处理,我们需要更复杂的设计,它可能包括使用std::future
和std::promise
来控制线程的执行。在这种设计中,每当一个事件被发布时,我们将创建一个std::promise
,并将相应的std::future
存储在某个地方,以便我们可以稍后在需要时停止事件处理。
但是,要注意的是,根据C++的设计,没有一个简单且安全的方法可以强制停止正在运行的线程,因为这可能会导致资源泄漏或其他未定义的行为。因此,更常见的做法是让事件处理器定期检查一个“停止标记”,然后在检查到该标记时优雅地停止执行。以下是一个简单的示例,演示了如何实现这种设计:
class StoppableEvent {
public:
StoppableEvent(std::future future, std::function func)
: mFuture(std::move(future)), mFunc(std::move(func)) {}
void operator()() {
while(mFuture.wait_for(std::chrono::milliseconds(100)) == std::future_status::timeout) {
mFunc();
}
}
private:
std::future mFuture;
std::function mFunc;
};
class EventDispatcher {
public:
EventDispatcher(size_t numThreads) : mThreadPool(numThreads) {}
void registerHandler(int eventID, std::shared_ptr handler) {
std::lock_guard lock(mMutex);
mHandlers[eventID] = handler;
}
void postEvent(int eventID) {
std::lock_guard lock(mMutex);
auto it = mHandlers.find(eventID);
if (it != mHandlers.end()) {
std::promise stopSignal;
auto stopFuture = stopSignal.get_future();
mStopSignals[eventID服务器托管网] = std::move(stopSignal);
mThreadPool.enqueue(StoppableEvent(std::move(stopFuture), [handler = it->second, eventID]() {
handler->handleEvent(eventID);
}));
}
}
void stopEvent(int eventID) {
std::lock_guard lock(mMutex);
auto it = mStopSignals.find(eventID);
if (it != mStopSignals.end()) {
it->second.set_value();
mStopSignals.erase(it);
}
}
private:
std::map> mHandlers;
std::map> mStopSignals;
ThreadPool mThreadPool;
std::mutex mMutex;
};
在这个例子中,我们定义了一个StoppableEvent
类,它将一个std::future
和一个函数组合在一起。当operator()
被调用时,它将定期检查future
,如果future
的状态不是timeout
,则停止执行函数。
然后,当我们在EventDispatcher
中发布一个事件时,我们将创建一个新的std::promise
和相应的std::future
,并将这个future
和事件处理器的handleEvent
方法一起传递给StoppableEvent
。我们还将promise
存储在一个映射中,以便我们可以稍后通过调用set_value
来发出停止信号。
最后,我们添加了一个stopEvent
方法,它将查找与给定事件ID关联的promise
,并通过调用set_value
来发出停止信号。然后,它将从映射中删除这个promise
,因为我们不再需要它。
这是一个基本的示例,你可能需要根据你的具体需求来修改和扩展它。请注意,这个设计假设事件处理器的handleEvent
方法将被调用多次,每次调用都可能被中断。如果你的事件处理器只执行一次长时间运行的任务,那么这个设计可能并不适合。
以上是一个简易的异步事件处理demo, 在项目开发中,需要根据具体的业务需求进行调整完善。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
相关推荐: Databend Cloud 如何助力 AIGC 初创公司成本下降百倍
某主营虚拟社交的 AIGC 领域海外初创公司,致力于将虚拟世界与现实世界联系起来。旗下平台基于开源的 Stable Diffusion 生态,专注于动漫角色制作,提供集 AI 绘图、图片搜索与社交分享于一体的一站式服务。 Databend Cloud 提供了易…