////////////////////////////////////////////////////////////////////// // SPONHost.java // // --Host Process used to communicate on the SPON Network-- // // // // Supervised Peer-to-peer Overlay Network (SPON) Simulator // // // // // // Computer Science M.S.E. Project // // Programmed by: Joey Chau // // Copyright: February, 2003 // // // // Research Advisor: Christian Scheideler // // Computer Science // // Johns Hopkins University // ////////////////////////////////////////////////////////////////////// import java.io.*; import java.net.*; import java.util.*; /** The Actual Host process used to communicate on the SPON Network */ class SPONHost { static boolean debug = true; static void dbg(String str){ if(debug) { System.out.println(str); System.out.flush(); } } public static void main(String argv[]) throws Exception { // Broadcast Communication ServerSocket mServerSocket; // TCP Server Socket for communication from Parent or previous root node Socket mLeftChild = null; // Connection to Child Socket mRightChild = null; // Connection to Child Socket mNext = null; // Connection to Next Node for a root node Socket connectionSocket = null; boolean mLon = false; boolean mRon = false; boolean mNon = false; // Maintenance Communcation DatagramSocket mUDPSocket; // Display Frame TalkFrame frame = new TalkFrame(); // General Information Node myNode = new Node(); String sentence; BufferedReader inFromServer = null; BufferedReader inFromUser = new BufferedReader( new InputStreamReader(System.in)); InetAddress mSupervisorAddress; InetAddress mLocalAddress; String mMyName = null; int height; mLocalAddress = InetAddress.getLocalHost(); mSupervisorAddress = mLocalAddress; System.out.println( "Running SPON Host on <" + mLocalAddress + ">" ); mServerSocket = new ServerSocket( 7766 ); dbg( "Started Server: " + mServerSocket.toString() ); mUDPSocket = new DatagramSocket( 6677, mLocalAddress ); dbg( "Started Maintenance Socket: " + mUDPSocket.toString() ); //MEGA RUN LOOP while( true ) { // Join the SPON network int joinStat = 0; // 0=Failed, 1=Success, otherwise quit frame.setTitle( "SPON" ); frame.show(); while( joinStat != 1 ) { if( frame.readyToJoin() ) { InetAddress mJoinAddress; String[] joinInfo = frame.getJoinInfo(); mMyName = joinInfo[0]; sentence = joinInfo[1]; // JOIN PROCESS dbg("connecting to " + sentence + " ..."); try{ mJoinAddress = InetAddress.getByName( sentence ); joinStat = 1; } catch( Exception e ) { dbg( "Error finding host name " + e.getMessage() ); joinStat = 0; continue; } mJoinAddress = InetAddress.getByName( sentence ); // Datagram send int retry = 3; int timeoutLength = 3000; dbg( "Sending Join Information" ); String buffer = new String("JOIN>"); buffer = buffer + mLocalAddress.getHostAddress(); DatagramPacket mDPacket = new DatagramPacket( buffer.getBytes(), buffer.length(), mJoinAddress, 6677 ); DatagramPacket mReceivePacket = new DatagramPacket( new byte[75], 75); dbg( "Buffer: [" + buffer + "]" ); while( retry > 0 ) { dbg( "Sending: " + mDPacket ); // Diagnostic dbg( " Data: " + new String( mDPacket.getData() ) ); dbg( " Length: " + mDPacket.getLength() ); // try{ mUDPSocket.setSoTimeout( timeoutLength ); mUDPSocket.send( mDPacket ); mUDPSocket.receive( mReceivePacket ); break; } catch( SocketTimeoutException e ) { retry--; joinStat = 0; dbg( "Time Out!"); } catch( Exception e ) { retry = 0; joinStat = 0; dbg( "Socket Failure" ); } } if( retry == 0 ) { joinStat = 0; continue; } dbg("Received " + mReceivePacket.getLength() + " bytes =>" + (new String(mReceivePacket.getData())) + " from " + mDPacket.getAddress() ); mSupervisorAddress = mReceivePacket.getAddress(); myNode.parseString( new String(mReceivePacket.getData(), 4, mReceivePacket.getLength()) ); //dbg( myNode.toString() ); // Set Values based on Node height = myNode.getHeight(); if( !myNode.getLeftChild().startsWith(" ") ) { dbg( "Left: " + myNode.getLeftChild() ); mLeftChild = new Socket( myNode.getLeftChild(), 7766 ); mLon = true; } else { dbg( "Left: " ); mLeftChild = null; mLon = false; } if( !myNode.getRightChild().startsWith(" ") ) { dbg( "Right: " + myNode.getRightChild() ); mRightChild = new Socket( myNode.getRightChild(), 7766 ); mRon = true; } else { dbg( "Right: " ); mRightChild = null; mRon = false; } if( !myNode.getNext().startsWith(" ") ) { dbg( "Next: " + myNode.getNext() ); mNext = new Socket( myNode.getNext(), 7766 ); mNon = true; } else { dbg( "Next: " ); mNext = null; mNon = false; } connectionSocket = mServerSocket.accept(); inFromServer = new BufferedReader( new InputStreamReader( connectionSocket.getInputStream() ) ); dbg( "Joined SPON Supervised by: " + mSupervisorAddress ); } } // SPON Communication Loop int TimeoutLength = 50; mUDPSocket.setSoTimeout( TimeoutLength ); frame.setTitle( "SPON " + mLocalAddress.getHostAddress() ); // Active State Flag boolean stillActive = true; boolean exitOnClose = false; while(true) { DatagramPacket mReceiveUDP = new DatagramPacket( new byte[75], 75); try { mUDPSocket.receive( mReceiveUDP ); String sMess = new String( mReceiveUDP.getData(), 0, mReceiveUDP.getLength() ); // Diagnostic dbg( " Diagnostic:"+sMess+" length "+sMess.length()+" "+mReceiveUDP.getLength() ); // InetAddress senderAddress = mReceiveUDP.getAddress(); StringTokenizer token = new StringTokenizer( new String( sMess ), ">" ); String messType = token.nextToken(); dbg("Received: " + sMess + " from " + senderAddress); if( messType.equals("JOIN") && mSupervisorAddress != mLocalAddress) { mReceiveUDP.setAddress( mSupervisorAddress ); mReceiveUDP.setPort( 6677 ); mUDPSocket.send( mReceiveUDP ); } else if( messType.equals("NEXT") ) { String newNext = token.nextToken(); if( !newNext.startsWith(" ") ) { dbg( "Next: " + newNext ); InetAddress nextAddress = InetAddress.getByName( newNext ); mNext = new Socket( nextAddress, 7766 ); mNon = true; myNode.setNext( newNext ); } else { dbg( "Next: " ); mNext = null; mNon = false; myNode.setNext( " " ); } mServerSocket.setSoTimeout( 200 ); try { connectionSocket = mServerSocket.accept(); inFromServer = new BufferedReader( new InputStreamReader( connectionSocket.getInputStream() ) ); } catch( SocketTimeoutException e ) { } } else if( messType.equals("NEW0") ) { myNode.parseString( token.nextToken() ); //dbg( myNode.toString() ); // Set Values based on Node height = myNode.getHeight(); if( !myNode.getLeftChild().startsWith(" ") ) { dbg( "Left: " + myNode.getLeftChild() ); mLeftChild = new Socket( myNode.getLeftChild(), 7766 ); mLon = true; } else { dbg( "Left: " ); mLeftChild = null; mLon = false; } if( !myNode.getRightChild().startsWith(" ") ) { dbg( "Right: " + myNode.getRightChild() ); mRightChild = new Socket( myNode.getRightChild(), 7766 ); mRon = true; } else { dbg( "Right: " ); mRightChild = null; mRon = false; } if( !myNode.getNext().startsWith(" ") ) { dbg( "Next: " + myNode.getNext() ); mNext = new Socket( myNode.getNext(), 7766 ); mNon = true; } else { dbg( "Next: " ); mNext = null; mNon = false; } } else if( messType.equals("LNXT") ) { String newNext = token.nextToken(); if( !newNext.startsWith(" ") ) { dbg( "Next: " + newNext ); InetAddress nextAddress = InetAddress.getByName( newNext ); mNext = new Socket( nextAddress, 7766 ); mNon = true; myNode.setNext( newNext ); } else { dbg( "Next: " ); mNext = null; mNon = false; myNode.setNext( " " ); } } else if( messType.equals("PARN") ) { mServerSocket.setSoTimeout( 1000 ); try { connectionSocket = mServerSocket.accept(); inFromServer = new BufferedReader( new InputStreamReader( connectionSocket.getInputStream() ) ); } catch( SocketTimeoutException e ) { } } else if( messType.equals("NEWX") ) { myNode.parseString( token.nextToken() ); //dbg( myNode.toString() ); // Set Values based on Node height = myNode.getHeight(); if( !myNode.getLeftChild().startsWith(" ") ) { dbg( "Left: " + myNode.getLeftChild() ); mLeftChild = new Socket( myNode.getLeftChild(), 7766 ); mLon = true; } else { dbg( "Left: " ); mLeftChild = null; mLon = false; } if( !myNode.getRightChild().startsWith(" ") ) { dbg( "Right: " + myNode.getRightChild() ); mRightChild = new Socket( myNode.getRightChild(), 7766 ); mRon = true; } else { dbg( "Right: " ); mRightChild = null; mRon = false; } if( !myNode.getNext().startsWith(" ") ) { dbg( "Next: " + myNode.getNext() ); mNext = new Socket( myNode.getNext(), 7766 ); mNon = true; } else { dbg( "Next: " ); mNext = null; mNon = false; } // Wait for new Parent mServerSocket.setSoTimeout( 1000 ); try { connectionSocket = mServerSocket.accept(); inFromServer = new BufferedReader( new InputStreamReader( connectionSocket.getInputStream() ) ); } catch( SocketTimeoutException e ) { } } else if( messType.equals("NEED") ) { String infoMess = "INFO>" + myNode.toString(); DatagramPacket infoPacket = new DatagramPacket( infoMess.getBytes(), infoMess.length(), mSupervisorAddress, 6677 ); mUDPSocket.send( infoPacket ); } else if( messType.equals("CHLD") ) { StringTokenizer subTok = new StringTokenizer( token.nextToken(), "+"); String repNode = subTok.nextToken(); String newChild = subTok.nextToken(); if( myNode.getLeftChild().equals( repNode ) ) { myNode.setLeftChild( newChild ); mLeftChild = new Socket( myNode.getLeftChild(), 7766 ); } else if( myNode.getRightChild().equals( repNode ) ) { myNode.setRightChild( newChild ); mRightChild = new Socket( myNode.getRightChild(), 7766 ); } } else if( messType.equals("EXIT") ) { dbg( "EXITING" ); if( stillActive ) { break; } else { System.exit(1); } } else { dbg( "Recieved unknown packet: " + mReceiveUDP ); } }// try catch( SocketTimeoutException e ) { } if( inFromUser.ready() ) { String userInput = inFromUser.readLine(); if( userInput.equals( "quit" ) ){ String leaveMess = "LEAV>" + myNode.toString() + "+"; if( mNon ) { leaveMess = leaveMess + " "; } else { leaveMess = leaveMess + connectionSocket.getInetAddress().getHostAddress(); } DatagramPacket leavePacket = new DatagramPacket( leaveMess.getBytes(), leaveMess.length(), mSupervisorAddress, 6677 ); mUDPSocket.send( leavePacket ); } else if( userInput.equals( "diagnostic" ) ) { dbg( "Node Status "+myNode.toString() ); dbg( "Parent "+connectionSocket.getInetAddress().getHostAddress() ); } /* Write out operation else { Socket mySupervisor = new Socket( mSupervisorAddress, 7766 ); DataOutputStream outCom = new DataOutputStream( mySupervisor.getOutputStream() ); outCom.writeBytes( userInput + "<=" + mLocalAddress.getHostAddress() + "\n" ); mySupervisor.close(); } */ } if( frame.close() && stillActive ) { String leaveMess = "LEAV>" + myNode.toString() + "+"; if( mNon ) { leaveMess = leaveMess + " "; } else { leaveMess = leaveMess + connectionSocket.getInetAddress().getHostAddress(); } DatagramPacket leavePacket = new DatagramPacket( leaveMess.getBytes(), leaveMess.length(), mSupervisorAddress, 6677 ); mUDPSocket.send( leavePacket ); stillActive = false; } if( frame.readyToLeave() && stillActive ) { String leaveMess = "LEAV>" + myNode.toString() + "+"; if( mNon ) { leaveMess = leaveMess + " "; } else { leaveMess = leaveMess + connectionSocket.getInetAddress().getHostAddress(); } DatagramPacket leavePacket = new DatagramPacket( leaveMess.getBytes(), leaveMess.length(), mSupervisorAddress, 6677 ); mUDPSocket.send( leavePacket ); frame.leaveMe(); } if( frame.ready() ) { Socket mySupervisor = new Socket( mSupervisorAddress, 7766 ); DataOutputStream outCom = new DataOutputStream( mySupervisor.getOutputStream() ); outCom.writeBytes( mMyName + ":" + frame.getInfo() + "\n" ); mySupervisor.close(); } if( inFromServer.ready() ) { String incoming = inFromServer.readLine(); frame.update( incoming ); System.out.println( "Broadcast: " + incoming ); if( mLon ) { DataOutputStream outCom = new DataOutputStream( mLeftChild.getOutputStream() ); outCom.writeBytes( incoming + "\n" ); } if( mRon ) { DataOutputStream outCom = new DataOutputStream( mRightChild.getOutputStream() ); outCom.writeBytes( incoming + "\n" ); } if( mNon ) { DataOutputStream outCom = new DataOutputStream( mNext.getOutputStream() ); outCom.writeBytes( incoming + "\n" ); } } /* */ } // SPON Communication Loop dbg( "End of Communication" ); } } // Main method }