39 const enum_t stream::s_idle = 0;
40 const enum_t stream::s_started = 1;
41 const enum_t stream::s_stopped = 2;
43 template<
class x_type >
60 template<
class x_type >
76 template<
class x_type >
82 f_write_state(
stream::s_idle ),
83 f_write_state_mutex(),
84 f_write_stream( NULL ),
87 f_read_state_mutexes( NULL ),
89 f_read_data_mutexes( NULL ),
90 f_read_streams( NULL )
95 delete f_write_stream;
97 for(
index_t t_index = 0; t_index < f_length; t_index++ )
99 delete f_read_state[ t_index ];
101 delete[] f_read_state;
103 for(
index_t t_index = 0; t_index < f_length; t_index++ )
105 delete f_read_data[ t_index ];
107 delete[] f_read_data;
109 for(
count_t t_read_index = 0; t_read_index < f_read_count; t_read_index++ )
111 delete[] f_read_state_mutexes[ t_read_index ];
112 delete[] f_read_data_mutexes[ t_read_index ];
113 delete f_read_streams[ t_read_index ];
115 delete[] f_read_state_mutexes;
116 delete[] f_read_data_mutexes;
117 delete[] f_read_streams;
123 f_read_state =
new enum_t*[ p_length ];
124 f_read_data =
new x_type*[ p_length ];
125 for(
index_t t_index = 0; t_index < p_length; t_index++ )
127 f_read_state[ t_index ] = (*p_state_factory)();
128 f_read_data[ t_index ] = (*p_data_factory)();
136 return f_write_stream;
140 count_t t_new_read_index = f_read_count;
141 count_t t_new_read_count = f_read_count + 1;
142 std::mutex** t_new_read_state_mutexes =
new std::mutex*[ t_new_read_count ];
143 std::mutex** t_new_read_data_mutexes =
new std::mutex*[ t_new_read_count ];
146 for(
index_t t_index = 0; t_index < f_read_count; t_index++ )
148 t_new_read_state_mutexes[ t_index ] = f_read_state_mutexes[ t_index ];
149 t_new_read_data_mutexes[ t_index ] = f_read_data_mutexes[ t_index ];
150 t_new_read_streams[ t_index ] = f_read_streams[ t_index ];
153 f_read_count = t_new_read_count;
155 delete[] f_read_state_mutexes;
156 f_read_state_mutexes = t_new_read_state_mutexes;
157 f_read_state_mutexes[ t_new_read_index ] =
new std::mutex[ f_length ];
158 f_read_state_mutexes[ t_new_read_index ][ 0 ].lock();
160 delete[] f_read_data_mutexes;
161 f_read_data_mutexes = t_new_read_data_mutexes;
162 f_read_data_mutexes[ t_new_read_index ] =
new std::mutex[ f_length ];
163 f_read_data_mutexes[ t_new_read_index ][ 0 ].lock();
165 delete[] f_read_streams;
166 f_read_streams = t_new_read_streams;
168 return f_read_streams[ t_new_read_index ];
180 f_buffer( *p_buffer ),
181 f_current_state_index( 0 ),
182 f_next_state_index( 0 ),
183 f_current_data_index( 0 ),
184 f_next_data_index( 0 )
193 f_buffer.f_write_state_mutex.lock();
194 p_state = f_buffer.f_write_state;
195 f_buffer.f_write_state_mutex.unlock();
201 if( ++f_next_state_index == f_buffer.f_length )
203 f_next_state_index = 0;
206 for(
index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
208 f_buffer.f_read_state_mutexes[ t_index ][ f_next_state_index ].lock();
211 *(f_buffer.f_read_state[ f_current_state_index ]) = p_state;
213 for(
index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
215 f_buffer.f_read_state_mutexes[ t_index ][ f_current_state_index ].unlock();
218 f_current_state_index = f_next_state_index;
224 if( ++f_next_data_index == f_buffer.f_length )
226 f_next_data_index = 0;
229 for(
index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
231 f_buffer.f_read_data_mutexes[ t_index ][ f_next_data_index ].lock();
234 p_pointer = f_buffer.f_read_data[ f_current_data_index ];
240 for(
index_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
242 f_buffer.f_read_data_mutexes[ t_index ][ f_current_data_index ].unlock();
245 f_current_data_index = f_next_data_index;
268 f_buffer( *p_buffer ),
269 f_stream_index( f_buffer.f_read_count - 1 ),
270 f_current_state_index( 0 ),
271 f_current_data_index( 0 )
280 f_buffer.f_read_state_mutexes[ f_stream_index ][ f_current_state_index ].lock();
282 p_state = *(f_buffer.f_read_state[ f_current_state_index ]);
284 f_buffer.f_read_state_mutexes[ f_stream_index ][ f_current_state_index ].unlock();
286 if( ++f_current_state_index == f_buffer.f_length )
288 f_current_state_index = 0;
295 f_buffer.f_write_state_mutex.lock();
296 f_buffer.f_write_state = p_state;
297 f_buffer.f_write_state_mutex.unlock();
303 f_buffer.f_read_data_mutexes[ f_stream_index ][ f_current_data_index ].lock();
305 p_pointer = f_buffer.f_read_data[ f_current_data_index ];
311 f_buffer.f_read_data_mutexes[ f_stream_index ][ f_current_data_index ].unlock();
313 if( ++f_current_data_index == f_buffer.f_length )
315 f_current_data_index = 0;
341 f_stream( *(p_buffer->write()) ),
355 std::uniform_real_distribution<> t_dist_uniform_0_20( 0., 20. );
356 std::uniform_real_distribution<> t_dist_uniform_0_10( 0., 10. );
357 std::uniform_real_distribution<> t_dist_uniform_50k_150k( 50000., 150000. );
361 msg_normal( testmsg,
"writer <" << f_seed <<
"> pulling a state of <" << t_state <<
"> at <" << t_count <<
">" );
362 if( t_state == stream::s_stopped )
364 msg_normal( testmsg,
"writer <" << f_seed <<
"> stopping" );
365 f_stream < stream::s_stopped;
368 if( t_dist_uniform_0_20( f_rng ) < 1. )
370 msg_normal( testmsg,
"writer <" << f_seed <<
"> initiates stop" );
371 f_stream < stream::s_stopped;
374 f_stream < stream::s_started;
377 (*t_value) = t_dist_uniform_0_10( f_rng );
378 msg_normal( testmsg,
"writer <" << f_seed <<
"> pushing a value of <" << *t_value <<
"> at <" << t_count <<
">" );
380 t_sleep = (
count_t) (round(t_dist_uniform_50k_150k( f_rng )));
397 f_stream( *(p_buffer->read()) ),
409 const double* t_value;
410 std::uniform_real_distribution<> t_dist_uniform_0_20( 0., 20. );
411 std::uniform_real_distribution<> t_dist_uniform_200k_600k( 200000., 600000. );
416 msg_normal( testmsg,
" reader <" << f_seed <<
"> pulling a state of <" << t_state <<
"> at <" << t_count <<
">" );
417 if( t_state == stream::s_stopped )
419 msg_normal( testmsg,
" reader <" << f_seed <<
"> stopping" );
420 f_stream < stream::s_stopped;
424 if( t_dist_uniform_0_20( f_rng ) < 1. )
426 msg_normal( testmsg,
" reader <" << f_seed <<
"> initiates stop" );
427 f_stream < stream::s_stopped;
433 msg_normal( testmsg,
" reader <" << f_seed <<
"> pulling a value of <" << *t_value <<
"> at <" << t_count <<
">" );
435 t_sleep = (
count_t) (round(t_dist_uniform_200k_600k( f_rng )));
457 (*t_state) = stream::s_idle;
463 using namespace midge;
471 writer t_dan_writer( &t_buffer, 51385 );
472 reader t_katie_reader( &t_buffer, 82284 );
473 reader t_erin_reader( &t_buffer, 112383 );
474 reader t_susanne_reader( &t_buffer, 53083 );
475 reader t_rose_reader( &t_buffer, 31387 );
477 std::thread t_dan_thread( [&](){ t_dan_writer.
execute(); } );
478 std::thread t_katie_thread( [&](){ t_katie_reader.
execute(); } );
479 std::thread t_erin_thread( [&](){ t_erin_reader.
execute(); } );
480 std::thread t_susanne_thread( [&](){ t_susanne_reader.
execute(); } );
481 std::thread t_rose_thread( [&](){ t_rose_reader.
execute(); } );
484 t_katie_thread.join();
485 t_erin_thread.join();
486 t_susanne_thread.join();
487 t_rose_thread.join();
reader(buffer< real_t > *p_buffer, const count_t &p_seed)
count_t f_current_data_index
std::ostream & operator<<(std::ostream &a_str, const message_line &)
count_t f_current_state_index
std::mutex ** f_read_state_mutexes
read_stream< x_type > & operator>(enum_t &p_state)
write_stream< real_t > & f_stream
static const enum_t s_stopped
message_declare(testmsg) message_define(testmsg
std::mutex ** f_read_data_mutexes
count_t f_next_state_index
writer(buffer< real_t > *p_buffer, const count_t &p_seed)
static const enum_t s_started
read_stream< x_type > * read()
virtual ~buffer_write_stream()
void initialize(const count_t &p_length, enum_t *(p_state_factory)(), x_type *(*p_data_factory)())
buffer_read_stream ** f_read_streams
read_stream< x_type > & operator>>(const x_type *&p_pointer)
message_define(testmsg, test, test) using namespace midge
count_t f_next_data_index
write_stream< x_type > * write()
buffer_read_stream(buffer *p_buffer)
write_stream< x_type > & operator>(enum_t &p_state)
read_stream< real_t > & f_stream
buffer_write_stream(buffer *p_buffer)
buffer_write_stream * f_write_stream
std::mutex f_write_state_mutex
virtual ~buffer_read_stream()
static const enum_t s_idle
#define msg_normal(x_name, x_content)
count_t f_current_data_index
write_stream< x_type > & operator>>(x_type *&p_pointer)
count_t f_current_state_index