use settable futures for slot requester

This commit is contained in:
Daniel Gultsch 2021-03-21 20:45:26 +01:00
parent 8ac97b0027
commit a6244d986a
3 changed files with 61 additions and 41 deletions

View File

@ -2,11 +2,18 @@ package eu.siacs.conversations.http;
import android.util.Log;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import eu.siacs.conversations.Config;
@ -34,9 +41,7 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
private final HttpConnectionManager mHttpConnectionManager;
private final XmppConnectionService mXmppConnectionService;
private final SlotRequester mSlotRequester;
private final Method method;
private final boolean mUseTor;
private boolean delayed = false;
private DownloadableFile file;
private final Message message;
@ -46,14 +51,13 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
private long transmitted = 0;
private Call mostRecentCall;
private ListenableFuture<SlotRequester.Slot> slotFuture;
public HttpUploadConnection(Message message, Method method, HttpConnectionManager httpConnectionManager) {
this.message = message;
this.method = method;
this.mHttpConnectionManager = httpConnectionManager;
this.mXmppConnectionService = httpConnectionManager.getXmppConnectionService();
this.mSlotRequester = new SlotRequester(this.mXmppConnectionService);
this.mUseTor = mXmppConnectionService.useTorToConnect();
}
@Override
@ -81,6 +85,10 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
@Override
public void cancel() {
final ListenableFuture<SlotRequester.Slot> slotFuture = this.slotFuture;
if (slotFuture != null && !slotFuture.isDone()) {
slotFuture.cancel(true);
}
final Call call = this.mostRecentCall;
if (call != null && !call.isCanceled()) {
call.cancel();
@ -90,10 +98,16 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
private void fail(String errorMessage) {
finish();
final Call call = this.mostRecentCall;
final boolean cancelled = call != null && call.isCanceled();
final Future<SlotRequester.Slot> slotFuture = this.slotFuture;
final boolean cancelled = (call != null && call.isCanceled()) || (slotFuture != null && slotFuture.isCancelled());
mXmppConnectionService.markMessage(message, Message.STATUS_SEND_FAILED, cancelled ? Message.ERROR_MESSAGE_CANCELLED : errorMessage);
}
private void markAsCancelled() {
finish();
mXmppConnectionService.markMessage(message, Message.STATUS_SEND_FAILED, Message.ERROR_MESSAGE_CANCELLED);
}
private void finish() {
mHttpConnectionManager.finishUploadConnection(this);
message.setTransferable(null);
@ -118,19 +132,20 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
}
this.file.setExpectedSize(originalFileSize + (file.getKey() != null ? 16 : 0));
message.resetFileParams();
this.mSlotRequester.request(method, account, file, mime, new SlotRequester.OnSlotRequested() {
this.slotFuture = new SlotRequester(mXmppConnectionService).request(method, account, file, mime);
Futures.addCallback(this.slotFuture, new FutureCallback<SlotRequester.Slot>() {
@Override
public void success(final SlotRequester.Slot slot) {
//TODO needs to mark the message as cancelled afterwards (ie call fail())
HttpUploadConnection.this.slot = slot;
public void onSuccess(@NullableDecl SlotRequester.Slot result) {
HttpUploadConnection.this.slot = result;
HttpUploadConnection.this.upload();
}
@Override
public void failure(String message) {
fail(message);
public void onFailure(@NotNull final Throwable throwable) {
Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": unable to request slot", throwable);
fail(throwable.getMessage());
}
});
}, MoreExecutors.directExecutor());
message.setTransferable(this);
mXmppConnectionService.markMessage(message, Message.STATUS_UNSEND);
}

View File

@ -29,19 +29,19 @@
package eu.siacs.conversations.http;
import android.util.Log;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.Map;
import eu.siacs.conversations.Config;
import eu.siacs.conversations.entities.Account;
import eu.siacs.conversations.entities.DownloadableFile;
import eu.siacs.conversations.parser.IqParser;
import eu.siacs.conversations.services.XmppConnectionService;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xml.Namespace;
import eu.siacs.conversations.xmpp.IqResponseException;
import eu.siacs.conversations.xmpp.Jid;
import eu.siacs.conversations.xmpp.stanzas.IqPacket;
import okhttp3.Headers;
@ -55,21 +55,22 @@ public class SlotRequester {
this.service = service;
}
public void request(Method method, Account account, DownloadableFile file, String mime, OnSlotRequested callback) {
public ListenableFuture<Slot> request(Method method, Account account, DownloadableFile file, String mime) {
if (method == Method.HTTP_UPLOAD_LEGACY) {
final Jid host = account.getXmppConnection().findDiscoItemByFeature(Namespace.HTTP_UPLOAD_LEGACY);
requestHttpUploadLegacy(account, host, file, mime, callback);
return requestHttpUploadLegacy(account, host, file, mime);
} else {
final Jid host = account.getXmppConnection().findDiscoItemByFeature(Namespace.HTTP_UPLOAD);
requestHttpUpload(account, host, file, mime, callback);
return requestHttpUpload(account, host, file, mime);
}
}
private void requestHttpUploadLegacy(Account account, Jid host, DownloadableFile file, String mime, OnSlotRequested callback) {
IqPacket request = service.getIqGenerator().requestHttpUploadLegacySlot(host, file, mime);
private ListenableFuture<Slot> requestHttpUploadLegacy(Account account, Jid host, DownloadableFile file, String mime) {
final SettableFuture<Slot> future = SettableFuture.create();
final IqPacket request = service.getIqGenerator().requestHttpUploadLegacySlot(host, file, mime);
service.sendIqPacket(account, request, (a, packet) -> {
if (packet.getType() == IqPacket.TYPE.RESULT) {
Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD_LEGACY);
final Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD_LEGACY);
if (slotElement != null) {
try {
final String putUrl = slotElement.findChildContent("put");
@ -80,22 +81,23 @@ public class SlotRequester {
HttpUrl.get(getUrl),
Headers.of("Content-Type", mime == null ? "application/octet-stream" : mime)
);
callback.success(slot);
future.set(slot);
return;
}
} catch (IllegalArgumentException e) {
//fall through
} catch (final IllegalArgumentException e) {
future.setException(e);
return;
}
}
}
Log.d(Config.LOGTAG, account.getJid().toString() + ": invalid response to slot request " + packet);
callback.failure(IqParser.extractErrorMessage(packet));
future.setException(new IqResponseException(IqParser.extractErrorMessage(packet)));
});
return future;
}
private void requestHttpUpload(Account account, Jid host, DownloadableFile file, String mime, OnSlotRequested callback) {
IqPacket request = service.getIqGenerator().requestHttpUploadSlot(host, file, mime);
private ListenableFuture<Slot> requestHttpUpload(Account account, Jid host, DownloadableFile file, String mime) {
final SettableFuture<Slot> future = SettableFuture.create();
final IqPacket request = service.getIqGenerator().requestHttpUploadSlot(host, file, mime);
service.sendIqPacket(account, request, (a, packet) -> {
if (packet.getType() == IqPacket.TYPE.RESULT) {
final Element slotElement = packet.findChild("slot", Namespace.HTTP_UPLOAD);
@ -107,7 +109,7 @@ public class SlotRequester {
final String getUrl = get == null ? null : get.getAttribute("url");
if (getUrl != null && putUrl != null) {
final ImmutableMap.Builder<String, String> headers = new ImmutableMap.Builder<>();
for (Element child : put.getChildren()) {
for (final Element child : put.getChildren()) {
if ("header".equals(child.getName())) {
final String name = child.getAttribute("name");
final String value = child.getContent();
@ -118,23 +120,18 @@ public class SlotRequester {
}
headers.put("Content-Type", mime == null ? "application/octet-stream" : mime);
final Slot slot = new Slot(HttpUrl.get(putUrl), HttpUrl.get(getUrl), headers.build());
callback.success(slot);
future.set(slot);
return;
}
} catch (IllegalArgumentException e) {
//fall through
} catch (final IllegalArgumentException e) {
future.setException(e);
return;
}
}
}
Log.d(Config.LOGTAG, account.getJid().toString() + ": invalid response to slot request " + packet);
callback.failure(IqParser.extractErrorMessage(packet));
future.setException(new IqResponseException(IqParser.extractErrorMessage(packet)));
});
}
public interface OnSlotRequested {
void success(Slot slot);
void failure(String message);
return future;
}
public static class Slot {

View File

@ -0,0 +1,8 @@
package eu.siacs.conversations.xmpp;
public class IqResponseException extends Exception {
public IqResponseException(final String message) {
super(message);
}
}