From 0deffef8daf94d0fed83f4249ef0a5356b7b315a Mon Sep 17 00:00:00 2001 From: Daniel Gultsch Date: Thu, 1 Mar 2018 08:27:30 +0100 Subject: [PATCH] fixed concurrent modification when killing mam queries --- .../services/MessageArchiveService.java | 134 +++++++++--------- 1 file changed, 67 insertions(+), 67 deletions(-) diff --git a/src/main/java/eu/siacs/conversations/services/MessageArchiveService.java b/src/main/java/eu/siacs/conversations/services/MessageArchiveService.java index cff6da497..a4dea82ba 100644 --- a/src/main/java/eu/siacs/conversations/services/MessageArchiveService.java +++ b/src/main/java/eu/siacs/conversations/services/MessageArchiveService.java @@ -17,7 +17,6 @@ import eu.siacs.conversations.generator.AbstractGenerator; import eu.siacs.conversations.xml.Namespace; import eu.siacs.conversations.xml.Element; import eu.siacs.conversations.xmpp.OnAdvancedStreamFeaturesLoaded; -import eu.siacs.conversations.xmpp.OnIqPacketReceived; import eu.siacs.conversations.xmpp.jid.Jid; import eu.siacs.conversations.xmpp.mam.MamReference; import eu.siacs.conversations.xmpp.stanzas.IqPacket; @@ -29,18 +28,13 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { private final HashSet queries = new HashSet<>(); private final ArrayList pendingQueries = new ArrayList<>(); - public enum PagingOrder { - NORMAL, - REVERSE - } - - public MessageArchiveService(final XmppConnectionService service) { + MessageArchiveService(final XmppConnectionService service) { this.mXmppConnectionService = service; } private void catchup(final Account account) { synchronized (this.queries) { - for(Iterator iterator = this.queries.iterator(); iterator.hasNext();) { + for (Iterator iterator = this.queries.iterator(); iterator.hasNext(); ) { Query query = iterator.next(); if (query.getAccount() == account) { iterator.remove(); @@ -51,7 +45,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { mXmppConnectionService.databaseBackend.getLastMessageReceived(account), mXmppConnectionService.databaseBackend.getLastClearDate(account) ); - mamReference = MamReference.max(mamReference,mXmppConnectionService.getAutomaticMessageDeletionDate()); + mamReference = MamReference.max(mamReference, mXmppConnectionService.getAutomaticMessageDeletionDate()); long endCatchup = account.getXmppConnection().getLastSessionEstablished(); final Query query; if (mamReference.getTimestamp() == 0) { @@ -61,7 +55,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { List conversations = mXmppConnectionService.getConversations(); for (Conversation conversation : conversations) { if (conversation.getMode() == Conversation.MODE_SINGLE && conversation.getAccount() == account && startCatchup > conversation.getLastMessageTransmitted().getTimestamp()) { - this.query(conversation,startCatchup,true); + this.query(conversation, startCatchup, true); } } query = new Query(account, new MamReference(startCatchup), endCatchup); @@ -74,7 +68,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { this.execute(query); } - public void catchupMUC(final Conversation conversation) { + void catchupMUC(final Conversation conversation) { if (conversation.getLastMessageTransmitted().getTimestamp() < 0 && conversation.countMessages() == 0) { query(conversation, new MamReference(0), @@ -108,7 +102,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { return true; } else { synchronized (this.queries) { - for(Query query : this.queries) { + for (Query query : this.queries) { if (query.getAccount() == account && query.isCatchup() && ((conversation.getMode() == Conversation.MODE_SINGLE && query.getWith() == null) || query.getConversation() == conversation)) { return true; } @@ -119,13 +113,13 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } public Query query(final Conversation conversation, long end, boolean allowCatchup) { - return this.query(conversation,conversation.getLastMessageTransmitted(),end, allowCatchup); + return this.query(conversation, conversation.getLastMessageTransmitted(), end, allowCatchup); } public Query query(Conversation conversation, MamReference start, long end, boolean allowCatchup) { synchronized (this.queries) { final Query query; - final MamReference startActual = MamReference.max(start,mXmppConnectionService.getAutomaticMessageDeletionDate()); + final MamReference startActual = MamReference.max(start, mXmppConnectionService.getAutomaticMessageDeletionDate()); if (start.getTimestamp() == 0) { query = new Query(conversation, startActual, end, false); query.reference = conversation.getFirstMamReference(); @@ -137,7 +131,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { this.queries.add(reverseCatchup); this.execute(reverseCatchup); } - query = new Query(conversation, maxCatchup, end, allowCatchup); + query = new Query(conversation, maxCatchup, end, true); } else { query = new Query(conversation, startActual, end, false); } @@ -151,10 +145,10 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } } - public void executePendingQueries(final Account account) { + void executePendingQueries(final Account account) { List pending = new ArrayList<>(); - synchronized(this.pendingQueries) { - for(Iterator iterator = this.pendingQueries.iterator(); iterator.hasNext();) { + synchronized (this.pendingQueries) { + for (Iterator iterator = this.pendingQueries.iterator(); iterator.hasNext(); ) { Query query = iterator.next(); if (query.getAccount() == account) { pending.add(query); @@ -162,35 +156,32 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } } } - for(Query query : pending) { + for (Query query : pending) { this.execute(query); } } private void execute(final Query query) { - final Account account= query.getAccount(); + final Account account = query.getAccount(); if (account.getStatus() == Account.State.ONLINE) { Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": running mam query " + query.toString()); IqPacket packet = this.mXmppConnectionService.getIqGenerator().queryMessageArchiveManagement(query); - this.mXmppConnectionService.sendIqPacket(account, packet, new OnIqPacketReceived() { - @Override - public void onIqPacketReceived(Account account, IqPacket packet) { - Element fin = packet.findChild("fin", Namespace.MAM); - if (packet.getType() == IqPacket.TYPE.TIMEOUT) { - synchronized (MessageArchiveService.this.queries) { - MessageArchiveService.this.queries.remove(query); - if (query.hasCallback()) { - query.callback(false); - } + this.mXmppConnectionService.sendIqPacket(account, packet, (a, p) -> { + Element fin = p.findChild("fin", Namespace.MAM); + if (p.getType() == IqPacket.TYPE.TIMEOUT) { + synchronized (MessageArchiveService.this.queries) { + MessageArchiveService.this.queries.remove(query); + if (query.hasCallback()) { + query.callback(false); } - } else if (packet.getType() == IqPacket.TYPE.RESULT && fin != null ) { - processFin(query, fin); - } else if (packet.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) { - //do nothing - } else { - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": error executing mam: " + packet.toString()); - finalizeQuery(query, true); } + } else if (p.getType() == IqPacket.TYPE.RESULT && fin != null) { + processFin(query, fin); + } else if (p.getType() == IqPacket.TYPE.RESULT && query.isLegacy()) { + //do nothing + } else { + Log.d(Config.LOGTAG, a.getJid().toBareJid().toString() + ": error executing mam: " + p.toString()); + finalizeQuery(query, true); } }); } else { @@ -209,7 +200,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { conversation.sort(); conversation.setHasMessagesLeftOnServer(!done); } else { - for(Conversation tmp : this.mXmppConnectionService.getConversations()) { + for (Conversation tmp : this.mXmppConnectionService.getConversations()) { if (tmp.getAccount() == query.getAccount()) { tmp.sort(); } @@ -222,9 +213,9 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } } - public boolean inCatchup(Account account) { + boolean inCatchup(Account account) { synchronized (this.queries) { - for(Query query : queries) { + for (Query query : queries) { if (query.account == account && query.isCatchup() && query.getWith() == null) { return true; } @@ -233,9 +224,9 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { return false; } - public boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) { + boolean queryInProgress(Conversation conversation, XmppConnectionService.OnMoreMessagesLoaded callback) { synchronized (this.queries) { - for(Query query : queries) { + for (Query query : queries) { if (query.conversation == conversation) { if (!query.hasCallback() && callback != null) { query.setCallback(callback); @@ -260,7 +251,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { private void processFin(Query query, Element fin) { boolean complete = fin.getAttributeAsBoolean("complete"); - Element set = fin.findChild("set","http://jabber.org/protocol/rsm"); + Element set = fin.findChild("set", "http://jabber.org/protocol/rsm"); Element last = set == null ? null : set.findChild("last"); String count = set == null ? null : set.findChildContent("count"); Element first = set == null ? null : set.findChild("first"); @@ -287,9 +278,9 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { done = done || (query.getActualMessageCount() == 0 && !query.isCatchup()); this.finalizeQuery(query, done); - Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": finished mam after "+query.getTotalCount()+"("+query.getActualMessageCount()+") messages. messages left="+Boolean.toString(!done)+" count="+count); + Log.d(Config.LOGTAG, query.getAccount().getJid().toBareJid() + ": finished mam after " + query.getTotalCount() + "(" + query.getActualMessageCount() + ") messages. messages left=" + Boolean.toString(!done) + " count=" + count); if (query.isCatchup() && query.getActualMessageCount() > 0) { - mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount()); + mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount()); } processPostponed(query); } else { @@ -307,33 +298,37 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } } - public void kill(Conversation conversation) { + void kill(Conversation conversation) { + final ArrayList toBeKilled = new ArrayList<>(); synchronized (this.queries) { for (Query q : queries) { if (q.conversation == conversation) { - kill(q); + toBeKilled.add(q); } } } + for (Query q : toBeKilled) { + kill(q); + } } private void kill(Query query) { - Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": killing mam query prematurely"); + Log.d(Config.LOGTAG, query.getAccount().getJid().toBareJid() + ": killing mam query prematurely"); query.callback = null; - this.finalizeQuery(query,false); + this.finalizeQuery(query, false); if (query.isCatchup() && query.getActualMessageCount() > 0) { - mXmppConnectionService.getNotificationService().finishBacklog(true,query.getAccount()); + mXmppConnectionService.getNotificationService().finishBacklog(true, query.getAccount()); } this.processPostponed(query); } private void processPostponed(Query query) { query.account.getAxolotlService().processPostponed(); - Log.d(Config.LOGTAG,query.getAccount().getJid().toBareJid()+": found "+query.pendingReceiptRequests.size()+" pending receipt requests"); + Log.d(Config.LOGTAG, query.getAccount().getJid().toBareJid() + ": found " + query.pendingReceiptRequests.size() + " pending receipt requests"); Iterator iterator = query.pendingReceiptRequests.iterator(); while (iterator.hasNext()) { ReceiptRequest rr = iterator.next(); - mXmppConnectionService.sendMessagePacket(query.account,mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(),rr.getId())); + mXmppConnectionService.sendMessagePacket(query.account, mXmppConnectionService.getMessageGenerator().received(query.account, rr.getJid(), rr.getId())); iterator.remove(); } } @@ -343,7 +338,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { return null; } synchronized (this.queries) { - for(Query query : this.queries) { + for (Query query : this.queries) { if (query.getQueryId().equals(id)) { return query; } @@ -359,7 +354,13 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } } + public enum PagingOrder { + NORMAL, + REVERSE + } + public class Query { + public HashSet pendingReceiptRequests = new HashSet<>(); private int totalCount = 0; private int actualCount = 0; private int actualInThisQuery = 0; @@ -372,17 +373,16 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { private PagingOrder pagingOrder = PagingOrder.NORMAL; private XmppConnectionService.OnMoreMessagesLoaded callback = null; private boolean catchup = true; - public HashSet pendingReceiptRequests = new HashSet<>(); - public Query(Conversation conversation, MamReference start, long end, boolean catchup) { - this(conversation.getAccount(),catchup ? start : start.timeOnly(),end); + Query(Conversation conversation, MamReference start, long end, boolean catchup) { + this(conversation.getAccount(), catchup ? start : start.timeOnly(), end); this.conversation = conversation; this.pagingOrder = catchup ? PagingOrder.NORMAL : PagingOrder.REVERSE; this.catchup = catchup; } - public Query(Account account, MamReference start, long end) { + Query(Account account, MamReference start, long end) { this.account = account; if (start.getReference() != null) { this.reference = start.getReference(); @@ -392,9 +392,9 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { this.end = end; this.queryId = new BigInteger(50, mXmppConnectionService.getRNG()).toString(32); } - + private Query page(String reference) { - Query query = new Query(this.account,new MamReference(this.start,reference),this.end); + Query query = new Query(this.account, new MamReference(this.start, reference), this.end); query.conversation = conversation; query.totalCount = totalCount; query.actualCount = actualCount; @@ -422,7 +422,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { return query; } - public Query prev(String reference) { + Query prev(String reference) { Query query = page(reference); query.pagingOrder = PagingOrder.REVERSE; return query; @@ -462,7 +462,7 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { public void callback(boolean done) { if (this.callback != null) { - this.callback.onMoreMessagesLoaded(actualCount,conversation); + this.callback.onMoreMessagesLoaded(actualCount, conversation); if (done) { this.callback.informUser(R.string.no_more_history_on_server); } @@ -490,11 +490,11 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { this.actualCount++; } - public int getTotalCount() { + int getTotalCount() { return this.totalCount; } - public int getActualMessageCount() { + int getActualMessageCount() { return this.actualCount; } @@ -530,8 +530,8 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } builder.append(", end="); builder.append(AbstractGenerator.getTimestamp(this.end)); - builder.append(", order="+pagingOrder.toString()); - if (this.reference!=null) { + builder.append(", order=").append(pagingOrder.toString()); + if (this.reference != null) { if (this.pagingOrder == PagingOrder.NORMAL) { builder.append(", after="); } else { @@ -539,11 +539,11 @@ public class MessageArchiveService implements OnAdvancedStreamFeaturesLoaded { } builder.append(this.reference); } - builder.append(", catchup="+Boolean.toString(catchup)); + builder.append(", catchup=").append(Boolean.toString(catchup)); return builder.toString(); } - public boolean hasCallback() { + boolean hasCallback() { return this.callback != null; } }