Midge  v3.9.2
Data Processing Framework
diptera.cc
Go to the documentation of this file.
1 #include "input.hh"
2 
3 #include <unistd.h>
4 
5 #include "midge_error.hh"
6 #include "bystander.hh"
7 #include "consumer.hh"
8 #include "coremsg.hh"
9 #include "diptera.hh"
10 #include "input.hh"
11 #include "node.hh"
12 #include "output.hh"
13 #include "producer.hh"
14 #include "transformer.hh"
15 
16 #include <chrono>
17 #include <thread>
18 
19 using std::string;
20 
21 namespace midge
22 {
23 
25  scarab::cancelable(),
26  f_nodes(),
27  f_instructables(),
28  f_threads(),
29  f_threads_mutex(),
30  f_run_e_ptr(),
31  f_running_callback( [](){ return; } )
32  {
33  }
35  {
36  reset();
37  }
38 
39  void diptera::add( node* p_node )
40  {
41  string_t t_name = p_node->get_name();
42  node_it_t t_it = f_nodes.find( t_name );
43  if( t_it == f_nodes.end() )
44  {
45  msg_normal( coremsg, "initializing node <" << t_name << ">" << eom );
46  try
47  {
48  p_node->initialize();
49  }
50  catch( std::exception& e )
51  {
52  msg_error( coremsg, "exception caught while initializing node <" << t_name << ">: " << e.what() << eom );
53  throw( e );
54  }
55 
56  f_nodes.insert( node_entry_t( t_name, p_node ) );
57  msg_normal( coremsg, "added node <" << t_name << ">" << eom );
58 
59  instructable* t_inst = dynamic_cast< instructable* >( p_node );
60  if( t_inst != nullptr )
61  {
62  f_instructables.insert( t_inst );
63  }
64  }
65  else
66  {
67  throw error() << "root add found preexisting node with name <" << t_name << ">";
68  }
69  return;
70  }
71 
72  void diptera::connect( const std::string& p_string )
73  {
74  size_t t_pos = p_string.find( s_connector );
75  if( t_pos <= 1 ||
76  t_pos >= p_string.length()-2 ||
77  p_string.find( s_connector, t_pos + s_connector.length() ) != string_t::npos )
78  {
79  throw error() << "connection specification <" << p_string << "> was not formatted correctly: signal_node.signal:slot_node.slot";
80  return;
81  }
82 
83  string_t t_signal_argument = p_string.substr( 0, t_pos );
84  string_t t_slot_argument = p_string.substr( t_pos + s_connector.length(), string_t::npos );
85 
86  t_pos = t_signal_argument.find( s_designator );
87  if( t_pos == 0 ||
88  t_pos == string::npos ||
89  t_signal_argument.find( s_designator, t_pos + s_designator.length() ) != string_t::npos )
90  {
91  throw error() << "signal specification <" << t_signal_argument << "> is not formatted correctly: signal_node.signal";
92  return;
93  }
94 
95  string_t t_signal_node_string = t_signal_argument.substr( 0, t_pos );
96  string_t t_signal_string = t_signal_argument.substr( t_pos + s_designator.length(), string_t::npos );
97 
98  t_pos = t_slot_argument.find( s_designator );
99  if( t_pos == 0 ||
100  t_pos == string::npos ||
101  t_slot_argument.find( s_designator, t_pos + s_designator.length() ) != string_t::npos )
102  {
103  throw error() << "slot specification <" << t_slot_argument << "> is not formatted correctly: slot_node.slot";
104  return;
105  }
106 
107  string_t t_slot_node_string = t_slot_argument.substr( 0, t_pos );
108  string_t t_slot_string = t_slot_argument.substr( t_pos + s_designator.length(), string_t::npos );
109 
110  node_it_t t_node_it = f_nodes.find( t_signal_node_string );
111  if( t_node_it == f_nodes.end() )
112  {
113  throw error() << "signal node was not found <" << t_signal_node_string << ">";
114  return;
115  }
116  node* t_signal_node = t_node_it->second;
117 
118  t_node_it = f_nodes.find( t_slot_node_string );
119  if( t_node_it == f_nodes.end() )
120  {
121  throw error() << "slot node was not found <" << t_slot_node_string << ">";
122  return;
123  }
124  node* t_slot_node = t_node_it->second;
125 
126  signal* t_signal = t_signal_node->signal_ptr( t_signal_string );
127  if( t_signal == NULL )
128  {
129  throw error() << "signal <" << t_signal_string << "> was not found for node <" << t_signal_node << ">";
130  return;
131  }
132 
133  slot* t_slot = t_slot_node->slot_ptr( t_slot_string );
134  if( t_slot == NULL )
135  {
136  throw error() << "slot <" << t_slot_string << "> was not found for node <" << t_slot_node_string << ">";
137  return;
138  }
139 
140  t_signal->connect( t_slot );
141 
142  msg_normal( coremsg, "connected signal --> slot: " << t_signal_node_string << "." << t_signal_string << " --> " << t_slot_node_string << "." << t_slot_string << eom );
143 
144  return;
145  }
146 
147  void diptera::join( const string_t& p_string )
148  {
149  string_t t_first_node_string;
150  node* t_first_node;
151  string_t t_first_out_string;
152  output* t_first_out;
153  string_t t_second_node_string;
154  node* t_second_node;
155  string_t t_second_in_string;
156  input* t_second_in;
157 
158  size_t t_first_pos;
159  size_t t_second_pos;
160  string_t t_first_argument;
161  string_t t_second_argument;
162 
163  t_first_pos = p_string.find( s_connector );
164  if( t_first_pos != string_t::npos )
165  {
166  t_second_pos = p_string.find( s_connector, t_first_pos + s_connector.length() );
167  if( t_second_pos == string_t::npos )
168  {
169  t_first_argument = p_string.substr( 0, t_first_pos );
170  t_second_argument = p_string.substr( t_first_pos + 1, string_t::npos );
171 
172  t_first_pos = t_first_argument.find( s_designator );
173  if( t_first_pos != string_t::npos )
174  {
175  t_second_pos = t_first_argument.find( s_designator, t_first_pos + s_designator.length() );
176  if( t_second_pos == string_t::npos )
177  {
178  t_first_node_string = t_first_argument.substr( 0, t_first_pos );
179  t_first_out_string = t_first_argument.substr( t_first_pos + 1, string_t::npos );
180  }
181  else
182  {
183  throw error() << "root join found multiple designators in first argument <" << t_first_argument << ">";
184  return;
185  }
186  }
187 
188  t_first_pos = t_second_argument.find( s_designator );
189  if( t_first_pos != string_t::npos )
190  {
191  t_second_pos = t_second_argument.find( s_designator, t_first_pos + s_designator.length() );
192  if( t_second_pos == string_t::npos )
193  {
194  t_second_node_string = t_second_argument.substr( 0, t_first_pos );
195  t_second_in_string = t_second_argument.substr( t_first_pos + 1, string_t::npos );
196  }
197  else
198  {
199  throw error() << "root join found multiple designators in second argument <" << t_second_argument << ">";
200  return;
201  }
202  }
203 
204  if( t_first_out_string.empty() != t_second_in_string.empty() )
205  {
206  throw error() << "root join must link either by stream or pointer";
207  return;
208  }
209 
210  node_it_t t_first_it = f_nodes.find( t_first_node_string );
211  if( t_first_it == f_nodes.end() )
212  {
213  throw error() << "root join found no first node with name <" << t_first_node_string << ">";
214  return;
215  }
216  t_first_node = t_first_it->second;
217 
218  node_it_t t_second_it = f_nodes.find( t_second_node_string );
219  if( t_second_it == f_nodes.end() )
220  {
221  throw error() << "root join found no second node with name <" << t_second_node_string << ">";
222  return;
223  }
224  t_second_node = t_second_it->second;
225 
226  if( t_first_out_string.empty() )
227  {
228  // joining nodes by pointer
229  t_first_node->node_ptr( t_second_node, t_second_node_string );
230 
231  msg_normal( coremsg, "joined <" << t_first_node_string << "> with <" << t_second_node_string << ">" << eom );
232  }
233  else
234  {
235 
236  t_first_out = t_first_node->out( t_first_out_string );
237  if( t_first_out == NULL )
238  {
239  throw error() << "root join found no first out with name <" << t_first_out_string << "> in node with name <" << t_first_node_string << ">";
240  return;
241  }
242 
243  t_second_in = t_second_node->in( t_second_in_string );
244  if( t_second_in == NULL )
245  {
246  throw error() << "root join found no second in with name <" << t_second_in_string << "> in node with name <" << t_second_node_string << ">";
247  return;
248  }
249  t_second_in->set( t_first_out->get() );
250 
251  msg_normal( coremsg, "joined <" << t_first_node_string << "." << t_first_out_string << "> with <" << t_second_node_string << "." << t_second_in_string << ">" << eom );
252  }
253 
254  return;
255  }
256  else
257  {
258  throw error() << "root join found multiple connectors in string_t <" << p_string << ">";
259  return;
260  }
261  }
262  else
263  {
264  throw error() << "root join found no connector in string_t <" << p_string << ">";
265  return;
266  }
267  }
268 
269  std::exception_ptr diptera::run( const string_t& p_string )
270  {
271  size_t t_start_pos;
272  size_t t_separator_pos;
273  string t_argument;
274  string t_node_name;
275  node_it_t t_node_it;
276  node* t_node;
277 
278  t_start_pos = 0;
279  t_argument = p_string;
280 
281  std::unique_lock< std::mutex >t_threads_lock( f_threads_mutex );
282 
283  // add nodes to the instructables map
284  for( node_cit_t t_it = f_nodes.begin(); t_it != f_nodes.end(); ++t_it )
285  {
286  instructable* t_inst = dynamic_cast< instructable* >( t_it->second );
287  if( t_inst != nullptr )
288  {
289  f_instructables.insert( t_inst );
290  }
291  }
292 
293  // run nodes specified in the string
294  while( true )
295  {
296  t_separator_pos = t_argument.find( s_separator, t_start_pos );
297 
298  t_node_name = t_argument.substr( t_start_pos, t_separator_pos - t_start_pos );
299  t_argument = t_argument.substr( t_separator_pos + s_separator.size(), string_t::npos );
300 
301  if( t_node_name.size() == 0 )
302  {
303  return std::make_exception_ptr( error() << "root run found node name with length zero in argument <" << p_string << ">" );
304  }
305 
306  t_node_it = f_nodes.find( t_node_name );
307  if( t_node_it == f_nodes.end() )
308  {
309  return std::make_exception_ptr( error() << "root run found no node with name <" << t_node_name << ">" );
310  }
311 
312  msg_normal( coremsg, "creating thread for node <" << t_node_name << ">" << eom );
313  t_node = t_node_it->second;
314  f_threads.push_back( std::thread( &node::execute, t_node, this ) );
315 
316  if( t_separator_pos == string_t::npos )
317  {
318  break;
319  }
320  }
321 
322  // delay to allow the threads to spin up
323  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
324 
326 
327  t_threads_lock.unlock();
328 
329  msg_normal( coremsg, "waiting for threads to finish..." << eom );
330  for( thread_it_t t_it = f_threads.begin(); t_it != f_threads.end(); t_it++ )
331  {
332  t_it->join();
333  }
334 
335  t_threads_lock.lock();
336 
337  msg_normal( coremsg, "threads finished" << eom );
338 
339  f_threads.clear();
340 
341  // clear instructables set
342  f_instructables.clear();
343 
344  return f_run_e_ptr;
345  }
346 
347  void diptera::throw_ex( std::exception_ptr e_ptr )
348  {
349  try
350  {
351  if( e_ptr )
352  {
353  std::rethrow_exception( e_ptr );
354  }
355  }
356  catch( const midge::error& e )
357  {
358  msg_error( coremsg, "midge error thrown: " << e.what() << eom );
359  }
360  catch( const midge::node_fatal_error& e )
361  {
362  msg_error( coremsg, "fatal error thrown: " << e.what() << eom );
363  }
364  catch( const midge::node_nonfatal_error& e )
365  {
366  msg_error( coremsg, "non-fatal error thrown: " << e.what() << eom );
367  }
368  catch( const std::exception& e )
369  {
370  msg_error( coremsg, "non-node exception thrown: " << e.what() << eom );
371  }
372  msg_debug( coremsg, "canceling run and setting exception pointer" << eom );
373  cancel();
374  f_run_e_ptr = e_ptr;
375  return;
376  }
377 
378 
380  {
381  std::unique_lock< std::mutex >t_threads_lock( f_threads_mutex );
382 
383  node* t_node;
384  node_it_t t_it;
385  for( t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
386  {
387  t_node = t_it->second;
388  t_node->finalize();
389  }
390 
391  for( t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
392  {
393  t_node = t_it->second;
394  delete (t_node);
395  }
396  f_nodes.clear();
397 
398  f_instructables.clear();
399 
400  return;
401  }
402 
404  {
405  std::unique_lock< std::mutex >t_threads_lock( f_threads_mutex );
406 
407  for( inst_it_t t_it = f_instructables.begin(); t_it != f_instructables.end(); ++t_it )
408  {
409  (*t_it)->instruct( p_inst );
410  }
411 
412  return;
413  }
414 
415  void diptera::do_cancellation( int a_code )
416  {
417  std::unique_lock< std::mutex >t_threads_lock( f_threads_mutex );
418 
419  // cancel producers first
420  msg_debug( coremsg, "Canceling nodes: producers" << eom );
421  for( node_it_t t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
422  {
423  if( dynamic_cast< producer* >( t_it->second ) != nullptr )
424  {
425  msg_debug( coremsg, "Canceling " << t_it->second->get_name() << eom );
426  t_it->second->cancel( a_code );
427  }
428  }
429 
430  // This delay is added to give the producers a chance to stop the chain(s) of nodes.
431  // Without this, shutting down midge is somewhat unstable and has resulted in deadlocked threads.
432  std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
433 
434  // cancel transformers second
435  msg_debug( coremsg, "Canceling nodes: transformers" << eom );
436  for( node_it_t t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
437  {
438  if( dynamic_cast< transformer* >( t_it->second ) != nullptr )
439  {
440  msg_debug( coremsg, "Canceling " << t_it->second->get_name() << eom );
441  t_it->second->cancel();
442  }
443  }
444  // cancel consumers third
445  msg_debug( coremsg, "Canceling nodes: consumers" << eom );
446  for( node_it_t t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
447  {
448  if( dynamic_cast< consumer* >( t_it->second ) != nullptr )
449  {
450  msg_debug( coremsg, "Canceling " << t_it->second->get_name() << eom );
451  t_it->second->cancel();
452  }
453  }
454  // cancel bystanders fourth
455  msg_debug( coremsg, "Canceling nodes: bystanders" << eom );
456  for( node_it_t t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
457  {
458  if( dynamic_cast< bystander* >( t_it->second ) != nullptr )
459  {
460  msg_debug( coremsg, "Canceling " << t_it->second->get_name() << eom );
461  t_it->second->cancel();
462  }
463  }
464  return;
465  }
466 
468  {
469  std::unique_lock< std::mutex >t_threads_lock( f_threads_mutex );
470 
471  for( node_it_t t_it = f_nodes.begin(); t_it != f_nodes.end(); t_it++ )
472  {
473  t_it->second->reset_cancel();
474  }
475 
476  return;
477  }
478 
479 
480  const string_t diptera::s_connector = string_t( ":" );
481  const string_t diptera::s_designator = string_t( "." );
482  const string_t diptera::s_separator = string_t( ":" );
483 
484  const std::string& diptera::connector()
485  {
486  return s_connector;
487  }
488 
489  const std::string& diptera::designator()
490  {
491  return s_designator;
492  }
493 
494  const std::string& diptera::separator()
495  {
496  return s_separator;
497  }
498 
499 }
static const std::string & connector()
Definition: diptera.cc:484
static const message_end eom
Definition: _buffer.hh:11
virtual void do_cancellation(int a_code)
Definition: diptera.cc:415
void throw_ex(std::exception_ptr e_ptr)
To be used by running nodes to throw an exception.
Definition: diptera.cc:347
static const std::string & designator()
Definition: diptera.cc:489
inst_set_t::iterator inst_it_t
Definition: diptera.hh:74
void instruct(instruction p_inst)
Definition: diptera.cc:403
signal * signal_ptr(const std::string &p_label)
Definition: node.cc:62
input * in(const std::string &p_label)
Definition: node.cc:42
virtual void execute(diptera *)=0
virtual unsigned connect(slot *p_slot)=0
node * node_ptr(const std::string &p_label)
Definition: node.cc:32
node_map_t::iterator node_it_t
Definition: diptera.hh:67
static const std::string s_designator
Definition: diptera.hh:92
node_map_t f_nodes
Definition: diptera.hh:71
thread_vector_t f_threads
Definition: diptera.hh:84
static const std::string s_separator
Definition: diptera.hh:93
void connect(const std::string &p_string)
Connect a signal to a slot: signal_node.signal:slot_node.slot.
Definition: diptera.cc:72
virtual void do_reset_cancellation()
Definition: diptera.cc:467
thread_vector_t::iterator thread_it_t
Definition: diptera.hh:81
std::function< void() > f_running_callback
Definition: diptera.hh:89
std::string string_t
Definition: types.hh:21
void reset()
Definition: diptera.cc:379
node_map_t::const_iterator node_cit_t
Definition: diptera.hh:68
std::exception_ptr f_run_e_ptr
Definition: diptera.hh:87
static const std::string & separator()
Definition: diptera.cc:494
#define msg_debug(x_name, x_content)
static const std::string s_connector
Definition: diptera.hh:91
node_map_t::value_type node_entry_t
Definition: diptera.hh:69
std::mutex f_threads_mutex
Definition: diptera.hh:85
output * out(const std::string &p_label)
Definition: node.cc:52
void join(const std::string &p_string)
Join one node to another.
Definition: diptera.cc:147
std::exception_ptr run(const std::string &p_string)
Definition: diptera.cc:269
void add(node *p_node)
Add a node.
Definition: diptera.cc:39
virtual void initialize()=0
inst_set_t f_instructables
Definition: diptera.hh:78
#define msg_normal(x_name, x_content)
slot * slot_ptr(const std::string &p_label)
Definition: node.cc:72
virtual void finalize()=0
#define msg_error(x_name, x_content)