aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArmin <me@obrown.io>2018-05-31 11:46:32 +0200
committerArmin Braun <me@obrown.io>2018-05-31 11:32:13 +0000
commit0aa798f350ce6504e1cf4e420c7481c65016d2f6 (patch)
treec1f2bab5d3662028a3fd88e255cc44a35492ee99
parent0bbcf9a921f859f8d417251e9f9feedd6f31674e (diff)
JAVAFICATION: Move QueueFactory to Java
Fixes #9693
-rw-r--r--logstash-core/lib/logstash/java_pipeline.rb1
-rw-r--r--logstash-core/lib/logstash/pipeline.rb1
-rw-r--r--logstash-core/lib/logstash/queue_factory.rb33
-rw-r--r--logstash-core/spec/logstash/queue_factory_spec.rb1
-rw-r--r--logstash-core/src/main/java/org/logstash/RubyUtil.java25
-rw-r--r--logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java74
-rw-r--r--logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java43
-rw-r--r--logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java38
-rw-r--r--logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java8
-rw-r--r--logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java27
10 files changed, 170 insertions, 81 deletions
diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb
index 72980161..2b47e25c 100644
--- a/logstash-core/lib/logstash/java_pipeline.rb
+++ b/logstash-core/lib/logstash/java_pipeline.rb
@@ -5,7 +5,6 @@ require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/instrument/collector"
-require "logstash/queue_factory"
require "logstash/compiler"
java_import org.logstash.common.DeadLetterQueueFactory
diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb
index b68e46b2..e34eb689 100644
--- a/logstash-core/lib/logstash/pipeline.rb
+++ b/logstash-core/lib/logstash/pipeline.rb
@@ -10,7 +10,6 @@ require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/instrument/collector"
require "logstash/filter_delegator"
-require "logstash/queue_factory"
require "logstash/compiler"
java_import org.logstash.common.DeadLetterQueueFactory
diff --git a/logstash-core/lib/logstash/queue_factory.rb b/logstash-core/lib/logstash/queue_factory.rb
deleted file mode 100644
index f0a081ea..00000000
--- a/logstash-core/lib/logstash/queue_factory.rb
+++ /dev/null
@@ -1,33 +0,0 @@
-# encoding: utf-8
-require "fileutils"
-require "logstash/event"
-
-module LogStash
- class QueueFactory
- def self.create(settings)
- queue_type = settings.get("queue.type")
- queue_page_capacity = settings.get("queue.page_capacity")
- queue_max_bytes = settings.get("queue.max_bytes")
- queue_max_events = settings.get("queue.max_events")
- checkpoint_max_acks = settings.get("queue.checkpoint.acks")
- checkpoint_max_writes = settings.get("queue.checkpoint.writes")
- checkpoint_max_interval = settings.get("queue.checkpoint.interval")
-
- queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
-
- case queue_type
- when "persisted"
- # persisted is the disk based acked queue
- FileUtils.mkdir_p(queue_path)
- LogStash::WrappedAckedQueue.new(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
- when "memory"
- # memory is the legacy and default setting
- LogStash::WrappedSynchronousQueue.new(
- settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2
- )
- else
- raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory' or 'persisted'"
- end
- end
- end
-end
diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb
index 3e1adf6b..8b0193f1 100644
--- a/logstash-core/spec/logstash/queue_factory_spec.rb
+++ b/logstash-core/spec/logstash/queue_factory_spec.rb
@@ -1,5 +1,4 @@
# encoding: utf-8
-require "logstash/queue_factory"
require "logstash/settings"
require "stud/temporary"
diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java
index 7394d0e7..15391e1d 100644
--- a/logstash-core/src/main/java/org/logstash/RubyUtil.java
+++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java
@@ -8,6 +8,7 @@ import org.jruby.RubyModule;
import org.jruby.anno.JRubyClass;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.ObjectAllocator;
+import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
@@ -15,6 +16,7 @@ import org.logstash.common.BufferedTokenizerExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
+import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.EventDispatcherExt;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.LogstashPipelineExt;
@@ -79,6 +81,8 @@ public final class RubyUtil {
public static final RubyClass ACKED_WRITE_CLIENT_CLASS;
+ public static final RubyClass ABSTRACT_WRAPPED_QUEUE_CLASS;
+
public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS;
public static final RubyClass WRAPPED_ACKED_QUEUE_CLASS;
@@ -169,6 +173,8 @@ public final class RubyUtil {
public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS;
+ public static final RubyClass QUEUE_FACTORY_CLASS;
+
public static final RubyClass HOOKS_REGISTRY_CLASS;
public static final RubyClass LOGSTASH_PIPELINE_CLASS;
@@ -339,6 +345,11 @@ public final class RubyUtil {
RUBY_TIMESTAMP_CLASS = setupLogstashClass(
JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class
);
+ ABSTRACT_WRAPPED_QUEUE_CLASS = LOGSTASH_MODULE.defineClassUnder(
+ "AbstractWrappedQueue", RUBY.getObject(),
+ ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
+ );
+ ABSTRACT_WRAPPED_QUEUE_CLASS.defineAnnotatedMethods(AbstractWrappedQueueExt.class);
WRAPPED_WRITE_CLIENT_CLASS =
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
QUEUE_READ_CLIENT_BASE_CLASS =
@@ -351,12 +362,16 @@ public final class RubyUtil {
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
ACKED_WRITE_CLIENT_CLASS =
setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class);
- WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
- setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
- JrubyWrappedSynchronousQueueExt.class);
- WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(JRubyWrappedAckedQueueExt::new,
- JRubyWrappedAckedQueueExt.class);
+ WRAPPED_SYNCHRONOUS_QUEUE_CLASS = setupLogstashClass(
+ ABSTRACT_WRAPPED_QUEUE_CLASS, JrubyWrappedSynchronousQueueExt::new,
+ JrubyWrappedSynchronousQueueExt.class
+ );
+ WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(
+ ABSTRACT_WRAPPED_QUEUE_CLASS, JRubyWrappedAckedQueueExt::new,
+ JRubyWrappedAckedQueueExt.class
+ );
ACKED_QUEUE_CLASS = setupLogstashClass(JRubyAckedQueueExt::new, JRubyAckedQueueExt.class);
+ QUEUE_FACTORY_CLASS = setupLogstashClass(QueueFactoryExt::new, QueueFactoryExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
new file mode 100644
index 00000000..e9517c45
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueFactoryExt.java
@@ -0,0 +1,74 @@
+package org.logstash.ackedqueue;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.jruby.Ruby;
+import org.jruby.RubyBasicObject;
+import org.jruby.RubyClass;
+import org.jruby.anno.JRubyClass;
+import org.jruby.anno.JRubyMethod;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+import org.logstash.RubyUtil;
+import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
+import org.logstash.execution.AbstractWrappedQueueExt;
+import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
+
+@JRubyClass(name = "QueueFactory")
+public final class QueueFactoryExt extends RubyBasicObject {
+
+ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
+ super(runtime, metaClass);
+ }
+
+ @JRubyMethod(meta = true)
+ public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
+ final IRubyObject settings) throws IOException {
+ final String type = getSetting(context, settings, "queue.type").asJavaString();
+ if ("persisted".equals(type)) {
+ final Path queuePath = Paths.get(
+ getSetting(context, settings, "path.queue").asJavaString(),
+ getSetting(context, settings, "pipeline.id").asJavaString()
+ );
+ Files.createDirectories(queuePath);
+ return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
+ .initialize(
+ context, new IRubyObject[]{
+ context.runtime.newString(queuePath.toString()),
+ getSetting(context, settings, "queue.page_capacity"),
+ getSetting(context, settings, "queue.max_events"),
+ getSetting(context, settings, "queue.checkpoint.writes"),
+ getSetting(context, settings, "queue.checkpoint.acks"),
+ getSetting(context, settings, "queue.checkpoint.interval"),
+ getSetting(context, settings, "queue.max_bytes")
+ }
+ );
+ } else if ("memory".equals(type)) {
+ return new JrubyWrappedSynchronousQueueExt(
+ context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
+ ).initialize(
+ context, context.runtime.newFixnum(
+ getSetting(context, settings, "pipeline.batch.size")
+ .convertToInteger().getIntValue()
+ * getSetting(context, settings, "pipeline.workers")
+ .convertToInteger().getIntValue()
+ )
+ );
+ } else {
+ throw context.runtime.newRaiseException(
+ RubyUtil.CONFIGURATION_ERROR_CLASS,
+ String.format(
+ "Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'",
+ type
+ )
+ );
+ }
+ }
+
+ private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings,
+ final String name) {
+ return settings.callMethod(context, "get_value", context.runtime.newString(name));
+ }
+}
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
index 55c3a04c..f506da4d 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java
@@ -6,25 +6,25 @@ import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
-import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
+import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "WrappedAckedQueue")
-public final class JRubyWrappedAckedQueueExt extends RubyObject {
+public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private JRubyAckedQueueExt queue;
private final AtomicBoolean isClosed = new AtomicBoolean();
- @JRubyMethod(name = "initialize", optional = 7)
- public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) throws IOException {
+ @JRubyMethod(optional = 7)
+ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
@@ -36,7 +36,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);
this.queue.open();
- return context.nil;
+ return this;
}
public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) {
@@ -53,16 +53,6 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
isClosed.set(true);
}
- @JRubyMethod(name = "close")
- public IRubyObject rubyClose(ThreadContext context) {
- try {
- close();
- } catch (IOException e) {
- throw RubyUtil.newRubyIOError(context.runtime, e);
- }
- return context.nil;
- }
-
@JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write");
@@ -75,20 +65,29 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
return queue.ruby_read_batch(context, size, wait);
}
+ @JRubyMethod(name = "is_empty?")
+ public IRubyObject rubyIsEmpty(ThreadContext context) {
+ return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
+ }
- @JRubyMethod(name = "write_client")
- public IRubyObject rubyWriteClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getWriteClient(final ThreadContext context) {
return JrubyAckedWriteClientExt.create(queue, isClosed);
}
- @JRubyMethod(name = "read_client")
- public IRubyObject rubyReadClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getReadClient() {
return JrubyAckedReadClientExt.create(queue);
}
- @JRubyMethod(name = "is_empty?")
- public IRubyObject rubyIsEmpty(ThreadContext context) {
- return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
+ @Override
+ protected IRubyObject doClose(final ThreadContext context) {
+ try {
+ close();
+ } catch (IOException e) {
+ throw RubyUtil.newRubyIOError(context.runtime, e);
+ }
+ return context.nil;
}
private void checkIfClosed(String action) {
diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java
new file mode 100644
index 00000000..8e0dd97b
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/execution/AbstractWrappedQueueExt.java
@@ -0,0 +1,38 @@
+package org.logstash.execution;
+
+import org.jruby.Ruby;
+import org.jruby.RubyBasicObject;
+import org.jruby.RubyClass;
+import org.jruby.anno.JRubyClass;
+import org.jruby.anno.JRubyMethod;
+import org.jruby.runtime.ThreadContext;
+import org.jruby.runtime.builtin.IRubyObject;
+
+@JRubyClass(name = "AbstractWrappedQueue")
+public abstract class AbstractWrappedQueueExt extends RubyBasicObject {
+
+ public AbstractWrappedQueueExt(final Ruby runtime, final RubyClass metaClass) {
+ super(runtime, metaClass);
+ }
+
+ @JRubyMethod(name = "write_client")
+ public final IRubyObject writeClient(final ThreadContext context) {
+ return getWriteClient(context);
+ }
+
+ @JRubyMethod(name = "read_client")
+ public final IRubyObject readClient() {
+ return getReadClient();
+ }
+
+ @JRubyMethod
+ public final IRubyObject close(final ThreadContext context) {
+ return doClose(context);
+ }
+
+ protected abstract IRubyObject doClose(ThreadContext context);
+
+ protected abstract IRubyObject getWriteClient(ThreadContext context);
+
+ protected abstract IRubyObject getReadClient();
+}
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
index 2dba2253..822ba8a2 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryReadClientExt.java
@@ -1,5 +1,7 @@
package org.logstash.ext;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
@@ -7,14 +9,10 @@ import org.logstash.RubyUtil;
import org.logstash.common.LsQueueUtils;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
-import org.logstash.execution.QueueReadClient;
import org.logstash.execution.QueueReadClientBase;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
@JRubyClass(name = "MemoryReadClient", parent = "QueueReadClientBase")
-public final class JrubyMemoryReadClientExt extends QueueReadClientBase implements QueueReadClient {
+public final class JrubyMemoryReadClientExt extends QueueReadClientBase {
private BlockingQueue queue;
diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
index a3c564e3..d482ac33 100644
--- a/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
+++ b/logstash-core/src/main/java/org/logstash/ext/JrubyWrappedSynchronousQueueExt.java
@@ -1,19 +1,18 @@
package org.logstash.ext;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyNumeric;
-import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import org.logstash.execution.AbstractWrappedQueueExt;
@JRubyClass(name = "WrappedSynchronousQueue")
-public final class JrubyWrappedSynchronousQueueExt extends RubyObject {
+public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueExt {
private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
@@ -21,27 +20,29 @@ public final class JrubyWrappedSynchronousQueueExt extends RubyObject {
super(runtime, metaClass);
}
- @JRubyMethod(name = "initialize")
+ @JRubyMethod
@SuppressWarnings("unchecked")
- public void rubyInitialize(final ThreadContext context, IRubyObject size) {
+ public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
+ IRubyObject size) {
int typedSize = ((RubyNumeric)size).getIntValue();
this.queue = new ArrayBlockingQueue<>(typedSize);
+ return this;
}
- @JRubyMethod(name = "write_client")
- public IRubyObject getWriteClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getWriteClient(final ThreadContext context) {
return JrubyMemoryWriteClientExt.create(queue);
}
- @JRubyMethod(name = "read_client")
- public IRubyObject getReadClient(final ThreadContext context) {
+ @Override
+ protected IRubyObject getReadClient() {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
return JrubyMemoryReadClientExt.create(queue, 125, 50);
}
- @JRubyMethod(name = "close")
- public IRubyObject rubyClose(final ThreadContext context) {
+ @Override
+ public IRubyObject doClose(final ThreadContext context) {
// no op
return this;
}