YAMI4 C++
agent_impl.h
1 // Copyright Maciej Sobczak 2008-2019.
2 // This file is part of YAMI4.
3 //
4 // YAMI4 is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // YAMI4 is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with YAMI4. If not, see <http://www.gnu.org/licenses/>.
16 
17 #ifndef YAMICPP_AGENT_IMPL_H_INCLUDED
18 #define YAMICPP_AGENT_IMPL_H_INCLUDED
19 
20 #include "agent_impl_base.h"
21 #include "id_generator.h"
22 #include "incoming_message_info.h"
23 #include "incoming_message_queue.h"
24 #include "name_resolver.h"
25 #include "options.h"
26 #include "outgoing_message.h"
27 #include "outgoing_message_manager.h"
28 #include "water_flow_manager.h"
29 #include <yami4-core/agent.h>
30 #include <map>
31 #include <string>
32 #include <vector>
33 
34 // selected per platform
35 #include <mutex.h>
36 #include <flag.h>
37 
38 namespace yami
39 {
40 
41 class event_callback;
42 class parameters;
43 class serializable;
44 
45 namespace details
46 {
47 
48 class incoming_message_dispatcher_base;
49 
50 class agent_impl : public agent_impl_base
51 {
52 public:
53 
54  agent_impl(const parameters & options);
55 
56  agent_impl(const parameters & options, event_callback & event_listener);
57 
58  ~agent_impl();
59 
60  std::string add_listener(const std::string & listener);
61  void remove_listener(const std::string & listener);
62 
63  void open_connection(const std::string & target);
64  void open_connection(const std::string & target,
65  const parameters & options);
66 
67  std::unique_ptr<outgoing_message> send(
68  const std::string & target,
69  const std::string & object_name,
70  const std::string & message_name,
71  const serializable & content,
72  std::size_t priority,
73  bool auto_connect);
74 
75  void send(
76  outgoing_message & message,
77  const std::string & target,
78  const std::string & object_name,
79  const std::string & message_name,
80  const serializable & content,
81  std::size_t priority,
82  bool auto_connect);
83 
84  bool clean_outgoing_message_callback(long long id);
85 
86  void send_one_way(const std::string & target,
87  const std::string & object_name,
88  const std::string & message_name,
89  const serializable & content,
90  std::size_t priority,
91  bool auto_connect);
92 
93  void send_reply(const std::string & source,
94  long long message_id, const serializable & body,
95  std::size_t priority);
96 
97  void send_rejection(const std::string & source,
98  long long message_id, const std::string & reason,
99  std::size_t priority);
100 
101  void close_connection(const std::string & target, std::size_t priority);
102 
103  void hard_close_connection(const std::string & target);
104 
105  virtual void register_object(
106  const std::string & object_name,
107  std::unique_ptr<incoming_message_dispatcher_base> & object);
108 
109  void unregister_object(const std::string & object_name);
110 
111  virtual long long send(
112  std::unique_ptr<outgoing_message_dispatcher_base> &
113  outgoing_message_callback,
114  const std::string & target,
115  const std::string & object_name,
116  const std::string & message_name,
117  const serializable & content,
118  std::size_t priority,
119  bool auto_connect);
120 
121  virtual void register_connection_event_monitor(
122  std::unique_ptr<connection_event_dispatcher_base> & monitor);
123 
124  virtual void register_io_error_logger(
125  std::unique_ptr<io_error_dispatcher_base> & logger);
126 
127  void report_connection_event(
128  const char * name, connection_event event);
129 
130  // executed as a callback when the incoming message arrives
131  void queue_incoming(std::unique_ptr<incoming_message_info> & incoming);
132 
133  // executed as a callback when the reply arrives
134  void report_replied(
135  long long message_id, std::unique_ptr<parameters> & body);
136  void report_replied(
137  long long message_id, std::unique_ptr<std::vector<char> > & raw_buffer);
138 
139  // executed as a callback when the rejection/exception arrives
140  void report_rejected(long long message_id, const std::string & reason);
141 
142  // executed from separate thread, dispatches queued incoming messages
143  void do_message_dispatching(std::size_t dispatcher_index);
144 
145  // executed from dedicated worker thread, runs the I/O event loop
146  void do_event_loop();
147 
148  void get_outgoing_flow_state(std::size_t & current_level,
149  std::size_t & high_water_mark, std::size_t & low_water_mark) const;
150 
151  void decrease_outgoing_flow();
152 
153  void get_channel_usage(int & max_allowed, int & used);
154 
155  std::size_t get_pending_outgoing_bytes(const std::string & target);
156 
157  cpp_options options_;
158 
159 private:
160  agent_impl(const agent_impl &);
161  void operator=(const agent_impl &);
162 
163  void init(const parameters & options);
164  void clean();
165 
166  enum message_creation_policy
167  {
168  none,
169  create_new, // new out_msg object is created dynamically
170 
171  replace, // user-provided object is reinitialized in-place
172 
173  helper, // a helper outgoing_message is created to handle
174  // one-way interaction with forced wait
175 
176  keep_for_callback // out_msg object is created, stored for callback
177  // and destroyed after the callback is delivered
178  };
179 
180  std::unique_ptr<outgoing_message> do_send(
181  const std::string & target,
182  const std::string & object_name,
183  const std::string & message_name,
184  const serializable & content,
185  std::size_t priority,
186  bool auto_connect,
187  bool one_way,
188  std::unique_ptr<outgoing_message_dispatcher_base> &
189  outgoing_message_callback,
190  outgoing_message * message,
191  message_creation_policy message_create,
192  long long * out_message_id = NULL);
193 
194  std::unique_ptr<outgoing_message> do_send_to_single_target(
195  const std::string & target,
196  const core::parameters & header,
197  const serializable & content,
198  std::size_t priority,
199  long long message_id,
200  bool auto_connect,
201  bool one_way, bool wait_for_transmission, bool wait_for_completion,
202  std::unique_ptr<outgoing_message_dispatcher_base> &
203  outgoing_message_callback,
204  outgoing_message * message,
205  message_creation_policy message_create);
206 
207  core::channel_descriptor make_sure_channel_exists(
208  const std::string & target, bool auto_connect,
209  const parameters * overriding_options = NULL);
210 
211  void increase_outgoing_flow();
212  void increase_incoming_flow();
213  void decrease_incoming_flow();
214 
215  void stop_worker_thread();
216  void stop_dispatcher_threads(std::size_t num_of_threads);
217 
218  core::agent agent_;
219 
220  outgoing_message_manager outgoing_manager_;
221 
222  water_flow_manager outgoing_flow_manager_;
223  flag outgoing_flow_allowed_;
224 
225  incoming_message_queue incoming_queue_;
226  flag * dispatcher_stopped_flags_; // array
227 
228  water_flow_manager incoming_flow_manager_;
229  bool allow_incoming_traffic_;
230  mutex allow_incoming_traffic_mtx_;
231 
232  std::unique_ptr<connection_event_dispatcher_base> connection_event_monitor_;
233  std::unique_ptr<io_error_dispatcher_base> io_error_logger_;
234  event_callback * event_listener_;
235 
236  name_resolver resolver_;
237 
238  id_generator id_gen_;
239 
240  bool worker_stop_request_;
241  mutex worker_stop_mtx_;
242  flag worker_stopped_;
243 };
244 
245 } // namespace details
246 
247 } // namespace yami
248 
249 #endif // YAMICPP_AGENT_IMPL_H_INCLUDED
Namespace devoted to everything related to YAMI4.
Definition: activity_statistics_monitor.cpp:27
connection_event
Kind of connection event.
Definition: connection_event.h:24