Virtual Context Collection

Collection of tasks

The virtual context collection component, vt::vrt::collection::CollectionManager, accessed via vt::theCollection(), is a core VT component that manages multi-dimensional collections of virtual context elements (that is, migratable C++ objects registered with vt). It manages the creation, deletion, and messaging of elements at runtime, supporting dense, sparse, on-demand, and staged insert modes. It uses the Location Manager to track where these elements reside so that messages can be delivered efficiently. It also uses the Group Manager to build a spanning tree across the nodes that the collection is currently mapped to. This group makes broadcasts efficient and allows reductions to make progress without waiting for nodes that do not have collection elements.

The Node LB Data component stores the LB data for live collections and passes the instrumented data to the LB Manager component, which applies load balancing strategies. You can use the --vt_lb_keep_last_elm flag to prohibit the load balancer from migrating the last element in a collection.

The preferred interface for constructing a new collection is vt::makeCollection<T>(). Using a fluent-style interface, makeCollection returns a configuration object for setting properties of the collection before it is actually constructed. Once it is configured, the .wait() and .deferWithEpoch(callback) methods let one either block until the collection is created or wait on the returned epoch until construction finishes, respectively. The wait() variant returns the proxy for immediate use (after blocking), whereas deferWithEpoch(callback) supplies the proxy when the epoch terminates by triggering the callback passed to it.

The function vt::makeCollection<T>() creates a collection in a collective fashion, meaning it must be called in tandem on all nodes. Alternatively, one may call vt::makeCollectionRooted<T>() to construct a rooted collection, which is invoked on a single rank only (and the proxy is returned to that rank only). After waiting for construction, elements will have been constructed on their appropriate ranks, and the provided collection proxy will be usable on any rank it is sent to.

For collections without dynamic membership at runtime, one must either call .bounds(my_range) to specify the bounds in each dimension of the collection or specify exactly one bulk insertion range (.bulkInsert(my_range_1)), in which case my_range_1 is assumed to be the bounds of the collection. Bulk insertion is one way to specify how insertions should happen during construction. The .bulkInsert() method (with no parameter) tells the runtime to insert all collection elements within the bounds, using the mapping function to determine placement. The user can also bulk insert specific ranges by calling .bulkInsert(my_range_1) with a parameter; this can be called multiple times.

For collective collection constructions, one may also use list insertion (.listInsert(my_index_list)) to specify non-contiguous lists of indices that the runtime should insert at construction time. Finally, for collective constructions, one may call .listInsertHere(my_index_list) to specifically instruct the runtime to construct a list of collection elements on the particular rank where it is invoked. This overrides the mapping function that the user supplies.

By default, a mapping function is applied to every collection. If the collection has bounds, the system will choose a default blocked mapping (across all dimensions) for initial placement. For collections without bounds (ones with dynamic membership), the system uses a simple xor hash function to generate a valid initial location for each index deterministically. One may specify a mapping function in two ways: the user can provide a stateless function as a template argument to .mapperFunc<my_map>(), where my_map has the following definition (shown for a 1-dimensional collection):

vt::NodeType my_map(vt::Index1D* idx, vt::Index1D* bounds, vt::NodeType num_nodes) {
  return idx->x() % num_nodes;
}
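To make the difference between a blocked placement and the modulo placement above concrete, here is a minimal standalone sketch that does not use vt. The names NodeId, blockMap1D, and roundRobinMap1D are hypothetical, and the blocked formula is only an assumption about what a blocked distribution typically looks like; it is not taken from vt's implementation.

```cpp
// Stand-in for vt::NodeType so the sketch compiles without vt (assumption).
using NodeId = int;

// Hypothetical blocked mapping: each node owns one contiguous chunk of
// ceil(num_elms / num_nodes) indices.
NodeId blockMap1D(int idx, int num_elms, NodeId num_nodes) {
  int const block = (num_elms + num_nodes - 1) / num_nodes;
  return idx / block;
}

// Same rule as my_map above: consecutive indices go to consecutive nodes.
NodeId roundRobinMap1D(int idx, NodeId num_nodes) {
  return idx % num_nodes;
}
```

With 16 elements on 4 nodes, the blocked variant places indices 0 through 3 on node 0 and 4 through 7 on node 1, while the modulo variant places indices 0, 4, 8, and 12 on node 0. Both depend only on their arguments, so every node computes the same answer.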

Alternatively, one may specify an object group mapper, which has an instance on every rank; these instances may communicate to determine placement. The .mapperObjGroup(proxy) method configures the mapping with an object group instance that already exists by passing its proxy. Otherwise, one may just give the type and constructor arguments to create a new instance: .mapperObjGroupConstruct<MyObjectGroup>(args...). An object group mapper must inherit from vt::mapping::BaseMapper and implement the pure virtual method NodeType map(IdxT* idx, int ndim, NodeType num_nodes) to define the mapping for the runtime. As an example, the object group mapper used by default for unbounded collections is implemented as follows:

template <typename IdxT>
struct UnboundedDefaultMap : vt::mapping::BaseMapper<IdxT> {
  static ObjGroupProxyType construct() {
    auto proxy = theObjGroup()->makeCollective<UnboundedDefaultMap<IdxT>>();
    return proxy.getProxy();
  }

  NodeType map(IdxT* idx, int ndim, NodeType num_nodes) override {
    typename IdxT::DenseIndexType val = 0;
    for (int i = 0; i < ndim; i++) {
      val ^= idx->get(i);
    }
    return val % num_nodes;
  }
};

Note that all collection mapping functions or object groups must be deterministic across all nodes for the same inputs.
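The determinism requirement can be illustrated with a standalone version of the XOR rule shown above. This is a sketch under stated assumptions: the function name xorMap is hypothetical, the index is modeled as a plain vector of non-negative components rather than a vt index type, and nodes are plain ints.

```cpp
#include <cstdint>
#include <vector>

// XOR all index components together, then fold the result onto the nodes.
// The result depends only on the arguments (no rank-local state, no
// randomness), so every node computes the same placement for the same index.
// Assumes non-negative components and num_nodes > 0.
int xorMap(std::vector<std::int64_t> const& idx, int num_nodes) {
  std::int64_t val = 0;
  for (auto component : idx) {
    val ^= component;
  }
  return static_cast<int>(val % num_nodes);
}
```

A mapping that consulted the calling node's identity, a clock, or an unseeded random generator would violate this property, because two nodes could then disagree about where the same index lives.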

By default, the collection type T (which inherits from the runtime base type vt::Collection<T, IndexType>) must have a default constructor. However, this requirement can be avoided by configuring the collection with a specialized element constructor using .elementConstructor(x), where x has type std::function<std::unique_ptr<ColT>(IndexT idx)>, ColT is the collection type, and IndexT is the index type of the collection. This configuration is only valid for collective constructions because the element constructor function cannot be safely sent over the network. If it is provided, the collection manager will not try to default construct the collection elements and will instead call the user-provided constructor function.
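The shape of such an element constructor can be sketched without vt. In the following, Index1D and Cell are hypothetical stand-ins (for the index type and for a collection element type with no default constructor), and makeCellConstructor is an illustrative helper; only the std::function signature mirrors the type described above.

```cpp
#include <functional>
#include <memory>

// Stand-in for the collection's index type (assumption, not vt::Index1D).
struct Index1D {
  int x;
};

// Hypothetical element type that cannot be default constructed.
struct Cell {
  explicit Cell(int initial) : value(initial) {}
  int value;
};

// Same shape as std::function<std::unique_ptr<ColT>(IndexT idx)>.
using ElementConstructor = std::function<std::unique_ptr<Cell>(Index1D)>;

// Builds a constructor that captures local state (scale) in a closure.
ElementConstructor makeCellConstructor(int scale) {
  return [scale](Index1D idx) {
    return std::make_unique<Cell>(idx.x * scale);
  };
}
```

Because the closure captures local state, it has no portable serialized form, which is consistent with the restriction to collective constructions: each rank builds its own copy of the function locally.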

By default, all collection elements are migratable and can be moved by the load balancer when it is invoked by the user. However, one may inform VT that a collection is entirely non-migratable by setting the parameter .migratable(false) during construction. In that case, work executed by its elements is recorded as background load on the initially mapped rank and excluded from the load balancer's migration decisions.

By default, collections do not have dynamic membership: they might be dense or sparse within the specified bounds, but the set of collection elements that are created at construction time persists (and never grows or shrinks) until the collection is completely destroyed. Dynamic membership allows the user to specify insertions and deletions as the program executes in a safe and orderly manner. To enable this, one must call .dynamicMembership(true). Note that the previous requirement of specifying collection bounds becomes optional with dynamic membership.

Once a collection is constructed with dynamic membership, one must start a collective modification epoch to make changes to the collection's membership. This is performed in the following way (note that this is a collective interface):

auto proxy = vt::makeCollection<MyCollection>("collection_label")
  .dynamicMembership(true)
  .collective(true)
  .wait();

auto range = vt::Index1D(num_elms);
auto token = proxy.beginModification();
for (int i = 0; i < range.x() / 2; i++) {
  if (i % num_nodes == this_node) {
    proxy[i].insertAt(token, i % 2);
  }
}
proxy.finishModification(std::move(token));

The call to proxy.beginModification() starts the insertion/deletion epoch by returning a token that must be passed to the actual modification calls. To insert a new collection element, the interface provides several methods on the indexed proxy: insert, insertAt, insertMsg, and insertAtMsg. The insert method performs the insertion at the location specified by the mapping function or mapping object group provided when the collection was constructed. The insertAt and insertAtMsg methods let the user specify exactly where the new element should reside, overriding the default mapping for that element. The insertMsg and insertAtMsg methods let the user pass a message to the collection element, which invokes a non-default constructor that takes the user-specified message as an argument.

Finally, one may call destroy on the indexed proxy to delete an element. All these modifications take place after finishModification is invoked; this is a blocking call that enacts the changes across the system. Once finishModification returns, the runtime guarantees that all changes have taken place across the system and that all spanning trees have been reconstructed or modified to reflect them.
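The index arithmetic in the modification loop above is easy to misread, so the following standalone sketch tabulates it without using vt. The struct PlannedInsert and the function planInserts are hypothetical names; the sketch only reproduces the loop's two rules, namely that index i is inserted by node i % num_nodes and is placed on node i % 2 through insertAt.

```cpp
#include <vector>

// One row per insertion: which index, which node issues the call,
// and which node the element is placed on.
struct PlannedInsert {
  int index;
  int issuing_node;
  int target_node;
};

// Mirrors the loop bounds and conditions of the modification example:
// only the first half of the range is inserted.
std::vector<PlannedInsert> planInserts(int num_elms, int num_nodes) {
  std::vector<PlannedInsert> plan;
  for (int i = 0; i < num_elms / 2; i++) {
    plan.push_back({i, i % num_nodes, i % 2});
  }
  return plan;
}
```

For 8 elements on 3 nodes, indices 0 through 3 are inserted; index 3 is issued by node 0 and placed on node 1. The sketch assumes at least two nodes, since the target i % 2 can name node 1.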

If a reduction is ongoing while insertions or deletions happen, the new elements are still expected to contribute. That is, new collection elements are part of the next sequenced reduction that has not causally terminated across the distributed system. For code readability, we generally recommend that the user wait on termination of any reductions before membership modifications are made.

Hello World 1D Dense Collection (Rooted)

#include <vt/transport.h>

#include <cstdlib>

struct Hello : vt::Collection<Hello, vt::Index1D> {
  Hello() = default;

  virtual ~Hello() {
    vtAssert(counter_ == 1, "Must be equal");
  }

  void doWork(int val) {
    fmt::print("Hello from {}: val={}\n", this->getIndex(), val);
    counter_++;
  }

private:
  int32_t counter_ = 0;
};

int main(int argc, char** argv) {
  vt::initialize(argc, argv);

  vt::NodeType this_node = vt::theContext()->getNode();

  int num_elms = 64;

  if (argc > 1) {
    num_elms = atoi(argv[1]);
  }

  if (this_node == 0) {
    auto range = vt::Index1D(num_elms);
    auto proxy = vt::makeCollectionRooted<Hello>("examples_hello_world_collection")
      .bounds(range)
      .bulkInsert()
      .wait();
    proxy.broadcast<&Hello::doWork>(10);
  }

  vt::finalize();

  return 0;
}

Hello World 1D Dense Collection (Collective)

#include <vt/transport.h>

#include <cstdlib>

struct Hello : vt::Collection<Hello, vt::Index1D> {
  Hello() = default;

  virtual ~Hello() {
    vt::NodeType num_nodes = vt::theContext()->getNumNodes();
    vtAssert(counter_ == num_nodes, "Should receive # nodes broadcasts");
  }

  void doWork(int val) {
    counter_++;
    fmt::print(
      "Hello from {}, val={}, counter_={}\n", getIndex(), val, counter_
    );
  }

private:
  int counter_ = 0;
};

int main(int argc, char** argv) {
  vt::initialize(argc, argv);

  int32_t num_elms = 16;
  if (argc > 1) {
    num_elms = atoi(argv[1]);
  }

  auto range = vt::Index1D(num_elms);
  auto proxy = vt::makeCollection<Hello>("examples_hello_world_collection_collective")
    .bounds(range)
    .bulkInsert()
    .wait();

  // All nodes send a broadcast to all elements
  proxy.broadcast<&Hello::doWork>(29);

  vt::finalize();

  return 0;
}

Hello World 1D Collection Reduce

#include <vt/transport.h>

#include <cstdlib>

struct Hello : vt::Collection<Hello, vt::Index1D> {
  void done(int val, double val2) {
    fmt::print("Reduce complete at {} values {} {}\n", getIndex(), val, val2);
  }

  void doWork() {
    fmt::print("Hello from {}\n", getIndex());

    // Get the proxy for the collection
    auto proxy = getCollectionProxy();

    auto val = getIndex().x();
    auto val2 = 2.4;
    proxy.allreduce<&Hello::done, vt::collective::PlusOp>(val, val2);
  }
};

int main(int argc, char** argv) {
  vt::initialize(argc, argv);

  vt::NodeType this_node = vt::theContext()->getNode();

  int32_t num_elms = 16;
  if (argc > 1) {
    num_elms = atoi(argv[1]);
  }

  auto range = vt::Index1D(num_elms);
  auto proxy = vt::makeCollection<Hello>("hello_world_collection_reduce")
    .bounds(range)
    .bulkInsert()
    .wait();

  if (this_node == 0) {
    proxy.broadcast<&Hello::doWork>();
  }

  vt::finalize();

  return 0;
}
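As a quick check on what the reduce example should print, the expected values can be computed by hand. The sketch below does not use vt and assumes that vt::collective::PlusOp sums each argument across all contributing elements, as its name suggests; ReduceResult and expectedReduce are hypothetical names.

```cpp
// Expected totals for the two reduced values.
struct ReduceResult {
  long sum_index;   // sum of getIndex().x() over all elements
  double sum_val2;  // sum of the constant 2.4 over all elements
};

// Every element contributes its own index and the constant 2.4.
ReduceResult expectedReduce(int num_elms) {
  ReduceResult result{0, 0.0};
  for (int i = 0; i < num_elms; i++) {
    result.sum_index += i;
    result.sum_val2 += 2.4;
  }
  return result;
}
```

With the default of 16 elements, the index sum is 0 + 1 + ... + 15 = 120 and the second value is approximately 16 × 2.4 = 38.4, up to floating-point rounding.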

Hello World 1D Collection Staged Insert

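#include <vt/transport.h>

#include <cstdlib>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

struct Hello : vt::Collection<Hello, vt::Index1D> {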

  // Default constructor for migration
  Hello() = default;

  // Constructor used during insertion
  explicit Hello(std::string const& input_string)
    : in(input_string)
  { }

  virtual ~Hello() {
    vtAssert(counter_ == 1, "Must be equal");
  }

  void doWork() {
    counter_++;

    vt::NodeType this_node = vt::theContext()->getNode();
    fmt::print("{}: Hello from {}: {}\n", this_node, this->getIndex(), in);
  }

private:
  int counter_ = 0;
  std::string in;
};

int main(int argc, char** argv) {
  vt::initialize(argc, argv);

  vt::NodeType this_node = vt::theContext()->getNode();
  vt::NodeType num_nodes = vt::theContext()->getNumNodes();

  if (num_nodes < 2) {
    vt::finalize();
    return 0;
  }

  int num_elms = 32;
  if (argc > 1) {
    num_elms = atoi(argv[1]);
  }

  auto range = vt::Index1D(num_elms);

  std::vector<std::tuple<vt::Index1D, std::unique_ptr<Hello>>> elms;
  for (int i = 0; i < num_elms; i++) {
    // Insert even elements, round-robin the insertions from each node
    if ((i / 2) % num_nodes == this_node and i % 2 == 0) {
      auto str = fmt::format("inserted from {}", this_node);
      elms.emplace_back(
        std::make_tuple(vt::Index1D{i}, std::make_unique<Hello>(str))
      );
    }
  }

  auto proxy = vt::makeCollection<Hello>("examples_hello_world_collection_staged_insert")
    .bounds(range)
    .listInsertHere(std::move(elms))
    .wait();

  if (this_node == 1) {
    proxy.broadcast<&Hello::doWork>();
  }

  vt::finalize();

  return 0;
}
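The filter in the staged insert loop decides which indices each node constructs locally. The following standalone sketch, which does not use vt, reproduces that condition so the resulting distribution can be inspected; the function name indicesInsertedBy is hypothetical.

```cpp
#include <vector>

// Returns the indices that this_node would construct locally, using the
// same condition as the staged insert example: only even indices are
// inserted, and consecutive even indices rotate across the nodes.
std::vector<int> indicesInsertedBy(int num_elms, int num_nodes, int this_node) {
  std::vector<int> mine;
  for (int i = 0; i < num_elms; i++) {
    if ((i / 2) % num_nodes == this_node && i % 2 == 0) {
      mine.push_back(i);
    }
  }
  return mine;
}
```

With 32 elements on 4 nodes, node 1 constructs indices 2, 10, 18, and 26. Odd indices are never inserted by any node, so the collection is sparse within its bounds.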