kafka/kafka-brokers.nomad.hcl

236 lines
6.5 KiB
HCL

job "[[ .instance ]]-brokers" {
[[- $c := merge .kafka . ]]
[[ template "common/job_start" . ]]
group "broker" {
[[ $c := merge $c.broker $c ]]
[[ template "common/group_start" $c ]]
# Bridge-mode networking for the broker group.
network {
mode = "bridge"
# Port referenced by the service of this group (port = "client").
# It is pinned to static_client_port only when that key is set; otherwise no static port is requested.
port "client" {
[[- if has $c "static_client_port" ]]
static = [[ $c.static_client_port ]]
[[- end ]]
}
# Second port, pinned to static_broker_port only when that key is set.
# NOTE(review): presumably used for inter-broker traffic; confirm against the server.properties template.
port "broker" {
[[- if has $c "static_broker_port" ]]
static = [[ $c.static_broker_port ]]
[[- end ]]
}
# The metrics port is only declared when prometheus is enabled
[[- if conv.ToBool $c.prometheus.enabled ]]
port "metrics" {}
[[- end ]]
}
[[ template "common/volumes" $c ]]
# Service registered for each broker allocation, on the "client" port.
service {
name = "[[ .instance ]]-broker[[ .consul.suffix ]]"
port = "client"
[[ template "common/service_meta" $c ]]
[[ template "common/connect" $c ]]
# Checks the broker is ready: reads the kafka_server_kafkaserver_brokerstate metric from the
# JMX exporter listening on 127.0.0.1:9404 and passes only when the state is 3 (RUNNING).
# See the meaning of each state here: https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java
check {
name = "health"
type = "script"
command = "sh"
# curl runs with -s so its progress meter does not end up in the check output.
# The sed expression uses the portable bracket class for digits instead of the \d shorthand,
# which is not a digit class in POSIX ERE nor in GNU sed.
args = [
"-c",
"STATE=$(curl -s http://127.0.0.1:9404 | grep '^kafka_server_kafkaserver_brokerstate' | sed -r 's/.* ([0-9]+)\\.0/\\1/'); echo \"Broker state: ${STATE}\"; if [ \"${STATE}\" = \"3\" ]; then exit 0; else exit 1; fi"
]
interval = "[[ $c.consul.check.interval ]]"
timeout = "[[ $c.consul.check.timeout ]]"
# Script checks run inside a task: use the broker one
task = "kafka-broker"
}
# One tag per allocation index, matching the broker-<index> name used in the broker certificate alt_names
tags = [
"broker-${NOMAD_ALLOC_INDEX}"
]
}
[[ template "common/task.wait_for" $c ]]
[[ template "common/task.metrics_proxy" $c ]]
# Main Kafka broker task, flagged as the leader task of the group.
task "kafka-broker" {
driver = "[[ $c.nomad.driver ]]"
leader = true
config {
[[ template "common/image" $c ]]
pids_limit = 500
# Configuration files rendered by the templates below, mounted read-only in /opt/kafka/config
volumes = [
"local/conf/server.properties:/opt/kafka/config/server.properties:ro",
"local/conf/client.properties:/opt/kafka/config/client.properties:ro"
]
}
[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]
env {
# So the PEM certs from vault are converted to PKCS12 for Kafka to use.
# Key and cert point to the same file: the bundle template below writes both in secrets/kafka.bundle.pem
PEM_KEY_FILE = "/secrets/kafka.bundle.pem"
PEM_CERT_FILE = "/secrets/kafka.bundle.pem"
P12_FILE = "/secrets/kafka.p12"
TMPDIR = "/local/tmp"
LOG_DIR = "/alloc/logs/"
}
[[ template "common/file_env" $c ]]
# JVM settings exposed as environment variables (env = true):
# - heap min and max are both set to 30% of the task memory, rounded up
# - the JMX exporter java agent is bound to 127.0.0.1:9404
# - the JAAS login config option is appended only when a ZooKeeper user is configured
template {
data = <<_EOT
KAFKA_HEAP_OPTS=-Xmx[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m -Xms[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m
KAFKA_OPTS=-Djava.io.tmpdir=/local/tmp -javaagent:/jmx_exporter/jmx_prometheus_javaagent.jar=127.0.0.1:9404:/opt/kafka/config/jmx_exporter.yml
[[- if ne $c.zookeeper.user "" ]] -Djava.security.auth.login.config=/secrets/jaas.conf[[- end ]]
_EOT
destination = "secrets/.kafka.env"
# Quoted octal string, like the other templates of this file (perms is a string attribute)
perms = "0400"
env = true
}
[[- if ne $c.zookeeper.user "" ]]
# SASL user/pass to auth on ZooKeeper (only rendered when a ZooKeeper user is configured)
# NOTE(review): uid/gid are presumably the host-side ids of the container user and group
# after user namespace remapping; confirm against the image and driver configuration
template {
data = <<_EOT
[[ template "kafka/brokers/jaas.conf" $c ]]
_EOT
destination = "secrets/jaas.conf"
uid = 100000
gid = 109092
perms = "0640"
}
[[- end ]]
# The main broker configuration
template {
data = <<_EOT
[[ template "kafka/brokers/server.properties" $c ]]
_EOT
destination = "local/conf/server.properties"
}
# A configuration to be used by client tools
template {
data = <<_EOT
[[ template "kafka/brokers/client.properties" $c ]]
_EOT
destination = "local/conf/client.properties"
# No need to restart if the client conf changes
change_mode = "noop"
}
# The certificate and private key for the broker, written in a single bundle.
# The alt_names cover the per-allocation name (broker-<index>), the service name and localhost;
# the ip_sans cover the host IP of the client port and 127.0.0.1.
# The requested TTL is 650h plus 24h per allocation index
# (presumably so the brokers do not all renew their certificate at the same time; confirm).
template {
data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker"
"common_name=[[ .instance ]]-broker[[ .consul.suffix ]]"
(printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],localhost" (env "NOMAD_ALLOC_INDEX"))
(printf "ip_sans=%s,127.0.0.1" (env "NOMAD_HOST_IP_client"))
(printf "ttl=%dh" (env "NOMAD_ALLOC_INDEX" | parseInt | multiply 24 | add 650)) }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
destination = "secrets/kafka.bundle.pem"
uid = 100000
gid = 109092
perms = "0640"
}
# The trusted CA for the broker (CA chain read from the vault PKI)
template {
data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
destination = "secrets/kafka.ca.pem"
uid = 100000
gid = 100000
}
# Mount the persistent volume on /data
volume_mount {
volume = "data"
destination = "/data"
}
[[ template "common/resources" $c ]]
}
}
[[- if conv.ToBool $c.prometheus.enabled ]]
[[- $c := merge .kafka.exporter $c ]]
group "exporter" {
# Bridge-mode networking for the exporter group, with a single "metrics" port (no static value set)
network {
mode = "bridge"
port "metrics" {}
}
# Service registered for the exporter. No port is set here; the metadata comes from the shared service_meta template.
service {
name = "[[ .instance ]]-exporter[[ .consul.suffix ]]"
[[ template "common/service_meta" $c ]]
}
[[ template "common/task.wait_for" $c ]]
# Kafka exporter task: runs the start script rendered below, with a client certificate to reach the brokers.
task "kafka-exporter" {
driver = "[[ $c.nomad.driver ]]"
# NOTE(review): the certificate bundle below is owned by uid 109308, presumably this user
# shifted by a 100000 user namespace offset; confirm against the driver configuration
user = 9308
config {
[[ template "common/image" $c ]]
pids_limit = 100
# Start script rendered by the template whose destination is local/kafka_exporter
command = "/local/kafka_exporter"
}
[[ template "common/artifacts" $c ]]
[[ template "common/vault.policies" $c ]]
[[ template "common/file_env" $c ]]
# The start script, rendered as an executable file (0755) and used as the task command
template {
data = <<_EOT
[[ template "kafka/exporter/start.sh" $c ]]
_EOT
destination = "local/kafka_exporter"
perms = "0755"
}
[[ template "common/metrics_cert" $c ]]
# Ask vault for a client certificate to connect on the brokers.
# Certificate and key are written in the same file, requested with a 72h TTL.
template {
data = <<_EOT
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter"
"common_name=[[ .instance ]]-exporter" "ttl=72h" }}
{{ .Cert }}
{{ .Key }}
{{- end }}
_EOT
destination = "secrets/kafka.bundle.pem"
uid = 109308
gid = 100000
perms = "0400"
}
# The CA chain to validate kafka brokers
template {
data = <<_EOT
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
_EOT
destination = "local/kafka.ca.pem"
}
[[ template "common/resources" $c ]]
}
}
[[- end ]]
}