package org.graylog2.streams;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.QueryBuilder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.inject.Inject;
import org.bson.types.ObjectId;
import org.graylog.security.entities.EntityOwnershipService;
import org.graylog2.database.MongoConnection;
import org.graylog2.database.NotFoundException;
import org.graylog2.database.PersistedServiceImpl;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.MongoIndexSet;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.indexset.IndexSetService;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.Tools;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.database.users.User;
import org.graylog2.plugin.streams.Output;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.plugin.streams.StreamRule;
import org.graylog2.rest.resources.streams.requests.CreateStreamRequest;
import org.graylog2.streams.events.StreamDeletedEvent;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.mongojack.DBProjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/graylog2/streams/StreamServiceImpl.class */
/**
 * MongoDB-backed {@link StreamService} implementation.
 *
 * <p>Stream documents are read and written through the helpers inherited from
 * {@link PersistedServiceImpl}. Stream rules, outputs and index sets are resolved through the
 * injected collaborator services. Every method that modifies a stream posts a
 * {@link StreamsChangedEvent} on the {@link ClusterEventBus}, except
 * {@link #addToIndexSet(String, Collection)}.
 */
public class StreamServiceImpl extends PersistedServiceImpl implements StreamService {
    private static final Logger LOG = LoggerFactory.getLogger(StreamServiceImpl.class);

    // Names of the stream document fields this service reads or writes directly.
    private static final String FIELD_ID = "_id";
    private static final String FIELD_TITLE = "title";
    private static final String FIELD_DESCRIPTION = "description";
    private static final String FIELD_DISABLED = "disabled";
    private static final String FIELD_OUTPUTS = "outputs";
    private static final String FIELD_INDEX_SET_ID = "index_set_id";

    private final StreamRuleService streamRuleService;
    private final OutputService outputService;
    private final IndexSetService indexSetService;
    private final MongoIndexSet.Factory indexSetFactory;
    private final NotificationService notificationService;
    private final EntityOwnershipService entityOwnershipService;
    private final ClusterEventBus clusterEventBus;

    @Inject
    public StreamServiceImpl(MongoConnection mongoConnection,
                             StreamRuleService streamRuleService,
                             OutputService outputService,
                             IndexSetService indexSetService,
                             MongoIndexSet.Factory factory,
                             NotificationService notificationService,
                             EntityOwnershipService entityOwnershipService,
                             ClusterEventBus clusterEventBus) {
        super(mongoConnection);
        this.streamRuleService = streamRuleService;
        this.outputService = outputService;
        this.indexSetService = indexSetService;
        this.indexSetFactory = factory;
        this.notificationService = notificationService;
        this.entityOwnershipService = entityOwnershipService;
        this.clusterEventBus = clusterEventBus;
    }

    /**
     * Resolves the index set referenced by a raw stream document.
     *
     * @param rawStream the stream document
     * @return the index set, or {@code null} if the document has no index set id or the id is unknown
     */
    @Nullable
    private IndexSet getIndexSet(DBObject rawStream) {
        return getIndexSet((String) rawStream.get(FIELD_INDEX_SET_ID));
    }

    /**
     * Resolves an index set by id.
     *
     * @param indexSetId the index set id, may be {@code null} or empty
     * @return the index set, or {@code null} if the id is null/empty or no configuration exists for it
     */
    @Nullable
    private IndexSet getIndexSet(String indexSetId) {
        if (Strings.isNullOrEmpty(indexSetId)) {
            return null;
        }
        return indexSetService.get(indexSetId)
                .<IndexSet>map(indexSetFactory::create)
                .orElse(null);
    }

    /**
     * Loads a single stream together with its rules, outputs and index set.
     *
     * @param objectId the id of the stream
     * @return the loaded stream
     * @throws NotFoundException if no stream document exists for the id
     */
    public Stream load(ObjectId objectId) throws NotFoundException {
        final DBObject rawStream = get(StreamImpl.class, objectId);
        if (rawStream == null) {
            throw new NotFoundException("Stream <" + objectId + "> not found!");
        }

        final List<StreamRule> rules = streamRuleService.loadForStreamId(objectId.toHexString());
        final Set<Output> outputs = loadOutputsForRawStream(rawStream);
        @SuppressWarnings("unchecked") // DBObject#toMap() is declared with a raw Map return type
        final Map<String, Object> fields = rawStream.toMap();

        return new StreamImpl((ObjectId) rawStream.get(FIELD_ID), fields, rules, outputs, getIndexSet(rawStream));
    }

    /**
     * Builds a new, not yet persisted stream from the given fields.
     *
     * @param fields the stream fields; the value under {@code index_set_id} is used to resolve the index set
     * @return the new stream instance
     */
    @Override
    public Stream create(Map<String, Object> fields) {
        return new StreamImpl(fields, getIndexSet((String) fields.get(FIELD_INDEX_SET_ID)));
    }

    /**
     * Builds a new, not yet persisted stream from a create request. The stream is created enabled
     * ({@code disabled = false}) and its title is stripped of surrounding whitespace.
     *
     * @param request the create request
     * @param userId  the value stored as {@code creator_user_id}
     * @return the new stream instance
     */
    @Override
    public Stream create(CreateStreamRequest request, String userId) {
        final Map<String, Object> fields = new HashMap<>();
        fields.put(FIELD_TITLE, request.title().strip());
        fields.put(FIELD_DESCRIPTION, request.description());
        fields.put("creator_user_id", userId);
        fields.put("created_at", Tools.nowUTC());
        fields.put("content_pack", request.contentPack());
        fields.put("matching_type", request.matchingType().toString());
        fields.put(FIELD_DISABLED, false);
        fields.put("remove_matches_from_default_stream", Boolean.valueOf(request.removeMatchesFromDefaultStream()));
        fields.put(FIELD_INDEX_SET_ID, request.indexSetId());

        return create(fields);
    }

    /**
     * Loads a single stream by the hex string form of its id.
     *
     * @param id the stream id
     * @return the loaded stream
     * @throws NotFoundException if the stream does not exist or an {@link IllegalArgumentException}
     *                           is raised while loading it (e.g. for an id that is not a valid ObjectId)
     */
    @Override
    public Stream load(String id) throws NotFoundException {
        try {
            return load(new ObjectId(id));
        } catch (IllegalArgumentException e) {
            throw new NotFoundException("Stream <" + id + "> not found!");
        }
    }

    /**
     * Loads all streams that are not disabled.
     */
    @Override
    public List<Stream> loadAllEnabled() {
        return loadAllEnabled(new HashMap<>());
    }

    /**
     * Loads all streams that are not disabled and match the additional query fields.
     *
     * @param additionalQueryOpts extra field/value pairs to match; the map itself is not modified
     * @return the matching streams
     */
    public List<Stream> loadAllEnabled(Map<String, Object> additionalQueryOpts) {
        // Work on a copy so the caller's map is left untouched and immutable maps are accepted.
        final Map<String, Object> query = new HashMap<>(additionalQueryOpts);
        query.put(FIELD_DISABLED, false);
        return loadAll(query);
    }

    /**
     * Loads all streams.
     */
    @Override
    public List<Stream> loadAll() {
        return loadAll(Collections.emptyMap());
    }

    /**
     * Loads all streams matching the given field/value pairs.
     *
     * @param additionalQueryOpts field/value pairs used as the MongoDB query
     * @return the matching streams
     */
    public List<Stream> loadAll(Map<String, Object> additionalQueryOpts) {
        final DBObject query = new BasicDBObject(additionalQueryOpts);
        return loadAll(query);
    }

    /**
     * Loads all streams matching the query. Rules, index sets and outputs for the whole result are
     * fetched with one call each instead of one call per stream.
     *
     * @param filter the MongoDB query
     * @return the matching streams as an immutable list
     */
    private List<Stream> loadAll(DBObject filter) {
        final List<DBObject> rawStreams = query(StreamImpl.class, filter);

        final List<String> streamIds = rawStreams.stream()
                .map(rawStream -> rawStream.get(FIELD_ID).toString())
                .collect(Collectors.toList());
        final Map<String, List<StreamRule>> rulesByStreamId = streamRuleService.loadForStreamIds(streamIds);

        final Map<String, IndexSet> indexSetsById = indexSetsForStreams(rawStreams);

        final Set<String> outputIds = rawStreams.stream()
                .map(this::outputIdsForRawStream)
                .flatMap(List::stream)
                .map(ObjectId::toHexString)
                .collect(Collectors.toSet());
        final Map<String, Output> outputsById = outputService.loadByIds(outputIds).stream()
                .collect(Collectors.toMap(Output::getId, Function.identity()));

        final ImmutableList.Builder<Stream> streams = ImmutableList.builder();
        for (DBObject rawStream : rawStreams) {
            final ObjectId objectId = (ObjectId) rawStream.get(FIELD_ID);
            final String streamId = objectId.toHexString();

            final List<StreamRule> rules = rulesByStreamId.getOrDefault(streamId, Collections.emptyList());
            LOG.debug("Found {} rules for stream <{}>", rules.size(), streamId);

            final Set<Output> outputs = new HashSet<>();
            for (ObjectId outputObjectId : outputIdsForRawStream(rawStream)) {
                final String outputId = outputObjectId.toHexString();
                final Output output = outputsById.get(outputId);
                if (output == null) {
                    // A dangling output reference must not prevent the stream from loading.
                    LOG.warn("Stream \"{}\" <{}> references missing output <{}> - ignoring output.",
                            Strings.nullToEmpty((String) rawStream.get(FIELD_TITLE)), streamId, outputId);
                } else {
                    outputs.add(output);
                }
            }

            @SuppressWarnings("unchecked") // DBObject#toMap() is declared with a raw Map return type
            final Map<String, Object> fields = rawStream.toMap();
            final IndexSet indexSet = indexSetsById.get((String) fields.get(FIELD_INDEX_SET_ID));
            streams.add(new StreamImpl(objectId, fields, rules, outputs, indexSet));
        }

        return streams.build();
    }

    /**
     * Returns the output ids stored in a raw stream document.
     *
     * @param rawStream the stream document
     * @return the value of the {@code outputs} field, or an empty list if the field is absent
     */
    @SuppressWarnings("unchecked") // the "outputs" field is read as a list of ObjectIds
    private List<ObjectId> outputIdsForRawStream(DBObject rawStream) {
        final List<ObjectId> outputIds = (List<ObjectId>) rawStream.get(FIELD_OUTPUTS);
        return outputIds == null ? Collections.emptyList() : outputIds;
    }

    /**
     * Loads the index sets referenced by the given stream documents.
     *
     * @param rawStreams the stream documents
     * @return the index sets keyed by index set configuration id
     */
    private Map<String, IndexSet> indexSetsForStreams(List<DBObject> rawStreams) {
        final Set<String> indexSetIds = rawStreams.stream()
                .map(rawStream -> (String) rawStream.get(FIELD_INDEX_SET_ID))
                .filter(indexSetId -> !Strings.isNullOrEmpty(indexSetId))
                .collect(Collectors.toSet());

        return indexSetService.findByIds(indexSetIds).stream()
                .collect(Collectors.toMap(IndexSetConfig::id, indexSetFactory::create));
    }

    /**
     * Converts hex string ids into a set of {@link ObjectId}s.
     *
     * @throws IllegalArgumentException if one of the strings is not a valid ObjectId
     */
    private static Set<ObjectId> toObjectIds(Collection<String> ids) {
        return ids.stream()
                .map(ObjectId::new)
                .collect(Collectors.toSet());
    }

    /**
     * Loads the streams with the given ids. Ids without a matching document are absent from the result.
     *
     * @param streamIds the stream ids
     * @return the streams found, as an immutable set
     */
    @Override
    public Set<Stream> loadByIds(Collection<String> streamIds) {
        final DBObject query = QueryBuilder.start(FIELD_ID).in(toObjectIds(streamIds)).get();
        return ImmutableSet.copyOf(loadAll(query));
    }

    /**
     * Returns the index set ids assigned to the given streams. Only the {@code index_set_id} field
     * is requested from the database; streams without an index set id are skipped.
     *
     * @param streamIds the stream ids
     * @return the distinct index set ids of the streams found
     */
    @Override
    public Set<String> indexSetIdsByIds(Collection<String> streamIds) {
        final DBObject query = QueryBuilder.start(FIELD_ID).in(toObjectIds(streamIds)).get();
        final DBObject onlyIndexSetId = DBProjection.include(FIELD_INDEX_SET_ID);

        try (DBCursor cursor = collection(StreamImpl.class).find(query, onlyIndexSetId)) {
            return StreamSupport.stream(cursor.spliterator(), false)
                    .map(rawStream -> rawStream.get(FIELD_INDEX_SET_ID))
                    .filter(Objects::nonNull)
                    .map(Object::toString)
                    .collect(Collectors.toSet());
        }
    }

    /**
     * Loads the outputs referenced by a raw stream document one by one. References to outputs that
     * cannot be found are logged and skipped.
     *
     * @param stream the stream document
     * @return the outputs that could be loaded
     */
    protected Set<Output> loadOutputsForRawStream(DBObject stream) {
        final Set<Output> outputs = new HashSet<>();
        for (ObjectId outputId : outputIdsForRawStream(stream)) {
            final String outputIdHex = outputId.toHexString();
            try {
                outputs.add(outputService.load(outputIdHex));
            } catch (NotFoundException e) {
                LOG.warn("Non-existing output <{}> referenced from stream <{}>!", outputIdHex, stream.get(FIELD_ID));
            }
        }
        return outputs;
    }

    /**
     * Returns the total number of streams.
     */
    @Override
    public long count() {
        return totalCount(StreamImpl.class);
    }

    /**
     * Deletes a stream. In order: its stream rules are destroyed, notifications whose
     * {@code StreamRuleImpl.FIELD_STREAM_ID} detail equals the stream id are destroyed, the stream
     * itself is destroyed, a {@link StreamsChangedEvent} and a {@link StreamDeletedEvent} are
     * posted, and the stream is unregistered from the entity ownership service.
     *
     * @param stream the stream to delete
     * @throws NotFoundException propagated from loading the stream rules
     */
    @Override
    public void destroy(Stream stream) throws NotFoundException {
        for (StreamRule streamRule : streamRuleService.loadForStream(stream)) {
            super.destroy(streamRule);
        }

        final String streamId = stream.getId();
        for (Notification notification : notificationService.all()) {
            final Object notificationStreamId = notification.getDetail(StreamRuleImpl.FIELD_STREAM_ID);
            if (notificationStreamId != null && notificationStreamId.toString().equals(streamId)) {
                LOG.debug("Removing notification that references stream: {}", notification);
                notificationService.destroy(notification);
            }
        }

        super.destroy(stream);

        clusterEventBus.post(StreamsChangedEvent.create(streamId));
        clusterEventBus.post(StreamDeletedEvent.create(streamId));
        entityOwnershipService.unregisterStream(streamId);
    }

    /**
     * Updates title and/or description of a stream and saves it.
     *
     * @param stream      the stream to update
     * @param title       the new title, or {@code null} to keep the current one
     * @param description the new description, or {@code null} to keep the current one
     * @throws ValidationException propagated from {@link #save(Stream)}
     */
    public void update(Stream stream, @Nullable String title, @Nullable String description) throws ValidationException {
        if (title != null) {
            stream.getFields().put(FIELD_TITLE, title);
        }
        if (description != null) {
            stream.getFields().put(FIELD_DESCRIPTION, description);
        }
        save(stream);
    }

    /**
     * Disables a stream and saves it.
     *
     * @param stream the stream to pause
     * @throws ValidationException propagated from {@link #save(Stream)}
     */
    @Override
    public void pause(Stream stream) throws ValidationException {
        stream.setDisabled(true);
        final String streamId = save(stream);
        // NOTE(review): save() has already posted a StreamsChangedEvent for this id, so a second
        // one is emitted here. Kept as is because listeners may rely on the current behaviour.
        clusterEventBus.post(StreamsChangedEvent.create(streamId));
    }

    /**
     * Enables a stream and saves it.
     *
     * @param stream the stream to resume
     * @throws ValidationException propagated from {@link #save(Stream)}
     */
    @Override
    public void resume(Stream stream) throws ValidationException {
        stream.setDisabled(false);
        final String streamId = save(stream);
        // NOTE(review): save() has already posted a StreamsChangedEvent for this id, so a second
        // one is emitted here. Kept as is because listeners may rely on the current behaviour.
        clusterEventBus.post(StreamsChangedEvent.create(streamId));
    }

    /**
     * Adds an output reference to a stream document ({@code $addToSet}, so an already referenced
     * output is not added twice) and posts a {@link StreamsChangedEvent}.
     *
     * @param stream the stream to modify
     * @param output the output to reference
     */
    @Override
    public void addOutput(Stream stream, Output output) {
        final BasicDBObject match = db(FIELD_ID, new ObjectId(stream.getId()));
        final BasicDBObject modify = db("$addToSet", new BasicDBObject(FIELD_OUTPUTS, new ObjectId(output.getId())));
        collection(stream).update(match, modify);

        clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
    }

    /**
     * Adds several output references to a stream document ({@code $addToSet} with {@code $each})
     * and posts a {@link StreamsChangedEvent}.
     *
     * @param streamId  the id of the stream to modify
     * @param outputIds the ids of the outputs to reference
     */
    @Override
    public void addOutputs(ObjectId streamId, Collection<ObjectId> outputIds) {
        final BasicDBList outputs = new BasicDBList();
        outputs.addAll(outputIds);

        final BasicDBObject match = db(FIELD_ID, streamId);
        final BasicDBObject modify = db("$addToSet", new BasicDBObject(FIELD_OUTPUTS, new BasicDBObject("$each", outputs)));
        collection(StreamImpl.class).update(match, modify);

        clusterEventBus.post(StreamsChangedEvent.create(streamId.toHexString()));
    }

    /**
     * Removes an output reference from a stream document ({@code $pull}) and posts a
     * {@link StreamsChangedEvent}.
     *
     * @param stream the stream to modify
     * @param output the output to remove
     */
    @Override
    public void removeOutput(Stream stream, Output output) {
        final BasicDBObject match = db(FIELD_ID, new ObjectId(stream.getId()));
        final BasicDBObject modify = db("$pull", new BasicDBObject(FIELD_OUTPUTS, new ObjectId(output.getId())));
        collection(stream).update(match, modify);

        clusterEventBus.post(StreamsChangedEvent.create(stream.getId()));
    }

    /**
     * Removes an output reference from every stream that has it and posts one
     * {@link StreamsChangedEvent} carrying the ids of the affected streams.
     *
     * @param output the output to remove
     */
    @Override
    public void removeOutputFromAllStreams(Output output) {
        final ObjectId outputId = new ObjectId(output.getId());
        final BasicDBObject match = db(FIELD_OUTPUTS, outputId);
        final BasicDBObject modify = db("$pull", db(FIELD_OUTPUTS, outputId));

        // The affected stream ids are collected before the update, because once the reference has
        // been pulled the match query no longer finds those streams.
        final ImmutableSet<String> updatedStreamIds;
        try (DBCursor cursor = collection(StreamImpl.class).find(match)) {
            updatedStreamIds = StreamSupport.stream(cursor.spliterator(), false)
                    .map(rawStream -> rawStream.get(FIELD_ID))
                    .filter(Objects::nonNull)
                    .map(id -> ((ObjectId) id).toHexString())
                    .collect(ImmutableSet.toImmutableSet());
        }

        // upsert = false, multi = true: modify every matching stream, never create one.
        collection(StreamImpl.class).update(match, modify, false, true);

        clusterEventBus.post(StreamsChangedEvent.create(updatedStreamIds));
    }

    /**
     * Loads all streams assigned to the given index set.
     *
     * @param indexSetId the index set id
     * @return the streams whose {@code index_set_id} equals the given id
     */
    @Override
    public List<Stream> loadAllWithIndexSet(String indexSetId) {
        final Map<String, Object> query = db(FIELD_INDEX_SET_ID, indexSetId);
        return loadAll(query);
    }

    /**
     * Assigns the given streams to an index set with a single multi-document update.
     *
     * @param indexSetId the index set id to assign
     * @param streamIds  the ids of the streams to assign
     * @throws IllegalStateException if the update reports fewer affected documents than there are
     *                               distinct stream ids
     */
    @Override
    public void addToIndexSet(String indexSetId, Collection<String> streamIds) {
        final DBObject query = QueryBuilder.start(FIELD_ID).in(toObjectIds(streamIds)).get();
        final BasicDBObject update = db("$set", db(FIELD_INDEX_SET_ID, indexSetId));

        // upsert = false, multi = true
        final int updatedCount = collection(StreamImpl.class).update(query, update, false, true).getN();
        final long expectedCount = streamIds.stream().distinct().count();
        if (updatedCount < expectedCount) {
            throw new IllegalStateException("Assigning streams " + streamIds + " to index set <" + indexSetId + "> failed!");
        }
    }

    /**
     * Persists a stream and posts a {@link StreamsChangedEvent} for it.
     *
     * @param stream the stream to save
     * @return the id of the saved stream
     * @throws ValidationException propagated from the inherited save
     */
    @Override
    public String save(Stream stream) throws ValidationException {
        final String savedStreamId = super.save(stream);
        clusterEventBus.post(StreamsChangedEvent.create(savedStreamId));
        return savedStreamId;
    }

    /**
     * Persists a stream, saves copies of the given rules bound to the saved stream id, registers
     * the stream with the entity ownership service for the user and posts a {@link StreamsChangedEvent}.
     *
     * @param stream      the stream to save
     * @param streamRules the rules to copy onto the saved stream
     * @param user        the user passed to the ownership registration
     * @return the id of the saved stream
     * @throws ValidationException propagated from saving the stream or its rules
     */
    @Override
    public String saveWithRulesAndOwnership(Stream stream, Collection<StreamRule> streamRules, User user) throws ValidationException {
        final String savedStreamId = super.save(stream);

        final Set<StreamRule> rulesForStream = streamRules.stream()
                .map(rule -> streamRuleService.copy(savedStreamId, rule))
                .collect(Collectors.toSet());
        streamRuleService.save(rulesForStream);

        entityOwnershipService.registerNewStream(savedStreamId, user);
        clusterEventBus.post(StreamsChangedEvent.create(savedStreamId));

        return savedStreamId;
    }

    /** Shorthand for a single-entry {@link BasicDBObject}. */
    private BasicDBObject db(String key, Object value) {
        return new BasicDBObject(key, value);
    }

    /** Shorthand for a {@link BasicDBObject} populated from a map. */
    private BasicDBObject db(Map<String, Object> map) {
        return new BasicDBObject(map);
    }
}
