Throttled Producers and Consumers in Java

BlockingQueue<DBObject> queue = new ArrayBlockingQueue<>(QUEUE_MAXIMUM_SIZE);

producers:  queue.put(dbObject);

consumers: while (RUNNING) {  DBObject dbObject = queue.poll(1, TimeUnit.SECONDS); ...

When transferring large volumes of data from one type of database (Mongo, for example) to another (DynamoDB), the fastest approach is to work in parallel. Reading from a single Mongo cursor on a single server in the cluster gives limited performance, and writing to DynamoDB on one thread maxes out quickly at something like 100 writes/s. I ended up creating many threads that read from Mongo in parallel on defined partitions, so that all of the shards and replica sets in the cluster were used. These threads put items on the queue. Setting a maximum size on the BlockingQueue throttled my reads: put() blocks while the queue is full, so the producers could not out-pace the consumers, flood them, and run the process out of memory. The consumers read from the queue and were eventually told to stop once all of the producers were done. This model performed very well and was easy to monitor. By watching the size of the queue, I could see whether the consumers or the producers were working faster. The consumers had no trouble keeping up, since DynamoDB is way more scalable than Mongo.
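
Here’s a minimal, self-contained sketch of that pattern. It is not the original migration code: plain strings stand in for the Mongo documents, a counter stands in for the DynamoDB write, and the class name, method name, and shutdown scheme (a count of still-active producers) are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThrottledTransferSketch {

    /** Pushes producers * itemsPerProducer items through a bounded queue and returns how many were consumed. */
    static int transfer(int producers, int consumers, int itemsPerProducer, int queueCapacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);
        AtomicInteger activeProducers = new AtomicInteger(producers);
        AtomicInteger consumed = new AtomicInteger();
        // One thread per worker so that every producer and consumer runs concurrently
        ExecutorService pool = Executors.newFixedThreadPool(producers + consumers);

        for (int p = 0; p < producers; p++) {
            final int partition = p;
            pool.submit(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        // put() blocks while the queue is full; this is the throttle
                        queue.put("partition-" + partition + "-item-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    activeProducers.decrementAndGet();
                }
            });
        }

        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    // Keep polling until every producer has finished and the queue is drained
                    while (activeProducers.get() > 0 || !queue.isEmpty()) {
                        String item = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            consumed.incrementAndGet(); // stand-in for the write to the target database
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }
}
```

For example, ThrottledTransferSketch.transfer(4, 2, 1000, 16) moves 4,000 items through a queue that never holds more than 16 of them at once.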


AWS DynamoDB map object to Base64 encoded gzipped JSON in Java


Annotation in a DynamoDBTable class:
    @DynamoDBAttribute
    @DynamoDBMarshalling(marshallerClass=PojoMarshaller.class)
    private Pojo pojo;

public static class PojoMarshaller extends GzipJsonMarshaller<Pojo> { }

-------------------------------------------------------------------------------------------------------------

import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMarshaller;
import com.fasterxml.jackson.databind.*;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.io.IOUtils;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.zip.*;

import static com.amazonaws.util.Throwables.failure;

public class GzipJsonMarshaller<T> implements DynamoDBMarshaller<T> {

    private static final ObjectMapper mapper = new ObjectMapper();
    private static final ObjectWriter writer = mapper.writer();

    @Override
    public String marshall(T obj) {
        try {
            String plainJsonString = writer.writeValueAsString(obj);
            byte[] binaryBytes = compressString(plainJsonString).array();
            String base64BinaryString = Base64.encodeBase64String(binaryBytes);
            return base64BinaryString;
        } catch (Exception e) {
            throw failure(e, "Unable to marshall the instance of " + obj.getClass() + " into a string");
        }
    }

    @Override
    public T unmarshall(Class<T> clazz, String base64BinaryString) {
        try {
            byte[] binaryBytes = Base64.decodeBase64(base64BinaryString);
            String plainJsonString = uncompressString(ByteBuffer.wrap(binaryBytes));
            return mapper.readValue(plainJsonString, clazz);
        } catch (Exception e) {
            throw failure(e, "Unable to unmarshall the string " + base64BinaryString + " into " + clazz);
        }
    }

    public static ByteBuffer compressString(String input) throws IOException {
        ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
        // Closing the gzip stream writes the trailer and releases the native deflater
        try (GZIPOutputStream gzipOutput = new GZIPOutputStream(byteArrayOutput)) {
            gzipOutput.write(input.getBytes("UTF-8"));
        }
        return ByteBuffer.wrap(byteArrayOutput.toByteArray());
    }

    public static String uncompressString(ByteBuffer input) throws IOException {
        // Copy only the readable bytes instead of assuming the buffer wraps a whole array
        byte[] bytes = new byte[input.remaining()];
        input.duplicate().get(bytes);
        try (GZIPInputStream gzipInput = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
            IOUtils.copy(gzipInput, byteArrayOutput);
            return new String(byteArrayOutput.toByteArray(), "UTF-8");
        }
    }

}
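
To see the encoding steps on their own, here’s a small sketch that uses only the JDK (java.util.zip and java.util.Base64, so Java 8 or later) instead of Jackson, commons-codec, and commons-io. The class and method names are made up for illustration, and the JSON string is supplied by hand rather than produced from a POJO.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipBase64Sketch {

    /** JSON text -> gzip -> Base64 string, suitable for storing in a string attribute. */
    static String encode(String json) {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The gzip stream is closed at this point, so the byte array is complete
        return Base64.getEncoder().encodeToString(compressed.toByteArray());
    }

    /** Base64 string -> gzip bytes -> JSON text. */
    static String decode(String encoded) {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                plain.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Calling GzipBase64Sketch.decode(GzipBase64Sketch.encode(json)) should return the original string. Keep in mind that gzip adds a fixed header and trailer, so very small objects can end up larger than their plain JSON.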


Convert Mongo DBObject to POJO with Jongo


import com.mongodb.DBObject;
import org.jongo.ResultHandler;
import org.jongo.bson.Bson;
import org.jongo.bson.BsonDocument;
import org.jongo.marshall.Unmarshaller;
import org.jongo.marshall.jackson.JacksonEngine;
import org.jongo.marshall.jackson.configuration.Mapping;

...

    public Pojo getPojo(DBObject dbObject) {
        JacksonEngine engine = new JacksonEngine(new Mapping.Builder().build());
        ResultHandler<Pojo> handler = new UnmarshallingResultHandler<>(engine, Pojo.class); 
        Pojo pojo = handler.map(dbObject);
        return pojo;
    }

    public static class UnmarshallingResultHandler<T> implements ResultHandler<T> {
        private final Unmarshaller unmarshaller;
        private final Class<T> clazz;
        public UnmarshallingResultHandler(Unmarshaller unmarshaller, Class<T> clazz) {
            this.unmarshaller = unmarshaller;
            this.clazz = clazz;
        }
        public T map(DBObject result) {
            BsonDocument bsonDocument = Bson.createDocument(result);
            return unmarshaller.unmarshall(bsonDocument, clazz);
        }
    }

How to convert Unicode URL to ASCII in Java

URLs can get quite complex once you add Internationalized Domain Names (IDNs) and Unicode characters to the path. Often you’ll want to view and store them in ASCII, so proper conversion becomes important. After much searching, I couldn’t find a good way to convert an entire Unicode URL to ASCII. Most examples just convert the domain to Punycode and forget about the port, path, and query string, and most don’t cover the case where the provided URL has no scheme on the front. I wanted a flexible conversion that handles all of those URL components, so I came up with the code below. It probably has flaws, but it works for most URL formats you will encounter and for the large variety I tested it with.

Here’s a good list of domains to test this with. You can add ports, Unicode paths, Unicode params, and encoded path characters to these for additional testing.
https://blogs.msdn.microsoft.com/shawnste/2006/09/14/idn-test-urls/

package com.company.utils;

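import com.google.common.base.CharMatcher; // Guava; newer releases expose this matcher as CharMatcher.ascii()
import java.net.*;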

public class UnicodeUtil {
    public static String convertUnicodeURLToAscii(String url) throws URISyntaxException {
        if(url != null) {
            url = url.trim();
            // Handle international domains by detecting non-ascii and converting them to punycode
            boolean isAscii = CharMatcher.ASCII.matchesAllOf(url);
            if(!isAscii) {
                URI uri = new URI(url);
                boolean includeScheme = true;

                // URI needs a scheme to work properly with authority parsing
                if(uri.getScheme() == null) {
                    uri = new URI("http://" + url);
                    includeScheme = false;
                }

                String scheme = uri.getScheme() != null ? uri.getScheme() + "://" : null;
                String authority = uri.getRawAuthority() != null ? uri.getRawAuthority() : ""; // includes domain and port
                String path = uri.getRawPath() != null ? uri.getRawPath() : "";
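                String queryString = uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "";
                String fragment = uri.getRawFragment() != null ? "#" + uri.getRawFragment() : ""; // keep any #fragment

                // Must convert domain to punycode separately from the path
                url = (includeScheme ? scheme : "") + IDN.toASCII(authority) + path + queryString + fragment;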

                // Convert path from unicode to ascii encoding
                url = new URI(url).toASCIIString();
            }
        }
        return url;
    }
}
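
The two conversions the method relies on can be tried in isolation with just the JDK. This sketch is an illustration, not a replacement for the method above: the class and method names are hypothetical, and isAscii is a plain-Java stand-in for the Guava CharMatcher check.

```java
import java.net.IDN;
import java.net.URI;

class UrlAsciiSketch {

    /** True when every character is in the 7-bit ASCII range. */
    static boolean isAscii(String s) {
        return s.chars().allMatch(c -> c < 128);
    }

    /** Converts a Unicode host name to its Punycode (xn--) form. */
    static String hostToAscii(String host) {
        return IDN.toASCII(host);
    }

    /** Percent-encodes non-ASCII characters in the rest of the URL as UTF-8 bytes. */
    static String percentEncode(String url) {
        return URI.create(url).toASCIIString();
    }
}
```

With these helpers, hostToAscii("bücher.example") gives "xn--bcher-kva.example", and percentEncode("http://example.com/café") gives "http://example.com/caf%C3%A9".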

Apache HTTP Client 4.3.4 Example with Timeouts, Client Cert and Trusted Cert from KeyStores

If you have ever used or tried to use the Apache HTTP Client library, you’ve probably discovered that it is very useful and powerful, but the API changes so much between versions that it is hard to find proper examples of complex things like setting connection timeouts, using basic auth, adding a client cert, or trusting a self-signed cert. Here’s a code snippet that loads a client cert and a trusted server cert and sets timeouts, using version 4.3.4 of the library.

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.4</version>
    </dependency>

    public static HttpClient getSSLClient(String serverKeystoreFile, String serverKeystorePassword, String serverKeystoreType,
                                         String clientCertFile, String clientCertPassword, String clientKeystoreType) throws Exception {

        // Server cert trust stuff
        KeyStore trustStore = KeyStore.getInstance(serverKeystoreType);
        trustStore.load(new FileInputStream(new File(serverKeystoreFile)), serverKeystorePassword.toCharArray());
        TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        trustFactory.init(trustStore);
        TrustManager[] trustManagers = trustFactory.getTrustManagers();

        // Client cert stuff
        KeyStore clientCert = KeyStore.getInstance(clientKeystoreType);
        clientCert.load(new FileInputStream(new File(clientCertFile)), clientCertPassword.toCharArray());
        KeyManagerFactory keyFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        keyFactory.init(clientCert, clientCertPassword.toCharArray());
        KeyManager[] keyManagers = keyFactory.getKeyManagers();

        SSLContext sslcontext = SSLContext.getInstance("TLS");
        sslcontext.init(keyManagers, trustManagers, new SecureRandom());

        SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslcontext);

        HttpClient httpClient = HttpClients.custom()
                .setSSLSocketFactory(sslsf)
                .build();

        return httpClient;
    }

    public static String executePost() throws Exception {

        // The scheme must be https for the client cert and trusted cert to come into play
        URI uri = new URIBuilder("https://my.superawesome.fakeurlstuffandstuff.com/path1/path2/file.php").build();
        // Properties is a project-specific config helper (java.util.Properties has no static get/getInt)
        String serverKeystore = Properties.get("TRUSTED_CERT_KEYSTORE_FILE_PATH"); // JKS FILE
        String serverPass = Properties.get("TRUSTED_CERT_KEYSTORE_PASSWORD");
        String clientKeystore = Properties.get("CLIENT_KEYSTORE"); // PFX FILE
        String clientPass = Properties.get("CLIENT_KEYSTORE_PASSWORD");

        HttpClient client = getSSLClient(serverKeystore, serverPass, "JKS", clientKeystore, clientPass, "PKCS12");

        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
        requestConfigBuilder.setMaxRedirects(2);
        // Time allowed to obtain a connection from the connection manager
        requestConfigBuilder.setConnectionRequestTimeout(Properties.getInt("SERVER_REQUEST_TIMEOUT_MILLIS", 30_000));
        // Time allowed to establish the connection to the server
        requestConfigBuilder.setConnectTimeout(Properties.getInt("SERVER_CONNECT_TIMEOUT_MILLIS", 10_000));

        HttpPost httpPost = new HttpPost(uri);
        httpPost.setConfig(requestConfigBuilder.build());
        httpPost.setEntity(new StringEntity("REQUEST BODY"));
        httpPost.setHeader("Content-Type", "application/json");
        httpPost.setHeader("Accept", "application/json");
        HttpResponse response = client.execute(httpPost);
        return EntityUtils.toString(response.getEntity());
    }
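
The key store handling can also be separated from HttpClient entirely. The sketch below uses only the JDK: it closes the input stream with try-with-resources and returns an SSLContext, which could then be handed to SSLConnectionSocketFactory the same way getSSLClient does. The class and method names are assumptions for illustration, and passing null as the SecureRandom simply asks the JDK for its default.

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

class SslContextSketch {

    /** Loads a key store of the given type (e.g. "JKS" or "PKCS12"); a null stream yields an empty store. */
    static KeyStore load(InputStream in, String type, char[] password) {
        try (InputStream stream = in) { // closed automatically, even when loading fails
            KeyStore store = KeyStore.getInstance(type);
            store.load(stream, password);
            return store;
        } catch (IOException | GeneralSecurityException e) {
            throw new IllegalStateException("Could not load key store of type " + type, e);
        }
    }

    /** Builds a TLS context that trusts the certs in trustStore and presents the key in clientStore. */
    static SSLContext build(KeyStore trustStore, KeyStore clientStore, char[] clientPassword) {
        try {
            TrustManagerFactory trustFactory =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustFactory.init(trustStore);

            KeyManagerFactory keyFactory =
                    KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyFactory.init(clientStore, clientPassword);

            SSLContext context = SSLContext.getInstance("TLS");
            context.init(keyFactory.getKeyManagers(), trustFactory.getTrustManagers(), null);
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not build SSL context", e);
        }
    }
}
```

In real use the streams would come from something like new FileInputStream(serverKeystoreFile), with one load call for the trusted cert store and one for the client cert store.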

Image Proxy using Jersey and HttpClient

Here’s the Jersey Endpoint:

    @GET
    @Path("imageproxy")
    @Produces("image/png")
    public Response imageproxy(@QueryParam("url") String url) {
        byte[] result = null;
        try {
            result = ImageHelper.getUrlBinary(url);
        } catch(Exception ex) {
            logger.error("Error proxying image", ex);
        }
        if(result != null) {
            return Response.ok(new ByteArrayInputStream(result)).build();
        } else {
            return Response.noContent().build();
        }
    }

Here’s the HttpClient code:

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Logger;
import java.net.URI;

public class ImageHelper {
    private static Logger logger = Logger.getLogger(ImageHelper.class);
    public static byte[] getUrlBinary(String url) {
        byte[] result = null;
        try {
            URI uri = new URI(url);
            HttpClient client = new DefaultHttpClient();
            HttpGet httpGet = new HttpGet(uri);
            HttpResponse response = client.execute(httpGet);
            result = IOUtils.toByteArray(response.getEntity().getContent());
        } catch(Exception ex) {
            logger.error("Error getting binary from: " + url, ex);
        }
        return result;
    }
}

Java – TAXII – Collection Management Endpoint

As a followup to my last post, here’s a partial implementation of the collection management endpoint for a TAXII server. I added error handling via the TAXII status message response too.

    @POST
    @Path("collection")
    @Consumes (MediaType.APPLICATION_XML)
    @Produces (MediaType.APPLICATION_XML)
    public Response collection(@Context HttpServletRequest request, String x) {
        CollectionInformationRequest collectionRequest = null;
        SubscriptionManagementRequest subscriptionRequest = null;
        try {
            //printHeaders(request);

            System.out.println("---------- Request:");
            Object requestObject = getRequestObject(x);
            if(requestObject instanceof CollectionInformationRequest) {
                collectionRequest = (CollectionInformationRequest) requestObject;
            } else if(requestObject instanceof SubscriptionManagementRequest) {
                subscriptionRequest = (SubscriptionManagementRequest) requestObject;
            } else {
                throw new Exception("Unsupported request type");
            }
            
            System.out.println("---------- Response:");
            if(collectionRequest != null) {
                System.out.println(toXml(collectionRequest));

                List<CollectionRecordType> collections = new ArrayList<>();
                collections.add(factory.createCollectionRecordType()
                        .withAvailable(true)
                        .withCollectionType(CollectionTypeEnum.DATA_FEED)
                        .withCollectionName("default")
                        .withDescription("Default data set description")
                        .withPollingServices(factory.createServiceContactInfoType()
                                .withAddress("/poll")
                                .withMessageBindings(Versions.VID_TAXII_XML_11)
                                .withProtocolBinding(Versions.VID_TAXII_HTTP_10)
                        )
                        .withSubscriptionServices(factory.createServiceContactInfoType()
                                .withAddress("/collection")
                                .withMessageBindings(Versions.VID_TAXII_XML_11)
                                .withProtocolBinding(Versions.VID_TAXII_HTTP_10)
                        )
                        .withContentBindings(factory.createContentBindingIDType().withBindingId(ContentBindings.CB_STIX_XML_111))
                );

                CollectionInformationResponse collectionResponse = factory.createCollectionInformationResponse()
                        .withInResponseTo(collectionRequest.getMessageId())
                        .withMessageId(MessageHelper.generateMessageId())
                        .withCollections(collections);

                String responseString = toXml(collectionResponse);
                System.out.println(taxiiXml.marshalToString(collectionResponse, true));

                return generateResponse(responseString, request);
            } else {
                System.out.println(toXml(subscriptionRequest));
                
                String subscriptionId = subscriptionRequest.getSubscriptionID(); // Should be null on a subscribe
                CollectionActionEnum action = subscriptionRequest.getAction();
                PushParameterType pushLocation = subscriptionRequest.getPushParameters();
                
                // Gather type, query, content bindings so we know what kind of delivery they want
                // Store/update their subscription based on the desired action.
                // Pause/Resume are tricky because you need to pick up where they paused and send what they missed

                // Placeholder: this echoes the request fields back as another request message.
                // A complete implementation would answer with a TAXII Subscription Management Response.
                SubscriptionManagementRequest subscriptionResponse = factory.createSubscriptionManagementRequest()
                        .withMessageId(MessageHelper.generateMessageId())
                        .withSubscriptionID(subscriptionId)
                        .withCollectionName(subscriptionRequest.getCollectionName())
                        .withAction(subscriptionRequest.getAction());
                
                String responseString = toXml(subscriptionResponse);
                return generateResponse(responseString, request);
            }
        } catch(Exception ex) {
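            // Report against whichever request was parsed so in_response_to is set for subscription errors too
            return handleError(ex, request, collectionRequest != null ? collectionRequest : subscriptionRequest);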
        }
    }

    private Response handleError(Exception ex, HttpServletRequest httpRequest, RequestMessageType taxiiRequest) {
        ex.printStackTrace();
        try {
            StatusMessage status = factory.createStatusMessage()
                    .withMessage("Error: " + ex.getMessage())
                    .withInResponseTo(taxiiRequest != null ? taxiiRequest.getMessageId() : null)
                    .withMessageId(MessageHelper.generateMessageId())
                    .withStatusType(StatusTypeEnum.FAILURE.value());
            String responseString = toXml(status);
            return generateResponse(responseString, httpRequest);
        } catch(Exception e) {
            e.printStackTrace();
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); 
        }
    }