Quickstart: Como rodar Apache Spark no Kubernetes
Kubernetes?
Desde o seu lançamento em 2014 pela Google, o Kubernetes tem ganhado muita popularidade junto com o próprio Docker e, desde 2016, passou a ser o de facto Container orchestrator, sendo estabelecido como um padrão de mercado e ganhando versões gerenciadas em todas as major Clouds[1] [2] [3] (inclusive na Digital Ocean e Alibaba).
Toda essa popularidade tem atraído novas implementações e use-cases para o orquestrador, dentre eles a execução de Stateful applications e inclusive a tentativa de rodar bancos de dados em containers. Qual seria a necessidade de orquestrar um banco de dados? Ótima pergunta para um outro post. Por hoje, vamos focar na utilização do Spark Operator para executar Spark workloads em Kubernetes.
A idéia de executar nativamente no Kubernetes surgiu em 2016 e antes disso haviam alguns jeitinhos de fazer acontecer. Por exemplo: com Apache Zeppelin ou então, mais refinado ainda com um cluster que usaria o Spark Standalone mode.
Porém, executar nativamente seria muito mais interessante e a idéia cresceu bastante, tomou forma, ela foi desenvolvida e integrada na versão 2.3.0 do Spark que foi lançada em Feveiro de 2018.
Quais os benefícios de executar Spark no Kubernetes?
Como atualmente as empresas estão buscando se reinventar por meio da tão falada transformação digital para que possam ter competitividade e, principalmente, sobreviver diante de um mercado cada vez mais dinâmico, é comum ver abordagens que incluam Big Data, Inteligência Artificial e Cloud Computing[1] [2] [3].
Dentre os benefícios de utilizar Cloud ao invés de On-premises podemos listar:
- Custo;
- Elasticidade;
- SLA;
- Integridade.
Uma comparação interessante pode ser lida no blog da Databricks que é a empresa fundada pelos criadores do Apache Spark.
Como nós vemos uma adoção de Cloud Computing generalizada (até por empresas que teriam condições de bancar o próprio hardware), que muitas vezes nessas implementações de Cloud não existem clusters de Apache Hadoop já que os times de Dados (BI/Data Science/Analytics) optam cada vez mais por utilizar ferramentas como Google BigQuery ou AWS Redshift, não faz sentido subir um Hadoop apenas para utilizar o YARN para gerenciar os recursos.
Para entender melhor o design do Spark Operator, recomendo a leitura da documentação gerada pela equipe da GCP no GitHub.
Setup
Agora que toda a palavra já foi passada, vamos meter a mão na massa para mostrar a coisa acontecendo. Para isso, vamos usar:
- Docker como motor de container no Kubernetes, e construção da imagem (link para instalação);
- Minikube (link para instalação) para facilitar o provisionamento do Kubernetes (sim, será uma execução local);
- uma versão compilada do Apache Spark que seja maior do que a 2.3.0.
Instalação do Spark
Para o Apache Spark, é possível tanto compilar o código fonte, que levará algumas boas horas, ou baixar uma versão compilada aqui (recomendo). Após ter o Apache descompactado, vamos adicionar o caminho no PATH para facilitar a execução:
export PATH=${PATH}:/path/to/apache-spark-X.Y.Z/bin
Criação do Kubernetes com Minikube
Para interação com o Kubernetes que será executado em uma VM pelo minikube
, será necessário ter o kubectl
instalado, que pode ser feito seguindo esse link.
Agora, para ter um cluster de Kubernetes vamos iniciar um minikube
básico, com o propósito de rodar um dos exemplos que já vem no repositório do Spark chamado SparkPi
apenas como demonstração:
minikube start
Construção da imagem base
Vamos utilizar o Docker daemon do Minikube para não depender de um registry externo (e só gerar lixo na VM). Para isso, o minikube tem um wrapper que facilita a nossa vida configurando o host:
eval $(minikube docker-env)
Por último, precisamos de uma imagem base para rodar os jobs. Existe um shell script no repositório do Spark para ajudar com isso. Como o PATH
está com os binários, o commando:
docker-image-tool.sh -m -t latest build
Obs.: O parâmetro -m
aqui indica que o build será para o minikube.
FIRE IN THE HOLE!
Para executar o SparkPi vamos no modo fácil, usando o mesmo comando que seria utilizado para um cluster Spark spark-submit. Porém, o Spark Operator dá suporte a definição de jobs no "idioma do Kubernetes" usando CRD, aqui tem alguns exemplos.
A versão que instalei do Apache Spark é 2.4.3, isso precisa ser passado no parâmetro:
spark-submit --master k8s://https://$(minikube ip):8443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--executor-memory 512m \
--conf spark.kubernetes.container.image=spark:latest local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
O que entra de novo aqui é:
--master
: Aceita como parâmetro um schemak8s://
e então nós passamos o endpoint da API do Kubernetes, que está exposta pelo minikubehttps://$(minikube ip):8443
;--conf spark.kubernetes.container.image=
: Configuração para a imagem base que será executada no Kubernetes
Com o output:
...
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: State changed, new state:
pod name: spark-pi-1566485909677-driver
namespace: default
labels: spark-app-selector -> spark-20477e803e7648a59e9bcd37394f7f60, spark-role -> driver
pod uid: c789c4d2-27c4-45ce-ba10-539940cccb8d
creation time: 2019-08-22T14:58:30Z
service account name: default
volumes: spark-local-dir-1, spark-conf-volume, default-token-tj7jn
node name: minikube
start time: 2019-08-22T14:58:30Z
container images: spark:docker
phase: Succeeded
status: [ContainerStatus(containerID=docker://e044d944d2ebee2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, image=spark:docker, imageID=docker://sha256:86649110778a10aa5d6997d1e3d556b35454e9657978f3a87de32c21787ff82f, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=spark-kubernetes-driver, ready=false, restartCount=0, state=ContainerState(running=null, terminated=ContainerStateTerminated(containerID=docker://e044d944d2ebee2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, exitCode=0, finishedAt=2019-08-22T14:59:08Z, message=null, reason=Completed, signal=null, startedAt=2019-08-22T14:58:32Z, additionalProperties={}), waiting=null, additionalProperties={}), additionalProperties={})]
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: Container final statuses:
Container name: spark-kubernetes-driver
Container image: spark:docker
Container state: Terminated
Exit code: 0
E para ver o resultado do job, e toda a execução podemos mandar um kubectl logs
passando o nome do pod do driver como parâmetro:
kubectl logs spark-pi-1566485909677-driver
Que traz o output (algumas entradas foram omitidas), parecido com:
...
19/08/22 14:59:08 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 52 ms on 172.17.0.7 (executor 1) (2/2)
19/08/22 14:59:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/08/22 14:59:08 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.957 s
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
19/08/22 14:59:08 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1566485909677-driver-svc.default.svc:4040
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
19/08/22 14:59:08 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
19/08/22 14:59:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/08/22 14:59:08 INFO MemoryStore: MemoryStore cleared
19/08/22 14:59:08 INFO BlockManager: BlockManager stopped
19/08/22 14:59:08 INFO BlockManagerMaster: BlockManagerMaster stopped
19/08/22 14:59:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/08/22 14:59:08 INFO SparkContext: Successfully stopped SparkContext
19/08/22 14:59:08 INFO ShutdownHookManager: Shutdown hook called
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory /tmp/spark-aeadc6ba-36aa-4b7e-8c74-53aa48c3c9b2
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory /var/data/spark-084e8326-c8ce-4042-a2ed-75c1eb80414a/spark-ef8117bf-90d0-4a0d-9cab-f36a7bb18910
O resultado aparece no stdout:
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
Cleanup
Para finalizar, vamos deletar a VM que o Minikube gera, para limpar o ambiente (a menos que você queira continuar brincando com ele):
minikube delete
Espero ter dispertado bastante curiosidade e te apresentado uma nova possibilidade para seus workloads de Big Data. Caso tenha alguma dúvida ou sugestão, comenta aí que nós conversamos.