Skip to content

Commit

Permalink
Add full support for "batch" operation on tables
Browse files Browse the repository at this point in the history
  • Loading branch information
Renaud Paquay authored and lodejard committed Feb 13, 2012

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 3f5f5d7 commit 802e9b0
Showing 9 changed files with 473 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -2,15 +2,15 @@
* Copyright 2011 Microsoft Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.windowsazure.services.core.utils.pipeline;

@@ -28,7 +28,7 @@

public class PipelineHelpers {
public static void ThrowIfError(ClientResponse r) {
if (r.getStatus() >= 300) {
if (r.getStatus() >= 400) {
throw new UniformInterfaceException(r);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import com.microsoft.windowsazure.services.table.implementation.AtomReaderWriter;
import com.microsoft.windowsazure.services.table.implementation.DefaultEdmValueConterter;
import com.microsoft.windowsazure.services.table.implementation.DefaultXMLStreamFactory;
import com.microsoft.windowsazure.services.table.implementation.HttpReaderWriter;
import com.microsoft.windowsazure.services.table.implementation.MimeReaderWriter;
import com.microsoft.windowsazure.services.table.implementation.SharedKeyFilter;
import com.microsoft.windowsazure.services.table.implementation.SharedKeyLiteFilter;
@@ -36,6 +37,7 @@ public void register(Builder.Registry registry) {
registry.add(XMLStreamFactory.class, DefaultXMLStreamFactory.class);
registry.add(AtomReaderWriter.class);
registry.add(MimeReaderWriter.class);
registry.add(HttpReaderWriter.class);
registry.add(EdmValueConverter.class, DefaultEdmValueConterter.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.microsoft.windowsazure.services.table.implementation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.util.Enumeration;

import javax.activation.DataSource;
import javax.inject.Inject;
import javax.mail.Header;
import javax.mail.MessagingException;
import javax.mail.internet.InternetHeaders;

import com.sun.mail.util.LineInputStream;

public class HttpReaderWriter {

@Inject
public HttpReaderWriter() {
}

public StatusLine parseStatusLine(DataSource ds) {
try {
LineInputStream stream = new LineInputStream(ds.getInputStream());
String line = stream.readLine();
StringReader lineReader = new StringReader(line);

expect(lineReader, "HTTP/1.1");
expect(lineReader, " ");
String statusString = extractInput(lineReader, ' ');
String reason = extractInput(lineReader, -1);

return new StatusLine().setStatus(Integer.parseInt(statusString)).setReason(reason);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public InternetHeaders parseHeaders(DataSource ds) {
try {
return new InternetHeaders(ds.getInputStream());
}
catch (MessagingException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public InputStream parseEntity(DataSource ds) {
try {
return ds.getInputStream();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public void appendMethod(OutputStream stream, String verb, URI uri) {
try {
String method = String.format("%s %s %s\r\n", verb, uri, "HTTP/1.1");
stream.write(method.getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public void appendHeaders(OutputStream stream, InternetHeaders headers) {
try {
// Headers
Enumeration<Header> e = headers.getAllHeaders();
while (e.hasMoreElements()) {
Header header = e.nextElement();

String headerLine = String.format("%s: %s\r\n", header.getName(), header.getValue());
stream.write(headerLine.getBytes("UTF-8"));
}

// Empty line
stream.write("\r\n".getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public void appendEntity(OutputStream stream, InputStream entity) {
try {
byte[] buffer = new byte[1024];
while (true) {
int n = entity.read(buffer);
if (n == -1)
break;
stream.write(buffer, 0, n);
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private void expect(Reader reader, String string) {
try {
for (int i = 0; i < string.length(); i++) {
int ch = reader.read();
if (ch < 0)
throw new RuntimeException(String.format("Expected '%s', found '%s' instead", string,
string.substring(0, i) + ch));
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private String extractInput(Reader reader, int delimiter) {
try {
StringBuilder sb = new StringBuilder();
while (true) {
int ch = reader.read();
if (ch == -1 || ch == delimiter)
break;

sb.append((char) ch);
}
return sb.toString();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

public class StatusLine {
private int status;
private String reason;

public int getStatus() {
return status;
}

public StatusLine setStatus(int status) {
this.status = status;
return this;
}

public String getReason() {
return reason;
}

public StatusLine setReason(String reason) {
this.reason = reason;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.microsoft.windowsazure.services.table.implementation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import javax.activation.DataSource;

public class InputStreamDataSource implements DataSource {
private final InputStream stream;
private final String contentType;

public InputStreamDataSource(InputStream stream, String contentType) {
this.stream = stream;
this.contentType = contentType;

}

@Override
public String getContentType() {
return contentType;
}

@Override
public InputStream getInputStream() throws IOException {
return stream;
}

@Override
public String getName() {
return null;
}

@Override
public OutputStream getOutputStream() throws IOException {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -3,32 +3,40 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.inject.Inject;
import javax.mail.BodyPart;
import javax.mail.MessagingException;
import javax.mail.MultipartDataSource;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMultipart;
import javax.mail.internet.MimePartDataSource;

public class MimeReaderWriter {

@Inject
public MimeReaderWriter() {
}

public MimeMultipart getMimeMultipart(List<String> bodyPartContents) {
public MimeMultipart getMimeMultipart(List<DataSource> bodyPartContents) {
try {
return getMimeMultipartCore(bodyPartContents);
}
catch (MessagingException e) {
throw new RuntimeException(e);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

private MimeMultipart getMimeMultipartCore(List<String> bodyPartContents) throws MessagingException {
private MimeMultipart getMimeMultipartCore(List<DataSource> bodyPartContents) throws MessagingException,
IOException {
// Create unique part boundary strings
String batchId = String.format("batch_%s", UUID.randomUUID().toString());
String changeSet = String.format("changeset_%s", UUID.randomUUID().toString());
@@ -38,14 +46,11 @@ private MimeMultipart getMimeMultipartCore(List<String> bodyPartContents) throws
//
MimeMultipart changeSets = new MimeMultipart(new SetBoundaryMultipartDataSource(changeSet));

for (String bodyPart : bodyPartContents) {
for (DataSource bodyPart : bodyPartContents) {
MimeBodyPart mimeBodyPart = new MimeBodyPart();

mimeBodyPart.setContent(bodyPart, "application/http");

//Note: Both content type and encoding need to be set *after* setting content, because
// MimeBodyPart implementation replaces them when calling "setContent".
mimeBodyPart.setHeader("Content-Type", "application/http");
mimeBodyPart.setDataHandler(new DataHandler(bodyPart));
mimeBodyPart.setHeader("Content-Type", bodyPart.getContentType());
mimeBodyPart.setHeader("Content-Transfer-Encoding", "binary");

changeSets.addBodyPart(mimeBodyPart);
@@ -110,4 +115,33 @@ public BodyPart getBodyPart(int index) throws MessagingException {
return null;
}
}

public List<DataSource> parseParts(final InputStream entityInputStream, final String contentType) {
try {
return parsePartsCore(entityInputStream, contentType);
}
catch (IOException e) {
throw new RuntimeException(e);
}
catch (MessagingException e) {
throw new RuntimeException(e);
}
}

private List<DataSource> parsePartsCore(InputStream entityInputStream, String contentType)
throws MessagingException, IOException {
DataSource ds = new InputStreamDataSource(entityInputStream, contentType);
MimeMultipart batch = new MimeMultipart(ds);
MimeBodyPart batchBody = (MimeBodyPart) batch.getBodyPart(0);

MimeMultipart changeSets = new MimeMultipart(new MimePartDataSource(batchBody));

List<DataSource> result = new ArrayList<DataSource>();
for (int i = 0; i < changeSets.getCount(); i++) {
BodyPart part = changeSets.getBodyPart(i);

result.add(new InputStreamDataSource(part.getInputStream(), part.getContentType()));
}
return result;
}
}
Loading

0 comments on commit 802e9b0

Please sign in to comment.