
    Traces and Metrics for Akka.Cluster DistributedPubSub Scenario

    This page shows how Phobos handles a publisher-subscriber scenario in a distributed environment, using Akka.Cluster Distributed Publish Subscribe.

    Code

    Here is a code sample we will use:

    
    var phobosBuilder = new PhobosConfigBuilder().WithTracing(t =>
    {
        // Exclude traces with internal cluster messages
        t.AddMessageFilter(msg => !msg.GetType().Name.Contains("GossipTick") &&
                                  !msg.GetType().Name.Contains("CurrentClusterState") &&
                                  !msg.GetType().Name.Contains("Delta") &&
                                  !msg.GetType().Name.Contains("Status"));
    });
    var setup = Setup(Bootstrap(ClusterConfig), phobosBuilder);
    
    // Create "publisher" system
    var sys1 = ActorSystem.Create("ClusterSystem", setup);
    var node1 = sys1.ActorOf(Props.Create(() => new Node1Actor(TestActor)), "node1");
    
    // Create "subscribers" system
    var sys2 = ActorSystem.Create("ClusterSystem", setup);
    var node2 = sys2.ActorOf(Props.Create(() => new Node2Actor()), "node2");
    
    // Join both systems into one cluster: sys1 joins itself, then sys2 joins sys1
    var cluster1 = Akka.Cluster.Cluster.Get(sys1);
    await cluster1.JoinAsync(cluster1.SelfAddress);
    await Akka.Cluster.Cluster.Get(sys2).JoinAsync(cluster1.SelfAddress);
    
    // Wait until both subscriptions are acknowledged
    await DistributedPubSub.Get(sys1).Mediator.Ask<SubscribeAck>(new Subscribe("strings", node1));
    await DistributedPubSub.Get(sys2).Mediator.Ask<SubscribeAck>(new Subscribe("numbers", node2));
    
    // wait until delivery is established
    await AwaitAssertAsync(() =>
    {
        node1.Tell(-1);
        ExpectMsg("-1");
    }, TimeSpan.FromSeconds(10));
    
    // publish and wait for messages
    for (var i = 0; i < 10; ++i)
        node1.Tell(i);
    for (var i = 0; i < 10; ++i)
        ExpectMsg(i.ToString());
    
    
    
    class Node1Actor : ReceiveActor
    {
        public Node1Actor(IActorRef testActor)
        {
            Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("numbers", msg)));
            Receive<string>(msg => testActor.Tell(msg));
        }
    }
    
    class Node2Actor : ReceiveActor
    {
        public Node2Actor()
        {
            Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("strings", msg.ToString())));
        }
    }
    
    

    The publisher on the first node publishes integers to the numbers topic, and the subscriber on the second node receives them. The subscriber then converts each message to a string and publishes it back to the first node via the strings topic.
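
The tracing filter at the top of the sample matches on type names with Contains, so it excludes any message whose type name includes one of those substrings, not only cluster-internal ones. The following is a minimal standalone sketch of that predicate, assuming the filter receives the message as an object (the lambda in the sample suggests this, but the exact Phobos signature is not shown on this page). The name keepInTrace is hypothetical, and the sample inputs are ordinary .NET values chosen only for illustration.

```csharp
using System;
using System.Linq;
using System.Net;

// Substrings used by the message filter in the sample above.
string[] excludedNameParts = { "GossipTick", "CurrentClusterState", "Delta", "Status" };

// Hypothetical stand-in for the lambda passed to AddMessageFilter:
// returns true when a message should be kept in traces.
Func<object, bool> keepInTrace = msg =>
    !excludedNameParts.Any(part => msg.GetType().Name.Contains(part));

// Application messages from the scenario pass the filter.
Console.WriteLine(keepInTrace(42));    // type name is "Int32"
Console.WriteLine(keepInTrace("42"));  // type name is "String"

// Any type with "Status" in its name is dropped as well,
// even though it has nothing to do with the cluster.
Console.WriteLine(keepInTrace(HttpStatusCode.OK)); // type name is "HttpStatusCode"
```

If your own message types contain words such as Status or Delta, a stricter check (for example on the full type name or namespace) may be a better fit.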

    Metrics

    Compared with the local EventStream and Request-Response scenarios, this one creates more actors and publishes more messages:

    Context: Akka.NET

    Counters

    | Name | Tags | Value |
    | --- | --- | --- |
    | akka.actor.created | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator; akka_address: akka.tcp://[email protected]:port | 2 |
    | akka.actor.created | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic; akka_address: akka.tcp://[email protected]:port | 2 |
    | akka.actor.created | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor; akka_address: akka.tcp://[email protected]:port | 1 |
    | akka.actor.created | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node2Actor; akka_address: akka.tcp://[email protected]:port | 1 |

    Meters

    | Name | Tags | Value |
    | --- | --- | --- |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator; messagetype: Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed; akka_address: akka.tcp://[email protected]:port | 2 |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator; messagetype: Akka.Cluster.Tools.PublishSubscribe.Publish; akka_address: akka.tcp://[email protected]:port | 23 |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator; messagetype: Akka.Cluster.Tools.PublishSubscribe.Subscribe; akka_address: akka.tcp://[email protected]:port | 2 |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic; messagetype: Akka.Cluster.Tools.PublishSubscribe.Subscribe; akka_address: akka.tcp://[email protected]:port | 2 |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic; messagetype: int; akka_address: akka.tcp://[email protected]:port | 11 |
    | akka.messages.recv | actortype: Akka.Cluster.Tools.PublishSubscribe.Internal.Topic; messagetype: string; akka_address: akka.tcp://[email protected]:port | 11 |
    | akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor; messagetype: int; akka_address: akka.tcp://[email protected]:port | 12 |
    | akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node1Actor; messagetype: string; akka_address: akka.tcp://[email protected]:port | 11 |
    | akka.messages.recv | actortype: Phobos.Actor.End2End.Tests.Standalone.DistributedPubSubSpecs+Node2Actor; messagetype: int; akka_address: akka.tcp://[email protected]:port | 11 |

    Context: Test

    In addition to the Node1 and Node2 actors that we create ourselves, we can see two DistributedPubSubMediator actors (one per actor system in the cluster) and two Akka.Cluster.Tools.PublishSubscribe.Internal.Topic actors representing the topics.

    We can also see the messages arriving in the actors' mailboxes, including internal ones such as Akka.Cluster.Tools.PublishSubscribe.Publish.
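
The meter values are consistent with each other, which can be checked with simple arithmetic. The sketch below uses only values from the Meters table. It assumes that each int received by Node1Actor or Node2Actor leads to exactly one Publish (as in the sample code), and that the ints beyond the 10 loop messages come from the warm-up loop. The second assumption is an interpretation of the numbers, not something the output states.

```csharp
using System;

// Values copied from the Meters table.
const int node1IntsReceived = 12;  // Node1Actor, messagetype int
const int node2IntsReceived = 11;  // Node2Actor, messagetype int
const int mediatorPublishes = 23;  // DistributedPubSubMediator, messagetype Publish
const int testLoopMessages = 10;   // messages sent by the for loop in the sample

// Each actor sends one Publish per int it receives.
int expectedPublishes = node1IntsReceived + node2IntsReceived;
Console.WriteLine($"Expected Publish count: {expectedPublishes}, reported: {mediatorPublishes}");

// Ints beyond the 10 loop messages are attributed to warm-up attempts (assumption).
int warmUpAttempts = node1IntsReceived - testLoopMessages;
int warmUpsDelivered = node2IntsReceived - testLoopMessages;
Console.WriteLine($"Warm-up attempts: {warmUpAttempts}, delivered to Node2: {warmUpsDelivered}");
```

Under that reading, one warm-up message was published before cross-node delivery was established, which would explain why the Topic and Node2 counters show 11 rather than 12.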

    Note: Some metrics are not shown here, such as those for GossipTick messages. These are service messages used for communication between cluster nodes and are not relevant to this scenario.

    Also, local port numbers are replaced with the placeholder port here; your own runs will show randomly assigned ports.

    Traces

    For each published message, we get a complete trace that follows it from the publisher to the subscribers' mailboxes, across actor system boundaries. Here is a sample trace for the first message:

    sequenceDiagram

    sys/.../testActor->>/user/node1: akka.msg.recv Int32
    Note over sys/.../testActor,/user/node1:
      SpanIndex: 0
      ParentIndex: none
      Tags:
        akka.actor.recv.msgType: int
      Logs:
        message: 0

    /user/node1->>sys/.../distributedPub...: akka.msg.recv Publish
    Note over /user/node1,sys/.../distributedPub...:
      SpanIndex: 1
      ParentIndex: 0
      Tags:
        akka.actor.recv.msgType: Akka.Cluster.Tools.PublishSubscribe.Publish
      References:
        child_of: SpanIndex=0
      Logs:
        message: Publish"topic=numbers, sendOneToEachGroup=False, message=0"

    /user/node1->>sys/.../numbers: akka.msg.recv Int32
    Note over /user/node1,sys/.../numbers:
      SpanIndex: 2
      ParentIndex: 1
      Tags:
        akka.actor.recv.msgType: int
      References:
        child_of: SpanIndex=1
      Logs:
        message: 0

    /user/node1->>/user/node2: akka.msg.recv Int32
    Note over /user/node1,/user/node2:
      SpanIndex: 3
      ParentIndex: 2
      Tags:
        akka.actor.recv.msgType: int
      References:
        child_of: SpanIndex=2
      Logs:
        message: 0

    /user/node2->>sys/.../distributedPub...: akka.msg.recv Publish
    Note over /user/node2,sys/.../distributedPub...:
      SpanIndex: 4
      ParentIndex: 3
      Tags:
        akka.actor.recv.msgType: Akka.Cluster.Tools.PublishSubscribe.Publish
      References:
        child_of: SpanIndex=3
      Logs:
        message: Publish"topic=strings, sendOneToEachGroup=False, message=0"

    /user/node2->>sys/.../strings: akka.msg.recv String
    Note over /user/node2,sys/.../strings:
      SpanIndex: 5
      ParentIndex: 4
      Tags:
        akka.actor.recv.msgType: string
      References:
        child_of: SpanIndex=4
      Logs:
        message: 0

    /user/node2->>/user/node1: akka.msg.recv String
    Note over /user/node2,/user/node1:
      SpanIndex: 6
      ParentIndex: 5
      Tags:
        akka.actor.recv.msgType: string
      References:
        child_of: SpanIndex=5
      Logs:
        message: 0

    Here we can see that:

    1. The test actor sends the message 0 to the publisher (the node1 actor).
    2. Node1 publishes the message by sending a Publish envelope to a system actor, the DistributedPubSubMediator of its actor system. The span logs show the actual message content, including the topic, which is set to numbers.
    3. The unwrapped message is forwarded to another system actor, in this case the Topic actor, which in turn forwards it to the Node2 actor.
    4. After converting the message, Node2 publishes the wrapped result to its DistributedPubSubMediator, which forwards it via another Topic actor to Node1.
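
The steps above form a single chain of child_of references. As a minimal sketch, the span indexes and parents from the sample trace can be put into a small table and walked from the last span back to the root. The tuple layout and variable names are illustrative only and are not part of Phobos or any tracing API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Spans copied from the sample trace; Parent is null for the root span.
var spans = new (int Index, int? Parent, string From, string To)[]
{
    (0, null, "sys/.../testActor", "/user/node1"),
    (1, 0, "/user/node1", "sys/.../distributedPub..."),
    (2, 1, "/user/node1", "sys/.../numbers"),
    (3, 2, "/user/node1", "/user/node2"),
    (4, 3, "/user/node2", "sys/.../distributedPub..."),
    (5, 4, "/user/node2", "sys/.../strings"),
    (6, 5, "/user/node2", "/user/node1"),
};

var byIndex = spans.ToDictionary(s => s.Index);

// Follow ParentIndex from the last span until the root is reached.
var chain = new List<int>();
int? current = 6;
while (current is int index)
{
    chain.Add(index);
    current = byIndex[index].Parent;
}
chain.Reverse();

// Print the hops in the order the message travelled.
foreach (var i in chain)
    Console.WriteLine($"span {i}: {byIndex[i].From} ->> {byIndex[i].To}");
```

Because every span has exactly one parent, the walk visits all seven spans, which matches the single unbroken path described in the list.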

    As you can see, we get a pretty clear picture of where the messages go and which actors communicate with each other. Isn't that great?
