<delect id="sj01t"></delect>
  1. <em id="sj01t"><label id="sj01t"></label></em>
  2. <div id="sj01t"></div>
    1. <em id="sj01t"></em>

            <div id="sj01t"></div>

            如何解決Java Socket通信技術收發線程互斥

            時間:2024-08-28 18:14:33 SUN認證 我要投稿
            • 相關推薦

            如何解決Java Socket通信技術收發線程互斥

              Java Socket通信技術在很長的時間里都在使用,在不少的程序員眼中都有很多高的評價。那么下面我們就看看如何才能掌握這門復雜的編程語言,希望大家在今后的Java Socket通信技術使用中有所收獲。

              下面就是Java Socket通信技術在解決收發線程互斥的代碼介紹。

              1.package com.bill99.svr;

              2.import java.io.IOException;

              3.import java.io.InputStream;

              4.import java.io.OutputStream;

              5.import java.net.InetSocketAddress;

              6.import java.net.Socket;

              7.import java.net.SocketException;

              8.import java.net.SocketTimeoutException;

              9.import java.text.SimpleDateFormat;

              10.import java.util.Date;

              11.import java.util.Properties;

              12.import java.util.Timer;

              13.import java.util.TimerTask;

              14.import java.util.concurrent.ConcurrentHashMap;

              15.import java.util.concurrent.TimeUnit;

              16.import java.util.concurrent.locks.Condition;

              17.import java.util.concurrent.locks.ReentrantLock;

              18.import org.apache.log4j.Logger;

              19./**

              20.*

            title: socket通信包裝類

             

              21.*

            Description:

             

              22.*

            CopyRight: CopyRight (c) 2009

             

              23.*

            Company: 99bill.com

             

              24.*

            Create date: 2009-10-14

             

              25.*author sunnylocus

              26. * v0.10 2009-10-14 初類

              27.* v0.11 2009-11-12 對命令收發邏輯及收發線程互斥機制進行了優化,

              處理命令速度由原來8~16個/秒提高到25~32個/秒

              28.*/ public class SocketConnection {

              29.private volatile Socket socket;

              30.private int timeout = 1000*10; //超時時間,初始值10秒

              31.private boolean isLaunchHeartcheck = false;//是否已啟動心跳檢測

              32.private boolean isNetworkConnect = false; //網絡是否已連接

              33.private static String host = "";

              34.private static int port;

              35.static InputStream inStream = null;

              36.static OutputStream outStream = null;

              37.private static Logger log =Logger.getLogger

              (SocketConnection.class);

              38.private static SocketConnection socketConnection = null;

              39.private static java.util.Timer heartTimer=null;

              40.//private final Map recMsgMap= Collections.

              synchronizedMap(new HashMap());

              41.private final ConcurrentHashMap recMsgMap

              = new ConcurrentHashMap();

              42.private static Thread receiveThread = null;

              43.private final ReentrantLock lock = new ReentrantLock();

              44.private SocketConnection(){

              45.Properties conf = new Properties();

              46.try {

              47.conf.load(SocketConnection.class.getResourceAsStream

              ("test.conf"));

              48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

              49.init(conf.getProperty("ip"),Integer.valueOf

              (conf.getProperty("port")));

              50.} catch(IOException e) {

              51.log.fatal("socket初始化異常!",e);

              52.throw new RuntimeException("socket初始化異常,請檢查配置參數");

              53.}

              54.}

              55./**

              56.* 單態模式

              57.*/

              58.public static SocketConnection getInstance() {

              59.if(socketConnection==null) {

              60.synchronized(SocketConnection.class) {

              61.if(socketConnection==null) {

              62.socketConnection = new SocketConnection();

              63.return socketConnection;

              64.}

              65.}

              66.}

              67.return socketConnection;

              68.}

              69.private void init(String host,int port) throws IOException {

              70.InetSocketAddress addr = new InetSocketAddress(host,port);

              71.socket = new Socket();

              72.synchronized (this) {

              73.log.info("【準備與"+addr+"建立連接】");

              74.socket.connect(addr, timeout);

              75.log.info("【與"+addr+"連接已建立】");

              76.inStream = socket.getInputStream();

              77.outStream = socket.getOutputStream();

              78.socket.setTcpNoDelay(true);//數據不作緩沖,立即發送

              79.socket.setSoLinger(true, 0);//socket關閉時,立即釋放資源

              80.socket.setKeepAlive(true);

              81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

              82.isNetworkConnect=true;

              83.receiveThread = new Thread(new ReceiveWorker());

              84.receiveThread.start();

              85.SocketConnection.host=host;

              86.SocketConnection.port=port;

              87.if(!isLaunchHeartcheck)

              88.launchHeartcheck();

              89.}

              90.}

              91./**

              92.* 心跳包檢測

              93.*/

              94.private void launchHeartcheck() {

              95.if(socket == null)

              96.throw new IllegalStateException("socket is not

              established!");

              97.heartTimer = new Timer();

              98.isLaunchHeartcheck = true;

              99.heartTimer.schedule(new TimerTask() {

              100.public void run() {

              101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

              102.int mstType =9999;//999-心跳包請求

              103.SimpleDateFormat dateformate = new SimpleDateFormat

              ("yyyyMMddHHmmss");

              104.String msgDateTime = dateformate.format(new Date());

              105.int msgLength =38;//消息頭長度

              106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

              107.log.info("心跳檢測包 -> IVR "+commandstr);

              108.int reconnCounter = 1;

              109.while(true) {

              110.String responseMsg =null;

              111.try {

              112.responseMsg = readReqMsg(commandstr);

              113.} catch (IOException e) {

              114.log.error("IO流異常",e);

              115.reconnCounter ++;

              116.}

              117.if(responseMsg!=null) {

              118.log.info("心跳響應包 <- IVR "+responseMsg);

              119.reconnCounter = 1;

              120.break;

              121.} else {

              122.reconnCounter ++;

              123.}

              124.if(reconnCounter >3) {//重連次數已達三次,判定網絡連接中斷,

              重新建立連接。連接未被建立時不釋放鎖

              125.reConnectToCTCC(); break;

              126.}

              127.}

              128.}

              129.},1000 * 60*1,1000*60*2);

              130.}

              131./**

              132.* 重連與目標IP建立重連

              133.*/

              134.private void reConnectToCTCC() {

              135.new Thread(new Runnable(){

              136.public void run(){

              137.log.info("重新建立與"+host+":"+port+"的連接");

              138.//清理工作,中斷計時器,中斷接收線程,恢復初始變量

              139.heartTimer.cancel();

              140.isLaunchHeartcheck=false;

              141.isNetworkConnect = false;

              142.receiveThread.interrupt();

              143.try {

              144.socket.close();

              145.} catch (IOException e1) {log.error("重連時,關閉socket連

              接發生IO流異常",e1);}

              146.//----------------

              147.synchronized(this){

              148.for(; ;){

              149.try {

              150.Thread.currentThread();

              151.Thread.sleep(1000 * 1);

              152.init(host,port);

              153.this.notifyAll();

              154.break ;

              155.} catch (IOException e) {

              156.log.error("重新建立連接未成功",e);

              157.} catch (InterruptedException e){

              158.log.error("重連線程中斷",e);

              159.}

              160.}

              161.}

              162.}

              163.}).start();

              164.}

              165./**

              166.* 發送命令并接受響應

              167.* @param requestMsg

              168.* @return

              169.* @throws SocketTimeoutException

              170.* @throws IOException

              171.*/

              172.public String readReqMsg(String requestMsg) throws IOException {

              173.if(requestMsg ==null) {

              174.return null;

              175.}

              176.if(!isNetworkConnect) {

              177.synchronized(this){

              178.try {

              179.this.wait(1000*5); //等待5秒,如果網絡還沒有恢復,拋出IO流異常

              180.if(!isNetworkConnect) {

              181.throw new IOException("網絡連接中斷!");

              182.}

              183.} catch (InterruptedException e) {

              184.log.error("發送線程中斷",e);

              185.}

              186.}

              187.}

              188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

              189.outStream = socket.getOutputStream();

              190.outStream.write(requestMsg.getBytes());

              191.outStream.flush();

              192.Condition msglock = lock.newCondition(); //消息鎖

              193.//注冊等待接收消息

              194.recMsgMap.put(msgNo, msglock);

              195.try {

              196.lock.lock();

              197.msglock.await(timeout,TimeUnit.MILLISECONDS);

              198.} catch (InterruptedException e) {

              199.log.error("發送線程中斷",e);

              200.} finally {

              201.lock.unlock();

              202.}

              203.Object respMsg = recMsgMap.remove(msgNo); //響應信息

              204.if(respMsg!=null &&(respMsg != msglock)) {

              205.//已經接收到消息,注銷等待,成功返回消息

              206.return (String) respMsg;

              207.} else {

              208.log.error(msgNo+" 超時,未收到響應消息");

              209.throw new SocketTimeoutException(msgNo+" 超時,未收到響應消息");

              210.}

              211.}

              212.public void finalize() {

              213.if (socket != null) {

              214.try {

              215.socket.close();

              216.} catch (IOException e) {

              217.e.printStackTrace();

              218.}

              219.}

              220.}

              221.//消息接收線程

              222.private class ReceiveWorker implements Runnable {

              223.String intStr= null;

              224.public void run() {

              225.while(!Thread.interrupted()){

              226.try {

              227.byte[] headBytes = new byte[4];

              228.if(inStream.read(headBytes)==-1){

              229.log.warn("讀到流未尾,對方已關閉流!");

              230.reConnectToCTCC();//讀到流未尾,對方已關閉流

              231.return;

              232.}

              233.byte[] tmp =new byte[4];

              234.tmp = headBytes;

              235.String tempStr = new String(tmp).trim();

              236.if(tempStr==null || tempStr.equals("")) {

              237.log.error("received message is null");

              238.continue;

              239.}

              240.intStr = new String(tmp);

              241.int totalLength =Integer.parseInt(intStr);

              242.//----------------

              243.byte[] msgBytes = new byte[totalLength-4];

              244.inStream.read(msgBytes);

              245.String resultMsg = new String(headBytes)+ new

              String(msgBytes);

              246.//抽出消息ID

              247.String msgNo = resultMsg.substring(8, 8 + 24);

              248.Condition msglock =(Condition) recMsgMap.get(msgNo);

              249.if(msglock ==null) {

              250.log.warn(msgNo+"序號可能已被注銷!響應消息丟棄");

              251.recMsgMap.remove(msgNo);

              252.continue;

              253.}

              254.recMsgMap.put(msgNo, resultMsg);

              255.try{

              256.lock.lock();

              257.msglock.signalAll();

              258.}finally {

              259.lock.unlock();

              260.}

              261.}catch(SocketException e){

              262.log.error("服務端關閉socket",e);

              263.reConnectToCTCC();

              264.} catch(IOException e) {

              265.log.error("接收線程讀取響應數據時發生IO流異常",e);

              266.} catch(NumberFormatException e){

              267.log.error("收到沒良心包,String轉int異常,異常字符:"+intStr);

              268.}

              269.}

              270.}

              271.}

              272.}

            【如何解決Java Socket通信技術收發線程互斥】相關文章:

            PHP中如何使用socket進行通信08-21

            Java線程同步的方法10-25

            Java多線程的實現方式07-08

            java多線程面試題201710-03

            2016年java多線程面試題及答案07-02

            sun認證考試輔導:java關于多線程的部分操作07-27

            PHP socket的配置08-04

            超線程技術是什么意思09-09

            如何編譯java程序09-28

            如何讓JAVA代碼更高效07-18

            <delect id="sj01t"></delect>
            1. <em id="sj01t"><label id="sj01t"></label></em>
            2. <div id="sj01t"></div>
              1. <em id="sj01t"></em>

                      <div id="sj01t"></div>
                      黄色视频在线观看