자바 서블릿 컨테이너의 Comet 지원 1 - Jetty

이전글: Comet 에 대하여

Jetty 는 버전 6.0에서부터 Continuation 이라는 것을 도입해서 Comet 스타일의 웹 프로그래밍을 효과적으로 구현할 수 있게 되었다. jetty의 Continuation 을 이용하면 웹 요청을 처리를 중지(suspend)시켜서 대기 상태로 만들었다가 후에 필요할 때 다시 재개(resume)시킬 수 있다. 웹 요청 처리를 중지(suspend)시키면 요청을 처리하던 스레드는 다시 스레드 풀(Thread Pool)로 되돌아가 다른 요청을 처리할 수 있게 된다. 이로서 장시간 유지되는 HTTP 연결을 더 적은 수의 스레드로 효과적으로 처리할 수 있게 된다.

Continuation 은 scheme 등의 언어에서 지원하던 개념인데, jetty 에서는 이와 비슷하긴 하지만 꼭 같지는 않다. 요청 처리를 중지(suspend)시킨 후 재개(resume)하면 바로 중지시킨 그 지점에서 다시 진행하는 것이 아니라 요청 처리 체인(FilterChain)을 다시 처음부터 진행한다. 이것 때문에 프로그래밍에 약간 신경써야 할 것들이 있다.

실제 Jetty 의 Contiuation 을 이용하여 채팅을 구현한 예로 설명을 진행하겠다.

먼저 채팅 페이지를 보여주고 채팅 메시지를 받는 서블릿이다.

// ChatServlet.java
// ......
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ... {
  request.getRequestDispatcher("/WEB-INF/jsp/chat.jsp").forward(request, response);
}

public void doPost(HttpServletRequest request, HttpServletResponse response) throws ... {
  String name = request.getParameter("name");
  String text = request.getParameter("text");

  String message = "<b>" + name + "</b>: " + text + "<br>";
  messageSender.sendMessage(message);

  response.setContentType("text/html");
  response.getWriter().println("OK");
}
// ......

채팅 페이지는 다음과 같다.

  chat.jsp
  ...
  <script type="text/javascript">
      function pollMessage() {
          new Ajax.Updater({ success: 'panel' }, '/broadcaster', {
              method: 'get',
              insertion: 'bottom',
              onComplete: pollMessage
          });
      }
      function sendMessage() {
          new Ajax.Request('/chat', {
              method: 'post',
              parameters: { nick: $F('name'), text: $F('text') },
          });
          $('text').clear();
          return false;
      }
  </script>
  </head>
  <body onload="pollMessage()">
  <h1>채팅 페이지</h1>
  <div id="panel"></div>
  <form action="" method="POST" onsubmit="return sendMessage()">
      Nick <input type="text" name="name" id="name" size="10">
      <input type="text" name="text" id="text" size="40">
      <input type="submit" value="보내기">
  </form>
  ...

ajax 를 이용해서 채팅 메시지를 ChatServlet 으로 보낸다. 그리고 BroadcasterServlet 에 long polling 으로 접속해서 메시지를 받아 이를 패널에 출력한다. pollMessage 함수가 Comet long polling 을 하는 코드이다.

ChatServlet 은 chat 메시지를 받으면 이를 messageSender 객체로 보낸다. messageSender 객체는 MessageSender 클래스의 인스턴스로 미리 ServletContext 에 저장해 놓고 최초 context 기동시에 기동시킨다.

다음은 Long Polling 으로 접속된 클라이언트에 채팅 메시지를 뿌려주는 BroadcasterServlet 이다. 이부분에 jetty의 Continuation 이 사용되었다.

// BroadcasterServlet.java
// ......
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ... {
    Continuation continuation = ContinuationSupport.getContinuation(request, null);
    if (! continuation.isPending()) {
        // new session start
        response.setContentType("text/html; charset=utf-8");
        messageSender.addSession(continuation);
    }
    continuation.suspend(0); // request suspend
    // resumed
    String message = (String) continuation.getObject();
    PrintWriter out = response.getWriter();
    out.println(message);
    out.flush();
    response.flushBuffer();
}
// ......

여기서 messageSender 객체는 위의 ChatServlet 의 messageSender 객체와 같은 객체이다.

최초로 요청이 처리될 때에는 continuation.isPending() 메서드가 false 가 된다. 그리고 Contiunation 이 suspend 되었다가 resume 되어 다시 요청 처리 체인이 진행될 때에는 isPending() 메서드가 true 가 된다(다시 suspend 메서드가 호출될 때까지). 이를 이용하여 suspend 호출 이후 부분만을 다시 실행시키는 것처럼 할 수 있다.

continuation.suspend() 가 호출되면 이 요청 처리는 여기서 끝나게 된다. 그렇지만 HTTP 커넥션은 그대로 유지되고 있으며 request, response 객체도 여전히 유효한 상태이다. 이후 continuation.resume() 이 호출되면 이 요청 처리 체인은 다시 실행된다(resume 메서드는 messageSender 에서 호출한다). 이 때 continuation 객체는 이전과 같은 객체이며, 다시 suspend() 호출을 만날 때까지 isPending() 메서드는 true 가 된다(반면 최초 suspend() 호출 이전에 isPending() 메서드는 false 이다). 이를 이용하여 suspend() 이후를 진행시키는 것과 같은 효과를 낼 수 있는 것이다.

MessageSender 클래스는 다음과 같다.

// MessageSender.java
public class MessageSender implements Runnable {
  private volatile boolean running = true;
  private final BlockingQueue<String> messages =
                             new LinkedBlockingQueue<String>();
  private final Set<Continuation> sessions =
                             new CopyOnWriteArraySet<Continuation>();

  public void stop() {
    this.running = false;
  }

  public void addSession(Continuation cont) {
    sessions.add(cont);
  }

  public void sendMessage(String message) {
    try {
      messages.put(message);
    } catch (InterruptedException ignore) {
      // 종료 처리를 해야 하지만 생략
    }
  }

  public void run() {
    while (running) {
      String message = null;
      try {
        message = messages.take();
      } catch (InterruptedException ignore) {
        // 종료 처리를 해야 하지만 생략
      }
      for (Continuation continuation : sessions) {
        continuation.setObject(message);
        continuation.resume();
        sessions.remove(continuation);
      }
    }
  }
}

이 클래스는 ServletContextListener 에서 최초 context 가 기동될 때 인스턴스를 생성하여 새 스레드로 시작한다.

ChatServlet 에서 sendMessage 를 호출하면 메시지가 Queue 에 쌓이게 되고 이 메시지는 메시지 처리 스레드에서 받아 continuation 객체에 저장한 다음 continuation 을 resume 한다. 그러면 BroadcasterServlet 에서 다시 실행이 되는 것이다. continuation suspend() 호출과 resume 호출 그리고 resume 후의 진행은 모두 다른 스레드에서 진행된다. 따라서 이 사이의 데이타 교환은 Continuation 객체의 setObject 와 getObject 메서드를 통해서만 해야 한다.

Jetty 의 Continuation 방식의 Comet 지원은 기존 Servlet API 가 크게 바뀌지 않아도 된다는 장점이 있다. 그래서인지 Servlet 3.0 의 Comet 지원도 이와 비슷한 방식 - ServletRequest 를 suspend, resume 하는 방식 - 으로 논의되고 있는 것 같다. 그렇지만 약간은 억지같은 Continuation 방식으로 최초 suspend 시 이곳에서 처리가 끝난다는 점이 직관적이지 못하며 resume 을 위해 isPending 과 같은 메서드로 체크를 해야 한다는 번거로움이 있다.

Tomcat 6.0, Resin 3.1 의 방식은 훨씬 이해하기 쉬운 방식인데 이는 다음 번에 다루겠다.

2008-07-30 16:52 | Permlink | Comments
blog comments powered by Disqus

About

Laser Keyboard

I'm Jin Kim. Live in Seoul, Korea. Software Developer. Java, Perl, Scala, Common Lisp. Mainly programming in web program and its backend data store. Interested in data processing and distributed computing.

Archive

RSS