request server-ACKs for iq stanzas

This commit is contained in:
Daniel Gultsch 2015-08-06 20:48:55 +02:00
parent 6694af8fca
commit 53ce5d223e
1 changed files with 37 additions and 33 deletions

View File

@ -95,7 +95,7 @@ public class XmppConnection implements Runnable {
private String streamId = null; private String streamId = null;
private int smVersion = 3; private int smVersion = 3;
private final SparseArray<String> messageReceipts = new SparseArray<>(); private final SparseArray<String> mStanzaReceipts = new SparseArray<>();
private int stanzasReceived = 0; private int stanzasReceived = 0;
private int stanzasSent = 0; private int stanzasSent = 0;
@ -341,23 +341,24 @@ public class XmppConnection implements Runnable {
+ ": session resumed with lost packages"); + ": session resumed with lost packages");
stanzasSent = serverCount; stanzasSent = serverCount;
} else { } else {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed");
+ ": session resumed");
} }
if (acknowledgedListener != null) { acknowledgeStanzaUpTo(serverCount);
for (int i = 0; i < messageReceipts.size(); ++i) { ArrayList<IqPacket> failedIqPackets = new ArrayList<>();
if (serverCount >= messageReceipts.keyAt(i)) { for(int i = 0; i < this.mStanzaReceipts.size(); ++i) {
acknowledgedListener.onMessageAcknowledged( String id = mStanzaReceipts.valueAt(i);
account, messageReceipts.valueAt(i)); Pair<IqPacket,OnIqPacketReceived> pair = id == null ? null : this.packetCallbacks.get(id);
if (pair != null) {
failedIqPackets.add(pair.first);
} }
} }
mStanzaReceipts.clear();
Log.d(Config.LOGTAG,"resending "+failedIqPackets.size()+" iq stanza");
for(IqPacket packet : failedIqPackets) {
sendUnmodifiedIqPacket(packet,null);
} }
messageReceipts.clear();
} catch (final NumberFormatException ignored) { } catch (final NumberFormatException ignored) {
} }
sendServiceDiscoveryInfo(account.getServer());
sendServiceDiscoveryInfo(account.getJid().toBareJid());
sendServiceDiscoveryItems(account.getServer());
sendInitialPing(); sendInitialPing();
} else if (nextTag.isStart("r")) { } else if (nextTag.isStart("r")) {
tagReader.readElement(nextTag); tagReader.readElement(nextTag);
@ -371,17 +372,7 @@ public class XmppConnection implements Runnable {
lastPacketReceived = SystemClock.elapsedRealtime(); lastPacketReceived = SystemClock.elapsedRealtime();
try { try {
final int serverSequence = Integer.parseInt(ack.getAttribute("h")); final int serverSequence = Integer.parseInt(ack.getAttribute("h"));
if (Config.EXTENDED_SM_LOGGING) { acknowledgeStanzaUpTo(serverSequence);
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": server acknowledged stanza #" + serverSequence);
}
final String msgId = this.messageReceipts.get(serverSequence);
if (msgId != null) {
if (this.acknowledgedListener != null) {
this.acknowledgedListener.onMessageAcknowledged(
account, msgId);
}
this.messageReceipts.remove(serverSequence);
}
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number"); Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number");
} }
@ -409,6 +400,22 @@ public class XmppConnection implements Runnable {
} }
} }
private void acknowledgeStanzaUpTo(int serverCount) {
for (int i = 0; i < mStanzaReceipts.size(); ++i) {
if (serverCount >= mStanzaReceipts.keyAt(i)) {
if (Config.EXTENDED_SM_LOGGING) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": server acknowledged stanza #" + mStanzaReceipts.keyAt(i));
}
String id = mStanzaReceipts.valueAt(i);
if (acknowledgedListener != null) {
acknowledgedListener.onMessageAcknowledged(account, id);
}
mStanzaReceipts.removeAt(i);
i--;
}
}
}
private void sendInitialPing() { private void sendInitialPing() {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": sending intial ping"); Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": sending intial ping");
final IqPacket iq = new IqPacket(IqPacket.TYPE.GET); final IqPacket iq = new IqPacket(IqPacket.TYPE.GET);
@ -778,7 +785,7 @@ public class XmppConnection implements Runnable {
final EnablePacket enable = new EnablePacket(smVersion); final EnablePacket enable = new EnablePacket(smVersion);
tagWriter.writeStanzaAsync(enable); tagWriter.writeStanzaAsync(enable);
stanzasSent = 0; stanzasSent = 0;
messageReceipts.clear(); mStanzaReceipts.clear();
} }
features.carbonsEnabled = false; features.carbonsEnabled = false;
features.blockListRequested = false; features.blockListRequested = false;
@ -920,9 +927,6 @@ public class XmppConnection implements Runnable {
packet.setAttribute("id", id); packet.setAttribute("id", id);
} }
if (callback != null) { if (callback != null) {
if (packet.getId() == null) {
packet.setId(nextRandomId());
}
packetCallbacks.put(packet.getId(), new Pair<>(packet, callback)); packetCallbacks.put(packet.getId(), new Pair<>(packet, callback));
} }
this.sendPacket(packet); this.sendPacket(packet);
@ -947,11 +951,11 @@ public class XmppConnection implements Runnable {
++stanzasSent; ++stanzasSent;
} }
tagWriter.writeStanzaAsync(packet); tagWriter.writeStanzaAsync(packet);
if (packet instanceof MessagePacket && packet.getId() != null && getFeatures().sm()) { if ((packet instanceof MessagePacket || packet instanceof IqPacket) && packet.getId() != null && getFeatures().sm()) {
if (Config.EXTENDED_SM_LOGGING) { if (Config.EXTENDED_SM_LOGGING) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": requesting ack for message stanza #" + stanzasSent); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": requesting ack for stanza #" + stanzasSent);
} }
this.messageReceipts.put(stanzasSent, packet.getId()); this.mStanzaReceipts.put(stanzasSent, packet.getId());
tagWriter.writeStanzaAsync(new RequestPacket(this.smVersion)); tagWriter.writeStanzaAsync(new RequestPacket(this.smVersion));
} }
} }