2. Project Maturity
jnanomsg is a young project and is still in an experimental phase.
3. Install
This section covers the installation of jnanomsg.
3.1. Leiningen
The simplest way to use jnanomsg in a Clojure project is to include it in the dependency vector of your project.clj file:
[jnanomsg "0.4.3"]
3.2. Maven
You can also use it in your Java projects with Maven. As a first step, add the Clojars repository:
<repository>
  <id>clojars.org</id>
  <url>http://clojars.org/repo</url>
</repository>
Then add the jnanomsg package dependency:
<dependency>
  <groupId>jnanomsg</groupId>
  <artifactId>jnanomsg</artifactId>
  <version>0.4.3</version>
</dependency>
3.3. Gradle
If you are using Gradle, this is the dependency line for the Gradle DSL:
compile "jnanomsg:jnanomsg:0.4.3"
3.4. Get the Code
jnanomsg is open source, and you can find the source code on GitHub.
You can clone the public repository with this command:
git clone https://github.com/niwibe/jnanomsg
4. Supported features
4.1. Transports
jnanomsg supports all transports supported by its backend (nanomsg): ipc, inproc and tcp.
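Endpoint addresses used throughout this document take the form transport://address. As a minimal sketch, the following plain Java splits such an address and checks the transport against the three listed above. The Endpoint class and its parse method are hypothetical helpers written for this illustration; they are not part of jnanomsg, and the library may validate addresses differently.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of jnanomsg: splits "transport://address"
// and rejects transports other than the three listed above.
final class Endpoint {
    private static final Set<String> TRANSPORTS =
        new HashSet<String>(Arrays.asList("ipc", "inproc", "tcp"));

    final String transport;
    final String address;

    private Endpoint(String transport, String address) {
        this.transport = transport;
        this.address = address;
    }

    static Endpoint parse(String uri) {
        int sep = uri.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Expected transport://address: " + uri);
        }
        String transport = uri.substring(0, sep);
        String address = uri.substring(sep + 3);
        if (!TRANSPORTS.contains(transport)) {
            throw new IllegalArgumentException("Unsupported transport: " + transport);
        }
        if (address.isEmpty()) {
            throw new IllegalArgumentException("Missing address: " + uri);
        }
        return new Endpoint(transport, address);
    }
}

class EndpointDemo {
    public static void main(String[] args) {
        Endpoint e = Endpoint.parse("tcp://localhost:6789");
        System.out.println(e.transport + " -> " + e.address);
    }
}
```

A check like this only catches malformed strings early; whether an address is usable still depends on the transport (for example, a file path for ipc and a host and port for tcp).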
4.2. Protocols
jnanomsg intends to support all protocols supported by the native nanomsg library, but currently it supports only the following protocols:
- pub/sub protocol
- req/rep protocol
- push/pull (pipeline) protocol
- bus protocol
- pair protocol
Note: This documentation does not intend to explain how each protocol works. It only intends to explain the public API of the idiomatic library for Java and Clojure.
5. User Guide for Clojure
5.1. Introduction
The entire public Clojure API for working with the nanomsg library is exposed under the nanomsg.core namespace. It consists of only a few functions, which are fairly self-descriptive: connect!, bind!, subscribe!, socket, send!, recv!, recv-str!, close!, terminate! and symbols.
Example of importing the nanomsg.core namespace in a Clojure file:
(ns yours.samplens
  (:require [nanomsg.core :as nn]))
5.2. Creating a socket
Let’s start by creating a new socket and listening for messages on it. This can be done with the nanomsg.core/socket function, which creates a new socket instance, and the nanomsg.core/bind! function, which tells the socket to bind to a specific endpoint.
Example of creating a :rep socket and binding it to a Unix socket endpoint:
(with-open [socket (nn/socket :rep)]
  (nn/bind! socket "ipc:///tmp/sock"))
You can also do it in a single step by passing the endpoint in an options map to the socket constructor:
(with-open [socket (nn/socket :rep {:bind "ipc:///tmp/sock"})]
  (do-something socket))
5.3. Connecting to a socket
Now let’s connect to an existing socket. This can be done with the nanomsg.core/connect! function. It works like the bind! function seen previously and has the same signature.
Example of creating a :req socket and connecting it to a Unix socket endpoint:
(with-open [socket (nn/socket :req)]
  (nn/connect! socket "ipc:///tmp/sock"))
You can also do it in a single step by passing the endpoint in an options map to the socket constructor:
(with-open [socket (nn/socket :req {:connect "ipc:///tmp/sock"})]
  (do-something socket))
Note: In Clojure, socket types are represented by keywords. With this approach, you can create any socket type with a single function: socket. As you can see in the previous examples, the :req and :rep keywords create the request and reply socket types, respectively, which are part of the Req/Rep protocol.
5.4. Socket types
5.4.1. PubSub Sockets
This protocol has two socket types:
- publisher - This socket is used to distribute messages to multiple destinations. Receive operation is not defined.
- subscriber - Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received. Send operation is not defined on this socket.
Example of using the pub/sub protocol in Clojure:
;; Publisher
(with-open [sock (nn/socket :pub)]
  (nn/bind! sock "ipc:///tmp/sock")
  (dotimes [i 5]
    (nn/send! sock "test msg")))
;; Subscriber
(with-open [sock (nn/socket :sub)]
  (nn/connect! sock "ipc:///tmp/sock")
  (nn/subscribe! sock "test")
  (dotimes [i 5]
    (println (nn/recv! sock))))
5.4.2. Req/Rep Sockets
This protocol is used to distribute the workload among multiple stateless workers. It is represented by two socket types:
- req - Used to implement the client application that sends requests and receives replies.
- rep - Used to implement the stateless worker that receives requests and sends replies.
Note: Both socket types implement send and receive operations.
;; Echo server: replies with whatever it receives
(with-open [sock (nn/socket :rep)]
  (nn/bind! sock "tcp://*:6789")
  (loop []
    (nn/send! sock (nn/recv! sock))
    (recur)))
;; Client: connects to the server and sends five requests
(with-open [sock (nn/socket :req)]
  (nn/connect! sock "tcp://localhost:6789")
  (dotimes [i 5]
    (nn/send! sock (str "msg:" i))
    (println "Received:" (nn/recv! sock))))
5.4.3. Push/Pull Sockets
This is a scalability protocol for passing tasks through a series of processing steps. It is represented by two socket types:
- push - This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.
- pull - This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.
(with-open [sock (nn/socket :push {:bind "tcp://*:6789"})]
  (doseq [name ["Foo" "Bar" "Baz"]]
    (nn/send! sock name)))
(with-open [sock (nn/socket :pull {:connect "tcp://localhost:6789"})]
  (dotimes [i 3]
    (println "Hello " (nn/recv! sock))))
5.5. Non blocking sockets
jnanomsg also comes with support for non-blocking sockets, and exposes it through a simple and unopinionated callback-based interface.
You can create an async socket in the same way as you created sockets previously, except that you should pass the :async parameter with true as its value in the options map given to the socket constructor.
(nn/socket :req {:async true})
;; #<impl$async_socket$reify__216 nanomsg.impl$async_socket$reify__216@2d50f101>
Later on, you can start sending and/or receiving data through async sockets with the same functions explained in the previous examples, with one small difference: they accept an optional callback as an extra last argument.
Let’s see some examples:
(with-open [socket (nn/socket :rep {:async true})]
  (nn/bind! socket "ipc:///tmp/sock.sock")
  (nn/recv! socket (fn [data]
                     (nn/send! socket data))))
(with-open [socket (nn/socket :req {:async true})]
  (nn/connect! socket "ipc:///tmp/sock.sock")
  (nn/send! socket "foobar"
            (fn [receiveddata]
              (println receiveddata))))
6. User Guide for Java
6.1. Introduction
Unlike Clojure, which exposes a uniform, high-level API for all socket types, the Java API is slightly different. Each supported socket type is available as its own class in a specific Java package.
The Java API is described in more detail in the javadoc.
6.2. Creating a socket
Let’s start by creating a new socket and listening for messages. The examples below use the Req/Rep socket types.
import nanomsg.reqrep.RepSocket;

public class Server {
    public static void main(String[] args) {
        final RepSocket s = new RepSocket();
        // Bind to a Unix socket endpoint.
        s.bind("ipc:///tmp/sock");
    }
}
6.3. Connecting to a socket
Now let’s connect to an existing socket. For this we’ll use the ReqSocket class.
import nanomsg.reqrep.ReqSocket;

public class Client {
    public static void main(String[] args) {
        final ReqSocket s = new ReqSocket();
        // Connect to the Unix socket endpoint the server is bound to.
        s.connect("ipc:///tmp/sock");
    }
}
6.4. Socket types
6.4.1. PubSub Sockets
This protocol has two socket types:
- publisher - This socket is used to distribute messages to multiple destinations. Receive operation is not defined.
- subscriber - Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received. Send operation is not defined on this socket.
Example of using the pub/sub protocol in Java:
import nanomsg.pubsub.PubSocket;

public class Publisher {
    public static void main(String[] args) {
        PubSocket sock = new PubSocket();
        sock.bind("ipc:///tmp/sock");
        for (int i = 0; i < 5; i++) {
            sock.send("test msg");
        }
        sock.close();
    }
}
import nanomsg.pubsub.SubSocket;

public class Subscriber {
    public static void main(String[] args) {
        SubSocket sock = new SubSocket();
        sock.connect("ipc:///tmp/sock");
        // Without a subscription, no messages are received.
        sock.subscribe("test");
        for (int i = 0; i < 5; i++) {
            System.out.println(sock.recvString());
        }
        sock.close();
    }
}
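The subscription rule described above can be modelled in a few lines of plain Java. This is a sketch only: SubscriptionFilter is a hypothetical class, not part of jnanomsg, and it assumes that a subscription is matched as a prefix of the message bytes, which is consistent with subscribing to "test" and receiving "test msg" in the example above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of jnanomsg: models the filtering
// a subscriber applies, assuming topics are matched as message prefixes.
final class SubscriptionFilter {
    private final List<byte[]> topics = new ArrayList<byte[]>();

    void subscribe(String topic) {
        topics.add(topic.getBytes(StandardCharsets.UTF_8));
    }

    // A message is accepted when at least one subscribed topic is a prefix of it.
    boolean accepts(byte[] message) {
        for (byte[] topic : topics) {
            if (startsWith(message, topic)) {
                return true;
            }
        }
        return false; // no subscriptions, or none matched
    }

    private static boolean startsWith(byte[] message, byte[] prefix) {
        if (prefix.length > message.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (message[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}

class SubscriptionFilterDemo {
    public static void main(String[] args) {
        SubscriptionFilter filter = new SubscriptionFilter();
        byte[] msg = "test msg".getBytes(StandardCharsets.UTF_8);
        System.out.println(filter.accepts(msg)); // false: nothing subscribed yet
        filter.subscribe("test");
        System.out.println(filter.accepts(msg)); // true: "test" is a prefix
    }
}
```

The first call returning false mirrors the statement that a freshly created subscriber has no subscriptions and therefore receives nothing.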
6.4.2. Req/Rep Sockets
This protocol is used to distribute the workload among multiple stateless workers. It is represented by two socket types:
- req - Used to implement the client application that sends requests and receives replies.
- rep - Used to implement the stateless worker that receives requests and sends replies.
Note: Both socket types implement send and receive operations.
import nanomsg.reqrep.RepSocket;

public class EchoServer {
    public static void main(String[] args) {
        RepSocket sock = new RepSocket();
        sock.bind("tcp://*:6789");
        try {
            // Echo every request back to the client.
            while (true) {
                byte[] receivedData = sock.recvBytes();
                sock.sendBytes(receivedData);
            }
        } finally {
            // Close the socket if the loop is left through an exception.
            sock.close();
        }
    }
}
import nanomsg.reqrep.ReqSocket;

public class EchoClient {
    public static void main(String[] args) {
        ReqSocket sock = new ReqSocket();
        sock.connect("tcp://localhost:6789");
        for (int i = 0; i < 5; i++) {
            sock.send("Hello!" + i);
            System.out.println("Received:" + sock.recvString());
        }
        sock.close();
    }
}
6.4.3. Push/Pull Sockets
This is a scalability protocol for passing tasks through a series of processing steps. It is represented by two socket types:
- push - This socket is used to send messages to a cluster of load-balanced nodes. Receive operation is not implemented on this socket type.
- pull - This socket is used to receive a message from a cluster of nodes. Send operation is not implemented on this socket type.
import nanomsg.pipeline.PushSocket;
import java.util.ArrayList;
import java.util.List;

public class Dispatcher {
    public static void main(String[] args) {
        PushSocket sock = new PushSocket();
        sock.bind("tcp://*:6789");
        List<String> people = new ArrayList<String>();
        people.add("Foo");
        people.add("Bar");
        people.add("Baz");
        for (int i = 0; i < people.size(); ++i) {
            sock.send(people.get(i));
        }
        sock.close();
    }
}
import nanomsg.pipeline.PullSocket;

public class Greeter {
    public static void main(String[] args) {
        PullSocket sock = new PullSocket();
        sock.connect("tcp://localhost:6789");
        for (int i = 0; i < 3; i++) {
            System.out.println("Hello " + sock.recvString());
        }
        sock.close();
    }
}
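"Load-balanced" here means that each pushed message is delivered to one of the connected pull nodes rather than to all of them. The sketch below models that idea with a plain round-robin choice. RoundRobinPusher is a hypothetical class, not jnanomsg code, and round-robin is an assumption made for illustration; the actual balancing policy is decided by the native nanomsg library and may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not jnanomsg code: each pushed message is handed to
// exactly one worker queue, chosen in round-robin order.
final class RoundRobinPusher {
    private final List<List<String>> workers = new ArrayList<List<String>>();
    private int next = 0;

    // Registers a worker and returns the queue that collects its messages.
    List<String> addWorker() {
        List<String> queue = new ArrayList<String>();
        workers.add(queue);
        return queue;
    }

    void push(String message) {
        if (workers.isEmpty()) {
            throw new IllegalStateException("No workers connected");
        }
        workers.get(next).add(message);
        next = (next + 1) % workers.size();
    }
}

class RoundRobinDemo {
    public static void main(String[] args) {
        RoundRobinPusher pusher = new RoundRobinPusher();
        List<String> first = pusher.addWorker();
        List<String> second = pusher.addWorker();
        for (String name : new String[] {"Foo", "Bar", "Baz"}) {
            pusher.push(name);
        }
        System.out.println(first);  // [Foo, Baz]
        System.out.println(second); // [Bar]
    }
}
```

With two Greeter processes connected to the Dispatcher above, the three names would be split between them in a similar way, so neither process should expect to see all three.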
6.5. Non blocking sockets
jnanomsg also comes with support for non-blocking sockets, and exposes it through a simple and unopinionated callback-based interface.
In Java, the async socket is a simple class that takes a normal socket instance as the only argument to its constructor and exposes an API for non-blocking operations.
import nanomsg.pipeline.PullSocket;
import nanomsg.async.AsyncSocket;
import nanomsg.async.IAsyncCallback;

public class Greeter {
    public static void main(String[] args) {
        // Create a normal instance of any socket type.
        final PullSocket sock = new PullSocket();
        // Create a lightweight async layer around the previously
        // created pull socket.
        final AsyncSocket asyncSock = new AsyncSocket(sock);
        // Use the standard socket connect method to connect
        // to the remote endpoint.
        sock.connect("tcp://localhost:6789");
        // Use the async socket instance to execute
        // send/recv operations asynchronously.
        asyncSock.recvString(new IAsyncCallback<String>() {
            public void success(final String data) {
                System.out.println("Hello " + data);
            }
            public void fail(Throwable t) {
                System.out.println("Error: " + t.toString());
            }
        });
    }
}
7. FAQ
Why does the async interface in Clojure use callbacks instead of core.async?
The callback interface is less opinionated and is a "lingua franca" for async interfaces. It does not couple you to a specific library or a specific async API. You can easily build an interface that works with core.async on top of it.
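The same layering works on the Java side. As a minimal sketch, the code below wraps a success/fail callback pair in a java.util.concurrent.CompletableFuture, which stands in here for a higher-level API such as core.async. AsyncCallback is a local stand-in modelled on the callback shape shown in section 6.5, and AsyncOperation and Futures are hypothetical names; none of them are part of jnanomsg.

```java
import java.util.concurrent.CompletableFuture;

// Local stand-in with the same success/fail shape as the callback shown in
// the Java guide; declared here so the sketch compiles on its own.
interface AsyncCallback<T> {
    void success(T data);
    void fail(Throwable t);
}

// Hypothetical callback-based operation, e.g. a wrapper around an async receive.
interface AsyncOperation<T> {
    void start(AsyncCallback<T> callback);
}

final class Futures {
    private Futures() {}

    // Starts the operation and returns a future completed by its callback.
    static <T> CompletableFuture<T> toFuture(AsyncOperation<T> operation) {
        final CompletableFuture<T> future = new CompletableFuture<T>();
        operation.start(new AsyncCallback<T>() {
            public void success(T data) {
                future.complete(data);
            }
            public void fail(Throwable t) {
                future.completeExceptionally(t);
            }
        });
        return future;
    }
}

class FuturesDemo {
    public static void main(String[] args) {
        // Fake operation that succeeds immediately; a real one would invoke
        // the callback later, once data is available.
        CompletableFuture<String> f = Futures.toFuture(new AsyncOperation<String>() {
            public void start(AsyncCallback<String> cb) {
                cb.success("foobar");
            }
        });
        System.out.println(f.join()); // prints foobar
    }
}
```

Because the bridge only needs the two callback methods, the callback interface itself stays free of any dependency on a particular async library.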
How efficient is the async interface?
Internally, jnanomsg uses something like an event loop, called the scheduler, to handle async operations. On Linux platforms, it uses the system's native epoll for socket multiplexing.
For systems that do not support epoll, it also comes with an inefficient "long polling" scheduler, which should work for experimenting but is not recommended for production use. Improvements for unsupported platforms are welcome.
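To make the cost of polling concrete, here is a rough sketch of what a polling scheduler could look like. It is not jnanomsg's implementation: PollingScheduler, Attempt and Handler are hypothetical names, and the design (retry every pending non-blocking attempt, then sleep) is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical names throughout; this is not jnanomsg's scheduler.
// A non-blocking attempt returns null when no data is available yet.
interface Attempt<T> {
    T tryOnce();
}

interface Handler<T> {
    void handle(T data);
}

final class PollingScheduler {
    private static final class Pending<T> {
        final Attempt<T> attempt;
        final Handler<T> handler;

        Pending(Attempt<T> attempt, Handler<T> handler) {
            this.attempt = attempt;
            this.handler = handler;
        }

        // Returns true once the attempt produced data and the handler ran.
        boolean poll() {
            T data = attempt.tryOnce();
            if (data == null) {
                return false;
            }
            handler.handle(data);
            return true;
        }
    }

    private final List<Pending<?>> pending = new ArrayList<Pending<?>>();

    <T> void submit(Attempt<T> attempt, Handler<T> handler) {
        pending.add(new Pending<T>(attempt, handler));
    }

    // Polls every pending operation until all have completed and returns
    // the number of polling rounds that were needed.
    int runUntilIdle(long sleepMillis) {
        int rounds = 0;
        while (!pending.isEmpty()) {
            rounds++;
            for (Iterator<Pending<?>> it = pending.iterator(); it.hasNext();) {
                if (it.next().poll()) {
                    it.remove();
                }
            }
            if (!pending.isEmpty()) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return rounds;
                }
            }
        }
        return rounds;
    }
}

class PollingDemo {
    public static void main(String[] args) {
        PollingScheduler scheduler = new PollingScheduler();
        final int[] calls = {0};
        scheduler.submit(
            new Attempt<String>() {
                public String tryOnce() {
                    calls[0]++;
                    return calls[0] < 3 ? null : "ready"; // data arrives on the third poll
                }
            },
            new Handler<String>() {
                public void handle(String data) {
                    System.out.println("Received: " + data);
                }
            });
        System.out.println("Polling rounds: " + scheduler.runUntilIdle(1));
    }
}
```

Every round that finds no data is wasted work, and the sleep between rounds adds latency. A readiness-based mechanism such as epoll avoids both, because the thread is woken only when a socket actually has something to do.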
Can I bind one socket to multiple endpoints?
Yes, you can call the bind function/method multiple times to listen on multiple endpoints.
8. License
Copyright 2013-2015 Andrey Antukh <niwi@niwi.be>
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.