17#ifndef MANTISBASE_SSE_H
18#define MANTISBASE_SSE_H
20#include <condition_variable>
26#include <unordered_map>
27#include <nlohmann/json.hpp>
32 using json = nlohmann::json;
36 std::string m_clientID;
37 std::set<std::string> m_topics;
39 std::mutex m_queueMutex;
40 std::condition_variable m_queueCV;
41 std::queue<std::pair<std::string, json> > m_eventQueue;
43 std::atomic<bool> m_isActive;
44 std::chrono::steady_clock::time_point m_lastActivity;
48 const std::set<std::string> &topics,
49 const json &auth = json::object(),
50 const json &verification = json::object());
58 std::chrono::milliseconds timeout);
79 const std::set<std::string> &
getTopics()
const;
81 void setTopics(
const std::set<std::string> &topics);
86 std::unordered_map<std::string, std::shared_ptr<SSESession>> m_sessions;
87 std::mutex m_sessions_mutex;
88 std::condition_variable m_cv;
89 std::thread m_cleanup_thread;
90 std::atomic<bool> m_running{
true};
99 std::string
createSession(
const std::set<std::string> &initial_topics);
101 std::shared_ptr<SSESession>
fetchSession(
const std::string &session_id);
106 std::shared_ptr<mb::SSESession>
getSession(
const std::string &sessionId);
126 static std::string generateClientID();
128 void cleanupIdleSessions();
A wrapper class around httplib::Request offering a consistent API and allowing for easy wrapper metho...
Definition http.h:39
Wrapper around httplib::Response for consistent API.
Definition http.h:315
std::string createSession(const std::set< std::string > &initial_topics)
Definition sse.cpp:157
bool isRunning() const
Definition sse.cpp:256
std::shared_ptr< SSESession > fetchSession(const std::string &session_id)
Definition sse.cpp:170
size_t getSessionCount()
Definition sse.cpp:223
static void createRoutes()
Definition sse.cpp:134
void removeSession(const std::string &session_id)
Definition sse.cpp:179
std::shared_ptr< mb::SSESession > getSession(const std::string &sessionId)
Definition sse.cpp:198
void updateActivity(const std::string &session_id)
Definition sse.cpp:191
~SSEMgr()
Definition sse.cpp:132
void broadcastChange(const json &change_event)
Definition sse.cpp:208
void stop()
Definition sse.cpp:245
void start()
Definition sse.cpp:228
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESession()
Definition sse.cpp:258
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESessionUpdate()
Definition sse.cpp:351
bool waitForEvent(std::string &eventType, json &data, std::chrono::milliseconds timeout)
Definition sse.cpp:33
const std::string & getClientID() const
Definition sse.cpp:124
json formatEvent(const json &change_event) const
Definition sse.cpp:79
void updateActivity()
Definition sse.cpp:107
bool isInterestedIn(const json &change_event) const
Definition sse.cpp:55
void queueEvent(const std::string &eventType, const json &data)
Definition sse.cpp:27
auto getLastActivity() const
Definition sse.cpp:115
void setTopics(const std::set< std::string > &topics)
Definition sse.cpp:128
const std::set< std::string > & getTopics() const
Definition sse.cpp:126
void updateTopics(std::set< std::string > &topics)
Definition sse.cpp:111
bool isActive() const
Definition sse.cpp:122
void close()
Definition sse.cpp:117
router.h
Definition auth.h:16
httplib::Server::HandlerResponse HandlerResponse
Definition types.h:33
nlohmann::json json
Shorten JSON namespace.
Definition context_store.h:18
Realtime database change detection for SQLite and PostgreSQL.