vt  1.5.0
(Virtual Transport)
collection_chain_set.h
Go to the documentation of this file.
1 /*
2 //@HEADER
3 // *****************************************************************************
4 //
5 // collection_chain_set.h
6 // DARMA/vt => Virtual Transport
7 //
8 // Copyright 2019-2024 National Technology & Engineering Solutions of Sandia, LLC
9 // (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
10 // Government retains certain rights in this software.
11 //
12 // Redistribution and use in source and binary forms, with or without
13 // modification, are permitted provided that the following conditions are met:
14 //
15 // * Redistributions of source code must retain the above copyright notice,
16 // this list of conditions and the following disclaimer.
17 //
18 // * Redistributions in binary form must reproduce the above copyright notice,
19 // this list of conditions and the following disclaimer in the documentation
20 // and/or other materials provided with the distribution.
21 //
22 // * Neither the name of the copyright holder nor the names of its
23 // contributors may be used to endorse or promote products derived from this
24 // software without specific prior written permission.
25 //
26 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
27 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
28 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
29 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
30 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
31 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
32 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
33 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
34 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
35 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
36 // POSSIBILITY OF SUCH DAMAGE.
37 //
38 // Questions? Contact darma@sandia.gov
39 //
40 // *****************************************************************************
41 //@HEADER
42 */
43 
44 #if !defined INCLUDED_VT_MESSAGING_COLLECTION_CHAIN_SET_H
45 #define INCLUDED_VT_MESSAGING_COLLECTION_CHAIN_SET_H
46 
47 #include "vt/config.h"
49 
50 #include <unordered_map>
51 #include <unordered_set>
52 
53 namespace vt { namespace messaging {
54 
66 };
67 
79 template <typename Index>
80 class CollectionChainSet final {
81  public:
82  CollectionChainSet() = default;
83  CollectionChainSet(const CollectionChainSet&) = delete;
85 
97  template <typename ProxyT, typename IndexT = typename ProxyT::IndexType>
98  explicit CollectionChainSet(ProxyT proxy, ChainSetLayout layout = Local);
99 
101  if (deallocator_) {
102  deallocator_();
103  }
104  }
105 
106 private:
112  struct IdxMsg : vt::Message {
113  explicit IdxMsg(Index in_idx) : idx_(in_idx) {}
114  Index idx_;
115  };
116 
122  void addIndexHan(IdxMsg* msg) {
123  addIndex(msg->idx_);
124  }
125 
131  void removeIndexHan(IdxMsg* msg) {
132  removeIndex(msg->idx_);
133  }
134 
135 public:
143  void addIndex(Index idx) {
144  vtAssert(
145  chains_.find(idx) == chains_.end(),
146  "Cannot add an already-present chain");
147  chains_[idx] = DependentSendChain();
148  }
149 
160  void removeIndex(Index idx) {
161  auto iter = chains_.find(idx);
162  vtAssert(iter != chains_.end(), "Cannot remove a non-present chain");
163  vtAssert(
164  iter->second.isTerminated(), "Cannot remove a chain with pending work");
165 
166  chains_.erase(iter);
167  }
168 
182  void nextStep(
183  std::string const& label, std::function<PendingSend(Index)> step_action) {
184  for (auto& entry : chains_) {
185  auto& idx = entry.first;
186  auto& chain = entry.second;
187 
188  // The parameter `true` here tells VT to use an efficient rooted DS-epoch
189  // by default. This can still be overridden by command-line flags
190  EpochType new_epoch =
191  theTerm()->makeEpochRooted(label, term::UseDS{true});
192  vt::theMsg()->pushEpoch(new_epoch);
193 
194  chain.add(new_epoch, step_action(idx));
195 
196  vt::theMsg()->popEpoch(new_epoch);
197  theTerm()->finishedEpoch(new_epoch);
198  }
199  }
200 
213  void nextStep(std::function<PendingSend(Index)> step_action) {
214  return nextStep("", step_action);
215  }
216 
230  std::string const& label, std::function<PendingSend(Index)> step_action) {
231  auto epoch = theTerm()->makeEpochCollective(label);
233 
234  for (auto& entry : chains_) {
235  auto& idx = entry.first;
236  auto& chain = entry.second;
237  chain.add(epoch, step_action(idx));
238  }
239 
242  }
243 
255  void nextStepCollective(std::function<PendingSend(Index)> step_action) {
256  return nextStepCollective("", step_action);
257  }
258 
275  static void mergeStepCollective(
277  std::function<PendingSend(Index)> step_action) {
278  mergeStepCollective("", a, b, step_action);
279  }
280 
298  static void mergeStepCollective(
299  std::string const& label, CollectionChainSet& a, CollectionChainSet& b,
300  std::function<PendingSend(Index)> step_action) {
301  auto epoch = theTerm()->makeEpochCollective(label);
303 
304  for (auto& entry : a.chains_) {
305  auto& idx = entry.first;
306  auto& chaina = entry.second;
307  auto chainb_pos = b.chains_.find(entry.first);
308  vtAssert(
309  chainb_pos != b.chains_.end(),
310  fmt::format("index {} must be present in chainset b", entry.first));
311 
312  auto& chainb = chainb_pos->second;
314  chaina, chainb, epoch, step_action(idx));
315  }
316 
319  }
320 
325  void phaseDone() {
326  for (auto& entry : chains_) {
327  entry.second.done();
328  }
329  }
330 
334  std::unordered_set<Index> getSet() {
335  std::unordered_set<Index> index_set;
336  for (auto& entry : chains_) {
337  index_set.emplace(entry.first);
338  }
339  return index_set;
340  }
341 
345  void foreach (std::function<void(Index)> fn) {
346  for (auto& entry : chains_) {
347  fn(entry.first);
348  }
349  }
350 
351 private:
353  std::unordered_map<Index, DependentSendChain> chains_;
356 };
357 
358 }} /* end namespace vt::messaging */
359 
361 
362 #endif /*INCLUDED_VT_MESSAGING_COLLECTION_CHAIN_SET_H*/
vt::messaging::ActiveMsg
The base class for all messages. Common alias is vt::Message which uses the default envelope.
Definition: message.h:79
vt::messaging::CollectionChainSet::mergeStepCollective
static void mergeStepCollective(std::string const &label, CollectionChainSet &a, CollectionChainSet &b, std::function< PendingSend(Index)> step_action)
The next collective step of both CollectionChainSets to execute over all shared indices of the Collec...
Definition: collection_chain_set.h:298
vt::messaging::DependentSendChain::mergeChainStep
static void mergeChainStep(DependentSendChain &a, DependentSendChain &b, EpochType new_epoch, PendingSend &&link)
Add a task that is dependent on two DependentSendChain instances.
Definition: dependent_send_chain.h:151
vt::config::epoch
Definition: debug_config.h:82
vt::messaging::CollectionChainSet::removeIndex
void removeIndex(Index idx)
Remove an index from the set.
Definition: collection_chain_set.h:160
vt::messaging::ChainSetLayout
ChainSetLayout
Used to specify the layout for automatically managing dependency chains for a given collection.
Definition: collection_chain_set.h:63
vt::term::UseDS
Definition: epoch_tags.h:65
vt::messaging::CollectionChainSet::removeIndexHan
void removeIndexHan(IdxMsg *msg)
Remove an index remotely.
Definition: collection_chain_set.h:131
vt::messaging::CollectionChainSet::deallocator_
std::function< void()> deallocator_
Deallocator that type erases element listener de-registration.
Definition: collection_chain_set.h:355
vt::epoch::EpochType
Definition: epoch_type.h:52
vt::term::TerminationDetector::makeEpochRooted
EpochType makeEpochRooted(UseDS use_ds=UseDS{true}, ParentEpochCapture parent=ParentEpochCapture{})
Create a new rooted epoch.
Definition: termination.cc:1006
vt::messaging::CollectionChainSet::nextStep
void nextStep(std::function< PendingSend(Index)> step_action)
The next step to execute on all the chain indices in this collection chain set.
Definition: collection_chain_set.h:213
vt::messaging::CollectionChainSet::mergeStepCollective
static void mergeStepCollective(CollectionChainSet &a, CollectionChainSet &b, std::function< PendingSend(Index)> step_action)
The next collective step of both CollectionChainSets to execute over all shared indices of the Collec...
Definition: collection_chain_set.h:275
vtAssert
#define vtAssert(cond, str)
Definition: config_assert.h:151
vt::theMsg
messaging::ActiveMessenger * theMsg()
Definition: runtime_get.cc:103
vt::messaging::PendingSend
A pending send (or other similar operation) that is delayed until this holder goes out of scope.
Definition: pending_send.h:70
vt::messaging::ActiveMessenger::pushEpoch
void pushEpoch(EpochType const &epoch)
Push an epoch on the stack.
Definition: active.impl.h:454
vt::messaging::CollectionChainSet::phaseDone
void phaseDone()
Indicate that the current phase is complete. Resets the state on each DependentSendChain.
Definition: collection_chain_set.h:325
vt::messaging::CollectionChainSet::chains_
std::unordered_map< Index, DependentSendChain > chains_
Set of DependentSendChain managed on this node for indices.
Definition: collection_chain_set.h:353
vt
Definition: activefn.h:51
vt::messaging::Home
Definition: collection_chain_set.h:65
collection_chain_set.impl.h
vt::term::TerminationDetector::makeEpochCollective
EpochType makeEpochCollective(ParentEpochCapture parent=ParentEpochCapture{})
Create a new collective epoch.
Definition: termination.cc:1051
vt::messaging::CollectionChainSet::nextStepCollective
void nextStepCollective(std::function< PendingSend(Index)> step_action)
The next collective step to execute for each index that is added to the CollectionChainSet on each no...
Definition: collection_chain_set.h:255
vt::theTerm
term::TerminationDetector * theTerm()
Definition: runtime_get.cc:106
vt::messaging::CollectionChainSet::nextStepCollective
void nextStepCollective(std::string const &label, std::function< PendingSend(Index)> step_action)
The next collective step to execute for each index that is added to the CollectionChainSet on each no...
Definition: collection_chain_set.h:229
vt::messaging::CollectionChainSet::IdxMsg::IdxMsg
IdxMsg(Index in_idx)
Definition: collection_chain_set.h:113
vt::messaging::CollectionChainSet
A set of chains to maintain a sequence for a set of collection elements that may be local or remote.
Definition: collection_chain_set.h:80
vt::messaging::CollectionChainSet::getSet
std::unordered_set< Index > getSet()
Get the set of indices registered with this chain set.
Definition: collection_chain_set.h:334
vt::messaging::ActiveMessenger::popEpoch
EpochType popEpoch(EpochType const &epoch=no_epoch)
Pop an epoch off the stack.
Definition: active.impl.h:458
vt::messaging::CollectionChainSet::IdxMsg
Message that contains an index sent to remove or add remotely.
Definition: collection_chain_set.h:112
vt::messaging::CollectionChainSet::CollectionChainSet
CollectionChainSet()=default
vt::messaging::Local
Definition: collection_chain_set.h:64
vt::messaging::CollectionChainSet::addIndex
void addIndex(Index idx)
Add an index to the set.
Definition: collection_chain_set.h:143
config.h
vt::messaging::CollectionChainSet::~CollectionChainSet
~CollectionChainSet()
Definition: collection_chain_set.h:100
vt::messaging::CollectionChainSet::nextStep
void nextStep(std::string const &label, std::function< PendingSend(Index)> step_action)
The next step to execute on all the chain indices in this collection chain set.
Definition: collection_chain_set.h:182
dependent_send_chain.h
vt::messaging::CollectionChainSet::IdxMsg::idx_
Index idx_
Definition: collection_chain_set.h:114
vt::messaging::DependentSendChain
A sequenced chain of sends ordered by termination detection.
Definition: dependent_send_chain.h:117
vt::messaging::CollectionChainSet::addIndexHan
void addIndexHan(IdxMsg *msg)
Definition: collection_chain_set.h:122
vt::config::function
Definition: debug_config.h:99
vt::term::TerminationDetector::finishedEpoch
void finishedEpoch(EpochType const &epoch)
Tell the termination detector that all initial work has been enqueued for a given epoch on this node.
Definition: termination.cc:918