dune-spgrid 2.7
messagebuffer.hh
Go to the documentation of this file.
1#ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
2#define DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
3
4#include <cassert>
5#include <cstddef>
6#include <cstdlib>
7#include <cstring>
8#include <utility>
9#include <vector>
10
11
12#include <dune/common/parallel/communication.hh>
13#include <dune/common/parallel/mpicommunication.hh>
14
15namespace Dune
16{
17
18 // SPBasicPackedMessageWriteBuffer
19 // -------------------------------
20
22 {
24
25 public:
27
29
31 : buffer_( other.buffer_ ),
32 position_( other.position_ ), capacity_( other.capacity_ )
33 {
34 other.initialize();
35 }
36
38
39 This &operator= ( const This & ) = delete;
40
41 This &operator= ( This &&other )
42 {
43 buffer_ = other.buffer_;
44 position_ = other.position_;
45 capacity_ = other.capacity_;
46 other.initialize();
47 return *this;
48 }
49
50 template< class T >
51 void write ( const T &value )
52 {
53 reserve( position_ + sizeof( T ) );
54 std::memcpy( static_cast< char * >( buffer_ ) + position_, &value, sizeof( T ) );
55 position_ += sizeof( T );
56 }
57
58 std::size_t position () const { return position_; }
59
60 protected:
61 void initialize () { buffer_ = nullptr; position_ = 0; capacity_ = 0; }
62
63 void reserve ( std::size_t size )
64 {
65 if( size <= capacity_ )
66 return;
67
68 std::size_t capacity = std::max( size, 2*capacity_ );
69 void *buffer = std::realloc( buffer_, capacity );
70 if( !buffer )
71 {
72 capacity = capacity_ + size;
73 buffer = std::realloc( buffer_, capacity );
74 if( !buffer )
75 DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
76 }
77 buffer_ = buffer;
78 capacity_ = capacity;
79 }
80
81 void *buffer_;
82 std::size_t position_, capacity_;
83 };
84
85
86
87 // SPPackedMessageWriteBuffer
88 // --------------------------
89
90 template< class CollectiveCommunication >
92
93 template< class C >
94 class SPPackedMessageWriteBuffer< CollectiveCommunication< C > >
96 {
99
100 public:
101 explicit SPPackedMessageWriteBuffer ( const CollectiveCommunication< C > &comm ) {}
102
103 void send ( int rank, int tag ) {}
104 void wait () {}
105 };
106
107#if HAVE_MPI
108 template<>
109 class SPPackedMessageWriteBuffer< CollectiveCommunication< MPI_Comm > >
111 {
114
115 public:
116 explicit SPPackedMessageWriteBuffer ( const CollectiveCommunication< MPI_Comm > &comm ) : comm_( comm ) {}
117
118 void send ( int rank, int tag )
119 {
120 MPI_Isend( buffer_, position_, MPI_PACKED, rank, tag, comm_, &request_ );
121 }
122
123 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
124
125 protected:
126 MPI_Comm comm_;
127 MPI_Request request_;
128 };
129#endif // #if HAVE_MPI
130
131
132
133 // SPBasicPackedMessageReadBuffer
134 // ------------------------------
135
137 {
139
140 public:
142
144
146 : buffer_( other.buffer_ ),
147 position_( other.position_ ), size_( other.size_ )
148 {
149 other.initialize();
150 }
151
153
154 This &operator= ( const This & ) = delete;
155
156 This &operator= ( This &&other )
157 {
158 buffer_ = other.buffer_;
159 position_ = other.position_;
160 size_ = other.size_;
161 other.initialize();
162 return *this;
163 }
164
165 template< class T >
166 void read ( T &value )
167 {
168 if( position_ + sizeof( T ) <= size_ )
169 {
170 std::memcpy( static_cast< void * >( &value ), static_cast< char * >( buffer_ ) + position_, sizeof( T ) );
171 position_ += sizeof( T );
172 }
173 else
174 DUNE_THROW( IOError, "Cannot read beyond the buffer's end." );
175 }
176
177 std::size_t position () const { return position_; }
178
179 protected:
180 void initialize () { buffer_ = nullptr; position_ = 0; size_ = 0; }
181
182 void reset ( std::size_t size )
183 {
184 std::free( buffer_ );
185 initialize();
186 if( size == 0 )
187 return;
188 buffer_ = std::malloc( size );
189 if( !buffer_ )
190 DUNE_THROW( OutOfMemoryError, "Cannot allocate sufficiently large buffer." );
191 size_ = size;
192 }
193
194 void *buffer_;
195 std::size_t position_, size_;
196 };
197
198
199
200 // SPPackedMessageReadBuffer
201 // -------------------------
202
203 template< class CollectiveCommunication >
205
206 template< class C >
207 class SPPackedMessageReadBuffer< CollectiveCommunication< C > >
209 {
212
213 public:
214 explicit SPPackedMessageReadBuffer ( const CollectiveCommunication< C > &comm ) {}
215
216 void receive ( int rank, int rag, std::size_t size )
217 {
218 DUNE_THROW( IOError, "Nothing to receive in a serial communication." );
219 }
220
221 void receive ( int rank, int tag ) { receive( rank, tag, 0 ); }
222 void receive ( int tag ) { receive( 0, tag, 0 ); }
223
224 int rank () const { return 0 ; }
225
226 void wait () {}
227
228 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
229 {
230 return readBuffers.end();
231 }
232 };
233
234#if HAVE_MPI
235 template<>
236 class SPPackedMessageReadBuffer< CollectiveCommunication< MPI_Comm > >
238 {
241
242 public:
243 SPPackedMessageReadBuffer ( const CollectiveCommunication< MPI_Comm > &comm ) : comm_( comm ) {}
244
245 void receive ( int rank, int tag, std::size_t size )
246 {
247 rank_ = rank;
248 reset( size );
249 MPI_Irecv( buffer_, size_, MPI_BYTE, rank, tag, comm_, &request_ );
250 }
251
252 void receive ( int rank, int tag )
253 {
254 MPI_Status status;
255 MPI_Probe( rank, tag, comm_, &status );
256 int count;
257 MPI_Get_count( &status, MPI_BYTE, &count );
258 receive( status.MPI_SOURCE, tag, count );
259 }
260
261 void receive ( int tag ) { receive( MPI_ANY_SOURCE, tag ); }
262
263 int rank () const { return rank_; }
264
265 void wait () { MPI_Wait( &request_, MPI_STATUS_IGNORE ); }
266
267 friend inline typename std::vector< This >::iterator waitAny ( std::vector< This > &readBuffers )
268 {
269 const std::size_t numBuffers = readBuffers.size();
270 std::vector< MPI_Request > requests( numBuffers );
271 for( std::size_t i = 0; i < numBuffers; ++i )
272 requests[ i ] = readBuffers[ i ].request_;
273
274 int index = MPI_UNDEFINED;
275 MPI_Waitany( numBuffers, requests.data(), &index, MPI_STATUS_IGNORE );
276 if( index == MPI_UNDEFINED )
277 return readBuffers.end();
278
279 readBuffers[ index ].request_ = requests[ index ];
280 return readBuffers.begin() + index;
281 }
282
283 protected:
284 int rank_;
285 MPI_Comm comm_;
286 MPI_Request request_;
287 };
288#endif // #if HAVE_MPI
289
290} // namespace Dune
291
292#endif // #ifndef DUNE_GRID_SPGRID_MESSAGEBUFFER_HH
Dune::SPMultiIndex< dim > max(const Dune::SPMultiIndex< dim > &a, const Dune::SPMultiIndex< dim > &b)
Definition: multiindex.hh:305
Definition: iostream.hh:7
Definition: messagebuffer.hh:22
void reserve(std::size_t size)
Definition: messagebuffer.hh:63
This & operator=(const This &)=delete
void initialize()
Definition: messagebuffer.hh:61
void * buffer_
Definition: messagebuffer.hh:81
SPBasicPackedMessageWriteBuffer(This &&other)
Definition: messagebuffer.hh:30
std::size_t capacity_
Definition: messagebuffer.hh:82
std::size_t position() const
Definition: messagebuffer.hh:58
SPBasicPackedMessageWriteBuffer(const This &)=delete
SPBasicPackedMessageWriteBuffer()
Definition: messagebuffer.hh:26
std::size_t position_
Definition: messagebuffer.hh:82
void write(const T &value)
Definition: messagebuffer.hh:51
~SPBasicPackedMessageWriteBuffer()
Definition: messagebuffer.hh:37
Definition: messagebuffer.hh:91
SPPackedMessageWriteBuffer(const CollectiveCommunication< C > &comm)
Definition: messagebuffer.hh:101
void send(int rank, int tag)
Definition: messagebuffer.hh:103
void send(int rank, int tag)
Definition: messagebuffer.hh:118
SPPackedMessageWriteBuffer(const CollectiveCommunication< MPI_Comm > &comm)
Definition: messagebuffer.hh:116
Definition: messagebuffer.hh:137
void reset(std::size_t size)
Definition: messagebuffer.hh:182
SPBasicPackedMessageReadBuffer(const This &)=delete
This & operator=(const This &)=delete
std::size_t size_
Definition: messagebuffer.hh:195
void * buffer_
Definition: messagebuffer.hh:194
std::size_t position_
Definition: messagebuffer.hh:195
SPBasicPackedMessageReadBuffer()
Definition: messagebuffer.hh:141
std::size_t position() const
Definition: messagebuffer.hh:177
void initialize()
Definition: messagebuffer.hh:180
~SPBasicPackedMessageReadBuffer()
Definition: messagebuffer.hh:152
SPBasicPackedMessageReadBuffer(This &&other)
Definition: messagebuffer.hh:145
void read(T &value)
Definition: messagebuffer.hh:166
Definition: messagebuffer.hh:204
int rank() const
Definition: messagebuffer.hh:224
void receive(int rank, int rag, std::size_t size)
Definition: messagebuffer.hh:216
SPPackedMessageReadBuffer(const CollectiveCommunication< C > &comm)
Definition: messagebuffer.hh:214
void receive(int tag)
Definition: messagebuffer.hh:222
friend std::vector< This >::iterator waitAny(std::vector< This > &readBuffers)
Definition: messagebuffer.hh:228
void receive(int rank, int tag)
Definition: messagebuffer.hh:221
SPPackedMessageReadBuffer(const CollectiveCommunication< MPI_Comm > &comm)
Definition: messagebuffer.hh:243
void receive(int tag)
Definition: messagebuffer.hh:261
friend std::vector< This >::iterator waitAny(std::vector< This > &readBuffers)
Definition: messagebuffer.hh:267
void receive(int rank, int tag)
Definition: messagebuffer.hh:252
void receive(int rank, int tag, std::size_t size)
Definition: messagebuffer.hh:245