Traces and Metrics for Akka.Cluster DistributedPubSub
Scenario
This page shows how Phobos handles a publisher-subscriber scenario in a distributed environment, using Distributed Publish Subscribe.
Code
Here is the code sample we will use:
var phobosBuilder = new PhobosConfigBuilder().WithTracing(t =>
{
    // As of https://github.com/petabridge/phobos/pull/779, these messages should be
    // filtered out automatically by Phobos.
    /*
    // Exclude traces with internal cluster messages
    t.AddIncludeMessageFilter(msg => !msg.GetType().Name.Contains("GossipTick") &&
                                     !msg.GetType().Name.Contains("CurrentClusterState") &&
                                     !msg.GetType().Name.Contains("Delta") &&
                                     !msg.GetType().Name.Contains("Status"));
    */
});
var setup = Setup(Bootstrap(ClusterConfig), phobosBuilder);
// Create "publisher" system
using var sys1 = ActorSystem.Create("ClusterSystem", setup);
var node1 = sys1.ActorOf(Props.Create(() => new Node1Actor(TestActor)), "node1");
// Create "subscribers" system
using var sys2 = ActorSystem.Create("ClusterSystem", setup);
var node2 = sys2.ActorOf(Props.Create(() => new Node2Actor()), "node2");
// connect systems with each other
var cluster1 = Akka.Cluster.Cluster.Get(sys1);
await cluster1.JoinAsync(cluster1.SelfAddress);
await Akka.Cluster.Cluster.Get(sys2).JoinAsync(cluster1.SelfAddress);
// wait for subscriptions activated
await DistributedPubSub.Get(sys1).Mediator.Ask<SubscribeAck>(new Subscribe("strings", node1));
await DistributedPubSub.Get(sys2).Mediator.Ask<SubscribeAck>(new Subscribe("numbers", node2));
// wait until delivery is established
await AwaitAssertAsync(() =>
{
    node1.Tell(-1);
    ExpectMsg("-1");
}, TimeSpan.FromSeconds(10));
// publish and wait for messages
for (var i = 0; i < 10; ++i)
    node1.Tell(i);
for (var i = 0; i < 10; ++i)
    ExpectMsg(i.ToString());

private class Node1Actor : ReceiveActor
{
    public Node1Actor(IActorRef testActor)
    {
        Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("numbers", msg)));
        Receive<string>(msg => testActor.Tell(msg));
    }
}

private class Node2Actor : ReceiveActor
{
    public Node2Actor()
    {
        Receive<int>(msg =>
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("strings", msg.ToString())));
    }
}
The publisher on one node publishes messages to the numbers topic, and the subscriber on another node receives them.
The subscriber then converts each message and publishes it back to the first node via the strings topic.
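The commented-out filter in the sample excludes internal cluster messages by matching fragments of their type names. As a minimal, library-free sketch of that predicate (the class name ClusterNoiseFilter, the method ShouldTrace, and the stand-in GossipTick type are hypothetical and not part of Phobos or Akka.NET), the same logic can be written and tried on its own:

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration only; not part of Phobos or Akka.NET.
public static class ClusterNoiseFilter
{
    // Name fragments copied from the commented-out filter in the sample.
    private static readonly string[] InternalNameFragments =
        { "GossipTick", "CurrentClusterState", "Delta", "Status" };

    // Returns true when the message type name contains none of the fragments,
    // i.e. when the message would be kept in the trace.
    public static bool ShouldTrace(object message)
    {
        var typeName = message.GetType().Name;
        return !InternalNameFragments.Any(fragment => typeName.Contains(fragment));
    }
}

// Stand-in type for illustration; the real GossipTick is internal to Akka.Cluster.
public sealed class GossipTick { }
```

With these definitions, ClusterNoiseFilter.ShouldTrace(42) keeps the message, while ClusterNoiseFilter.ShouldTrace(new GossipTick()) rejects it. Matching on name fragments is broad: any user message whose type name happens to contain "Delta" or "Status" would also be excluded, which is worth keeping in mind if a filter like this is ever re-enabled.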
Metrics
Compared with the local EventStream or Request-Response scenarios, more actors are created and more messages are published:
[!include[Metrics](../../../../projects/phobos/2.x/src/end2end/Phobos.Actor.End2End.Tests/Standalone/DistributedPubSubSpecs.Pub_sub_between_2_actors.approved.md)]

In addition to the Node1 and Node2 actors that we create ourselves, we can see two DistributedPubSubMediator actors (one per actor system in the cluster) and two Akka.Cluster.Tools.PublishSubscribe.Internal.Topic actors representing the topics.
We can also see the messages arriving in the actors' mailboxes, including some internal ones such as Akka.Cluster.Tools.PublishSubscribe.Publish.
Note: Some metrics are not displayed here, such as the GossipTick message metrics. These are service messages used for communication between cluster nodes and are not representative of this scenario. Also, local port numbers are replaced with port here; you will see your own random ports.
Traces
For each published message, we get a complete trace from the point of publication to the subscriber's mailbox, across actor system boundaries. Here is a sample trace for the first message:
Here we can see that:
- The test actor sends the message 0 to the publisher (the node1 actor).
- Node1 publishes the message by sending a Publish envelope to a system actor (the DistributedPubSubMediator actor of its actor system). We can also see the actual message content in the span logs, e.g. the topic is set to numbers.
- The unwrapped message is forwarded to another system actor, the Topic actor in this case, which in turn forwards it to the Node2 actor.
- After converting the message, Node2 publishes the wrapped message back to the DistributedPubSubMediator, which forwards it via another Topic actor to Node1.
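To make the order of hops in that trace easier to follow, here is a small in-memory model of the round trip. It is only a sketch: TopicBus and RoundTripDemo are hypothetical names, nothing here uses Akka.NET or Phobos, and a single in-process "mediator" stands in for the two per-node mediators that the real cluster uses.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, library-free model of a mediator with topics; not Akka.NET code.
public sealed class TopicBus
{
    private readonly Dictionary<string, List<(string Name, Action<object> Handler)>> _topics =
        new Dictionary<string, List<(string Name, Action<object> Handler)>>();

    // Every hop is recorded here, in the order it happens.
    public List<string> Hops { get; } = new List<string>();

    public void Subscribe(string topic, string subscriberName, Action<object> handler)
    {
        if (!_topics.TryGetValue(topic, out var subscribers))
        {
            subscribers = new List<(string Name, Action<object> Handler)>();
            _topics[topic] = subscribers;
        }
        subscribers.Add((subscriberName, handler));
    }

    public void Publish(string sender, string topic, object message)
    {
        // The sender wraps the message in a Publish envelope for the mediator.
        Hops.Add($"{sender} -> mediator: Publish({topic}, {message})");
        // The mediator unwraps it and hands it to the topic.
        Hops.Add($"mediator -> Topic({topic}): {message}");
        if (!_topics.TryGetValue(topic, out var subscribers)) return;
        foreach (var (name, handler) in subscribers)
        {
            // The topic forwards the unwrapped message to each subscriber.
            Hops.Add($"Topic({topic}) -> {name}: {message}");
            handler(message);
        }
    }
}

public static class RoundTripDemo
{
    // Sends one number through the numbers/strings round trip and returns
    // the string that comes back together with the recorded hops.
    public static (string Reply, List<string> Hops) Run(int number)
    {
        var bus = new TopicBus();
        var reply = "";

        // node1 listens on "strings" and keeps what it receives.
        bus.Subscribe("strings", "node1", msg => reply = (string)msg);
        // node2 listens on "numbers", converts, and publishes back on "strings".
        bus.Subscribe("numbers", "node2",
            msg => bus.Publish("node2", "strings", ((int)msg).ToString()));

        bus.Hops.Add($"test actor -> node1: {number}");
        bus.Publish("node1", "numbers", number);
        return (reply, bus.Hops);
    }
}
```

Calling RoundTripDemo.Run(0) records seven hops, starting with the test actor sending 0 to node1 and ending with the strings topic delivering "0" back to node1, which mirrors the span order described in the list above.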
As you can see, we get a pretty clear picture of where the messages go and which actors are communicating. Isn't that great?