ZBLOG

设计基于ace的c++ job Scheduler框架,并提供demo

下面是一个简单的基于 ACE 的 C++ Job Scheduler 框架的设计示例,包括 Job 类、JobQueue 类和 Worker 类:

#include "ace/Reactor.h"
#include "ace/Task.h"
#include <queue>
#include <mutex>

class Job {
public:
    typedef void (*JobFunc)(void*);

    Job(const std::string& name, int priority, JobFunc func, void* arg)
        : m_name(name), m_priority(priority), m_func(func), m_arg(arg) {}

    const std::string& getName() const { return m_name; }
    int getPriority() const { return m_priority; }
    void execute() const { (*m_func)(m_arg); }

private:
    std::string m_name;
    int m_priority;
    JobFunc m_func;
    void* m_arg;
};

class JobQueue {
public:
    typedef std::function<bool(const Job&, const Job&)> Comparator;

    explicit JobQueue(ACE_Reactor* reactor) : m_reactor(reactor) {}
    
    bool addJob(const Job& job);
    bool removeJob(const std::string& name);

private:
    ACE_Reactor* m_reactor;
    std::priority_queue<Job, std::vector<Job>, Comparator> m_jobs;
};

bool JobQueue::addJob(const Job& job) {
  // Push the job onto the queue.
  std::lock_guard<std::mutex> lock(m_mutex);
  bool wasEmpty = (m_jobs.empty());
  m_jobs.push(job);

  if (wasEmpty) {
      // Register with the Reactor to handle the next event loop.
      if (m_reactor->register_handler(this,
                                       ACE_Event_Handler::READ_MASK) == -1)
          return false;

      // Schedule an asynchronous notification event to wake up the event loop.
      ACE_Message_Block* message = new ACE_Message_Block(0, ACE_Message_Block::MB_HANGUP);
      if (m_reactor->notify(this, message) == -1)
          return false;
  }

  return true;
}

bool JobQueue::removeJob(const std::string& name) {
    // Remove all jobs with the given name.
    std::lock_guard<std::mutex> lock(m_mutex);
    bool found = false;

    while (!m_jobs.empty()) {
        const Job& job = m_jobs.top();
        if (job.getName() == name) {
            m_jobs.pop();
            found = true;
        } else
            break;
    }

    return found;
}

class Worker : public ACE_Task_Base {
public:
    explicit Worker(JobQueue* queue) : m_queue(queue) {}

    int svc() override;

private:
    JobQueue* m_queue;
};

int Worker::svc() {
  for (;;) {
      // Get the highest-priority job from the queue.
      Job job;

      {    
          std::lock_guard<std::mutex> lock(m_queue->m_mutex);

          if (m_queue->m_jobs.empty())
              break;  // No more work to do.

          job = m_queue->m_jobs.top();
          m_queue->m_jobs.pop();
      }

      // Execute the job.
      job.execute();
  }

  return 0;
}

使用方法:

void printMessage(void* arg) {
   char* message = static_cast<char*>(arg);
   printf("%s\n", message);
   delete[] message;  // Free the memory allocated by the caller.
}

int main(int argc, char** argv) {
   ACE_Reactor reactor;

   JobQueue queue(&reactor);

   for (int i = 0; i < 10; ++i) {
       char* message = new char[32];
       sprintf(message, "Message %d", i);

       queue.addJob(Job("Print", 0, &printMessage, message));
   }

   // Start worker threads.
   std::vector<std::unique_ptr<Worker>> workers;
   for (int i = 0; i < 4; ++i)
       workers.emplace_back(std::make_unique<Worker>(&queue));

   // Run the event loop.
   reactor.run_reactor_event_loop();

   return 0;
}

该示例创建了一个 JobQueue 对象,并向其中添加了一些任务。然后,它创建了四个 Worker 线程,并将 JobQueue 对象传递给它们。最后,它启动 ACE_Reactor 的事件循环来调度执行任务。

当有新的任务加入队列时,ACE_Reactor 将会触发 JobQueue 对象的回调函数,在该回调函数中可以根据优先级动态地重新安排任务队列。同时,Worker 线程会从队列中取出最高优先级的任务并执行。

需要注意的是,为了保证线程安全,在 JobQueue 中使用了 std::mutex 来保护任务队列的访问。此外,在使用完毕后需要显式释放申请的内存以避免内存泄漏。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=5505

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?