C++ Distributed Hash Table
dhtrunner.h
1 /*
2  * Copyright (C) 2014-2017 Savoir-faire Linux Inc.
3  * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4  * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5  * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <https://www.gnu.org/licenses/>.
19  */
20 
21 #pragma once
22 
23 #include "infohash.h"
24 #include "value.h"
25 #include "callbacks.h"
26 #include "sockaddr.h"
27 #include "log_enable.h"
28 #include "def.h"
29 
30 #include <thread>
31 #include <mutex>
32 #include <atomic>
33 #include <condition_variable>
34 #include <future>
35 #include <exception>
36 #include <queue>
37 #include <chrono>
38 
39 namespace dht {
40 
41 struct Node;
42 class SecureDht;
43 struct SecureDhtConfig;
44 
51 class OPENDHT_PUBLIC DhtRunner {
52 
53 public:
54  typedef std::function<void(NodeStatus, NodeStatus)> StatusCallback;
55 
56  struct Config {
57  SecureDhtConfig dht_config;
58  bool threaded;
59  std::string proxy_server;
60  std::string push_node_id;
61  };
62 
63  DhtRunner();
64  virtual ~DhtRunner();
65 
66  void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
67  get(id, bindGetCb(cb), donecb, f, w);
68  }
69 
70  void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
71  get(id, bindGetCb(cb), donecb, f, w);
72  }
73 
74  void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
75 
76  void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
77  get(id, cb, bindDoneCb(donecb), f, w);
78  }
79  void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {});
80 
81  template <class T>
82  void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
83  {
84  get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
85  return cb(unpackVector<T>(vals));
86  },
87  dcb,
88  getFilterSet<T>());
89  }
90  template <class T>
91  void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
92  {
93  get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
94  for (const auto& v : vals) {
95  try {
96  if (not cb(Value::unpack<T>(*v)))
97  return false;
98  } catch (const std::exception&) {
99  continue;
100  }
101  }
102  return true;
103  },
104  dcb,
105  getFilterSet<T>());
106  }
107 
108  std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = Value::AllFilter(), Where w = {}) {
109  auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
110  auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
111  get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
112  values->insert(values->end(), vlist.begin(), vlist.end());
113  return true;
114  }, [=](bool) {
115  p->set_value(std::move(*values));
116  },
117  f, w);
118  return p->get_future();
119  }
120 
121  template <class T>
122  std::future<std::vector<T>> get(InfoHash key) {
123  auto p = std::make_shared<std::promise<std::vector<T>>>();
124  auto values = std::make_shared<std::vector<T>>();
125  get<T>(key, [=](T&& v) {
126  values->emplace_back(std::move(v));
127  return true;
128  }, [=](bool) {
129  p->set_value(std::move(*values));
130  });
131  return p->get_future();
132  }
133 
134  void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
135  void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
136  query(hash, cb, bindDoneCb(done_cb), q);
137  }
138 
139  std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
140 
141  std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
142  return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
143  if (not expired)
144  return cb(vals);
145  return true;
146  }, std::forward<Value::Filter>(f), std::forward<Where>(w));
147  }
148  std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
149  std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) {
150  return listen(key, bindGetCb(cb), f, w);
151  }
152 
153  template <class T>
154  std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
155  {
156  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
157  return cb(unpackVector<T>(vals));
158  },
159  getFilterSet<T>());
160  }
161  template <typename T>
162  std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
163  {
164  return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
165  for (const auto& v : vals) {
166  try {
167  if (not cb(Value::unpack<T>(*v)))
168  return false;
169  } catch (const std::exception&) {
170  continue;
171  }
172  }
173  return true;
174  },
175  getFilterSet<T>(f), w);
176  }
177 
178  void cancelListen(InfoHash h, size_t token);
179  void cancelListen(InfoHash h, std::shared_future<size_t> token);
180 
181  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
182  void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
183  put(hash, value, bindDoneCb(cb), created, permanent);
184  }
185 
186  void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
187  void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
188  put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
189  }
190  void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
191 
192  void cancelPut(const InfoHash& h, const Value::Id& id);
193 
194  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={});
195  void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
196  putSigned(hash, value, bindDoneCb(cb));
197  }
198 
199  void putSigned(InfoHash hash, Value&& value, DoneCallback cb={});
200  void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) {
201  putSigned(hash, std::forward<Value>(value), bindDoneCb(cb));
202  }
203  void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={});
204 
205  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={});
206  void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
207  putEncrypted(hash, to, value, bindDoneCb(cb));
208  }
209 
210  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={});
211  void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) {
212  putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb));
213  }
214  void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={});
215 
220  void bootstrap(const std::vector<SockAddr>& nodes, DoneCallbackSimple&& cb={});
221  void bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb={});
222 
227  void bootstrap(const std::vector<NodeExport>& nodes);
228 
235  void bootstrap(const std::string& host, const std::string& service);
236 
240  void clearBootstrap();
241 
247  void connectivityChanged();
248 
249  void dumpTables() const;
250 
251  InfoHash getId() const;
252 
253  InfoHash getNodeId() const;
254 
259  const SockAddr& getBound(sa_family_t f = AF_INET) const {
260  return (f == AF_INET) ? bound4 : bound6;
261  }
262 
267  in_port_t getBoundPort(sa_family_t f = AF_INET) const {
268  return getBound(f).getPort();
269  }
270 
271  std::pair<size_t, size_t> getStoreSize() const;
272 
273  void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
274 
275  std::vector<NodeExport> exportNodes() const;
276 
277  std::vector<ValuesExport> exportValues() const;
278 
279  void setLoggers(LogMethod err = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG);
280 
284  void setLogFilter(const InfoHash& f = {});
285 
286  void registerType(const ValueType& type);
287 
288  void importValues(const std::vector<ValuesExport>& values);
289 
290  bool isRunning() const {
291  return running;
292  }
293 
294  NodeStats getNodesStats(sa_family_t af) const;
295  unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
296  NodeInfo getNodeInfo() const;
297 
298  std::vector<unsigned> getNodeMessageStats(bool in = false) const;
299  std::string getStorageLog() const;
300  std::string getStorageLog(const InfoHash&) const;
301  std::string getRoutingTablesLog(sa_family_t af) const;
302  std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
303  std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
304  std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
305  std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
306 
307  // securedht methods
308 
309  void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)>);
310  void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
311  void setLocalCertificateStore(CertificateStoreQuery&& query_method);
312 
319  void run(in_port_t port = 4222, const crypto::Identity identity = {}, bool threaded = false, NetId network = 0) {
320  run(port, {
321  /*.dht_config = */{
322  /*.node_config = */{
323  /*.node_id = */{},
324  /*.network = */network,
325  /*.is_bootstrap = */false,
326  /*.maintain_storage*/false
327  },
328  /*.id = */identity
329  },
330  /*.threaded = */threaded,
331  /*.proxy_server = */"",
332  /*.push_node_id = */""
333  });
334  }
335  void run(in_port_t port, Config config);
336 
345  void run(const SockAddr& local4, const SockAddr& local6, Config config);
346 
350  void run(const char* ip4, const char* ip6, const char* service, Config config);
351 
352  void setOnStatusChanged(StatusCallback&& cb) {
353  statusCb = std::move(cb);
354  }
355 
361  time_point loop() {
362  std::lock_guard<std::mutex> lck(dht_mtx);
363  time_point wakeup = time_point::min();
364  try {
365  wakeup = loop_();
366  } catch (const dht::SocketException& e) {
367  startNetwork(bound4, bound6);
368  }
369  return wakeup;
370  }
371 
375  void shutdown(ShutdownCallback cb);
376 
382  void join();
383 
384  void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
385 
391  void enableProxy(bool proxify);
392 
393  /* Push notification methods */
394 
398  void setPushNotificationToken(const std::string& token);
399 
403  void pushNotificationReceived(const std::map<std::string, std::string>& data);
404 
405  /* Proxy server mothods */
406  void forwardAllMessages(bool forward);
407 
408 private:
409  static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
410 
417  void tryBootstrapContinuously();
418 
419  void startNetwork(const SockAddr sin4, const SockAddr sin6);
420  time_point loop_();
421 
422  NodeStatus getStatus() const {
423  return std::max(status4, status6);
424  }
425 
427  std::unique_ptr<SecureDht> dht_;
428 
430  std::unique_ptr<SecureDht> dht_via_proxy_;
431 
433  std::atomic_bool use_proxy {false};
434 
436  Config config_;
437 
441  void resetDht();
445  SecureDht* activeDht() const;
446 
450  struct Listener;
451  std::map<size_t, Listener> listeners_;
452  size_t listener_token_ {1};
453 
454  mutable std::mutex dht_mtx {};
455  std::thread dht_thread {};
456  std::condition_variable cv {};
457 
458  std::thread rcv_thread {};
459  std::mutex sock_mtx {};
460 
461  struct ReceivedPacket {
462  Blob data;
463  SockAddr from;
464  time_point received;
465  };
466  std::queue<ReceivedPacket> rcv {};
467 
469  std::atomic_bool bootstraping {false};
470  /* bootstrap nodes given as (host, service) pairs */
471  std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {};
472  std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
473  std::thread bootstrap_thread {};
475  std::mutex bootstrap_mtx {};
476  std::condition_variable bootstrap_cv {};
477 
478  std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
479  std::queue<std::function<void(SecureDht&)>> pending_ops {};
480  std::mutex storage_mtx {};
481 
482  std::atomic_bool running {false};
483  std::atomic_bool running_network {false};
484 
485  NodeStatus status4 {NodeStatus::Disconnected},
486  status6 {NodeStatus::Disconnected};
487  StatusCallback statusCb {nullptr};
488 
489  int s4 {-1}, s6 {-1};
490  SockAddr bound4 {};
491  SockAddr bound6 {};
492 
494  std::string pushToken_;
495 };
496 
497 }
void NOLOG(char const *, va_list)
Definition: log_enable.h:38
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
void run(in_port_t port=4222, const crypto::Identity identity={}, bool threaded=false, NetId network=0)
Definition: dhtrunner.h:319
NodeStatus
Definition: callbacks.h:41
const SockAddr & getBound(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:259
in_port_t getBoundPort(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:267
std::vector< uint8_t > Blob
Definition: utils.h:114
Serializable dht::Value filter.
Definition: value.h:739
time_point loop()
Definition: dhtrunner.h:361
Definition: callbacks.h:34