DARMA » Introduction » Pipe Manager

Create opaque callback endpoints

The pipe manager component vt::pipe::PipeManager, accessed via vt::theCB() allows the creation of general pipes and callbacks between opaque endpoints that are not revealed through the type. Callbacks allow one to supply a general endpoint that accepts a type of data without revealing the actual endpoint instance. For example, one may create a callback that triggers a handler invocation on a certain node, broadcasts to a handler, sends to a collection or objgroup, or broadcasts to a collection or objgroup, etc.

The pipe manager supports more complex use cases of multi-listener endpoints if one wants to trigger multiple endpoints on potentially different nodes. The lifetime of a pipe can also be configured—how many invocations are allowed before the callback is invalid. The pipe manager has a reference count for each pipe which gets decremented with each signal arrival. By default, callbacks are infinitely callable and do not expire.

The pipe manager also supports "typed" callbacks where the callee type is revealed to the caller. Typed callbacks are slightly more efficient because the type is exposed and registered type-erasure is not required (using lambdas).

Example callbacks

// Message sent from the callback to the callback endpoint
struct TestMsg : vt::Message {
  using MessageParentType = ::vt::Message;
  vt_msg_serialize_required(); // for string

  TestMsg() = default;

  explicit TestMsg(int in_val, std::string const& in_s = "hello")
    : val_(in_val),
      s_(in_s)
  { }

  template <typename SerializerT>
  void serialize(SerializerT& s) {
    MessageParentType::serialize(s);
    s | val_;
    s | s_;
  }

  int val_ = 0;
  std::string s_;
};

// Message containing the callback to invoke
struct HelloMsg : vt::Message {
  explicit HelloMsg(vt::Callback<TestMsg> in_cb)
    : cb_(in_cb)
  { }

  vt::Callback<TestMsg> cb_;
};

// Handler function to invoke the callback from
void hello_world(HelloMsg* msg) {
  static int val = 1;
  fmt::print("{}: Sending callback\n", vt::theContext()->getNode());
  msg->cb_.send(292 + val++, "test string");
}

void printOutput(TestMsg* msg, std::string type) {
  vt::NodeType this_node = vt::theContext()->getNode();
  fmt::print("{}: cb {}: val={}, str={}\n", this_node, type, msg->val_, msg->s_);
}

// Functor callback endpoint
struct CallbackFunctor {
  void operator()(TestMsg* msg) {
    printOutput(msg, "CallbackFunctor");
  }
};

// Function callback endpoint
static void callbackFunc(TestMsg* msg) {
  printOutput(msg, "callbackFunc");
}

struct MyObj {
  // Objgroup callback endpoint
  void handler(TestMsg* msg) {
    printOutput(msg, "MyObj::handler");
  }
};

struct MyCol : vt::Collection<MyCol, vt::Index1D> { };

// Collection handler callback endpoint
void colHan(MyCol* col, TestMsg* msg) {
  printOutput(msg, "MyCol colHan (non-intrusive)");
}

void bounceCallback(vt::Callback<TestMsg> cb) {
  auto msg = vt::makeMessage<HelloMsg>(cb);
  vt::theMsg()->sendMsg<hello_world>(1, msg);
}

int main(int argc, char** argv) {
  vt::initialize(argc, argv);

  vt::NodeType this_node = vt::theContext()->getNode();
  vt::NodeType num_nodes = vt::theContext()->getNumNodes();

  if (num_nodes == 1) {
    return vt::rerror("requires at least 2 nodes");
  }

  auto obj = vt::theObjGroup()->makeCollective<MyObj>("examples_callback");
  auto col = vt::makeCollection<MyCol>("examples_callback")
    .bounds(vt::Index1D(8))
    .bulkInsert()
    .wait();

  if (this_node == 0) {
    vt::NodeType dest = num_nodes > 2 ? 2 : 0;

    auto cb_functor = vt::theCB()->makeSend<CallbackFunctor>(dest);
    bounceCallback(cb_functor);

    auto cb_func = vt::theCB()->makeSend<callbackFunc>(dest);
    bounceCallback(cb_func);

    auto cb_obj = vt::theCB()->makeSend<&MyObj::handler>(obj[dest]);
    bounceCallback(cb_obj);

    auto cb_obj_bcast = vt::theCB()->makeBcast<&MyObj::handler>(obj);
    bounceCallback(cb_obj_bcast);

    auto cb_col = vt::theCB()->makeSend<colHan>(col[5]);
    bounceCallback(cb_col);

    auto cb_col_bcast = vt::theCB()->makeBcast<colHan>(col);
    bounceCallback(cb_col_bcast);
  }

  vt::finalize();

  return 0;
}