C++ Distributed Hash Table
dht_proxy_server.h
1 /*
2  * Copyright (C) 2017-2018 Savoir-faire Linux Inc.
3  * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4  * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program. If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #if OPENDHT_PROXY_SERVER
21 
22 #pragma once
23 
24 #include "callbacks.h"
25 #include "def.h"
26 #include "infohash.h"
27 #include "proxy.h"
28 #include "scheduler.h"
29 #include "sockaddr.h"
30 #include "value.h"
31 
32 #include <thread>
33 #include <memory>
34 #include <mutex>
35 #include <restbed>
36 
37 namespace Json {
38  class Value;
39 }
40 
41 namespace dht {
42 
43 class DhtRunner;
44 
48 class OPENDHT_PUBLIC DhtProxyServer
49 {
50 public:
59  DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port = 8000, const std::string& pushServer = "");
60  virtual ~DhtProxyServer();
61 
62  DhtProxyServer(const DhtProxyServer& other) = delete;
63  DhtProxyServer(DhtProxyServer&& other) = delete;
64  DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
65  DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
66 
67  struct ServerStats {
69  size_t listenCount;
71  size_t putCount;
73  size_t pushListenersCount;
75  double requestRate;
76 
77  std::string toString() const {
78  std::ostringstream ss;
79  ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
80  ss << "Requests: " << requestRate << " per second." << std::endl;
81  return ss.str();
82  }
83  };
84  ServerStats getStats() const;
85 
86  std::shared_ptr<DhtRunner> getNode() const { return dht_; }
87 
91  void stop();
92 
93 private:
101  void getNodeInfo(const std::shared_ptr<restbed::Session>& session) const;
102 
113  void get(const std::shared_ptr<restbed::Session>& session) const;
114 
125  void listen(const std::shared_ptr<restbed::Session>& session);
126 
136  void put(const std::shared_ptr<restbed::Session>& session);
137 
138  void cancelPut(const InfoHash& key, Value::Id vid);
139 
140 #if OPENDHT_PROXY_SERVER_IDENTITY
141 
150  void putSigned(const std::shared_ptr<restbed::Session>& session) const;
151 
161  void putEncrypted(const std::shared_ptr<restbed::Session>& session) const;
162 #endif // OPENDHT_PROXY_SERVER_IDENTITY
163 
174  void getFiltered(const std::shared_ptr<restbed::Session>& session) const;
175 
183  void handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const;
184 
189  void removeClosedListeners(bool testSession = true);
190 
191 #if OPENDHT_PUSH_NOTIFICATIONS
192 
201  void subscribe(const std::shared_ptr<restbed::Session>& session);
209  void unsubscribe(const std::shared_ptr<restbed::Session>& session);
215  void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const;
216 
223  void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId);
224 
225 
226 #endif //OPENDHT_PUSH_NOTIFICATIONS
227 
228  using clock = std::chrono::steady_clock;
229  using time_point = clock::time_point;
230 
231  std::thread server_thread {};
232  std::unique_ptr<restbed::Service> service_;
233  std::shared_ptr<DhtRunner> dht_;
234 
235  std::mutex schedulerLock_;
236  std::condition_variable schedulerCv_;
237  Scheduler scheduler_;
238  std::thread schedulerThread_;
239 
240  Sp<Scheduler::Job> printStatsJob_;
241  mutable std::mutex statsMutex_;
242  mutable NodeInfo nodeInfo_ {};
243 
244  // Handle client quit for listen.
245  // NOTE: can be simplified when we will supports restbed 5.0
246  std::thread listenThread_;
247  struct SessionToHashToken {
248  std::shared_ptr<restbed::Session> session;
249  InfoHash hash;
250  std::future<size_t> token;
251  };
252  std::vector<SessionToHashToken> currentListeners_;
253  std::mutex lockListener_;
254  std::atomic_bool stopListeners {false};
255 
256  struct PermanentPut;
257  struct SearchPuts;
258  std::map<InfoHash, SearchPuts> puts_;
259 
260  mutable std::atomic<size_t> requestNum_ {0};
261  mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
262 
263  const std::string pushServer_;
264 
265 #if OPENDHT_PUSH_NOTIFICATIONS
266  struct Listener;
267  struct PushListener;
268  std::mutex lockPushListeners_;
269  std::map<std::string, PushListener> pushListeners_;
270  proxy::ListenToken tokenPushNotif_ {0};
271 #endif //OPENDHT_PUSH_NOTIFICATIONS
272 };
273 
274 }
275 
276 #endif //OPENDHT_PROXY_SERVER
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
Definition: callbacks.h:34