Kubernetes?

Desde o seu lançamento em 2014 pela Google, o Kubernetes tem ganhado muita popularidade junto com o próprio Docker e, desde 2016, passou a ser o de facto Container orchestrator, sendo estabelecido como um padrão de mercado e ganhando versões gerenciadas em todas as major Clouds[1] [2] [3] (inclusive na Digital Ocean e Alibaba).

Toda essa popularidade tem atraído novas implementações e use-cases para o orquestrador, dentre eles a execução de Stateful applications e inclusive a tentativa de rodar bancos de dados em containers. Qual seria a necessidade de orquestrar um banco de dados? Ótima pergunta para um outro post. Por hoje, vamos focar na utilização do Spark Operator para executar Spark workloads em Kubernetes.

A idéia de executar nativamente no Kubernetes surgiu em 2016 e antes disso haviam alguns jeitinhos de fazer acontecer. Por exemplo: com Apache Zeppelin ou então, mais refinado ainda com um cluster que usaria o Spark Standalone mode.

Porém, executar nativamente seria muito mais interessante e a idéia cresceu bastante, tomou forma, ela foi desenvolvida e integrada na versão 2.3.0 do Spark que foi lançada em Feveiro de 2018.

Quais os benefícios de executar Spark no Kubernetes?

Como atualmente as empresas estão buscando se reinventar por meio da tão falada transformação digital para que possam ter competitividade e, principalmente, sobreviver diante de um mercado cada vez mais dinâmico, é comum ver abordagens que incluam Big Data, Inteligência Artificial e Cloud Computing[1] [2] [3].

Dentre os benefícios de utilizar Cloud ao invés de On-premises podemos listar:

  • Custo;
  • Elasticidade;
  • SLA;
  • Integridade.

Uma comparação interessante pode ser lida no blog da Databricks que é a empresa fundada pelos criadores do Apache Spark.

Como nós vemos uma adoção de Cloud Computing generalizada (até por empresas que teriam condições de bancar o próprio hardware), que muitas vezes nessas implementações de Cloud não existem clusters de Apache Hadoop já que os times de Dados (BI/Data Science/Analytics) optam cada vez mais por utilizar ferramentas como Google BigQuery ou AWS Redshift, não faz sentido subir um Hadoop apenas para utilizar o YARN para gerenciar os recursos.

Para entender melhor o design do Spark Operator, recomendo a leitura da documentação gerada pela equipe da GCP no GitHub.

Setup

Agora que toda a palavra já foi passada, vamos meter a mão na massa para mostrar a coisa acontecendo. Para isso, vamos usar:

  • Docker como motor de container no Kubernetes, e construção da imagem (link para instalação);
  • Minikube (link para instalação) para facilitar o provisionamento do Kubernetes (sim, será uma execução local);
  • uma versão compilada do Apache Spark que seja maior do que a 2.3.0.

Instalação do Spark

Para o Apache Spark, é possível tanto compilar o código fonte, que levará algumas boas horas, ou baixar uma versão compilada aqui (recomendo). Após ter o Apache descompactado, vamos adicionar o caminho no PATH para facilitar a execução:

export PATH=${PATH}:/path/to/apache-spark-X.Y.Z/bin

Criação do Kubernetes com Minikube

Para interação com o Kubernetes que será executado em uma VM pelo minikube, será necessário ter o kubectl instalado, que pode ser feito seguindo esse link.

Agora, para ter um cluster de Kubernetes vamos iniciar um minikube básico, com o propósito de rodar um dos exemplos que já vem no repositório do Spark chamado SparkPi apenas como demonstração:

minikube start

Construção da imagem base

Vamos utilizar o Docker daemon do Minikube para não depender de um registry externo (e só gerar lixo na VM). Para isso, o minikube tem um wrapper que facilita a nossa vida configurando o host:

eval $(minikube docker-env)

Por último, precisamos de uma imagem base para rodar os jobs. Existe um shell script no repositório do Spark para ajudar com isso. Como o PATH está com os binários, o commando:

docker-image-tool.sh -m -t latest build 

Obs.: O parâmetro -m aqui indica que o build será para o minikube.

FIRE IN THE HOLE!

Para executar o SparkPi vamos no modo fácil, usando o mesmo comando que seria utilizado para um cluster Spark spark-submit. Porém, o Spark Operator dá suporte a definição de jobs no "idioma do Kubernetes" usando CRD, aqui tem alguns exemplos.

A versão que instalei do Apache Spark é 2.4.3, isso precisa ser passado no parâmetro:

spark-submit --master k8s://https://$(minikube ip):8443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --executor-memory 512m \
  --conf spark.kubernetes.container.image=spark:latest local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

O que entra de novo aqui é:

  • --master: Aceita como parâmetro um schema k8s:// e então nós passamos o endpoint da API do Kubernetes, que está exposta pelo minikube https://$(minikube ip):8443;
  • --conf spark.kubernetes.container.image=: Configuração para a imagem base que será executada no Kubernetes

Com o output:

...
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: State changed, new state:
	 pod name: spark-pi-1566485909677-driver
	 namespace: default
	 labels: spark-app-selector -> spark-20477e803e7648a59e9bcd37394f7f60, spark-role -> driver
	 pod uid: c789c4d2-27c4-45ce-ba10-539940cccb8d
	 creation time: 2019-08-22T14:58:30Z
	 service account name: default
	 volumes: spark-local-dir-1, spark-conf-volume, default-token-tj7jn
	 node name: minikube
	 start time: 2019-08-22T14:58:30Z
	 container images: spark:docker
	 phase: Succeeded
	 status: [ContainerStatus(containerID=docker://e044d944d2ebee2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, image=spark:docker, imageID=docker://sha256:86649110778a10aa5d6997d1e3d556b35454e9657978f3a87de32c21787ff82f, lastState=ContainerState(running=null, terminated=null, waiting=null, additionalProperties={}), name=spark-kubernetes-driver, ready=false, restartCount=0, state=ContainerState(running=null, terminated=ContainerStateTerminated(containerID=docker://e044d944d2ebee2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, exitCode=0, finishedAt=2019-08-22T14:59:08Z, message=null, reason=Completed, signal=null, startedAt=2019-08-22T14:58:32Z, additionalProperties={}), waiting=null, additionalProperties={}), additionalProperties={})]
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: Container final statuses:


	 Container name: spark-kubernetes-driver
	 Container image: spark:docker
	 Container state: Terminated
	 Exit code: 0

E para ver o resultado do job, e toda a execução podemos mandar um kubectl logs passando o nome do pod do driver como parâmetro:

kubectl logs spark-pi-1566485909677-driver

Que traz o output (algumas entradas foram omitidas), parecido com:

...
19/08/22 14:59:08 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 52 ms on 172.17.0.7 (executor 1) (2/2)
19/08/22 14:59:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/08/22 14:59:08 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.957 s
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
19/08/22 14:59:08 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1566485909677-driver-svc.default.svc:4040
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
19/08/22 14:59:08 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
19/08/22 14:59:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/08/22 14:59:08 INFO MemoryStore: MemoryStore cleared
19/08/22 14:59:08 INFO BlockManager: BlockManager stopped
19/08/22 14:59:08 INFO BlockManagerMaster: BlockManagerMaster stopped
19/08/22 14:59:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/08/22 14:59:08 INFO SparkContext: Successfully stopped SparkContext
19/08/22 14:59:08 INFO ShutdownHookManager: Shutdown hook called
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory /tmp/spark-aeadc6ba-36aa-4b7e-8c74-53aa48c3c9b2
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory /var/data/spark-084e8326-c8ce-4042-a2ed-75c1eb80414a/spark-ef8117bf-90d0-4a0d-9cab-f36a7bb18910

O resultado aparece no stdout:

19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473

Cleanup

Para finalizar, vamos deletar a VM que o Minikube gera, para limpar o ambiente (a menos que você queira continuar brincando com ele):

minikube delete

Espero ter dispertado bastante curiosidade e te apresentado uma nova possibilidade para seus workloads de Big Data. Caso tenha alguma dúvida ou sugestão, comenta aí que nós conversamos.