Asynchronous Pub/Sub Messaging in Java with NATS JetStream

Asynchronous Pub/Sub Messaging in Java with NATS JetStream

Introduction

When we build large-scale distributed clusters of applications, we utilize all our efforts to break the monoliths into small containerized workloads that communicate among themselves and share information to perform various actions.

We do not spend much time designing a messaging system.

Messaging is typically treated as the central nervous system of any large-scale distributed system. Usually, the in-memory communications within monoliths are being converted into over-the-wire communication.

If we wire all the communications within a cluster, it forms mesh-like modules where each service calls another service in a synchronous fashion, which isn't ideal due to large waiting times in the request-response lifecycle.

This messy mesh can be fixed by introducing an asynchronous messaging cluster in between the services, instead of a synchronous one.

Instead of having point-to-point communication between two microservices, we can delegate their messages into a kind of hub-and-spoke topology. Thus messaging is a glue that ties the whole system together.

In this guide, we'll be using NATS JetStream to perform asynchronous message delivery, via the Publish/Subscribe pattern.

So how do we choose a message broker or messaging architecture for our application?

Choosing a messaging system can feel pretty overwhelming, with a large number of options already available and new ones popping up every day, each with different advantages.

Choosing a Distributed Messaging System

Most notably, we've got the widely popular and quite frequently used Apache Kafka which is often referred to as a distributed log store.

The messages published to topics in Kafka persist for some time, and the concept of consumer groups allows messages to be distributed evenly among multiple instances of the same service. It is extremely powerful, but with power comes with great responsibility and maintentance. Kafka is notably difficult to maintain and has a steep learning curve for any team looking to skill up on the technology.

Another unique choice is RabbitMQ. RabbitMQ uses the Advanced Message Queuing Protocol for messaging. It is significantly lightweight as well.

Instead of using the concept of unique consumer groups, RabbitMQ takes the simpler approach of having clients consume queues. If a client doesn’t acknowledge a message, it will go back in the queue to be processed by another.

All of these products has some sweet-spot and do shine on their use-cases.

So what if someone wants to really embrace the idea of having a simplistic yet ultra-high performant system without the extra overhead of maintaining it? What if someone would like to do traditional pub/sub, but also request/reply and maybe even scatter-gather, all while keeping things simple and light?

This is where the NATS messaging system might be the best fit into your solution.

Introducing NATS

NATS is a production-proven, cloud-native messaging system made for developers or operators who wants to spend more time implementing business logic and less time worrying about how to do messaging.

It's an incredibly fast, open-source messaging system built on a simple yet powerful core. The server uses a text-based protocol, so while there are a number of language-specific client libraries, you can literally telnet into a NATS server to send and receive messages.

NATS is designed to be always-on, connected, and ready to accept commands. If you are old enough to know what a dial-tone is, then it's worth mentioning that the NATS team likes to use that analogy for its design.

Some of the salient features of NATS include:

  • Ultra-high performance
  • Low Configuration
    • Clients only need a URL and credentials
    • Servers auto-discover themselves
  • Ability to expand architecture without impacting running services
  • Self-heals and is always available
  • Supports multiple delivery modes:
    • At most once(Nats Core)
    • At least once(NATS Streaming or JetStream)
  • Store messages to persistent stores and replay by time or sequence
  • Wildcard support
  • Data at REST encryption
  • Cleanse specific messages(GDPR)
  • Horizontal Scalability
  • Full TLS support: CA certificates, bidirectional support
  • Support for standard user/password auth/usage of JWT
  • Permission restrictions
  • Secure Multi-tenancy with data isolation
  • Share data between accounts
  • Have 30+ client libraries written in different languages

Messaging Patterns

NATS supports 4 main patterns for communication. They are:

  • Subject-based
  • Publish-Subscribe
  • Request-Reply/Scatter-Gather
  • Queue-groups

Each of these is a different paradigm and has its use-case, with some overlap. Allowing for all four of these patterns gives NATS great flexibility and functionality for various different circumstances between multiple applications, or one large monolith.

Subject-based messaging

A Subject in NATS is simply a string representing an interest in data. It is hierarchially tokenized to support wildcard subscriptions:

  • foo.* matches foo.bar and foo.baz
  • foo.*.bar matches foo.a.bar and foo.b.bar
  • foo.> matches any of the above
  • > matches everything in NATS

This pattern of messaging allows the publisher to share data using a Subject, and consumers can receive these messages by listening to these subjects using wildcards.

In a sense, this paradigm is based on the Observer Design Pattern, which typically has a Subject and Observers.

For example, if someone sends the message to 'audit.us.east', then all the subscribers listening to the exact subject or a wildcard subject would receive this message.

Publish-Subscribe messaging

This is one of the traditional messaging patterns where Publishers publish a message to a Subscriber list where each subscriber is individually subscribed to it.

This is analogous to a newsletter, and this pattern is extensively used in various systems. From notification/alert systems to VoD platforms such as YouTube.

This is the pattern we'll be using in this guide.

Request-Reply Messaging/Scatter-Gather pattern

When we make REST API calls, where we issue an HTTP request and receive a response, we are using a traditional synchronous request-response pattern. The Request-Reply pattern is often difficult or sometimes requires complex solutions or compromises. This pattern is fairly simple when implemented using NATS as it just needs you to supply a "reply-to" subject while publishing a message.

This pattern can also be called as Scatter-Gather pattern, where a publisher publishes a message on a subject to an unknown number of subscribers concurrently. Then all the listeners listening to this subject would get active and start processing. The Publisher then would wait to accumulate all the replies from some or all of the subscribers.

Queue Groups

Sometimes in a distributed cluster, you have to load-balance multiple applications or multiple instances of the same application. This pattern would be a perfect solution to load-balance the messages across multiple subscribers who have subscribed to the same subject.

The best part of this solution is, unlike other messaging systems, it doesn't require any configuration at the NATS server. The queue groups are defined by the application and their queue subscribers and are being managed among themselves.

To create a queue subscription, all the subscribers register a queue name. As messages on the registered subject are published, one member of the group is chosen randomly to receive the message. Although queue groups have multiple subscribers, each message is consumed by only one.

All of these patterns need zero configuration at the NATS server.

It is totally driven by the application or the client libraries. So let's look into the jnats Java Client library to see how we can define some of these patterns and perform asynchronous messaging.

Basic NATS Server, NATS Streaming and NATS JetStream

The first NATS cloud-native messaging ecosystem was introduced with NATS server based on 'At-most once' delivery model - messages are delivered once at most. It used to forward the published messages to the consumers with incredible speeds, setting the new performance treshold for the industry. For some applications, the performance basic NATS offered outweighed the potential losses from lost messages.

But with the 'At-most once' delivery model, if any of the subscribers are down, the messages sent to the would never arrive, and thus there is no guarantee of delivery for the data.

This was analogous to the super-fast UDP protocol used for most streaming services, where the speed of data was more important than the integrity of data. You'd rather lose a few pixels in a video or have a lower resolution than have to wait a prolonged period to hear someone's voice.

But this isn't something you want to happen in a financial transaction. Losing a bit here and there might change someone's bill or the recepient's address.

As a response to this NATS Streaming was introduced, which traded some of the performance for message persistence. Not much performance was sacrificed and NATS Streaming was a lightweight and performant platform which used basic NATS under the hood. It was built with the 'At-least once' delivery model with the capability of sending ACK messages for publishers and subscribers.

This is analogous to TCP, which guarantees data integrity and resends the packages if an ACK message isn't received back, denoting that the client might not have received the package.

When the messages are published, they're persisted for some time (customizable) so that it can be replayed to consumers if they haven't received it. Although this component was extremely performant and lightweight, it is not as powerful as distributed streaming systems like Kafka in terms of capability and maturity.

Developers posed requirements like distributed security, decentralized management, multi-tenancy, global scaling with superclusters, and secure sharing of data which gave rise to the next-generation of NATS Streaming in the era of NATS 2.0, known as NATS JetStream.

For modern streaming systems with distributed clusters, it is advisable to use the latest NATS JetStream offering. JetStream was created to solve the problems identified with streaming technology today - complexity, fragility, and a lack of scalability. We are going to play around with JetStream further in this article.

Asynchronous Pub/Sub Messaging in Java with NATS JetStream

Project Setup

Running or installing a NATS JetStream server is pretty easy. Whether you want to host this cluster on a Windows, Mac or Linux machine, Docker Engine makes the setup really easy.

We will be using a Docker container to host a JetStream server. In order to run the Docker image, we can simply run:

$ docker run -ti -p 4222:4222 --name jetstream synadia/jsm:latest server

Once you run that, you wil be greeted with something along the lines of:

NATS has got a vast list of client libraries in different languages with an active community of 1000+ contributors. It joined CNCF (Cloud Native Computing Foundation) as an incubating project in 2018.

We will be using the NATS Java client known as jnats. In order to connect to NATS JetStream, we just need to define a dependency in the pom.xml:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>${version}</version>
</dependency>

That's it! We are ready to go. Now let's look into a few of our use-cases. As always, if you get stuck, you can find the full source code on GitHub.

Publisher/Subscriber Streaming

Let's try defining a traditional Publisher/Subscriber model by creating a new Stream and a subject. Streams in NATS JetStream represent any stream of data between two endpoints and are the central building block of the API.

We will be creating a single class to first publish few messages and then subscribe to read those messages and send an acknowledgement:

public class PubSubAsync {
// Proceeding code goes here
}

Let's go ahead and define some global static settings, such as the stream name, subject, default message and server:

private static final String defaultStream = "pubsubasync-stream";
private static final String defaultSubject = "pubsubasync-subject";
private static final String defaultMessage = "Hello User";
private static final int defaultMessageCount = 2;
private static final String defaultServer = "nats://localhost:4222";

We'll be using these later on while progamatically setting the streams up, to avoid hardcoding variables into them.

Let's start off by setting up a Connection to the NATS JetStream server, instantiating a JetStreamManagement instance, which is used to add Stream instances, and a StreamConnfiguration instance - built via the Builder Design Pattern to allow for flexibility when defining settings.

Free eBook: Git Essentials

Check out our hands-on, practical guide to learning Git, with best-practices, industry-accepted standards, and included cheat sheet. Stop Googling Git commands and actually learn it!

The connection made to the NATS server can fail so you'll want to wrap *all of the proceeding code in a try-catch block. We'll be using a try-with-resources block since this is a closable connection so we don't have to manually close it:

try (Connection nc = Nats.connect(defaultServer)) {
    // Creating streams, managers, sending messages, subscribing, etc.
} catch (Exception e) {
    e.printStackTrace();
}

Within the try block we'll start off by creating a JetStreamManagement instance alongside a StreamConfiguration and JetStream context.

The JetStream class is the central API of the framework. JetStream indirectly publishes the messages to subscribers by pushing the message to a subject that subscribers are listening to. It also subscribes subscribers to the subjects.

The subjects are defined when building the StreamConfiguration, and the JetStreamManagement instance lets us add Streams with that configuration to our pipeline. We'll cover JetStreamManagement in more detail in a later section. Let's create a single stream for publishing messages to a subject and create the JetStream context to manage the publishing and subscribing for the messages sent to that subject:

JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream, here will use an in-memory storage type, and one subject
StreamConfiguration sc = StreamConfiguration.builder()
        .name(defaultStream)
        .storageType(StorageType.Memory)
        .subjects(defaultSubject)
        .build();
            
// Add a stream via the `JetStreamManagement` instance and capture its info in a `StreamInfo` object
StreamInfo streamInfo = jsm.addStream(sc);
JsonUtils.printFormatted(streamInfo);

// Create a JetStream context. This hangs off the original connection
// allowing us to produce data to publish into streams and consume data from
// JetStream consumers.
JetStream js = nc.jetStream();         

Now, we can go ahead and create a list of Futures to hold our the results of our messages, since we're dealing with asynchronous messages and don't know when they'll arrive back. When publishing a message via the publishAsync() method of the JetStream instance, a PublishAck is returned, denoting the future acknowledgement of receival by a client.

If you'd like to read more about the Future interface, read our Guide to the Future Interface in Java.

Additionally, for each message, we'll create a Message instance, which accepts a subject and data. To whom we're sending a message and what the message is. Using the NatsMessage.builder() method, we can easily build a message we'd like to send, and omit certain arguments that we don't have any use for.

Once a Message is built, we can publish it asynchronously via JetStream's publishAsync() method:

// Create a future for asynchronous message processing
List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
int stop = defaultMessageCount + 1;
for (int x = 1; x < stop; x++) {
    String data = defaultMessage + "-" + x;

    // Create a typical NATS message
    Message msg = NatsMessage.builder()
            .subject(defaultSubject)
            .data(data, StandardCharsets.UTF_8)
            .build();
    System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubject);

    // Publish a message and add the result to our `CompletableFuture` list
    futures.add(js.publishAsync(msg));
}

Once we send the messages, we'll likely want to know what happened to them and if any issues have been raised. By iterating through our futures list, we can check if the CompletableFuture instances are done, printing their content if they are, and re-queueing them if they're not to check again later:

// Get Acknowledgement for the messages
while (futures.size() > 0) {
    CompletableFuture<PublishAck> f = futures.remove(0);
    if (f.isDone()) {
        try {
            PublishAck pa = f.get();
            System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                    defaultSubject, pa.getStream(), pa.getSeqno());
        }
        catch (ExecutionException ee) {
            System.out.println("Publish Failed " + ee);
        }
    }
    else {
        // Re-queue it and try again
        futures.add(f);
    }
} 

For a publisher to publish (sensibly), we'll need a subsciber, lest the messages dangle without much meaning. A subscriber is created as a JetStreamSubscription instance, returned by the subscribe() method of the JetStream context:

// Subscribe to the messages that have been published to the subject
JetStreamSubscription sub = js.subscribe(defaultSubject);
List<Message> messages = new ArrayList<>();
// Retrieve the next message and kick off an iteration of all the messages
Message msg = sub.nextMessage(Duration.ofSeconds(1));
boolean first = true;
while (msg != null) {
    if (first) {
        first = false;
        System.out.print("Read/Ack ->");
   }
   messages.add(msg);
   if (msg.isJetStream()) {
        msg.ack();
        System.out.print(" " + new String(msg.getData()) + "\n");                    
    }
    else if (msg.isStatusMessage()) {
            System.out.print(" !" + msg.getStatus().getCode() + "!");
    }
    JsonUtils.printFormatted(msg.metaData());
    msg = sub.nextMessage(Duration.ofSeconds(1));
}

// Make sure the message goes through before we close
// if you're not using the try-with-resources statement
nc.flush(Duration.ZERO);
nc.close();

Tying all of this together, when we run the code - we shuld be seeing messages like these:

We've successfully built a Stream of data, which carries messages to a subject and our subscribers are observing them as they arrive asynchronously! Sometimes though, our subject names aren't known before we want to subscribe to them. For instance, you might generate subject names, and want to subscribe to the new subjects as they're created. Or, there's an entire list of subjects with a common prefix that you want to subscribe to.

In both cases - instead of convoluted looping and generation-subscription logic - you can use wildcards to target more than a single subject.

Wildcard Publisher/Subscriber Streaming

NATS supports hierarchial tokenization to support wildcard subscription. As a refresher from the start of the guide:

A Subject in NATS is simply a string representing an interest in data. It is hierarchially tokenized to support wildcard subscriptions:

  • foo.* matches foo.bar and foo.baz
  • foo.*.bar matches foo.a.bar and foo.b.bar
  • foo.> matches any of the above
  • > matches everything in NATS

These wildcards can be configured either in the publisher or subscriber or in both. We'll take a look at a typical example of this in a moment. The logic behind the approach we'll be using now is much the same as what we've seen before:

public class PubWildcardSubWildcard {

	private static final String defaultStream = "pubsubwildcardasync-stream";
	private static final String defaultSubjectWildcard = "audit.us.*";
	private static final String defaultSubjectSpecific = "audit.us.east";
	private static final String defaultMessage = "Audit User";
	private static final int defaultMessageCount = 2;
	private static final String defaultServer = "nats://localhost:4222";
	
	public static void main( String[] args ) {
	    System.out.printf("\nPublishing to %s. Server is %s\n\n", defaultSubjectWildcard, defaultServer);
		
		  try (Connection nc = Nats.connect(defaultServer)) {      
          JetStreamManagement jsm = nc.jetStreamManagement();
            
         StreamConfiguration sc = StreamConfiguration.builder()
                 .name(defaultStream)
                 .storageType(StorageType.Memory)
                 .subjects(defaultSubjectWildcard)
                 .build();

         StreamInfo streamInfo = jsm.addStream(sc);
         JsonUtils.printFormatted(streamInfo);
      
         JetStream js = nc.jetStream();            
      
         List<CompletableFuture<PublishAck>> futures = new ArrayList<>();
         int stop = defaultMessageCount + 1;
         for (int x = 1; x < stop; x++) {
             String data = defaultMessage + "-" + x;

             Message msg = NatsMessage.builder()
                     .subject(defaultSubjectSpecific)
                     .data(data, StandardCharsets.UTF_8)
                     .build();
             System.out.printf("Publishing message %s on subject %s.\n", data, defaultSubjectSpecific);
 
             futures.add(js.publishAsync(msg));
         }

         while (futures.size() > 0) {
             CompletableFuture<PublishAck> f = futures.remove(0);
             if (f.isDone()) {
                 try {
                     PublishAck pa = f.get();
                     System.out.printf("Publish Succeeded on subject %s, stream %s, seqno %d.\n",
                     		defaultSubjectSpecific, pa.getStream(), pa.getSeqno());
                 }
                 catch (ExecutionException ee) {
                     System.out.println("Publish Failed " + ee);
                 }
             }
             else {
                 futures.add(f);
             }
        }
            
         JetStreamSubscription sub = js.subscribe(defaultSubjectWildcard);
         List<Message> messages = new ArrayList<>();
         Message msg = sub.nextMessage(Duration.ofSeconds(1));
         boolean first = true;
         while (msg != null) {
             if (first) {
                 first = false;
                 System.out.print("Read/Ack ->");
             }
             messages.add(msg);
             if (msg.isJetStream()) {
                 msg.ack();
                 System.out.print(" " + new String(msg.getData()) + "\n");            
             }
             else if (msg.isStatusMessage()) {
                     System.out.print(" !" + msg.getStatus().getCode() + "!");
             }
             JsonUtils.printFormatted(msg.metaData());
             msg = sub.nextMessage(Duration.ofSeconds(1));
         }
         // Make sure the message goes through before we close
         // if you're not using the try-with-resources statement
          nc.flush(Duration.ZERO)
          nc.close();
     }
     catch (Exception e) {
         e.printStackTrace();
     }
}
}

When we run this code, we'll be greeted with:

As alternatives to the Pub/Sub model, using msg.getReplyTo(), we can start building a Request-Reply pattern implementation, and by building queue groups and channels to subscribe and unsubscribe - we can build a Queue Group pattern implementation.

This is possible because we haven't done any pattern-specific configuration for NATS at all - the specific patterns you'd like to use depend only on how you use the library.

JetStream Management

At a certain point, you'll likely want to observe or manage your streams. In order to do that, we are going to look into the stream lifecycle in NATS JetStream:

  • Create or Add a Stream with a Subject
  • Update a Stream by adding a Subject
  • Get information on Streams
  • Purge a stream of its messages
  • Delete a stream

To demonstrate these, let's create a class with a few static fields and just a main() method. Within it, we'll test out a few of these operations, but based on your architecture and triggers for these operations, you'll want to attach the proceeding code segments accordingly:

public class NatsJsManageStreams {

    private static final String STREAM1 = "manage-stream1";
    private static final String STREAM2 = "manage-stream2";
    private static final String SUBJECT1 = "manage-subject1";
    private static final String SUBJECT2 = "manage-subject2";
    private static final String SUBJECT3 = "manage-subject3";
    private static final String SUBJECT4 = "manage-subject4";
    private static final String defaultServer = "nats://localhost:4222";

    public static void main(String[] args) {
        try (Connection nc = Nats.connect(defaultServer)) {
            JetStreamManagement jsm = nc.jetStreamManagement();
            // Management code
            // ...
          
          // Make sure the message goes through before we close
          // if you're not using the try-with-resources statement
            nc.flush(Duration.ZERO);
            nc.close();
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }
}

We'll be using the same JetStreamManagement instance for the remainder of the samples, since we're using them all in a single class. Though, do keep in mmind that in a real-world scenario, you would never/rarely be creating a multiple Stream setup. Instead, you would typically add subjects to an existing stream to re-utilize resources.

Note: Throughout the examples, we'll be using a custom Utility Class to handle the creation or updating of a Stream, publishing asynchronously without waiting, or to read a message with or without Acknowledgement - NatsJsUtils. This utility class can be found on GitHub.

Creating or Adding a Stream with a Subject

The first time we crated a Stream, we just set its name, subject and storage policy. There are various other settings we can tweak via the builder methods:

// 1. Create (add) a stream with a subject
System.out.println("\n----------\n1. Configure And Add Stream 1");
StreamConfiguration streamConfig = StreamConfiguration.builder()
        .name(STREAM1)
        .subjects(SUBJECT1)
        // .retentionPolicy()
        // .maxConsumers(...)
        // .maxBytes(...)
        // .maxAge(...)
        // .maxMsgSize(...)
         .storageType(StorageType.Memory)
        // .replicas(...)
        // .noAck(...)
        // .template(...)
        // .discardPolicy(...)
        .build();
StreamInfo streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

The RetentionPolicy sets when messages are deleted - when there's no more interest in them (no consumers will consume it), when they're consumed, etc. You can limit the number of consumers, how long the message can be in bytes, how long it can be persisted, whether an ACK response is required or not - etc.

In the simplest form - you supply a name, subject and storage type, and build() it. We can get the info on a Stream as a return type of the addStream() method of the JetStreamManagement instance, which is pretty-printed via the NatsJsUtils class:

Updating a Stream with a Subject

You can update existing streams via the updateStream() method of the JetStreamManagement instance. We'll reuse the streamConfig reference variable and build() a new configuration for the stream we'd like to update, based on the configuration extracted from the existing StreamInfo instance:

// 2. Update stream, in this case, adding a new subject
// -  StreamConfiguration is immutable once created
// -  but the builder can help with that.
System.out.println("----------\n2. Update Stream 1");
streamConfig = StreamConfiguration.builder(streamInfo.getConfiguration())
        .addSubjects(SUBJECT2).build();
streamInfo = jsm.updateStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

// 3. Create (add) another stream with 2 subjects
System.out.println("----------\n3. Configure And Add Stream 2");
streamConfig = StreamConfiguration.builder()
        .name(STREAM2)
        .storageType(StorageType.Memory)
        .subjects(SUBJECT3, SUBJECT4)
        .build();
streamInfo = jsm.addStream(streamConfig);
NatsJsUtils.printStreamInfo(streamInfo);

This results in:

Getting Information on Streams

// 4. Get information on streams
// 4.0 publish some message for more interesting stream state information
// -   SUBJECT1 is associated with STREAM1
// 4.1 getStreamInfo on a specific stream
// 4.2 get a list of all streams
// 4.3 get a list of StreamInfo's for all streams
System.out.println("----------\n4.1 getStreamInfo");
NatsJsUtils.publish(nc, SUBJECT1, 5);
streamInfo = jsm.getStreamInfo(STREAM1);
NatsJsUtils.printStreamInfo(streamInfo);

System.out.println("----------\n4.2 getStreamNames");
List<String> streamNames = jsm.getStreamNames();
NatsJsUtils.printObject(streamNames);

System.out.println("----------\n4.2 getStreamNames");
List<StreamInfo> streamInfos = jsm.getStreams();
NatsJsUtils.printStreamInfoList(streamInfos);

Purging a Stream

You can easily purge a stream from all of its messages, fully emptying it out:

// 5. Purge a stream of it's messages
System.out.println("----------\n5. Purge stream");
PurgeResponse purgeResponse = jsm.purgeStream(STREAM1);
NatsJsUtils.printObject(purgeResponse);

Deleting a Stream

Or, if you're definitely done with a stream - you can easily delete it:

// 6. Delete a stream
System.out.println("----------\n6. Delete stream");
jsm.deleteStream(STREAM2);
System.out.println("----------\n");

Handling Security

NATS JetStream supports encryption of connections with TLS. TLS can be used to encrypt/decrypt traffic between the client/server connection and check the server's identity. When enabled in TLS mode, NATS would require all clients to connect with TLS.

You can define an SSLContext by loading all the Keystores and Truststores and then overload the SSLContext as an option while connecting to NATS. Let's define an SSLUtils class that we can use to load a keystore, create key managers, and an SSL Context:

class SSLUtils {
    public static String KEYSTORE_PATH = "keystore.jks";
    public static String TRUSTSTORE_PATH = "truststore.jks";
    public static String STORE_PASSWORD = "password";
    public static String KEY_PASSWORD = "password";
    public static String ALGORITHM = "SunX509";

    public static KeyStore loadKeystore(String path) throws Exception {
        KeyStore store = KeyStore.getInstance("JKS");
        BufferedInputStream in = new BufferedInputStream(new FileInputStream(path));

        try {
            store.load(in, STORE_PASSWORD.toCharArray());
        } finally {
            if (in != null) {
                in.close();
            }
        }

        return store;
    }

    public static KeyManager[] createTestKeyManagers() throws Exception {
        KeyStore store = loadKeystore(KEYSTORE_PATH);
        KeyManagerFactory factory = KeyManagerFactory.getInstance(ALGORITHM);
        factory.init(store, KEY_PASSWORD.toCharArray());
        return factory.getKeyManagers();
    }

    public static TrustManager[] createTestTrustManagers() throws Exception {
        KeyStore store = loadKeystore(TRUSTSTORE_PATH);
        TrustManagerFactory factory = TrustManagerFactory.getInstance(ALGORITHM);
        factory.init(store);
        return factory.getTrustManagers();
    }

    public static SSLContext createSSLContext() throws Exception {
        SSLContext ctx = SSLContext.getInstance(Options.DEFAULT_SSL_PROTOCOL);
        ctx.init(createTestKeyManagers(), createTestTrustManagers(), new SecureRandom());
        return ctx;
    }
}

Then, with our utility class all prepped - we can supply the SSLContext created by it to the sslContext() builder method when creating a NATS connection:

public class NatsConnectTLS {
    public static void main(String[] args) {
        try {
            SSLContext ctx = SSLUtils.createSSLContext();
            Options options = new Options.Builder()
                                .server("nats://localhost:4222")
                                .sslContext(ctx) // Set the SSL context
                                .build();
            Connection nc = Nats.connect(options);

            // Do something with the connection

            nc.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

We can also define an Authentication mechanism to restrict access to the NATS system. The client doesn't have control over access controls, but clients do provide the configurations required to authenticate with the system, bind to an account, and to require TLS.

A simple configuration to connect with a username and password can be set via the userInfo() method when setting the Options:

Options options = new Options.Builder().
                            .server("nats://localhost:4222")
                            .userInfo("myname","password") // Set a user and plain text password
                            .build();
Connection nc = Nats.connect(options);

Then, when creating a connection, we can connect to NATS server by providing the username and password in the URL:

Connection nc = Nats.connect("nats://myname:[email protected]:4222");

Similarly, we can also pass authentication tokens, such as JWTs, or secrets as part of the following configuration:

Options options = new Options.Builder()
                            .server("nats://localhost:4222")
                            .token("mytoken") // Set a token
                            .build();
Connection nc = Nats.connect(options);

We can now connect to NATS Url like below:

Connection nc = Nats.connect("nats://[email protected]:4222"); // Token in URL

Conclusion

When you think about using a distributed streaming system as the nervous system for building distributed microservice-based clusters, IoT-based systems, next-generation Edge systems, you can consider using NATS JetStream as a lightweight option compared to other popular, powerful frameworks such as Apache Kafka. Dealing with a massive volume of a streams of events and messages is becoming more and more common in a data-driven world. NATS JetStream provides the capabilities of distributed security, multi-tenancy, and horizontal scaling.

As always, you can find the full source code on GitHub.

Last Updated: August 13th, 2021
Was this article helpful?

Improve your dev skills!

Get tutorials, guides, and dev jobs in your inbox.

No spam ever. Unsubscribe at any time. Read our Privacy Policy.

Arpendu Kumar GaraiAuthor

Full-Stack developer with deep knowledge in Java, Microservices, Cloud Computing, Big Data, MERN, Javascript, Golang, and its relative frameworks. Besides coding and programming, I am a big foodie, love cooking, and love to travel.

Want a remote job?

    Prepping for an interview?

    • Improve your skills by solving one coding problem every day
    • Get the solutions the next morning via email
    • Practice on actual problems asked by top companies, like:
     
     
     

    Better understand your data with visualizations

    With over 330+ pages, you'll learn the ins and outs of visualizing data in Python with popular libraries like Matplotlib, Seaborn, Bokeh, and more.

    © 2013-2021 Stack Abuse. All rights reserved.