diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml
index 272b1b9c..d0219036 100644
--- a/samoa-api/pom.xml
+++ b/samoa-api/pom.xml
@@ -101,6 +101,11 @@
${hadoop.version}
test
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+    </dependency>
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java
new file mode 100644
index 00000000..a6ac157d
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java
@@ -0,0 +1,265 @@
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class KafkaReader {
+
+ protected long readOffset;
+
+ private List m_replicaBrokers = new ArrayList();
+ private static final int kafkaConnectionTimeOut = 100000;
+ private static final int kafkaBufferSize = 64 * 1024;
+ private static final int fetchSizeBytes = 100000;
+ private static final int readErrorThreshold = 5;
+ private static final int numLeaderCheckAttempts = 3;
+ private static final int delayLeaderCheckAttempts = 100;
+ private static final String leaderClientID = "leaderLookup";
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(KafkaReader.class);
+
+ public KafkaReader() {
+ m_replicaBrokers = new ArrayList();
+ readOffset = 0L;
+ }
+
+ public ArrayList run(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) {
+ String message = "";
+ ArrayList returnInstances = new ArrayList();
+ PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
+ if (metadata == null) {
+ throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting");
+ }
+ if (metadata.leader() == null) {
+ throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting");
+ }
+ String leadBroker = metadata.leader().host();
+ String clientName = "Client_" + a_topic + "_" + a_partition;
+
+ SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName);
+ int numErrors = 0;
+ while (a_maxReads > 0) {
+ if (consumer == null) {
+ consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName);
+ }
+ // reading data
+ FetchRequest req = new FetchRequestBuilder()
+ .clientId(clientName)
+ .addFetch(a_topic, a_partition, readOffset, fetchSizeBytes)
+ .build();
+ FetchResponse fetchResponse = null;
+
+ try {
+ fetchResponse = consumer.fetch(req);
+ } catch (Exception e) {
+ logger.error("ERROR occoured during fetching data from Kafka");
+ }
+
+ /**
+ * SimpleConsumer does not handle lead broker failures, you have to handle it
+ * once the fetch returns an error, we log the reason, close the consumer then try to figure
+ * out who the new leader is
+ */
+ if (fetchResponse.hasError()) {
+ numErrors++;
+ // Something went wrong!
+ short code = fetchResponse.errorCode(a_topic, a_partition);
+ logger.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
+ if (numErrors > readErrorThreshold) break;
+ if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+ // We asked for an invalid offset. For simple case ask for the last element to reset
+ continue;
+ }
+ consumer.close();
+ consumer = null;
+ try {
+ leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ continue;
+ }
+
+ // Reading data cont.
+ numErrors = 0;
+ Iterator it = (Iterator) fetchResponse.messageSet(a_topic, a_partition).iterator();
+ MessageAndOffset messageAndOffset = null;
+
+ try {
+ messageAndOffset = (MessageAndOffset) it.next();
+ } catch (Exception e) {
+ logger.error("No more messages to read from Kafka.");
+ return null;
+ }
+
+ long currentOffset = messageAndOffset.offset();
+ if (currentOffset < readOffset) {
+ logger.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
+ continue;
+ }
+ readOffset = messageAndOffset.nextOffset();
+ ByteBuffer payload = messageAndOffset.message().payload();
+
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ try {
+ message = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8");
+ returnInstances.add(message);
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ a_maxReads--;
+ }
+ if (consumer != null) consumer.close();
+ return returnInstances;
+ }
+
+ /**
+ * Defines where to start reading data from
+ * Helpers Available:
+ * kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming
+ * from there
+ * kafka.api.OffsetRequest.LatestTime() => will only stream new messages
+ *
+ * @param consumer
+ * @param topic
+ * @param partition
+ * @param whichTime
+ * @param clientName
+ * @return
+ */
+ public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+ Map requestInfo = new HashMap();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partition);
+ return offsets[0];
+ }
+
+ /**
+ * Uses the findLeader() logic we defined to find the new leader, except here we only try to connect to one of the
+ * replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested
+ * in we give up and exit hard.
+ *
+ * @param a_oldLeader
+ * @param a_topic
+ * @param a_partition
+ * @param a_port
+ * @return
+ * @throws Exception
+ */
+ private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
+ for (int i = 0; i < numLeaderCheckAttempts; i++) {
+ boolean goToSleep = false;
+ PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
+ if (metadata == null) {
+ goToSleep = true;
+ } else if (metadata.leader() == null) {
+ goToSleep = true;
+ } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+ // first time through if the leader hasn't changed give ZooKeeper a second to recover
+ // second time, assume the broker did recover before failover, or it was a non-Broker issue
+ goToSleep = true;
+ } else {
+ return metadata.leader().host();
+ }
+ if (goToSleep) {
+ try {
+ Thread.sleep(delayLeaderCheckAttempts);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ logger.error("Unable to find new leader after Broker failure. Exiting");
+ throw new Exception("Unable to find new leader after Broker failure. Exiting");
+ }
+
+ /**
+ * Query a live broker to find out leader information and replica information for a given topic and partition
+ *
+ * @param a_seedBrokers
+ * @param a_port
+ * @param a_topic
+ * @param a_partition
+ * @return
+ */
+ private PartitionMetadata findLeader(List a_seedBrokers, int a_port, String a_topic, int a_partition) {
+ PartitionMetadata returnMetaData = null;
+ for (String seed : a_seedBrokers) {
+ SimpleConsumer consumer = null;
+ try {
+ consumer = new SimpleConsumer(seed, a_port, kafkaConnectionTimeOut, kafkaBufferSize, leaderClientID);
+ List topics = new ArrayList();
+ topics.add(a_topic);
+ TopicMetadataRequest req = new TopicMetadataRequest(topics);
+ kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+ //call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in
+ List metaData = resp.topicsMetadata();
+ //loop on partitionsMetadata iterates through all the partitions until we find the one we want.
+ for (TopicMetadata item : metaData) {
+ for (PartitionMetadata part : item.partitionsMetadata()) {
+ if (part.partitionId() == a_partition) {
+ returnMetaData = part;
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
+ + ", " + a_partition + "] Reason: " + e);
+ } finally {
+ if (consumer != null) consumer.close();
+ }
+ }
+ // add replica broker info to m_replicaBrokers
+ if (returnMetaData != null) {
+ m_replicaBrokers.clear();
+ for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
+ m_replicaBrokers.add(replica.host());
+ }
+ }
+ return returnMetaData;
+ }
+
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java
new file mode 100644
index 00000000..ae4dcde3
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java
@@ -0,0 +1,200 @@
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.Example;
+import org.apache.samoa.moa.core.FastVector;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+public class KafkaStream extends AbstractOptionHandler implements
+        InstanceStream {
+
+    private static final long serialVersionUID = 1L;
+
+    protected InstancesHeader streamHeader;
+
+    protected Instances instances;
+
+    private KafkaReader reader;
+
+    private KafkaToInstanceMapper mapper;
+
+    protected InstanceExample lastInstanceRead;
+
+    List<String> seeds = new ArrayList<String>();
+
+    // Buffers messages read from kafka; reduces the number of queries to kafka.
+    protected Queue<String> instanceQueue;
+
+    public IntOption classIndexOption = new IntOption("classIndex", 'c',
+            "Class index of data. 0 for none or -1 for last attribute in file.",
+            -1, -1, Integer.MAX_VALUE);
+
+    public IntOption numAttrOption = new IntOption("numNumerics", 'u',
+            "The number of numeric attributes in" +
+                    " dataset", 300, 0, Integer.MAX_VALUE);
+
+    public StringOption topicOption = new StringOption("topic", 't',
+            "Topic in the kafka to be used for reading data", "test");
+
+    public IntOption numMaxreadOption = new IntOption("numMaxread", 'r',
+            "Number of instances to be read in single read from kafka", 1, 0,
+            Integer.MAX_VALUE);
+
+    public IntOption partitionOption = new IntOption("partition", 'n',
+            "Partition number to be used for reading data", 0);
+
+    public IntOption portOption = new IntOption("port", 'p',
+            "Port in kafka to read data", 9092);
+
+    public StringOption seedOption = new StringOption("seed", 's',
+            "Seeds for kafka", "localhost");
+
+    public IntOption numClassesOption = new IntOption("numClasses", 'k',
+            "The number of classes in the data.", 2, 2, Integer.MAX_VALUE);
+
+    public IntOption timeDelayOption = new IntOption("timeDelay", 'y',
+            "Time delay in milliseconds between two read from kafka", 0, 0, Integer.MAX_VALUE);
+
+    public IntOption instanceType = new IntOption("instanceType", 'i',
+            "Type of instance to be used. DenseInstance(0)/SparseInstance(1)", 0);
+
+    public StringOption keyValueSeparator = new StringOption("keyValueSeparator", 'a',
+            "Separator between key and value for string read from kafka", ":");
+
+    public StringOption valuesSeparator = new StringOption("valuesSeparator", 'b',
+            "Separator between values for string read from kafka", ",");
+
+    // NOTE(review): this is a regular method, not a constructor (it has a void
+    // return type and does not match the class name) — it looks like a mistaken
+    // constructor. Kept for compatibility; prepareForUseImpl() is what actually
+    // initializes the reader.
+    public void KafkaReader() {
+        reader = new KafkaReader();
+    }
+
+    @Override
+    protected void prepareForUseImpl(TaskMonitor monitor,
+            ObjectRepository repository) {
+        this.reader = new KafkaReader();
+        this.mapper = new KafkaToInstanceMapper();
+        generateHeader();
+        instanceQueue = new LinkedList<String>();
+        seeds.add(this.seedOption.getValue());
+    }
+
+    /**
+     * Builds the stream header: numNumerics numeric attributes followed by one
+     * nominal class attribute with numClasses labels.
+     */
+    protected void generateHeader() {
+        FastVector attributes = new FastVector<>();
+        for (int i = 0; i < this.numAttrOption.getValue(); i++) {
+            attributes.addElement(new Attribute("numeric" + (i + 1)));
+        }
+        FastVector classLabels = new FastVector<>();
+        for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+
+        this.streamHeader = new InstancesHeader(new Instances(
+                getCLICreationString(InstanceStream.class), attributes, 0));
+
+        if (this.classIndexOption.getValue() < 0) {
+            // -1 means the class is the last attribute.
+            this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+        } else if (this.classIndexOption.getValue() > 0) {
+            // The option is 1-based; 0 means no class attribute is set.
+            this.streamHeader.setClassIndex(this.classIndexOption.getValue() - 1);
+        }
+    }
+
+    @Override
+    public InstancesHeader getHeader() {
+        return this.streamHeader;
+    }
+
+    @Override
+    public long estimatedRemainingInstances() {
+        // Unbounded stream: the remaining count is unknown.
+        return -1;
+    }
+
+    /**
+     * Returns the next raw message string, refilling the buffer from Kafka when
+     * it is empty. Polls until at least one message is available.
+     */
+    private String getNextInstanceFromKafka() {
+        if (!instanceQueue.isEmpty()) {
+            return instanceQueue.remove();
+        }
+
+        ArrayList kafkaData;
+        do {
+            kafkaData = this.reader.run(this.numMaxreadOption.getValue(),
+                    this.topicOption.getValue(), this.partitionOption.getValue(),
+                    this.seeds, this.portOption.getValue());
+            // Also retry on an empty batch: the original only re-polled on null,
+            // so an empty list made instanceQueue.remove() throw.
+        } while (kafkaData == null || kafkaData.isEmpty());
+
+        instanceQueue.addAll(kafkaData);
+        return instanceQueue.remove();
+    }
+
+    @Override
+    public Example nextInstance() {
+        InstancesHeader header = getHeader();
+        String kafkaString = getNextInstanceFromKafka();
+        Instance inst = mapper.getInstance(kafkaString, keyValueSeparator.getValue(), valuesSeparator.getValue(),
+                instanceType.getValue(), numAttrOption.getValue(), header);
+
+        // Optional throttling between reads; skip the sleep entirely when 0.
+        if (timeDelayOption.getValue() > 0) {
+            try {
+                Thread.sleep(timeDelayOption.getValue());
+            } catch (InterruptedException e) {
+                // Preserve the interrupt status instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+        return new InstanceExample(inst);
+    }
+
+    @Override
+    public boolean isRestartable() {
+        return true;
+    }
+
+    @Override
+    public void restart() {
+        // A fresh reader restarts reading from offset 0.
+        this.reader = new KafkaReader();
+    }
+
+    @Override
+    public boolean hasMoreInstances() {
+        // Kafka is treated as an endless stream.
+        return true;
+    }
+
+    @Override
+    public void getDescription(StringBuilder sb, int indent) {
+        // Intentionally empty: no textual description for this stream.
+    }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java
new file mode 100644
index 00000000..23b57491
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java
@@ -0,0 +1,57 @@
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.instances.SparseInstance;
+
+public class KafkaToInstanceMapper {
+
+    /**
+     * Converts a raw Kafka message of the form
+     * "key&lt;keyValueSeparator&gt;v1&lt;valuesSeparator&gt;v2...&lt;valuesSeparator&gt;class"
+     * into a SAMOA Instance. All value tokens except the last are attribute
+     * values; the last token is the class value.
+     *
+     * @param message           raw message read from Kafka
+     * @param keyValueSeparator literal separator between key and value part
+     * @param valuesSeparator   literal separator between values
+     * @param instanceType      0 for DenseInstance, otherwise SparseInstance
+     * @param numAttr           maximum number of attribute values to set
+     * @param header            dataset header attached to the instance
+     * @return the populated instance
+     * @throws IllegalArgumentException when the message is null/empty or lacks
+     *                                  the key-value separator
+     */
+    public Instance getInstance(String message, String keyValueSeparator, String valuesSeparator, int instanceType, int numAttr, InstancesHeader header) {
+        if (message == null || message.isEmpty()) {
+            throw new IllegalArgumentException("Empty string value from Kafka");
+        }
+        // The separators are literal text, but String.split() takes a regex:
+        // quote them so separators like "." or "|" do not misparse.
+        String[] keyValueString = message.split(java.util.regex.Pattern.quote(keyValueSeparator));
+        if (keyValueString.length < 2) {
+            throw new IllegalArgumentException(
+                    "Kafka message does not contain key-value separator '" + keyValueSeparator + "': " + message);
+        }
+        String[] attributes = keyValueString[1].split(java.util.regex.Pattern.quote(valuesSeparator));
+        Instance inst = createInstance(header, instanceType);
+        // Tokens beyond numAttr are silently ignored, as in the original code.
+        for (int i = 0; i < attributes.length - 1 && i < numAttr; i++) {
+            inst.setValue(i, Double.parseDouble(attributes[i]));
+        }
+        inst.setDataset(header);
+        // The last token is the class value.
+        inst.setClassValue(Double.parseDouble(attributes[attributes.length - 1]));
+        return inst;
+    }
+
+    /**
+     * Creates an empty instance sized to the header: a DenseInstance when
+     * instanceType == 0, otherwise a SparseInstance.
+     */
+    public Instance createInstance(InstancesHeader header, int instanceType) {
+        if (instanceType == 0) {
+            return new DenseInstance(header.numAttributes());
+        }
+        return new SparseInstance(header.numAttributes());
+    }
+}