Traces and Metrics for Akka.Cluster DistributedPubSub
Scenario
On this page we will see how Phobos handles publisher-subscriber scenario in distributed environment - using Distributed Publish Subscribe.
Code
Here is a code sample we will use:
var phobosBuilder = new PhobosConfigBuilder().WithTracing(t =>
{
// Exclude traces with internal cluster messages
t.AddMessageFilter(msg => !msg.GetType().Name.Contains("GossipTick") &&
!msg.GetType().Name.Contains("CurrentClusterState") &&
!msg.GetType().Name.Contains("Delta") &&
!msg.GetType().Name.Contains("Status"));
});
var setup = Setup(Bootstrap(ClusterConfig), phobosBuilder);
// Create "publisher" system
var sys1 = ActorSystem.Create("ClusterSystem", setup);
var node1 = sys1.ActorOf(Props.Create(() => new Node1Actor(TestActor)), "node1");
// Create "subscribers" system
var sys2 = ActorSystem.Create("ClusterSystem", setup);
var node2 = sys2.ActorOf(Props.Create(() => new Node2Actor()), "node2");
// connect systems with each other
var cluster1 = Akka.Cluster.Cluster.Get(sys1);
await cluster1.JoinAsync(cluster1.SelfAddress);
await Akka.Cluster.Cluster.Get(sys2).JoinAsync(cluster1.SelfAddress);
// wait for subscriptions activated
await DistributedPubSub.Get(sys1).Mediator.Ask<SubscribeAck>(new Subscribe("strings", node1));
await DistributedPubSub.Get(sys2).Mediator.Ask<SubscribeAck>(new Subscribe("numbers", node2));
// wait until delivery is established
await AwaitAssertAsync(() =>
{
node1.Tell(-1);
ExpectMsg("-1");
}, TimeSpan.FromSeconds(10));
// publish and wait for messages
for (var i = 0; i < 10; ++i)
node1.Tell(i);
for (var i = 0; i < 10; ++i)
ExpectMsg(i.ToString());
class Node1Actor : ReceiveActor
{
public Node1Actor(IActorRef testActor)
{
Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("numbers", msg)));
Receive<string>(msg => testActor.Tell(msg));
}
}
class Node2Actor : ReceiveActor
{
public Node2Actor()
{
Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("strings", msg.ToString())));
}
}
Messages are published by publisher on one node, and received by subscriber on another node from numbers
topic.
And then subscriber is publishing converted message back to first node via strings
topic.
Metrics
Unlike EventStream or Request-Response local scenarios, there are more actors created and messages published:
Context: Akka.NET
Counters
Name | Tags | Value |
---|---|---|
akka.actor.created | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator akka_address: akka.tcp://[email protected]:port |
2 |
akka.actor.created | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic akka_address: akka.tcp://[email protected]:port |
2 |
akka.actor.created | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor akka_address: akka.tcp://[email protected]:port |
1 |
akka.actor.created | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node2Actor akka_address: akka.tcp://[email protected]:port |
1 |
Meters
Name | Tags | Value |
---|---|---|
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator messagetype: Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed akka_address: akka.tcp://[email protected]:port |
2 |
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator messagetype: Akka.Cluster.Tools.PublishSubscribe.Publish akka_address: akka.tcp://[email protected]:port |
23 |
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator messagetype: Akka.Cluster.Tools.PublishSubscribe.Subscribe akka_address: akka.tcp://[email protected]:port |
2 |
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic messagetype: Akka.Cluster.Tools.PublishSubscribe.Subscribe akka_address: akka.tcp://[email protected]:port |
2 |
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic messagetype: int akka_address: akka.tcp://[email protected]:port |
11 |
akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic messagetype: string akka_address: akka.tcp://[email protected]:port |
11 |
akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor messagetype: int akka_address: akka.tcp://[email protected]:port |
12 |
akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor messagetype: string akka_address: akka.tcp://[email protected]:port |
11 |
akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node2Actor messagetype: int akka_address: akka.tcp://[email protected]:port |
11 |
Context: Test
In addition to our Node1
and Node2
actors that we are creating ourselves, we can see two DistributedPubSubMediator
actors created (one per each actor system in the cluster),
and two Akka.Cluster.Tools.PublishSubscribe.Internal.Topic
actors representing topics.
We can also see a set of messages coming into actor's mailboxes, including some internal onces like Akka.Cluster.Tools.PublishSubscribe.Publish
.
Note: Some metrics are not displayed here, like
GossipTick
message metrics - these are service messages used for cluster nodes communication, and are not representative for this scenario.Also, local port numbers are replaced with
port
here - you will have your own random ports displayed.
Traces
For each messages published, we have full-tracked message trace from publishing to subscriber's mailboxes through the actor systems boundaries. Here is a sample trace for the first message:
ParentIndex: none
Tags:
akka.actor.recv.msgType: int
Logs:
message: 0
/user/node1->>sys/.../distributedPub...: akka.msg.recv Publish Note over /user/node1,sys/.../distributedPub...: SpanIndex: 1
ParentIndex: 0
Tags:
akka.actor.recv.msgType:
Akka.Cluster.Tools.PublishSubscribe.Publish
References:
child_of: SpanIndex=0
Logs:
message:
Publish"topic=numbers, sendOneToEachGroup=False, message=0"
/user/node1->>sys/.../numbers: akka.msg.recv Int32 Note over /user/node1,sys/.../numbers: SpanIndex: 2
ParentIndex: 1
Tags:
akka.actor.recv.msgType: int
References:
child_of: SpanIndex=1
Logs:
message: 0
/user/node1->>/user/node2: akka.msg.recv Int32 Note over /user/node1,/user/node2: SpanIndex: 3
ParentIndex: 2
Tags:
akka.actor.recv.msgType: int
References:
child_of: SpanIndex=2
Logs:
message: 0
/user/node2->>sys/.../distributedPub...: akka.msg.recv Publish Note over /user/node2,sys/.../distributedPub...: SpanIndex: 4
ParentIndex: 3
Tags:
akka.actor.recv.msgType:
Akka.Cluster.Tools.PublishSubscribe.Publish
References:
child_of: SpanIndex=3
Logs:
message:
Publish"topic=strings, sendOneToEachGroup=False, message=0"
/user/node2->>sys/.../strings: akka.msg.recv String Note over /user/node2,sys/.../strings: SpanIndex: 5
ParentIndex: 4
Tags:
akka.actor.recv.msgType: string
References:
child_of: SpanIndex=4
Logs:
message: 0
/user/node2->>/user/node1: akka.msg.recv String Note over /user/node2,/user/node1: SpanIndex: 6
ParentIndex: 5
Tags:
akka.actor.recv.msgType: string
References:
child_of: SpanIndex=5
Logs:
message: 0
Here we can see that:
- Test actor is sending a message
0
to the publisher (node1
actor) Node1
is publishing a message - sendingPublish
envelope to system actor (it isDistributedPubSubMediator
actor of his actor system), we can also see the actual message content in span logs - i.e. topic is set tonumbers
.- Unwrapped message is forwarded to another system actor -
Topic
actor in this case, which in turn is forwarding it to theNode2
actor. - After message conversion ,
Node2
is publishing wrapped message back toDistributedPubSubMediator
which is forwarding it via anotherTopic
actor toNode1
.
As you can see, we have a pretty clear picture about where the messages goes and which actors are communicating. Isn't that great?