diff options
author | Armin <me@obrown.io> | 2018-05-31 11:46:32 +0200 |
---|---|---|
committer | Armin Braun <me@obrown.io> | 2018-05-31 11:32:13 +0000 |
commit | 0aa798f350ce6504e1cf4e420c7481c65016d2f6 (patch) | |
tree | c1f2bab5d3662028a3fd88e255cc44a35492ee99 | |
parent | 0bbcf9a921f859f8d417251e9f9feedd6f31674e (diff) |
JAVAFICATION: Move QueueFactory to Java
Fixes #9693
10 files changed, 170 insertions, 81 deletions
diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 72980161..2b47e25c 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -5,7 +5,6 @@ require "logstash/filters/base" require "logstash/inputs/base" require "logstash/outputs/base" require "logstash/instrument/collector" -require "logstash/queue_factory" require "logstash/compiler" java_import org.logstash.common.DeadLetterQueueFactory diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index b68e46b2..e34eb689 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -10,7 +10,6 @@ require "logstash/inputs/base" require "logstash/outputs/base" require "logstash/instrument/collector" require "logstash/filter_delegator" -require "logstash/queue_factory" require "logstash/compiler" java_import org.logstash.common.DeadLetterQueueFactory diff --git a/logstash-core/lib/logstash/queue_factory.rb b/logstash-core/lib/logstash/queue_factory.rb deleted file mode 100644 index f0a081ea..00000000 --- a/logstash-core/lib/logstash/queue_factory.rb +++ /dev/null @@ -1,33 +0,0 @@ -# encoding: utf-8 -require "fileutils" -require "logstash/event" - -module LogStash - class QueueFactory - def self.create(settings) - queue_type = settings.get("queue.type") - queue_page_capacity = settings.get("queue.page_capacity") - queue_max_bytes = settings.get("queue.max_bytes") - queue_max_events = settings.get("queue.max_events") - checkpoint_max_acks = settings.get("queue.checkpoint.acks") - checkpoint_max_writes = settings.get("queue.checkpoint.writes") - checkpoint_max_interval = settings.get("queue.checkpoint.interval") - - queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id")) - - case queue_type - when "persisted" - # persisted is the disk based acked queue - FileUtils.mkdir_p(queue_path) - LogStash::WrappedAckedQueue.new(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes) - when "memory" - # memory is the legacy and default setting - LogStash::WrappedSynchronousQueue.new( - settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2 - ) - else - raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory' or 'persisted'" - end - end - end -end diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index 3e1adf6b..8b0193f1 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -1,5 +1,4 @@ # encoding: utf-8 -require "logstash/queue_factory" require "logstash/settings" require "stud/temporary" diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 7394d0e7..15391e1d 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -8,6 +8,7 @@ import org.jruby.RubyModule; import org.jruby.anno.JRubyClass; import org.jruby.exceptions.RaiseException; import org.jruby.runtime.ObjectAllocator; +import org.logstash.ackedqueue.QueueFactoryExt; import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; import org.logstash.common.AbstractDeadLetterQueueWriterExt; @@ -15,6 +16,7 @@ import org.logstash.common.BufferedTokenizerExt; import org.logstash.config.ir.compiler.FilterDelegatorExt; import org.logstash.config.ir.compiler.OutputDelegatorExt; import org.logstash.config.ir.compiler.OutputStrategyExt; +import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.execution.EventDispatcherExt; import org.logstash.execution.ExecutionContextExt; import org.logstash.execution.LogstashPipelineExt; @@ -79,6 +81,8 @@ public final class RubyUtil { public static final RubyClass ACKED_WRITE_CLIENT_CLASS; + public static final RubyClass ABSTRACT_WRAPPED_QUEUE_CLASS; + public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS; public static final RubyClass WRAPPED_ACKED_QUEUE_CLASS; @@ -169,6 +173,8 @@ public final class RubyUtil { public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS; + public static final RubyClass QUEUE_FACTORY_CLASS; + public static final RubyClass HOOKS_REGISTRY_CLASS; public static final RubyClass LOGSTASH_PIPELINE_CLASS; @@ -339,6 +345,11 @@ public final class RubyUtil { RUBY_TIMESTAMP_CLASS = setupLogstashClass( JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class ); + ABSTRACT_WRAPPED_QUEUE_CLASS = LOGSTASH_MODULE.defineClassUnder( + "AbstractWrappedQueue", RUBY.getObject(), + ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR + ); + ABSTRACT_WRAPPED_QUEUE_CLASS.defineAnnotatedMethods(AbstractWrappedQueueExt.class); WRAPPED_WRITE_CLIENT_CLASS = setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class); QUEUE_READ_CLIENT_BASE_CLASS = @@ -351,12 +362,16 @@ public final class RubyUtil { setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class); ACKED_WRITE_CLIENT_CLASS = setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class); - WRAPPED_SYNCHRONOUS_QUEUE_CLASS = - setupLogstashClass(JrubyWrappedSynchronousQueueExt::new, - JrubyWrappedSynchronousQueueExt.class); - WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(JRubyWrappedAckedQueueExt::new, - JRubyWrappedAckedQueueExt.class); + WRAPPED_SYNCHRONOUS_QUEUE_CLASS = setupLogstashClass( + ABSTRACT_WRAPPED_QUEUE_CLASS, JrubyWrappedSynchronousQueueExt::new, + JrubyWrappedSynchronousQueueExt.class + ); + WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass( + ABSTRACT_WRAPPED_QUEUE_CLASS, JRubyWrappedAckedQueueExt::new, + JRubyWrappedAckedQueueExt.class + ); ACKED_QUEUE_CLASS = setupLogstashClass(JRubyAckedQueueExt::new, JRubyAckedQueueExt.class); + QUEUE_FACTORY_CLASS = setupLogstashClass(QueueFactoryExt::new, QueueFactoryExt.class); RUBY_EVENT_CLASS = setupLogstashClass( JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class ); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java new file mode 100644 index 00000000..e9517c45 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java @@ -0,0 +1,74 @@ +package org.logstash.ackedqueue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.jruby.Ruby; +import org.jruby.RubyBasicObject; +import org.jruby.RubyClass; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; +import org.logstash.execution.AbstractWrappedQueueExt; +import org.logstash.ext.JrubyWrappedSynchronousQueueExt; + +@JRubyClass(name = "QueueFactory") +public final class QueueFactoryExt extends RubyBasicObject { + + public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod(meta = true) + public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv, + final IRubyObject settings) throws IOException { + final String type = getSetting(context, settings, "queue.type").asJavaString(); + if ("persisted".equals(type)) { + final Path queuePath = Paths.get( + getSetting(context, settings, "path.queue").asJavaString(), + getSetting(context, settings, "pipeline.id").asJavaString() + ); + Files.createDirectories(queuePath); + return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS) + .initialize( + context, new IRubyObject[]{ + context.runtime.newString(queuePath.toString()), + getSetting(context, settings, "queue.page_capacity"), + getSetting(context, settings, "queue.max_events"), + getSetting(context, settings, "queue.checkpoint.writes"), + getSetting(context, settings, "queue.checkpoint.acks"), + getSetting(context, settings, "queue.checkpoint.interval"), + getSetting(context, settings, "queue.max_bytes") + } + ); + } else if ("memory".equals(type)) { + return new JrubyWrappedSynchronousQueueExt( + context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS + ).initialize( + context, context.runtime.newFixnum( + getSetting(context, settings, "pipeline.batch.size") + .convertToInteger().getIntValue() + * getSetting(context, settings, "pipeline.workers") + .convertToInteger().getIntValue() + ) + ); + } else { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'", + type + ) + ); + } + } + + private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings, + final String name) { + return settings.callMethod(context, "get_value", context.runtime.newString(name)); + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index 55c3a04c..f506da4d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -6,25 +6,25 @@ import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; import org.jruby.RubyFixnum; -import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.Arity; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.execution.AbstractWrappedQueueExt; import org.logstash.ext.JrubyAckedReadClientExt; import org.logstash.ext.JrubyAckedWriteClientExt; import org.logstash.ext.JrubyEventExtLibrary; @JRubyClass(name = "WrappedAckedQueue") -public final class JRubyWrappedAckedQueueExt extends RubyObject { +public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private JRubyAckedQueueExt queue; private final AtomicBoolean isClosed = new AtomicBoolean(); - @JRubyMethod(name = "initialize", optional = 7) - public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) throws IOException { + @JRubyMethod(optional = 7) + public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { args = Arity.scanArgs(context.runtime, args, 7, 0); int capacity = RubyFixnum.num2int(args[1]); int maxEvents = RubyFixnum.num2int(args[2]); @@ -36,7 +36,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes); this.queue.open(); - return context.nil; + return this; } public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) { @@ -53,16 +53,6 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { isClosed.set(true); } - @JRubyMethod(name = "close") - public IRubyObject rubyClose(ThreadContext context) { - try { - close(); - } catch (IOException e) { - throw RubyUtil.newRubyIOError(context.runtime, e); - } - return context.nil; - } - @JRubyMethod(name = {"push", "<<"}) public void rubyPush(ThreadContext context, IRubyObject event) { checkIfClosed("write"); @@ -75,20 +65,29 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject { return queue.ruby_read_batch(context, size, wait); } + @JRubyMethod(name = "is_empty?") + public IRubyObject rubyIsEmpty(ThreadContext context) { + return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty()); + } - @JRubyMethod(name = "write_client") - public IRubyObject rubyWriteClient(final ThreadContext context) { + @Override + protected IRubyObject getWriteClient(final ThreadContext context) { return JrubyAckedWriteClientExt.create(queue, isClosed); } - @JRubyMethod(name = "read_client") - public IRubyObject rubyReadClient(final ThreadContext context) { + @Override + protected IRubyObject getReadClient() { return JrubyAckedReadClientExt.create(queue); } - @JRubyMethod(name = "is_empty?") - public IRubyObject rubyIsEmpty(ThreadContext context) { - return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty()); + @Override + protected IRubyObject doClose(final ThreadContext context) { + try { + close(); + } catch (IOException e) { + throw RubyUtil.newRubyIOError(context.runtime, e); + } + return context.nil; } private void checkIfClosed(String action) { diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java new file mode 100644 index 00000000..8e0dd97b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java @@ -0,0 +1,38 @@ +package org.logstash.execution; + +import org.jruby.Ruby; +import org.jruby.RubyBasicObject; +import org.jruby.RubyClass; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; + +@JRubyClass(name = "AbstractWrappedQueue") +public abstract class AbstractWrappedQueueExt extends RubyBasicObject { + + public AbstractWrappedQueueExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod(name = "write_client") + public final IRubyObject writeClient(final ThreadContext context) { + return getWriteClient(context); + } + + @JRubyMethod(name = "read_client") + public final IRubyObject readClient() { + return getReadClient(); + } + + @JRubyMethod + public final IRubyObject close(final ThreadContext context) { + return doClose(context); + } + + protected abstract IRubyObject doClose(ThreadContext context); + + protected abstract IRubyObject getWriteClient(ThreadContext context); + + protected abstract IRubyObject getReadClient(); +} diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java index 2dba2253..822ba8a2 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java @@ -1,5 +1,7 @@ package org.logstash.ext; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -7,14 +9,10 @@ import org.logstash.RubyUtil; import org.logstash.common.LsQueueUtils; import org.logstash.execution.MemoryReadBatch; import org.logstash.execution.QueueBatch; -import org.logstash.execution.QueueReadClient; import org.logstash.execution.QueueReadClientBase; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - @JRubyClass(name = "MemoryReadClient", parent = "QueueReadClientBase") -public final class JrubyMemoryReadClientExt extends QueueReadClientBase implements QueueReadClient { +public final class JrubyMemoryReadClientExt extends QueueReadClientBase { private BlockingQueue queue; diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java index a3c564e3..d482ac33 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java @@ -1,19 +1,18 @@ package org.logstash.ext; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.RubyNumeric; -import org.jruby.RubyObject; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import org.logstash.execution.AbstractWrappedQueueExt; @JRubyClass(name = "WrappedSynchronousQueue") -public final class JrubyWrappedSynchronousQueueExt extends RubyObject { +public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueExt { private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue; @@ -21,27 +20,29 @@ public final class JrubyWrappedSynchronousQueueExt extends RubyObject { super(runtime, metaClass); } - @JRubyMethod(name = "initialize") + @JRubyMethod @SuppressWarnings("unchecked") - public void rubyInitialize(final ThreadContext context, IRubyObject size) { + public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context, + IRubyObject size) { int typedSize = ((RubyNumeric)size).getIntValue(); this.queue = new ArrayBlockingQueue<>(typedSize); + return this; } - @JRubyMethod(name = "write_client") - public IRubyObject getWriteClient(final ThreadContext context) { + @Override + protected IRubyObject getWriteClient(final ThreadContext context) { return JrubyMemoryWriteClientExt.create(queue); } - @JRubyMethod(name = "read_client") - public IRubyObject getReadClient(final ThreadContext context) { + @Override + protected IRubyObject getReadClient() { // batch size and timeout are currently hard-coded to 125 and 50ms as values observed // to be reasonable tradeoffs between latency and throughput per PR #8707 return JrubyMemoryReadClientExt.create(queue, 125, 50); } - @JRubyMethod(name = "close") - public IRubyObject rubyClose(final ThreadContext context) { + @Override + public IRubyObject doClose(final ThreadContext context) { // no op return this; } |