<html>
<head>
<title>STOMP Client for Erlang</title>
<style type="text/css">
body {
font-family: arial;
font-size: 10pt;
}
td {
font-family: arial;
font-size: 10pt;
background: #ffffff;
}
.footnote {
font-family: arial;
font-size: 7pt;
}
table {
background: black;
}
th {
background: #ffffff;
font-family: arial;
font-size: 10pt;
font-weight: bold;
}
tt {
font-family: arial;
font-size: 8pt;
}
.footer {
font-size: 10px;
font-weight: bold;
}
.feed {background: #eeeeff;}
.doc {
border: 1px black dashed;
padding: 3px;
background: #dddddd;
}
.command {
font-style: italic;
}
pre {
font-family: courier;
font-size: 8pt;
border-style: dashed;
border-color: gray;
border-width: 1px;
}
sup {
font-size: 6pt;
}
</style>
</head>
<body>
<h3>STOMP Client for Erlang</h3>
<p><a href="/erlang/stomp.erl">stomp.erl</a> is a simple client library for message brokers that support the <a href="http://stomp.codehaus.org/">STOMP</a> <a href="http://stomp.codehaus.org/Protocol">protocol</a>. Its usage is straightforward. Below, I walk through a basic interaction with a broker and go into the public functions in greater detail. The examples use <a href="http://activemq.apache.org/">ActiveMQ</a> as the broker. ActiveMQ is a free, open source message broker written in Java that is easy to install, set up, and run. Go ahead and start <i>erl</i> in the same directory as <i>stomp.erl</i> and compile the stomp client module:
<pre> > c(stomp).</pre></p>
<p><h4>Opening a connection.</h4>The first thing we are going to do is open a connection to the broker. Under the hood, this call opens a socket connection to the broker (stomp.erl uses the <a href="http://erlang.org/doc/man/gen_tcp.html">gen_tcp</a> module bundled with Erlang) and sends a few simple protocol messages to set up a session. As mentioned above, these examples assume that ActiveMQ is up and running in your local environment, with the STOMP handler enabled and listening on the default port (61613).</p><p>The <i>connect</i> function takes four arguments (the broker's hostname or address, the port number, a login, and a passcode for authentication) and returns a connection that we will use throughout the subsequent exchanges:
<pre> > Conn = stomp:connect("localhost", 61613, "", "").</pre></p>
<p>Note that in this instance we do not require any authentication on the broker, and are passing in empty strings for the login and passcode parameters.</p>
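<p>To make the "simple protocol messages" more concrete, here is a minimal sketch of how a STOMP frame can be assembled: a command line, one <i>name:value</i> line per header, a blank line, the body, and a terminating NUL byte. The module name <i>stomp_frame_sketch</i> and both function names are hypothetical and chosen for illustration only. This is not taken from stomp.erl, whose internals may differ.</p>
<pre>
-module(stomp_frame_sketch).
-export([frame/3, connect_frame/2]).

%% Build a STOMP frame as a flat string: the command line, one
%% "name:value" line per header, a blank line, the body, and a
%% terminating NUL byte.
frame(Command, Headers, Body) ->
    HeaderLines = lists:map(
        fun({Name, Value}) -> [Name, ":", Value, "\n"] end,
        Headers),
    lists:flatten([Command, "\n", HeaderLines, "\n", Body, 0]).

%% The CONNECT frame carries the login and passcode as headers.
connect_frame(Login, Passcode) ->
    frame("CONNECT", [{"login", Login}, {"passcode", Passcode}], "").
</pre>
<p>A string built this way could be written to a socket obtained from <i>gen_tcp:connect/3</i> using <i>gen_tcp:send/2</i>. The broker answers a CONNECT frame with a CONNECTED frame.</p>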
<p><h4>Sending messages.</h4></p>
<p>Ok, let's send a few messages to a queue. We will use a queue called "foobar". ActiveMQ can create queues and topics on demand, so there is no need for any explicit administration or configuration: just send a message to the desired destination and, if it does not already exist, the broker will create it automatically. To send a message, we specify the connection to transmit over, the destination queue or topic, any headers we want to send (as a list of tuples), and the body of the message. Here is a simple example:
<pre> > stomp:send(Conn, "/queue/foobar", [], "hello world").
> stomp:send(Conn, "/queue/foobar", [{"priority","20"}], "we are specifying priority for this hello world").</pre></p>
<p>Note that I am using the "priority" header in the example above to illustrate how to specify headers during a send operation. ActiveMQ does not currently re-order messages based on priority, but the header is passed to the receiving client, so the consuming application can do its own prioritizing.</p>
<p><h4>Subscribing to a queue or topic.</h4> Before we can receive messages from the broker, we must register ourselves as a subscriber to a particular queue or topic. To do this, we pass a destination (the queue or topic we are interested in), the connection we opened in the first step, and a list of two-tuples representing the names and values of any optional headers we wish to send. Note that the destination comes first here:
<pre> > stomp:subscribe("/queue/foobar", Conn, [{"ack", "client"}]).</pre></p>
<p>We are now subscribed to the queue "foobar", and have told the broker to wait for an ack before removing retrieved messages from the queue.</p>
<p><h4>Retrieving messages.</h4>
Now that we have subscribed to the queue "foobar", let's download the pending messages we sent earlier. To pull messages from a queue or topic, we can use the <i>get_messages</i> function. <i>get_messages</i> takes a connection as its single argument and pulls down all available messages ("available" means available per broker and subscription policy, and not necessarily all pending messages), blocking if none are available. It returns a list of messages, where each message is itself a list of three tuples: type, headers, and message body. For example, here is what we get back if we call <i>get_messages</i> after sending the messages from the earlier step:
<pre> > stomp:get_messages(Conn).
[[{type,"MESSAGE"},
{headers,[{"destination","/queue/foobar"},
{"timestamp","1247956667243"},
{"priority","0"},
{"expires","0"},
{"message-id",
"ID:phosphorus-53442-1247930100064-2:5:-1:1:1"}]},
{body,"hello world"}],
[{type,"MESSAGE"},
{headers,[{"destination","/queue/foobar"},
{"timestamp","1247956684233"},
{"priority","20"},
{"expires","0"},
{"message-id",
"ID:phosphorus-53442-1247930100064-2:5:-1:1:2"}]},
{body,"we are specifying priority for this hello world"}]]</pre></p>
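<p>Because each message is a plain list of tagged tuples, the standard <i>lists</i> module is enough to pick it apart. The following is a minimal sketch, assuming the message shape shown above. The module <i>message_sketch</i> and its functions are hypothetical helpers and are not part of stomp.erl. It also shows one way a consuming application could order messages by the "priority" header on its own.</p>
<pre>
-module(message_sketch).
-export([header/2, body/1, sort_by_priority/1]).

%% Look up a header value by name; returns undefined if it is absent.
header(Name, Message) ->
    {headers, Headers} = lists:keyfind(headers, 1, Message),
    case lists:keyfind(Name, 1, Headers) of
        {Name, Value} -> Value;
        false -> undefined
    end.

%% Extract the message body.
body(Message) ->
    {body, Body} = lists:keyfind(body, 1, Message),
    Body.

%% Order messages by their "priority" header, highest first.
%% Messages without the header are treated as priority 0.
sort_by_priority(Messages) ->
    Priority = fun(M) ->
        case header("priority", M) of
            undefined -> 0;
            Value -> list_to_integer(Value)
        end
    end,
    lists:sort(fun(A, B) -> Priority(A) >= Priority(B) end, Messages).
</pre>
<p>For example, <i>message_sketch:sort_by_priority(stomp:get_messages(Conn))</i> would put the message sent with priority "20" ahead of the one with priority "0".</p>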
<p><h4>Ack'ing messages.</h4>
When we subscribed to the queue "foobar", you may have noticed that we passed in an optional header of the form <i>{"ack", "client"}</i>. This header tells the broker not to count a message as delivered until the client has explicitly acknowledged its receipt. In fact, if you examine the queue contents in the ActiveMQ broker console (<a href="http://localhost:8161/admin/browse.jsp?JMSDestination=foobar">http://localhost:8161/admin/browse.jsp?JMSDestination=foobar</a>), you will see that the two messages we sent are still pending. To prevent another subscriber from picking up one of these messages, we need to send a message acknowledging our receipt. To do this, use the <i>ack</i> function, which takes a connection and a message id as parameters. The message id for each of the messages we have downloaded is present in the accompanying headers (<i>{"message-id","ID:phosphorus-53442-1247930100064-2:5:-1:1:1"}</i> and <i>{"message-id","ID:phosphorus-53442-1247930100064-2:5:-1:1:2"}</i>). Example:
<pre> > stomp:ack(Conn, "ID:phosphorus-53442-1247930100064-2:5:-1:1:1").
> stomp:ack(Conn, "ID:phosphorus-53442-1247930100064-2:5:-1:1:2").</pre>
</p>
<p>If you check the queue now, you will see that these messages are no longer present, because receipt by our subscriber has been confirmed.</p>
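<p>Copying message ids by hand gets tedious quickly. Here is a minimal sketch, assuming the message shape returned by <i>get_messages</i> and the two-argument <i>stomp:ack</i> call used above, that acknowledges a whole batch. The module <i>ack_sketch</i> and its function names are hypothetical.</p>
<pre>
-module(ack_sketch).
-export([message_ids/1, ack_all/2]).

%% Collect the "message-id" header value of every message.
message_ids(Messages) ->
    lists:map(
        fun(Message) ->
            {headers, Headers} = lists:keyfind(headers, 1, Message),
            {"message-id", Id} = lists:keyfind("message-id", 1, Headers),
            Id
        end,
        Messages).

%% Acknowledge every message in the batch over the given connection.
ack_all(Conn, Messages) ->
    lists:foreach(fun(Id) -> stomp:ack(Conn, Id) end, message_ids(Messages)).
</pre>
<p>With this in place, <i>ack_sketch:ack_all(Conn, stomp:get_messages(Conn))</i> would retrieve and acknowledge the available messages in one step. A message without a "message-id" header makes the match fail, which is deliberate in this sketch: it crashes rather than silently skipping the message.</p>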
<p><h4>Transactions.</h4>
STOMP provides transaction semantics for grouping <i>send</i> and <i>ack</i> messages with commit/rollback facilities. The <i>begin_transaction</i>, <i>commit_transaction</i>, and <i>abort_transaction</i> functions send the corresponding BEGIN, COMMIT, and ABORT frames. To make a message part of a transaction, use the <i>ack(Connection, MessageId, TransactionId)</i> function or add the optional <i>"transaction"</i> header to a <i>send</i> operation. Examples:
<pre> > stomp:begin_transaction(Conn, "MyUniqueTransactionIdBlahBlahBlah1234567890").
> stomp:send(Conn, "/queue/foobar", [{"transaction", "MyUniqueTransactionIdBlahBlahBlah1234567890"}], "transactional hello world").</pre></p>
<p>At this point, we have successfully sent a message to the broker, but if we inspect the queue contents in the ActiveMQ broker console (<a href="http://localhost:8161/admin/browse.jsp?JMSDestination=foobar">http://localhost:8161/admin/browse.jsp?JMSDestination=foobar</a>), we will see that there are no pending messages. This is because we sent the last message as part of a transaction that has not been committed yet. To close the transaction, we use the <i>commit_transaction</i> function:
<pre> > stomp:commit_transaction(Conn, "MyUniqueTransactionIdBlahBlahBlah1234567890").
> stomp:get_messages(Conn).
[[{type,"MESSAGE"},
{headers,[{"destination","/queue/foobar"},
{"transaction",
"MyUniqueTransactionIdBlahBlahBlah1234567890"},
{"timestamp","1248013136111"},
{"priority","0"},
{"expires","0"},
{"message-id",
"ID:phosphorus-53442-1247930100064-2:7:-1:1:5"}]},
{body,"transactional hello world"}]]</pre></p>
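<p>The begin/commit pairing above can be wrapped so that a failure rolls the transaction back. The following is a sketch only. The module <i>tx_sketch</i> and the function <i>with_transaction</i> are hypothetical, and the sketch assumes that <i>abort_transaction</i> takes the same two arguments (connection and transaction id) as <i>begin_transaction</i> and <i>commit_transaction</i>, which the examples above do not show.</p>
<pre>
-module(tx_sketch).
-export([with_transaction/3]).

%% Run Work(TxId) inside a STOMP transaction: commit if it returns
%% normally, abort and re-raise if it throws, errors, or exits.
with_transaction(Conn, TxId, Work) ->
    stomp:begin_transaction(Conn, TxId),
    try Work(TxId) of
        Result ->
            stomp:commit_transaction(Conn, TxId),
            Result
    catch
        Class:Reason:Stack ->
            %% Assumed arity: abort_transaction(Connection, TransactionId).
            stomp:abort_transaction(Conn, TxId),
            erlang:raise(Class, Reason, Stack)
    end.
</pre>
<p>A call might look like this:</p>
<pre> > tx_sketch:with_transaction(Conn, "tx-1", fun(Tx) -> stomp:send(Conn, "/queue/foobar", [{"transaction", Tx}], "transactional hello world") end).</pre>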
<p><h4>on_message</h4>
stomp.erl also provides an "on message" handler that lets you pass in a function to be called on each received message. Unlike <i>get_messages</i>, which returns after getting all available messages, <i>on_message</i> blocks <i>continuously</i>, waiting for messages to arrive on the queue. Example:
<pre> > stomp:send(Conn, "/queue/foobar", [], "message one").
> stomp:send(Conn, "/queue/foobar", [], "message two").
> stomp:send(Conn, "/queue/foobar", [], "message three").
> MyFunction=fun([_, _, {_, X}]) -> io:fwrite("message ~s ~n", [X]) end.
#Fun&lt;erl_eval.6.13229925&gt;
> stomp:on_message(MyFunction, Conn).
message message one
message message two
message message three
</pre>
</p>
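<p>Since we subscribed with <i>{"ack", "client"}</i>, a handler will usually want to acknowledge each message once it has processed it. Here is a minimal sketch, assuming the handler receives a message in the same type, headers, body order that <i>get_messages</i> returns. The module <i>handler_sketch</i> and the function <i>handler</i> are hypothetical.</p>
<pre>
-module(handler_sketch).
-export([handler/1]).

%% Return a fun for stomp:on_message/2 that prints the body of each
%% message and then acknowledges it over the same connection.
handler(Conn) ->
    fun([{type, _}, {headers, Headers}, {body, Body}]) ->
        io:fwrite("message ~s~n", [Body]),
        {"message-id", Id} = lists:keyfind("message-id", 1, Headers),
        stomp:ack(Conn, Id)
    end.
</pre>
<p>It would be used as <i>stomp:on_message(handler_sketch:handler(Conn), Conn)</i>. Matching on the tuple tags instead of using wildcards means the handler fails on a message of an unexpected shape instead of printing the wrong field.</p>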
<p class="footer">
7/19/2009</p>
</body>
</html>