MantisBase v0.3.4
Loading...
Searching...
No Matches
sse.h
Go to the documentation of this file.
1
17#ifndef MANTISBASE_SSE_H
18#define MANTISBASE_SSE_H
19
20#include <condition_variable>
21#include <functional>
22#include <mutex>
23#include <queue>
24#include <set>
25#include <thread>
26#include <unordered_map>
27#include <nlohmann/json.hpp>
28
29#include "realtime.h"
30
31namespace mb {
32 using json = nlohmann::json;
33
// State held for one SSE client session: a client ID, a set of subscribed
// topics, a queue of pending (event_type, data) pairs, an atomic "active"
// flag and a last-activity timestamp. SSE presumably stands for
// Server-Sent Events.
// Member functions are only declared here; the definitions live in sse.cpp.
// NOTE(review): this class uses std::atomic and std::chrono, but the header
// does not include <atomic> or <chrono> directly - presumably they arrive
// transitively (e.g. via "realtime.h"); confirm.
35 class SSESession {
// Client/session identifier; presumably what getClientID() returns.
36 std::string m_clientID;
// Topics associated with this session; presumably what getTopics() returns.
37 std::set<std::string> m_topics;
38
// Mutex and condition variable declared next to the event queue; presumably
// they guard the queue and wake waitForEvent() - confirm in sse.cpp.
39 std::mutex m_queueMutex;
40 std::condition_variable m_queueCV;
41 std::queue<std::pair<std::string, json> > m_eventQueue; // <event_type, data>
42
// Atomic flag; presumably reported by isActive() and cleared by close().
43 std::atomic<bool> m_isActive;
// Time point on the monotonic steady_clock; presumably refreshed by
// updateActivity().
44 std::chrono::steady_clock::time_point m_lastActivity;
45
46 public:
// Constructs a session from an ID and a topic set. `auth` and `verification`
// both default to an empty JSON object.
// NOTE(review): no member in this listing stores `auth` or `verification`;
// check how sse.cpp uses them.
47 SSESession(const std::string &sessionId,
48 const std::set<std::string> &topics,
49 const json &auth = json::object(),
50 const json &verification = json::object());
51
52 // Queue an event to be sent
54 void queueEvent(const std::string &eventType, const json &data);
55
// Outputs an event through `eventType` and `data`; takes a timeout in
// milliseconds. Presumably returns false when nothing arrives in time -
// TODO confirm against sse.cpp.
57 bool waitForEvent(std::string &eventType, json &data,
58 std::chrono::milliseconds timeout);
59
// Predicate over a change event; presumably matches it against m_topics.
61 bool isInterestedIn(const json &change_event) const;
62
// Builds a JSON value from a change event; the output shape is defined in
// sse.cpp.
64 json formatEvent(const json &change_event) const;
65
66 void updateActivity();
67
// NOTE(review): takes a non-const reference, unlike setTopics() below, which
// takes a const reference - confirm whether the argument is modified.
68 void updateTopics(std::set<std::string> &topics);
69
// NOTE(review): declared with a deduced (`auto`) return type but defined out
// of line in sse.cpp. A function with a deduced return type cannot be used
// until its definition has been seen, so code that only includes this header
// cannot call it. Consider spelling the type out (m_lastActivity is a
// std::chrono::steady_clock::time_point), changing the definition to match.
70 auto getLastActivity() const;
71
73 void close();
74
75 bool isActive() const;
76
// Returns a reference, not a copy; it must not be used after the session is
// destroyed.
77 const std::string &getClientID() const;
78
// Returns a reference, not a copy.
// NOTE(review): no synchronisation for m_topics is visible in this header -
// check for concurrent use with setTopics()/updateTopics().
79 const std::set<std::string> &getTopics() const;
80
81 void setTopics(const std::set<std::string> &topics);
82 };
83
// Manager for SSE sessions: holds a map from session ID to shared SSESession,
// a mutex, a condition variable, a cleanup thread and a running flag.
// Member functions are only declared here; the definitions live in sse.cpp.
// NOTE(review): this class uses std::shared_ptr and std::string, but the
// header does not include <memory> or <string> directly - presumably they
// arrive transitively; confirm.
85 class SSEMgr {
// Sessions keyed by a string ID and held by shared_ptr.
86 std::unordered_map<std::string, std::shared_ptr<SSESession>> m_sessions;
// Presumably guards m_sessions - confirm in sse.cpp.
87 std::mutex m_sessions_mutex;
// Presumably used to wake the cleanup thread (e.g. from stop()) - confirm.
88 std::condition_variable m_cv;
// Presumably runs cleanupIdleSessions(); see start()/stop() in sse.cpp.
89 std::thread m_cleanup_thread;
// Initialised to true at construction.
// NOTE(review): start() is a separate call, so check what isRunning()
// reports before start() has been called.
90 std::atomic<bool> m_running{true};
91
92 public:
93 SSEMgr() = default;
// User-declared destructor, defined in sse.cpp; presumably stops and joins
// the cleanup thread - confirm.
94 ~SSEMgr();
95
// Static; presumably registers the SSE HTTP routes - confirm in sse.cpp.
97 static void createRoutes();
// Creates a session for the given initial topics and returns a string,
// presumably the new session/client ID.
99 std::string createSession(const std::set<std::string> &initial_topics);
100
// Looks up a session by ID. The result when the ID is unknown (e.g. a null
// pointer) is defined in sse.cpp - TODO confirm.
101 std::shared_ptr<SSESession> fetchSession(const std::string &session_id);
103 void removeSession(const std::string &session_id);
104
105 void updateActivity(const std::string &session_id);
// NOTE(review): same parameter and return types as fetchSession() above;
// confirm how the two differ or whether one is redundant.
106 std::shared_ptr<mb::SSESession> getSession(const std::string &sessionId);
107
// Takes a change event; presumably queues it on every interested session.
109 void broadcastChange(const json &change_event);
// Non-const; presumably locks m_sessions_mutex to read the map size.
110 size_t getSessionCount();
111
112 void start();
113 void stop();
114 bool isRunning() const;
115
// Factories returning request handlers as std::function objects taking
// (MantisRequest &, MantisResponse &).
116 static std::function<void(MantisRequest &, MantisResponse &)> handleSSESession();
117 static std::function<void(MantisRequest &, MantisResponse &)> handleSSESessionUpdate();
118
119 private:
// Factories returning callables that yield a HandlerResponse; presumably
// installed as pre-routing checks or middleware - confirm.
// `is_updating` defaults to false.
120 static std::function<HandlerResponse(MantisRequest &, MantisResponse &)> validateSubTopics(bool is_updating = false);
121
122 static std::function<HandlerResponse(MantisRequest &, MantisResponse &)> validateHasAccess();
123
124 static std::function<HandlerResponse(MantisRequest &, MantisResponse &)> updateAuthTokenForSSE();
125
126 static std::string generateClientID();
127
// Presumably the body of m_cleanup_thread; the idle threshold and the
// polling interval are defined in sse.cpp.
128 void cleanupIdleSessions();
129 };
130}
131
132
133#endif //MANTISBASE_SSE_H
A wrapper class around httplib::Request offering a consistent API and allowing for easy wrapper metho...
Definition http.h:42
Wrapper around httplib::Response for consistent API.
Definition http.h:318
Definition sse.h:85
std::string createSession(const std::set< std::string > &initial_topics)
Definition sse.cpp:155
bool isRunning() const
Definition sse.cpp:254
std::shared_ptr< SSESession > fetchSession(const std::string &session_id)
Definition sse.cpp:168
size_t getSessionCount()
Definition sse.cpp:221
static void createRoutes()
Definition sse.cpp:132
void removeSession(const std::string &session_id)
Definition sse.cpp:177
std::shared_ptr< mb::SSESession > getSession(const std::string &sessionId)
Definition sse.cpp:196
void updateActivity(const std::string &session_id)
Definition sse.cpp:189
~SSEMgr()
Definition sse.cpp:130
void broadcastChange(const json &change_event)
Definition sse.cpp:206
void stop()
Definition sse.cpp:243
void start()
Definition sse.cpp:226
SSEMgr()=default
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESession()
Definition sse.cpp:256
static std::function< void(MantisRequest &, MantisResponse &)> handleSSESessionUpdate()
Definition sse.cpp:349
Definition sse.h:35
bool waitForEvent(std::string &eventType, json &data, std::chrono::milliseconds timeout)
Definition sse.cpp:31
const std::string & getClientID() const
Definition sse.cpp:122
json formatEvent(const json &change_event) const
Definition sse.cpp:77
void updateActivity()
Definition sse.cpp:105
bool isInterestedIn(const json &change_event) const
Definition sse.cpp:53
void queueEvent(const std::string &eventType, const json &data)
Definition sse.cpp:25
auto getLastActivity() const
Definition sse.cpp:113
void setTopics(const std::set< std::string > &topics)
Definition sse.cpp:126
const std::set< std::string > & getTopics() const
Definition sse.cpp:124
void updateTopics(std::set< std::string > &topics)
Definition sse.cpp:109
bool isActive() const
Definition sse.cpp:120
void close()
Definition sse.cpp:115
router.h
Definition auth.h:15
httplib::Server::HandlerResponse HandlerResponse
Definition types.h:33
nlohmann::json json
Shorten JSON namespace.
Definition context_store.h:18
Realtime database change detection for SQLite and PostgreSQL.