
    Traces and Metrics for Akka.Cluster DistributedPubSub Scenario

    This page shows how Phobos handles a publisher-subscriber scenario in a distributed environment, using Akka.Cluster's Distributed Publish Subscribe.

    Code

    Here is the code sample we will use:

    
    var phobosBuilder = new PhobosConfigBuilder().WithTracing(t =>
    {
        // As of https://github.com/petabridge/phobos/pull/779, these messages should be
        // filtered out automatically by Phobos.
        /*
        // Exclude traces with internal cluster messages
        t.AddIncludeMessageFilter(msg => !msg.GetType().Name.Contains("GossipTick") &&
                                  !msg.GetType().Name.Contains("CurrentClusterState") &&
                                  !msg.GetType().Name.Contains("Delta") &&
                                  !msg.GetType().Name.Contains("Status"));
        */
    });
    var setup = Setup(Bootstrap(ClusterConfig), phobosBuilder);
    
    // Create "publisher" system
    using var sys1 = ActorSystem.Create("ClusterSystem", setup);
    var node1 = sys1.ActorOf(Props.Create(() => new Node1Actor(TestActor)), "node1");
    
    // Create "subscribers" system
    using var sys2 = ActorSystem.Create("ClusterSystem", setup);
    var node2 = sys2.ActorOf(Props.Create(() => new Node2Actor()), "node2");
    
    // form a two-node cluster: sys1 joins itself, then sys2 joins sys1
    var cluster1 = Akka.Cluster.Cluster.Get(sys1);
    await cluster1.JoinAsync(cluster1.SelfAddress);
    await Akka.Cluster.Cluster.Get(sys2).JoinAsync(cluster1.SelfAddress);
    
    // wait until both subscriptions are acknowledged by the local mediators
    await DistributedPubSub.Get(sys1).Mediator.Ask<SubscribeAck>(new Subscribe("strings", node1));
    await DistributedPubSub.Get(sys2).Mediator.Ask<SubscribeAck>(new Subscribe("numbers", node2));
    
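    // wait until delivery is established: subscriptions take time to propagate
    // between nodes, so retry a probe message until the round trip succeeds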
    await AwaitAssertAsync(() =>
    {
        node1.Tell(-1);
        ExpectMsg("-1");
    }, TimeSpan.FromSeconds(10));
    
    // publish and wait for messages
    for (var i = 0; i < 10; ++i)
        node1.Tell(i);
    for (var i = 0; i < 10; ++i)
        ExpectMsg(i.ToString());
    
    
    
    private class Node1Actor : ReceiveActor
    {
        public Node1Actor(IActorRef testActor)
        {
            Receive<int>(msg => DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("numbers", msg)));
            Receive<string>(msg => testActor.Tell(msg));
        }
    }
    
    private class Node2Actor : ReceiveActor
    {
        public Node2Actor()
        {
            Receive<int>(msg =>
                DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish("strings", msg.ToString())));
        }
    }
    
    

    The publisher (node1) on one node publishes integers to the numbers topic, and the subscriber (node2) on the other node receives them. The subscriber then converts each message to a string and publishes it back to the first node via the strings topic.
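    To make the round trip easier to follow, here is a minimal in-memory model of it. This is a sketch only: ToyMediator and RoundTripDemo are hypothetical names invented for illustration, everything runs synchronously in one process, and none of it is Akka.NET or Phobos code. It only mirrors the topic wiring of the sample (ints go out on numbers, strings come back on strings).

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for the mediator: maps topic names to subscriber callbacks.
// Hypothetical type for illustration; not part of Akka.NET or Phobos.
public sealed class ToyMediator
{
    private readonly Dictionary<string, List<Action<object>>> _topics =
        new Dictionary<string, List<Action<object>>>();

    public void Subscribe(string topic, Action<object> handler)
    {
        if (!_topics.TryGetValue(topic, out var handlers))
        {
            handlers = new List<Action<object>>();
            _topics[topic] = handlers;
        }
        handlers.Add(handler);
    }

    public void Publish(string topic, object message)
    {
        // Messages for topics without subscribers are silently dropped.
        if (_topics.TryGetValue(topic, out var handlers))
        {
            foreach (var handler in handlers.ToArray())
                handler(message);
        }
    }
}

public static class RoundTripDemo
{
    // Publishes 0..count-1 on "numbers" and returns what comes back on "strings".
    public static List<string> Run(int count)
    {
        var mediator = new ToyMediator();
        var received = new List<string>();

        // "node2": subscribed to "numbers", republishes each int as a string.
        mediator.Subscribe("numbers",
            msg => mediator.Publish("strings", ((int)msg).ToString()));

        // "node1": subscribed to "strings", hands results to the test probe.
        mediator.Subscribe("strings", msg => received.Add((string)msg));

        for (var i = 0; i < count; i++)
            mediator.Publish("numbers", i);

        return received;
    }
}
```

    Calling RoundTripDemo.Run(10) returns the strings "0" through "9" in order, which is what the ExpectMsg loop in the real test checks for. In the real scenario each hop is an asynchronous message between actors, possibly across nodes, and that is what makes the traces worth looking at.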

    Metrics

    Compared with the local EventStream or Request-Response scenarios, more actors are created and more messages are published here:

    [!include[Metrics](../../../../projects/phobos/2.x/src/end2end/Phobos.Actor.End2End.Tests/Standalone/DistributedPubSubSpecs.Pub_sub_between_2_actors.approved.md)]

    In addition to the Node1 and Node2 actors that we create ourselves, we can see two DistributedPubSubMediator actors (one per actor system in the cluster) and two Akka.Cluster.Tools.PublishSubscribe.Internal.Topic actors representing the topics.

    We can also see the messages arriving in the actors' mailboxes, including some internal ones such as Akka.Cluster.Tools.PublishSubscribe.Publish.

    Note: Some metrics are not displayed here, such as those for GossipTick messages. These are service messages used for communication between cluster nodes and are not representative of this scenario.

    Also, local port numbers are replaced with the placeholder port here; in your own runs you will see the actual randomly assigned ports.
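    The code sample comments out an explicit tracing filter for internal cluster messages, because Phobos is expected to filter them automatically. If you still need such a filter, one option is to factor the predicate into a standalone function that can be unit tested. The sketch below is an assumption-laden example: ClusterNoiseFilter and ShouldInclude are hypothetical names, and the type-name fragments are copied from the commented-out filter in the sample.

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration; not part of Phobos or Akka.NET.
public static class ClusterNoiseFilter
{
    // Type-name fragments taken from the commented-out filter in the sample.
    private static readonly string[] NoisyFragments =
    {
        "GossipTick",
        "CurrentClusterState",
        "Delta",
        "Status"
    };

    // Returns true when the message should be kept, i.e. when its type name
    // contains none of the fragments above.
    public static bool ShouldInclude(object message)
    {
        var typeName = message.GetType().Name;
        return !NoisyFragments.Any(fragment => typeName.Contains(fragment));
    }
}
```

    Assuming AddIncludeMessageFilter accepts a predicate over the message object, as the lambda in the sample suggests, the helper could be passed as t.AddIncludeMessageFilter(ClusterNoiseFilter.ShouldInclude). Keep in mind that substring matching is coarse: any of your own message types whose name contains "Status" or "Delta" would be excluded as well.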

    Traces

    For each published message, we get a complete trace from the publish call to the subscriber's mailbox, across actor system boundaries. Here is a sample trace for the first message:

    [!include[Sample](../../../../projects/phobos/2.x/src/end2end/Phobos.Actor.End2End.Tests/Standalone/DistributedPubSubSpecs.Pub_sub_between_2_actors.approved.mermaid)]

    Here we can see that:

    1. The test actor sends message 0 to the publisher (the node1 actor).
    2. Node1 publishes the message by sending a Publish envelope to a system actor (the DistributedPubSubMediator of its actor system). The actual message content is also visible in the span logs; for example, the topic is set to numbers.
    3. The unwrapped message is forwarded to another system actor, in this case a Topic actor, which in turn forwards it to the Node2 actor.
    4. After converting the message, Node2 publishes the wrapped message back to the DistributedPubSubMediator, which forwards it via another Topic actor to Node1.

    As you can see, the trace gives a clear picture of where the messages go and which actors communicate with each other. Isn't that great?
