/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli.app;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.ShutdownEvent;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.HashMap;
import java.util.Optional;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.amqp.ProtonBasedApplicationClient;
import org.eclipse.hono.application.client.kafka.impl.KafkaApplicationClientImpl;
import org.eclipse.hono.cli.app.CommandAndControl;
import org.eclipse.hono.cli.app.TelemetryAndEvent;
import org.eclipse.hono.cli.util.ConnectionOptions;
import org.eclipse.hono.cli.util.PropertiesVersionProvider;
import org.eclipse.hono.client.amqp.config.ClientConfigProperties;
import org.eclipse.hono.client.amqp.connection.HonoConnection;
import org.eclipse.hono.client.kafka.CommonKafkaClientConfigProperties;
import org.eclipse.hono.client.kafka.consumer.MessagingKafkaConsumerConfigProperties;
import org.eclipse.hono.client.kafka.producer.CachingKafkaProducerFactory;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.config.FileFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

/**
 * Root "app" command of the CLI, giving its sub-commands access to a client for
 * Hono's north bound APIs.
 * <p>
 * Depending on the {@code --amqp} flag, the client is either AMQP 1.0 based or
 * Kafka based. The client is created on first request via
 * {@link #getApplicationClient()}, kept in {@link #client} and stopped when the
 * application shuts down.
 */
@Singleton
@CommandLine.Command(name="app", description={"A client for interacting with Hono's north bound API endpoints."}, mixinStandardHelpOptions=true, versionProvider=PropertiesVersionProvider.class, sortOptions=false, subcommands={TelemetryAndEvent.class, CommandAndControl.class})
@SuppressFBWarnings(value={"HARD_CODE_PASSWORD"}, justification="We use the default passwords of the Hono Sandbox installation throughout this class\nfor ease of use. The passwords are publicly documented and do not affect any\nprivate installations of Hono.\n")
public class NorthBoundApis {
    private static final Logger LOG = LoggerFactory.getLogger(NorthBoundApis.class);
    // Connection coordinates used when the '--sandbox' option is set.
    private static final String SANDBOX_HOST_NAME = "hono.eclipseprojects.io";
    private static final int SANDBOX_KAFKA_PORT = 9094;
    private static final int SANDBOX_AMQP_PORT = 15671;
    private static final String SANDBOX_AMQP_USER = "consumer@HONO";
    private static final String SANDBOX_AMQP_PWD = "verysecret";
    private static final String SANDBOX_KAFKA_USER = "hono";
    private static final String SANDBOX_KAFKA_PWD = "hono-secret";
    @CommandLine.Mixin
    ConnectionOptions connectionOptions;
    @CommandLine.Option(names={"--amqp"}, description={"Connect to the AMQP 1.0 based API endpoints", "If not set, the Kafka based endpoints are used by default"}, order=12)
    boolean useAmqp;
    @Inject
    Vertx vertx;
    @CommandLine.Spec
    CommandLine.Model.CommandSpec spec;
    // The client that has been created and started, or null if none has been created yet.
    ApplicationClient<? extends MessageContext> client;

    /**
     * Checks that the connection options required for the selected mode are present.
     * <p>
     * With {@code --sandbox}, a trust store path is required. Without it, both
     * host name and port number are required.
     *
     * @throws CommandLine.ParameterException if a required option is missing.
     */
    private void validateConnectionOptions() {
        if (this.connectionOptions.useSandbox) {
            if (this.connectionOptions.trustStorePath.isEmpty()) {
                throw new CommandLine.ParameterException(this.spec.commandLine(), "Missing required option: '--ca-file=<path>' needs to be specified when using '--sandbox'.\n");
            }
            return;
        }
        if (this.connectionOptions.hostname.isEmpty() || this.connectionOptions.portNumber.isEmpty()) {
            throw new CommandLine.ParameterException(this.spec.commandLine(), "Missing required option: both '--host=<hostname>' and '--port=<portNumber>' need to be specified when not using '--sandbox'.\n");
        }
    }

    /**
     * Escapes a value for use inside a double-quoted string of a JAAS configuration entry.
     * <p>
     * Backslashes and double quotes are prefixed with a backslash so that they
     * cannot terminate or corrupt the quoted string. A {@code null} value is
     * rendered as the text {@code null}.
     *
     * @param value The raw value.
     * @return The escaped value.
     */
    private static String escapeJaasValue(String value) {
        // the backslash must be handled first, otherwise the backslashes added for quotes would be doubled
        return String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /**
     * Builds the {@code sasl.jaas.config} value for SCRAM authentication.
     *
     * @param username The user name to authenticate with.
     * @param password The password to authenticate with.
     * @return The JAAS configuration entry referring to the {@link ScramLoginModule}.
     */
    private String scramJaasConfig(String username, String password) {
        return "%s required username=\"%s\" password=\"%s\";\n".formatted(
                ScramLoginModule.class.getName(),
                escapeJaasValue(username),
                escapeJaasValue(password));
    }

    /**
     * Creates and starts a Kafka based application client using the connection options.
     * <p>
     * On success, the client is also stored in {@link #client}.
     *
     * @return A future that completes with the client once it has been started and
     *         has reported readiness.
     * @throws CommandLine.ParameterException if required connection options are missing.
     */
    Future<KafkaApplicationClientImpl> createKafkaClient() {
        this.validateConnectionOptions();
        final var commonProps = new HashMap<String, String>();
        this.connectionOptions.trustStorePath.ifPresent(path -> {
            commonProps.put("ssl.truststore.location", path);
            this.connectionOptions.trustStorePassword.ifPresent(pwd -> commonProps.put("ssl.truststore.password", pwd));
            Optional.ofNullable(FileFormat.detect(path)).ifPresent(fileFormat -> commonProps.put("ssl.truststore.type", fileFormat.name()));
        });
        final var credentials = this.connectionOptions.credentials;
        final String bootstrapServers;
        if (this.connectionOptions.useSandbox) {
            bootstrapServers = SANDBOX_HOST_NAME + ":" + SANDBOX_KAFKA_PORT;
            commonProps.put("security.protocol", SecurityProtocol.SASL_SSL.name);
            commonProps.put("sasl.mechanism", ScramMechanism.SCRAM_SHA_512.mechanismName());
            // fall back to the sandbox's default credentials if none have been given
            final String jaasConfig = credentials == null
                    ? this.scramJaasConfig(SANDBOX_KAFKA_USER, SANDBOX_KAFKA_PWD)
                    : this.scramJaasConfig(credentials.username, credentials.password);
            commonProps.put("sasl.jaas.config", jaasConfig);
        } else {
            // plain concatenation keeps the port in ASCII digits, independent of the default locale
            bootstrapServers = this.connectionOptions.hostname.get() + ":" + this.connectionOptions.portNumber.get();
            if (this.connectionOptions.disableHostnameVerification) {
                commonProps.put("ssl.endpoint.identification.algorithm", "");
            }
            final boolean hasTrustStore = this.connectionOptions.trustStorePath.isPresent();
            if (credentials == null) {
                // without credentials, the security protocol is only set if a trust store has been given
                if (hasTrustStore) {
                    commonProps.put("security.protocol", SecurityProtocol.SSL.name);
                }
            } else {
                final SecurityProtocol protocol = hasTrustStore
                        ? SecurityProtocol.SASL_SSL
                        : SecurityProtocol.SASL_PLAINTEXT;
                commonProps.put("security.protocol", protocol.name);
                commonProps.put("sasl.mechanism", ScramMechanism.SCRAM_SHA_512.mechanismName());
                commonProps.put("sasl.jaas.config", this.scramJaasConfig(credentials.username, credentials.password));
            }
        }
        commonProps.put("bootstrap.servers", bootstrapServers);

        final CommonKafkaClientConfigProperties commonClientConfig = new CommonKafkaClientConfigProperties();
        commonClientConfig.setCommonClientConfig(commonProps);
        final MessagingKafkaConsumerConfigProperties consumerProps = new MessagingKafkaConsumerConfigProperties();
        consumerProps.setCommonClientConfig(commonClientConfig);
        final MessagingKafkaProducerConfigProperties producerProps = new MessagingKafkaProducerConfigProperties();
        producerProps.setCommonClientConfig(commonClientConfig);

        System.err.printf("Connecting to Kafka based messaging infrastructure [%s]%n", bootstrapServers);
        final KafkaApplicationClientImpl kafkaClient = new KafkaApplicationClientImpl(this.vertx, consumerProps, CachingKafkaProducerFactory.sharedFactory(this.vertx), producerProps);
        return this.startClientAndWaitForReadiness(kafkaClient).map(ok -> {
            this.client = kafkaClient;
            return kafkaClient;
        });
    }

    /**
     * Creates and starts an AMQP 1.0 based application client using the connection options.
     * <p>
     * On success, the client is also stored in {@link #client}.
     *
     * @return A future that completes with the client once it has been started and
     *         has reported readiness.
     * @throws CommandLine.ParameterException if required connection options are missing.
     */
    Future<ProtonBasedApplicationClient> createAmqpClient() {
        this.validateConnectionOptions();
        final ClientConfigProperties clientConfig = new ClientConfigProperties();
        clientConfig.setReconnectAttempts(5);
        clientConfig.setServerRole("Hono Messaging Infrastructure");
        this.connectionOptions.trustStorePath.ifPresent(path -> {
            clientConfig.setTrustStorePath(path);
            this.connectionOptions.trustStorePassword.ifPresent(clientConfig::setTrustStorePassword);
        });
        if (this.connectionOptions.useSandbox) {
            clientConfig.setHost(SANDBOX_HOST_NAME);
            clientConfig.setPort(SANDBOX_AMQP_PORT);
            // NOTE(review): unlike the Kafka sandbox path, credentials given on the command line
            // are not used here - confirm whether this is intended.
            clientConfig.setUsername(SANDBOX_AMQP_USER);
            clientConfig.setPassword(SANDBOX_AMQP_PWD);
        } else {
            this.connectionOptions.hostname.ifPresent(clientConfig::setHost);
            this.connectionOptions.portNumber.ifPresent(clientConfig::setPort);
            clientConfig.setHostnameVerificationRequired(!this.connectionOptions.disableHostnameVerification);
            final var credentials = this.connectionOptions.credentials;
            if (credentials != null) {
                clientConfig.setUsername(credentials.username);
                clientConfig.setPassword(credentials.password);
            }
        }
        final ProtonBasedApplicationClient amqpClient = new ProtonBasedApplicationClient(HonoConnection.newConnection(this.vertx, clientConfig));
        System.err.printf("Connecting to AMQP 1.0 based messaging infrastructure [%s:%d]%n", clientConfig.getHost(), clientConfig.getPort());
        return this.startClientAndWaitForReadiness(amqpClient).map(ok -> {
            this.client = amqpClient;
            return amqpClient;
        });
    }

    /**
     * Starts a client and waits for it to report readiness.
     *
     * @param client The client to start.
     * @return A future that completes once the client has been started and its
     *         "client ready" handler has been completed.
     */
    private Future<Void> startClientAndWaitForReadiness(ApplicationClient<? extends MessageContext> client) {
        final Promise<Void> readyTracker = Promise.promise();
        // the handler needs to be registered before the client is started
        client.addOnClientReadyHandler(readyTracker);
        return client.start().compose(ok -> readyTracker.future());
    }

    /**
     * Gets the application client, creating it first if none has been created yet.
     * <p>
     * An AMQP 1.0 based client is created if {@code --amqp} has been set, a Kafka
     * based client otherwise.
     *
     * @return A future that completes with the already existing or newly created client,
     *         or fails if the client could not be started.
     * @throws CommandLine.ParameterException if a client needs to be created and
     *         required connection options are missing.
     */
    public Future<ApplicationClient<? extends MessageContext>> getApplicationClient() {
        final Promise<ApplicationClient<? extends MessageContext>> result = Promise.promise();
        if (this.client != null) {
            result.complete(this.client);
        } else if (this.useAmqp) {
            this.createAmqpClient().onSuccess(result::complete).onFailure(result::fail);
        } else {
            this.createKafkaClient().onSuccess(result::complete).onFailure(result::fail);
        }
        return result.future();
    }

    /**
     * Stops the client, if one has been created, and blocks until stopping has completed.
     *
     * @param ev The event indicating that the application is shutting down.
     */
    public void onStop(@Observes ShutdownEvent ev) {
        if (this.client == null) {
            return;
        }
        LOG.debug("disconnecting from Hono");
        this.client.stop()
            .onComplete(ar -> {
                if (ar.succeeded()) {
                    LOG.debug("stopped consumers");
                } else {
                    LOG.debug("error stopping client", ar.cause());
                }
            })
            .toCompletionStage()
            .toCompletableFuture()
            .join();
    }
}

