package org.n52.youngs.control.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.n52.youngs.api.Report;
import org.n52.youngs.control.Runner;
import org.n52.youngs.exception.MappingError;
import org.n52.youngs.exception.SinkError;
import org.n52.youngs.harvest.NodeSourceRecord;
import org.n52.youngs.harvest.Source;
import org.n52.youngs.harvest.SourceException;
import org.n52.youngs.harvest.SourceRecord;
import org.n52.youngs.impl.ReportImpl;
import org.n52.youngs.load.Sink;
import org.n52.youngs.load.SinkRecord;
import org.n52.youngs.postprocess.PostProcessor;
import org.n52.youngs.transform.Mapper;
import org.n52.youngs.validation.XmlSchemaValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;

/* loaded from: input_file:org/n52/youngs/control/impl/SingleThreadBulkRunner.class */
/**
 * A {@link Runner} that harvests records from a {@link Source} in bulks, maps them with a
 * {@link Mapper}, optionally post-processes and schema-validates them, and stores the result
 * in a {@link Sink}. All work happens on the calling thread inside {@link #load(Sink)}.
 *
 * <p>Configuration methods return the runner itself so that calls can be chained; the actual
 * processing only starts when {@link #load(Sink)} is invoked.
 */
public class SingleThreadBulkRunner implements Runner {
    private static final Logger log = LoggerFactory.getLogger(SingleThreadBulkRunner.class);
    private static final long DEFAULT_BULK_SIZE = 10;

    private Source source;
    private Mapper mapper;
    private Sink sink;
    private PostProcessor postProcessor;
    private boolean validateXml;
    private List<XmlSchemaValidator> validators;
    private long bulkSize = DEFAULT_BULK_SIZE;
    private long recordsLimit = Long.MAX_VALUE;
    private Optional<Double> completedPercentage = Optional.empty();
    // Not read anywhere in this class; kept to leave the class layout untouched.
    private final boolean testRun = false;
    private long startPosition = 1;

    /**
     * Sets the maximum number of records requested from the source per bulk.
     *
     * @param bulkSize the number of records per request; a non-positive value makes
     *                 {@link #load(Sink)} stop without requesting any records
     * @return this runner
     */
    public SingleThreadBulkRunner setBulkSize(long bulkSize) {
        this.bulkSize = bulkSize;
        return this;
    }

    /**
     * Sets the position of the first record to request from the source (default: 1).
     *
     * @param startPosition the position of the first record
     * @return this runner
     */
    public SingleThreadBulkRunner setStartPosition(long startPosition) {
        this.startPosition = startPosition;
        return this;
    }

    /**
     * Sets the maximum number of records to harvest; {@link Long#MAX_VALUE} (the default)
     * means "everything the source reports".
     *
     * @param recordsLimit the maximum number of records to harvest
     * @return this runner
     */
    public SingleThreadBulkRunner setRecordsLimit(long recordsLimit) {
        this.recordsLimit = recordsLimit;
        return this;
    }

    /**
     * Remembers the source to harvest from. Nothing is requested until {@link #load(Sink)}.
     *
     * @param source the source of the records
     * @return this runner
     */
    @Override // org.n52.youngs.control.Runner
    public SingleThreadBulkRunner harvest(Source source) {
        this.source = source;
        log.debug("Saved source {}, waiting for load() to be called...", source);
        return this;
    }

    /**
     * Remembers the mapper used to turn source records into sink records.
     *
     * @param mapper the mapper to apply to every harvested record
     * @return this runner
     */
    @Override // org.n52.youngs.control.Runner
    public SingleThreadBulkRunner transform(Mapper mapper) {
        this.mapper = mapper;
        log.debug("Saved mapper {}, waiting for load() to be called...", mapper);
        return this;
    }

    /**
     * Remembers an optional post-processor that is applied to every mapped record.
     *
     * @param postProcessor the post-processor, may be {@code null} to skip post-processing
     * @return this runner
     */
    @Override // org.n52.youngs.control.Runner
    public Runner postTransformProcess(PostProcessor postProcessor) {
        this.postProcessor = postProcessor;
        log.debug("Saved postProcessor {}, waiting for load() to be called...", postProcessor);
        return this;
    }

    /**
     * Enables schema validation of harvested records using the given validators.
     *
     * @param list the validators; the first one matching a record's namespace is used
     * @return this runner
     */
    @Override // org.n52.youngs.control.Runner
    public Runner withValidators(List<XmlSchemaValidator> list) {
        this.validators = list;
        this.validateXml = true;
        return this;
    }

    /**
     * Runs the complete harvest: prepares the sink, then requests, validates, maps and stores
     * the records bulk by bulk. Problems within a single bulk are added to the report and do
     * not abort the remaining bulks.
     *
     * @param sink the sink to store the mapped records in
     * @return the report with successful and failed records and collected messages
     * @throws NullPointerException if the source, the mapper or the sink is missing
     */
    @Override // org.n52.youngs.control.Runner
    public Report load(Sink sink) {
        this.sink = sink;
        // Fail fast, before the sink is touched, when the runner is not fully configured.
        Objects.requireNonNull(this.source, "source");
        Objects.requireNonNull(this.mapper, "mapper");
        Objects.requireNonNull(this.sink, "sink");

        log.info("Starting harvest from {} to {} with {}", this.source, this.sink, this.mapper);
        ReportImpl report = new ReportImpl();
        try {
            if (!sink.prepare(this.mapper.getMapper())) {
                String message = "The sink could not be prepared. Stopping load, please check the logs.";
                log.error(message);
                report.addMessage(message, Report.Level.ERROR);
                return report;
            }

            Stopwatch totalTimer = Stopwatch.createStarted();
            long recordsToProcess = computeRecordsToProcess(this.source.getRecordCount());
            // Inclusive position of the last record to request.
            long lastPosition = this.startPosition + recordsToProcess - 1;

            Stopwatch sourceTimer = Stopwatch.createUnstarted();
            Stopwatch mappingTimer = Stopwatch.createUnstarted();
            Stopwatch sinkTimer = Stopwatch.createUnstarted();
            Stopwatch bulkTimer = Stopwatch.createUnstarted();
            double averageBulkSeconds = 0.0d;
            long bulkIndex = 0;
            long position = this.startPosition;

            while (position <= lastPosition) {
                long requestSize = Math.min(lastPosition - position + 1, this.bulkSize);
                if (requestSize <= 0) {
                    // Only reachable with a non-positive bulk size; stop instead of looping forever.
                    log.warn("Bulk size {} does not allow requesting records, stopping.", this.bulkSize);
                    break;
                }
                long lastOfBulk = position + requestSize - 1;

                bulkTimer.start();
                log.info("### [{}] Requesting {} records from {} starting at {}, last requested record will be {} ###",
                        bulkIndex, requestSize, this.source.getEndpoint(), position, lastPosition);
                try {
                    processBulk(sink, report, position, requestSize, sourceTimer, mappingTimer, sinkTimer);
                } catch (RuntimeException e) {
                    String message = String.format("Unexpected error while processing records %s to %s: %s",
                            position, lastOfBulk, e.getMessage());
                    log.error(message, e);
                    report.addMessage(message, Report.Level.ERROR);
                } catch (SourceException e) {
                    String message = String.format("Issue while processing records %s to %s: %s",
                            position, lastOfBulk, e.getMessage());
                    log.info(message, e);
                    report.addMessage(message, Report.Level.ERROR);
                }

                position += this.bulkSize;
                bulkTimer.stop();
                long bulkSeconds = bulkTimer.elapsed(TimeUnit.SECONDS);
                // Running average over all bulks handled so far.
                averageBulkSeconds = ((averageBulkSeconds * bulkIndex) + bulkSeconds) / (bulkIndex + 1);
                long requestedSoFar = lastOfBulk - this.startPosition + 1;
                updateAndLog(bulkIndex, requestedSoFar, recordsToProcess, bulkSeconds, averageBulkSeconds);
                bulkTimer.reset();
                bulkIndex++;
            }

            totalTimer.stop();
            log.info("Completed harvesting for {} ({} failed) of {} records in {} minutes",
                    report.getNumberOfRecordsAdded(), report.getNumberOfRecordsFailed(),
                    this.source.getRecordCount(), totalTimer.elapsed(TimeUnit.MINUTES));
            log.info("Time spent (minutes): source={}, mapping={}, sink={}",
                    sourceTimer.elapsed(TimeUnit.MINUTES), mappingTimer.elapsed(TimeUnit.MINUTES),
                    sinkTimer.elapsed(TimeUnit.MINUTES));
            return report;
        } catch (SinkError e) {
            log.error("Problem preparing sink", e);
            report.addMessage(String.format("Problem preparing sink: %s", e.getMessage()), Report.Level.ERROR);
            return report;
        }
    }

    /**
     * Returns the progress of the current or last {@link #load(Sink)} call.
     *
     * @return the percentage of scheduled records requested so far, or
     *         {@link Double#NEGATIVE_INFINITY} if no bulk has been completed yet
     */
    @Override // org.n52.youngs.control.Runner
    public double getCompletedPercentage() {
        return this.completedPercentage.orElse(Double.valueOf(Double.NEGATIVE_INFINITY)).doubleValue();
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("source", this.source)
                .add("mapper", this.mapper)
                .add("sink", this.sink)
                .toString();
    }

    /**
     * Determines how many records are to be harvested: the records available from the start
     * position onwards, capped by the configured limit, and never negative.
     */
    private long computeRecordsToProcess(long sourceRecordCount) {
        long available = sourceRecordCount - this.startPosition + 1;
        return Math.max(0L, Math.min(this.recordsLimit, available));
    }

    /**
     * Requests one bulk from the source, validates it if enabled, maps it and stores it.
     * The given stopwatches accumulate the time spent in each phase and are guaranteed to be
     * stopped when this method returns, normally or exceptionally.
     */
    private void processBulk(Sink sink, ReportImpl report, long position, long size,
            Stopwatch sourceTimer, Stopwatch mappingTimer, Stopwatch sinkTimer) throws SourceException {
        try {
            sourceTimer.start();
            Collection<SourceRecord> records = this.source.getRecords(position, size, report);
            sourceTimer.stop();

            if (this.validateXml) {
                validateRecords(records, report);
            }

            log.debug("Mapping {} retrieved records.", records.size());
            mappingTimer.start();
            List<SinkRecord> mapped = mapRecords(records, report);
            mappingTimer.stop();

            log.debug("Storing {} mapped records.", mapped.size());
            sinkTimer.start();
            storeRecords(sink, mapped, report);
            sinkTimer.stop();
        } finally {
            // A failing phase leaves its stopwatch running; starting it again would throw.
            stopIfRunning(sourceTimer);
            stopIfRunning(mappingTimer);
            stopIfRunning(sinkTimer);
        }
    }

    /** Stops the stopwatch unless it is already stopped. */
    private static void stopIfRunning(Stopwatch stopwatch) {
        if (stopwatch.isRunning()) {
            stopwatch.stop();
        }
    }

    /** Validates every record and adds the validation messages to the report. */
    private void validateRecords(Collection<SourceRecord> records, ReportImpl report) throws SourceException {
        int index = 0;
        for (SourceRecord record : records) {
            List<String> messages = validate(record);
            if (messages.isEmpty()) {
                log.debug("File #{} is schema valid", index);
            } else {
                log.debug("File #{} has {} schema validation message(s)", index, messages.size());
                messages.forEach(message -> report.addMessage(message, Report.Level.INFO));
            }
            index++;
        }
    }

    /** Maps all records; records that fail to map are reported and left out of the result. */
    private List<SinkRecord> mapRecords(Collection<SourceRecord> records, ReportImpl report) {
        return records.stream()
                .map(record -> mapRecord(record, report))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /** Maps and post-processes one record; returns {@code null} and reports a mapping failure. */
    private SinkRecord mapRecord(SourceRecord record, ReportImpl report) {
        try {
            SinkRecord mapped = this.mapper.map(record);
            return this.postProcessor != null ? this.postProcessor.process(mapped) : mapped;
        } catch (MappingError e) {
            report.addFailedRecord(record.toString(), "Problem during mapping: " + e.getMessage());
            return null;
        }
    }

    /** Stores the records one by one and records success or failure of each in the report. */
    private void storeRecords(Sink sink, List<SinkRecord> records, ReportImpl report) {
        for (SinkRecord record : records) {
            try {
                if (sink.store(record)) {
                    report.addSuccessfulRecord(record.getId());
                } else {
                    report.addFailedRecord(record.getId(), "see sink log");
                }
            } catch (SinkError e) {
                log.warn("Problem during storing: ", e);
                report.addFailedRecord(record.toString(), "Problem during storing: " + e.getMessage());
            }
        }
    }

    /**
     * Updates the completed percentage and logs the progress of the bulk that just finished.
     *
     * @param bulkIndex the zero-based index of the finished bulk
     * @param requested the number of records requested so far
     * @param total the number of records scheduled for this run
     * @param lastDurationSeconds the duration of the finished bulk in whole seconds
     * @param averageDurationSeconds the average bulk duration in seconds
     */
    private void updateAndLog(long bulkIndex, long requested, long total, long lastDurationSeconds,
            double averageDurationSeconds) {
        // Floating point division: a long division would yield 0 until the very last bulk.
        double percentage = total > 0 ? Math.min(100.0d, ((double) requested / total) * 100.0d) : 100.0d;
        this.completedPercentage = Optional.of(percentage);
        log.info("### [{}] Completed {}% of task in {} seconds (avg: {} seconds) ###",
                bulkIndex, String.format("%1$,.2f", percentage), lastDurationSeconds,
                String.format("%1$,.2f", averageDurationSeconds));
    }

    /**
     * Validates one record against the validator matching its namespace.
     *
     * @return the validation messages; empty if the record type is not supported, and a single
     *         message if no validator matches the record's namespace
     * @throws SourceException if the validator fails with an I/O or parsing problem
     */
    private List<String> validate(SourceRecord sourceRecord) throws SourceException {
        if (!(sourceRecord instanceof NodeSourceRecord)) {
            log.warn("The SourceRecord class {} is not supported", sourceRecord.getClass().getName());
            return Collections.emptyList();
        }
        Node node = ((NodeSourceRecord) sourceRecord).getRecord();
        try {
            XmlSchemaValidator validator = resolveValidator(node);
            if (validator == null) {
                return Collections.singletonList(
                        "No schema validator available for namespace: " + node.getNamespaceURI());
            }
            return validator.validate(node);
        } catch (IOException | SAXException e) {
            throw new SourceException(e.getMessage(), e);
        }
    }

    /** Returns the first validator matching the node's namespace, or {@code null} if none does. */
    private XmlSchemaValidator resolveValidator(Node node) {
        if (this.validators == null || this.validators.isEmpty()) {
            return null;
        }
        for (XmlSchemaValidator candidate : this.validators) {
            if (candidate.matchesNamespace(node.getNamespaceURI())) {
                return candidate;
            }
        }
        return null;
    }
}
