本节将介绍在 play 里如何进行异步处理以实现典型的长轮询(long-polling) 、 流以及其他 Comet-style 类型的应用程序以支持上千个同时发生的连接。
10.1. 暂停 http 请求
Play使用的是短小的请求, 它使用固定的线程池来处理 http连接者的查询请求。
为了达到最佳效果,线程池应该尽可能小。我们通常使用的最适合的数字就是处 理器数量+1 来设置默认池大小。
也就是说如果请求的处理时间很长的话(比如等待一个长时的计算),这个请求 就会耗尽线程池,使应用程序的响应变得很慢。当然可以扩大线程池,但这样会 浪费资源,而且线程池不可能是无限的。
考虑一下聊天室应用,浏览器发送一个阻塞式的 http 请求用于等待显示新的信 息。这个请求将会非常这长(比如几分钟),并且会打破线程池。如果计划允许 100 个用户同时连接聊天室程序,那么我们就需要提供至少 100 个线程,当然, 这是可行的。但如果是 1000 个,或 100000 个呢?
在这种情况下,play 允许你暂停一个请求。http 请求将停留在连接状态,但请 求执行将被从线程池中弹出,过会再试。你即可在固定等待的时间内告诉 play 去测试一下该请求,也可等待一个允许值变为可能情况。
小提示: 请看一下真实的示例 samples-and-tests/chat 比如, 下面这个动作将要加载一个非常耗时的 job 并且一直等到 job 完成返回结 果:
public static void generatePDF(Long reportId) {
Promise<InputStream> pdf = new ReportAsPDFJob(report).now();
InputStream pdfStream = await(pdf);
renderBinary(pdfStream);
}
这里, 我们使用 await(„)来让 Play 暂停请求,直到 Promise<InputStream> 值 返回 redeemed。
Continuations
因为框架需要收回线程以便为其他请求服务,因此 play 就必须暂停你的代码。
在之前的 play 版本中 await(„) 等价于 waitFor(„),用于暂停你的 action, 之后又重新调用。
为了易于约定我们介绍的异步代码,Continuations 允许代码被暂停和被透明恢 复,因此书写如下代码是非常必要的:
public static void computeSomething() {
Promise<String> delayedResult = veryLongComputation(„);
String result = await(delayedResult);
render(result);
}
在这里,事实上你的代码将分成两步用两个不同的线程来执行。但这些代码对你 来说是完全透明的。
使用 await(„)和 continuations,你可能需要写一个循环:
public static void loopWithoutBlocking() {
for(int i=0; i<=10; i++) {
Logger.info(i);
await("1s");
}
renderText("Loop finished");
}
当使用一个线程处理这个请求时,在默认的开发模式下,Play 能够同时为不同 的请求运行这些循环。
更实际的示例是异步从远程 URL 获取内容。 下面将并行运行三个远程 http 请求,每个都调用 play.libs.WS.WSRequest.getAsync()方法来执行一个 GET 请求 , 异步返回一个 play.libs.F.Promise。 action 方法通过调用三个 Promise 组合实 例的 await(„)方法来暂停进入的 http 请求。当三个远程调用返回结果后,线 程将自动恢复并且渲染 response。
public class AsyncTest extends Controller {
public static void remoteData() {
F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
//暂停处理,直到所有三个远程调用结束
List<WS.HttpResponse> httpResponses = await(promises);
render(httpResponses);
}
}
回调 Callbacks
还可以使用回调实现上面的示例。这次,await() 方法包含了 play.libs.F.Action 实现,当三个远程调用结束后就调用这个回调方法。
public class AsyncTest extends Controller {
public static void remoteData() {
F.Promise<WS.HttpResponse> r1 = WS.url("http://example.org/1").getAsync();
F.Promise<WS.HttpResponse> r2 = WS.url("http://example.org/2").getAsync();
F.Promise<WS.HttpResponse> r3 = WS.url("http://example.org/3").getAsync();
F.Promise<List<WS.HttpResponse>> promises = F.Promise.waitAll(r1, r2, r3);
//暂停处理,直到所有三个远程调用结束
await(promises, new F.Action<List<WS.HttpResponse>>() {
public void invoke(List<WS.HttpResponse> httpResponses) {
render(httpResponses);
}
});
}
}
10.2. HTTP response 流 streaming
既然不用中心请求也可执行循环, 你或许会希望向浏览器发送结果变量的部分数 据(不是全部)。这就是 Content-Type:Chunked HTTP response 大量 http response 类型。它允许你多次使用多个块来发送 http response。浏览器将实时 接收这些块。
使用 await(„)和 continuations,就可以实现这个功能:
public static void generateLargeCSV() {
CSVGenerator generator = new CSVGenerator();
response.contentType = "text/csv";
while(generator.hasMoreData()) {
String someCsvData = await(generator.nextDataChunk());
response.writeChunk(someCsvData);
}
}
即使 CSV 生成需要 1 个小时,play 也能同时使用单个线程处理多个请求,一旦 为客户端的数据生成好后,play 就会向客户端发送。
10.3. 使用 WebSockets
WebSockets 是一种在浏览器和应用程序间实现双向通信的途径。在浏览器端使 用 “ws://” url:
new Socket("ws://localhost:9000/helloSocket?name=Guillaume")
在 play 端需要声明一条 WS 路由:
WS /helloSocket MyWebSocket.hello
MyWebSocket 是一个 WebSocketController。一个 WebSocket 控制器和一个标准 的 http 控制器很相似,但处理的内容不同:
- 它有一个请求对象,但没有 response 对象
- 它有一个可访问的 session,但是只读的
- 它没有 renderArgs, routeArgs 和 flash 域
- 它只能从路由模式和 QueryString 里读取 params
- 它拥有两个通信通道:一进一出
当客户连接到 ws://localhost:9000/helloSocket 套接字时, Play 将调用 MyWebSocket.hello 动作方法。一旦 MyWebSocket.hello 动作方法存在,套接字 就会被关闭。
因此一个非常基础的套接字示例应该是这个样子:
public class MyWebSocket extends WebSocketController {
public static void hello(String name) {
outbound.send("Hello %s!", name);
}
}
在这里, 当客户端连接到 socket 时, 它将接收到‘Hello Guillaume’消息, play 随后将关闭这个 socket。
当然,通常情况下你不需要立即关闭 socket,用 await(„)和 continuations 也能实现。
比如一个基础的 Echo 服务器:
public class MyWebSocket extends WebSocketController {
public static void echo() {
while(inbound.isOpen()) {
WebSocketEvent e = await(inbound.nextEvent());
if(e instanceof WebSocketFrame) {
WebSocketFrame frame = (WebSocketFrame)e;
if(!e.isBinary) {
if(frame.textData.equals("quit")) {
outbound.send("Bye!");
disconnect();
} else {
outbound.send("Echo: %s", frame.textData);
}
}
}
if(e instanceof WebSocketClose) {
Logger.info("Socket closed!");
}
}
}
}
在上面的示例里,嵌套的‘if’和‘cast’很乏味,而且容易出错。即使这个简 单示例也不容易处理。更复杂的多个联合线程的情况下将会有更多的事件类型, 这将是一个恶梦。
这就是为什么我们要向你介绍在 play.libs.F 库里的基础的模式匹配的原因。
现在我们重写一下 echo 示例:
public static void echo() {
while(inbound.isOpen()) {
WebSocketEvent e = await(inbound.nextEvent());
for(String quit: TextFrame.and(Equals("quit")).match(e)) {
outbound.send("Bye!");
disconnect();
}
for(String msg: TextFrame.match(e)) {
outbound.send("Echo: %s", frame.textData);
}
for(WebSocketClose closed: SocketClosed.match(e)) {
Logger.info("Socket closed!");
}
}
}