package casa.dodwan.pubsub;

import casa.dodwan.cache.CacheService;
import casa.dodwan.config.DodwanEnvironment;
import casa.dodwan.docware.Descriptor;
import casa.dodwan.message.Message;
import casa.dodwan.message.MessageReceiver;
import casa.dodwan.message.MessageSAR;
import casa.dodwan.util.Console;
import casa.dodwan.util.Dispatcher;
import casa.dodwan.util.Logger;
import casa.dodwan.util.Processor;
import casa.dodwan.util.Service;
import casa.dodwan.util.Sink;
import casa.dodwan.util.Time;
import java.io.File;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/* loaded from: input_file:casa/dodwan/pubsub/PubSubService.class */
/**
 * Publish/subscribe service built on top of a {@link CacheService}.
 *
 * <p>Published messages are fragmented and pushed into an internal dispatcher
 * ({@code loop_}) that feeds both the registered message receivers and the
 * cache service's archiver. Messages arriving through {@link #write(Message)}
 * are forwarded to the registered receivers only, unless the optional
 * reception history reports that the document was already received.
 *
 * <p>The constructor calls {@code super.suspend()}, so {@link #publish(Message)}
 * and {@link #write(Message)} are no-ops while {@code isSuspended()} is true.
 *
 * <p>NOTE(review): subscriptions are kept in a plain {@link HashMap}; nothing
 * in this class synchronizes access to it — confirm the threading model with
 * the callers.
 */
public class PubSubService extends Service implements Sink<Message> {
    private final CacheService cacheService_;

    /** Receives fragments of published messages; fans out to receivers_ and the archiver. */
    private final Dispatcher<Message> loop_;

    /** Fans messages out to every registered {@link MessageReceiver}. */
    private final Dispatcher<Message> receivers_;

    /** Reception history; {@code null} when the service was created without a directory. */
    private final ReceptionHistory history_;

    /** Lazily created by {@link #console()}. */
    private Console console_ = null;

    /** Subscription key -> receiver registered under that key. */
    private final Map<String, MessageReceiver> map_ = new HashMap<>();

    private final PubSubEventDispatcher eventDispatcher_ = new PubSubEventDispatcher();

    /**
     * Creates the service in the suspended state and wires its dispatchers.
     *
     * @param cacheService cache service whose archiver, cache and environment are used
     * @param file         directory in which the reception history is kept (created if
     *                     it does not exist); when {@code null}, no history is maintained
     * @throws Exception if the reception history cannot be created
     */
    public PubSubService(CacheService cacheService, File file) throws Exception {
        super.suspend();
        this.cacheService_ = cacheService;
        this.receivers_ = new Dispatcher<>();
        this.loop_ = new Dispatcher<>();
        this.loop_.addSink(this.receivers_);
        this.loop_.addSink(this.cacheService_.archiver);
        if (file != null) {
            if (!file.exists()) {
                file.mkdirs();
            }
            this.history_ = new ReceptionHistory(new File(file, "history"));
            // The history is registered as a listener of this service's events.
            addListener(this.history_);
        } else {
            this.history_ = null;
        }
    }

    /**
     * Registers a listener with the internal event dispatcher.
     *
     * @param pubSubListener listener to add
     */
    public void addListener(PubSubListener pubSubListener) {
        this.eventDispatcher_.addListener(pubSubListener);
    }

    /**
     * Unregisters a listener from the internal event dispatcher.
     *
     * @param pubSubListener listener to remove
     */
    public void removeListener(PubSubListener pubSubListener) {
        this.eventDispatcher_.removeListener(pubSubListener);
    }

    /**
     * Registers a subscription under the given key.
     *
     * <p>The pattern is added to the local profile of the cache service's
     * environment. When {@code processor} is non-null, a {@link MessageReceiver}
     * is created, stored under {@code str}, attached to the receivers dispatcher,
     * and then fed every cached document matching the pattern that the reception
     * history (if any) does not already contain.
     *
     * <p>If the key is already registered, a message is printed and nothing
     * else happens. Any exception raised while subscribing is printed to
     * standard output and not propagated.
     *
     * @param str        subscription key
     * @param descriptor pattern describing the documents of interest
     * @param processor  consumer of matching messages; when {@code null} only the
     *                   local profile is updated and no receiver is registered
     */
    public void addSubscription(String str, Descriptor descriptor, Processor<Message> processor) {
        if (this.map_.containsKey(str)) {
            System.out.println("The key " + str + ", is already registered.");
            return;
        }
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.addSubscriber(" + str + ", " + descriptor + ", " + processor + ")");
        }
        try {
            this.cacheService_.environment.addToLocalProfile(descriptor);
            if (processor == null) {
                return;
            }
            MessageReceiver messageReceiver = new MessageReceiver(descriptor, processor, this.eventDispatcher_);
            this.map_.put(str, messageReceiver);
            this.receivers_.addSink(messageReceiver);
            // Replay matching documents that are already in the cache.
            for (String cachedKey : this.cacheService_.cache.getKeys(descriptor)) {
                Descriptor cachedDescriptor = this.cacheService_.cache.getDescriptor(cachedKey);
                if (cachedDescriptor == null) {
                    continue;
                }
                // The history is optional: without the null-safe check a service
                // created without a history directory would throw here and the
                // replay would be cut short by the catch below.
                if (!alreadyReceived(cachedDescriptor.getOriginalDocumentId())) {
                    messageReceiver.write(new Message(cachedDescriptor, this.cacheService_.cache.getPayload(cachedKey).getBytes()));
                }
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Removes the subscription registered under the given key, if any.
     *
     * <p>The receiver's pattern is removed from the local profile, the receiver
     * is detached from the receivers dispatcher and the key is forgotten.
     *
     * @param str subscription key
     */
    public void removeSubscription(String str) {
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.removeSubscription(" + str + ")");
        }
        MessageReceiver messageReceiver = this.map_.get(str);
        if (messageReceiver == null) {
            return;
        }
        this.cacheService_.environment.removeFromLocalProfile(messageReceiver.getPattern());
        this.receivers_.removeSink(messageReceiver);
        this.map_.remove(str);
    }

    /**
     * Publishes a message.
     *
     * <p>Nothing is done while the service is suspended. A message whose payload
     * exceeds the environment's {@code max_payload_size} (when that limit is
     * positive) is discarded with a console message. If the descriptor carries a
     * lifetime other than -1, a deadline is derived from it when none is set,
     * and the {@code "_ltime"} attribute is removed. The message is then
     * fragmented into the internal dispatcher and the publish event is
     * dispatched to listeners.
     *
     * <p>Exceptions raised during fragmentation or event dispatch are printed
     * and not propagated.
     *
     * @param message message to publish
     * @throws Exception declared for callers; see the body for what is caught
     */
    public void publish(Message message) throws Exception {
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.publish()");
        }
        if (isSuspended()) {
            if (this.verbosity.isEnabled()) {
                System.out.println("  This service is suspended. Aborting...");
            }
            return;
        }
        int maxPayloadSize = DodwanEnvironment.getInstance().max_payload_size;
        if (maxPayloadSize > 0 && message.getPayload().length > maxPayloadSize) {
            System.out.println("PubSubService.publish(msg): discarding message because its payload is too long (" + message.getPayload().length + " > max=" + maxPayloadSize + ")");
            return;
        }
        Descriptor descriptor = message.getDescriptor();
        long lifetime = descriptor.getLifetime();
        if (lifetime != -1) {
            // Turn the relative lifetime into an absolute deadline, unless the
            // publisher already supplied one.
            if (descriptor.getDeadline() == null) {
                descriptor.setDeadline(new Date(Time.currentTimeMillis() + lifetime));
            }
            // NOTE(review): "_ltime" is presumably the attribute behind getLifetime() — confirm.
            descriptor.removeAttribute("_ltime");
        }
        Logger.log("pubsub", "> publish(id=" + message.getDescriptor().getDocumentId() + ")");
        try {
            MessageSAR.fragment(message, DodwanEnvironment.getInstance().fragmentation_threshold, this.loop_);
            this.eventDispatcher_.publish(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the console of this service, creating it on first use.
     *
     * @return the {@link PubSubServiceConsole} bound to this service
     */
    @Override
    public Console console() {
        if (this.console_ == null) {
            this.console_ = new PubSubServiceConsole(this);
        }
        return this.console_;
    }

    /**
     * Returns the keys of all registered subscriptions.
     *
     * @return the key set of the internal subscription map (a live view, not a copy)
     */
    public Set<String> getKeys() {
        return this.map_.keySet();
    }

    /**
     * Returns the keys of the subscriptions whose pattern matches a descriptor.
     *
     * @param descriptor descriptor to test against each subscription pattern
     * @return a new set holding the matching subscription keys
     */
    public Set<String> getKeys(Descriptor descriptor) {
        Set<String> matchingKeys = new HashSet<>();
        for (Map.Entry<String, MessageReceiver> entry : this.map_.entrySet()) {
            if (entry.getValue().getPattern().matches(descriptor)) {
                matchingKeys.add(entry.getKey());
            }
        }
        return matchingKeys;
    }

    /**
     * Returns the receiver registered under the given key.
     *
     * @param str subscription key
     * @return the receiver, or {@code null} if the key is not registered
     */
    public MessageReceiver getReceiver(String str) {
        return this.map_.get(str);
    }

    /**
     * Returns all registered receivers.
     *
     * @return the values of the internal subscription map (a live view, not a copy)
     */
    public Collection<MessageReceiver> getReceivers() {
        return this.map_.values();
    }

    /**
     * Returns the receivers whose pattern matches a descriptor.
     *
     * @param descriptor descriptor to test against each receiver's pattern
     * @return a new collection holding the matching receivers
     */
    public Collection<MessageReceiver> getReceivers(Descriptor descriptor) {
        Set<MessageReceiver> matchingReceivers = new HashSet<>();
        for (MessageReceiver messageReceiver : this.map_.values()) {
            if (messageReceiver.getPattern().matches(descriptor)) {
                matchingReceivers.add(messageReceiver);
            }
        }
        return matchingReceivers;
    }

    /**
     * Resumes the service, provided it is enabled; otherwise reports the
     * failure when verbosity is on.
     */
    @Override
    public void resume() {
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.resume()");
        }
        if (isEnabled()) {
            super.resume();
        } else if (this.verbosity.isEnabled()) {
            System.out.println("  Failure: this service is disabled");
        }
    }

    /**
     * Turns verbose output off (level 0) or on (any other level) and prints
     * the requested level.
     *
     * @param i verbosity level
     */
    public void setVerbosity(int i) {
        if (i == 0) {
            this.verbosity.disable();
        } else {
            this.verbosity.enable();
        }
        System.out.println("PubSubService.setVerbosity(" + i + ")");
    }

    /**
     * Suspends the service, provided it is enabled; otherwise reports the
     * failure when verbosity is on.
     */
    @Override
    public void suspend() {
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.suspend()");
        }
        if (isEnabled()) {
            super.suspend();
        } else if (this.verbosity.isEnabled()) {
            System.out.println("  Failure: this service is disabled");
        }
    }

    /**
     * Returns a multi-line summary: number of subscriptions, enabled flag and
     * suspended flag.
     */
    @Override
    public String toString() {
        return "PubSubService\n nbProcessors: " + String.valueOf(this.map_.size()) + "\n isEnabled;" + String.valueOf(isEnabled()) + "\n isSuspended: " + String.valueOf(isSuspended());
    }

    /**
     * Handles a message delivered to this service as a {@link Sink}.
     *
     * <p>The message is forwarded to the registered receivers unless the
     * service is suspended, the descriptor's service type starts with
     * {@code "service/subscription"}, the descriptor has no original document
     * id, or the reception history already contains that id.
     *
     * @param message incoming message
     * @throws Exception if forwarding to the receivers fails
     */
    @Override
    public void write(Message message) throws Exception {
        Descriptor descriptor = message.getDescriptor();
        if (this.verbosity.isEnabled()) {
            System.out.println("PubSubService.write('" + descriptor.getKey() + "')");
        }
        if (isSuspended()) {
            if (this.verbosity.isEnabled()) {
                System.out.println("  This service is suspended. Aborting...");
            }
            return;
        }
        String serviceType = descriptor.getServiceType();
        if (serviceType != null && serviceType.startsWith("service/subscription")) {
            return;
        }
        String originalDocumentId = descriptor.getOriginalDocumentId();
        if (originalDocumentId == null) {
            return;
        }
        if (alreadyReceived(originalDocumentId)) {
            if (this.verbosity.isEnabled()) {
                System.out.println("PubSubService.write(): ignoring document " + originalDocumentId + " that has already been received in the past");
            }
            return;
        }
        this.receivers_.write(message);
    }

    /**
     * Tells whether the reception history, if there is one, contains the given
     * document id. Always false when no history is maintained.
     */
    private boolean alreadyReceived(String documentId) throws Exception {
        return this.history_ != null && this.history_.contains(documentId);
    }
}
