Size: a a a

2020 February 18

Н

Никита in Moscow Spark
я через shell запускаю
источник

PK

Pavel Klemenkov in Moscow Spark
Никита
я через shell запускаю
UI от этого никуда не девается
источник

AV

Artyom Vybornov in Moscow Spark
agathis
а вот это разве не коммит "руками"?
(пример из спарковой доки)
stream.foreachRDD { rdd =>
 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 // some time later, after outputs have completed
 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

в чем вообще смысл хранить оффсет "снаружи" (при том, что хранится не история, а один последний оффсет)
Для пресловутого exactly once нужно атомарно сохранить новые отступы вместе с данными. Поэтому и приходится вручную озаботится из сохранением.
источник

РП

Роман Пашкевич... in Moscow Spark
Коллеги.
А такой вопрос про SPARK.

Зависит ли скорость записи в HIVE от кол-ва экзекьюторов?

В конфиге:
.config('spark.executor.cores', '4')
.config('spark.executor.memory', '8G')

Правильно ли я понимаю?
- Если кусок данных меньше 8 GB, он будет выполняться 1м экзекьютором.
- При этом 1 экзекьютор максимум будет работать в 4 ядра (4 потока?)
источник

РП

Роман Пашкевич... in Moscow Spark
пишу в hive write.mode('append').format('orc').saveAsTable(table)
источник

R

Renarde in Moscow Spark
Роман Пашкевич
Коллеги.
А такой вопрос про SPARK.

Зависит ли скорость записи в HIVE от кол-ва экзекьюторов?

В конфиге:
.config('spark.executor.cores', '4')
.config('spark.executor.memory', '8G')

Правильно ли я понимаю?
- Если кусок данных меньше 8 GB, он будет выполняться 1м экзекьютором.
- При этом 1 экзекьютор максимум будет работать в 4 ядра (4 потока?)
неверно, играет роль параметр spark.shuffle.sql.partitions и количество партиций в исходном DataFrame.
источник

R

Renarde in Moscow Spark
допустим, у вас есть df.
Если вы сделаете:
df.coalesce(1).write…
запись всегда будет идти через одно ядро
источник

R

Renarde in Moscow Spark
для начала нужно посмотреть сколько у вас партиций в df-объекте (это зависит от того,  из чего он получается):
println(df.rdd.numPartitions)
источник

R

Renarde in Moscow Spark
допустим партиций 200 штук (что-то мне подсказывает что их именно столько, потому что spark.sql.shuffle.partitions по умолчанию 200 стоит).
тогда ваш DF при расчете нарезается на 200 кусочков, которые потом по очереди проходят через MxN параллельных процессов (где M- число cores per executor, N - число executors)
источник

РП

Роман Пашкевич... in Moscow Spark
Я забираю кусок данных. Он может быть от 1Мб до 10Гб и кладу в DF. Потом DF пишется на кластер.

spark.sql.shuffle.partitions - в конфиге отсутствует. Так что наверно он по-умолчанию. Посмотрю как отработает цикл.

Т.е. DF потом нарежется на N партиций, и в любом случае они пишутся по очереди?
источник

R

Renarde in Moscow Spark
Роман Пашкевич
Я забираю кусок данных. Он может быть от 1Мб до 10Гб и кладу в DF. Потом DF пишется на кластер.

spark.sql.shuffle.partitions - в конфиге отсутствует. Так что наверно он по-умолчанию. Посмотрю как отработает цикл.

Т.е. DF потом нарежется на N партиций, и в любом случае они пишутся по очереди?
а откуда забираете? файловая система, kafka, другая БД?
источник

PK

Pavel Klemenkov in Moscow Spark
Роман Пашкевич
Я забираю кусок данных. Он может быть от 1Мб до 10Гб и кладу в DF. Потом DF пишется на кластер.

spark.sql.shuffle.partitions - в конфиге отсутствует. Так что наверно он по-умолчанию. Посмотрю как отработает цикл.

Т.е. DF потом нарежется на N партиций, и в любом случае они пишутся по очереди?
Нет они пишутся в столько потоков, сколько ядер выделено вышей спарк джобе, при условии, что партиций в датафрейме не меньше
источник

R

Renarde in Moscow Spark
источник

R

Renarde in Moscow Spark
оригинальный DF - 8 партиций.
делаем coalesce(1) - будет писать строго одним таском.
делаем без coalesce - будет писать в 8 параллельных тасков (сколько кластер дает).
делаем с repartition(200) - будет писать по частям 200 партиций (200 тасков, по 8 тасков в параллель).
источник

R

Renarde in Moscow Spark
однако, не стоит выкручивать количество партций до 200 сходу, потому что нагрузка неравномерная.
Если на вход прилетит 1МБ датасет, при использовании repartition(200) его тоже нарежет на 200 кусочков, и вы получите small files problem.
источник

РП

Роман Пашкевич... in Moscow Spark
Renarde
а откуда забираете? файловая система, kafka, другая БД?
Другая БД. HANA
Пытаюсь понять. Текущий мой конфиг оптимален для записи или нет. И как его оптимизировать.
источник

R

Renarde in Moscow Spark
Роман Пашкевич
Другая БД. HANA
Пытаюсь понять. Текущий мой конфиг оптимален для записи или нет. И как его оптимизировать.
а читаете вы по jdbc, как я понимаю?
надо посмотреть как настроен spark.read.jdbc - есть ли у него опции partitionColumn, и сколько numPartitions установлено
источник

РП

Роман Пашкевич... in Moscow Spark
Renarde
а читаете вы по jdbc, как я понимаю?
надо посмотреть как настроен spark.read.jdbc - есть ли у него опции partitionColumn, и сколько numPartitions установлено
Да, jdbc.
Никак не настраивал.

Вот так читаю.

DF = spark.read \
   .format("jdbc") \
   .option("url", "jdbc:sap://sap.ru:30215") \
   .option("dbtable", 'SAPBI2."table"' ) \
   .option("user", "user") \
   .option("password", "pass") \
   .option("driver", "com.sap.db.jdbc.Driver") \
   .load()
источник

R

Renarde in Moscow Spark
тогда DF будет иметь ровно одну партицию, насколько я помню
источник

R

Renarde in Moscow Spark
то есть у вас чтение однопоточное, для начала. тут можно пошаманить с partitionColumn опциями (чтобы отправлять несколько параллельных селектов).
вот тут механика partitionColumn задокументирована
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
источник