Midge  v3.9.2
Data Processing Framework
test_buffer.cc
Go to the documentation of this file.
1 #include <random>
2 
3 #include <cmath>
4 #include <unistd.h>
5 #include <mutex>
6 #include <thread>
7 
8 #include "message_logger.hh"
9 #include "midge_error.hh"
10 //#include "thread.hh"
11 #include "types.hh"
12 
13 message_declare( testmsg )
14 message_define( testmsg, test, test )
15 
16 namespace midge
17 {
18  namespace test
19  {
20  class stream
21  {
22  public:
23  static const enum_t s_idle;
24  static const enum_t s_started;
25  static const enum_t s_stopped;
26 
27  public:
29  {
30  }
31  virtual ~stream()
32  {
33  }
34 
35  virtual stream& operator>( enum_t& p_state ) = 0;
36  virtual stream& operator<( const enum_t& p_state ) = 0;
37  };
38 
39  const enum_t stream::s_idle = 0;
40  const enum_t stream::s_started = 1;
41  const enum_t stream::s_stopped = 2;
42 
43  template< class x_type >
44  class read_stream :
45  public stream
46  {
47 
48  public:
50  {
51  }
52  virtual ~read_stream()
53  {
54  }
55 
56  virtual read_stream< x_type >& operator>>( const x_type*& p_pointer ) = 0;
57  virtual read_stream< x_type >& operator<<( const x_type*& p_pointer ) = 0;
58  };
59 
60  template< class x_type >
61  class write_stream :
62  public stream
63  {
64  public:
66  {
67  }
68  virtual ~write_stream()
69  {
70  }
71 
72  virtual write_stream< x_type >& operator>>( x_type*& p_pointer ) = 0;
73  virtual write_stream< x_type >& operator<<( x_type*& p_pointer ) = 0;
74  };
75 
76  template< class x_type >
77  class buffer
78  {
79  public:
80  buffer() :
81  f_length( 0 ),
82  f_write_state( stream::s_idle ),
83  f_write_state_mutex(),
84  f_write_stream( NULL ),
85  f_read_count( 0 ),
86  f_read_state( NULL ),
87  f_read_state_mutexes( NULL ),
88  f_read_data( NULL ),
89  f_read_data_mutexes( NULL ),
90  f_read_streams( NULL )
91  {
92  }
94  {
95  delete f_write_stream;
96 
97  for( index_t t_index = 0; t_index < f_length; t_index++ )
98  {
99  delete f_read_state[ t_index ];
100  }
101  delete[] f_read_state;
102 
103  for( index_t t_index = 0; t_index < f_length; t_index++ )
104  {
105  delete f_read_data[ t_index ];
106  }
107  delete[] f_read_data;
108 
109  for( count_t t_read_index = 0; t_read_index < f_read_count; t_read_index++ )
110  {
111  delete[] f_read_state_mutexes[ t_read_index ];
112  delete[] f_read_data_mutexes[ t_read_index ];
113  delete f_read_streams[ t_read_index ];
114  }
115  delete[] f_read_state_mutexes;
116  delete[] f_read_data_mutexes;
117  delete[] f_read_streams;
118  }
119 
120  void initialize( const count_t& p_length, enum_t* (p_state_factory)(), x_type* (*p_data_factory)() )
121  {
122  f_length = p_length;
123  f_read_state = new enum_t*[ p_length ];
124  f_read_data = new x_type*[ p_length ];
125  for( index_t t_index = 0; t_index < p_length; t_index++ )
126  {
127  f_read_state[ t_index ] = (*p_state_factory)();
128  f_read_data[ t_index ] = (*p_data_factory)();
129  }
130  return;
131  }
132 
134  {
135  f_write_stream = new buffer_write_stream( this );
136  return f_write_stream;
137  }
139  {
140  count_t t_new_read_index = f_read_count;
141  count_t t_new_read_count = f_read_count + 1;
142  std::mutex** t_new_read_state_mutexes = new std::mutex*[ t_new_read_count ];
143  std::mutex** t_new_read_data_mutexes = new std::mutex*[ t_new_read_count ];
144  buffer_read_stream** t_new_read_streams = new buffer_read_stream*[ t_new_read_count ];
145 
146  for( index_t t_index = 0; t_index < f_read_count; t_index++ )
147  {
148  t_new_read_state_mutexes[ t_index ] = f_read_state_mutexes[ t_index ];
149  t_new_read_data_mutexes[ t_index ] = f_read_data_mutexes[ t_index ];
150  t_new_read_streams[ t_index ] = f_read_streams[ t_index ];
151  }
152 
153  f_read_count = t_new_read_count;
154 
155  delete[] f_read_state_mutexes;
156  f_read_state_mutexes = t_new_read_state_mutexes;
157  f_read_state_mutexes[ t_new_read_index ] = new std::mutex[ f_length ];
158  f_read_state_mutexes[ t_new_read_index ][ 0 ].lock();
159 
160  delete[] f_read_data_mutexes;
161  f_read_data_mutexes = t_new_read_data_mutexes;
162  f_read_data_mutexes[ t_new_read_index ] = new std::mutex[ f_length ];
163  f_read_data_mutexes[ t_new_read_index ][ 0 ].lock();
164 
165  delete[] f_read_streams;
166  f_read_streams = t_new_read_streams;
167  f_read_streams[ t_new_read_index ] = new buffer_read_stream( this );
168  return f_read_streams[ t_new_read_index ];
169  }
170 
171  protected:
173 
174  protected:
176  public write_stream< x_type >
177  {
178  public:
180  f_buffer( *p_buffer ),
181  f_current_state_index( 0 ),
182  f_next_state_index( 0 ),
183  f_current_data_index( 0 ),
184  f_next_data_index( 0 )
185  {
186  }
188  {
189  }
190 
192  {
193  f_buffer.f_write_state_mutex.lock();
194  p_state = f_buffer.f_write_state;
195  f_buffer.f_write_state_mutex.unlock();
196 
197  return *this;
198  }
199  write_stream< x_type >& operator<( const enum_t& p_state )
200  {
201  if( ++f_next_state_index == f_buffer.f_length )
202  {
203  f_next_state_index = 0;
204  }
205 
206  for( index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
207  {
208  f_buffer.f_read_state_mutexes[ t_index ][ f_next_state_index ].lock();
209  }
210 
211  *(f_buffer.f_read_state[ f_current_state_index ]) = p_state;
212 
213  for( index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
214  {
215  f_buffer.f_read_state_mutexes[ t_index ][ f_current_state_index ].unlock();
216  }
217 
218  f_current_state_index = f_next_state_index;
219 
220  return *this;
221  }
222  write_stream< x_type >& operator>>( x_type*& p_pointer )
223  {
224  if( ++f_next_data_index == f_buffer.f_length )
225  {
226  f_next_data_index = 0;
227  }
228 
229  for( index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
230  {
231  f_buffer.f_read_data_mutexes[ t_index ][ f_next_data_index ].lock();
232  }
233 
234  p_pointer = f_buffer.f_read_data[ f_current_data_index ];
235 
236  return *this;
237  }
239  {
240  for( index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
241  {
242  f_buffer.f_read_data_mutexes[ t_index ][ f_current_data_index ].unlock();
243  }
244 
245  f_current_data_index = f_next_data_index;
246 
247  return *this;
248  }
249 
250  private:
256  };
257 
261 
262  protected:
264  public read_stream< x_type >
265  {
266  public:
267  buffer_read_stream( buffer* p_buffer ) :
268  f_buffer( *p_buffer ),
269  f_stream_index( f_buffer.f_read_count - 1 ),
270  f_current_state_index( 0 ),
271  f_current_data_index( 0 )
272  {
273  }
275  {
276  }
277 
279  {
280  f_buffer.f_read_state_mutexes[ f_stream_index ][ f_current_state_index ].lock();
281 
282  p_state = *(f_buffer.f_read_state[ f_current_state_index ]);
283 
284  f_buffer.f_read_state_mutexes[ f_stream_index ][ f_current_state_index ].unlock();
285 
286  if( ++f_current_state_index == f_buffer.f_length )
287  {
288  f_current_state_index = 0;
289  }
290 
291  return *this;
292  }
293  read_stream< x_type >& operator<( const enum_t& p_state )
294  {
295  f_buffer.f_write_state_mutex.lock();
296  f_buffer.f_write_state = p_state;
297  f_buffer.f_write_state_mutex.unlock();
298 
299  return *this;
300  }
301  read_stream< x_type >& operator>>( const x_type*& p_pointer )
302  {
303  f_buffer.f_read_data_mutexes[ f_stream_index ][ f_current_data_index ].lock();
304 
305  p_pointer = f_buffer.f_read_data[ f_current_data_index ];
306 
307  return *this;
308  }
310  {
311  f_buffer.f_read_data_mutexes[ f_stream_index ][ f_current_data_index ].unlock();
312 
313  if( ++f_current_data_index == f_buffer.f_length )
314  {
315  f_current_data_index = 0;
316  }
317 
318  return *this;
319  }
320 
321  private:
326  };
327 
330  std::mutex** f_read_state_mutexes;
331  x_type** f_read_data;
332  std::mutex** f_read_data_mutexes;
334 
335  };
336 
337  class writer
338  {
339  public:
340  writer( buffer< real_t >* p_buffer, const count_t& p_seed ) :
341  f_stream( *(p_buffer->write()) ),
342  f_seed( p_seed ),
343  f_rng( p_seed )
344  {}
346  {}
347 
348  void execute()
349  {
350 
351  count_t t_count = 0;
352  count_t t_sleep;
353  enum_t t_state;
354  double* t_value;
355  std::uniform_real_distribution<> t_dist_uniform_0_20( 0., 20. );
356  std::uniform_real_distribution<> t_dist_uniform_0_10( 0., 10. );
357  std::uniform_real_distribution<> t_dist_uniform_50k_150k( 50000., 150000. );
358  while( true )
359  {
360  f_stream > t_state;
361  msg_normal( testmsg, "writer <" << f_seed << "> pulling a state of <" << t_state << "> at <" << t_count << ">" );
362  if( t_state == stream::s_stopped )
363  {
364  msg_normal( testmsg, "writer <" << f_seed << "> stopping" );
365  f_stream < stream::s_stopped;
366  break;
367  }
368  if( t_dist_uniform_0_20( f_rng ) < 1. )
369  {
370  msg_normal( testmsg, "writer <" << f_seed << "> initiates stop" );
371  f_stream < stream::s_stopped;
372  break;
373  }
374  f_stream < stream::s_started;
375 
376  f_stream >> t_value;
377  (*t_value) = t_dist_uniform_0_10( f_rng );
378  msg_normal( testmsg, "writer <" << f_seed << "> pushing a value of <" << *t_value << "> at <" << t_count << ">" );
379  f_stream << t_value;
380  t_sleep = (count_t) (round(t_dist_uniform_50k_150k( f_rng )));
381  usleep( t_sleep );
382  t_count++;
383  }
384  return;
385  }
386 
387  private:
390  std::mt19937 f_rng;
391  };
392 
393  class reader
394  {
395  public:
396  reader( buffer< real_t >* p_buffer, const count_t& p_seed ) :
397  f_stream( *(p_buffer->read()) ),
398  f_seed( p_seed ),
399  f_rng( p_seed )
400  {}
402  {}
403 
404  void execute()
405  {
406  count_t t_count = 0;
407  count_t t_sleep;
408  enum_t t_state;
409  const double* t_value;
410  std::uniform_real_distribution<> t_dist_uniform_0_20( 0., 20. );
411  std::uniform_real_distribution<> t_dist_uniform_200k_600k( 200000., 600000. );
412  while( true )
413  {
414 
415  f_stream > t_state;
416  msg_normal( testmsg, " reader <" << f_seed << "> pulling a state of <" << t_state << "> at <" << t_count << ">" );
417  if( t_state == stream::s_stopped )
418  {
419  msg_normal( testmsg, " reader <" << f_seed << "> stopping" );
420  f_stream < stream::s_stopped;
421  break;
422  }
423 
424  if( t_dist_uniform_0_20( f_rng ) < 1. )
425  {
426  msg_normal( testmsg, " reader <" << f_seed << "> initiates stop" );
427  f_stream < stream::s_stopped;
428  break;
429  }
430  //f_stream < stream::s_started;
431 
432  f_stream >> t_value;
433  msg_normal( testmsg, " reader <" << f_seed << "> pulling a value of <" << *t_value << "> at <" << t_count << ">" );
434  f_stream << t_value;
435  t_sleep = (count_t) (round(t_dist_uniform_200k_600k( f_rng )));
436  usleep( t_sleep );
437  t_count++;
438  }
439  return;
440  }
441 
442  private:
445  std::mt19937 f_rng;
446  };
447 
449  {
450  real_t* t_real = new real_t;
451  (*t_real) = 0.;
452  return t_real;
453  }
455  {
456  enum_t* t_state = new enum_t;
457  (*t_state) = stream::s_idle;
458  return t_state;
459  }
460  }
461 }
462 
463 using namespace midge;
464 using namespace midge::test;
465 
466 int main()
467 {
468  buffer< real_t > t_buffer;
469  t_buffer.initialize( 10, &new_state, &new_real );
470 
471  writer t_dan_writer( &t_buffer, 51385 );
472  reader t_katie_reader( &t_buffer, 82284 );
473  reader t_erin_reader( &t_buffer, 112383 );
474  reader t_susanne_reader( &t_buffer, 53083 );
475  reader t_rose_reader( &t_buffer, 31387 );
476 
477  std::thread t_dan_thread( [&](){ t_dan_writer.execute(); } );
478  std::thread t_katie_thread( [&](){ t_katie_reader.execute(); } );
479  std::thread t_erin_thread( [&](){ t_erin_reader.execute(); } );
480  std::thread t_susanne_thread( [&](){ t_susanne_reader.execute(); } );
481  std::thread t_rose_thread( [&](){ t_rose_reader.execute(); } );
482 
483  t_dan_thread.join();
484  t_katie_thread.join();
485  t_erin_thread.join();
486  t_susanne_thread.join();
487  t_rose_thread.join();
488 
489  return 0;
490 }
reader(buffer< real_t > *p_buffer, const count_t &p_seed)
Definition: test_buffer.cc:396
Definition: _buffer.hh:11
std::ostream & operator<<(std::ostream &a_str, const message_line &)
double real_t
Definition: types.hh:18
uint64_t count_t
Definition: types.hh:15
std::mutex ** f_read_state_mutexes
Definition: test_buffer.cc:330
read_stream< x_type > & operator>(enum_t &p_state)
Definition: test_buffer.cc:278
write_stream< real_t > & f_stream
Definition: test_buffer.cc:388
static const enum_t s_stopped
Definition: test_buffer.cc:25
uint16_t enum_t
Definition: types.hh:13
message_declare(testmsg) message_define(testmsg
std::mutex ** f_read_data_mutexes
Definition: test_buffer.cc:332
writer(buffer< real_t > *p_buffer, const count_t &p_seed)
Definition: test_buffer.cc:340
static const enum_t s_started
Definition: test_buffer.cc:24
read_stream< x_type > * read()
Definition: test_buffer.cc:138
void initialize(const count_t &p_length, enum_t *(p_state_factory)(), x_type *(*p_data_factory)())
Definition: test_buffer.cc:120
buffer_read_stream ** f_read_streams
Definition: test_buffer.cc:333
int64_t index_t
Definition: types.hh:16
real_t * new_real()
Definition: test_buffer.cc:448
read_stream< x_type > & operator>>(const x_type *&p_pointer)
Definition: test_buffer.cc:301
enum_t ** f_read_state
Definition: test_buffer.cc:329
message_define(testmsg, test, test) using namespace midge
test
Definition: test_buffer.cc:14
std::mt19937 f_rng
Definition: test_buffer.cc:445
write_stream< x_type > * write()
Definition: test_buffer.cc:133
std::mt19937 f_rng
Definition: test_buffer.cc:390
int main()
Definition: test_buffer.cc:466
enum_t * new_state()
Definition: test_buffer.cc:454
write_stream< x_type > & operator>(enum_t &p_state)
Definition: test_buffer.cc:191
read_stream< real_t > & f_stream
Definition: test_buffer.cc:443
buffer_write_stream * f_write_stream
Definition: test_buffer.cc:260
std::mutex f_write_state_mutex
Definition: test_buffer.cc:259
static const enum_t s_idle
Definition: test_buffer.cc:23
#define msg_normal(x_name, x_content)
write_stream< x_type > & operator>>(x_type *&p_pointer)
Definition: test_buffer.cc:222