Sunday, March 21, 2010

Amazon FPS library for the Google App Engine environment

Here is a post on Amazon Flexible Payments System (FPS) and the beauty of the open source model!

Amazon FPS as the payment platform

!twetailer is an application connecting people to people, consumers to retailers and vice-versa. The business model is freemiun-based:
  • Posting demands, receiving proposals, and accepting proposals is available for free to any customers.
  • Listening to demands, proposing products or services, and closing deals is available for free to any registered retailers.
  • At one point, !twetailer is going to offer retailers to ask consumers to pay for the accepted products or services. The payment platform is going to be Amazon FPS1.

Amazon FPS is a really neat platform, the only one to my knowledge allowing to organize money transfers between third-parties. With Amazon FPS, !twetailer will be able to convey money from the consumers directly to the retailers, without the money transiting on !twetailer own account! This is a really safe mechanism.

As a quick introduction to Amazon FPS, I would strongly suggest you listen to that one hour webcast introduced on Amazon Payments blog on April 7, 2009: Monetize your innovation with Amazon FPS. If you use the open-source tool VideoLAN VLC, you can load the ASX file directly from Akamai from here.

Amazon and the open-source model

Amazon FPS, as many others Amazon Web Services (AWS), allows third-party applications to use its services through very simple APIs which are HTTP based! The libraries that developers need to use are mostly wrappers over HTTP connections with some specific controllers formatting the requests and signing them (to avoid a man-in-the-middle process tampering them).

Because HTTP is an open protocol and because Amazon could not probably develop its libraries for all possible Web servers, Amazon opened the libraries' source and attached to them a very liberal license2.

This is a very respectable attitude regarding their customers and also very well thought on the business-side: if developers can adopt their libraries for their own needs, Amazon won't have to pay for the corresponding development and it will enlarge the set of applications their platform can serve!

Amazon FPS on Google App Engine platform

The !twetailer server-side logic is Java based and dropping the Amazon FPS library freshly compiled in war/WEB-INF/lib is simple. However, the Amazon FPS code cannot run as-is because of few App Engine limitations...

The first one is encountered when the application needs to build the URL that will launch the co-branded service, a page that will allow consumers to pay for the service or product previously proposed by a retailer.
The static method HttpURLConnection.setFollowRedirects(boolean) controls the VM behavior and is then guarded by a JVM permission.
Read the incident report in the Google App Engine discussion group.

Fixing this issue is simple: tune the ability to follow redirection on the connection itself instead of applying the settings globally.

The second issue is really major:
The library uses the Jakarta Commons HttpClient component to convey payment requests from the application to the Amazon infrastructure. And many of its underlying calls are blocked in Google App Engine Java environment.
I asked for advices on AWS FPS forums. But without response, I have decided to go with my own wrapper of the Google URL Fetch mimicking the HttpClient HttpConnectionManager and HttpConnection classes.

Wrappers of Google URL Fetch for Amazon FPS

Following Amazon's leadership, I offer the URL Fetch wrappers that allows Amazon FPS to work on Google App Engine platform:
The code currently available works in the simple scenario !twetailer needs. But it is still under development. And the test suite covering it is not yet completed.

UrlFetchConnectionManager class definition
/******************************************************************************* 
 *  Adaptation for the Amazon FPS library to work on the Java platform of
 *  Google App Engine.
 *  
 *  Copyright 2010 Dom Derrien
 *  Licensed under the Apache License, Version 2.0
 */

package domderrien.wrapper.UrlFetch;

import org.apache.commons.httpclient.ConnectionPoolTimeoutException;
import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpConnection;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;

public class UrlFetchConnectionManager implements HttpConnectionManager {

 private HttpConnectionManagerParams params;
 private HttpConnection connection;
 
 public void closeIdleConnections(long timeout) {
  throw new RuntimeException("closeIdleConnections(long)");
 }

 public HttpConnection getConnection(HostConfiguration hostConfiguration) {
  throw new RuntimeException("getConnection(HostConfiguration)");
  // return null;
 }

 public HttpConnection getConnection(HostConfiguration hostConfiguration, long timeout) throws HttpException {
  throw new RuntimeException("getConnection(HostConfiguration, long)");
  // return null;
 }

 public HttpConnection getConnectionWithTimeout(HostConfiguration hostConfiguration, long timeout) throws ConnectionPoolTimeoutException {
  // As reported in http://code.google.com/appengine/docs/java/urlfetch/usingjavanet.html#Java_Net_Features_Not_Supported
  // > The app cannot set explicit connection timeouts for the request.
  if (connection != null) {
   releaseConnection(connection);
  }
  connection = new UrlFetchHttpConnection(hostConfiguration);
  return connection;
 }

 public HttpConnectionManagerParams getParams() {
  return params;
 }

 public void releaseConnection(HttpConnection connection) {
  connection.releaseConnection();
 }

 public void setParams(HttpConnectionManagerParams params) {
  // Parameters set in AmazonFPSClient#configureHttpClient:
        // - ConnectionTimeout: 50000 ms
  // - SoTimeout: 50000 ms
  // - StaleCheckingEnabled: true
  // - TcpNoDelay: true
  // - MaxTotalConnections: 100 (as proposed in the default config.properties file)
  // - MaxConnectionsPerHost: 100 (as proposed in the default config.properties file)

  this.params = params;
 }
 
}

UrlFetchConnection class definition
/******************************************************************************* 
 *  Adaptation for the Amazon FPS library to work on the Java platform of
 *  Google App Engine.
 *  
 *  Copyright 2010 Dom Derrien
 *  Licensed under the Apache License, Version 2.0
 */

package domderrien.wrapper.UrlFetch;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.SocketException;
import java.net.URL;

import javamocks.io.MockInputStream;
import javamocks.io.MockOutputStream;

import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpConnection;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.params.HttpConnectionParams;
import org.apache.commons.httpclient.protocol.Protocol;

import com.google.appengine.api.urlfetch.FetchOptions;
import com.google.appengine.api.urlfetch.HTTPHeader;
import com.google.appengine.api.urlfetch.HTTPMethod;
import com.google.appengine.api.urlfetch.HTTPRequest;
import com.google.appengine.api.urlfetch.HTTPResponse;
import com.google.appengine.api.urlfetch.URLFetchService;
import com.google.appengine.api.urlfetch.URLFetchServiceFactory;

public class UrlFetchHttpConnection extends HttpConnection {

 private static URLFetchService urlFS = URLFetchServiceFactory.getURLFetchService();

 private HostConfiguration hostConfiguration;
 private HTTPRequest _request;
 private HTTPResponse _response;
 private MockOutputStream _requestBody = new MockOutputStream();
 private MockInputStream _responseBody = new MockInputStream();

 
 private HTTPRequest getRequest() throws MalformedURLException {
  if (_request == null) {
   _request = new HTTPRequest(
     new URL(hostConfiguration.getHostURL()), 
     HTTPMethod.POST, // AmazonFPSClient#invoke(Class, Map) uses only POST method
     FetchOptions.Builder.disallowTruncate().followRedirects()
   );
  }
  return _request;
 }
 
 private static final String SEPARATOR = ": ";
 private static final int SEPARATOR_LENGTH = SEPARATOR.length();
 private static final String NEW_LINE = "\r\n";

 private HTTPResponse getResponse() throws MalformedURLException, IOException {
  if (_response == null) {
   // Get the response from the remote service
   _response = urlFS.fetch(getRequest());
   // Rebuild stream of HTTP headers (except the HTTP status retrieved from readLine(String) method)
   StringBuilder buffer = new StringBuilder();
   for (HTTPHeader header: _response.getHeaders()) {
    buffer.append(header.getName()).append(SEPARATOR).append(header.getValue()).append(NEW_LINE);
   }
   buffer.append("Content-Length: ").append(_response.getContent().length).append(NEW_LINE);
   buffer.append(NEW_LINE);
   // Rebuild stream of HTTP content (chunked-encoded)
   buffer.append(Integer.toString(_response.getContent().length, 16)).append(";chunk size").append(NEW_LINE);
   buffer.append(new String(_response.getContent())).append(NEW_LINE);
   buffer.append("0;").append(NEW_LINE);
   _responseBody.resetActualContent(buffer.toString());
  }
  return _response;
 }

 /**
  * Default constructor
  * @param hostConfiguration
  */
 public UrlFetchHttpConnection(HostConfiguration hostConfiguration) {
  super(hostConfiguration);
  this.hostConfiguration = hostConfiguration;
 }

 @Override
 protected void assertNotOpen() throws IllegalStateException {
  throw new RuntimeException("assertNotOpen()");
 }

 @Override
 protected void assertOpen() throws IllegalStateException {
  assert(_response != null);
 }

 @Override
 public void close() {
  // Nothing to do!
 }

 @Override
 public boolean closeIfStale() throws IOException {
  // Safe call, passed to the inherited method
  return super.closeIfStale();
 }

 @Override
 protected void closeSocketAndStreams() {
  throw new RuntimeException("closeSocketAndStreams()");
 }

 @Override
 public void flushRequestOutputStream() throws IOException {
  getRequest().setPayload(_requestBody.getStream().toString().getBytes());
 }

 @Override
 public String getHost() {
  return hostConfiguration.getHost();
 }

 @Override
 public HttpConnectionManager getHttpConnectionManager() {
  throw new RuntimeException("getHttpConnectionManager()");
 }

 @Override
 public InputStream getLastResponseInputStream() {
  throw new RuntimeException("getLastResponseInputStream()");
 }

 @Override
 public InetAddress getLocalAddress() {
  throw new RuntimeException("getLocalAddress()");
 }

 @Override
 public HttpConnectionParams getParams() {
  return new HttpConnectionParams();
 }

 @Override
 public int getPort() {
  return hostConfiguration.getPort();
 }

 @Override
 public Protocol getProtocol() {
  return hostConfiguration.getProtocol();
 }

 @Override
 public String getProxyHost() {
  throw new RuntimeException("getProxyHost()");
 }

 @Override
 public int getProxyPort() {
  throw new RuntimeException("getProxyPort()");
 }

 @Override
 public OutputStream getRequestOutputStream() throws IOException, IllegalStateException {
  return _requestBody;
 }

 @Override
 public InputStream getResponseInputStream() throws IOException {
  return _responseBody;
 }

 @Override
 public int getSendBufferSize() throws SocketException {
  throw new RuntimeException("getSendBufferSize()");
 }

 @Override
 protected Socket getSocket() {
  throw new RuntimeException("getSocket()");
 }

 @Override
 public int getSoTimeout() throws SocketException {
  throw new RuntimeException("getSoTimeout()");
 }

 @Override
 public String getVirtualHost() {
  throw new RuntimeException("getVirtualHost()");
 }

 @Override
 protected boolean isLocked() {
  throw new RuntimeException("isLocked()");
 }

 @Override
 public boolean isOpen() {
  // Safe call, passed to inherited method
  return super.isOpen();
 }

 @Override
 public boolean isProxied() {
  // Safe call, passed to inherited method
  return super.isProxied();
 }

 @Override
 public boolean isResponseAvailable() throws IOException {
  return _response != null;
 }

 @Override
 public boolean isResponseAvailable(int timeout) throws IOException {
  return _response != null;
 }

 @Override
 public boolean isSecure() {
  return hostConfiguration.getPort() == 443;
 }

 @Override
 protected boolean isStale() throws IOException {
  throw new RuntimeException("isStale()");
 }

 @Override
 public boolean isStaleCheckingEnabled() {
  throw new RuntimeException("isStaleCheckingEnabled()");
 }

 @Override
 public boolean isTransparent() {
  // Safe call, passed to the inherited method
  return super.isTransparent();
 }
 
 @Override
 public void open() throws IOException {
  // Nothing to do
 }

 @Override
 public void print(String data, String charset) throws IOException, IllegalStateException {
  // Save the passed HTTP headers for the request
  int idx = data.indexOf(SEPARATOR);
  if (idx != -1) {
   String name = data.substring(0, idx);
   String value = data.substring(idx + SEPARATOR_LENGTH).trim();
   getRequest().addHeader(new HTTPHeader(name, value));
  }
  // Other information are just ignored safely 
 }

 @Override
 public void print(String data) throws IOException, IllegalStateException {
  throw new RuntimeException("print(string): " + data);
 }

 @Override
 public void printLine() throws IOException, IllegalStateException {
  throw new RuntimeException("printLine()");
 }

 @Override
 public void printLine(String data, String charset) throws IOException, IllegalStateException {
  throw new RuntimeException("printLine(string, String): " + data + " -- " + charset);
 }

 @Override
 public void printLine(String data) throws IOException, IllegalStateException {
  throw new RuntimeException("printLine(string): " + data);
 }

 @Override
 public String readLine() throws IOException, IllegalStateException {
  throw new RuntimeException("readLine()");
 }

 private boolean waitForHttpStatus = true;
 
 @Override
 public String readLine(String charset) throws IOException, IllegalStateException {
  if (waitForHttpStatus) {
   // Dom Derrien: called only once to get the HTTP status, other information being read from the response output stream
   int responseCode = getResponse().getResponseCode();
   String line = "HTTP/1.1 " + responseCode;
   switch(responseCode) {
    case HttpStatus.SC_OK: line += " OK"; break;
    case HttpStatus.SC_BAD_REQUEST: line += " BAD REQUEST"; break;
    case HttpStatus.SC_UNAUTHORIZED: line += " UNAUTHORIZED"; break;
    case HttpStatus.SC_FORBIDDEN: line += " FORBIDDEN"; break;
    case HttpStatus.SC_NOT_FOUND: line += " NOT FOUND"; break;
    case HttpStatus.SC_INTERNAL_SERVER_ERROR: line += " INTERNAL SERVER ERROR"; break;
    case HttpStatus.SC_SERVICE_UNAVAILABLE: line += " SERVICE UNAVAILABLE"; break;
    default: line = "HTTP/1.1 " + HttpStatus.SC_BAD_REQUEST + " BAD REQUEST";
   }
   waitForHttpStatus = false;
   return line;
  }
  throw new RuntimeException("readLine(String)");
 }

 @Override
 public void releaseConnection() {
  // Do nothing, connection closed automatically...
 }

 @Override
 public void setConnectionTimeout(int timeout) {
  throw new RuntimeException("setConnectionTimeout(int)");
 }

 @Override
 public void setHost(String host) throws IllegalStateException {
  throw new RuntimeException("setHost(String");
 }

 @Override
 public void setHttpConnectionManager(HttpConnectionManager httpConnectionManager) {
  throw new RuntimeException("setHttpConnectionManager(HttpConnectionManager");
 }

 @Override
 public void setLastResponseInputStream(InputStream inStream) {
  // Safe call, passed to inherited method
  super.setLastResponseInputStream(inStream);
 }

 @Override
 public void setLocalAddress(InetAddress localAddress) {
  throw new RuntimeException("setLocalAddress(InetAddress)");
 }

 @Override
 protected void setLocked(boolean locked) {
  // Safe call, passed to inherited method
  super.setLocked(locked);
 }

 @Override
 public void setParams(HttpConnectionParams params) {
  throw new RuntimeException("setParams(HttpConnectionParams)");
 }

 @Override
 public void setPort(int port) throws IllegalStateException {
  throw new RuntimeException("setPort(int)");
 }

 @Override
 public void setProtocol(Protocol protocol) {
  throw new RuntimeException("setProtocol(Protocol)");
 }

 @Override
 public void setProxyHost(String host) throws IllegalStateException {
  throw new RuntimeException("setProxyHost(String)");
 }

 @Override
 public void setProxyPort(int port) throws IllegalStateException {
  throw new RuntimeException("setProxyPort(int)");
 }

 @Override
 public void setSendBufferSize(int sendBufferSize) throws SocketException {
  throw new RuntimeException("setSendBufferSize(int)");
 }

 @Override
 public void setSocketTimeout(int timeout) throws SocketException, IllegalStateException {
  // Safe call, passed to inherited method
  super.setSocketTimeout(timeout);
 }

 @Override
 public void setSoTimeout(int timeout) throws SocketException, IllegalStateException {
  throw new RuntimeException("setSoTimeout(int)");
 }

 @Override
 public void setStaleCheckingEnabled(boolean staleCheckEnabled) {
  throw new RuntimeException("setStaleCheckingEnabled(boolean)");
 }

 @Override
 public void setVirtualHost(String host) throws IllegalStateException {
  throw new RuntimeException("setVirtualHost(String)");
 }

 @Override
 public void shutdownOutput() {
  throw new RuntimeException("shutdownOutput()");
 }

 @Override
 public void tunnelCreated() throws IllegalStateException, IOException {
  throw new RuntimeException("tunnelCreated()");
 }

 @Override
 public void write(byte[] data, int offset, int length) throws IOException, IllegalStateException {
  throw new RuntimeException("write(byte[], int, int): " + new String(data) + ", " + offset + ", " + length);
 }

 @Override
 public void write(byte[] data) throws IOException, IllegalStateException {
  throw new RuntimeException("write(byte[]): " + new String(data));
 }

 @Override
 public void writeLine() throws IOException, IllegalStateException {
  // Safe call, new line being inserted automatically by the HTTPRequest renderer
 }

 @Override
 public void writeLine(byte[] data) throws IOException, IllegalStateException {
  throw new RuntimeException("writeLine(byte[]): " + new String(data));
 }
}

Anyone is free to fork it for his own needs. Be careful with the code because I deliver it without warranties! If you have issues to report, if you can document how to reproduce them, depending on my workload, I will help you. If you fix the issue on your side, I will be happy to merge the corresponding patches into my main branch.

I hope this helps,
A+, Dom
--
Notes:
  1. At least in United States of America until Amazon extends its coverage to company without a US bank account.
  2. Apache License, Version 2.0, January 2004, which allows users to make modifications while keeping them private.

1 comment:

  1. Dom, just to leave a note here that my additional fixes and few extra changes for the FPS library are described in my post http://radomirml.com/2010/10/18/amazon-fps-on-appengine

    ReplyDelete