|
vt
1.6.0
(Virtual Transport)
|
Go to the documentation of this file.
44 #if !defined INCLUDED_VT_MESSAGING_ACTIVE_H
45 #define INCLUDED_VT_MESSAGING_ACTIVE_H
70 #if vt_check_enabled(trace_enabled)
74 #include <type_traits>
77 #include <unordered_map>
103 static constexpr
TagType const PutPackedTag =
104 std::numeric_limits<TagType>::max();
117 static constexpr
TagType const starting_direct_buffer_tag = 1000;
118 static constexpr
MsgSizeType const max_pack_direct_size = 512;
143 template <
typename Serializer>
167 template <
typename Serializer>
190 MPI_Request in_req = MPI_REQUEST_NULL
195 bool test(
int& num_mpi_tests) {
200 MPI_Test(&
req, &flag, &stat);
206 MPI_Request
req = MPI_REQUEST_NULL;
217 std::vector<MPI_Request> in_reqs, std::byte*
const in_user_buf,
226 bool test(
int& num_mpi_tests) {
232 MPI_Test(&
reqs[
cur], &flag, &stat);
243 template <
typename Serializer>
281 template <
typename Serializer>
350 std::string
name()
override {
return "ActiveMessenger"; }
364 template <
typename MsgPtrT>
372 template <
typename MsgPtrT>
380 template <
typename MsgPtrT>
388 template <
typename MsgPtrT>
397 template <
typename MsgT>
407 template <
typename MsgT>
416 template <
typename MsgT>
431 std::enable_if_t<
true
432 and not ::vt::messaging::msg_defines_serialize_mode<MsgT>::value,
442 #ifndef vt_quirked_serialize_method_detection
444 not ::vt::messaging::has_own_serialize<MsgT>,
445 "Message prohibiting serialization must not have a serialization function."
448 return sendMsgCopyableImpl<MsgT>(dest, han, msg, tag);
454 std::enable_if_t<
true
455 and ::vt::messaging::msg_defines_serialize_mode<MsgT>::value
456 and ::vt::messaging::msg_serialization_mode<MsgT>::required,
466 #ifndef vt_quirked_serialize_method_detection
468 ::vt::messaging::has_own_serialize<MsgT>,
469 "Message requiring serialization must have a serialization function."
472 return sendMsgSerializableImpl<MsgT>(dest, han, msg, tag);
479 std::enable_if_t<
true
480 and ::vt::messaging::msg_defines_serialize_mode<MsgT>::value
481 and ::vt::messaging::msg_serialization_mode<MsgT>::supported,
491 #ifndef vt_quirked_serialize_method_detection
493 ::vt::messaging::has_own_serialize<MsgT>,
494 "Message supporting serialization must have a serialization function."
497 return sendMsgCopyableImpl<MsgT>(dest, han, msg, tag);
504 std::enable_if_t<
true
505 and ::vt::messaging::msg_defines_serialize_mode<MsgT>::value
506 and ::vt::messaging::msg_serialization_mode<MsgT>::prohibited,
516 #ifndef vt_quirked_serialize_method_detection
518 not ::vt::messaging::has_own_serialize<MsgT>,
519 "Message prohibiting serialization must not have a serialization function."
522 return sendMsgCopyableImpl<MsgT>(dest, han, msg, tag);
525 template <
typename MsgT>
576 template <
typename MsgT>
577 [[deprecated(
"size must be set in makeMessageSz, use regular sendMsg")]]
598 template <
typename MsgT>
620 template <
typename MsgT>
677 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
681 bool deliver_to_sender =
true,
696 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
699 bool deliver_to_sender =
true,
717 bool deliver_to_sender =
true,
720 using MsgT =
typename FuncTraits<decltype(f)>::MsgT;
721 return broadcastMsg<MsgT, f>(msg, deliver_to_sender, tag);
735 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
759 using MsgT =
typename FuncTraits<decltype(f)>::MsgT;
760 return sendMsg<MsgT, f>(dest, msg, tag);
771 template <
auto f,
typename... Params>
773 using Tuple =
typename FuncTraits<decltype(f)>::TupleType;
775 auto msg = vt::makeMessage<MsgT>();
776 msg->setParams(std::forward<Params>(params)...);
777 auto han = auto_registry::makeAutoHandlerParam<decltype(f), f, MsgT>();
778 return sendMsg<MsgT>(dest.
get(), han, msg, no_tag);
788 template <
auto f,
typename... Params>
790 using Tuple =
typename FuncTraits<decltype(f)>::TupleType;
792 auto msg = vt::makeMessage<MsgT>();
793 msg->setParams(std::forward<Params>(params)...);
794 auto han = auto_registry::makeAutoHandlerParam<decltype(f), f, MsgT>();
795 constexpr
bool deliver_to_sender =
true;
796 return broadcastMsg<MsgT>(han, msg, deliver_to_sender, no_tag);
815 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
816 [[deprecated(
"size must be set in makeMessageSz, use regular sendMsg")]]
836 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
855 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
913 template <ActiveFnType* f,
typename MsgT>
916 bool deliver_to_sender =
true,
931 template <ActiveFnType* f,
typename MsgT>
992 bool deliver_to_sender =
true,
1007 template <
typename FunctorT>
1010 bool deliver_to_sender =
true,
1046 template <
typename FunctorT,
typename MsgT>
1064 template <
typename FunctorT>
1154 template <
typename MsgT>
1158 bool deliver_to_sender =
true,
1174 template <
typename MsgT>
1193 template <
typename MsgT, ActiveTypedFnType<MsgT>* f>
1213 template <
typename MsgT>
1323 NodeType const& sender,
bool const& enqueue,
1345 NodeType const&
node = uninitialized_destination,
bool const& enqueue =
true,
1366 int nchunks, std::byte*
const user_buf,
TagType const& tag,
1367 NodeType const&
node = uninitialized_destination,
bool const& enqueue =
true,
1388 int nchunks, std::byte*
const buf,
TagType const tag,
NodeType const from,
1391 #
if MPI_VERSION >= 3
1392 , MPI_Message first_msg = MPI_MESSAGE_NULL,
MsgSizeType first_chunk_bytes = 0
1573 template <
typename MsgT>
1585 template <
typename MsgT>
1601 template <
typename MsgT>
1612 template <
typename MsgT>
1633 template <
typename SerializerT>
1655 # if vt_check_enabled(trace_enabled)
1658 | trace_irecv_polling_am
1659 | trace_irecv_polling_dm
1753 MPI_Request
req = MPI_REQUEST_NULL;
1785 return std::make_tuple(
1792 # if vt_check_enabled(trace_enabled)
The base class for all messages. Common alias is vt::Message which uses the default envelope.
Definition: message.h:79
ActiveRecvBroker active_broker_
Definition: active.h:1837
void serialize(Serializer &s)
Definition: active.h:144
Helper to unify 'stealing' message ownership.
Definition: smart_ptr.h:313
bool valid
Definition: active.h:178
InProgressBase(std::byte *in_buf, MsgSizeType in_probe_bytes, NodeType in_sender)
Definition: active.h:161
ActionType dealloc_user_buf
Definition: active.h:254
RequestHolder< AsyncOpWrapper > in_progress_ops
Definition: active.h:1804
ActiveMessenger()
Definition: active.cc:64
int cap
Definition: active.h:1750
PendingSendType broadcastMsg(MsgPtrThief< MsgT > msg, bool deliver_to_sender=true, TagType tag=no_tag)
Broadcast a message.
Definition: active.impl.h:250
Used to hoist weak C++ types (like vt::VirtualProxyType ) into strongly typed values that have a uniq...
Definition: strong_type.h:60
bool progress(ActiveMessenger *self)
Definition: active.cc:1377
bool recvDataMsgPriority(int nchunks, PriorityType priority, TagType const &tag, NodeType const &node, ContinuationDeleterType next=nullptr)
Receive data as bytes from a node with a priority.
Definition: active.cc:647
RequestHolder< InProgressDataIRecv > in_progress_data_irecv
Definition: active.h:1803
T & get()
Get reference.
Definition: strong_type.h:153
std::byte * user_buf
Definition: active.h:253
void initialize() override
Empty default overridden initialize method.
Definition: active.cc:149
Definition: debug_config.h:82
diagnostic::Counter dmPollCount
Definition: active.h:1824
void finishPendingDataMsgAsyncRecv(InProgressDataIRecv *irecv)
Called when a VT-MPI message has been received.
Definition: active.cc:921
MPI_Request req
Definition: active.h:206
ActiveMessenger::PendingSendType sendMsgSerializableImpl(NodeType dest, HandlerType han, MsgSharedPtr< MsgT > &msg, TagType tag)
Definition: active.impl.h:111
void prepareActiveMsgToRun(MsgSharedPtr< BaseMsgType > const &base, NodeType const &from_node, bool insert, ActionType cont)
Prepare an active message to run by building a RunnableNew.
Definition: active.cc:1167
PendingSendType sendMsgAuto(NodeType dest, HandlerType han, MsgPtrThief< MsgT > msg, TagType tag=no_tag)
Send a message with a pre-registered handler.
Definition: active.impl.h:222
ShortMessage MessageType
Definition: active.h:329
elm::ElementLBData bare_handler_lb_data_
Definition: active.h:1833
std::function< void(SendFnType)> UserSendFnType
Definition: active.h:334
Definition: active.h:1749
diagnostic::CounterGauge amForwardCounterGauge
Definition: active.h:1829
ActionType dealloc_user_buf
Definition: active.h:129
PendingSendType sendMsgSz(NodeType dest, HandlerType han, MsgPtrThief< MsgT > msg, ByteType msg_size, TagType tag=no_tag)
Send a message with a pre-registered handler.
Definition: active.impl.h:210
int MPI_TagType
Definition: active.h:101
std::vector< Slot > slots_
Definition: active.h:1763
diagnostic::CounterGauge amRecvCounterGauge
Definition: active.h:1813
EventType doMessageSend(MsgSharedPtr< BaseMsgType > &msg)
Low-level send of a message, handler and other control data should be set already.
Definition: active.cc:456
Definition: debug_config.h:66
std::function< void()> ActionType
Used to generically store an action to perform.
Definition: types_type.h:125
int64_t UserEventIDType
Definition: trace_common.h:65
void registerAsyncOp(std::unique_ptr< AsyncOp > op)
Register an async operation that needs polling.
Definition: active.cc:1459
NodeType sender
Definition: active.h:130
static constexpr MPI_TagType tags_[num_caps_]
Definition: active.h:1764
std::unordered_map< TagType, PendingRecvType > ContainerPendingType
Definition: active.h:335
ActionType cont
Definition: active.h:273
std::tuple< HandlerType, TagType > ReadyHanTagType
Definition: active.h:336
bool posted
Definition: active.h:1754
Holds a set of pending MPI Irecvs to poll for completion.
Definition: request_holder.h:69
int64_t HandlerType
Used to hold a handler ID which identifies a function pointer/context.
Definition: types_type.h:63
PriorityType priority
Definition: active.h:256
std::byte * user_buf
Definition: active.h:127
elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_
Definition: active.h:1832
static void chunkedMultiMsg(MultiMsg *msg)
Handle a control message; immediately calls handleChunkedMultiMsg.
Definition: active.cc:316
Definition: fntraits.h:390
EpochType getEpoch() const
Returns the top of the epoch stack.
Definition: active.impl.h:468
Definition: epoch_type.h:54
MPI_Request req
Definition: active.h:1753
uint64_t ByteType
Used to store some number of bytes.
Definition: types_type.h:83
void recordLBDataCommForSend(NodeType const dest, MsgSharedPtr< BaseMsgType > const &base, MsgSizeType const msg_size)
Record LB's data for sending a message.
Definition: active.cc:967
bool tryProcessDataMsgRecv()
Poll MPI for raw data messages.
Definition: active.cc:661
diagnostic::Counter tdRecvCount
Definition: active.h:1826
PhysicalResourceType NodeType
Used to hold the current node/rank or the number of nodes.
Definition: types_type.h:57
RequestHolder< InProgressIRecv > in_progress_active_msg_irecv
Definition: active.h:1802
void markAsTermMessage(MsgPtrT const msg)
Mark a message as a termination message.
Definition: active.impl.h:63
void markAsLocationMessage(MsgPtrT const msg)
Mark a message as a location message.
int32_t TagType
Used to hold a tag, e.g., on messages or reduces.
Definition: types_type.h:73
bool is_user_buf
Definition: active.h:132
MessageType buffered_msg
Definition: active.h:271
void markAsCollectionMessage(MsgPtrT const msg)
Mark a message as a collection message.
void serialize(Serializer &s)
Definition: active.h:282
PendingSendType send(Node dest, Params &&... params)
Send parameters to a handler in a message.
Definition: active.h:772
EventRecord EventRecordType
Definition: event.h:89
Active receive broker that keeps a pool of pre-posted Irecv slots per size-class tag to drain unexpec...
Definition: active.h:1748
std::vector< MPI_Request > reqs
Definition: active.h:260
void serialize(Serializer &s)
Definition: active.h:244
PendingSendType broadcastMsgSz(MsgPtrThief< MsgT > msg, ByteType msg_size, bool deliver_to_sender=true, TagType tag=no_tag)
Broadcast a message with an explicit size.
messaging::ActiveMessenger * theMsg()
Definition: runtime_get.cc:103
diagnostic::CounterGauge dmPostedCounterGauge
Definition: active.h:1818
virtual ~ActiveMessenger()
Definition: active.cc:170
A pending send (or other similar operation) that is delayed until this holder goes out of scope.
Definition: pending_send.h:70
void pushEpoch(EpochType const &epoch)
Push an epoch on the stack.
Definition: active.impl.h:460
bool recvDataMsgBuffer(int nchunks, std::byte *const user_buf, PriorityType priority, TagType const &tag, NodeType const &node=uninitialized_destination, bool const &enqueue=true, ActionType dealloc_user_buf=nullptr, ContinuationDeleterType next=nullptr, bool is_user_buf=false)
Receive data as bytes with a buffer and priority.
Definition: active.cc:700
Core component of VT used to send messages.
Definition: active.h:327
int32_t CountType
Definition: active.h:330
bool test(int &num_mpi_tests)
Definition: active.h:226
static constexpr int caps_[num_caps_]
Definition: active.h:1761
NodeType this_node_
Definition: active.h:1805
messaging::ActiveMsg< Envelope > ShortMessage
Alias to the shortest message available with no epoch or tag allowed.
Definition: message.h:211
EventType sendMsgBytesWithPut(NodeType const &dest, MsgSharedPtr< BaseMsgType > const &base)
Send message as low-level bytes that is already packed.
Definition: active.cc:225
EpochType getEpochContextMsg(MsgT *msg)
Get the epoch for a message based on the current context so a subsequent operation on it can be safe...
Definition: active.impl.h:473
NodeType from_node
Definition: active.h:272
void serialize(Serializer &s)
Definition: active.h:168
diagnostic::CounterGauge dmRecvCounterGauge
Definition: active.h:1814
std::function< void(PtrLenPairType data, ActionType deleter)> ContinuationDeleterType
A continuation function with an allocated pointer with a deleter function.
Definition: active.h:88
void postSlot(ActiveMessenger *self, Slot &s)
Definition: active.cc:1323
std::function< SendInfo(PtrLenPairType, NodeType, TagType)> SendFnType
Definition: active.h:333
PendingRecv(int in_nchunks, std::byte *in_user_buf, ContinuationDeleterType in_cont, ActionType in_dealloc_user_buf, NodeType node, PriorityType in_priority, bool in_is_user_buf)
Definition: active.h:134
Definition: timing_type.h:58
Holds a buffered active message, used internally.
Definition: active.h:268
EpochType setupEpochMsg(MsgT *msg)
Set the epoch on a message.
Definition: active.impl.h:506
A general identifier for a task context. The id is unique in the system.
Definition: elm_id.h:67
PendingSend PendingSendType
Definition: active.h:338
bool testPendingAsyncOps()
Test pending general asynchronous events.
Definition: active.cc:1313
Definition: activefn.h:51
ActiveMessenger::PendingSendType sendMsgCopyableImpl(NodeType dest, HandlerType han, MsgSharedPtr< MsgT > &msg, TagType tag)
Definition: active.impl.h:147
uint16_t PriorityType
Used to hold the priority of a message.
Definition: types_type.h:113
bool recvDataMsg(int nchunks, TagType const &tag, NodeType const &node, ContinuationDeleterType next=nullptr)
Receive data as bytes from a node.
Definition: active.cc:654
diagnostic::Counter amHandlerCount
Definition: active.h:1821
bool test(int &num_mpi_tests)
Definition: active.h:195
diagnostic::Counter tdSentCount
Definition: active.h:1825
PendingSendType sendMsg(NodeType dest, MsgPtrThief< typename FuncTraits< decltype(f)>::MsgT > msg, TagType tag=no_tag)
Send a message (message type not required).
Definition: active.h:754
auto getRankLBData()
Get the rank-based LB data along with element ID for rank-based work.
Definition: active.h:1784
void blockOnAsyncOp(std::unique_ptr< AsyncOp > op)
Block the current task's execution on a pollable async operation until it completes.
Definition: active.cc:1463
Definition: elm_lb_data.h:59
TagType cur_direct_buffer_tag_
Definition: active.h:1801
InProgressDataIRecv(std::byte *in_buf, MsgSizeType in_probe_bytes, NodeType in_sender, std::vector< MPI_Request > in_reqs, std::byte *const in_user_buf, ActionType in_dealloc_user_buf, ContinuationDeleterType in_next, PriorityType in_priority)
Definition: active.h:215
static constexpr int slots_per_class_
Definition: active.h:1770
diagnostic::Counter amPollCount
Definition: active.h:1823
void startup() override
Empty default overridden startup method.
Definition: active.cc:153
diagnostic::CounterGauge amPostedCounterGauge
Definition: active.h:1817
diagnostic::CounterGauge dmSentCounterGauge
Definition: active.h:1810
void setTagMessage(MsgT *msg, TagType tag)
Set the tag in the envelope of a message.
Definition: active.impl.h:106
Definition: event_record.h:79
A pending receive event.
Definition: active.h:125
std::string name() override
Get the name of the component.
Definition: active.h:350
Returned from a data send to be used to receive the data.
Definition: send_info.h:56
int64_t MsgSizeType
Used to hold the size of a message.
Definition: types_type.h:101
MsgSizeType packMsg(MessageType *msg, MsgSizeType size, std::byte *ptr, MsgSizeType ptr_bytes)
Pack a message, used by the system.
Definition: active.cc:194
ContinuationDeleterType next
Definition: active.h:255
EventType sendMsgBytes(NodeType const &dest, MsgSharedPtr< BaseMsgType > const &base, MsgSizeType const &msg_size, TagType const &send_tag)
Send message as low-level bytes after packing put bytes if needed.
Definition: active.cc:413
MPI_TagType allocateNewTag()
Allocate a new, unused tag.
Definition: active.cc:532
void setup(ActiveMessenger *self)
Definition: active.cc:1356
ActiveMessenger::PendingSendType sendMsgImpl(NodeType dest, HandlerType han, MsgSharedPtr< MsgT > &msg, TagType tag)
Definition: active.h:436
MPI_TagType tag
Definition: active.h:1751
EpochType popEpoch(EpochType const &epoch=no_epoch)
Pop an epoch off the stack.
Definition: active.impl.h:464
void handleChunkedMultiMsg(MultiMsg *msg)
Handle a control message that coordinates multiple payloads arriving that constitute a contiguous pay...
Definition: active.cc:320
PendingSendType broadcast(Params &&... params)
Broadcast parameters to a handler in a message.
Definition: active.h:789
void setEpochMessage(MsgT *msg, EpochType epoch)
Set the epoch in the envelope of a message.
Definition: active.impl.h:101
MPITag
Definition: active.h:106
MPI_Comm comm_
Definition: active.h:1834
int nchunks
Definition: active.h:126
#define VT_ALLOW_MPI_CALLS
Definition: mpi_access.h:52
bool tryProcessIncomingActiveMsg()
Poll MPI to discover an incoming message with a handler.
Definition: active.cc:1032
void markAsSerialMsgMessage(MsgPtrT const msg)
Mark a message as a serialization control message.
std::tuple< EventType, int > sendDataMPI(PtrLenPairType const &ptr, NodeType const &dest, TagType const &tag)
Send raw bytes to a node with potentially multiple sends.
Definition: active.cc:581
std::byte * buf
Definition: active.h:1752
PendingSendType sendMsg(NodeType dest, HandlerType han, MsgPtrThief< MsgT > msg, TagType tag=no_tag)
Send a message with a pre-registered handler.
Definition: active.impl.h:199
EventType sendMsgMPI(NodeType const &dest, MsgSharedPtr< BaseMsgType > const &base, MsgSizeType const &msg_size, TagType const &send_tag)
Send already-packed message bytes with MPI using multiple sends if necessary.
Definition: active.cc:342
Component class for a generic, pollable VT runtime module, CRTP'ed over the component's actual type....
Definition: component.h:161
void recvDataDirect(int nchunks, std::byte *const buf, TagType const tag, NodeType const from, MsgSizeType len, PriorityType prio, ActionType dealloc=nullptr, ContinuationDeleterType next=nullptr, bool is_user_buf=false)
Receive data from MPI in multiple chunks.
Definition: active.cc:810
MsgSizeType probe_bytes
Definition: active.h:176
ContainerPendingType pending_recvs_
Definition: active.h:1800
void serialize(SerializerT &s)
Definition: active.h:1634
SendInfo sendData(PtrLenPairType const &ptr, NodeType const &dest, TagType const &tag)
Send raw bytes to a node.
Definition: active.cc:543
Base class for an in-progress MPI operation.
Definition: active.h:160
std::byte * buf
Definition: active.h:175
void finishPendingActiveMsgAsyncRecv(InProgressIRecv *irecv)
Called when a VT-MPI message has been received.
Definition: active.cc:1221
int progress(TimeType current_time) override
Call into the progress engine.
Definition: active.cc:1446
std::tuple< std::byte *, ByteType > PtrLenPairType
A pair of a std::byte* and number of bytes (length) for sending data.
Definition: active.h:84
trace::TraceEventIDType makeTraceCreationSend(HandlerType const handler, ByteType serialized_msg_size, bool is_bcast)
Definition: active.cc:175
diagnostic::Counter bcastsSentCount
Definition: active.h:1822
bool testPendingDataMsgAsyncRecv()
Test pending MPI request for data message receives.
Definition: active.cc:1301
Definition: active.cc:299
std::size_t cur
Definition: active.h:259
void cleanup()
Definition: active.cc:1426
diagnostic::CounterGauge amSentCounterGauge
Definition: active.h:1809
uint32_t TraceEventIDType
Definition: trace_common.h:62
PendingSendType broadcastMsg(MsgPtrThief< typename FuncTraits< decltype(f)>::MsgT > msg, bool deliver_to_sender=true, TagType tag=no_tag)
Broadcast a message (message type not required).
Definition: active.h:715
Definition: debug_config.h:88
InProgressIRecv(std::byte *in_buf, MsgSizeType in_probe_bytes, NodeType in_sender, MPI_Request in_req=MPI_REQUEST_NULL)
Definition: active.h:188
void processActiveMsg(MsgSharedPtr< BaseMsgType > const &base, NodeType const &sender, bool insert, ActionType cont=nullptr)
Process an incoming active message.
Definition: active.cc:998
static constexpr int num_caps_
Definition: active.h:1760
PriorityType priority
Definition: active.h:131
An in-progress MPI_Irecv watched by the runtime.
Definition: active.h:186
Definition: debug_config.h:99
PendingSendType broadcastMsgAuto(MsgPtrThief< MsgT > msg, TagType tag=no_tag)
Broadcast a message.
Definition: active.impl.h:300
static MPI_TagType selectActiveTag(MsgSizeType size)
Select an active message tag based on the size of the message.
Definition: active.cc:217
bool testPendingActiveMsgAsyncRecv()
Test pending MPI request for active message receives.
Definition: active.cc:1289
BufferedActiveMsg(MessageType const &in_buffered_msg, NodeType const &in_from_node, ActionType in_cont)
Definition: active.h:275
Definition: param_msg.h:150
uint64_t EventType
Used to hold a local/remote event to wait for completion.
Definition: types_type.h:69
NodeType sender
Definition: active.h:177
ContinuationDeleterType cont
Definition: active.h:128
An in-progress pure data MPI_Irecv watched by the runtime.
Definition: active.h:214