処理要求スレッドをキューに貯めておいて、ひとつずつ処理を実行するためのクラス。


import java.io.*;
import java.net.*;
import java.util.*;
import javax.servlet.*;
import javax.servlet.http.*;
import org.w3c.dom.*;
import javax.xml.parsers.*;
import javax.xml.xpath.*;
 
/**
 * この処理を実行できるのは、たった1つ。
 *
 * class SingleThrededExecutor が処理を管理。
 * interface Request が実際の処理を実行。
 * interface Response が処理結果を所有。
 *
 * 処理待ちキューで処理を待つスレッドの数を指定できる。
 * 処理と処理の間のインターバルを指定できる。
 *
 */
public class SingleThrededExecutor {
 
  //////////////////////////////////////////////////////////////////////
  // for test
  
  public static void main(String[] args) {
    
    final String url = "http://www.nilab.info/index.rdf";
    
    final long interval = 1000;
    final int  capacity = 4;
    final SingleThrededExecutor ste = new SingleThrededExecutor(interval, capacity);
    
    for(int i=0; i<10; i++){
      System.out.println("queue size = " + ste.getQueueSize());
      System.out.println("create  No." + i);
      new Thread(new Test(ste, i, url)).start();
      try{Thread.sleep(300);}catch(Exception e){}
    }
  }
 
  private static class Test implements Runnable {
 
    private final SingleThrededExecutor ste;
    private final int number;
    private final String url;
    
    Test(SingleThrededExecutor ste, int number, String url){
      this.ste = ste;
      this.number = number;
      this.url = url;
    }
 
    public void run(){
      try{
        SingleThrededExecutor.RestRequest req = new SingleThrededExecutor.RestRequest();
        req.url = url;
        SingleThrededExecutor.RestResponse res = (RestResponse)ste.doit(req);
        System.out.println("execute No." + number + ":" + new Date() + ":" + res);
      }catch(Exception e){
        e.printStackTrace();
      }
    }
  }
 
  // テスト結果
  //queue size = 0
  // create  No.0
  // queue size = 1
  // create  No.1
  // execute No.0:Wed Dec 20 21:46:42 JST 2006:SingleThrededExecutor$RestResponse@fa9cf
  // queue size = 1
  // create  No.2
  // queue size = 2
  // create  No.3
  // queue size = 3
  // create  No.4
  // queue size = 4
  // create  No.5
  // execute No.1:Wed Dec 20 21:46:43 JST 2006:SingleThrededExecutor$RestResponse@1891d8f
  // queue size = 4
  // create  No.6
  // queue size = 5
  // create  No.7
  // execute No.7:Wed Dec 20 21:46:44 JST 2006:null
  // queue size = 5
  // create  No.8
  // execute No.8:Wed Dec 20 21:46:44 JST 2006:null
  // execute No.2:Wed Dec 20 21:46:45 JST 2006:SingleThrededExecutor$RestResponse@f3d6a5
  // queue size = 4
  // create  No.9
  // execute No.3:Wed Dec 20 21:46:46 JST 2006:SingleThrededExecutor$RestResponse@911f71
  // execute No.4:Wed Dec 20 21:46:47 JST 2006:SingleThrededExecutor$RestResponse@1a73d3c
  // execute No.5:Wed Dec 20 21:46:48 JST 2006:SingleThrededExecutor$RestResponse@53ba3d
  // execute No.6:Wed Dec 20 21:46:49 JST 2006:SingleThrededExecutor$RestResponse@7b7072
  // execute No.9:Wed Dec 20 21:46:50 JST 2006:SingleThrededExecutor$RestResponse@136228
 
  //////////////////////////////////////////////////////////////////////
 
  private final long interval;
  private final int capacity;
  private long last_excution = -1;
 
  private volatile int counter = 0;
 
  /**
   * @param interval 処理実行のインターバル時間
   * @param capacity 待ち行列の最大数
   */
  public SingleThrededExecutor(long interval, int capacity) {
    this.interval = interval;
    this.capacity = capacity;
  }
 
  /**
   * 現在の処理実行要求数を返します。
   */
  public int getQueueSize(){
    return counter;
  }
  
  /**
   * 処理の実行を要求します。
   * 他のスレッドからの要求が先に来ている場合は、そちらを優先して処理します。
   * 処理の待ち行列が限界を超えている場合は、null を返します。
   * @param req 処理要求オブジェクト
   * @return 処理結果オブジェクト
   */
  public Response doit(Request req) throws Exception {
 
    if (counter > capacity) {
      // すでにcapacityより多くのスレッドが処理待ちで処理できず
      return null;
    } else {
      try {
        counter++;
        Response res = gate(req); // ここで待ち行列が発生
        return res;
      } finally {
        counter--;
      }
    }
  }
 
  /**
   * 処理の実行を要求します。
   * 他のスレッドからの要求が先に来ている場合は、そちらを優先して処理します。
   * 処理の待ち行列が限界を超えている場合は、null を返します。
   * @param req 処理要求オブジェクト
   * @return 処理結果オブジェクト
   */
  private synchronized Response gate(Request req) throws Exception {
    
    if(last_excution != -1){
      long now = System.currentTimeMillis();
      long remaining_time = (last_excution + interval) - now;
      if(remaining_time > 0){
        // inteval処理待ち
        Thread.sleep(remaining_time);
      }
    }
    
    try{
      Response res = req.doit();
      return res;
    }finally{
      // 処理が終わった時刻を最終実行時刻とする
      last_excution = System.currentTimeMillis();
    }
  }
 
  public static interface Request {
    public Response doit() throws Exception;
  }
 
  public static interface Response {
  }
 
  public static class RestRequest implements Request {
 
    public String url;
 
    public Response doit() throws Exception {
      RestResponse res = new RestResponse();
      res.doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(url);
      return res;
    }
  }
 
  public static class RestResponse implements Response {
    public Document doc;
  }
}

増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編 の旧版を横に置きながら書いてみたコードだけど、ぜんぜんちがうクラス構成になっているような。

tags: zlashdot Java Java

Posted by NI-Lab. (@nilab)