17#ifndef MANTISBASE_SSE_H
18#define MANTISBASE_SSE_H
20#include <condition_variable>
26#include <unordered_map>
27#include <nlohmann/json.hpp>
32 using json = nlohmann::json;
36 std::string m_clientID;
37 std::set<std::string> m_topics;
39 std::mutex m_queueMutex;
40 std::condition_variable m_queueCV;
41 std::queue<std::pair<std::string, json> > m_eventQueue;
43 std::atomic<bool> m_isActive;
44 std::chrono::steady_clock::time_point m_lastActivity;
48 const std::set<std::string> &topics,
49 const json &auth = json::object(),
50 const json &verification = json::object());
58 std::chrono::milliseconds timeout);
79 const std::set<std::string> &getTopics() const;
81 void setTopics(const std::set<std::string> &topics);
86 std::unordered_map<std::string, std::shared_ptr<SSESession>> m_sessions;
87 std::mutex m_sessions_mutex;
88 std::condition_variable m_cv;
89 std::thread m_cleanup_thread;
90 std::atomic<bool> m_running{true};
99 std::string createSession(const std::set<std::string> &initial_topics);
101 std::shared_ptr<SSESession> fetchSession(const std::string &session_id);
106 std::shared_ptr<mb::SSESession> getSession(const std::string &sessionId);
126 static std::string generateClientID();
128 void cleanupIdleSessions();
A wrapper class around httplib::Request offering a consistent API and allowing for easy wrapper methods...
Definition http.h:42
Wrapper around httplib::Response for consistent API.
Definition http.h:318
std::string createSession(const std::set< std::string > &initial_topics)
Definition sse.cpp:155
bool isRunning() const
Definition sse.cpp:254
std::shared_ptr< SSESession > fetchSession(const std::string &session_id)
Definition sse.cpp:168
size_t getSessionCount()
Definition sse.cpp:221
static void createRoutes()
Definition sse.cpp:132
void removeSession(const std::string &session_id)
Definition sse.cpp:177
std::shared_ptr< mb::SSESession > getSession(const std::string &sessionId)
Definition sse.cpp:196
void updateActivity(const std::string &session_id)
Definition sse.cpp:189
~SSEMgr()
Definition sse.cpp:130
void broadcastChange(const json &change_event)
Definition sse.cpp:206
void stop()
Definition sse.cpp:243
void start()
Definition sse.cpp:226
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESession()
Definition sse.cpp:256
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESessionUpdate()
Definition sse.cpp:349
bool waitForEvent(std::string &eventType, json &data, std::chrono::milliseconds timeout)
Definition sse.cpp:31
const std::string & getClientID() const
Definition sse.cpp:122
json formatEvent(const json &change_event) const
Definition sse.cpp:77
void updateActivity()
Definition sse.cpp:105
bool isInterestedIn(const json &change_event) const
Definition sse.cpp:53
void queueEvent(const std::string &eventType, const json &data)
Definition sse.cpp:25
auto getLastActivity() const
Definition sse.cpp:113
void setTopics(const std::set< std::string > &topics)
Definition sse.cpp:126
const std::set< std::string > & getTopics() const
Definition sse.cpp:124
void updateTopics(std::set< std::string > &topics)
Definition sse.cpp:109
bool isActive() const
Definition sse.cpp:120
void close()
Definition sse.cpp:115
router.h
Definition auth.h:15
httplib::Server::HandlerResponse HandlerResponse
Definition types.h:33
nlohmann::json json
Shorten JSON namespace.
Definition context_store.h:18
Realtime database change detection for SQLite and PostgreSQL.