Size: a a a

2019 June 24

НГ

Наиль Гилазиев in Kotlin JVM
Коллеги, доброго дня! Нужна ваша консультация.
Отлаживаю работу ktor server websocket в краевых ситуациях, когда сеть себя плохо ведёт, когда происходит некий race condition с закрытием сокета, (клиент закрывает сокет [FIN,ACK] а сервер пытается написать[будет RST] и тд.

Столкнулся с непониманием:
suspend outgoing.send функция имеет очень вероятностное описание:

/** Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is  raw websocket session.
    */

есть еще метод  suspend  flush()

/**  Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called at any time even after close. May return immediately if the connection is already terminated.  However it may also fail with an exception (or cancellation) at any point due to session failure.  Please note that [flush] doesn't guarantee that frames were actually delivered.
    */

Но по моим наблюдениям метод send засыпает и ждет подтверждения ACK от клиента.  (тестовый стенд - простой сервер который в цикле раз в секунду шлет сообщение клиенту - delay + send)
По поведению не видно, что send делает enqueue в какую-то очередь абстрагированную от TCP слоя. При таком поведении не понятно предназначение метода  flush, ведь send делает все за него.

При таком поведении становится невозможным пользоваться вот таким кодом для рассылки данных между подключенными клиентами:

// Iterate over all the connections
for (conn in wsConnections) {
     conn.outgoing.send(Frame.Text(text))
}
 
ps Код приведен из документации по websocket с сайта ktor

смысл: broadcast неких данных всем "подписчикам"

В итоге из-за нескольких очень медленных клиентов этот for будет очень медленно работать, так как каждый outgoing.send будет дожидаться ответа.  а в некоторых ситуациях и вовсе будет suspend почти на вечно (но это рассказ для отдельного issue, пока не берем это в расчет)

заворачивать каждый send в async это не вариант.
Вообщем не хватает матчасти, нужно больше тестов, больше чтения кода и понимания. Может кто-то сможет дать дельные советы.
источник

НГ

Наиль Гилазиев in Kotlin JVM
Телеграмм при переключении между десктоп и мобильным клиентом нещадно побил мой подготовленный текст и потерял некоторые параграфы. Восстановил на скорою руку.
источник

VP

Vladimir Petrakovich in Kotlin JVM
Наиль Гилазиев
Коллеги, доброго дня! Нужна ваша консультация.
Отлаживаю работу ktor server websocket в краевых ситуациях, когда сеть себя плохо ведёт, когда происходит некий race condition с закрытием сокета, (клиент закрывает сокет [FIN,ACK] а сервер пытается написать[будет RST] и тд.

Столкнулся с непониманием:
suspend outgoing.send функция имеет очень вероятностное описание:

/** Enqueue frame, may suspend if outgoing queue is full. May throw an exception if outgoing channel is already closed so it is impossible to transfer any message. Frames that were sent after close frame could be silently ignored. Please note that close frame could be sent automatically in reply to a peer close frame unless it is  raw websocket session.
    */

есть еще метод  suspend  flush()

/**  Flush all outstanding messages and suspend until all earlier sent messages will be written. Could be called at any time even after close. May return immediately if the connection is already terminated.  However it may also fail with an exception (or cancellation) at any point due to session failure.  Please note that [flush] doesn't guarantee that frames were actually delivered.
    */

Но по моим наблюдениям метод send засыпает и ждет подтверждения ACK от клиента.  (тестовый стенд - простой сервер который в цикле раз в секунду шлет сообщение клиенту - delay + send)
По поведению не видно, что send делает enqueue в какую-то очередь абстрагированную от TCP слоя. При таком поведении не понятно предназначение метода  flush, ведь send делает все за него.

При таком поведении становится невозможным пользоваться вот таким кодом для рассылки данных между подключенными клиентами:

// Iterate over all the connections
for (conn in wsConnections) {
     conn.outgoing.send(Frame.Text(text))
}
 
ps Код приведен из документации по websocket с сайта ktor

смысл: broadcast неких данных всем "подписчикам"

В итоге из-за нескольких очень медленных клиентов этот for будет очень медленно работать, так как каждый outgoing.send будет дожидаться ответа.  а в некоторых ситуациях и вовсе будет suspend почти на вечно (но это рассказ для отдельного issue, пока не берем это в расчет)

заворачивать каждый send в async это не вариант.
Вообщем не хватает матчасти, нужно больше тестов, больше чтения кода и понимания. Может кто-то сможет дать дельные советы.
Про flush ничего не скажу, а что касается broadcast - ну так тут поведение аналогично каналам. А почему async - не вариант?
источник

НГ

Наиль Гилазиев in Kotlin JVM
Поправлюсь: мне показалось, что это не вариант - так как это костыль, чтобы обойти проблему, костыль имеющий скорее всего перфоманс последствия. Ведь судя по документации метода send -  должно происходить enqueue в некую очередь, и may suspend if outgoing queue is full - значит это queue должна быть абстагирована от TCP уровня.
+ наличие метода flush тоже в пользу этого говорит.

Но по каждому из пунктов, я могу заблуждаться, поэтому обращаюсь ко всем
источник

VP

Vladimir Petrakovich in Kotlin JVM
Наиль Гилазиев
Поправлюсь: мне показалось, что это не вариант - так как это костыль, чтобы обойти проблему, костыль имеющий скорее всего перфоманс последствия. Ведь судя по документации метода send -  должно происходить enqueue в некую очередь, и may suspend if outgoing queue is full - значит это queue должна быть абстагирована от TCP уровня.
+ наличие метода flush тоже в пользу этого говорит.

Но по каждому из пунктов, я могу заблуждаться, поэтому обращаюсь ко всем
Абстрагирован скорее TCP уровень от вас, а какими средствами - никакими или дополнительной очередью - вроде бы не так важно.
Предположим, что там очередь (канал). Что тогда отличается?
источник

НГ

Наиль Гилазиев in Kotlin JVM
от этого зависит как правильно написать код для broadcast'a. Поэтому хотелось узнать где же правда, чтобы правильное решение принять.
Если метод send не абстрагировали от TCP слоя, тогда нужно учитывать все его нюансы. И чтобы защититься от "медленных клиентов", тогда придется писать код рассылки не так как приведено в документации
for (conn in wsConnections) {
     conn.outgoing.send(Frame.Text(text))
}

а вот так, заворачивая каждую отсылку в async
for (conn in wsConnections) {
     async {
           conn.outgoing.send(Frame.Text(text))
     }
}


Правильно ли это? Как это скажется на перфомансе, ведь на каждую отправку каждому клиенту будет создаваться async. И в случае подобия чата это будут большие числа.

Меня терзают сомнения, что это правильное решение...
источник

U

Unat in Kotlin JVM
async спроектирован работать именно так
источник

U

Unat in Kotlin JVM
это не создание треда, он задуман именно для этих целей
источник

НГ

Наиль Гилазиев in Kotlin JVM
да, понимаю, сомнения-то просто появились из-за документации метода send и метода flush
источник

U

Unat in Kotlin JVM
Единственное, надо на IO пул выкинуть эту рассылку
источник

U

Unat in Kotlin JVM
ну, если это ещё не сделано
источник

U

Unat in Kotlin JVM
А то дефолтный диспатчер может и подзабить
источник

VP

Vladimir Petrakovich in Kotlin JVM
Наиль Гилазиев
от этого зависит как правильно написать код для broadcast'a. Поэтому хотелось узнать где же правда, чтобы правильное решение принять.
Если метод send не абстрагировали от TCP слоя, тогда нужно учитывать все его нюансы. И чтобы защититься от "медленных клиентов", тогда придется писать код рассылки не так как приведено в документации
for (conn in wsConnections) {
     conn.outgoing.send(Frame.Text(text))
}

а вот так, заворачивая каждую отсылку в async
for (conn in wsConnections) {
     async {
           conn.outgoing.send(Frame.Text(text))
     }
}


Правильно ли это? Как это скажется на перфомансе, ведь на каждую отправку каждому клиенту будет создаваться async. И в случае подобия чата это будут большие числа.

Меня терзают сомнения, что это правильное решение...
Вам в любом случае придётся что-то делать с медленными клиентами, дополнительная очередь и её расширение только отсрочит провал.
Я бы завернул эту рассылку в coroutineScope с таймаутом. А если он выстрелит, что-то придётся решать.
источник

VP

Vladimir Petrakovich in Kotlin JVM
Unat
Единственное, надо на IO пул выкинуть эту рассылку
Зачем IO, там же всё неблокирующее должно быть
источник

U

Unat in Kotlin JVM
А, ну ок.
источник

U

Unat in Kotlin JVM
Тогда не надо IO
источник

BP

Bogdan Panchenko in Kotlin JVM
а как в ktor init подключить kotlin.serializations ?
источник

BP

Bogdan Panchenko in Kotlin JVM
или только вручную ?
источник

AM

Andrew Mikhaylov in Kotlin JVM
Дл сервера или для клиента?
источник

AM

Andrew Mikhaylov in Kotlin JVM
В клиенте через JsonFeature поддержка из коробки есть, для сервера и ContentNegotiation только руками конвертер писать, к сожалению. Последний раз я его руками на основании обычной мапы с регистрацией всего и вся руками делал, сейчас, возможно, это через SerialModule можно сделать проще.
источник