////////////////////////////////////////////////////////////////////// // SPONSupervisor.java // // --Supervisor Process used to maintain the SPON Network-- // // // // Supervised Peer-to-peer Overlay Network (SPON) Simulator // // // // // // Computer Science M.S.E. Project // // Programmed by: Joey Chau // // Copyright: February, 2003 // // // // Research Advisor: Christian Scheideler // // Computer Science // // Johns Hopkins University // ////////////////////////////////////////////////////////////////////// import java.io.*; import java.net.*; import java.util.*; /** The Actual Supervisor process used to maintain the SPON Network */ class SPONSupervisor { static boolean debug = true; static void dbg(String str){ if(debug) { System.out.println(str); System.out.flush(); } } public static void main(String argv[]) throws Exception { // Broadcast Communication Socket mTopRoot = null; Socket mBottomRoot = null; ServerSocket mServerSocket; boolean mTon = false; // boolean marker for whether Top Node is on // Maintenance Communication DatagramSocket mUDPSocket; RootVector myRoot; // General Information BufferedReader inFromUser = new BufferedReader( new InputStreamReader(System.in)); InetAddress mLocalAddress; mLocalAddress = InetAddress.getLocalHost(); System.out.println( "Running SPON Supervisor on <" + mLocalAddress + ">" ); mServerSocket = new ServerSocket( 7766 ); mServerSocket.setSoTimeout( 50 ); dbg( "Started Server: " + mServerSocket.toString() ); myRoot = new RootVector( mLocalAddress.getHostAddress() ); mUDPSocket = new DatagramSocket( 6677, mLocalAddress ); dbg( "Started Maintenance Socket: " + mUDPSocket.toString() ); while(true) { DatagramPacket mDPacket = new DatagramPacket( new byte[75], 75); String sMess; StringTokenizer token; String messType; InetAddress senderAddress; int senderPort; int TimeoutLength = 50; mUDPSocket.setSoTimeout( TimeoutLength ); try { mUDPSocket.receive( mDPacket ); sMess = new String( mDPacket.getData(), 0, mDPacket.getLength() ); // Diagnostic dbg( " Diagnostic:"+sMess+" length "+sMess.length()+" "+mDPacket.getLength() ); // senderAddress = mDPacket.getAddress(); token = new StringTokenizer( new String( sMess ), ">" ); messType = token.nextToken(); dbg("Received: " + sMess + " from " + senderAddress); if( messType.equals("JOIN") ) { // Join Stuff String comString = new String(); String newNode = token.nextToken(); dbg("New Node: " + newNode ); senderAddress = InetAddress.getByName( newNode ); comString = myRoot.join( newNode ); StringTokenizer comStack = new StringTokenizer( comString, "|" ); while( comStack.hasMoreTokens() ) { String newInfo = comStack.nextToken(); dbg( "Token: " + newInfo + " length " + newInfo.length() ); StringTokenizer comToken = new StringTokenizer( newInfo, "," ); String cMess = comToken.nextToken(); String cDest = comToken.nextToken(); dbg( "MESSAGE: " + cMess + " len: " + cMess.length() ); InetAddress destAddy = InetAddress.getByName( cDest ); DatagramPacket cPacket = new DatagramPacket( cMess.getBytes(), cMess.length(), destAddy, 6677 ); mUDPSocket.send( cPacket ); } dbg( myRoot.toString() ); } else if( messType.equals( "TOP0" ) ) { String newNode = token.nextToken(); if( newNode.equals(" ") ) { mTon = false; } else { InetAddress newTop = InetAddress.getByName( newNode ); mTopRoot = new Socket( newTop, 7766 ); dbg( "Connected new Top Node: " + mTopRoot.toString() ); mTon = true; } } else if( messType.equals( "LEAV" ) ) { String comString = new String(); StringTokenizer reqLeaveTok = new StringTokenizer ( token.nextToken(),"+" ); Node byeNode = new Node(); byeNode.parseString( reqLeaveTok.nextToken() ); String parentNode = reqLeaveTok.nextToken(); dbg( "Host: " + byeNode + " with Parent " + parentNode + ", requesting to Leave" ); comString = myRoot.replace( byeNode, parentNode ); StringTokenizer comStack = new StringTokenizer( comString, "|" ); while( comStack.hasMoreTokens() ) { String newInfo = comStack.nextToken(); dbg( "Token: " + newInfo + " length " + newInfo.length() ); StringTokenizer comToken = new StringTokenizer( newInfo, "," ); String cMess = comToken.nextToken(); String cDest = comToken.nextToken(); dbg( "MESSAGE: " + cMess + " len: " + cMess.length() ); InetAddress destAddy = InetAddress.getByName( cDest ); DatagramPacket cPacket = new DatagramPacket( cMess.getBytes(), cMess.length(), destAddy, 6677 ); mUDPSocket.send( cPacket ); } dbg( myRoot.toString() ); } else if( messType.equals( "INFO" ) ) { String infoNode = token.nextToken(); Node newInfo = new Node(); newInfo.parseString( infoNode ); myRoot.update( newInfo ); } else { dbg( "Recieved unknown packet: " + mDPacket ); } } // try catch( SocketTimeoutException e ) { } if( inFromUser.ready() ) { String userInput = inFromUser.readLine(); if( userInput.equals( "diagnostic" ) ) { dbg( "Root Status "+myRoot.toString() ); } else { if( mTon ) { DataOutputStream outCom = new DataOutputStream( mTopRoot.getOutputStream() ); outCom.writeBytes( "[SUPERVISOR]:" + userInput + "\n" ); } } } mServerSocket.setSoTimeout( TimeoutLength ); try { Socket connectionSocket = mServerSocket.accept(); BufferedReader inFromServer = new BufferedReader( new InputStreamReader( connectionSocket.getInputStream() ) ); //if( inFromServer.ready() ) { String incoming = inFromServer.readLine(); System.out.println( "Broadcast: " + incoming ); if( myRoot.size() > 0 && mTon) { DataOutputStream outCom = new DataOutputStream( mTopRoot.getOutputStream() ); outCom.writeBytes( incoming + "\n" ); } //} connectionSocket.close(); } catch( SocketTimeoutException e ) { } } } }