1. Introduction

nanomsg is a socket library that provides common communication patterns. It aims to make the networking layer fast, scalable, and easy to use.

API documentation is available for Clojure and Java.

2. Project Maturity

jnanomsg is a young project and is still in an experimental phase.

3. Install

This section covers the installation of jnanomsg.

3.1. Leiningen

The simplest way to use jnanomsg in a Clojure project is to include it in the dependency vector of your project.clj file:

project.clj
[jnanomsg "0.3.1"]

3.2. Maven

You can also use it in your Java projects with Maven. First, add the Clojars repository:

<repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
</repository>

Then add the jnanomsg package dependency:

<dependency>
    <groupId>jnanomsg</groupId>
    <artifactId>jnanomsg</artifactId>
    <version>0.3.1</version>
</dependency>

4. Supported features

4.1. Transports

jnanomsg supports all transports supported by its backend (nanomsg): ipc, inproc, and tcp.

4.2. Protocols

jnanomsg intends to support all protocols provided by the native nanomsg library, but it currently supports only the following:

  • pub/sub protocol

  • req/rep protocol

  • push/pull (pipeline) protocol

  • bus protocol

  • pair protocol

Note
This documentation does not intend to explain how each protocol works. It only explains the public API of the idiomatic library for Java and Clojure.

5. User guide

5.1. Introduction

The entire public Clojure API for working with the nanomsg library is exposed under the nanomsg namespace. It consists of only a few functions (connect!, bind!, subscribe!, socket, send!, recv!, send-bytes!, recv-bytes!, close!, terminate!) and symbols, all of which are fairly self-descriptive.

Example of requiring the nanomsg namespace in a Clojure file:

yours/samplens.clj
(ns yours.samplens
  (:require [nanomsg :as nn]))

In the Java API, however, each protocol has its own package containing the corresponding socket classes.

5.1.1. Connect/Bind

Clojure

With Clojure, there are two ways to do this: create a socket and then connect or bind it, or do both operations in one step using the optional parameters of the socket function.

Example of connect/bind operation using two steps:

client.clj
(let [s (nn/socket :req)]
  (nn/connect! s "ipc:///tmp/sock"))
server.clj
(let [s (nn/socket :rep)]
  (nn/bind! s "ipc:///tmp/sock"))

Example of connect/bind operation using a single step:

client.clj
(nn/socket :req {:connect "ipc:///tmp/sock"})
server.clj
(nn/socket :rep {:bind "ipc:///tmp/sock"})

Note
In Clojure, socket types are represented by keywords. This lets you create any socket type with a single function: socket. The previous examples use the :req and :rep keywords to create request and reply sockets respectively, which together form the Req/Rep protocol.

Java

In Java, each socket class has its own connect and bind methods.

Here is an example of how to connect to a remote socket:

Client.java
import nanomsg.reqrep.ReqSocket;

public class Client {
    public static void main(String[] args) {
        final ReqSocket s = new ReqSocket();
        s.connect("ipc:///tmp/sock");
    }
}

And here is an example of how to bind and listen on a local socket:

Server.java
import nanomsg.reqrep.RepSocket;

public class Server {
    public static void main(String[] args) {
        final RepSocket s = new RepSocket();
        s.bind("ipc:///tmp/sock");
    }
}

INFO: See the javadoc for more details on the Java API.

Note
In both languages, you can call bind multiple times to listen on multiple socket addresses.

5.2. Pub/Sub Sockets

This protocol has two socket types:

  • publisher - This socket is used to distribute messages to multiple destinations. Receive operation is not defined.

  • subscriber - Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received. Send operation is not defined on this socket.
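
The subscription rule can be illustrated without any sockets. The sketch below assumes that a subscription is matched against the beginning of the message body (prefix matching), which is how the native nanomsg sub socket is understood to behave. SubscriptionFilter and its methods are hypothetical names used only for illustration; they are not part of jnanomsg.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of jnanomsg: models which messages a
// subscriber would accept under prefix-based topic matching (an assumption).
class SubscriptionFilter {
    private final List<byte[]> topics = new ArrayList<>();

    // Register a topic; a message is accepted if it starts with any topic.
    void subscribe(String topic) {
        topics.add(topic.getBytes(StandardCharsets.UTF_8));
    }

    // With no subscriptions nothing is accepted, mirroring a fresh sub socket.
    boolean accepts(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        for (byte[] topic : topics) {
            if (startsWith(body, topic)) {
                return true;
            }
        }
        return false;
    }

    private static boolean startsWith(byte[] body, byte[] prefix) {
        if (prefix.length > body.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (body[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SubscriptionFilter sub = new SubscriptionFilter();
        System.out.println(sub.accepts("test msg"));  // false: no subscriptions yet
        sub.subscribe("test");
        System.out.println(sub.accepts("test msg"));  // true: body starts with "test"
        System.out.println(sub.accepts("other msg")); // false: prefix does not match
    }
}
```

This is why the examples in this section subscribe to "test" and publish "test msg": the published body begins with the subscribed topic.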

5.2.1. Clojure

Example of using the pub/sub protocol in Clojure:

publisher.clj
(let [sock (nn/socket :pub)]
  (nn/bind! sock "ipc:///tmp/sock")
  (dotimes [i 5]
    (nn/send! sock "test msg"))
  (nn/close! sock))
subscriber.clj
(let [sock (nn/socket :sub)]
  (nn/connect! sock "ipc:///tmp/sock")
  (nn/subscribe! sock "test")
  (dotimes [i 5]
    (println (nn/recv! sock)))
  (nn/close! sock))

5.2.2. Java

Example of using the pub/sub protocol in Java:

Publisher.java
import nanomsg.pubsub.PubSocket;

public class Publisher {
    public static void main(String[] args) {
        PubSocket sock = new PubSocket();
        sock.bind("ipc:///tmp/sock");

        for(int i=0; i<5; i++) {
            sock.sendString("test msg");
        }

        sock.close();
    }
}
Subscriber.java
import nanomsg.pubsub.SubSocket;

public class Subscriber {
    public static void main(String[] args) {
        SubSocket sock = new SubSocket();
        sock.connect("ipc:///tmp/sock");
        sock.subscribe("test");

        for(int i=0; i<5; i++) {
            System.out.println(sock.recvString());
        }

        sock.close();
    }
}

5.3. Req/Rep Sockets

This protocol is used to distribute the workload among multiple stateless workers, and it’s represented by two socket types:

  • req - Used to implement the client application that sends requests and receives replies.

  • rep - Used to implement the stateless worker that receives requests and sends replies.

Note
Both socket types implement send and receive operations.

5.3.1. Clojure

"Hello World" echo server in Clojure:

rep.clj (server)
(let [sock (nn/socket :rep)]
  (nn/bind! sock "tcp://*:6789")
  (loop []
    (nn/send! sock (nn/recv! sock))
    (recur)))
req.clj (client)
(let [sock (nn/socket :req)]
  (nn/connect! sock "tcp://localhost:6789")
  (dotimes [i 5]
    (nn/send! sock (str "msg:" i))
    (println "Received:" (nn/recv! sock)))
  (nn/close! sock))

5.3.2. Java

The same example as in the Clojure section, but in Java:

EchoServer.java
import nanomsg.reqrep.RepSocket;

public class EchoServer {
    public static void main(String[] args) {
        RepSocket sock = new RepSocket();
        sock.bind("tcp://*:6789");

        try {
            // Echo every request back to the client until the process is stopped.
            while (true) {
                byte[] receivedData = sock.recvBytes();
                sock.sendBytes(receivedData);
            }
        } finally {
            sock.close();
        }
    }
}
EchoClient.java
import nanomsg.reqrep.ReqSocket;

public class EchoClient {
    public static void main(String[] args) {
        ReqSocket sock = new ReqSocket();
        sock.connect("tcp://localhost:6789");

        for (int i=0; i<5; i++) {
            sock.sendString("Hello!" + i);
            System.out.println("Received:" + sock.recvString());
        }

        sock.close();
    }
}

5.4. Push/Pull Sockets

This scalability protocol passes tasks through a series of processing steps. It is represented by two socket types:

  • push - This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.

  • pull - This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.
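
Load balancing across pull nodes can be pictured as a dispatcher that hands each message to exactly one connected worker. The sketch below is only an in-memory model of that idea: it assumes round-robin as the balancing policy, and RoundRobinDispatcher is a hypothetical class that is not part of jnanomsg or nanomsg.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model of a push socket feeding several pull nodes.
class RoundRobinDispatcher {
    private final List<Queue<String>> workers = new ArrayList<>();
    private int next = 0;

    // Attach a worker inbox, standing in for a connected pull socket.
    Queue<String> addWorker() {
        Queue<String> inbox = new ArrayDeque<>();
        workers.add(inbox);
        return inbox;
    }

    // Deliver each message to exactly one worker, cycling through them.
    void send(String message) {
        if (workers.isEmpty()) {
            throw new IllegalStateException("no workers attached");
        }
        workers.get(next).add(message);
        next = (next + 1) % workers.size();
    }

    public static void main(String[] args) {
        RoundRobinDispatcher push = new RoundRobinDispatcher();
        Queue<String> a = push.addWorker();
        Queue<String> b = push.addWorker();
        for (String name : new String[] {"Foo", "Bar", "Baz"}) {
            push.send(name);
        }
        System.out.println("worker a: " + a); // worker a: [Foo, Baz]
        System.out.println("worker b: " + b); // worker b: [Bar]
    }
}
```

Unlike pub/sub, no message is duplicated: each task reaches one worker only, which is what makes the pattern suitable for splitting work.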

5.4.1. Clojure

"Hello World" task generator:

push.clj (server)
(let [sock (nn/socket :push {:bind "tcp://*:6789"})]
  (doseq [name ["Foo" "Bar" "Baz"]]
    (nn/send! sock name))
  (nn/close! sock))
pull.clj (client)
(let [sock (nn/socket :pull {:connect "tcp://localhost:6789"})]
  (dotimes [i 3]
    (println "Hello " (nn/recv! sock))))

5.4.2. Java

The same examples as in the Clojure section, but in Java:

Dispatcher.java
import nanomsg.pipeline.PushSocket;
import java.util.ArrayList;
import java.util.List;

public class Dispatcher {
    public static void main(String[] args) {
        PushSocket sock = new PushSocket();
        sock.bind("tcp://*:6789");

        List<String> people = new ArrayList<String>();
        people.add("Foo");
        people.add("Bar");
        people.add("Baz");

        for(int i=0; i<people.size(); ++i) {
            sock.sendString(people.get(i));
        }

        sock.close();
    }
}
Greeter.java
import nanomsg.pipeline.PullSocket;

public class Greeter {
    public static void main(String[] args) {
        PullSocket sock = new PullSocket();
        sock.connect("tcp://localhost:6789");

        for (int i=0; i<3; i++) {
            System.out.println("Hello " + sock.recvString());
        }

        sock.close();
    }
}

5.5. Async support

Since version 0.3, jnanomsg comes with basic and experimental support for asynchronous sockets for both languages.

5.5.1. Java

The Java bindings follow the same approach as some java.nio channel APIs. Socket operations take an instance of any class that implements nanomsg.async.IAsyncCallback.

IAsyncCallback consists of two methods: success(T data) and fail(Throwable t).

This is a simple example of using it:

import nanomsg.pipeline.PullSocket;
import nanomsg.async.AsyncSocket;
import nanomsg.async.IAsyncCallback;

public class Greeter {
    public static void main(String[] args) {
        final PullSocket sock = new PullSocket();
        final AsyncSocket asyncSock = new AsyncSocket(sock);

        sock.connect("tcp://localhost:6789");

        asyncSock.recvString(new IAsyncCallback<String>() {
            public void success(final String data) {
                System.out.println("Hello " + data);
            }

            public void fail(Throwable t) {
                System.out.println("Error: " + t.toString());
            }
        });
    }
}
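
The callback style used above can be sketched with the standard library alone, by running a blocking call on a thread pool and routing its outcome to one of two methods. Everything in this sketch is an assumption made for illustration: Callback mirrors the two-method shape of IAsyncCallback but is a hypothetical interface, AsyncRunner is a hypothetical class, and the blocking receive is simulated by a Callable rather than a real socket.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback interface with the same two-method shape as
// IAsyncCallback; it is NOT the jnanomsg type.
interface Callback<T> {
    void success(T data);
    void fail(Throwable t);
}

// Hypothetical runner: executes a blocking operation on a worker thread
// and reports the outcome through the callback.
class AsyncRunner {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    <T> void run(Callable<T> blockingCall, Callback<T> callback) {
        pool.execute(() -> {
            T result;
            try {
                result = blockingCall.call();
            } catch (Throwable t) {
                callback.fail(t);
                return;
            }
            callback.success(result);
        });
    }

    // Stop accepting work and wait briefly for pending callbacks to finish.
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncRunner runner = new AsyncRunner();
        // The Callable stands in for a blocking receive on a socket.
        runner.run(() -> "World", new Callback<String>() {
            public void success(String data) {
                System.out.println("Hello " + data);
            }

            public void fail(Throwable t) {
                System.out.println("Error: " + t);
            }
        });
        runner.shutdown();
    }
}
```

The caller never blocks; the cost is one pool thread occupied for the duration of each blocking call.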

5.5.2. Clojure

Clojure takes a very different approach from Java: it uses core.async as a dependency. You can always use the Java/Clojure API to implement the same behavior with other libraries, such as Pulsar.

(ns some.yourns
  (:require [clojure.core.async :as async]
            [nanomsg :as nn])
  (:gen-class))

(defn -main
  [& args]
  (let [s (nn/socket :pull {:connect "ipc:///tmp/sock" :async true})]
    (async/go
      (dotimes [i 3]
        (println "Hello: " (async/<! (nn/recv! s)))))))

Note
The async implementation is experimental and not efficient, because it uses a thread pool to execute all blocking calls instead of using epoll (or any other polling implementation).

core.async channels support

Warning
This feature is an experiment and should be used with caution.

jnanomsg exposes a limited and not fully compliant implementation of core.async channels over jnanomsg sockets. It allows transparent distributed communication for code that works with core.async channels.

Here is a simple example of a core.async channel over the push/pull (pipeline) scalability protocol:

server.clj
(ns serverns.core
  (:require [nanomsg.async :as nna]
            [clojure.core.async :refer [put!]]))

(defn -main
  [& args]
  (let [c (nna/chan :socktype :push :bind "ipc:///tmp/sock")]
    (put! c "message")))
client.clj
(ns clientns.core
  (:require [nanomsg.async :as nna]
            [clojure.core.async :refer [take!]]))

(defn -main
  [& args]
  (let [c (nna/chan :socktype :pull :connect "ipc:///tmp/sock")]
    (take! c (fn [msg] (println msg)))))

Note
At the moment, only the pipeline scalability protocol is implemented for use with channels, which limits channels to being unidirectional. Other scalability protocols should be implemented as core.async channels in the near future.

Warning
jnanomsg sockets have a small default timeout for send/recv operations. Your code should handle the case where a socket throws an exception or a channel returns nil (with jnanomsg channels, nil does not mark the channel as closed).
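
One way to cope with short timeouts is to wrap each receive in a bounded retry loop that treats both an exception and an empty result as "try again". The sketch below makes no claim about jnanomsg internals: RetryingReceiver is a hypothetical helper, and its Supplier argument is a stand-in for whatever socket or channel read your code performs.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, NOT part of jnanomsg.
class RetryingReceiver {
    // Calls `receive` up to maxAttempts times. A null result or a runtime
    // exception counts as a timed-out attempt, not as a closed channel.
    static <T> Optional<T> receiveWithRetry(Supplier<T> receive, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                T value = receive.get();
                if (value != null) {
                    return Optional.of(value);
                }
            } catch (RuntimeException e) {
                // Treated as a timeout here; real code should inspect the error.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated read: yields nothing twice, then produces a message.
        Supplier<String> flaky = () -> ++calls[0] < 3 ? null : "message";
        System.out.println(receiveWithRetry(flaky, 5).orElse("gave up")); // message
    }
}
```

Bounding the number of attempts keeps a permanently silent peer from stalling the caller forever.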

6. License

Copyright 2013-2014 Andrey Antukh <niwi@niwi.be>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.