>>分享孙卫琴的Java技术专稿和著作 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 24396 个阅读者 刷新本主题
 * 贴子主题:  【Java网络编程专题】异步通道和异步运算结果 回复文章 点赞(0)  收藏  
作者:sunweiqin    发表时间:2019-12-21 16:16:12     消息  查看  搜索  好友  邮件  复制  引用

本文参考《Java网络编程核心技术详解》,作者:孙卫琴,电子工业出版社出版
源代码下载地址为:http://www.javathinker.net/kecheng/javanet/javanetsourcecode.rar

从JDK7开始,引入了表示异步通道的AsynchronousSocketChannel类和AsynchronousServerSocketChannel类,这两个类的作用与SocketChannel类和ServerSocketChannel相似,区别在于异步通道的一些方法总是采用非阻塞工作模式,并且它们的非阻塞方法会立即返回一个Future对象,用来存放方法的异步运算结果。
AsynchronousSocketChannel类有以下非阻塞方法:
Future<Void> connect(SocketAddress remote):连接远程主机。
Future<Integer> read(ByteBuffer dst):从通道中读入数据,存放到ByteBuffer中。Future对象中包含了实际从通道中读到的字节数。
Future<Integer> write(ByteBuffer src):把ByteBuffer中的数据写入到通道中。Future对象中包含了实际写入通道的字节数。

AsynchronousServerSocketChannel类有以下非阻塞方法:
Future<AsynchronousSocketChannel> accept():接受客户连接请求。Future对象中包含了连接建立成功后创建的AsynchronousSocketChannel对象。

使用异步通道,可以使程序并行执行多个异步操作,例如:


SocketAddress socketAddress=……;
AsynchronousSocketChannel client= AsynchronousSocketChannel.open();
//请求建立连接
Future<Void > connected=client.connect(socketAddress);
ByteBuffer byteBuffer=ByteBuffer.allocate(128);

//执行其他操作
//……

//等待连接完成
connected.get();  
//读取数据
Future<Integer> future=client.read(byteBuffer);

//执行其他操作
//……

//等待从通道读取数据完成
future.get();

byteBuffer.flip();
WritableByteChannel out=Channels.newChannel(System.out);
out.write(byteBuffer);

以下PingClient类演示了异步通道的用法。它不断接收用户输入的域名(即网络上主机的名字),然后与这个主机上的80端口建立连接,最后打印建立连接所花费的时间。如果程序无法连接到指定的主机,就打印相关错误信息。如果用户输入“bye”,就结束程序。以下是运行PingClient类时用户输入的信息以及程序输出的信息。其中采用黑色字体的行表示用户向控制台输入的信息,采用绿色字体的行表示程序的输出结果:

C:\chapter04\classes>java nonblock.PingClient
www.abc888.com
www.javathinker.net
ping www.abc888.com的结果 : 连接失败
ping www.javathinker.net的结果 : 20ms
bye

从以上打印结果可以看出,PingClient连接远程主机www.javathinker.net用了20ms,而连接www.abc888.com主机失败。从打印结果还可以看出,PingClient采用异步通信方式,当用户输入一个主机名后,不必等到程序输出对这个主机名的处理结果,就可以继续输入下一个主机名。对每个主机名的处理结果要等到连接已经成功或者失败后才打印出来。


/* PingClient.java */
package nonblock;
import java.net.*;
……
class PingResult {  //表示连接一个主机的结果
  InetSocketAddress address;
  long connectStart;  //开始连接时的时间
  long connectFinish = 0;  //连接成功时的时间
  String failure;
  Future<Void> connectResult;  //连接操作的异步运算结果
  AsynchronousSocketChannel socketChannel;
  String host;
  final String ERROR="连接失败";

  PingResult(String host) {
      try {
          this.host=host;
          address =
              new InetSocketAddress(InetAddress.getByName(host),80);
      } catch (IOException x) {
          failure = ERROR;
      }
  }  

  public void print() {  //打印连接一个主机的执行结果
      String result;
      if (connectFinish != 0)
          result = Long.toString(connectFinish - connectStart) + "ms";
      else if (failure != null)
          result = failure;
      else
          result = "Timed out";
      System.out.println("ping "+ host+"的结果" + " : " + result);
  }
}

public class PingClient{
  //存放所有PingResult结果的队列
  private LinkedList<PingResult> pingResults=
               new LinkedList<PingResult>();
  boolean shutdown=false;
  ExecutorService executorService;

  public PingClient()throws IOException{
    executorService= Executors.newFixedThreadPool(4);
    executorService.execute(new Printer());
    receivePingAddress();
  }

  public static void main(String args[])throws IOException{
    new PingClient();
  }
  
  /** 接收用户输入的主机地址,由线程池执行PingHandler任务 */  
  public void receivePingAddress(){
    try{
      BufferedReader localReader=new BufferedReader(
                    new InputStreamReader(System.in));
      String msg=null;
      //接收用户输入的主机地址
      while((msg=localReader.readLine())!=null){
        if(msg.equals("bye")){
          shutdown=true;
          executorService.shutdown();
          break;
        }
        executorService.execute(new PingHandler(msg));
      }
    }catch(IOException e){ }
  }
  
  /** 尝试连接特定主机,并且把运算结果加入到PingResults结果队列中 */
  public void addPingResult(PingResult pingResult) {
     AsynchronousSocketChannel socketChannel = null;
     try {
       socketChannel = AsynchronousSocketChannel.open();
        
       pingResult.socketChannel=socketChannel;
       pingResult.connectStart = System.currentTimeMillis();

       synchronized (pingResults) {
         //向pingResults队列中加入一个PingResult对象
         pingResults.add(pingResult);
         pingResults.notify();
       }

       Future<Void> connectResult=
           socketChannel.connect(pingResult.address);
       pingResult.connectResult = connectResult;
    }catch (Exception x) {
      if (socketChannel != null) {
        try {socketChannel.close();} catch (IOException e) {}
      }
      pingResult.failure = pingResult.ERROR;
    }
  }

  /** 打印PingResults结果队列中已经执行完毕的任务的结果 */
  public void printPingResults() {
    PingResult pingResult = null;
    while(!shutdown ){
      synchronized (pingResults) {
        while (!shutdown && pingResults.size() == 0 ){
          try{
            pingResults.wait(100);
          }catch(InterruptedException e){e.printStackTrace();}
        }

        if(shutdown  && pingResults.size() == 0 )break;
        pingResult=pingResults.getFirst();
              
        try{
          if(pingResult.connectResult!=null)
            pingResult.connectResult.get(500,TimeUnit.MILLISECONDS);
        }catch(Exception e){
            pingResult.failure= pingResult.ERROR;
        }

        if(pingResult.connectResult!=null
           && pingResult.connectResult.isDone()){

          pingResult.connectFinish = System.currentTimeMillis();
        }
              
        if(pingResult.connectResult!=null
           && pingResult.connectResult.isDone()
           || pingResult.failure!=null){

           pingResult.print();
           pingResults.removeFirst();
           try {
              pingResult.socketChannel.close();
            } catch (IOException e) { }
         }
      }
    }
  }

  /** 尝试连接特定主机,生成一个PingResult对象,
     把它加入到PingResults结果队列中 */

  public class PingHandler implements Runnable{
    String msg;
    public PingHandler(String msg){
        this.msg=msg;  
    }
    public void run(){
        if(!msg.equals("bye")){
          PingResult pingResult=new PingResult(msg);
          addPingResult(pingResult);
        }
    }
  }

  /** 打印PingResults结果队列中已经执行完毕的任务的结果 */
  public class Printer implements Runnable{
    public void run(){
        printPingResults();
    }
  }
}

以上PingResult类表示连接一个主机的执行结果。PingClient类的PingResults队列存放所有的PingResult对象。
PingClient类还定义了两个表示特定任务的内部类:
(1)PingHandler任务类:负责通过异步通道去尝试连接客户端输入的主机地址,并且创建一个PingResult对象,它包含了连接操作的异步运算结果。再把PingResult对象加入到PingResults结果队列中。
(2)Printer任务类:负责打印PingResults结果队列中已经执行完毕的任务结果。打印完毕的PingResult对象会从PingResults队列中删除。

PingClient类的main主线程完成以下操作:
(1)创建线程池。
(2)向线程池提交Printer任务。
(3)不断读取客户端输入的主机地址,向线程池提交PingHandler任务。如果客户端输入“bye”,就结束程序。

PingClient类的线程池完成以下操作:
(1) 执行Printer任务。
(2)执行PingHander任务。



程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 admin 在 2021-10-09 10:55:32 重新编辑]
  Java面向对象编程-->内部类
  JavaWeb开发-->自定义JSP标签(Ⅰ)
  JSP与Hibernate开发-->持久化层的映射类型
  Java网络编程-->通过JDBC API访问数据库
  精通Spring-->计算属性和数据监听
  Vue3开发-->Vue CLI脚手架工具
  【Vue.js技术专题】CSS中DOM元素的过渡模式
  【Vue.js技术专题】Vuex中异步操作
  【Spring Cloud Alibaba专题】SkyWalking整合MySQL
  【Spring Cloud Alibaba专题】按照集群模式搭建Redis集群
  【Spring Cloud Alibaba专题】Dubbo框架中提供者回调消费者
  【Spring Cloud Alibaba专题】Nacos集群的Raft算法
  【持久化专题】从JPA API中获得Hibernate API
  【持久化专题】@Access注解设定Hibernate访问类的属性的方式
  【持久化专题】Hibernate的配置文件
  【持久化专题】Spring与Hibernate与JPA的整合
  【Java网络编程专题】优化访问数据库的程序代码的一些技巧
  【JavaWeb专题】在JavaWeb应用中对客户请求的异步处理
  《大话Java程序设计从入门到精通》写作花絮
  【Java基础编程专题】Java基本类型和引用类型的三个区别
  IT培训课、视频教程和书本之PK
  更多...
 IPIP: 已设置保密
树形列表:   
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。