Midge  v3.9.2
Data Processing Framework
_buffer.hh
Go to the documentation of this file.
1 #ifndef _midge__buffer_hh_
2 #define _midge__buffer_hh_
3 
4 #include "midge_error.hh"
5 #include "_stream.hh"
6 #include "node.hh"
7 #include "coremsg.hh"
8 
9 #include <mutex>
10 
11 namespace midge
12 {
13 
14  template< class x_type >
15  class _buffer
16  {
17  public:
18  _buffer( node* a_out_node ) :
19  f_length( 0 ),
20  f_out_node( a_out_node ),
21  f_write_stream_name( "out_?" ),
22  f_write_command( stream::s_none ),
23  f_write_mutex(),
24  f_write_stream( NULL ),
25  f_read_count( 0 ),
26  f_read_data( NULL ),
27  f_read_command( NULL ),
28  f_read_mutexes( NULL ),
29  f_mutex_wait_msec( 200 ),
30  f_read_streams( NULL )
31  {
32  }
34  {
35  }
36 
37  public:
38  void set_write_stream_name( const std::string& a_name )
39  {
40  f_write_stream_name = a_name;
41  return;
42  }
43 
44  void initialize( const count_t& p_length )
45  {
46  f_length = p_length;
48  f_read_data = new x_type[ f_length ];
49  f_write_stream = new _write_stream( *this );
50  f_write_stream->label() = f_out_node->get_name() + ":" + f_write_stream_name;
51 
52  return;
53  }
54  template< class x_r, class... x_args >
55  void call( x_r (x_type::*p_member)(x_args...), x_args... args)
56  {
57  for( count_t t_index = 0; t_index < f_length; ++t_index )
58  {
59  (f_read_data[ t_index ].*p_member)( args... );
60  }
61  return;
62  }
63  void finalize()
64  {
66 
67  delete f_write_stream;
68 
69  delete[] f_read_command;
70  delete[] f_read_data;
71 
72  for( count_t t_read_index = 0; t_read_index < f_read_count; t_read_index++ )
73  {
74  IF_STREAM_TIMING_ENABLED( f_read_streams[ t_read_index ]->timer_report(); )
75 
76  delete[] f_read_mutexes[ t_read_index ];
77  delete f_read_streams[ t_read_index ];
78  }
79  delete[] f_read_mutexes;
80  delete[] f_read_streams;
81  }
82 
84  {
85  return f_write_stream;
86  }
88  {
89  count_t t_new_read_index = f_read_count;
90  count_t t_new_read_count = f_read_count + 1;
91  std::timed_mutex** t_new_read_mutexes = new std::timed_mutex*[ t_new_read_count ];
92  _read_stream** t_new_read_streams = new _read_stream*[ t_new_read_count ];
93 
94  for( count_t t_read_index = 0; t_read_index < f_read_count; t_read_index++ )
95  {
96  t_new_read_mutexes[ t_read_index ] = f_read_mutexes[ t_read_index ];
97  t_new_read_streams[ t_read_index ] = f_read_streams[ t_read_index ];
98  }
99 
100  f_read_count = t_new_read_count;
101 
102  delete[] f_read_mutexes;
103  f_read_mutexes = t_new_read_mutexes;
104  f_read_mutexes[ t_new_read_index ] = new std::timed_mutex[ f_length ];
105  f_read_mutexes[ t_new_read_index ][ 0 ].lock();
106  f_read_mutexes[ t_new_read_index ][ f_length - 1 ].lock();
107 
108  delete[] f_read_streams;
109  f_read_streams = t_new_read_streams;
110  f_read_streams[ t_new_read_index ] = new _read_stream( *this );
111  return f_read_streams[ t_new_read_index ];
112  }
113 
114  protected:
116 
117  protected:
119  public _stream< x_type >
120  {
121  public:
122  _write_stream( _buffer& p_buffer ) :
123  _stream< x_type >(),
124  f_buffer( p_buffer ),
125  f_count( 0 ),
126  f_current_index( 0 ),
127  f_next_index( 0 )
128  {
129  }
130  virtual ~_write_stream()
131  {
132  }
133 
134  enum_t get()
135  {
136  enum_t t_command;
137  while( ! f_buffer.f_write_mutex.try_lock_for( std::chrono::milliseconds(f_buffer.f_mutex_wait_msec) ) )
138  {
139  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return stream::s_error;
140  //return stream::s_none;
141  }
142  t_command = f_buffer.f_write_command;
143  f_buffer.f_write_mutex.unlock();
144  return t_command;
145  }
146  bool set( enum_t p_command )
147  {
148  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return false;
149 
150  f_buffer.f_read_command[ f_current_index ] = p_command;
151 
152  //coremsg( s_debug ) << "OUT STREAM SET: command <" << p_command << "> set for index " << f_current_index << eom;
153 
154  //count_t t_last_next_index = f_next_index;
155  if( ++f_next_index == f_buffer.f_length )
156  {
157  f_next_index = 0;
158  }
159 
160  IF_STREAM_TIMING_ENABLED( if( p_command == stream::s_run ) this->f_timer.increment_begin() );
161 
162  for( count_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
163  {
164  while( ! f_buffer.f_read_mutexes[ t_index ][ f_next_index ].try_lock_for( std::chrono::milliseconds(f_buffer.f_mutex_wait_msec) ) )
165  {
166  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return false;
167  }
168  }
169 
170  IF_STREAM_TIMING_ENABLED( if( p_command == stream::s_run ) this->f_timer.increment_locked() );
171 
172  for( count_t t_index = 0; t_index < f_buffer.f_read_count; t_index++ )
173  {
174  f_buffer.f_read_mutexes[ t_index ][ f_current_index ].unlock();
175  }
176 
178 
179  if( (++f_count % 100000) == 0 )
180  {
181  msg_normal( coremsg, "write stream <" << this << "> processed <" << f_count << "> requests" << eom );
182  }
183 
184  return true;
185  }
186 
187  x_type* data()
188  {
189  return &(f_buffer.f_read_data[ f_current_index ]);
190  }
191 
193  {
194  return f_current_index;
195  }
196 
197  private:
199  mutable count_t f_count;
202  };
203 
205  std::string f_write_stream_name;
207  std::timed_mutex f_write_mutex;
209 
210  protected:
211  class _read_stream :
212  public _stream< x_type >
213  {
214  public:
215  _read_stream( _buffer& p_buffer ) :
216  _stream< x_type >(),
217  f_buffer( p_buffer ),
218  f_stream_index( f_buffer.f_read_count - 1 ),
219  f_current_index( f_buffer.f_length - 1 ),
220  f_next_index( f_buffer.f_length - 1 )
221  {
222  }
223  virtual ~_read_stream()
224  {
225  }
226 
227  enum_t get()
228  {
229  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return s_error;
230 
231  if( ++f_next_index == f_buffer.f_length )
232  {
233  f_next_index = 0;
234  }
235 
236  IF_STREAM_TIMING_ENABLED( if( f_buffer.f_read_command[ f_current_index ] == stream::s_run ) this->f_timer.increment_begin(); )
237 
238  //f_buffer.f_read_mutexes[ f_stream_index ][ f_next_index ].lock();
239  while( ! f_buffer.f_read_mutexes[ f_stream_index ][ f_next_index ].try_lock_for( std::chrono::milliseconds(f_buffer.f_mutex_wait_msec) ) )
240  {
241  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return stream::s_error;
242  //return stream::s_none;
243  }
244 
245  IF_STREAM_TIMING_ENABLED( if( f_buffer.f_read_command[ f_current_index ] == stream::s_run ) this->f_timer.increment_locked(); )
246 
247  f_buffer.f_read_mutexes[ f_stream_index ][ f_current_index ].unlock();
248 
250 
251  //coremsg( s_debug ) << "IN STREAM GET: command <" << f_buffer.f_read_command[ f_current_index ] << "> retrieved from index " << f_current_index << eom;
252 
254  }
255  bool set( enum_t p_command )
256  {
257  while( ! f_buffer.f_write_mutex.try_lock_for( std::chrono::milliseconds(f_buffer.f_mutex_wait_msec) ) )
258  {
259  if( scarab::cancelable::is_canceled() || (f_buffer.f_out_node != nullptr && f_buffer.f_out_node->is_canceled()) ) return false;
260  }
261  f_buffer.f_write_command = p_command;
262  f_buffer.f_write_mutex.unlock();
263  return true;
264  }
265 
266  x_type* data()
267  {
268  return &(f_buffer.f_read_data[ f_current_index ]);
269  }
270 
272  {
273  return f_current_index;
274  }
275 
276  private:
281  };
282 
284  x_type* f_read_data;
286  std::timed_mutex** f_read_mutexes;
289 
290  };
291 
292 }
293 
294 #endif
std::timed_mutex ** f_read_mutexes
Definition: _buffer.hh:286
std::timed_mutex f_write_mutex
Definition: _buffer.hh:207
_buffer(node *a_out_node)
Definition: _buffer.hh:18
static const message_end eom
Definition: _buffer.hh:11
enum_t * f_read_command
Definition: _buffer.hh:285
uint64_t count_t
Definition: types.hh:15
_write_stream * f_write_stream
Definition: _buffer.hh:208
#define IF_STREAM_TIMING_ENABLED(x_line)
_stream< x_type > * write()
Definition: _buffer.hh:83
uint16_t enum_t
Definition: types.hh:13
_read_stream(_buffer &p_buffer)
Definition: _buffer.hh:215
count_t get_current_index() const
Definition: _buffer.hh:192
count_t f_read_count
Definition: _buffer.hh:283
count_t f_length
Definition: _buffer.hh:115
count_t f_mutex_wait_msec
Definition: _buffer.hh:287
static const enum_t s_error
Definition: stream.hh:20
void initialize(const count_t &p_length)
Definition: _buffer.hh:44
void call(x_r(x_type::*p_member)(x_args...), x_args... args)
Definition: _buffer.hh:55
std::string f_write_stream_name
Definition: _buffer.hh:205
virtual void timer_report() const
Definition: _stream.hh:45
_stream< x_type > * read()
Definition: _buffer.hh:87
void set_write_stream_name(const std::string &a_name)
Definition: _buffer.hh:38
enum_t f_write_command
Definition: _buffer.hh:206
x_type * f_read_data
Definition: _buffer.hh:284
count_t get_current_index() const
Definition: _buffer.hh:271
_write_stream(_buffer &p_buffer)
Definition: _buffer.hh:122
node * f_out_node
Definition: _buffer.hh:204
_read_stream ** f_read_streams
Definition: _buffer.hh:288
#define msg_normal(x_name, x_content)
void finalize()
Definition: _buffer.hh:63
static const enum_t s_run
Definition: stream.hh:17