<dfn id="w48us"></dfn><ul id="w48us"></ul>
  • <ul id="w48us"></ul>
  • <del id="w48us"></del>
    <ul id="w48us"></ul>
  • 如何解決Java Socket通信技術收發線程互斥

    時間:2024-08-28 18:14:33 SUN認證 我要投稿
    • 相關推薦

    如何解決Java Socket通信技術收發線程互斥

      Java Socket通信技術在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復雜的編程語言,希望大家在今后的Java Socket通信技術使用中有所收獲。

      下面就是Java Socket通信技術在解決收發線程互斥的代碼介紹。

      1.package com.bill99.svr;

      2.import java.io.IOException;

      3.import java.io.InputStream;

      4.import java.io.OutputStream;

      5.import java.net.InetSocketAddress;

      6.import java.net.Socket;

      7.import java.net.SocketException;

      8.import java.net.SocketTimeoutException;

      9.import java.text.SimpleDateFormat;

      10.import java.util.Date;

      11.import java.util.Properties;

      12.import java.util.Timer;

      13.import java.util.TimerTask;

      14.import java.util.concurrent.ConcurrentHashMap;

      15.import java.util.concurrent.TimeUnit;

      16.import java.util.concurrent.locks.Condition;

      17.import java.util.concurrent.locks.ReentrantLock;

      18.import org.apache.log4j.Logger;

      19./**

      20.*

    title: socket通信包裝類

     

      21.*

    Description:

     

      22.*

    CopyRight: CopyRight (c) 2009

     

      23.*

    Company: 99bill.com

     

      24.*

    Create date: 2009-10-14

     

      25.*author sunnylocus

      26. * v0.10 2009-10-14 初類

      27.* v0.11 2009-11-12 對命令收發邏輯及收發線程互斥機制進行了優化,

      處理命令速度由原來8~16個/秒提高到25~32個/秒

      28.*/ public class SocketConnection {

      29.private volatile Socket socket;

      30.private int timeout = 1000*10; //超時時間,初始值10秒

      31.private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

      32.private boolean isNetworkConnect = false; //網絡是否已連接

      33.private static String host = "";

      34.private static int port;

      35.static InputStream inStream = null;

      36.static OutputStream outStream = null;

      37.private static Logger log =Logger.getLogger

      (SocketConnection.class);

      38.private static SocketConnection socketConnection = null;

      39.private static java.util.Timer heartTimer=null;

      40.//private final Map recMsgMap= Collections.

      synchronizedMap(new HashMap());

      41.private final ConcurrentHashMap recMsgMap

      = new ConcurrentHashMap();

      42.private static Thread receiveThread = null;

      43.private final ReentrantLock lock = new ReentrantLock();

      44.private SocketConnection(){

      45.Properties conf = new Properties();

      46.try {

      47.conf.load(SocketConnection.class.getResourceAsStream

      ("test.conf"));

      48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

      49.init(conf.getProperty("ip"),Integer.valueOf

      (conf.getProperty("port")));

      50.} catch(IOException e) {

      51.log.fatal("socket初始化異常!",e);

      52.throw new RuntimeException("socket初始化異常,請檢查配置參數");

      53.}

      54.}

      55./**

      56.* 單態模式

      57.*/

      58.public static SocketConnection getInstance() {

      59.if(socketConnection==null) {

      60.synchronized(SocketConnection.class) {

      61.if(socketConnection==null) {

      62.socketConnection = new SocketConnection();

      63.return socketConnection;

      64.}

      65.}

      66.}

      67.return socketConnection;

      68.}

      69.private void init(String host,int port) throws IOException {

      70.InetSocketAddress addr = new InetSocketAddress(host,port);

      71.socket = new Socket();

      72.synchronized (this) {

      73.log.info("【準備與"+addr+"建立連接】");

      74.socket.connect(addr, timeout);

      75.log.info("【與"+addr+"連接已建立】");

      76.inStream = socket.getInputStream();

      77.outStream = socket.getOutputStream();

      78.socket.setTcpNoDelay(true);//數據不作緩沖,立即發送

      79.socket.setSoLinger(true, 0);//socket關閉時,立即釋放資源

      80.socket.setKeepAlive(true);

      81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

      82.isNetworkConnect=true;

      83.receiveThread = new Thread(new ReceiveWorker());

      84.receiveThread.start();

      85.SocketConnection.host=host;

      86.SocketConnection.port=port;

      87.if(!isLaunchHeartcheck)

      88.launchHeartcheck();

      89.}

      90.}

      91./**

      92.* 心跳包檢測

      93.*/

      94.private void launchHeartcheck() {

      95.if(socket == null)

      96.throw new IllegalStateException("socket is not

      established!");

      97.heartTimer = new Timer();

      98.isLaunchHeartcheck = true;

      99.heartTimer.schedule(new TimerTask() {

      100.public void run() {

      101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

      102.int mstType =9999;//999-心跳包請求

      103.SimpleDateFormat dateformate = new SimpleDateFormat

      ("yyyyMMddHHmmss");

      104.String msgDateTime = dateformate.format(new Date());

      105.int msgLength =38;//消息頭長度

      106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

      107.log.info("心跳檢測包 -> IVR "+commandstr);

      108.int reconnCounter = 1;

      109.while(true) {

      110.String responseMsg =null;

      111.try {

      112.responseMsg = readReqMsg(commandstr);

      113.} catch (IOException e) {

      114.log.error("IO流異常",e);

      115.reconnCounter ++;

      116.}

      117.if(responseMsg!=null) {

      118.log.info("心跳響應包 <- IVR "+responseMsg);

      119.reconnCounter = 1;

      120.break;

      121.} else {

      122.reconnCounter ++;

      123.}

      124.if(reconnCounter >3) {//重連次數已達三次,判定網絡連接中斷,

      重新建立連接。連接未被建立時不釋放鎖

      125.reConnectToCTCC(); break;

      126.}

      127.}

      128.}

      129.},1000 * 60*1,1000*60*2);

      130.}

      131./**

      132.* 重連與目標IP建立重連

      133.*/

      134.private void reConnectToCTCC() {

      135.new Thread(new Runnable(){

      136.public void run(){

      137.log.info("重新建立與"+host+":"+port+"的連接");

      138.//清理工作,中斷計時器,中斷接收線程,恢復初始變量

      139.heartTimer.cancel();

      140.isLaunchHeartcheck=false;

      141.isNetworkConnect = false;

      142.receiveThread.interrupt();

      143.try {

      144.socket.close();

      145.} catch (IOException e1) {log.error("重連時,關閉socket連

      接發生IO流異常",e1);}

      146.//----------------

      147.synchronized(this){

      148.for(; ;){

      149.try {

      150.Thread.currentThread();

      151.Thread.sleep(1000 * 1);

      152.init(host,port);

      153.this.notifyAll();

      154.break ;

      155.} catch (IOException e) {

      156.log.error("重新建立連接未成功",e);

      157.} catch (InterruptedException e){

      158.log.error("重連線程中斷",e);

      159.}

      160.}

      161.}

      162.}

      163.}).start();

      164.}

      165./**

      166.* 發送命令并接受響應

      167.* @param requestMsg

      168.* @return

      169.* @throws SocketTimeoutException

      170.* @throws IOException

      171.*/

      172.public String readReqMsg(String requestMsg) throws IOException {

      173.if(requestMsg ==null) {

      174.return null;

      175.}

      176.if(!isNetworkConnect) {

      177.synchronized(this){

      178.try {

      179.this.wait(1000*5); //等待5秒,如果網絡還沒有恢復,拋出IO流異常

      180.if(!isNetworkConnect) {

      181.throw new IOException("網絡連接中斷!");

      182.}

      183.} catch (InterruptedException e) {

      184.log.error("發送線程中斷",e);

      185.}

      186.}

      187.}

      188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

      189.outStream = socket.getOutputStream();

      190.outStream.write(requestMsg.getBytes());

      191.outStream.flush();

      192.Condition msglock = lock.newCondition(); //消息鎖

      193.//注冊等待接收消息

      194.recMsgMap.put(msgNo, msglock);

      195.try {

      196.lock.lock();

      197.msglock.await(timeout,TimeUnit.MILLISECONDS);

      198.} catch (InterruptedException e) {

      199.log.error("發送線程中斷",e);

      200.} finally {

      201.lock.unlock();

      202.}

      203.Object respMsg = recMsgMap.remove(msgNo); //響應信息

      204.if(respMsg!=null &&(respMsg != msglock)) {

      205.//已經接收到消息,注銷等待,成功返回消息

      206.return (String) respMsg;

      207.} else {

      208.log.error(msgNo+" 超時,未收到響應消息");

      209.throw new SocketTimeoutException(msgNo+" 超時,未收到響應消息");

      210.}

      211.}

      212.public void finalize() {

      213.if (socket != null) {

      214.try {

      215.socket.close();

      216.} catch (IOException e) {

      217.e.printStackTrace();

      218.}

      219.}

      220.}

      221.//消息接收線程

      222.private class ReceiveWorker implements Runnable {

      223.String intStr= null;

      224.public void run() {

      225.while(!Thread.interrupted()){

      226.try {

      227.byte[] headBytes = new byte[4];

      228.if(inStream.read(headBytes)==-1){

      229.log.warn("讀到流未尾,對方已關閉流!");

      230.reConnectToCTCC();//讀到流未尾,對方已關閉流

      231.return;

      232.}

      233.byte[] tmp =new byte[4];

      234.tmp = headBytes;

      235.String tempStr = new String(tmp).trim();

      236.if(tempStr==null || tempStr.equals("")) {

      237.log.error("received message is null");

      238.continue;

      239.}

      240.intStr = new String(tmp);

      241.int totalLength =Integer.parseInt(intStr);

      242.//----------------

      243.byte[] msgBytes = new byte[totalLength-4];

      244.inStream.read(msgBytes);

      245.String resultMsg = new String(headBytes)+ new

      String(msgBytes);

      246.//抽出消息ID

      247.String msgNo = resultMsg.substring(8, 8 + 24);

      248.Condition msglock =(Condition) recMsgMap.get(msgNo);

      249.if(msglock ==null) {

      250.log.warn(msgNo+"序號可能已被注銷!響應消息丟棄");

      251.recMsgMap.remove(msgNo);

      252.continue;

      253.}

      254.recMsgMap.put(msgNo, resultMsg);

      255.try{

      256.lock.lock();

      257.msglock.signalAll();

      258.}finally {

      259.lock.unlock();

      260.}

      261.}catch(SocketException e){

      262.log.error("服務端關閉socket",e);

      263.reConnectToCTCC();

      264.} catch(IOException e) {

      265.log.error("接收線程讀取響應數據時發生IO流異常",e);

      266.} catch(NumberFormatException e){

      267.log.error("收到沒良心包,String轉int異常,異常字符:"+intStr);

      268.}

      269.}

      270.}

      271.}

      272.}

    【如何解決Java Socket通信技術收發線程互斥】相關文章:

    PHP中如何使用socket進行通信08-21

    Java線程同步的方法10-25

    Java多線程的實現方式07-08

    java多線程面試題201710-03

    2016年java多線程面試題及答案07-02

    sun認證考試輔導:java關于多線程的部分操作07-27

    PHP socket的配置08-04

    超線程技術是什么意思09-09

    如何編譯java程序09-28

    如何讓JAVA代碼更高效07-18

    主站蜘蛛池模板: 亚洲欧洲精品无码AV| 欧美精品一本久久男人的天堂| 国产精品香港三级国产AV| 精品一区二区三区免费视频 | 国产精品一区二区久久精品无码| 亚洲精品无码专区久久久| 国产精品成人小电影在线观看| 大伊香蕉精品一区视频在线| 亚洲国模精品一区| 国产天天综合永久精品日| 久久久久夜夜夜精品国产| 国产成人精品优优av| 久久这里只有精品18| 亚洲国产精品第一区二区三区| 国产精品H片在线播放| 青青草原综合久久大伊人精品| 国产精品v片在线观看不卡| 亚洲一日韩欧美中文字幕欧美日韩在线精品一区二| www亚洲欲色成人久久精品| 国产在线拍揄自揄视精品不卡| 国产成人久久精品区一区二区| 亚洲国产精品无码久久久久久曰| 国内精品国语自产拍在线观看| 色偷偷888欧美精品久久久| 国产三级精品久久| 女人香蕉久久**毛片精品| 999在线视频精品免费播放观看| 亚洲AV无码成人精品区在线观看 | 欧美日韩精品系列一区二区三区国产一区二区精品 | 亚洲av午夜成人片精品网站| 无码国内精品久久人妻麻豆按摩 | 久久精品亚洲福利| 国产在线观看一区二区三区精品| 成人国产精品日本在线观看| 四虎国产精品免费观看| 亚洲欧美日韩精品久久| 亚洲精品国产成人专区| 99久久精品免费| 国产区精品福利在线观看精品| 狠狠精品干练久久久无码中文字幕| 国产精品成人69XXX免费视频|