Apache Pulsar
Apache Pulsar is the messaging system at the heart of Celte. It is a pub/sub
system: producers publish messages on topics, and consumers subscribe to those topics to receive the data published to them.
Topics
Data published on Pulsar topics is mainly remote procedure calls (RPCs), but other data streams exist, such as inputs and replication data.
Celte splits its data across several topics to segment it and group related information together.
- Each peer gets a {peer uuid}.rpc topic, where RPCs that concern this peer and only this peer can be called.
- Grapes have:
  - a {grape uuid}.rpc topic, where RPCs concerning all peers that have this grape instantiated are called.
  - a {grape uuid} topic, which is read only by the server node owning the grape. RPCs published on this topic are meant to be executed only by the rightful owner of the grape, not by every peer.
- Entity containers have:
  - an {entity container uuid}.rpc topic for listening to method calls that concern all entities replicated by this container.
  - an {entity container uuid}.repl topic for server nodes to broadcast the replication data of the entities owned by the container.
  - an {entity container uuid}.input topic for clients to send their inputs to the network. Peers replicating the container can use these inputs to simulate the game until the server node sends the associated replication data.
- The master server gets:
  - master.hello.sn, where server nodes send their UUID when they are ready to start serving.
  - master.hello.client, where clients send their UUID to connect to the cluster and get assigned to a server node.
  - master.rpc, where any remote call to run on the master can be sent.
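The naming scheme above can be captured in a few small helpers. The sketch below is illustrative only: the topics namespace and every function name in it are hypothetical and not part of Celte's API. Only the suffixes (.rpc, .repl, .input) and the three master topic names come from the list above.

```cpp
#include <cassert>
#include <string>

// Hypothetical helpers (not Celte's API) that mirror the topic naming scheme.
namespace topics {

// Appends a suffix to a UUID; an empty UUID would yield a meaningless topic.
inline std::string WithSuffix(const std::string &uuid,
                              const std::string &suffix) {
  assert(!uuid.empty());
  return uuid + suffix;
}

// RPCs addressed to a single peer.
inline std::string PeerRpc(const std::string &peerUuid) {
  return WithSuffix(peerUuid, ".rpc");
}

// RPCs for every peer that has the grape instantiated.
inline std::string GrapeRpc(const std::string &grapeUuid) {
  return WithSuffix(grapeUuid, ".rpc");
}

// RPCs read only by the server node owning the grape.
inline std::string GrapeOwner(const std::string &grapeUuid) {
  return WithSuffix(grapeUuid, "");
}

// Method calls, replication data and client inputs of an entity container.
inline std::string ContainerRpc(const std::string &containerUuid) {
  return WithSuffix(containerUuid, ".rpc");
}
inline std::string ContainerRepl(const std::string &containerUuid) {
  return WithSuffix(containerUuid, ".repl");
}
inline std::string ContainerInput(const std::string &containerUuid) {
  return WithSuffix(containerUuid, ".input");
}

// Fixed topics of the master server.
const std::string kMasterHelloSn = "master.hello.sn";
const std::string kMasterHelloClient = "master.hello.client";
const std::string kMasterRpc = "master.rpc";

} // namespace topics
```

Centralizing the names this way keeps producers and consumers from drifting apart on a suffix, which is the main risk of building topic strings by hand at each call site.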
Encapsulation
Readers who want to learn more about Pulsar can refer to the official Apache Pulsar documentation.
Celte encapsulates the Pulsar consumer and producer to achieve the following goals:
- streamlining the creation of a message to send by serializing request structs. Structs make it easy for the caller and the callee to manipulate the data in the requests, while Celte handles packaging the data into JSON (which might change to a more optimized format in the future).

struct SpawnPositionRequest
    : public celte::net::CelteRequest<SpawnPositionRequest> {
  std::string clientId;
  std::string grapeId;
  float x;
  float y;
  float z;

  // Serializes the request fields into a JSON object.
  void to_json(nlohmann::json &j) const {
    j = nlohmann::json{{"clientId", clientId},
                       {"grapeId", grapeId},
                       {"x", x},
                       {"y", y},
                       {"z", z}};
  }

  // Fills the request fields from a JSON object; at() throws if a key is missing.
  void from_json(const nlohmann::json &j) {
    j.at("clientId").get_to(clientId);
    j.at("grapeId").get_to(grapeId);
    j.at("x").get_to(x);
    j.at("y").get_to(y);
    j.at("z").get_to(z);
  }
};

- making it easy to decide in which context the callbacks on message reception should be executed (i.e. completely asynchronously or in Celte's main thread).
- increasing dynamism by automatically creating new producers for new topics while deleting unused producers.
To achieve this, Celte has three main classes dedicated to sending and receiving messages over Pulsar.
ReaderStream
The ReaderStream class lets the user subscribe to a topic and register message handlers.
The options when creating a ReaderStream are the following:
template <typename Req> struct Options {
  std::string thisPeerUuid;
  // subscribe to multiple topics if needed
  std::vector<std::string> topics;
  // subscription name for Pulsar partitioning and source tracking
  std::string subscriptionName;
  // if true, only this stream can read from this topic on the whole network
  bool exclusive = false;
  // message handler that runs in Celte's main thread
  std::function<void(const pulsar::Consumer, Req)> messageHandlerSync =
      nullptr;
  // message handler that runs asynchronously
  std::function<void(const pulsar::Consumer, Req)> messageHandler = nullptr;
  // callback called in Celte's main thread when the reader is ready to be used
  std::function<void()> onReadySync = nullptr;
  // callback called in Celte's main thread when the reader encounters a connection error
  std::function<void()> onConnectErrorSync = nullptr;
  // callback called asynchronously when the reader is ready to be used
  std::function<void()> onReady = nullptr;
  // callback called asynchronously when the reader encounters a connection error
  std::function<void()> onConnectError = nullptr;
};
ReaderStreams are templated and expect only one type of request.
auto rs = std::make_shared<ReaderStream>();
_readerStreams.push_back(rs);
rs->Open<Req>(options); // subscribes to pulsar topics
return rs;
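The difference between the Sync and the asynchronous handlers in the options can be illustrated with a small task queue: the asynchronous handler runs immediately on the receiving thread, while the Sync handler is deferred until the main thread drains the queue. This is a minimal sketch under stated assumptions, not Celte's implementation: MainThreadQueue, Post, Drain and Dispatch are hypothetical names, and the pulsar::Consumer argument of the real handlers is left out to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical queue of tasks to run on the main thread (not Celte's API).
class MainThreadQueue {
public:
  // May be called from any thread, e.g. a network callback thread.
  void Post(std::function<void()> task) {
    assert(task); // invoking an empty std::function would throw
    std::lock_guard<std::mutex> lock(_mutex);
    _tasks.push(std::move(task));
  }

  // Called by the main thread, typically once per tick.
  // Returns the number of tasks that were executed.
  std::size_t Drain() {
    std::queue<std::function<void()>> pending;
    {
      // Swap under the lock so tasks run without holding the mutex.
      std::lock_guard<std::mutex> lock(_mutex);
      std::swap(pending, _tasks);
    }
    std::size_t count = 0;
    while (!pending.empty()) {
      pending.front()();
      pending.pop();
      ++count;
    }
    return count;
  }

private:
  std::mutex _mutex;
  std::queue<std::function<void()>> _tasks;
};

// Runs the asynchronous handler right away and defers the Sync handler
// to the next Drain() call. Either handler may be left empty (nullptr).
template <typename Req>
void Dispatch(MainThreadQueue &queue, const Req &req,
              const std::function<void(Req)> &handler,
              const std::function<void(Req)> &handlerSync) {
  if (handler) {
    handler(req);
  }
  if (handlerSync) {
    queue.Post([handlerSync, req]() { handlerSync(req); });
  }
}
```

Deferring the Sync handler means game state touched by it is only modified from the main thread, at the cost of up to one tick of latency; the asynchronous handler avoids that latency but must be thread-safe.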
WriterStream
The WriterStream class lets the user send messages asynchronously and execute a callback once a message has been delivered successfully.
The options work in the same fashion as those of the ReaderStream class.
struct Options {
  std::string topic;
  bool exclusive = false;
  std::function<void(WriterStream &)> onReadySync = nullptr;
  std::function<void()> onConnectErrorSync = nullptr;
  std::function<void(WriterStream &)> onReady = nullptr;
  std::function<void()> onConnectError = nullptr;
};
auto ws = std::make_shared<WriterStream>(options);
_writerStreams[options.topic] = ws;
ws->Open<Req>();
return ws;
WriterStreamPool
Sometimes, the destination topic for the messages changes often. In that case, having a single writer stream is not efficient. The WriterStreamPool creates a producer for the desired topic on demand and destroys it once it stops being used.
_writerStreamPool.emplace(
    net::WriterStreamPool::Options{
        .idleTimeout = std::chrono::milliseconds(1000),
    },
    RUNTIME.IO());
std::string topic = "test_topic";
// any type inheriting from CelteRequest can be used
req::BinaryDataPacket packet{.binaryData = msg,
                             .peerUuid = RUNTIME.GetUUID()};
_writerStreamPool->Write(topic, packet, []() {
  std::cout << "message delivered!" << std::endl;
});
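The idle-timeout behaviour of such a pool can be sketched with a map from topic to producer plus a last-used timestamp. The code below is an assumption about how this kind of pool may work, not the actual WriterStreamPool: IdlePool, FakeProducer, Get and Evict are hypothetical names, and the current time is passed in explicitly so that the eviction logic is easy to follow and test.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for a producer; a real pool would hold writer streams.
struct FakeProducer {
  std::string topic;
  int sent;
};

// Hypothetical pool that creates producers on demand and drops idle ones.
class IdlePool {
public:
  using Clock = std::chrono::steady_clock;

  explicit IdlePool(std::chrono::milliseconds idleTimeout)
      : _idleTimeout(idleTimeout) {
    assert(idleTimeout.count() > 0);
  }

  // Returns the producer for the topic, creating it on first use,
  // and records `now` as its last use.
  FakeProducer &Get(const std::string &topic, Clock::time_point now) {
    auto it = _entries.find(topic);
    if (it == _entries.end()) {
      it = _entries.emplace(topic, Entry{FakeProducer{topic, 0}, now}).first;
    }
    it->second.lastUsed = now;
    return it->second.producer;
  }

  // Destroys every producer unused for longer than the idle timeout.
  // Returns the number of producers removed.
  std::size_t Evict(Clock::time_point now) {
    std::size_t removed = 0;
    for (auto it = _entries.begin(); it != _entries.end();) {
      if (now - it->second.lastUsed > _idleTimeout) {
        it = _entries.erase(it);
        ++removed;
      } else {
        ++it;
      }
    }
    return removed;
  }

  std::size_t Size() const { return _entries.size(); }

private:
  struct Entry {
    FakeProducer producer;
    Clock::time_point lastUsed;
  };

  std::chrono::milliseconds _idleTimeout;
  std::map<std::string, Entry> _entries;
};
```

In this sketch Evict would be called periodically, for example from a timer; a shorter timeout frees resources sooner but recreates producers more often when a topic is written to in bursts.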