diff --git a/bundles.yml b/bundles.yml
new file mode 100644
index 0000000..5b9120e
--- /dev/null
+++ b/bundles.yml
@@ -0,0 +1,4 @@
+---
+
+dependencies:
+  - url: ../common.git
diff --git a/images/jikkou/Dockerfile b/images/jikkou/Dockerfile
new file mode 100644
index 0000000..ea7bd90
--- /dev/null
+++ b/images/jikkou/Dockerfile
@@ -0,0 +1,23 @@
+FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]] AS builder
+
+ARG JIKKOU_VERSION=[[ .kafka.jikkou.version ]]
+
+ADD https://github.com/streamthoughts/jikkou/releases/download/v${JIKKOU_VERSION}/jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz /tmp
+ADD https://github.com/streamthoughts/jikkou/releases/download/v${JIKKOU_VERSION}/checksums_sha256.txt /tmp
+RUN set -euxo pipefail &&\
+    apk --no-cache add tar gzip &&\
+    cd /tmp &&\
+    grep "jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz" checksums_sha256.txt | sha256sum -c &&\
+    tar xvf jikkou-${JIKKOU_VERSION}-linux-x86_64.tar.gz &&\
+    mv jikkou-${JIKKOU_VERSION}-linux-x86_64 /opt/jikkou &&\
+    rm -f /opt/jikkou/bin/jikkou.bat
+
+FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]]
+MAINTAINER [[ .docker.maintainer ]]
+
+ENV PATH=/opt/jikkou/bin:$PATH
+COPY --from=builder /opt/jikkou/ /opt/jikkou/
+RUN set -euxo pipefail &&\
+    apk --no-cache add openssl gcompat
+
+CMD ["jikkou"]
diff --git a/images/kafka-broker/Dockerfile b/images/kafka-broker/Dockerfile
new file mode 100644
index 0000000..f6ae611
--- /dev/null
+++ b/images/kafka-broker/Dockerfile
@@ -0,0 +1,63 @@
+FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]] AS builder
+
+ARG KAFKA_VERSION=[[ .kafka.broker.version ]] \
+    SCALA_VERSION=2.13
+
+ADD https://www.apache.org/dyn/closer.cgi?action=download&filename=kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
+ADD https://www.apache.org/dyn/closer.cgi?action=download&filename=kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc
+ADD https://downloads.apache.org/kafka/KEYS /tmp/
+
+RUN set -ex &&\
+    apk add --no-cache \
+      curl \
+      ca-certificates \
+      gnupg \
+      tar \
+      gzip \
+    &&\
+    cd /tmp &&\
+    export GNUPGHOME="$(mktemp -d)" &&\
+    gpg --import KEYS &&\
+    gpg --batch --verify kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz.asc kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz &&\
+    tar xvf kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz &&\
+    mv kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka &&\
+    rm -rf /opt/kafka/bin/windows /opt/kafka/site-docs
+
+FROM [[ .docker.repo ]][[ .docker.base_images.java17.image ]]
+MAINTAINER [[ .docker.maintainer ]]
+
+ENV PATH=$PATH:/opt/kafka/bin \
+    KAFKA_DATA_DIR=/data \
+    KAFKA_CONF_DIR=/opt/kafka/config \
+    KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" \
+    KAFKA_GC_LOG_OPTS=" " \
+    LOG_DIR=/logs
+
+COPY --from=builder /opt/kafka/ /opt/kafka/
+RUN set -euxo pipefail &&\
+    apk --no-cache add \
+      bash \
+      openssl \
+    &&\
+    addgroup -g 9092 kafka &&\
+    adduser --system \
+      --ingroup kafka \
+      --disabled-password \
+      --uid 9092 \
+      --home /opt/kafka \
+      --no-create-home \
+      --shell /sbin/nologin \
+      kafka &&\
+    mkdir /data /logs &&\
+    chown kafka:kafka /data /logs &&\
+    chmod 700 /data /logs
+COPY root/ /
+RUN set -euxo pipefail &&\
+    chown kafka:kafka /opt/kafka/config/server.properties &&\
+    chmod 640 /opt/kafka/config/server.properties
+
+EXPOSE 9092
+USER kafka
+
+CMD ["bash", "-c", "exec kafka-server-start.sh ${KAFKA_CONF_DIR}/server.properties"]
+
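Both builder stages pin a release and verify it before unpacking: the jikkou stage checks a SHA-256 sum, the Kafka stage a GPG signature against the Apache KEYS file. The Kafka check can be reproduced outside Docker; a minimal sketch, assuming the 3.7.0/Scala 2.13 tarball and its .asc file are already in the current directory:

#!/bin/sh
# Sketch: verify a Kafka release tarball the same way the builder stage does.
set -eu
curl -fsSLO https://downloads.apache.org/kafka/KEYS
export GNUPGHOME="$(mktemp -d)"
gpg --import KEYS
# Exits non-zero if the signature does not match a release key.
gpg --batch --verify kafka_2.13-3.7.0.tgz.asc kafka_2.13-3.7.0.tgz
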
diff --git a/images/kafka-broker/root/entrypoint.d/10-kafka.env b/images/kafka-broker/root/entrypoint.d/10-kafka.env
new file mode 100755
index 0000000..57fcd6b
--- /dev/null
+++ b/images/kafka-broker/root/entrypoint.d/10-kafka.env
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+set -eo pipefail
+if [ -w "${KAFKA_CONF_DIR}/server.properties" ]; then
+  umask 077
+  TMPCONF=$(mktemp)
+  envsubst < "${KAFKA_CONF_DIR}/server.properties" > "${TMPCONF}"
+  cat "${TMPCONF}" > "${KAFKA_CONF_DIR}/server.properties"
+  rm -f "${TMPCONF}"
+fi
+
diff --git a/images/kafka-broker/root/entrypoint.d/50-kafka-format.sh b/images/kafka-broker/root/entrypoint.d/50-kafka-format.sh
new file mode 100755
index 0000000..4094284
--- /dev/null
+++ b/images/kafka-broker/root/entrypoint.d/50-kafka-format.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+
+set -eo pipefail
+if [ -n "${KAFKA_CLUSTER_ID}" ]; then
+  kafka-storage.sh format --cluster-id "${KAFKA_CLUSTER_ID}" --config "${KAFKA_CONF_DIR}/server.properties" --ignore-formatted
+fi
diff --git a/images/kafka-broker/root/opt/kafka/bin/dns-srv-to-broker-list.sh b/images/kafka-broker/root/opt/kafka/bin/dns-srv-to-broker-list.sh
new file mode 100755
index 0000000..9f5767a
--- /dev/null
+++ b/images/kafka-broker/root/opt/kafka/bin/dns-srv-to-broker-list.sh
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+set -euo pipefail
+
+SRV=$1
+OUTPUT=""
+
+IFS=$'\n'
+for SERVER in $(nslookup -type=srv "${SRV}" 2>&1 | awk 'f;/Non-authoritative/{f=1}' | sed '$d'); do
+  PORT=$(echo -n "${SERVER}" | awk '{ print $6 }')
+  CNAME=$(echo -n "${SERVER}" | awk '{ print $7 }')
+  ADDR=$(nslookup -type=a "${CNAME}" 2>&1 | awk -F': ' 'NR==6 { print $2 }')
+  OUTPUT="${OUTPUT}${ADDR}:${PORT},"
+done
+echo "${OUTPUT%,}"
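dns-srv-to-broker-list.sh is easiest to read from its intended output: one SRV lookup expanded into a comma-separated bootstrap list. A hypothetical session, with an illustrative record name and addresses:

$ dns-srv-to-broker-list.sh _kafka._tcp.example.internal
10.1.2.11:9092,10.1.2.12:9092,10.1.2.13:9092

Note that the awk field offsets are tied to BusyBox nslookup's output layout; a different resolver client would need different parsing.
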
diff --git a/images/kafka-broker/root/opt/kafka/config/jmx_exporter.yml b/images/kafka-broker/root/opt/kafka/config/jmx_exporter.yml
new file mode 100644
index 0000000..a34d63a
--- /dev/null
+++ b/images/kafka-broker/root/opt/kafka/config/jmx_exporter.yml
@@ -0,0 +1,103 @@
+lowercaseOutputName: true
+
+rules:
+# Special cases and very specific rules
+- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
+  name: kafka_server_$1_$2
+  type: GAUGE
+  labels:
+    clientId: "$3"
+    topic: "$4"
+    partition: "$5"
+- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
+  name: kafka_server_$1_$2
+  type: GAUGE
+  labels:
+    clientId: "$3"
+    broker: "$4:$5"
+- pattern : kafka.coordinator.(\w+)<type=(.+), name=(.+)><>Value
+  name: kafka_coordinator_$1_$2_$3
+  type: GAUGE
+
+# Generic per-second counters with 0-2 key/value pairs
+- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
+  name: kafka_$1_$2_$3_total
+  type: COUNTER
+  labels:
+    "$4": "$5"
+    "$6": "$7"
+- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
+  name: kafka_$1_$2_$3_total
+  type: COUNTER
+  labels:
+    "$4": "$5"
+- pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
+  name: kafka_$1_$2_$3_total
+  type: COUNTER
+
+- pattern: kafka.server<type=(.+), client-id=(.+)><>([a-z-]+)
+  name: kafka_server_quota_$3
+  type: GAUGE
+  labels:
+    resource: "$1"
+    clientId: "$2"
+
+- pattern: kafka.server<type=(.+), user=(.+), client-id=(.+)><>([a-z-]+)
+  name: kafka_server_quota_$4
+  type: GAUGE
+  labels:
+    resource: "$1"
+    user: "$2"
+    clientId: "$3"
+
+# Generic gauges with 0-2 key/value pairs
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
+  name: kafka_$1_$2_$3
+  type: GAUGE
+  labels:
+    "$4": "$5"
+    "$6": "$7"
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
+  name: kafka_$1_$2_$3
+  type: GAUGE
+  labels:
+    "$4": "$5"
+- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
+  name: kafka_$1_$2_$3
+  type: GAUGE
+
+# Emulate Prometheus 'Summary' metrics for the exported 'Histogram's.
+#
+# Note that these are missing the '_sum' metric!
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
+  name: kafka_$1_$2_$3_count
+  type: COUNTER
+  labels:
+    "$4": "$5"
+    "$6": "$7"
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
+  name: kafka_$1_$2_$3
+  type: GAUGE
+  labels:
+    "$4": "$5"
+    "$6": "$7"
+    quantile: "0.$8"
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
+  name: kafka_$1_$2_$3_count
+  type: COUNTER
+  labels:
+    "$4": "$5"
+- pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
+  name: kafka_$1_$2_$3
+  type: GAUGE
+  labels:
+    "$4": "$5"
+    quantile: "0.$6"
+- pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
+  name: kafka_$1_$2_$3_count
+  type: COUNTER
+- pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
+  name: kafka_$1_$2_$3
+  type: GAUGE
+  labels:
+    quantile: "0.$4"
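These rules feed the JMX exporter javaagent that the broker job starts on 127.0.0.1:9404 (see KAFKA_OPTS in kafka-brokers.nomad.hcl further down). A quick spot-check from inside a broker container, with an illustrative value:

$ curl -s http://127.0.0.1:9404/metrics | grep '^kafka_server_kafkaserver_brokerstate'
kafka_server_kafkaserver_brokerstate 3.0

State 3 means RUNNING, which is exactly what the Nomad health check in the broker job keys on.
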
diff --git a/images/kafka-broker/root/opt/kafka/config/log4j.properties b/images/kafka-broker/root/opt/kafka/config/log4j.properties
new file mode 100644
index 0000000..0aaf0f4
--- /dev/null
+++ b/images/kafka-broker/root/opt/kafka/config/log4j.properties
@@ -0,0 +1,4 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
diff --git a/images/kafka-broker/root/opt/kafka/config/server.properties b/images/kafka-broker/root/opt/kafka/config/server.properties
new file mode 100644
index 0000000..188ed93
--- /dev/null
+++ b/images/kafka-broker/root/opt/kafka/config/server.properties
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This configuration file is intended for use in KRaft mode, where
+# Apache ZooKeeper is not present. See config/kraft/README.md for details.
+#
+
+############################# Server Basics #############################
+
+# The role of this server. Setting this puts us in KRaft mode
+process.roles=broker,controller
+
+# The node id associated with this instance's roles
+node.id=1
+
+# The connect string for the controller quorum
+controller.quorum.voters=1@localhost:9093
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on.
+# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
+# If the broker listener is not defined, the default listener will use a host name that is equal to the value of
+# java.net.InetAddress.getCanonicalHostName(), with the PLAINTEXT listener name, and port 9092.
+# FORMAT:
+#   listeners = listener_name://host_name:port
+# EXAMPLE:
+#   listeners = PLAINTEXT://your.host.name:9092
+listeners=PLAINTEXT://:9092,CONTROLLER://:9093
+
+# Name of listener used for communication between brokers.
+inter.broker.listener.name=PLAINTEXT
+
+# Listener name, hostname and port the broker will advertise to clients.
+# If not set, it uses the value for "listeners".
+advertised.listeners=PLAINTEXT://localhost:9092
+
+# A comma-separated list of the names of the listeners used by the controller.
+# If no explicit mapping is set in `listener.security.protocol.map`, the PLAINTEXT protocol is used by default.
+# This is required if running in KRaft mode.
+controller.listener.names=CONTROLLER
+
+# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
+listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
+# The number of threads that the server uses for receiving requests from the network and sending responses to the network
+num.network.threads=3
+
+# The number of threads that the server uses for processing requests, which may include disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=${KAFKA_DATA_DIR}
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# Increasing this value is recommended for installations with data dirs located in a RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Internal Topic Settings #############################
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
+# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
+offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion due to age
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
+# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
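This baked-in file is only a KRaft-mode default: at container start, the 10-kafka.env entrypoint above rewrites it in place with envsubst, which is how placeholders like ${KAFKA_DATA_DIR} resolve from the image ENV. A quick illustration:

$ KAFKA_DATA_DIR=/data envsubst < server.properties | grep '^log.dirs='
log.dirs=/data

For brokers deployed via kafka-brokers.nomad.hcl this default is shadowed anyway: the task mounts a rendered templates/brokers/server.properties over it read-only, so the writability test in 10-kafka.env skips the rewrite.
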
diff --git a/images/kafka-exporter/Dockerfile b/images/kafka-exporter/Dockerfile
new file mode 100644
index 0000000..90f651a
--- /dev/null
+++ b/images/kafka-exporter/Dockerfile
@@ -0,0 +1,21 @@
+FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]] AS builder
+
+ARG EXPORTER_VERSION=[[ .kafka.exporter.version ]]
+
+ADD https://github.com/danielqsj/kafka_exporter/releases/download/v${EXPORTER_VERSION}/kafka_exporter-${EXPORTER_VERSION}.linux-amd64.tar.gz /tmp
+
+RUN set -euxo pipefail &&\
+    cd /tmp &&\
+    apk --no-cache add tar &&\
+    tar xvzf kafka_exporter-${EXPORTER_VERSION}.linux-amd64.tar.gz &&\
+    mv kafka_exporter-${EXPORTER_VERSION}.linux-amd64/kafka_exporter /usr/local/bin/kafka_exporter &&\
+    chown root:root /usr/local/bin/kafka_exporter &&\
+    chmod 755 /usr/local/bin/kafka_exporter
+
+FROM [[ .docker.repo ]][[ .docker.base_images.alpine.image ]]
+MAINTAINER [[ .docker.maintainer ]]
+
+COPY --from=builder /usr/local/bin/kafka_exporter /usr/local/bin/kafka_exporter
+
+CMD ["kafka_exporter"]
+
diff --git a/init/kafka-broker-pki b/init/kafka-broker-pki
new file mode 100755
index 0000000..f1ec4e3
--- /dev/null
+++ b/init/kafka-broker-pki
@@ -0,0 +1,45 @@
+#!/bin/sh
+
+set -e
+
+[[- $c := merge .kafka . ]]
+[[ template "common/vault.mkpki.sh" $c ]]
+
+# Role for the brokers
+vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-broker \
+  allowed_domains="[[ .instance ]]-broker[[ .consul.suffix ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]" \
+  allow_bare_domains=true \
+  allow_subdomains=true \
+  allow_localhost=true \
+  allow_ip_sans=true \
+  server_flag=true \
+  client_flag=true \
+  allow_wildcard_certificates=false \
+  max_ttl=720h \
+  ou="[[ $c.vault.pki.ou ]]"
+
+# Role for the prometheus exporter
+vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-exporter \
+  allowed_domains="[[ .instance ]]-exporter" \
+  allow_bare_domains=true \
+  allow_subdomains=false \
+  allow_localhost=false \
+  allow_ip_sans=false \
+  server_flag=false \
+  client_flag=true \
+  allow_wildcard_certificates=false \
+  max_ttl=72h \
+  ou="[[ $c.vault.pki.ou ]]"
+
+# Role for Jikkou (topic and ACL management tool)
+vault write [[ $c.vault.pki.path ]]/roles/[[ .instance ]]-jikkou \
+  allowed_domains="[[ .instance ]]-jikkou" \
+  allow_bare_domains=true \
+  allow_subdomains=false \
+  allow_localhost=false \
+  allow_ip_sans=false \
+  server_flag=false \
+  client_flag=true \
+  allow_wildcard_certificates=false \
+  max_ttl=1h \
+  ou="[[ $c.vault.pki.ou ]]"
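Each role constrains what a holder of the matching Vault policy may request: the broker role can get server certificates with service-discovery SANs for up to 30 days, while the exporter and jikkou roles are client-only with short TTLs. Issuing a certificate is then a single authenticated write; a sketch, assuming instance=kafka and the PKI mounted at pki/kafka:

vault write pki/kafka/issue/kafka-jikkou \
    common_name=kafka-jikkou ttl=15m
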
.instance ]]-brokers" { + +[[- $c := merge .kafka . ]] + +[[ template "common/job_start" . ]] + + group "broker" { +[[ $c := merge $c.broker $c ]] + + count = [[ $c.count ]] + shutdown_delay = "6s" + +[[ template "common/constraints" $c ]] + + network { + mode = "bridge" + port "client" { +[[- if has $c "static_client_port" ]] + static = [[ $c.static_client_port ]] +[[- end ]] + } + port "broker" { +[[- if has $c "static_broker_port" ]] + static = [[ $c.static_broker_port ]] +[[- end ]] + } +[[- if conv.ToBool $c.prometheus.enabled ]] + port "metrics" {} +[[- end ]] + } + +[[ template "common/volumes" $c ]] + + service { + name = "[[ .instance ]]-broker[[ .consul.suffix ]]" + port = "client" +[[ template "common/service_meta" $c ]] +[[ template "common/connect" $c ]] + + # Checks the broker is ready + # See states signification here : https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/BrokerState.java + check { + name = "health" + type = "script" + command = "sh" + args = [ + "-c", + "STATE=$(curl http://127.0.0.1:9404 | grep '^kafka_server_kafkaserver_brokerstate' | sed -r 's/.* (\\d+)\\.0/\\1/'); echo \"Broker state: ${STATE}\"; if [ \"${STATE}\" = \"3\" ]; then exit 0; else exit 1; fi" + ] + interval = "30s" + timeout = "8s" + task = "kafka-broker" + } + + tags = [ + "broker-${NOMAD_ALLOC_INDEX}" + ] + } + +[[ template "common/task.wait_for" $c ]] + + task "kafka-broker" { + driver = "[[ $c.nomad.driver ]]" + leader = true + + config { + image = "[[ $c.image ]]" + readonly_rootfs = true + pids_limit = 500 + volumes = [ + "local/conf/server.properties:/opt/kafka/config/server.properties:ro", + "local/conf/client.properties:/opt/kafka/config/client.properties:ro" + ] + } + +[[ template "common/artifacts" $c ]] +[[ template "common/vault.policies" $c ]] + + env { + # So the PEM certs from vault are converted to PKCS12 for Kafka to use + PEM_KEY_FILE = "/secrets/kafka.bundle.pem" + PEM_CERT_FILE = "/secrets/kafka.bundle.pem" + P12_FILE = "/secrets/kafka.p12" + } +[[ template "common/file_env" $c ]] + + # Some env vars + template { + data = <<_EOT +KAFKA_HEAP_OPTS=-Xmx[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m -Xms[[ $c.resources.memory | mul 0.3 | math.Ceil ]]m +KAFKA_OPTS=-javaagent:/jmx_exporter/jmx_prometheus_javaagent.jar=127.0.0.1:9404:/opt/kafka/config/jmx_exporter.yml +[[- if ne $c.zookeeper.user "" ]] -Djava.security.auth.login.config=/secrets/jaas.conf[[- end ]] +_EOT + destination = "secrets/.kafka.env" + perms = 400 + env = true + } + +[[- if ne $c.zookeeper.user "" ]] + + # SASL user/pass to auth on ZooKeeper + template { + data = <<_EOT +[[ template "kafka/brokers/jaas.conf" $c ]] +_EOT + destination = "secrets/jaas.conf" + uid = 100000 + gid = 109092 + perms = "0640" + } +[[- end ]] + + # The main broker configuration + template { + data = <<_EOT +[[ template "kafka/brokers/server.properties" $c ]] +_EOT + destination = "local/conf/server.properties" + } + + # A configuration to be used by client tools + template { + data = <<_EOT +[[ template "kafka/brokers/client.properties" $c ]] +_EOT + destination = "local/conf/client.properties" + # No need to restart if the client conf changes + change_mode = "noop" + } + + # The certificate and private key for the broker + template { + data = <<_EOT +{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker" + "common_name=[[ .instance ]]-broker[[ .consul.suffix ]]" + (printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ 
+      # The certificate and private key for the broker
+      template {
+        data = <<_EOT
+{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker"
+    "common_name=[[ .instance ]]-broker[[ .consul.suffix ]]"
+    (printf "alt_names=broker-%s.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]],localhost" (env "NOMAD_ALLOC_INDEX"))
+    (printf "ip_sans=%s,127.0.0.1" (env "NOMAD_HOST_IP_client"))
+    (printf "ttl=%dh" (env "NOMAD_ALLOC_INDEX" | parseInt | multiply 24 | add 650)) }}
+{{ .Cert }}
+{{ .Key }}
+{{- end }}
+_EOT
+        destination = "secrets/kafka.bundle.pem"
+        uid         = 100000
+        gid         = 109092
+        perms       = "0640"
+      }
+
+      # The trusted CA for the broker
+      template {
+        data = <<_EOT
+{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
+_EOT
+        destination = "secrets/kafka.ca.pem"
+        uid         = 100000
+        gid         = 100000
+      }
+
+      # Mount the persistent volume on /data
+      volume_mount {
+        volume      = "data"
+        destination = "/data"
+      }
+
+[[ template "common/resources" $c ]]
+    }
+  }
+
+[[- if conv.ToBool $c.prometheus.enabled ]]
+[[- $c := merge .kafka.exporter $c ]]
+  group "exporter" {
+
+    network {
+      mode = "bridge"
+      port "metrics" {}
+    }
+
+    service {
+      name = "[[ .instance ]]-exporter[[ .consul.suffix ]]"
+[[ template "common/service_meta" $c ]]
+    }
+
+[[ template "common/task.wait_for" $c ]]
+
+    task "kafka-exporter" {
+      driver = "[[ $c.nomad.driver ]]"
+      user   = 9308
+
+      config {
+        image           = "[[ $c.image ]]"
+        readonly_rootfs = true
+        pids_limit      = 100
+        command         = "/local/kafka_exporter"
+      }
+
+[[ template "common/artifacts" $c ]]
+[[ template "common/vault.policies" $c ]]
+[[ template "common/file_env" $c ]]
+
+      template {
+        data = <<_EOT
+[[ template "kafka/exporter/start.sh" $c ]]
+_EOT
+        destination = "local/kafka_exporter"
+        perms       = "0755"
+      }
+
+[[ template "common/metrics_cert" $c ]]
+
+      # Ask vault for a client certificate to connect to the brokers
+      template {
+        data = <<_EOT
+{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter"
+    "common_name=[[ .instance ]]-exporter" "ttl=72h" }}
+{{ .Cert }}
+{{ .Key }}
+{{- end }}
+_EOT
+        destination = "secrets/kafka.bundle.pem"
+        uid         = 109308
+        gid         = 100000
+        perms       = "0400"
+      }
+
+      # The CA chain to validate kafka brokers
+      template {
+        data = <<_EOT
+{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
+_EOT
+        destination = "local/kafka.ca.pem"
+      }
+    }
+
+  }
+
+[[- end ]]
+}
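Because the exporter starts with --server.tls.mutual-auth-enabled (see templates/exporter/start.sh later in this diff), Prometheus must present a certificate from the monitoring CA to scrape it. A hypothetical manual scrape, with illustrative paths, address, and port:

curl -s --cacert monitoring.ca.pem \
    --cert metrics.bundle.pem --key metrics.bundle.pem \
    "https://10.1.2.11:9308/metrics" | grep '^kafka_brokers '

kafka_brokers (the number of brokers the exporter can see) is a handy first signal that its own mTLS client connection to the cluster works.
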
diff --git a/kafka-jikkou.nomad.hcl b/kafka-jikkou.nomad.hcl
new file mode 100644
index 0000000..e3592a8
--- /dev/null
+++ b/kafka-jikkou.nomad.hcl
@@ -0,0 +1,122 @@
+job "[[ .instance ]]-jikkou" {
+
+[[- $c := merge .kafka . ]]
+[[ template "common/job_start" $c ]]
+
+  type = "batch"
+
+  meta {
+    # Force the job to be different for each execution
+    run_uuid = "${uuidv4()}"
+  }
+
+  group "jikkou" {
+
+[[- $c := merge $c.jikkou $c ]]
+
+    network {
+      mode = "bridge"
+    }
+
+[[ template "common/task.wait_for" $c ]]
+
+    task "jikkou" {
+      driver = "[[ $c.nomad.driver ]]"
+
+      config {
+        image           = "[[ $c.image ]]"
+        readonly_rootfs = true
+        pids_limit      = 100
+        command         = "/local/jikkou"
+      }
+
+[[ template "common/vault.policies" $c ]]
+
+      env {
+        PEM_KEY_FILE  = "/secrets/jikkou.bundle.pem"
+        PEM_CERT_FILE = "/secrets/jikkou.bundle.pem"
+        P12_FILE      = "/secrets/jikkou.p12"
+      }
+
+[[ template "common/file_env" $c ]]
+
+      template {
+        data = <<_EOT
+KAFKA_BOOTSTRAP_SERVERS=
+{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
+{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
+{{- end -}}
+_EOT
+        change_mode = "noop"
+        destination = "secrets/.jikkou.env"
+        env         = true
+      }
+
+      # A wrapper script to apply the topic and ACL definitions
+      template {
+        data = <<_EOT
+[[ template "kafka/jikkou/jikkou" $c ]]
+_EOT
+        destination = "local/jikkou"
+        perms       = "0755"
+      }
+
+      # The jikkou configuration file
+      template {
+        data = <<_EOT
+[[ template "kafka/jikkou/jikkou.conf" $c ]]
+_EOT
+        destination = "local/jikkou.conf"
+        perms       = "0644"
+      }
+
+      # Kafka topics definition
+      template {
+        data = <<_EOT
+[[- if isKind "string" $c.topics ]]
+[[ $c.topics ]]
+[[- else ]]
+# Invalid kafka topics definition
+[[- end ]]
+_EOT
+        destination = "local/kafka-topics.yml"
+      }
+
+      # Kafka ACL definition
+      template {
+        data = <<_EOT
+[[- if isKind "string" $c.acls ]]
+[[ $c.acls ]]
+[[- else ]]
+# Invalid kafka ACL definition
+[[- end ]]
+_EOT
+        destination = "local/kafka-acls.yml"
+      }
+
+      # Client cert used by Jikkou to connect to the kafka brokers
+      template {
+        data = <<_EOT
+{{- with pkiCert "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-jikkou" "common_name=[[ .instance ]]-jikkou" "ttl=15m" }}
+{{ .Cert }}
+{{ .Key }}
+{{- end }}
+_EOT
+        destination = "secrets/jikkou.bundle.pem"
+        uid         = 100000
+        gid         = 100000
+        perms       = "0400"
+      }
+
+      # CA certificate to validate the brokers' certs
+      template {
+        data = <<_EOT
+{{ with secret "[[ $c.vault.pki.path ]]/cert/ca_chain" }}{{ .Data.ca_chain }}{{ end }}
+_EOT
+        destination = "secrets/jikkou.ca.pem"
+      }
+
+[[ template "common/resources" $c ]]
+    }
+  }
+}
diff --git a/templates/brokers/client.properties b/templates/brokers/client.properties
new file mode 100644
index 0000000..2889301
--- /dev/null
+++ b/templates/brokers/client.properties
@@ -0,0 +1,13 @@
+security.protocol=SSL
+ssl.keystore.location=/secrets/kafka.p12
+ssl.keystore.password=password
+ssl.keystore.type=PKCS12
+ssl.truststore.location=/secrets/kafka.ca.pem
+ssl.truststore.type=PEM
+ssl.client.auth=required
+ssl.secure.random.implementation=SHA1PRNG
+client.dns.lookup=use_all_dns_ips
+bootstrap.servers=
+{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
+{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
+{{- end }}
diff --git a/templates/brokers/jaas.conf b/templates/brokers/jaas.conf
new file mode 100644
index 0000000..63c9d80
--- /dev/null
+++ b/templates/brokers/jaas.conf
@@ -0,0 +1,6 @@
+Client {
+  org.apache.zookeeper.server.auth.DigestLoginModule required
+  username="[[ .zookeeper.user ]]"
+  password="[[ .zookeeper.password ]]"
+  ;
+};
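The rendered client.properties above is what makes ad-hoc administration bearable: any stock Kafka CLI tool can reuse the broker's own keystore and truststore. For example, from inside a broker allocation (bootstrap address and port are illustrative, as the client port is normally dynamic):

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 \
    --command-config /opt/kafka/config/client.properties --list
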
diff --git a/templates/brokers/server.properties b/templates/brokers/server.properties
new file mode 100644
index 0000000..f42ef25
--- /dev/null
+++ b/templates/brokers/server.properties
@@ -0,0 +1,37 @@
+# Broker ID
+broker.id={{ env "NOMAD_ALLOC_INDEX" }}
+
+# Network settings
+listeners=CLIENT://:{{ env "NOMAD_PORT_client" }},BROKER://:{{ env "NOMAD_PORT_broker" }}
+listener.security.protocol.map=CLIENT:SSL,BROKER:SSL
+inter.broker.listener.name=BROKER
+advertised.listeners=CLIENT://broker-{{ env "NOMAD_ALLOC_INDEX" }}.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]:{{ env "NOMAD_HOST_PORT_client" }},BROKER://{{ env "NOMAD_ADDR_broker" }}
+
+# TLS Settings
+ssl.keystore.location=/secrets/kafka.p12
+ssl.keystore.password=password
+ssl.keystore.type=PKCS12
+ssl.truststore.location=/secrets/kafka.ca.pem
+ssl.truststore.type=PEM
+ssl.client.auth=required
+ssl.secure.random.implementation=SHA1PRNG
+ssl.principal.mapping.rules=\
+  RULE:^CN=([^,]+),OU=.*$/$1/,\
+  DEFAULT
+
+# Storage settings
+log.dirs=/data/kafka
+
+# Zookeeper settings
+zookeeper.connect=[[ join .zookeeper.servers "," ]][[ if has .zookeeper "path" ]][[ .zookeeper.path ]][[ end ]]
+
+# ACL
+[[- if .use_acl ]]
+authorizer.class.name=kafka.security.authorizer.AclAuthorizer
+super.users=[[ range $idx, $user := .super_users ]][[ if ne $idx 0 ]];[[ end ]]User:[[ $user ]][[ end ]]
+[[ end ]]
+
+# Other settings
+[[- range $k, $v := .settings ]]
+[[ $k ]]=[[ $v ]]
+[[- end ]]
diff --git a/templates/exporter/start.sh b/templates/exporter/start.sh
new file mode 100644
index 0000000..069812a
--- /dev/null
+++ b/templates/exporter/start.sh
@@ -0,0 +1,19 @@
+#!/bin/sh
+
+set -euo pipefail
+
+exec kafka_exporter \
+  --tls.enabled \
+  --tls.ca-file=/local/kafka.ca.pem \
+  --tls.cert-file=/secrets/kafka.bundle.pem \
+  --tls.key-file=/secrets/kafka.bundle.pem \
+  --server.tls.enabled \
+  --server.tls.mutual-auth-enabled \
+  --server.tls.ca-file=/local/monitoring.ca.pem \
+  --server.tls.cert-file=/secrets/metrics.bundle.pem \
+  --server.tls.key-file=/secrets/metrics.bundle.pem \
+{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" }}
+  --kafka.server={{ $instance.Address }}:{{ $instance.Port }} \
+{{- end }}
+  --web.listen-address=:${NOMAD_ALLOC_PORT_metrics}
+
diff --git a/templates/jikkou/jikkou b/templates/jikkou/jikkou
new file mode 100644
index 0000000..25795b5
--- /dev/null
+++ b/templates/jikkou/jikkou
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+set -eux
+
+jikkou config set-context [[ .instance ]] --config-file=/local/jikkou.conf
+jikkou config view --name [[ .instance ]]
+
+echo "Create and modify topics"
+jikkou update \
+  --selector="kind IN (KafkaTopic,KafkaTopicList)" \
+  --files=/local/kafka-topics.yml
+
+echo "Apply ACLs"
+jikkou apply \
+  --selector="kind IN (KafkaPrincipalRole,KafkaPrincipalAuthorization)" \
+  --options=delete-orphans=true \
+  --files=/local/kafka-acls.yml
diff --git a/templates/jikkou/jikkou.conf b/templates/jikkou/jikkou.conf
new file mode 100644
index 0000000..81556de
--- /dev/null
+++ b/templates/jikkou/jikkou.conf
@@ -0,0 +1,19 @@
+jikkou {
+  kafka {
+    client {
+      bootstrap.servers = "
+{{- range $index, $instance := service "[[ .instance ]]-broker[[ .consul.suffix ]]" -}}
+{{ if ne $index 0 }},{{ end }}{{ $instance.Address }}:{{ $instance.Port }}
+{{- end -}}
+"
+      security.protocol = "SSL"
+      ssl.keystore.location = "/secrets/jikkou.p12"
+      ssl.keystore.type = "PKCS12"
+      ssl.keystore.password = "password"
+      ssl.truststore.location = "/secrets/jikkou.ca.pem"
+      ssl.truststore.type = "PEM"
+      client.id = "[[ .instance ]]-jikkou"
+      client.dns.lookup = "use_all_dns_ips"
+    }
+  }
+}
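Because the wrapper runs as a fire-and-forget batch job, it can be worth dry-checking manifests before submitting. A sketch, assuming the pinned 0.33 Jikkou release ships a validate subcommand:

jikkou config set-context kafka --config-file=/local/jikkou.conf
jikkou validate --files=/local/kafka-topics.yml
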
diff --git a/variables.yml b/variables.yml
new file mode 100644
index 0000000..eb963b5
--- /dev/null
+++ b/variables.yml
@@ -0,0 +1,86 @@
+---
+
+instance: kafka
+
+vault:
+  pki:
+    path: "[[ .vault.root ]]pki/kafka"
+    ou: Kafka Cluster ([[ .instance ]])
+
+kafka:
+  broker:
+    count: 3
+    version: 3.7.0
+    image: '[[ .docker.repo ]]kafka-broker:[[ .kafka.broker.version ]]-1'
+    env: {}
+    resources:
+      cpu: 100
+      memory: 1024
+    prometheus:
+      enabled: '[[ .prometheus.available ]]'
+    # static_client_port: 9092
+    # static_broker_port: 9095
+    consul:
+      meta:
+        broker: 'broker-${NOMAD_ALLOC_INDEX}.[[ .instance ]]-broker[[ .consul.suffix ]].service.[[ .consul.domain ]]'
+      connect:
+        upstreams:
+          - destination_name: zookeeper[[ .consul.suffix ]]
+            local_bind_port: 2181
+    vault:
+      policies:
+        - '[[ .instance ]]-broker[[ .consul.suffix ]]'
+    wait_for:
+      - service: zookeeper[[ .consul.suffix ]]
+        count: 2
+    zookeeper:
+      servers:
+        - 127.0.0.1:2181
+      # path: /kafka
+      user: ""
+      password: ""
+    use_acl: true
+    super_users:
+      - '[[ .instance ]]-jikkou'
+    settings:
+      log.retention.hours: 168
+      compression.type: zstd
+      zookeeper.connection.timeout.ms: 600
+      zookeeper.set.acl: true
+      message.max.bytes: 1073741824
+      auto.create.topics.enable: false
+      transactional.id.expiration.ms: 2147483647
+      offsets.retention.minutes: 13140
+    volumes:
+      data:
+        type: csi
+        source: '[[ .instance ]]-broker-data'
+        per_alloc: true
+
+  jikkou:
+    version: 0.33.3
+    image: '[[ .docker.repo ]]jikkou:[[ .kafka.jikkou.version ]]-1'
+    env: {}
+    resources:
+      cpu: 10
+      memory: 256
+    vault:
+      policies:
+        - '[[ .instance ]]-jikkou[[ .consul.suffix ]]'
+    topics: ""
+    acls: ""
+
+  exporter:
+    version: 1.7.0
+    image: '[[ .docker.repo ]]kafka-exporter:[[ .kafka.exporter.version ]]-1'
+    env: {}
+    wait_for:
+      - service: '[[ .instance ]]-broker[[ .consul.suffix ]]'
+        count: '[[ .kafka.broker.count | mul 0.5 | add 1 | math.Floor ]]'
+    vault:
+      policies:
+        - '[[ .instance ]]-exporter[[ .consul.suffix ]]'
+        - metrics[[ .consul.suffix ]]
+    resources:
+      cpu: 10
+      memory: 50
diff --git a/vault/policies/kafka-brokers.hcl b/vault/policies/kafka-brokers.hcl
new file mode 100644
index 0000000..da3f8bc
--- /dev/null
+++ b/vault/policies/kafka-brokers.hcl
@@ -0,0 +1,4 @@
+[[- $c := merge .kafka.broker .kafka . ]]
+path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-broker" {
+  capabilities = ["update"]
+}
diff --git a/vault/policies/kafka-exporter.hcl b/vault/policies/kafka-exporter.hcl
new file mode 100644
index 0000000..3a2171e
--- /dev/null
+++ b/vault/policies/kafka-exporter.hcl
@@ -0,0 +1,5 @@
+[[- $c := merge .kafka.exporter .kafka . ]]
+path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-exporter" {
+  capabilities = ["update"]
+}
+
diff --git a/vault/policies/kafka-jikkou.hcl b/vault/policies/kafka-jikkou.hcl
new file mode 100644
index 0000000..938fcf6
--- /dev/null
+++ b/vault/policies/kafka-jikkou.hcl
@@ -0,0 +1,4 @@
+[[- $c := merge .kafka . ]]
+path "[[ $c.vault.pki.path ]]/issue/[[ .instance ]]-jikkou" {
+  capabilities = ["update"]
+}
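Once everything is deployed, a simple end-to-end check is to confirm a broker serves a certificate chaining to the cluster PKI and requests a client certificate in return (ssl.client.auth=required). A sketch, with an illustrative host and CA file:

openssl s_client -connect broker-0.kafka-broker.service.consul:9092 \
    -CAfile kafka.ca.pem -brief </dev/null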