パブリッシュ/サブスクライブのプログラミング

パブリッシュ/サブスクライブはブロードキャストメカニズムの 1 つで、メッセージはトピックにパブリッシュされ、トピックのサブスクライバに自動的に配信されます。トピックは階層的に構成できます。こうすると最上位レベルのサブスクライバはすべてのメッセージを受信しますが、サブトピックのサブスクライバはサブトピックのメッセージだけを受信します。

パブリッシャのコーディング

パブリッシャは、次のオブジェクトを使用してメッセージをトピックに追加します。

パブリッシャクラスは、javax.jms クラスのいずれも拡張しません。このクラスは明示的に他のクラスを拡張したり、暗黙的に java.lang.Object を拡張したりできます。

次のコードを実行するには、JMC を使用して topic1 と名付けたトピックを定義する必要があります。

パブリッシャのコードを作成するには

  1. 次のパッケージをインポートします。
    import java.util.*;
    import javax.jms.*;
    import javax.naming.*;
    
  2. 次のように、JMS で使用されるオブジェクトの変数を宣言します。
    private Context _context = null;
    private TopicConnection _connection = null;
    private TopicSession _session = null;
    private TextMessage _message = null;
    private TopicPublisher _publisher = null;
    
  3. アプリケーション特有のメソッドを使用して、ユーザーとトピックの情報を取得します。わかりやすくするために、この例では各値をハードコーディングしています。
    // ユーザー ID とパスワード
    private String _user = new String("admin");
    private String _password = new String("admin");
    // トピックの名前を JMC 全体で一致させます。
    // この情報は、SERVER-INF/jrun-resources.xml ファイルでも表示できます。
    private String _topic = new String("topic1");
    // これは、リモートホスト名とJNDI ポート番号を連結したものです。
    private String _providerurl = new String("localhost:2918");
    private String _ctxtFactory = new String 
    ("jrun.naming.JRunContextFactory");
    
  4. メッセージを送信する前に、JMS 変数を作成して、値を挿入します。
    // JNDI で設定を行います。
    try {
      Properties p = new Properties();
      p.put(Context.SECURITY_PRINCIPAL, _user);
      p.put(Context.SECURITY_CREDENTIALS, _password);
      p.put(Context.PROVIDER_URL, _providerurl );
      p.put( Context.INITIAL_CONTEXT_FACTORY, ctxtFactory);
    
      // 同じサーバ上のコンテキストにアクセスする場合は、
      // これは空のコンストラクタでも構いません。
      _context = new InitialContext(p);
    
    // Get TopicConnectionFactory from JNDI. 
      final TopicConnectionFactory factory =
        (TopicConnectionFactory)_context.lookup
          ("jms/TopicConnectionFactory");
      // ファクトリを使用して TopicConnection を作成します。
      _connection = factory.createTopicConnection();
      // TopicConnection を使用して TopicSession を作成します。
      _session = _connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);
      // TopicSession を使用して TopicSender を作成します。
      // topic1 を探します。
      _publisher = _session.createPublisher((Topic)_context.lookup
        ("java:jms/topic/topic1"));
      // TextMessage オブジェクトを作成します。
      _message = _session.createTextMessage();
      }
      catch(NamingException e) {
        System.out.println("Naming Exception:" + e.getMessage());
      }
      catch(JMSException e) {
        System.out.println("JMS Exception:" + e.getMessage());
      }
    
  5. メッセージをパブリッシュします。このサーブレット例では、フォームフィールドからメッセージを取得します。
    int priority = Message.DEFAULT_PRIORITY;
    int delivery = Message.DEFAULT_DELIVERY_MODE;
    String text = new String("Blank Message");
    
    String[] attrArray = req.getParameterValues("thisMessage");
    // 呼び出しフォームには、thisMessage 用の値が 1 つしかないと想定します。
    if(attrArray != null) {
       text = attrArray[0];
    }
    try {
      System.out.println("setText の直前");
      _message.setText(text);
      // トピックをパブリッシュします。
      System.out.println("パブリッシュの直前" );
      // トピックをパブリッシュします。メッセージは 5 分間存続します。
      _publisher.publish(_message, delivery, priority, 5 * 60 * 1000);
      System.out.println("パブリッシュの直後"); 
      } // try を終了
      catch(JMSException e) {
        System.out.println("JMS Exception:" + e.getMessage());
    }
    
  6. 完了したら、JMS オブジェクトを閉じます。
    // これらは if != null のチェックでラップしたほうがよいでしょう。
    try {
      _publisher.close();
      _session.close();
      _connection.close();
      _context.close();
      }
      catch(NamingException e) {
        System.out.println("Naming exception in destroy: " +
          e.getMessage());
      }
      catch(JMSException e) {
        System.out.println("JMS Exception:" + e.getMessage());
      }
    

サブスクライバのコーディング

JMS では、関連するトピックに対してトピックがパブリッシュされると、これらがサブスクライバに渡されます。次のオブジェクトを使用して、サブスクライバをコーディングします。

サブスクライバが非同期メッセージを使用する場合は、MessageListener インターフェイスと onMessage メソッドを実装する必要があります。サブスクライバクラスがサブスクライブしているトピックにメッセージがパブリッシュされると、JRun では onMessage メソッドが呼び出されます。

サブスクライバクラスは、javax.jms クラスのいずれも拡張しませんが、明示的に他のクラスを拡張したり、暗黙的に java.lang.Object を拡張したりできます。

パブリッシャのコードを作成するには

  1. 次のパッケージをインポートします。
    import java.util.*;
    import javax.jms.*;
    import javax.naming.*;
    
  2. MessageListener インターフェイスを実装するクラス宣言を作成します。
    public class MyTopicSubscriber implements MessageListener {
    
  3. 次のように、JMS で使用されるオブジェクトの変数を宣言します。
    private Context jndi = null;
    private TopicSession pubSession = null;
    private TopicSession subSession = null;
    private TopicPublisher publisher = null;
    private static TopicConnection connection = null;
    private String userName = null;
    
  4. アプリケーション特有のメソッドを使用して、ユーザーとトピックの情報を取得します。わかりやすくするために、この例では各値をハードコーディングしています。
    // トピックの名前を JMC 全体で一致させます。
    // この情報は、SERVER-INF/jrun-resources.xml ファイルでも表示できます。
    private String topicName = new String("topic1");
    // これは、リモートホスト名とJNDI ポート番号を連結したものです。
    private String providerurl = new String("localhost:2918");
    private String factoryName = new String 
    ("jrun.naming.JRunContextFactory");
    Properties env = new Properties();
    env.put(Context.PROVIDER_URL, providerurl );
    env.put( Context.INITIAL_CONTEXT_FACTORY, 
    "jrun.naming.JRunContextFactory" );
      jndi = new InitialContext(env);
    TopicConnectionFactory conFactory =
      (TopicConnectionFactory)jndi.lookup(factoryName);
    connection=conFactory.createTopicConnection();
    TopicSession pubSession =
      connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);
    TopicSession subSession =
      connection.createTopicSession(false,
        Session.AUTO_ACKNOWLEDGE);
    
    // JMS トピックをルックアップします。
    Topic thisTopic = (Topic)jndi.lookup(topicName);
    
    // JMS メッセージリスナを設定します。
    TopicSubscriber subscriber=subSession.createSubscriber(thisTopic);
    subscriber.setMessageListener(this);
    
    // このサンプルはパブリッシュでも使用します。
    publisher=pubSession.createPublisher(thisTopic);
    
    // アプリケーションを初期化します。
    set(connection, pubSession, subSession, publisher, username);
    
    // JMS 接続を開始し、メッセージが配送されるようにします。
    connection.start();
    }
    catch(NamingException e) {
      System.out.println("Naming Exception:" + e.getMessage());
    }
    catch(JMSException e) {
      System.out.println("JMS Exception:" + e.getMessage());
    }
    
  5. トピックからメッセージを取り出す onMessage メソッドをコーディングします。
    ...
    public void onMessage(Message message) {
      try {
        TextMessage textMessage = (TextMessage) message;
        String text = textMessage.getText();
        System.out.println(text);
      } catch(JMSException jmse){ jmse.printStackTrace(); }
    }
    ...