From a6244d986a679f64555ab336cad13fbeeee9956d Mon Sep 17 00:00:00 2001 From: Daniel Gultsch Date: Sun, 21 Mar 2021 20:45:26 +0100 Subject: [PATCH] use settable futures for slot requester --- .../http/HttpUploadConnection.java | 39 +++++++++---- .../conversations/http/SlotRequester.java | 55 +++++++++---------- .../xmpp/IqResponseException.java | 8 +++ 3 files changed, 61 insertions(+), 41 deletions(-) create mode 100644 src/main/java/eu/siacs/conversations/xmpp/IqResponseException.java diff --git a/src/main/java/eu/siacs/conversations/http/HttpUploadConnection.java b/src/main/java/eu/siacs/conversations/http/HttpUploadConnection.java index 92670a83d..0a90072c0 100644 --- a/src/main/java/eu/siacs/conversations/http/HttpUploadConnection.java +++ b/src/main/java/eu/siacs/conversations/http/HttpUploadConnection.java @@ -2,11 +2,18 @@ package eu.siacs.conversations.http; import android.util.Log; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import eu.siacs.conversations.Config; @@ -34,9 +41,7 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan private final HttpConnectionManager mHttpConnectionManager; private final XmppConnectionService mXmppConnectionService; - private final SlotRequester mSlotRequester; private final Method method; - private final boolean mUseTor; private boolean delayed = false; private DownloadableFile file; private final Message message; @@ -46,14 +51,13 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan private long transmitted = 0; private Call mostRecentCall; + private ListenableFuture slotFuture; public HttpUploadConnection(Message message, Method method, HttpConnectionManager httpConnectionManager) { this.message = message; this.method = method; this.mHttpConnectionManager = httpConnectionManager; this.mXmppConnectionService = httpConnectionManager.getXmppConnectionService(); - this.mSlotRequester = new SlotRequester(this.mXmppConnectionService); - this.mUseTor = mXmppConnectionService.useTorToConnect(); } @Override @@ -81,6 +85,10 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan @Override public void cancel() { + final ListenableFuture slotFuture = this.slotFuture; + if (slotFuture != null && !slotFuture.isDone()) { + slotFuture.cancel(true); + } final Call call = this.mostRecentCall; if (call != null && !call.isCanceled()) { call.cancel(); @@ -90,10 +98,16 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan private void fail(String errorMessage) { finish(); final Call call = this.mostRecentCall; - final boolean cancelled = call != null && call.isCanceled(); + final Future slotFuture = this.slotFuture; + final boolean cancelled = (call != null && call.isCanceled()) || (slotFuture != null && slotFuture.isCancelled()); mXmppConnectionService.markMessage(message, Message.STATUS_SEND_FAILED, cancelled ? Message.ERROR_MESSAGE_CANCELLED : errorMessage); } + private void markAsCancelled() { + finish(); + mXmppConnectionService.markMessage(message, Message.STATUS_SEND_FAILED, Message.ERROR_MESSAGE_CANCELLED); + } + private void finish() { mHttpConnectionManager.finishUploadConnection(this); message.setTransferable(null); @@ -118,19 +132,20 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan } this.file.setExpectedSize(originalFileSize + (file.getKey() != null ? 16 : 0)); message.resetFileParams(); - this.mSlotRequester.request(method, account, file, mime, new SlotRequester.OnSlotRequested() { + this.slotFuture = new SlotRequester(mXmppConnectionService).request(method, account, file, mime); + Futures.addCallback(this.slotFuture, new FutureCallback() { @Override - public void success(final SlotRequester.Slot slot) { - //TODO needs to mark the message as cancelled afterwards (ie call fail()) - HttpUploadConnection.this.slot = slot; + public void onSuccess(@NullableDecl SlotRequester.Slot result) { + HttpUploadConnection.this.slot = result; HttpUploadConnection.this.upload(); } @Override - public void failure(String message) { - fail(message); + public void onFailure(@NotNull final Throwable throwable) { + Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": unable to request slot", throwable); + fail(throwable.getMessage()); } - }); + }, MoreExecutors.directExecutor()); message.setTransferable(this); mXmppConnectionService.markMessage(message, Message.STATUS_UNSEND); } diff --git a/src/main/java/eu/siacs/conversations/http/SlotRequester.java b/src/main/java/eu/siacs/conversations/http/SlotRequester.java index 30a484693..5a3558855 100644 --- a/src/main/java/eu/siacs/conversations/http/SlotRequester.java +++ b/src/main/java/eu/siacs/conversations/http/SlotRequester.java @@ -29,19 +29,19 @@ package eu.siacs.conversations.http; -import android.util.Log; - import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.util.Map; -import eu.siacs.conversations.Config; import eu.siacs.conversations.entities.Account; import eu.siacs.conversations.entities.DownloadableFile; import eu.siacs.conversations.parser.IqParser; import eu.siacs.conversations.services.XmppConnectionService; import eu.siacs.conversations.xml.Element; import eu.siacs.conversations.xml.Namespace; +import eu.siacs.conversations.xmpp.IqResponseException; import eu.siacs.conversations.xmpp.Jid; import eu.siacs.conversations.xmpp.stanzas.IqPacket; import okhttp3.Headers; @@ -55,21 +55,22 @@ public class SlotRequester { this.service = service; } - public void request(Method method, Account account, DownloadableFile file, String mime, OnSlotRequested callback) { + public ListenableFuture request(Method method, Account account, DownloadableFile file, String mime) { if (method == Method.HTTP_UPLOAD_LEGACY) { final Jid host = account.getXmppConnection().findDiscoItemByFeature(Namespace.HTTP_UPLOAD_LEGACY); - requestHttpUploadLegacy(account, host, file, mime, callback); + return requestHttpUploadLegacy(account, host, file, mime); } else { final Jid host = account.getXmppConnection().findDiscoItemByFeature(Namespace.HTTP_UPLOAD); - requestHttpUpload(account, host, file, mime, callback); + return requestHttpUpload(account, host, file, mime); } } - private void requestHttpUploadLegacy(Account account, Jid host, DownloadableFile file, String mime, OnSlotRequested callback) { - IqPacket request = service.getIqGenerator().requestHttpUploadLegacySlot(host, file, mime); + private ListenableFuture requestHttpUploadLegacy(Account account, Jid host, DownloadableFile file, String mime) { + final SettableFuture future = SettableFuture.create(); + final IqPacket request = service.getIqGenerator().requestHttpUploadLegacySlot(host, file, mime); service.sendIqPacket(account, request, (a, packet) -> { if (packet.getType() == IqPacket.TYPE.RESULT) { - Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD_LEGACY); + final Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD_LEGACY); if (slotElement != null) { try { final String putUrl = slotElement.findChildContent("put"); @@ -80,22 +81,23 @@ public class SlotRequester { HttpUrl.get(getUrl), Headers.of("Content-Type", mime == null ? "application/octet-stream" : mime) ); - callback.success(slot); + future.set(slot); return; } - } catch (IllegalArgumentException e) { - //fall through + } catch (final IllegalArgumentException e) { + future.setException(e); + return; } } } - Log.d(Config.LOGTAG, account.getJid().toString() + ": invalid response to slot request " + packet); - callback.failure(IqParser.extractErrorMessage(packet)); + future.setException(new IqResponseException(IqParser.extractErrorMessage(packet))); }); - + return future; } - private void requestHttpUpload(Account account, Jid host, DownloadableFile file, String mime, OnSlotRequested callback) { - IqPacket request = service.getIqGenerator().requestHttpUploadSlot(host, file, mime); + private ListenableFuture requestHttpUpload(Account account, Jid host, DownloadableFile file, String mime) { + final SettableFuture future = SettableFuture.create(); + final IqPacket request = service.getIqGenerator().requestHttpUploadSlot(host, file, mime); service.sendIqPacket(account, request, (a, packet) -> { if (packet.getType() == IqPacket.TYPE.RESULT) { final Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD); @@ -107,7 +109,7 @@ public class SlotRequester { final String getUrl = get == null ? null : get.getAttribute("url"); if (getUrl != null && putUrl != null) { final ImmutableMap.Builder headers = new ImmutableMap.Builder<>(); - for (Element child : put.getChildren()) { + for (final Element child : put.getChildren()) { if ("header".equals(child.getName())) { final String name = child.getAttribute("name"); final String value = child.getContent(); @@ -118,23 +120,18 @@ public class SlotRequester { } headers.put("Content-Type", mime == null ? "application/octet-stream" : mime); final Slot slot = new Slot(HttpUrl.get(putUrl), HttpUrl.get(getUrl), headers.build()); - callback.success(slot); + future.set(slot); return; } - } catch (IllegalArgumentException e) { - //fall through + } catch (final IllegalArgumentException e) { + future.setException(e); + return; } } } - Log.d(Config.LOGTAG, account.getJid().toString() + ": invalid response to slot request " + packet); - callback.failure(IqParser.extractErrorMessage(packet)); + future.setException(new IqResponseException(IqParser.extractErrorMessage(packet))); }); - - } - - public interface OnSlotRequested { - void success(Slot slot); - void failure(String message); + return future; } public static class Slot { diff --git a/src/main/java/eu/siacs/conversations/xmpp/IqResponseException.java b/src/main/java/eu/siacs/conversations/xmpp/IqResponseException.java new file mode 100644 index 000000000..84357eaa3 --- /dev/null +++ b/src/main/java/eu/siacs/conversations/xmpp/IqResponseException.java @@ -0,0 +1,8 @@ +package eu.siacs.conversations.xmpp; + +public class IqResponseException extends Exception { + + public IqResponseException(final String message) { + super(message); + } +}