SyncProtocol.cpp   SyncProtocol.cpp 
skipping to change at line 73 skipping to change at line 73
if(writtenSize > SYNC_MAX_PAYLOAD_SIZE) if(writtenSize > SYNC_MAX_PAYLOAD_SIZE)
{ {
//crash here when message is too large in debugging //crash here when message is too large in debugging
Q_ASSERT(true); Q_ASSERT(true);
return 0; return 0;
} }
else else
{ {
//write header in front //write header in front
SyncHeader header = { getMessageType(), static_cast<tPayload Size>(writtenSize) }; SyncHeader header = { (quint8)getMessageType(), static_cast< tPayloadSize>(writtenSize) };
tmpStream.device()->seek(0); tmpStream.device()->seek(0);
tmpStream<<header; tmpStream<<header;
return totalSize; return totalSize;
} }
} }
void SyncMessage::serialize(QDataStream &stream) const void SyncMessage::serialize(QDataStream &stream) const
{ {
Q_UNUSED(stream); Q_UNUSED(stream);
skipping to change at line 104 skipping to change at line 104
stream<<str.toUtf8(); stream<<str.toUtf8();
} }
QString SyncMessage::readString(QDataStream &stream) QString SyncMessage::readString(QDataStream &stream)
{ {
QByteArray arr; QByteArray arr;
stream>>arr; stream>>arr;
return QString::fromUtf8(arr); return QString::fromUtf8(arr);
} }
SyncRemotePeer::SyncRemotePeer()
: isValid(false)
{
}
SyncRemotePeer::SyncRemotePeer(QAbstractSocket *socket, bool isServer, cons t QVector<SyncMessageHandler *> &handlerList) SyncRemotePeer::SyncRemotePeer(QAbstractSocket *socket, bool isServer, cons t QVector<SyncMessageHandler *> &handlerList)
: isValid(true), sock(socket), isPeerAServer(isServer), isAuthentica ted(false), authResponseSent(false), waitingForBody(false), : sock(socket), stream(sock), expectDisconnect(false), isPeerAServer (isServer), authenticated(false), authResponseSent(false), waitingForBody(f alse),
handlerList(handlerList) handlerList(handlerList)
{ {
Q_ASSERT(sock);
sock->setParent(this); //reparent
sock->setSocketOption(QAbstractSocket::LowDelayOption, 1);
stream.setVersion(SYNC_DATASTREAM_VERSION);
connect(sock, SIGNAL(readyRead()), this, SLOT(receiveMessage()));
connect(sock, SIGNAL(disconnected()), this, SLOT(sockDisconnected())
);
connect(sock, SIGNAL(error(QAbstractSocket::SocketError)), this, SLO
T(sockError(QAbstractSocket::SocketError)));
connect(sock, SIGNAL(stateChanged(QAbstractSocket::SocketState)), th
is, SLOT(sockStateChanged(QAbstractSocket::SocketState)));
// silence CoverityScan...
msgHeader.msgType=SyncProtocol::ERROR;
msgHeader.dataSize=0;
lastReceiveTime = QDateTime::currentMSecsSinceEpoch(); lastReceiveTime = QDateTime::currentMSecsSinceEpoch();
lastSendTime = lastReceiveTime; lastSendTime = lastReceiveTime;
msgWriteBuffer.reserve(SYNC_MAX_MESSAGE_SIZE); msgWriteBuffer.reserve(SYNC_MAX_MESSAGE_SIZE);
if(!isServer)
id = QUuid::createUuid();
} }
void SyncRemotePeer::receiveMessage() SyncRemotePeer::~SyncRemotePeer()
{
peerLog()<<"Destroyed";
delete sock;
}
void SyncRemotePeer::checkTimeout()
{
if(sock->state() == QAbstractSocket::UnconnectedState || sock->state
() == QAbstractSocket::ClosingState)
return;
qint64 currentTime = QDateTime::currentMSecsSinceEpoch();
qint64 writeDiff = currentTime - lastSendTime;
qint64 readDiff = currentTime - lastReceiveTime;
if(writeDiff > 5000 && authenticated) //only send ALIVE to authentic
ated peers
{
//no data sent to this peer for some time, send a ALIVE
Alive msg;
writeMessage(msg);
}
if(readDiff > 15000)
{
//no data received for some time, assume client timed out
peerLog(QString("No data received for %1ms, timing out").arg
(readDiff));
errorString = "Connection timed out";
if(sock->state()==QAbstractSocket::ConnectedState)
sock->disconnectFromHost();
else
{
sock->abort();
sockDisconnected();
}
}
}
void SyncRemotePeer::disconnectPeer()
{
expectDisconnect = true;
if(sock->state()==QAbstractSocket::ConnectedState)
sock->disconnectFromHost();
else if(sock->state()!=QAbstractSocket::UnconnectedState)
{
sock->abort();
sockDisconnected();
}
}
void SyncRemotePeer::sockDisconnected()
{
peerLog()<<"Socket disconnected";
emit disconnected(expectDisconnect);
}
void SyncRemotePeer::sockError(QAbstractSocket::SocketError err)
{ {
QDataStream dataStream(sock); errorString = sock->errorString();
dataStream.setVersion(SYNC_DATASTREAM_VERSION); peerLog()<<"Socket error:"<<errorString;
if(err == QAbstractSocket::RemoteHostClosedError) //handle remote cl
ose as normal disconnect
expectDisconnect = true;
if(sock->state()==QAbstractSocket::ConnectedState) // it is still co
nnected, wait for automatic disconnect
sock->disconnectFromHost();
else if(sock->state()==QAbstractSocket::UnconnectedState) //in this
case, we have to emit the signal manually
sockDisconnected();
}
void SyncRemotePeer::sockStateChanged(QAbstractSocket::SocketState state)
{
peerLog()<<"Socket state:"<<state;
}
void SyncRemotePeer::receiveMessage()
{
//to debug read buffer contents, uncomment //to debug read buffer contents, uncomment
//QByteArray peekData = sock->peek(SYNC_MAX_MESSAGE_SIZE); //QByteArray peekData = sock->peek(SYNC_MAX_MESSAGE_SIZE);
lastReceiveTime = QDateTime::currentMSecsSinceEpoch(); lastReceiveTime = QDateTime::currentMSecsSinceEpoch();
//This loop is required to make sure all pending data is read (i.e. multiple messages may be queued up) //This loop is required to make sure all pending data is read (i.e. multiple messages may be queued up)
while(sock->bytesAvailable()>0) while(sock->bytesAvailable()>0)
{ {
if(!waitingForBody) if(!waitingForBody)
{ {
//we use the socket's read buffer to wait until a fu ll packet is available //we use the socket's read buffer to wait until a fu ll packet is available
if(sock->bytesAvailable() < SYNC_HEADER_SIZE) if(sock->bytesAvailable() < SYNC_HEADER_SIZE)
return; return;
dataStream>>msgHeader; stream>>msgHeader;
//check if msgtype is valid //check if msgtype is valid
if(msgHeader.msgType>MSGTYPE_MAX) if(msgHeader.msgType>MSGTYPE_MAX)
{ {
writeError("invalid message type " + QString ::number(msgHeader.msgType)); writeError("invalid message type " + QString ::number(msgHeader.msgType));
return; return;
} }
if(!isAuthenticated && msgHeader.msgType > SERVER_CH ALLENGERESPONSEVALID) if(!authenticated && msgHeader.msgType > SERVER_CHAL LENGERESPONSEVALID)
{ {
//if not fully authenticated, it is an error to send messages other than auth messages //if not fully authenticated, it is an error to send messages other than auth messages
writeError("not authenticated"); writeError("not authenticated");
return; return;
} }
qDebug()<<"received header for "<<SyncMessageType(ms gHeader.msgType); peerLog()<<"received header for"<<SyncMessageType(ms gHeader.msgType);
} }
if(sock->bytesAvailable() < msgHeader.dataSize) if(sock->bytesAvailable() < msgHeader.dataSize)
{ {
waitingForBody = true; waitingForBody = true;
return; return;
} }
else else
{ {
waitingForBody = false; waitingForBody = false;
qDebug()<<"received body, processing"; peerLog()<<"received body, processing";
//full packet available, pass to handler //full packet available, pass to handler
SyncMessageHandler* handler = handlerList[msgHeader. msgType]; SyncMessageHandler* handler = handlerList[msgHeader. msgType];
if(!handler) if(!handler)
{ {
//no handler registered on this end for this msgtype //no handler registered on this end for this msgtype
writeError("unregistered message type " + QS tring::number(msgHeader.msgType)); writeError("unregistered message type " + QS tring::number(msgHeader.msgType));
return; return;
} }
if(!handlerList[msgHeader.msgType]->handleMessage(da taStream,*this)) if(!handlerList[msgHeader.msgType]->handleMessage(st ream, msgHeader.dataSize, *this))
{ {
writeError("last message of type " + QString ::number(msgHeader.msgType) + " was rejected"); writeError("last message of type " + QString ::number(msgHeader.msgType) + " was rejected");
} }
} }
} }
}
qDebug()<<"Available bytes after receiveMessage"<<sock->bytesAvailab void SyncRemotePeer::peerLog(const QString &msg) const
le(); {
peerLog()<<msg;
} }
void SyncRemotePeer::peerLog(const QString &msg) QDebug SyncRemotePeer::peerLog() const
{ {
qDebug()<<"[Sync][Peer"<<(sock->peerAddress().toString() + ":" + QSt ring::number(sock->peerPort()))<<"]:"<<msg; return qDebug()<<"[Sync][Peer"<<(sock->peerAddress().toString() + ": " + QString::number(sock->peerPort()))<<"]:";
} }
void SyncRemotePeer::writeMessage(const SyncMessage &msg) void SyncRemotePeer::writeMessage(const SyncMessage &msg)
{ {
qint64 size = msg.createFullMessage(msgWriteBuffer); qint64 size = msg.createFullMessage(msgWriteBuffer);
qDebug()<<"[SyncPeer] Send message"<<msg.getMessageType(); peerLog()<<"Send message"<<msg;
if(!size) if(!size)
{ {
//crash here when message is too large in debugging //crash here when message is too large in debugging
Q_ASSERT(true); Q_ASSERT(true);
qCritical()<<"[SyncPlugin] A message is too large for sendin g! Message buffer contents follow..."; qCritical()<<"[SyncPlugin] A message is too large for sendin g! Message buffer contents follow...";
qCritical()<<msgWriteBuffer.toHex(); qCritical()<<msgWriteBuffer.toHex();
//disconnect the client //disconnect the client
writeError("next pending message too large"); writeError("next pending message too large");
} }
skipping to change at line 213 skipping to change at line 298
{ {
writeData(msgWriteBuffer,size); writeData(msgWriteBuffer,size);
} }
} }
void SyncRemotePeer::writeData(const QByteArray &data, int size) void SyncRemotePeer::writeData(const QByteArray &data, int size)
{ {
//Only write if connected //Only write if connected
if(sock->state() == QAbstractSocket::ConnectedState) if(sock->state() == QAbstractSocket::ConnectedState)
{ {
sock->write(data.constData(),size>0?size:data.size()); stream.writeRawData(data.constData(),size>0?size:data.size()
//flush immediately if possible to reduce delay );
sock->flush();
lastSendTime = QDateTime::currentMSecsSinceEpoch(); lastSendTime = QDateTime::currentMSecsSinceEpoch();
} }
else else
peerLog("Can't write message, not connected"); peerLog("Can't write message, not connected");
} }
void SyncRemotePeer::writeError(const QString &err) void SyncRemotePeer::writeError(const QString &err)
{ {
qWarning()<<"[SyncPlugin] Disconnecting with error:"<<err; qWarning()<<"[SyncPlugin] Disconnecting with error:"<<err;
writeMessage(ErrorMessage(err)); writeMessage(ErrorMessage(err));
errorString = err;
sock->disconnectFromHost(); sock->disconnectFromHost();
} }
 End of changes. 20 change blocks. 
24 lines changed or deleted 117 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/