do not call listeners while being synchronized on stanza queue

This commit is contained in:
Daniel Gultsch 2018-07-16 20:42:17 +02:00
parent 79fe1c8972
commit 2cc601ee36
3 changed files with 39 additions and 13 deletions

View File

@ -245,15 +245,19 @@ public class XmppConnectionService extends Service {
private final OnMessageAcknowledged mOnMessageAcknowledgedListener = new OnMessageAcknowledged() { private final OnMessageAcknowledged mOnMessageAcknowledgedListener = new OnMessageAcknowledged() {
@Override @Override
public void onMessageAcknowledged(Account account, String uuid) { public boolean onMessageAcknowledged(Account account, String uuid) {
for (final Conversation conversation : getConversations()) { for (final Conversation conversation : getConversations()) {
if (conversation.getAccount() == account) { if (conversation.getAccount() == account) {
Message message = conversation.findUnsentMessageWithUuid(uuid); Message message = conversation.findUnsentMessageWithUuid(uuid);
if (message != null) { if (message != null) {
markMessage(message, Message.STATUS_SEND); message.setStatus(Message.STATUS_SEND);
message.setErrorMessage(null);
databaseBackend.updateMessage(message, false);
return true;
} }
} }
} }
return false;
} }
}; };
@ -1093,10 +1097,13 @@ public class XmppConnectionService extends Service {
public void scheduleWakeUpCall(int seconds, int requestCode) { public void scheduleWakeUpCall(int seconds, int requestCode) {
final long timeToWake = SystemClock.elapsedRealtime() + (seconds < 0 ? 1 : seconds + 1) * 1000; final long timeToWake = SystemClock.elapsedRealtime() + (seconds < 0 ? 1 : seconds + 1) * 1000;
final AlarmManager alarmManager = (AlarmManager) getSystemService(Context.ALARM_SERVICE); final AlarmManager alarmManager = (AlarmManager) getSystemService(Context.ALARM_SERVICE);
Intent intent = new Intent(this, EventReceiver.class); if (alarmManager == null) {
return;
}
final Intent intent = new Intent(this, EventReceiver.class);
intent.setAction("ping"); intent.setAction("ping");
PendingIntent pendingIntent = PendingIntent.getBroadcast(this, requestCode, intent, 0);
try { try {
PendingIntent pendingIntent = PendingIntent.getBroadcast(this, requestCode, intent, 0);
alarmManager.set(AlarmManager.ELAPSED_REALTIME_WAKEUP, timeToWake, pendingIntent); alarmManager.set(AlarmManager.ELAPSED_REALTIME_WAKEUP, timeToWake, pendingIntent);
} catch (RuntimeException e) { } catch (RuntimeException e) {
Log.e(Config.LOGTAG, "unable to schedule alarm for ping", e); Log.e(Config.LOGTAG, "unable to schedule alarm for ping", e);
@ -1107,10 +1114,13 @@ public class XmppConnectionService extends Service {
private void scheduleNextIdlePing() { private void scheduleNextIdlePing() {
final long timeToWake = SystemClock.elapsedRealtime() + (Config.IDLE_PING_INTERVAL * 1000); final long timeToWake = SystemClock.elapsedRealtime() + (Config.IDLE_PING_INTERVAL * 1000);
final AlarmManager alarmManager = (AlarmManager) getSystemService(Context.ALARM_SERVICE); final AlarmManager alarmManager = (AlarmManager) getSystemService(Context.ALARM_SERVICE);
Intent intent = new Intent(this, EventReceiver.class); if (alarmManager == null) {
return;
}
final Intent intent = new Intent(this, EventReceiver.class);
intent.setAction(ACTION_IDLE_PING); intent.setAction(ACTION_IDLE_PING);
PendingIntent pendingIntent = PendingIntent.getBroadcast(this, 0, intent, 0);
try { try {
PendingIntent pendingIntent = PendingIntent.getBroadcast(this, 0, intent, 0);
alarmManager.setAndAllowWhileIdle(AlarmManager.ELAPSED_REALTIME_WAKEUP, timeToWake, pendingIntent); alarmManager.setAndAllowWhileIdle(AlarmManager.ELAPSED_REALTIME_WAKEUP, timeToWake, pendingIntent);
} catch (RuntimeException e) { } catch (RuntimeException e) {
Log.d(Config.LOGTAG, "unable to schedule alarm for idle ping", e); Log.d(Config.LOGTAG, "unable to schedule alarm for idle ping", e);
@ -3474,6 +3484,8 @@ public class XmppConnectionService extends Service {
final XmppConnection connection = account.getXmppConnection(); final XmppConnection connection = account.getXmppConnection();
if (connection != null) { if (connection != null) {
connection.sendIqPacket(packet, callback); connection.sendIqPacket(packet, callback);
} else if (callback != null) {
callback.onIqPacketReceived(account,new IqPacket(IqPacket.TYPE.TIMEOUT));
} }
} }

View File

@ -3,5 +3,5 @@ package eu.siacs.conversations.xmpp;
import eu.siacs.conversations.entities.Account; import eu.siacs.conversations.entities.Account;
public interface OnMessageAcknowledged { public interface OnMessageAcknowledged {
public void onMessageAcknowledged(Account account, String id); boolean onMessageAcknowledged(Account account, String id);
} }

View File

@ -574,6 +574,7 @@ public class XmppConnection implements Runnable {
final String h = resumed.getAttribute("h"); final String h = resumed.getAttribute("h");
try { try {
ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>(); ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>();
final boolean acknowledgedMessages;
synchronized (this.mStanzaQueue) { synchronized (this.mStanzaQueue) {
final int serverCount = Integer.parseInt(h); final int serverCount = Integer.parseInt(h);
if (serverCount < stanzasSent) { if (serverCount < stanzasSent) {
@ -583,12 +584,15 @@ public class XmppConnection implements Runnable {
} else { } else {
Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": session resumed"); Log.d(Config.LOGTAG, account.getJid().asBareJid().toString() + ": session resumed");
} }
acknowledgeStanzaUpTo(serverCount); acknowledgedMessages = acknowledgeStanzaUpTo(serverCount);
for (int i = 0; i < this.mStanzaQueue.size(); ++i) { for (int i = 0; i < this.mStanzaQueue.size(); ++i) {
failedStanzas.add(mStanzaQueue.valueAt(i)); failedStanzas.add(mStanzaQueue.valueAt(i));
} }
mStanzaQueue.clear(); mStanzaQueue.clear();
} }
if (acknowledgedMessages) {
mXmppConnectionService.updateConversationUi();
}
Log.d(Config.LOGTAG, "resending " + failedStanzas.size() + " stanzas"); Log.d(Config.LOGTAG, "resending " + failedStanzas.size() + " stanzas");
for (AbstractAcknowledgeableStanza packet : failedStanzas) { for (AbstractAcknowledgeableStanza packet : failedStanzas) {
if (packet instanceof MessagePacket) { if (packet instanceof MessagePacket) {
@ -629,9 +633,13 @@ public class XmppConnection implements Runnable {
final Element ack = tagReader.readElement(nextTag); final Element ack = tagReader.readElement(nextTag);
lastPacketReceived = SystemClock.elapsedRealtime(); lastPacketReceived = SystemClock.elapsedRealtime();
try { try {
final boolean acknowledgedMessages;
synchronized (this.mStanzaQueue) { synchronized (this.mStanzaQueue) {
final int serverSequence = Integer.parseInt(ack.getAttribute("h")); final int serverSequence = Integer.parseInt(ack.getAttribute("h"));
acknowledgeStanzaUpTo(serverSequence); acknowledgedMessages = acknowledgeStanzaUpTo(serverSequence);
}
if (acknowledgedMessages) {
mXmppConnectionService.updateConversationUi();
} }
} catch (NumberFormatException | NullPointerException e) { } catch (NumberFormatException | NullPointerException e) {
Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": server send ack without sequence number"); Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": server send ack without sequence number");
@ -641,8 +649,12 @@ public class XmppConnection implements Runnable {
try { try {
final int serverCount = Integer.parseInt(failed.getAttribute("h")); final int serverCount = Integer.parseInt(failed.getAttribute("h"));
Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": resumption failed but server acknowledged stanza #" + serverCount); Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": resumption failed but server acknowledged stanza #" + serverCount);
final boolean acknowledgedMessages;
synchronized (this.mStanzaQueue) { synchronized (this.mStanzaQueue) {
acknowledgeStanzaUpTo(serverCount); acknowledgedMessages = acknowledgeStanzaUpTo(serverCount);
}
if (acknowledgedMessages) {
mXmppConnectionService.updateConversationUi();
} }
} catch (NumberFormatException | NullPointerException e) { } catch (NumberFormatException | NullPointerException e) {
Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": resumption failed"); Log.d(Config.LOGTAG, account.getJid().asBareJid() + ": resumption failed");
@ -663,10 +675,11 @@ public class XmppConnection implements Runnable {
} }
} }
private void acknowledgeStanzaUpTo(int serverCount) { private boolean acknowledgeStanzaUpTo(int serverCount) {
if (serverCount > stanzasSent) { if (serverCount > stanzasSent) {
Log.e(Config.LOGTAG, "server acknowledged more stanzas than we sent. serverCount=" + serverCount + ", ourCount=" + stanzasSent); Log.e(Config.LOGTAG, "server acknowledged more stanzas than we sent. serverCount=" + serverCount + ", ourCount=" + stanzasSent);
} }
boolean acknowledgedMessages = false;
for (int i = 0; i < mStanzaQueue.size(); ++i) { for (int i = 0; i < mStanzaQueue.size(); ++i) {
if (serverCount >= mStanzaQueue.keyAt(i)) { if (serverCount >= mStanzaQueue.keyAt(i)) {
if (Config.EXTENDED_SM_LOGGING) { if (Config.EXTENDED_SM_LOGGING) {
@ -675,12 +688,13 @@ public class XmppConnection implements Runnable {
AbstractAcknowledgeableStanza stanza = mStanzaQueue.valueAt(i); AbstractAcknowledgeableStanza stanza = mStanzaQueue.valueAt(i);
if (stanza instanceof MessagePacket && acknowledgedListener != null) { if (stanza instanceof MessagePacket && acknowledgedListener != null) {
MessagePacket packet = (MessagePacket) stanza; MessagePacket packet = (MessagePacket) stanza;
acknowledgedListener.onMessageAcknowledged(account, packet.getId()); acknowledgedMessages |= acknowledgedListener.onMessageAcknowledged(account, packet.getId());
} }
mStanzaQueue.removeAt(i); mStanzaQueue.removeAt(i);
i--; i--;
} }
} }
return acknowledgedMessages;
} }
private @NonNull private @NonNull