synchronize access to stanza queue

This commit is contained in:
Daniel Gultsch 2016-06-14 10:17:37 +02:00
parent 39ad426ca9
commit 95a51ea2e0
1 changed files with 31 additions and 21 deletions

View File

@ -506,22 +506,24 @@ public class XmppConnection implements Runnable {
final Element resumed = tagReader.readElement(nextTag); final Element resumed = tagReader.readElement(nextTag);
final String h = resumed.getAttribute("h"); final String h = resumed.getAttribute("h");
try { try {
final int serverCount = Integer.parseInt(h);
if (serverCount != stanzasSent) {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString()
+ ": session resumed with lost packages");
stanzasSent = serverCount;
} else {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed");
}
acknowledgeStanzaUpTo(serverCount);
ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>(); ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>();
for(int i = 0; i < this.mStanzaQueue.size(); ++i) { synchronized (this.mStanzaQueue) {
failedStanzas.add(mStanzaQueue.valueAt(i)); final int serverCount = Integer.parseInt(h);
if (serverCount != stanzasSent) {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString()
+ ": session resumed with lost packages");
stanzasSent = serverCount;
} else {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed");
}
acknowledgeStanzaUpTo(serverCount);
for (int i = 0; i < this.mStanzaQueue.size(); ++i) {
failedStanzas.add(mStanzaQueue.valueAt(i));
}
mStanzaQueue.clear();
} }
mStanzaQueue.clear(); Log.d(Config.LOGTAG, "resending " + failedStanzas.size() + " stanzas");
Log.d(Config.LOGTAG,"resending "+failedStanzas.size()+" stanzas"); for (AbstractAcknowledgeableStanza packet : failedStanzas) {
for(AbstractAcknowledgeableStanza packet : failedStanzas) {
if (packet instanceof MessagePacket) { if (packet instanceof MessagePacket) {
MessagePacket message = (MessagePacket) packet; MessagePacket message = (MessagePacket) packet;
mXmppConnectionService.markMessage(account, mXmppConnectionService.markMessage(account,
@ -546,8 +548,10 @@ public class XmppConnection implements Runnable {
final Element ack = tagReader.readElement(nextTag); final Element ack = tagReader.readElement(nextTag);
lastPacketReceived = SystemClock.elapsedRealtime(); lastPacketReceived = SystemClock.elapsedRealtime();
try { try {
final int serverSequence = Integer.parseInt(ack.getAttribute("h")); synchronized (this.mStanzaQueue) {
acknowledgeStanzaUpTo(serverSequence); final int serverSequence = Integer.parseInt(ack.getAttribute("h"));
acknowledgeStanzaUpTo(serverSequence);
}
} catch (NumberFormatException | NullPointerException e) { } catch (NumberFormatException | NullPointerException e) {
Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number"); Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number");
} }
@ -556,7 +560,9 @@ public class XmppConnection implements Runnable {
try { try {
final int serverCount = Integer.parseInt(failed.getAttribute("h")); final int serverCount = Integer.parseInt(failed.getAttribute("h"));
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": resumption failed but server acknowledged stanza #"+serverCount); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": resumption failed but server acknowledged stanza #"+serverCount);
acknowledgeStanzaUpTo(serverCount); synchronized (this.mStanzaQueue) {
acknowledgeStanzaUpTo(serverCount);
}
} catch (NumberFormatException | NullPointerException e) { } catch (NumberFormatException | NullPointerException e) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": resumption failed"); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": resumption failed");
} }
@ -1020,8 +1026,10 @@ public class XmppConnection implements Runnable {
if (smVersion != 0) { if (smVersion != 0) {
final EnablePacket enable = new EnablePacket(smVersion); final EnablePacket enable = new EnablePacket(smVersion);
tagWriter.writeStanzaAsync(enable); tagWriter.writeStanzaAsync(enable);
stanzasSent = 0; synchronized (this.mStanzaQueue) {
mStanzaQueue.clear(); stanzasSent = 0;
mStanzaQueue.clear();
}
} }
features.carbonsEnabled = false; features.carbonsEnabled = false;
features.blockListRequested = false; features.blockListRequested = false;
@ -1256,8 +1264,10 @@ public class XmppConnection implements Runnable {
tagWriter.writeStanzaAsync(packet); tagWriter.writeStanzaAsync(packet);
if (packet instanceof AbstractAcknowledgeableStanza) { if (packet instanceof AbstractAcknowledgeableStanza) {
AbstractAcknowledgeableStanza stanza = (AbstractAcknowledgeableStanza) packet; AbstractAcknowledgeableStanza stanza = (AbstractAcknowledgeableStanza) packet;
++stanzasSent; synchronized (this.mStanzaQueue) {
this.mStanzaQueue.put(stanzasSent, stanza); ++stanzasSent;
this.mStanzaQueue.append(stanzasSent, stanza);
}
if (stanza instanceof MessagePacket && stanza.getId() != null && getFeatures().sm()) { if (stanza instanceof MessagePacket && stanza.getId() != null && getFeatures().sm()) {
if (Config.EXTENDED_SM_LOGGING) { if (Config.EXTENDED_SM_LOGGING) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": requesting ack for message stanza #" + stanzasSent); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": requesting ack for message stanza #" + stanzasSent);