1. Introduction

nanomsg is a socket library that provides common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.

API documentation for Clojure and Java.

2. Project Maturity

jnanomsg is a young project and is in an experimental phase.

3. Install

This section covers the installation of jnanomsg.

3.1. Leiningen

The simplest way to use jnanomsg in a Clojure project is to include it in the dependency vector of your project.clj file:

project.clj
[jnanomsg "0.4.3"]

3.2. Maven

You can also use it in your Java projects with Maven. As a first step, add the Clojars repository:

<repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
</repository>

Then add the jnanomsg package dependency:

<dependency>
    <groupId>jnanomsg</groupId>
    <artifactId>jnanomsg</artifactId>
    <version>0.4.3</version>
</dependency>

3.3. Gradle

If you are using Gradle, this is the dependency line for the Gradle DSL:

compile "jnanomsg:jnanomsg:0.4.3"

3.4. Get the Code

jnanomsg is open source and you can find the source on GitHub.

You can clone the public repository with this command:

git clone https://github.com/niwibe/jnanomsg

4. Supported features

4.1. Transports

jnanomsg supports all transports supported by its backend (nanomsg): ipc, inproc, tcp.
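
An endpoint address names its transport in the scheme part, as in the ipc:///tmp/sock and tcp://localhost:6789 addresses used later in this document. As a rough illustration only, the following plain-Java sketch splits an address into transport and location and rejects any transport other than the three listed above. The Endpoint class and its parse method are hypothetical helpers written for this example; they are not part of jnanomsg.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of jnanomsg: splits "transport://location".
final class Endpoint {
    // The three transports named in this section.
    private static final List<String> TRANSPORTS = Arrays.asList("ipc", "inproc", "tcp");

    final String transport;
    final String location;

    private Endpoint(String transport, String location) {
        this.transport = transport;
        this.location = location;
    }

    static Endpoint parse(String address) {
        int sep = address.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("missing '://' in " + address);
        }
        String transport = address.substring(0, sep);
        if (!TRANSPORTS.contains(transport)) {
            throw new IllegalArgumentException("unsupported transport: " + transport);
        }
        // Everything after "://" is the transport-specific location.
        return new Endpoint(transport, address.substring(sep + 3));
    }

    public static void main(String[] args) {
        Endpoint ipc = Endpoint.parse("ipc:///tmp/sock");
        System.out.println(ipc.transport + " -> " + ipc.location); // ipc -> /tmp/sock

        Endpoint tcp = Endpoint.parse("tcp://localhost:6789");
        System.out.println(tcp.transport + " -> " + tcp.location); // tcp -> localhost:6789
    }
}
```

Note that a file path belongs with the ipc transport, while tcp expects a host and port.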

4.2. Protocols

jnanomsg intends to support all protocols supported by the native nanomsg library, but currently it only supports these few protocols:

  • pub/sub protocol

  • req/rep protocol

  • push/pull (pipeline) protocol

  • bus protocol

  • pair protocol

Note
This documentation does not intend to explain how each protocol works. It only intends to explain the public API of the idiomatic library for Java and Clojure.

5. User Guide for Clojure

5.1. Introduction

The whole public Clojure API for working with the nanomsg library is exposed under the nanomsg.core namespace, and it consists of only a few functions: connect!, bind!, subscribe!, socket, send!, recv!, recv-str!, close!, terminate! and symbols that are fairly self-descriptive.

Example of importing the nanomsg namespace in a Clojure file:

yours/samplens.clj
(ns yours.samplens
  (:require [nanomsg.core :as nn]))

5.2. Creating a socket

Let's start by creating a new socket and listening for messages on it. This can be done with the nanomsg.core/socket function, which creates a new socket instance, and the nanomsg.core/bind! function, which tells the socket to bind to a specific endpoint.

Example creating a :rep socket and binding it to a unix socket endpoint.
(with-open [socket (nn/socket :rep)]
  (nn/bind! socket "ipc:///tmp/sock"))

You can do it in a single step by passing the endpoint in the options map given as the second parameter to the socket constructor:

(with-open [socket (nn/socket :rep {:bind "ipc:///tmp/sock"})]
  (do-something socket))

5.3. Connecting to a socket

Now let's connect to an existing socket. That can be done with the nanomsg.core/connect! function. It acts like the previously seen bind! function and has the same signature.

Example creating a :req socket and connecting it to a unix socket endpoint.
(with-open [socket (nn/socket :req)]
  (nn/connect! socket "ipc:///tmp/sock"))

You can also do it in a single step by passing the endpoint in the options map given as the second parameter to the socket constructor:

(with-open [socket (nn/socket :req {:connect "ipc:///tmp/sock"})]
  (do-something socket))
Note
In Clojure, socket types are represented by keywords. With this approach, you can create any socket type with a single function: socket. As the previous examples show, the :req and :rep keywords create the request and reply socket types, which are part of the Req/Rep protocol.

5.4. Socket types

5.4.1. PubSub Sockets

This protocol has two socket types:

  • publisher - This socket is used to distribute messages to multiple destinations. Receive operation is not defined.

  • subscriber - Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received. Send operation is not defined on this socket.

Example of using the pub/sub protocol in Clojure:

publisher.clj
(with-open [sock (nn/socket :pub)]
  (nn/bind! sock "ipc:///tmp/sock")
  (dotimes [i 5]
    (nn/send! sock "test msg")))
subscriber.clj
(with-open [sock (nn/socket :sub)]
  (nn/connect! sock "ipc:///tmp/sock")
  (nn/subscribe! sock "test")
  (dotimes [i 5]
    (println (nn/recv! sock))))

5.4.2. Req/Rep Sockets

This protocol is used to distribute the workload among multiple stateless workers, and it’s represented by two socket types:

  • req - Used to implement the client application that sends requests and receives replies.

  • rep - Used to implement the stateless worker that receives requests and sends replies.

Note
Both sockets implement read and write methods.
rep.clj (server)
(with-open [sock (nn/socket :rep)]
  (nn/bind! sock "tcp://*:6789")
  (loop []
    (nn/send! sock (nn/recv! sock))
    (recur)))
req.clj (client)
(with-open [sock (nn/socket :req)]
  (nn/connect! sock "tcp://localhost:6789")
  (dotimes [i 5]
    (nn/send! sock (str "msg:" i))
    (println "Received:" (nn/recv! sock))))

5.4.3. Push/Pull Sockets

A scalability protocol for passing tasks through a series of processing steps. It is represented by two socket types:

  • push - This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.

  • pull - This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.

push.clj (server)
(with-open [sock (nn/socket :push {:bind "tcp://*:6789"})]
  (doseq [name ["Foo" "Bar" "Baz"]]
    (nn/send! sock name)))
pull.clj (client)
(with-open [sock (nn/socket :pull {:connect "tcp://localhost:6789"})]
  (dotimes [i 3]
    (println "Hello " (nn/recv! sock))))

5.5. Non blocking sockets

jnanomsg also comes with support for non-blocking sockets and exposes it through a simple, unopinionated callback-based interface.

You can create an async socket in the same way as you created sockets previously, except that you should pass the :async key with true as its value in the options map given as the second parameter to the socket constructor.

(nn/socket :req {:async true})
;; #<impl$async_socket$reify__216 nanomsg.impl$async_socket$reify__216@2d50f101>

Later on, you can send and/or receive data through async sockets with the same functions explained in the previous examples, with one small difference: they accept an optional callback as the last parameter.

Let's see some examples:

A req/rep echo server example using an async socket.
(with-open [socket (nn/socket :rep {:async true})]
  (nn/bind! socket "ipc:///tmp/sock.sock")
  (nn/recv! socket (fn [data]
                     (nn/send! socket data))))
A req/rep echo client example using an async socket.
(with-open [socket (nn/socket :req {:async true})]
  (nn/connect! socket "ipc:///tmp/sock.sock")
  (nn/send! socket "foobar"
    (fn [receiveddata]
      (println receiveddata))))

6. User Guide for Java

6.1. Introduction

Unlike the Clojure API, which exposes a uniform, high-level API for all socket types, the Java API is slightly different. Each supported socket type is available as its own class in a specific Java package.

You can find the Java API described in more detail in the javadoc.

6.2. Creating a socket

Let's start by creating a new socket and listening for messages. The examples below use the Req/Rep socket types.

Example creating a RepSocket and binding it to a unix socket endpoint.
import nanomsg.reqrep.RepSocket;

public class Server {
    public static void main(String[] args) {
        final RepSocket s = new RepSocket();
        s.bind("ipc:///tmp/sock");
    }
}

6.3. Connecting to a socket

Now let's connect to an existing socket. For this we'll use the ReqSocket class.

Creating a ReqSocket instance and connecting it to a unix socket endpoint.
import nanomsg.reqrep.ReqSocket;

public class Client {
    public static void main(String[] args) {
        final ReqSocket s = new ReqSocket();
        s.connect("ipc:///tmp/sock");
    }
}

6.4. Socket types

6.4.1. PubSub Sockets

This protocol has two socket types:

  • publisher - This socket is used to distribute messages to multiple destinations. Receive operation is not defined.

  • subscriber - Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received. Send operation is not defined on this socket.

Example of using the pub/sub protocol in Java:

Publisher.java
import nanomsg.pubsub.PubSocket;

public class Publisher {
    public static void main(String[] args) {
        PubSocket sock = new PubSocket();
        sock.bind("ipc:///tmp/sock");

        for(int i=0; i<5; i++) {
            sock.send("test msg");
        }

        sock.close();
    }
}
Subscriber.java
import nanomsg.pubsub.SubSocket;

public class Subscriber {
    public static void main(String[] args) {
        SubSocket sock = new SubSocket();
        sock.connect("ipc:///tmp/sock");
        sock.subscribe("test");

        for(int i=0; i<5; i++) {
            System.out.println(sock.recvString());
        }

        sock.close();
    }
}
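
In the example above, the subscriber subscribes to "test" and the publisher sends "test msg". In nanomsg, a subscription matches a message when the message begins with the subscribed topic. The sketch below illustrates that prefix rule in plain Java, under the assumption that matching is done byte by byte on the message body. TopicFilter is a hypothetical class written for this example; it does not use or reproduce jnanomsg code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of prefix-based topic filtering; not jnanomsg code.
final class TopicFilter {
    private final List<byte[]> topics = new ArrayList<>();

    void subscribe(String topic) {
        topics.add(topic.getBytes(StandardCharsets.UTF_8));
    }

    // A message is accepted when any subscribed topic is a prefix of it.
    // With no subscriptions at all, nothing is accepted.
    boolean accepts(byte[] message) {
        for (byte[] topic : topics) {
            if (startsWith(message, topic)) {
                return true;
            }
        }
        return false;
    }

    private static boolean startsWith(byte[] message, byte[] prefix) {
        if (prefix.length > message.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (message[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TopicFilter filter = new TopicFilter();
        byte[] msg = "test msg".getBytes(StandardCharsets.UTF_8);

        System.out.println(filter.accepts(msg)); // false: no subscriptions yet
        filter.subscribe("test");
        System.out.println(filter.accepts(msg)); // true: "test" is a prefix of "test msg"
    }
}
```

This is also why a freshly created subscriber receives nothing until subscribe is called at least once.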

6.4.2. Req/Rep Sockets

This protocol is used to distribute the workload among multiple stateless workers, and it’s represented by two socket types:

  • req - Used to implement the client application that sends requests and receives replies.

  • rep - Used to implement the stateless worker that receives requests and sends replies.

Note
Both sockets implement read and write methods.
EchoServer.java
import nanomsg.reqrep.RepSocket;

public class EchoServer {
    public static void main(String[] args) {
        RepSocket sock = new RepSocket();
        sock.bind("tcp://*:6789");

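        try {
            // Echo every request back to the client, forever.
            while (true) {
                byte[] receivedData = sock.recvBytes();
                sock.sendBytes(receivedData);
            }
        } finally {
            // Release the socket if the loop exits with an exception.
            sock.close();
        }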
    }
}
EchoClient.java
import nanomsg.reqrep.ReqSocket;

public class EchoClient {
    public static void main(String[] args) {
        ReqSocket sock = new ReqSocket();
        sock.connect("tcp://localhost:6789");

        for (int i=0; i<5; i++) {
            sock.send("Hello!" + i);
            System.out.println("Received:" + sock.recvString());
        }

        sock.close();
    }
}

6.4.3. Push/Pull Sockets

A scalability protocol for passing tasks through a series of processing steps. It is represented by two socket types:

  • push - This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.

  • pull - This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.

Dispatcher.java
import nanomsg.pipeline.PushSocket;
import java.util.ArrayList;
import java.util.List;

public class Dispatcher {
    public static void main(String[] args) {
        PushSocket sock = new PushSocket();
        sock.bind("tcp://*:6789");

        List<String> people = new ArrayList<String>();
        people.add("Foo");
        people.add("Bar");
        people.add("Baz");

        for(int i=0; i<people.size(); ++i) {
            sock.send(people.get(i));
        }

        sock.close();
    }
}
Greeter.java
import nanomsg.pipeline.PullSocket;

public class Greeter {
    public static void main(String[] args) {
        PullSocket sock = new PullSocket();
        sock.connect("tcp://localhost:6789");

        for (int i=0; i<3; i++) {
            System.out.println("Hello " + sock.recvString());
        }

        sock.close();
    }
}
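
The push socket description above mentions a cluster of load-balanced nodes. As a loose mental model only, the sketch below hands tasks to a fixed set of workers in round-robin order. How nanomsg actually chooses a peer (priorities, back-pressure, peers joining or leaving) is outside the scope of this example. RoundRobinDispatcher is a hypothetical class that uses in-memory lists instead of sockets; it is not jnanomsg code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of load-balanced task distribution; uses lists, not sockets.
final class RoundRobinDispatcher {
    private final List<List<String>> workerQueues = new ArrayList<>();
    private int next = 0;

    RoundRobinDispatcher(int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        for (int i = 0; i < workers; i++) {
            workerQueues.add(new ArrayList<String>());
        }
    }

    // Hand the task to the next worker in turn, then advance the cursor.
    void push(String task) {
        workerQueues.get(next).add(task);
        next = (next + 1) % workerQueues.size();
    }

    // Tasks that the given worker has been handed so far.
    List<String> received(int worker) {
        return workerQueues.get(worker);
    }

    public static void main(String[] args) {
        RoundRobinDispatcher dispatcher = new RoundRobinDispatcher(2);
        for (String name : new String[] {"Foo", "Bar", "Baz"}) {
            dispatcher.push(name);
        }
        System.out.println(dispatcher.received(0)); // [Foo, Baz]
        System.out.println(dispatcher.received(1)); // [Bar]
    }
}
```

Each task goes to exactly one worker, which is the key difference from pub/sub, where every subscriber can see every matching message.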

6.5. Non blocking sockets

jnanomsg also comes with support for non-blocking sockets and exposes it through a simple, unopinionated callback-based interface.

In Java, the async socket is a simple class that receives a normal socket instance as the only argument to its constructor and exposes an API for non-blocking operations.

import nanomsg.pipeline.PullSocket;
import nanomsg.async.AsyncSocket;
import nanomsg.async.IAsyncCallback;

public class Greeter {
    public static void main(String[] args) {

        // Create a normal instance of any socket type
        final PullSocket sock = new PullSocket();

        // Create a lightweight async layer around the previously
        // created pull socket.
        final AsyncSocket asyncSock = new AsyncSocket(sock);

        // Use the standard socket connect method to connect
        // to the remote endpoint.
        sock.connect("tcp://localhost:6789");

        // Use the async socket instance to execute
        // send/recv operations asynchronously.
        asyncSock.recvString(new IAsyncCallback<String>() {
            public void success(final String data) {
                System.out.println("Hello " + data);
            }

            public void fail(Throwable t) {
                System.out.println("Error: " + t.toString());
            }
        });
    }
}
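
Because the callback interface is deliberately minimal, it can be adapted to other async styles. The sketch below assumes a callback type with the same success/fail shape as the IAsyncCallback used above. It is declared locally as the hypothetical Callback<T> so that the example runs without jnanomsg, and it is bridged to java.util.concurrent.CompletableFuture. FutureBridge, toFuture and fakeRecv are names invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical adapter from a success/fail callback to a CompletableFuture.
final class FutureBridge {

    // Local stand-in with the same shape as the callback shown above (assumption).
    interface Callback<T> {
        void success(T data);
        void fail(Throwable t);
    }

    // Starts a callback-taking operation and exposes its outcome as a future.
    static <T> CompletableFuture<T> toFuture(Consumer<Callback<T>> operation) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        operation.accept(new Callback<T>() {
            public void success(T data) {
                future.complete(data);
            }

            public void fail(Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    // Hypothetical stand-in for an async receive; it answers immediately.
    static void fakeRecv(Callback<String> callback) {
        callback.success("Foo");
    }

    public static void main(String[] args) {
        CompletableFuture<String> greeting =
            FutureBridge.<String>toFuture(FutureBridge::fakeRecv)
                        .thenApply(name -> "Hello " + name);

        System.out.println(greeting.join()); // Hello Foo
    }
}
```

With a real async socket, the lambda passed to toFuture would call the socket's receive method with the supplied callback instead of fakeRecv.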

7. FAQ

Why does the async interface in Clojure use callbacks instead of core.async?

The callback interface is less opinionated and is a "lingua franca" for async interfaces. It does not couple you to a concrete library or a concrete async API. You can easily build an interface that works with core.async on top of it.

How efficient is the async interface?

Internally, jnanomsg uses something like an event loop, called the scheduler, to handle async operations. On Linux platforms, it uses the system's native epoll for socket multiplexing.

For systems that do not support epoll, it also comes with an inefficient "long polling" scheduler, which should work for experimenting but is not recommended for production use. Improvements for unsupported platforms are welcome.

Can I bind one socket to multiple endpoints?

Yes, you can call the bind function/method multiple times to listen on multiple endpoints.

8. License

Copyright 2013-2015 Andrey Antukh <niwi@niwi.be>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.