236 lines
6.5 KiB
HCL
236 lines
6.5 KiB
HCL
job "[[ .instance ]]-brokers" {
|
|
|
|
[[- $c := merge .kafka . ]]
|
|
|
|
[[ template "common/job_start" . ]]
|
|
|
|
group "broker" {
|
|
[[ $c := merge $c.broker $c ]]
|
|
[[ template "common/group_start" $c ]]
|
|
|
|
network {
|
|
mode = "bridge"
|
|
port "client" {
|
|
[[- if has $c "static_client_port" ]]
|
|
static = [[ $c.static_client_port ]]
|
|
[[- end ]]
|
|
}
|
|
port "broker" {
|
|
[[- if has $c "static_broker_port" ]]
|
|
static = [[ $c.static_broker_port ]]
|
|
[[- end ]]
|
|
}
|
|
[[- if conv.ToBool $c.prometheus.enabled ]]
|
|
port "metrics" {}
|
|
[[- end ]]
|
|
}
|
|
|
|
[[ template "common/volumes" $c ]]
|
|
|
|
service {
|
|
name = "[[ .instance ]]-broker[[ .consul.suffix ]]"
|
|
port = "client"
|
|
[[ template "common/service_meta" $c ]]
|
|
[[ template "common/connect" $c ]]
|
|
|
|
# Checks the broker is ready
|
|
# See states signification here : https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java
|
|
check {
|
|
name = "health"
|
|
type = "script"
|
|
command = "sh"
|
|
args = [
|
|
"-c",
|
|
"STATE=$(curl http://127.0.0.1:9404 | grep '^kafka_server_kafkaserver_brokerstate' | sed -r 's/.* (\\d+)\\.0/\\1/'); echo \"Broker state: ${STATE}\"; if [ \"${STATE}\" = \"3\" ]; then exit 0; else exit 1; fi"
|
|
]
|
|
interval = "[[ $c.consul.check.interval ]]"
|
|
timeout = "[[ $c.consul.check.timeout ]]"
|
|
task = "kafka-broker"
|
|
}
|
|
|
|
tags = [
|
|
"broker-${NOMAD_ALLOC_INDEX}"
|
|
]
|
|
}
|
|
|
|
[[ template "common/task.wait_for" $c ]]
|
|
[[ template "common/task.metrics_proxy" $c ]]
|
|
|
|
task "kafka-broker" {
|
|
driver = "[[ $c.nomad.driver ]]"
|
|
leader = true
|
|
|
|
config {
|
|
[[ template "common/image" $c ]]
|
|
pids_limit = 500
|
|
volumes = [
|
|
"local/conf/server.properties:/opt/kafka/config/server.properties:ro",
|
|
"local/conf/client.properties:/opt/kafka/config/client.properties:ro"
|
|
]
|
|
}
|
|
|
|
[[ template "common/artifacts" $c ]]
|
|
[[ template "common/vault.policies" $c ]]
|
|
|
|
env {
|
|
# So the PEM certs from vault are converted to PKCS12 for Kafka to use
|
|
PEM_KEY_FILE = "/secrets/kafka.bundle.pem"
|
|
PEM_CERT_FILE = "/secrets/kafka.bundle.pem"
|
|
P12_FILE = "/secrets/kafka.p12"
|
|
TMPDIR = "/local/tmp"
|
|
LOG_DIR = "/alloc/logs/"
|
|
}
|
|
[[ template "common/file_env" $c ]]
|
|
|
|
# Some env vars
|
|
template {
|
|
data = <<_EOT
|
|
KAFKA_HEAP_OPTS=-Xmx[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m -Xms[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m
|
|
KAFKA_OPTS=-Djava.io.tmpdir=/local/tmp -javaagent:/jmx_exporter/jmx_prometheus_javaagent.jar=127.0.0.1:9404:/opt/kafka/config/jmx_exporter.yml
|
|
[[- if ne $c.zookeeper.user "" ]] -Djava.security.auth.login.config=/secrets/jaas.conf[[- end ]]
|
|
_EOT
|
|
destination = "secrets/.kafka.env"
|
|
perms = 400
|
|
env = true
|
|
}
|
|
|
|
[[- if ne $c.zookeeper.user "" ]]
|
|
|
|
# SASL user/pass to auth on ZooKeeper
|
|
template {
|
|
data = <<_EOT
|
|
[[ template "kafka/brokers/jaas.conf" $c ]]
|
|
_EOT
|
|
destination = "secrets/jaas.conf"
|
|
uid = 100000
|
|
gid = 109092
|
|
perms = "0640"
|
|
}
|
|
[[- end ]]
|
|
|
|
# The main broker configuration
|
|
template {
|
|
data = <<_EOT
|
|
[[ template "kafka/brokers/server.properties" $c ]]
|
|
_EOT
|
|
destination = "local/conf/server.properties"
|
|
}
|
|
|
|
# A configuration to be used by client tools
|
|
template {
|
|
data = <<_EOT
|
|
[[ template "kafka/brokers/client.properties" $c ]]
|
|
_EOT
|
|
destination = "local/conf/client.properties"
|
|
# No need to restart if the client conf changes
|
|
change_mode = "noop"
|
|
}
|
|
|
|
# The certificate and private key for the broker
|
|
template {
|
|
data = <<_EOT
|
|
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker"
|
|
"common_name=[[ .instance ]]-broker[[ .consul.suffix ]]"
|
|
(printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],localhost" (env "NOMAD_ALLOC_INDEX"))
|
|
(printf "ip_sans=%s,127.0.0.1" (env "NOMAD_HOST_IP_client"))
|
|
(printf "ttl=%dh" (env "NOMAD_ALLOC_INDEX" | parseInt | multiply 24 | add 650)) }}
|
|
{{ .Cert }}
|
|
{{ .Key }}
|
|
{{- end }}
|
|
_EOT
|
|
destination = "secrets/kafka.bundle.pem"
|
|
uid = 100000
|
|
gid = 109092
|
|
perms = "0640"
|
|
}
|
|
|
|
# The trusted CA for the broker
|
|
template {
|
|
data = <<_EOT
|
|
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
|
|
_EOT
|
|
destination = "secrets/kafka.ca.pem"
|
|
uid = 100000
|
|
gid = 100000
|
|
}
|
|
|
|
# Mount the persistent volume on /data
|
|
volume_mount {
|
|
volume = "data"
|
|
destination = "/data"
|
|
}
|
|
|
|
[[ template "common/resources" $c ]]
|
|
}
|
|
}
|
|
|
|
[[- if conv.ToBool $c.prometheus.enabled ]]
|
|
[[- $c := merge .kafka.exporter $c ]]
|
|
group "exporter" {
|
|
|
|
network {
|
|
mode = "bridge"
|
|
port "metrics" {}
|
|
}
|
|
|
|
service {
|
|
name = "[[ .instance ]]-exporter[[ .consul.suffix ]]"
|
|
[[ template "common/service_meta" $c ]]
|
|
}
|
|
|
|
[[ template "common/task.wait_for" $c ]]
|
|
|
|
task "kafka-exporter" {
|
|
driver = "[[ $c.nomad.driver ]]"
|
|
user = 9308
|
|
|
|
config {
|
|
[[ template "common/image" $c ]]
|
|
pids_limit = 100
|
|
command = "/local/kafka_exporter"
|
|
}
|
|
|
|
[[ template "common/artifacts" $c ]]
|
|
[[ template "common/vault.policies" $c ]]
|
|
[[ template "common/file_env" $c ]]
|
|
|
|
template {
|
|
data = <<_EOT
|
|
[[ template "kafka/exporter/start.sh" $c ]]
|
|
_EOT
|
|
destination = "local/kafka_exporter"
|
|
perms = "0755"
|
|
}
|
|
|
|
[[ template "common/metrics_cert" $c ]]
|
|
|
|
# Ask vault for a client certificate to connect on the brokers
|
|
template {
|
|
data = <<_EOT
|
|
{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter"
|
|
"common_name=[[ .instance ]]-exporter" "ttl=72h" }}
|
|
{{ .Cert }}
|
|
{{ .Key }}
|
|
{{- end }}
|
|
_EOT
|
|
destination = "secrets/kafka.bundle.pem"
|
|
uid = 109308
|
|
gid = 100000
|
|
perms = "0400"
|
|
}
|
|
|
|
# The CA chain to validate kafka brokers
|
|
template {
|
|
data = <<_EOT
|
|
{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
|
|
_EOT
|
|
destination = "local/kafka.ca.pem"
|
|
}
|
|
|
|
[[ template "common/resources" $c ]]
|
|
}
|
|
}
|
|
|
|
[[- end ]]
|
|
}
|