1
0
Fork 0
Univerxel/src/data/safe_priority_queue.hpp

158 lines
4.8 KiB
C++

#pragma once
#include <tuple>
#include <vector>
#include <robin_hood.h>
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <Tracy.hpp> // NOLINT
namespace data {
/// Thread safe queue with unique keys updating priority and value
template <class K, class V, class W, class hash = robin_hood::hash<K>>
class safe_priority_queue_map {
private:
static bool cmpByWeight(const std::pair<K, W> &a, const std::pair<K, W> &b) {
return a.second < b.second;
}
std::vector<std::pair<K, W>> heap;
robin_hood::unordered_map<K, V, hash> map;
TracyLockableN(std::mutex, mutex, "PriorityQueueMap");
std::condition_variable_any cv;
public:
std::pair<std::function<void(const K&, const V&, const W&)>, std::unique_lock<LockableBase(std::mutex)>> inserter() {
return std::make_pair([&](const K& key, const V& val, const W& weight) {
heap.emplace_back(key, weight);
std::push_heap(heap.begin(), heap.end(), cmpByWeight);
map.insert_or_assign(key, val);
}, std::unique_lock<LockableBase(std::mutex)>(mutex));
}
void push(const K& key, const V& val, const W& weight) {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
heap.emplace_back(key, weight);
std::push_heap(heap.begin(), heap.end(), cmpByWeight);
map.insert_or_assign(key, val);
cv.notify_one();
}
bool pop(std::pair<K, V>& out) {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
if (heap.empty())
return false;
std::pop_heap(heap.begin(), heap.end(), cmpByWeight);
const auto priority = heap.back();
heap.pop_back();
const auto it = map.find(priority.first);
if(it == map.end())
return false;
out = std::make_pair(it->first, it->second);
map.erase(it);
return true;
}
bool empty() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
return heap.empty();
}
size_t size() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
return map.size();
}
void notify_all() {
cv.notify_all();
}
void notify_one() {
cv.notify_one();
}
void wait() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
if (heap.empty())
cv.wait(lock);
}
};
/// Thread safe queue with unique keys updating priority
template <class K, class W>
class safe_priority_queue {
private:
static bool cmpByWeight(const std::pair<K, W> &a, const std::pair<K, W> &b) {
return a.second < b.second;
}
std::vector<std::pair<K, W>> heap;
robin_hood::unordered_flat_set<K> set;
TracyLockableN(std::mutex, mutex, "PriorityQueue");
std::condition_variable_any cv;
public:
std::pair<std::function<void(const K&, const W&)>, std::unique_lock<LockableBase(std::mutex)>> inserter() {
return std::make_pair([&](const K& key, const W& weight) {
heap.emplace_back(key, weight);
std::push_heap(heap.begin(), heap.end(), cmpByWeight);
set.insert(key);
}, std::unique_lock<LockableBase(std::mutex)>(mutex));
}
void push(const K& key, const W& weight) {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
heap.emplace_back(key, weight);
std::push_heap(heap.begin(), heap.end(), cmpByWeight);
set.insert(key);
cv.notify_one();
}
bool pop(K& out) {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
if (heap.empty())
return false;
std::pop_heap(heap.begin(), heap.end(), cmpByWeight);
const auto priority = heap.back();
heap.pop_back();
const auto it = set.find(priority.first);
if(it == set.end())
return false;
out = *it;
set.erase(it);
return true;
}
bool empty() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
return heap.empty();
}
size_t size() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
return set.size();
}
void notify_all() {
cv.notify_all();
}
void notify_one() {
cv.notify_one();
}
void wait() {
std::unique_lock<LockableBase(std::mutex)> lock(mutex);
if (heap.empty())
cv.wait(lock);
}
};
}