# Nomad job running the Kafka brokers and, when Prometheus support is enabled,
# a separate Kafka exporter group.
#
# This file is a template: [[ ... ]] directives are rendered before the job is
# submitted, while {{ ... }} actions inside the heredocs are left for Nomad's
# template runner at allocation time.
job "[[ .instance ]]-brokers" {

[[- $c := merge .kafka . ]]

[[ template "common/job_start" . ]]

  group "broker" {

[[ $c := merge $c.broker $c ]]
[[ template "common/group_start" $c ]]

    network {
      mode = "bridge"

      # Both ports are dynamic unless a static port is set in the configuration
      port "client" {
[[- if has $c "static_client_port" ]]
        static = [[ $c.static_client_port ]]
[[- end ]]
      }

      port "broker" {
[[- if has $c "static_broker_port" ]]
        static = [[ $c.static_broker_port ]]
[[- end ]]
      }

[[- if conv.ToBool $c.prometheus.enabled ]]
      port "metrics" {}
[[- end ]]
    }

[[ template "common/volumes" $c ]]

    service {
      name = "[[ .instance ]]-broker[[ .consul.suffix ]]"
      port = "client"

[[ template "common/service_meta" $c ]]
[[ template "common/connect" $c ]]

      # Checks that the broker is ready.
      # The script reads the broker state gauge from the JMX exporter agent
      # (listening on 127.0.0.1:9404, see KAFKA_OPTS below) and only passes
      # when the state is 3.
      # The meaning of each state value is described here:
      # https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java
      #
      # Notes on the command:
      #  - curl runs with -s so its progress meter does not end up in the check output
      #  - the sed expression uses [0-9] instead of \d, as \d is not part of POSIX
      #    extended regular expressions and is not understood by every sed
      check {
        name    = "health"
        type    = "script"
        command = "sh"
        args = [
          "-c",
          "STATE=$(curl -s http://127.0.0.1:9404 | grep '^kafka_server_kafkaserver_brokerstate' | sed -r 's/.* ([0-9]+)\\.0/\\1/'); echo \"Broker state: ${STATE}\"; if [ \"${STATE}\" = \"3\" ]; then exit 0; else exit 1; fi"
        ]
        interval = "[[ $c.consul.check.interval ]]"
        timeout  = "[[ $c.consul.check.timeout ]]"
        task     = "kafka-broker"
      }

      # One tag per allocation index. The broker certificate below requests a
      # matching broker-<index>.<service> alternative name.
      tags = [
        "broker-${NOMAD_ALLOC_INDEX}"
      ]
    }

[[ template "common/task.wait_for" $c ]]
[[ template "common/task.metrics_proxy" $c ]]

    task "kafka-broker" {
      driver = "[[ $c.nomad.driver ]]"
      leader = true

      config {
[[ template "common/image" $c ]]
        pids_limit = 500
        # Rendered configuration files are mounted read-only where Kafka expects them
        volumes = [
          "local/conf/server.properties:/opt/kafka/config/server.properties:ro",
          "local/conf/client.properties:/opt/kafka/config/client.properties:ro"
        ]
      }

[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]

      env {
        # So the PEM certs from vault are converted to PKCS12 for Kafka to use.
        # Key and certificate are read from the same bundle file.
        PEM_KEY_FILE  = "/secrets/kafka.bundle.pem"
        PEM_CERT_FILE = "/secrets/kafka.bundle.pem"
        P12_FILE      = "/secrets/kafka.p12"
        TMPDIR        = "/local/tmp"
        LOG_DIR       = "/alloc/logs/"
      }

[[ template "common/file_env" $c ]]

      # Some env vars.
      # The JVM heap (both -Xmx and -Xms) is set to 30% of the task memory, rounded up.
      # The JMX exporter agent is bound to 127.0.0.1:9404, which is the endpoint
      # the health check queries.
      # The JAAS configuration is only referenced when a ZooKeeper user is set.
      template {
        data = <<_EOT
KAFKA_HEAP_OPTS=-Xmx[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m -Xms[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m
KAFKA_OPTS=-Djava.io.tmpdir=/local/tmp -javaagent:/jmx_exporter/jmx_prometheus_javaagent.jar=127.0.0.1:9404:/opt/kafka/config/jmx_exporter.yml [[- if ne $c.zookeeper.user "" ]] -Djava.security.auth.login.config=/secrets/jaas.conf[[- end ]]
_EOT
        destination = "secrets/.kafka.env"
        # Quoted octal string, like every other template in this job
        perms       = "0400"
        env         = true
      }

[[- if ne $c.zookeeper.user "" ]]

      # SASL user/pass to auth on ZooKeeper
      # NOTE(review): uid/gid look like the container user mapped through a
      # user namespace offset - confirm against the image and driver settings
      template {
        data = <<_EOT
[[ template "kafka/brokers/jaas.conf" $c ]]
_EOT
        destination = "secrets/jaas.conf"
        uid         = 100000
        gid         = 109092
        perms       = "0640"
      }
[[- end ]]

      # The main broker configuration
      template {
        data = <<_EOT
[[ template "kafka/brokers/server.properties" $c ]]
_EOT
        destination = "local/conf/server.properties"
      }

      # A configuration to be used by client tools
      template {
        data = <<_EOT
[[ template "kafka/brokers/client.properties" $c ]]
_EOT
        destination = "local/conf/client.properties"
        # No need to restart if the client conf changes
        change_mode = "noop"
      }

      # The certificate and private key for the broker.
      # The requested TTL is 650h plus 24h per allocation index, so each broker
      # asks for a different certificate lifetime (presumably to avoid all
      # brokers renewing at the same time - confirm).
      template {
        data = <<_EOT
{{- with pkiCert
  "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker"
  "common_name=[[ .instance ]]-broker[[ .consul.suffix ]]"
  (printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],localhost" (env "NOMAD_ALLOC_INDEX"))
  (printf "ip_sans=%s,127.0.0.1" (env "NOMAD_HOST_IP_client"))
  (printf "ttl=%dh" (env "NOMAD_ALLOC_INDEX" | parseInt | multiply 24 | add 650)) }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
        destination = "secrets/kafka.bundle.pem"
        uid         = 100000
        gid         = 109092
        perms       = "0640"
      }

      # The trusted CA for the broker
      template {
        data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
        destination = "secrets/kafka.ca.pem"
        uid         = 100000
        gid         = 100000
      }

      # Mount the persistent volume on /data
      volume_mount {
        volume      = "data"
        destination = "/data"
      }

[[ template "common/resources" $c ]]
    }
  }

[[- if conv.ToBool $c.prometheus.enabled ]]
[[- $c := merge .kafka.exporter $c ]]

  # Kafka exporter, only deployed when Prometheus support is enabled.
  # Its settings are the exporter configuration merged over the broker one.
  group "exporter" {

    network {
      mode = "bridge"
      port "metrics" {}
    }

    service {
      name = "[[ .instance ]]-exporter[[ .consul.suffix ]]"
[[ template "common/service_meta" $c ]]
    }

[[ template "common/task.wait_for" $c ]]

    task "kafka-exporter" {
      driver = "[[ $c.nomad.driver ]]"
      user   = 9308

      config {
[[ template "common/image" $c ]]
        pids_limit = 100
        # The start script rendered by the template below
        command    = "/local/kafka_exporter"
      }

[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]
[[ template "common/file_env" $c ]]

      # The exporter start script, rendered as an executable file
      template {
        data = <<_EOT
[[ template "kafka/exporter/start.sh" $c ]]
_EOT
        destination = "local/kafka_exporter"
        perms       = "0755"
      }

[[ template "common/metrics_cert" $c ]]

      # Ask vault for a client certificate to connect on the brokers
      # NOTE(review): uid 109308 looks like the task user (9308) shifted by a
      # 100000 user namespace offset - confirm
      template {
        data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter" "common_name=[[ .instance ]]-exporter" "ttl=72h" }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
        destination = "secrets/kafka.bundle.pem"
        uid         = 109308
        gid         = 100000
        perms       = "0400"
      }

      # The CA chain to validate kafka brokers
      template {
        data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
        destination = "local/kafka.ca.pem"
      }

[[ template "common/resources" $c ]]
    }
  }
[[- end ]]
}