When you're developing an agent that will be exchanging messages, it's important to think about how message processing will integrate with the rest of the objects making up the agent. Ideally, you'd like to meet two goals: keep the communication details isolated from the application details, and provide a structured way to link messages to method calls on application objects.
Isolating the communication details leaves you free to design the bulk of the classes making up the application around application issues, not issues related to the communication scheme you happen to be using. Likewise, the communications subsystem can be designed and updated independently, based on the communication needs of the overall system.
At the same time, you need a well-defined way for incoming messages to trigger method calls on application objects, and for object methods to generate messages to remote agents to service requests.
These may seem like conflicting requirements, but we'll see that they can both be satisfied to one degree or another by a single message-processing method.
A crucial question in designing a message-processing system is whether it needs to be asynchronous or not. In our chess example, the player agents can process messages synchronously, since they'll be handshaking throughout the entire game. That is, one player sends a move (in a message) to the other player; the second player applies the move to its copy of the "playing board," weighs its options, and sends its countermove to the first player. In this simple example, there isn't anything else a player needs to do while choosing its move or waiting for the other player to send its move, so there's no need for the ability to receive and process messages asynchronously.
This isn't usually the case, though. Messages usually trigger some significant processing by an agent, which can be carried on while waiting for any further messages. Agents in an application will usually be better off if they can send and receive messages asynchronously from any other work they may need to do. This "other work" may be in response to these messages, or may be independent of the message passing that's going on. If we were implementing a more sophisticated network game than our simple chess system, each player agent might have plenty of work to do in addition to sending and receiving messages. There may be user input to deal with, multiple remote players to synchronize with, graphical displays to update, and complicated internal models to keep straight. To keep everything in the agent running smoothly, asynchronous message I/O will probably be necessary, so your message-passing scheme should support it.
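As a rough illustration of what asynchronous message I/O buys you, the sketch below runs a reader on its own thread and hands incoming messages to the agent's main thread through a queue. It is not part of the classes developed in this chapter: the names AsyncInboxSketch, startReader(), processAll(), and the DONE marker are hypothetical, messages are plain strings, and a fixed list stands in for the connection to a remote agent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: messages are plain strings, and a fixed list
// stands in for a network connection to a remote agent.
class AsyncInboxSketch {
    static final String DONE = "DONE";  // hypothetical end-of-stream marker

    // Start a thread that "receives" each message and queues it for the agent.
    static Thread startReader(List<String> incoming, BlockingQueue<String> inbox) {
        Thread reader = new Thread(() -> {
            try {
                for (String msg : incoming) {
                    inbox.put(msg);
                }
                inbox.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        return reader;
    }

    // Drain the inbox; returns the messages handled, in arrival order.
    static List<String> processAll(BlockingQueue<String> inbox)
            throws InterruptedException {
        List<String> handled = new ArrayList<>();
        while (true) {
            String msg = inbox.take();  // waits until a message is available
            if (msg.equals(DONE)) break;
            handled.add(msg);
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        Thread reader = startReader(Arrays.asList("move e2 e4", "move e7 e5"), inbox);
        // The agent could do other work here before checking its inbox.
        System.out.println(processAll(inbox));
        reader.join();
    }
}
```

Because the reader thread only queues messages, the main thread decides when to look at them. An agent with user input or displays to service could call poll() on the queue between other tasks instead of blocking in take().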
Based on our earlier definition of a message, the BasicMessage class shown in Example 6-1 is an implementation of a generic message object. It has a message identifier and a list of arguments, all represented as strings. The BasicMessage has public methods for querying its identifier (messageID()) and its arguments (argList()). It also has an abstract Do() method, which subclasses of BasicMessage implement to perform whatever the given message requires. It's in the Do() method implementations that we'll both define our message protocol and link our message-passing scheme to the application objects in the system. For each type of message in our protocol, a subclass of BasicMessage can be defined to interpret the message arguments and do whatever is required for that type of message.
package dcj.examples.messageV1;

import java.util.Vector;

public abstract class BasicMessage {
    protected String id;
    protected Vector argList;

    public BasicMessage() {
        argList = new Vector();
    }

    public BasicMessage(String mid) {
        id = mid;
        argList = new Vector();
    }

    protected void setId(String mid) {
        id = mid;
    }

    public void addArg(String arg) {
        argList.addElement(arg);
    }

    public String messageID() {
        return id;
    }

    public Vector argList() {
        Vector listCopy = (Vector)argList.clone();
        return listCopy;
    }

    public abstract boolean Do();
}
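To make the role of Do() more concrete, here is a minimal sketch of one message subclass for the chess example. It is an illustration under assumptions, not the protocol used later in the chapter: MoveMessage, ChessBoard, and applyMove() are hypothetical names, and BasicMessage is repeated in a trimmed, generic-typed form only so that the sketch compiles by itself.

```java
import java.util.Vector;

// Trimmed, generic-typed copy of BasicMessage from Example 6-1,
// repeated here only so the sketch compiles on its own.
abstract class BasicMessage {
    protected String id;
    protected Vector<String> argList = new Vector<>();

    public BasicMessage(String mid) { id = mid; }
    public void addArg(String arg) { argList.addElement(arg); }
    public String messageID() { return id; }
    public abstract boolean Do();
}

// Hypothetical application object: records the moves applied to the board.
class ChessBoard {
    final Vector<String> moves = new Vector<>();

    boolean applyMove(String from, String to) {
        if (from.equals(to)) return false;  // placeholder legality check
        moves.addElement(from + "-" + to);
        return true;
    }
}

// Hypothetical message type with two arguments: the "from" and "to" squares.
class MoveMessage extends BasicMessage {
    private final ChessBoard board;

    MoveMessage(ChessBoard board) {
        super("move");
        this.board = board;
    }

    // Interpret the arguments and call the application object.
    public boolean Do() {
        if (argList.size() != 2) return false;
        return board.applyMove(argList.elementAt(0), argList.elementAt(1));
    }
}

class MoveMessageDemo {
    public static void main(String[] args) {
        ChessBoard board = new ChessBoard();
        MoveMessage msg = new MoveMessage(board);
        msg.addArg("e2");
        msg.addArg("e4");
        System.out.println(msg.Do() + " " + board.moves);
    }
}
```

The only place the message layer touches the application is the single applyMove() call inside Do(), so ChessBoard needs no knowledge of how the move arrived.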
To send and receive messages over a connection to a remote agent, we have the BasicMsgHandler, shown in Example 6-2. This class handles messages in terms of string tokens: a message is simply a series of tokens followed by an "end-of-message" indicator. The first token is the message identifier, and the rest are arguments to the message. The readMsg() method on BasicMsgHandler reads the message identifier of the incoming message first, then calls buildMessage() to construct the message object corresponding to the message type. The buildMessage() method is abstract in BasicMsgHandler, and is implemented by subclasses to match the message protocol being used.
To support asynchronous message processing, the BasicMsgHandler also implements the Runnable interface. The run() method is a loop that reads a message, calls the message's Do() method, then reads the next message. The BasicMsgHandler's run() method does not send messages directly. If a message needs to be sent to the remote agent, it will have to be done inside the Do() method of one of the incoming messages, or outside of the BasicMsgHandler's thread altogether. A static message handler object is associated with the BasicMsgHandler class for this purpose. It's initialized by the BasicMsgHandler constructors: when a BasicMsgHandler is created, it sets the static handler object to point to itself. In many message-passing applications there is one central message handler for each agent, and it's sometimes convenient to have this message handler globally accessible. For example, the Do() method on a BasicMessage subclass may need to send a message to the remote agent in response to the message it has just processed. Note that the static message handler object is declared public, so an application can change the global handler when necessary.
package dcj.examples.messageV1;

import java.util.Vector;
import java.lang.*;
import java.io.*;

public abstract class BasicMsgHandler implements Runnable {
    // Static message handler for applications where only one message
    // handler is used and needs to be globally accessible.
    public static BasicMsgHandler current = null;

    InputStream msgIn;
    OutputStream msgOut;
    StreamTokenizer tokenizer;
    String msgEndToken = "END";

    public BasicMsgHandler(InputStream in, OutputStream out) {
        setStreams(in, out);
        current = this;
    }

    public BasicMsgHandler(InputStream in, OutputStream out,
                           String endToken) {
        msgEndToken = endToken;
        setStreams(in, out);
        current = this;
    }

    protected void setStreams(InputStream in, OutputStream out) {
        msgIn = in;
        msgOut = out;
    }

    public BasicMessage readMsg() throws IOException {
        BasicMessage msg;
        String token;
        DataInputStream din = new DataInputStream(msgIn);

        token = din.readUTF();
        msg = buildMessage(token);

        if (msg != null) {
            boolean msgEnd = false;
            while (!msgEnd) {
                token = din.readUTF();
                if (token.compareTo(msgEndToken) == 0)
                    msgEnd = true;
                else {
                    msg.addArg(token);
                }
            }
        }
        return msg;
    }

    public void sendMsg(BasicMessage msg) throws IOException {
        boolean success = true;
        DataOutputStream dout = new DataOutputStream(msgOut);

        dout.writeUTF(msg.messageID());

        Vector args = msg.argList();
        int acnt = args.size();
        for (int i = 0; i < acnt; i++) {
            dout.writeUTF((String)args.elementAt(i));
        }

        dout.writeUTF(msgEndToken);
    }

    public void run() {
        try {
            while (true) {
                BasicMessage msg = readMsg();
                if (msg != null)
                    msg.Do();
            }
        }
        // Treat an IOException as a termination of the message
        // exchange, and let this message-processing thread die.
        catch (IOException e) {}
    }

    protected abstract BasicMessage buildMessage(String msgId);
}
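The token format that sendMsg() writes and readMsg() reads can be seen in isolation by using in-memory streams in place of a network connection. The sketch below relies only on DataOutputStream.writeUTF() and DataInputStream.readUTF(), as the handler does; WireFormatSketch, encode(), and decode() are hypothetical names introduced for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class WireFormatSketch {
    // Same default end-of-message token as BasicMsgHandler.
    static final String END = "END";

    // Write the identifier, each argument, then the end-of-message token.
    static byte[] encode(String id, List<String> args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(bytes);
        dout.writeUTF(id);
        for (String arg : args) {
            dout.writeUTF(arg);
        }
        dout.writeUTF(END);
        dout.flush();
        return bytes.toByteArray();
    }

    // Read tokens until the end token; element 0 is the message identifier.
    static List<String> decode(byte[] data) throws IOException {
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(data));
        List<String> tokens = new ArrayList<>();
        String token = din.readUTF();
        while (!token.equals(END)) {
            tokens.add(token);
            token = din.readUTF();
        }
        return tokens;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = encode("move", Arrays.asList("e2", "e4"));
        System.out.println(decode(wire));
    }
}
```

One consequence of this format shows up in decode(): an argument whose text equals the end token would cut the message short, so the end token has to be chosen so that it can never appear as an argument.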
These two classes define a framework for simple message-passing protocols. In the next section, we'll see how to use these classes in practice, and how this structure needs to be expanded to support more complex message-passing situations.
Copyright © 2001 O'Reilly & Associates. All rights reserved.