Adding Functional power in Java programs

I am digging the Functional style power in my everyday regular programming. I was implementing some failover connection setup in a pure Java code and thought about adding the ugly java command pattern as an alternative to closure. At the end, the code turned up not so bad so I thought I would share it.

Problem:

We have two Quentin Server – Server A & Server B (A kind of recording server used in broadcast industry). The QuentinApi abstracts the interaction with the server which maintains some persistent TCP connection. Now the requirement is, we have to keep talking to Server A. However, if the connection is not valid / or it went down for some reason, we need to switch back to Server B.

Here is how the Quentin Api Looks like:

/**
 * 
 * Api to create/update/delete clips on Quantel. Clips are actually created from the channel. The quantel system has a pool of server. 
 * Every server has a fixed number of channel. 
 * 
 * @author smoinuddin
 *
 */
public interface QuentinApi {
	
	public String createClip(String clipName, int server, int channel, DateTime startTime, DateTime endTime) throws QuantelServerLookupFailedException, QuentinApiException;
	
	public void updateClip(String clipName, int server, int channel, DateTime startTime, DateTime endTime) throws QuentinApiException;
	
	public void cancelRecording(String clipName, int server, int channel) throws QuentinApiException;
      ...... // some more similar methods....	
}

Now all of these method should have the feature of failure detection and should revert to Server B whenever there is a problem. I have solved similar problem before with Java using Dynamic Proxy and Spring AOP, but I thought using Functional Closure style is simpler and easier to understand.

public class QuentinApiFailoverProxy implements QuentinApi {
	private static final Logger LOGGER = LoggerFactory.getLogger(QuentinApiFailoverProxy.class);
	
	private QuentinApiImpl quantelServerA;
	private QuentinApiImpl quantelServerB;
	
	private QuentinApiImpl currentConnection;
	
	public QuentinApiFailoverProxy(QuentinApiImpl quantelServerA,
			QuentinApiImpl quantelServerB) {
		super();
		this.quantelServerA = quantelServerA;
		this.quantelServerB = quantelServerB;
		
		this.currentConnection = quantelServerA;
	}
	
	private abstract class QuantelApiCall<T> {
		abstract T execute() throws QuentinApiException;
		
		public  String toString() {
			return null;
		}
	}
	
	private <T> T execute(QuantelApiCall<T> call) throws QuentinApiException{
		if(call.toString() != null) {
			LOGGER.debug("***executing Call: {}", call);
		}
		try {
			if(currentConnection.validate()) {
				return call.execute();
			} else {
				LOGGER.warn("Could not validate current connection: " + currentConnection);
				throw new QuantelCommunicationException("Could not validate current connection: " + currentConnection);
			}
		} catch (QuantelServerLookupFailedException | 
				QuantelCommunicationException | 
				NullPointerException| 
				org.omg.CORBA.COMM_FAILURE |
				org.omg.CORBA.OBJECT_NOT_EXIST e) {
			
			LOGGER.warn("Corba Failed to talk to {}", currentConnection, e);
			resetConnections();
			LOGGER.warn("Trying to talk to {}", currentConnection);
			
			return call.execute();
		} finally {
			if(call.toString() != null) {
				LOGGER.debug("***finished executing call: " + call);
			}
		}
	}
	
	private synchronized void resetConnections() throws QuantelCommunicationException {
		boolean validCurrentConnection = false;
		try {
			currentConnection.reset();
			if(currentConnection.validate()) {
				validCurrentConnection = true;
			}
		} catch (Exception e) {	}
		
		if(!validCurrentConnection) {
			currentConnection = getAlternative();			
			currentConnection.reset();
			if(!currentConnection.validate()) {
				LOGGER.warn("Failed to talk to main connection & alternative connection . Alternative connection just tried = {}" , currentConnection);
				throw new QuantelCommunicationException("Failed to talk to main connection & alternative connection . Alternative connection just tried: " + currentConnection);
			}
		} else {
			LOGGER.warn("re-initiated the current connection to {}");
		}
		
	}

	private QuentinApiImpl getAlternative() {
		if(currentConnection.equals(quantelServerA)) {
			return quantelServerB;
		} else {
			return quantelServerA;
		}
	}    

	@Override
	public String createClip(final String clipName, final int serverId, final int channel,
			final DateTime startTime,final  DateTime endTime)
			throws QuantelServerLookupFailedException, QuentinApiException {
		
		return execute(new QuantelApiCall<String>() {
			@Override
			String execute() throws QuentinApiException {
				return getCurrentConnection().createClip(clipName, serverId, channel, startTime, endTime);
			}

			@Override
			public String toString() {
				return "createClip: " + clipName ;
			}});		
	}

	@Override
	public void updateClip(final String clipName,final  int server,final  int channel,
			final DateTime startTime, final DateTime endTime) throws QuentinApiException {
		execute(new QuantelApiCall<Object>() {
			@Override
			Object execute() throws QuentinApiException {
				getCurrentConnection().updateClip(clipName, server, channel, startTime, endTime);
				return null;
			}

			@Override
			public String toString() {
				return "Update Clip: " + clipName;
			}});	
	}

	private QuentinApiImpl synchronized  getCurrentConnection() {
		return currentConnection;
	}

.................. and so on.
}

Notice how the execute method wraps each method call and adds exception handling, reset connection, try alternative connection feature. Introducing the QuantelApiCall lets us bundle our code and pass it as an argument to another method (Code as object). Again the execute() call does not take any parameter, instead it uses static binding functionality with the local variable to carry them over to the new context.

One thing you need to be cautious is to use the getCurrentConnection() in the closures to enable java dynamic binding. If I used the currentConnection instead, the currentConnection at the time of ‘Closing’ (aka Creating the closure object) would be used everytime you call execute(). But methods are always dynamically bounded and will be reevaluated with every call of the execute().

Again, the code can be called by multiple thread at the same time so you need synchronization in the reset() call (You don’t want two threads trying to reset the connection at the same time). Other interesting thing is you need ‘synchronized’ on the getCurrentConnection to avoid Thread data caching issue. I have noticed a lot of java developer mistakenly think synchronized is only used for mutual exclusion but it is also necessary for visibility of the data.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

w

Connecting to %s