diff --git a/samoa-api/pom.xml b/samoa-api/pom.xml index 272b1b9c..d0219036 100644 --- a/samoa-api/pom.xml +++ b/samoa-api/pom.xml @@ -101,6 +101,11 @@ ${hadoop.version} test + + org.apache.kafka + kafka_2.10 + ${kafka.version} + diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java new file mode 100644 index 00000000..a6ac157d --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java @@ -0,0 +1,265 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.message.MessageAndOffset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.*; + +public class KafkaReader { + + protected long readOffset; + + private List m_replicaBrokers = new ArrayList(); + private static final int kafkaConnectionTimeOut = 100000; + private static final int kafkaBufferSize = 64 * 1024; + private static final int fetchSizeBytes = 100000; + private static final int readErrorThreshold = 5; + private static final int numLeaderCheckAttempts = 3; + private static final int delayLeaderCheckAttempts = 100; + private static final String leaderClientID = "leaderLookup"; + + private static final Logger logger = LoggerFactory + .getLogger(KafkaReader.class); + + public KafkaReader() { + m_replicaBrokers = new ArrayList(); + readOffset = 0L; + } + + public ArrayList run(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) { + String message = ""; + ArrayList returnInstances = new ArrayList(); + PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); + if (metadata == null) { + throw new IllegalArgumentException("Can't find metadata for Topic and Partition. Exiting"); + } + if (metadata.leader() == null) { + throw new IllegalArgumentException("Can't find Leader for Topic and Partition. Exiting"); + } + String leadBroker = metadata.leader().host(); + String clientName = "Client_" + a_topic + "_" + a_partition; + + SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName); + int numErrors = 0; + while (a_maxReads > 0) { + if (consumer == null) { + consumer = new SimpleConsumer(leadBroker, a_port, kafkaConnectionTimeOut, kafkaBufferSize, clientName); + } + // reading data + FetchRequest req = new FetchRequestBuilder() + .clientId(clientName) + .addFetch(a_topic, a_partition, readOffset, fetchSizeBytes) + .build(); + FetchResponse fetchResponse = null; + + try { + fetchResponse = consumer.fetch(req); + } catch (Exception e) { + logger.error("ERROR occoured during fetching data from Kafka"); + } + + /** + * SimpleConsumer does not handle lead broker failures, you have to handle it + * once the fetch returns an error, we log the reason, close the consumer then try to figure + * out who the new leader is + */ + if (fetchResponse.hasError()) { + numErrors++; + // Something went wrong! + short code = fetchResponse.errorCode(a_topic, a_partition); + logger.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code); + if (numErrors > readErrorThreshold) break; + if (code == ErrorMapping.OffsetOutOfRangeCode()) { + // We asked for an invalid offset. For simple case ask for the last element to reset + continue; + } + consumer.close(); + consumer = null; + try { + leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); + } catch (Exception e) { + e.printStackTrace(); + } + continue; + } + + // Reading data cont. + numErrors = 0; + Iterator it = (Iterator) fetchResponse.messageSet(a_topic, a_partition).iterator(); + MessageAndOffset messageAndOffset = null; + + try { + messageAndOffset = (MessageAndOffset) it.next(); + } catch (Exception e) { + logger.error("No more messages to read from Kafka."); + return null; + } + + long currentOffset = messageAndOffset.offset(); + if (currentOffset < readOffset) { + logger.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset); + continue; + } + readOffset = messageAndOffset.nextOffset(); + ByteBuffer payload = messageAndOffset.message().payload(); + + byte[] bytes = new byte[payload.limit()]; + payload.get(bytes); + try { + message = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"); + returnInstances.add(message); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + a_maxReads--; + } + if (consumer != null) consumer.close(); + return returnInstances; + } + + /** + * Defines where to start reading data from + * Helpers Available: + * kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming + * from there + * kafka.api.OffsetRequest.LatestTime() => will only stream new messages + * + * @param consumer + * @param topic + * @param partition + * @param whichTime + * @param clientName + * @return + */ + public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map requestInfo = new HashMap(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); + kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); + OffsetResponse response = consumer.getOffsetsBefore(request); + + if (response.hasError()) { + logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); + return 0; + } + long[] offsets = response.offsets(topic, partition); + return offsets[0]; + } + + /** + * Uses the findLeader() logic we defined to find the new leader, except here we only try to connect to one of the + * replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested + * in we give up and exit hard. + * + * @param a_oldLeader + * @param a_topic + * @param a_partition + * @param a_port + * @return + * @throws Exception + */ + private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { + for (int i = 0; i < numLeaderCheckAttempts; i++) { + boolean goToSleep = false; + PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); + if (metadata == null) { + goToSleep = true; + } else if (metadata.leader() == null) { + goToSleep = true; + } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { + // first time through if the leader hasn't changed give ZooKeeper a second to recover + // second time, assume the broker did recover before failover, or it was a non-Broker issue + goToSleep = true; + } else { + return metadata.leader().host(); + } + if (goToSleep) { + try { + Thread.sleep(delayLeaderCheckAttempts); + } catch (InterruptedException ie) { + } + } + } + logger.error("Unable to find new leader after Broker failure. Exiting"); + throw new Exception("Unable to find new leader after Broker failure. Exiting"); + } + + /** + * Query a live broker to find out leader information and replica information for a given topic and partition + * + * @param a_seedBrokers + * @param a_port + * @param a_topic + * @param a_partition + * @return + */ + private PartitionMetadata findLeader(List a_seedBrokers, int a_port, String a_topic, int a_partition) { + PartitionMetadata returnMetaData = null; + for (String seed : a_seedBrokers) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(seed, a_port, kafkaConnectionTimeOut, kafkaBufferSize, leaderClientID); + List topics = new ArrayList(); + topics.add(a_topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); + + //call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in + List metaData = resp.topicsMetadata(); + //loop on partitionsMetadata iterates through all the partitions until we find the one we want. + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == a_partition) { + returnMetaData = part; + break; + } + } + } + } catch (Exception e) { + logger.error("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + + ", " + a_partition + "] Reason: " + e); + } finally { + if (consumer != null) consumer.close(); + } + } + // add replica broker info to m_replicaBrokers + if (returnMetaData != null) { + m_replicaBrokers.clear(); + for (kafka.cluster.Broker replica : returnMetaData.replicas()) { + m_replicaBrokers.add(replica.host()); + } + } + return returnMetaData; + } + +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java new file mode 100644 index 00000000..ae4dcde3 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaStream.java @@ -0,0 +1,200 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import org.apache.samoa.instances.Attribute; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.moa.core.Example; +import org.apache.samoa.moa.core.FastVector; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.tasks.TaskMonitor; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +public class KafkaStream extends AbstractOptionHandler implements + InstanceStream { + + private static final long serialVersionUID = 1L; + + protected InstancesHeader streamHeader; + + protected Instances instances; + + private KafkaReader reader; + + private KafkaToInstanceMapper mapper; + + protected InstanceExample lastInstanceRead; + + List seeds = new ArrayList(); + + // This is used to buffer messages read from kafka. It helps reducing number of queries to kafka + protected Queue instanceQueue; + + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", + -1, -1, Integer.MAX_VALUE); + + public IntOption numAttrOption = new IntOption("numNumerics", 'u', + "The number of numeric attributes in" + + " dataset", 300, 0, Integer.MAX_VALUE); + + public StringOption topicOption = new StringOption("topic", 't', + "Topic in the kafka to be used for reading data", "test"); + + public IntOption numMaxreadOption = new IntOption("numMaxread", 'r', + "Number of instances to be read in single read from kafka", 1, 0, + Integer.MAX_VALUE); + + public IntOption partitionOption = new IntOption("partition", 'n', + "Partition number to be used for reading data", 0); + + public IntOption portOption = new IntOption("port", 'p', + "Port in kafka to read data", 9092); + + public StringOption seedOption = new StringOption("seed", 's', + "Seeds for kafka", "localhost"); + + public IntOption numClassesOption = new IntOption("numClasses", 'k', + "The number of classes in the data.", 2, 2, Integer.MAX_VALUE); + + public IntOption timeDelayOption = new IntOption("timeDelay", 'y', + "Time delay in milliseconds between two read from kafka", 0, 0, Integer.MAX_VALUE); + + public IntOption instanceType = new IntOption("instanceType", 'i', + "Type of instance to be used. DenseInstance(0)/SparaseInstance(1)", 0); + + public StringOption keyValueSeparator = new StringOption("keyValueSeparator", 'a', + "Separator between key and value for string read from kafka", ":"); + + public StringOption valuesSeparator = new StringOption("valuesSeparator", 'b', + "Separator between values for string read from kafka", ","); + + public void KafkaReader() { + reader = new KafkaReader(); + } + + @Override + protected void prepareForUseImpl(TaskMonitor monitor, + ObjectRepository repository) { + this.reader = new KafkaReader(); + this.mapper = new KafkaToInstanceMapper(); + generateHeader(); + instanceQueue = new LinkedList(); + seeds.add(this.seedOption.getValue()); + } + + + protected void generateHeader() { + FastVector attributes = new FastVector<>(); + + for (int i = 0; i < this.numAttrOption.getValue(); i++) { + attributes.addElement(new Attribute("numeric" + (i + 1))); + } + FastVector classLabels = new FastVector<>(); + for (int i = 0; i < this.numClassesOption.getValue(); i++) { + classLabels.addElement("class" + (i + 1)); + } + + attributes.addElement(new Attribute("class", classLabels)); + this.streamHeader = new InstancesHeader(new Instances( + getCLICreationString(InstanceStream.class), attributes, 0)); + + if (this.classIndexOption.getValue() < 0) { + this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.streamHeader.setClassIndex(this.classIndexOption.getValue() - 1); + } + } + + @Override + public InstancesHeader getHeader() { + return this.streamHeader; + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + private String getNextInstanceFromKafka() { + if (!instanceQueue.isEmpty()) { + return instanceQueue.remove(); + } + + ArrayList kafkaData; + do { + kafkaData = this.reader.run(this.numMaxreadOption.getValue(), + this.topicOption.getValue(), this.partitionOption.getValue(), + this.seeds, this.portOption.getValue()); + } while (kafkaData == null); + + instanceQueue.addAll(kafkaData); + return instanceQueue.remove(); + } + + @Override + public Example nextInstance() { + InstancesHeader header = getHeader(); + Instance inst; + String kafkaString = getNextInstanceFromKafka(); + inst = mapper.getInstance(kafkaString, keyValueSeparator.getValue(), valuesSeparator.getValue(), + instanceType.getValue(), numAttrOption.getValue(), header); + + try { + Thread.sleep(timeDelayOption.getValue()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new InstanceExample(inst); + } + + @Override + public boolean isRestartable() { + // TODO Auto-generated method stub + return true; + } + + @Override + public void restart() { + this.reader = new KafkaReader(); + } + + @Override + public boolean hasMoreInstances() { + return true; + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java new file mode 100644 index 00000000..23b57491 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaToInstanceMapper.java @@ -0,0 +1,57 @@ +package org.apache.samoa.streams.kafka; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.instances.DenseInstance; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.instances.SparseInstance; + +public class KafkaToInstanceMapper { + + public Instance getInstance(String message, String keyValueSeparator, String valuesSeparator, int instanceType, int numAttr, InstancesHeader header) { + Instance inst; + if (!message.isEmpty()) { + String[] KeyValueString = message.split(keyValueSeparator); + String[] attributes = KeyValueString[1].split(valuesSeparator); + inst = createInstance(header, instanceType); + for (int i = 0; i < attributes.length - 1; i++) { + if (i < numAttr) { + inst.setValue(i, Double.parseDouble(attributes[i])); + } + } + inst.setDataset(header); + inst.setClassValue(Double + .parseDouble(attributes[attributes.length - 1])); + return inst; + } + throw new IllegalArgumentException("Empty string value from Kafka"); + } + + public Instance createInstance(InstancesHeader header, int instanceType) { + Instance inst; + if (instanceType == 0) + inst = new DenseInstance(header.numAttributes()); + else + inst = new SparseInstance(header.numAttributes()); + return inst; + } +}