/*
 * Decompiled with CFR 0.152.
 */
package org.apache.skywalking.apm.agent.core.remote;

import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
import org.apache.skywalking.apm.agent.core.remote.GRPCStreamServiceStatus;
import org.apache.skywalking.apm.agent.core.util.CollectionUtil;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.dependencies.io.grpc.Channel;
import org.apache.skywalking.apm.dependencies.io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogReportServiceGrpc;

/**
 * Boot service that buffers {@link LogData.Builder} instances in a {@link DataCarrier} and
 * forwards each consumed batch to the collector through the {@code LogReportService} gRPC
 * client-streaming call.
 *
 * <p>The instance registers itself as a {@link GRPCChannelListener} in {@link #prepare()} and
 * rebuilds its stub every time the channel reports {@code CONNECTED}. It is also the
 * {@link IConsumer} attached to its own buffer.
 */
@DefaultImplementor
public class LogReportServiceClient
implements BootService,
GRPCChannelListener,
IConsumer<LogData.Builder> {
    private static final ILog LOGGER = LogManager.getLogger(LogReportServiceClient.class);

    /** Buffer between producers and the consumer callback; stays {@code null} until {@link #boot()} has run. */
    private volatile DataCarrier<LogData.Builder> carrier;
    /** Last channel status delivered to {@link #statusChanged(GRPCChannelStatus)}. */
    private volatile GRPCChannelStatus status;
    /** Stub bound to the current channel; written before {@link #status} is updated to {@code CONNECTED}. */
    private volatile LogReportServiceGrpc.LogReportServiceStub logReportServiceStub;

    /**
     * Registers this client with the {@link GRPCChannelManager} so it is notified of channel
     * status changes.
     */
    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    /**
     * Creates the {@code "gRPC-log"} buffer, sized from {@code Config.Buffer}, and attaches this
     * instance as its consumer.
     */
    @Override
    public void boot() throws Throwable {
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.carrier = new DataCarrier<>("gRPC-log", "gRPC-log", Config.Buffer.CHANNEL_SIZE, Config.Buffer.BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
        // NOTE(review): the second argument is presumably the number of consumer threads - confirm.
        this.carrier.consume(this, 1);
    }

    /** No work is required once all services have booted. */
    @Override
    public void onComplete() throws Throwable {
    }

    /**
     * Offers one log record to the buffer.
     *
     * <p>A {@code null} record is ignored. The record is dropped (with a debug message when debug
     * logging is enabled) if the buffer rejects it or if {@link #boot()} has not created the
     * buffer yet.
     *
     * @param logData the record to enqueue; may be {@code null}
     */
    public void produce(LogData.Builder logData) {
        if (Objects.isNull(logData)) {
            return;
        }
        // Read the volatile field once; it is null when boot() never ran or failed.
        final DataCarrier<LogData.Builder> buffer = this.carrier;
        if (buffer == null) {
            if (LOGGER.isDebugEnable()) {
                LOGGER.debug("One log has been abandoned, cause by log report service is not booted.");
            }
            return;
        }
        if (!buffer.produce(logData) && LOGGER.isDebugEnable()) {
            LOGGER.debug("One log has been abandoned, cause by buffer is full.");
        }
    }

    /** No consumer initialisation is required. */
    @Override
    public void init(Properties properties) {
    }

    /**
     * Streams one batch of log records to the collector.
     *
     * <p>Nothing is sent when the batch is empty or when the last known channel status is not
     * {@code CONNECTED}. Otherwise a new client stream is opened with a deadline of
     * {@code Config.Collector.GRPC_UPSTREAM_TIMEOUT} seconds, every record is built and written
     * to it, the stream is completed, and the method waits on the per-call
     * {@link GRPCStreamServiceStatus}.
     *
     * <p>If building or writing a record throws, the stream is cancelled through the request
     * observer's {@code onError} and the exception is rethrown to the caller.
     *
     * @param dataList the batch handed over by the buffer
     */
    @Override
    public void consume(final List<LogData.Builder> dataList) {
        if (CollectionUtil.isEmpty(dataList)) {
            return;
        }
        if (!GRPCChannelStatus.CONNECTED.equals(this.status)) {
            return;
        }

        // Per-call completion flag; named so it does not shadow the channel status field.
        final GRPCStreamServiceStatus streamStatus = new GRPCStreamServiceStatus(false);
        final StreamObserver<LogData> logDataStreamObserver = this.logReportServiceStub
            .withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
            .collect(new StreamObserver<Commands>() {

                @Override
                public void onNext(Commands commands) {
                    // Commands returned by the collector are not processed here.
                }

                @Override
                public void onError(Throwable throwable) {
                    streamStatus.finished();
                    LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.", dataList.size());
                    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
                }

                @Override
                public void onCompleted() {
                    streamStatus.finished();
                }
            });

        try {
            boolean isFirst = true;
            for (LogData.Builder logData : dataList) {
                if (isFirst) {
                    // The service name is attached to the first record of a stream only.
                    // NOTE(review): presumably the logging protocol requires it on the first element only - confirm.
                    logData.setService(Config.Agent.SERVICE_NAME);
                    isFirst = false;
                }
                logDataStreamObserver.onNext(logData.build());
            }
        } catch (RuntimeException | Error e) {
            // Do not leave the call half-open until its deadline: signalling an error on the
            // request observer cancels the client call. The failure still reaches the caller.
            logDataStreamObserver.onError(e);
            throw e;
        }
        logDataStreamObserver.onCompleted();
        // NOTE(review): presumably blocks until one of the response callbacks marks the call finished - confirm.
        streamStatus.wait4Finish();
    }

    /**
     * Logs a failure raised while a batch was being consumed.
     *
     * @param data the batch that was being consumed
     * @param t    the failure
     */
    @Override
    public void onError(List<LogData.Builder> data, Throwable t) {
        LOGGER.error(t, "Try to consume {} log data to sender, with unexpected exception.", data.size());
    }

    /** No clean-up is required when the consumer exits. */
    @Override
    public void onExit() {
    }

    /**
     * Records the new channel status and, on {@code CONNECTED}, rebuilds the stub on the current
     * channel with {@code Config.Log.MAX_MESSAGE_SIZE} as the maximum outbound message size.
     *
     * @param status the status reported by the channel manager
     */
    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            this.logReportServiceStub = LogReportServiceGrpc.newStub(channel).withMaxOutboundMessageSize(Config.Log.MAX_MESSAGE_SIZE);
        }
        // Publish the status after the stub so a reader that sees CONNECTED also sees the stub.
        this.status = status;
    }

    /** Shuts down the buffer's consumers; does nothing if {@link #boot()} never created the buffer. */
    @Override
    public void shutdown() {
        final DataCarrier<LogData.Builder> buffer = this.carrier;
        if (buffer != null) {
            buffer.shutdownConsumers();
        }
    }
}

