// Univerxel/src/data/safe_priority_queue.hpp

#pragma once
#include <algorithm>
#include <condition_variable>
#include <mutex>
#include <tuple>
#include <unordered_set>
#include <utility>
#include <vector>

#include <robin_hood.h>
namespace data {
/// Thread-safe max-priority queue of unique keys, each carrying a value.
///
/// Every public method except notify() holds the internal mutex for its
/// whole duration.
///
/// Pushing a key that is already queued overwrites its value and adds one
/// more heap entry with the new weight. Earlier heap entries for that key are
/// not removed; they are discarded lazily by pop(). As a consequence a key is
/// delivered at the position of the greatest weight it was pushed with since
/// it was last popped.
///
/// @tparam K key type (hashable, copyable)
/// @tparam V value type associated with a key
/// @tparam W weight type, ordered with operator<
template <class K, class V, class W>
class safe_priority_queue_map {
private:
    /// Heap comparator: the entry with the greatest weight sits on top.
    static bool cmpByWeight(const std::pair<K, W> &a, const std::pair<K, W> &b) {
        return a.second < b.second;
    }

    /// Binary max-heap of (key, weight). May contain stale entries whose key
    /// is no longer present in `map`.
    std::vector<std::pair<K, W>> heap;
    /// Live keys with their latest value. Every key stored here has at least
    /// one entry in `heap`, so an empty `map` means nothing can be popped.
    robin_hood::unordered_map<K, V> map;
    std::mutex mutex;
    std::condition_variable cv;

public:
    /// Queue `key` with `val` at priority `weight` and wake one waiter.
    /// If `key` is already queued its value is replaced.
    void push(const K& key, const V& val, const W& weight) {
        std::lock_guard<std::mutex> lock(mutex);
        heap.emplace_back(key, weight);
        std::push_heap(heap.begin(), heap.end(), cmpByWeight);
        map.insert_or_assign(key, val);
        cv.notify_one();
    }

    /// Remove the highest-priority live key and store it with its value in `out`.
    /// @return true when an element was written to `out`; false when the
    ///         queue holds no live key (`out` is left untouched).
    bool pop(std::pair<K, V>& out) {
        std::lock_guard<std::mutex> lock(mutex);
        while (!heap.empty()) {
            std::pop_heap(heap.begin(), heap.end(), cmpByWeight);
            const auto it = map.find(heap.back().first);
            heap.pop_back();
            if (it == map.end()) {
                // Stale entry left behind by a repeated push of a key that was
                // already delivered: skip it instead of reporting "empty".
                continue;
            }
            out.first = it->first;
            out.second = std::move(it->second);
            map.erase(it);
            if (map.empty()) {
                // Only stale entries can remain; drop them in one go.
                heap.clear();
            }
            return true;
        }
        return false;
    }

    /// @return true when no live key is queued (consistent with size() == 0).
    bool empty() {
        std::lock_guard<std::mutex> lock(mutex);
        return map.empty();
    }

    /// @return number of distinct keys currently queued.
    size_t size() {
        std::lock_guard<std::mutex> lock(mutex);
        return map.size();
    }

    /// Wake every thread blocked in wait(), whether or not anything is queued.
    void notify() {
        cv.notify_all();
    }

    /// Block until woken by push() or notify() when nothing is queued.
    /// This is a single wait, not a predicate loop, so that notify() can
    /// release waiters without an element being available; the call may
    /// therefore also return on a spurious wake-up. Callers must check the
    /// result of pop() afterwards.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        if (map.empty())
            cv.wait(lock);
    }
};
/// Thread-safe max-priority queue of unique keys.
///
/// Every public method except notify() holds the internal mutex for its
/// whole duration.
///
/// Pushing a key that is already queued adds one more heap entry with the new
/// weight. Earlier heap entries for that key are not removed; they are
/// discarded lazily by pop(). As a consequence a key is delivered at the
/// position of the greatest weight it was pushed with since it was last
/// popped.
///
/// @tparam K key type (hashable, copyable)
/// @tparam W weight type, ordered with operator<
template <class K, class W>
class safe_priority_queue {
private:
    /// Heap comparator: the entry with the greatest weight sits on top.
    static bool cmpByWeight(const std::pair<K, W> &a, const std::pair<K, W> &b) {
        return a.second < b.second;
    }

    /// Binary max-heap of (key, weight). May contain stale entries whose key
    /// is no longer present in `set`.
    std::vector<std::pair<K, W>> heap;
    /// Live keys. Every key stored here has at least one entry in `heap`,
    /// so an empty `set` means nothing can be popped.
    robin_hood::unordered_flat_set<K> set;
    std::mutex mutex;
    std::condition_variable cv;

public:
    /// Queue `key` at priority `weight` and wake one waiter.
    void push(const K& key, const W& weight) {
        std::lock_guard<std::mutex> lock(mutex);
        heap.emplace_back(key, weight);
        std::push_heap(heap.begin(), heap.end(), cmpByWeight);
        set.insert(key);
        cv.notify_one();
    }

    /// Remove the highest-priority live key and store it in `out`.
    /// @return true when a key was written to `out`; false when the queue
    ///         holds no live key (`out` is left untouched).
    bool pop(K& out) {
        std::lock_guard<std::mutex> lock(mutex);
        while (!heap.empty()) {
            std::pop_heap(heap.begin(), heap.end(), cmpByWeight);
            const auto it = set.find(heap.back().first);
            heap.pop_back();
            if (it == set.end()) {
                // Stale entry left behind by a repeated push of a key that was
                // already delivered: skip it instead of reporting "empty".
                continue;
            }
            out = *it;
            set.erase(it);
            if (set.empty()) {
                // Only stale entries can remain; drop them in one go.
                heap.clear();
            }
            return true;
        }
        return false;
    }

    /// @return true when no live key is queued (consistent with size() == 0).
    bool empty() {
        std::lock_guard<std::mutex> lock(mutex);
        return set.empty();
    }

    /// @return number of distinct keys currently queued.
    size_t size() {
        std::lock_guard<std::mutex> lock(mutex);
        return set.size();
    }

    /// Wake every thread blocked in wait(), whether or not anything is queued.
    void notify() {
        cv.notify_all();
    }

    /// Block until woken by push() or notify() when nothing is queued.
    /// This is a single wait, not a predicate loop, so that notify() can
    /// release waiters without an element being available; the call may
    /// therefore also return on a spurious wake-up. Callers must check the
    /// result of pop() afterwards.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        if (set.empty())
            cv.wait(lock);
    }
};
}