throw exception at the end of the stream

This commit is contained in:
Daniel Gultsch 2015-12-17 15:19:58 +01:00
parent e10a0b0c4c
commit 0f9058ffef
1 changed files with 122 additions and 128 deletions

View File

@ -357,135 +357,129 @@ public class XmppConnection implements Runnable {
} }
private void processStream() throws XmlPullParserException, IOException, NoSuchAlgorithmException { private void processStream() throws XmlPullParserException, IOException, NoSuchAlgorithmException {
Tag nextTag = tagReader.readTag(); Tag nextTag = tagReader.readTag();
while (nextTag != null && !nextTag.isEnd("stream")) { while (nextTag != null && !nextTag.isEnd("stream")) {
if (nextTag.isStart("error")) { if (nextTag.isStart("error")) {
processStreamError(nextTag); processStreamError(nextTag);
} else if (nextTag.isStart("features")) { } else if (nextTag.isStart("features")) {
processStreamFeatures(nextTag); processStreamFeatures(nextTag);
} else if (nextTag.isStart("proceed")) { } else if (nextTag.isStart("proceed")) {
switchOverToTls(nextTag); switchOverToTls(nextTag);
} else if (nextTag.isStart("success")) { } else if (nextTag.isStart("success")) {
final String challenge = tagReader.readElement(nextTag).getContent(); final String challenge = tagReader.readElement(nextTag).getContent();
try { try {
saslMechanism.getResponse(challenge); saslMechanism.getResponse(challenge);
} catch (final SaslMechanism.AuthenticationException e) { } catch (final SaslMechanism.AuthenticationException e) {
disconnect(true); disconnect(true);
Log.e(Config.LOGTAG, String.valueOf(e)); Log.e(Config.LOGTAG, String.valueOf(e));
} }
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": logged in"); Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": logged in");
account.setKey(Account.PINNED_MECHANISM_KEY, account.setKey(Account.PINNED_MECHANISM_KEY,
String.valueOf(saslMechanism.getPriority())); String.valueOf(saslMechanism.getPriority()));
tagReader.reset(); tagReader.reset();
sendStartStream(); sendStartStream();
final Tag tag = tagReader.readTag(); final Tag tag = tagReader.readTag();
if (tag != null && tag.isStart("stream")) { if (tag != null && tag.isStart("stream")) {
processStream(); processStream();
} else { } else {
throw new IOException("server didn't restart stream after successful auth"); throw new IOException("server didn't restart stream after successful auth");
} }
break; break;
} else if (nextTag.isStart("failure")) { } else if (nextTag.isStart("failure")) {
throw new UnauthorizedException(); throw new UnauthorizedException();
} else if (nextTag.isStart("challenge")) { } else if (nextTag.isStart("challenge")) {
final String challenge = tagReader.readElement(nextTag).getContent(); final String challenge = tagReader.readElement(nextTag).getContent();
final Element response = new Element("response"); final Element response = new Element("response");
response.setAttribute("xmlns", response.setAttribute("xmlns",
"urn:ietf:params:xml:ns:xmpp-sasl"); "urn:ietf:params:xml:ns:xmpp-sasl");
try { try {
response.setContent(saslMechanism.getResponse(challenge)); response.setContent(saslMechanism.getResponse(challenge));
} catch (final SaslMechanism.AuthenticationException e) { } catch (final SaslMechanism.AuthenticationException e) {
// TODO: Send auth abort tag. // TODO: Send auth abort tag.
Log.e(Config.LOGTAG, e.toString()); Log.e(Config.LOGTAG, e.toString());
} }
tagWriter.writeElement(response); tagWriter.writeElement(response);
} else if (nextTag.isStart("enabled")) { } else if (nextTag.isStart("enabled")) {
final Element enabled = tagReader.readElement(nextTag); final Element enabled = tagReader.readElement(nextTag);
if ("true".equals(enabled.getAttribute("resume"))) { if ("true".equals(enabled.getAttribute("resume"))) {
this.streamId = enabled.getAttribute("id"); this.streamId = enabled.getAttribute("id");
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() Log.d(Config.LOGTAG, account.getJid().toBareJid().toString()
+ ": stream managment(" + smVersion + ": stream managment(" + smVersion
+ ") enabled (resumable)"); + ") enabled (resumable)");
} else { } else {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() Log.d(Config.LOGTAG, account.getJid().toBareJid().toString()
+ ": stream management(" + smVersion + ") enabled"); + ": stream management(" + smVersion + ") enabled");
} }
this.stanzasReceived = 0; this.stanzasReceived = 0;
final RequestPacket r = new RequestPacket(smVersion); final RequestPacket r = new RequestPacket(smVersion);
tagWriter.writeStanzaAsync(r); tagWriter.writeStanzaAsync(r);
} else if (nextTag.isStart("resumed")) { } else if (nextTag.isStart("resumed")) {
lastPacketReceived = SystemClock.elapsedRealtime(); lastPacketReceived = SystemClock.elapsedRealtime();
final Element resumed = tagReader.readElement(nextTag); final Element resumed = tagReader.readElement(nextTag);
final String h = resumed.getAttribute("h"); final String h = resumed.getAttribute("h");
try { try {
final int serverCount = Integer.parseInt(h); final int serverCount = Integer.parseInt(h);
if (serverCount != stanzasSent) { if (serverCount != stanzasSent) {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() Log.d(Config.LOGTAG, account.getJid().toBareJid().toString()
+ ": session resumed with lost packages"); + ": session resumed with lost packages");
stanzasSent = serverCount; stanzasSent = serverCount;
} else { } else {
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed"); Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed");
} }
acknowledgeStanzaUpTo(serverCount); acknowledgeStanzaUpTo(serverCount);
ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>(); ArrayList<AbstractAcknowledgeableStanza> failedStanzas = new ArrayList<>();
for(int i = 0; i < this.mStanzaQueue.size(); ++i) { for(int i = 0; i < this.mStanzaQueue.size(); ++i) {
failedStanzas.add(mStanzaQueue.valueAt(i)); failedStanzas.add(mStanzaQueue.valueAt(i));
} }
mStanzaQueue.clear(); mStanzaQueue.clear();
Log.d(Config.LOGTAG,"resending "+failedStanzas.size()+" stanzas"); Log.d(Config.LOGTAG,"resending "+failedStanzas.size()+" stanzas");
for(AbstractAcknowledgeableStanza packet : failedStanzas) { for(AbstractAcknowledgeableStanza packet : failedStanzas) {
if (packet instanceof MessagePacket) { if (packet instanceof MessagePacket) {
MessagePacket message = (MessagePacket) packet; MessagePacket message = (MessagePacket) packet;
mXmppConnectionService.markMessage(account, mXmppConnectionService.markMessage(account,
message.getTo().toBareJid(), message.getTo().toBareJid(),
message.getId(), message.getId(),
Message.STATUS_UNSEND); Message.STATUS_UNSEND);
}
sendPacket(packet);
}
} catch (final NumberFormatException ignored) {
}
Log.d(Config.LOGTAG, account.getJid().toBareJid()+ ": online with resource " + account.getResource());
changeStatus(Account.State.ONLINE);
} else if (nextTag.isStart("r")) {
tagReader.readElement(nextTag);
if (Config.EXTENDED_SM_LOGGING) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": acknowledging stanza #" + this.stanzasReceived);
}
final AckPacket ack = new AckPacket(this.stanzasReceived, smVersion);
tagWriter.writeStanzaAsync(ack);
} else if (nextTag.isStart("a")) {
final Element ack = tagReader.readElement(nextTag);
lastPacketReceived = SystemClock.elapsedRealtime();
try {
final int serverSequence = Integer.parseInt(ack.getAttribute("h"));
acknowledgeStanzaUpTo(serverSequence);
} catch (NumberFormatException e) {
Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number");
}
} else if (nextTag.isStart("failed")) {
tagReader.readElement(nextTag);
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": resumption failed");
streamId = null;
if (account.getStatus() != Account.State.ONLINE) {
sendBindRequest();
}
} else if (nextTag.isStart("iq")) {
processIq(nextTag);
} else if (nextTag.isStart("message")) {
processMessage(nextTag);
} else if (nextTag.isStart("presence")) {
processPresence(nextTag);
}
nextTag = tagReader.readTag();
}
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": last tag was " + nextTag);
if (account.getStatus() == Account.State.ONLINE) {
account. setStatus(Account.State.OFFLINE);
if (statusListener != null) {
statusListener.onStatusChanged(account);
}
} }
sendPacket(packet);
}
} catch (final NumberFormatException ignored) {
}
Log.d(Config.LOGTAG, account.getJid().toBareJid()+ ": online with resource " + account.getResource());
changeStatus(Account.State.ONLINE);
} else if (nextTag.isStart("r")) {
tagReader.readElement(nextTag);
if (Config.EXTENDED_SM_LOGGING) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": acknowledging stanza #" + this.stanzasReceived);
}
final AckPacket ack = new AckPacket(this.stanzasReceived, smVersion);
tagWriter.writeStanzaAsync(ack);
} else if (nextTag.isStart("a")) {
final Element ack = tagReader.readElement(nextTag);
lastPacketReceived = SystemClock.elapsedRealtime();
try {
final int serverSequence = Integer.parseInt(ack.getAttribute("h"));
acknowledgeStanzaUpTo(serverSequence);
} catch (NumberFormatException e) {
Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number");
}
} else if (nextTag.isStart("failed")) {
tagReader.readElement(nextTag);
Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": resumption failed");
streamId = null;
if (account.getStatus() != Account.State.ONLINE) {
sendBindRequest();
}
} else if (nextTag.isStart("iq")) {
processIq(nextTag);
} else if (nextTag.isStart("message")) {
processMessage(nextTag);
} else if (nextTag.isStart("presence")) {
processPresence(nextTag);
}
nextTag = tagReader.readTag();
}
throw new IOException("reached end of stream. last tag was "+nextTag);
} }
private void acknowledgeStanzaUpTo(int serverCount) { private void acknowledgeStanzaUpTo(int serverCount) {