PySpark integration into PyCharm
I'm a bit lost on how to configure PyCharm so it can run PySpark scripts directly. I'm using PySpark on top of an Elasticsearch cluster, and I run the script using the following code. Right now it runs with the default Python interpreter; I tried configuring the pyspark shell as the project interpreter, but that didn't work (PyCharm reports it's not a valid SDK):
__author__ = 'lucas'

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)

    # connection settings for reading from Elasticsearch
    es_read_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "titanic/passenger"
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    # connection settings for writing the results back
    es_write_conf = {
        "es.nodes": "localhost",
        "es.port": "9200",
        "es.resource": "titanic/value_counts"
    }

    doc = es_rdd.first()[1]
    for field in doc:
        value_counts = es_rdd.map(lambda item: item[1][field])
        value_counts = value_counts.map(lambda word: (word, 1))
        value_counts = value_counts.reduceByKey(lambda a, b: a + b)
        value_counts = value_counts.filter(lambda item: item[1] > 1)
        value_counts = value_counts.map(lambda item: ('key', {
            'field': field,
            'val': item[0],
            'count': item[1]
        }))

        value_counts.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
But it generates the following stack trace:
Traceback (most recent call last):
  File "/home/lucas/pycharmprojects/tweetspark/analytics/tweetanalyzer.py", line 20, in <module>
    conf=es_read_conf)
  File "/var/opt/spark/python/pyspark/context.py", line 601, in newAPIHadoopRDD
    jconf, batchSize)
  File "/var/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/var/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:278)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:179)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:519)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:503)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)
What you're missing is the elasticsearch-spark jar. Download the elasticsearch-hadoop integration, find the elasticsearch-spark jar under its dist subdirectory, and set the SPARK_CLASSPATH environment variable:
os.environ['SPARK_CLASSPATH'] = "/path/to/elasticsearch-hadoop-2.3.0/dist/elasticsearch-spark_2.10-2.3.0.jar"
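Note that the variable has to be set before the SparkContext is created, because the classpath is fixed when the JVM gateway is launched. A minimal sketch of the top of the script with the fix applied, assuming the jar was unpacked to the path below (adjust it to your own layout):

import os
from pyspark import SparkConf, SparkContext

# Assumed location of the unpacked elasticsearch-hadoop 2.3.0 distribution.
es_spark_jar = "/path/to/elasticsearch-hadoop-2.3.0/dist/elasticsearch-spark_2.10-2.3.0.jar"

# Must be set before the SparkContext starts the JVM; otherwise the
# org.elasticsearch.hadoop.mr.* classes are still missing at runtime.
os.environ["SPARK_CLASSPATH"] = es_spark_jar

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)

On newer Spark releases SPARK_CLASSPATH is deprecated; passing the jar via --jars on spark-submit or setting spark.driver.extraClassPath achieves the same thing.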
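As for running the script straight from PyCharm: instead of trying to register the pyspark shell as an SDK, one common approach is to keep the regular Python interpreter and make the pyspark package importable by pointing the script (or the run configuration's environment) at the Spark installation. A rough sketch, assuming Spark lives under /var/opt/spark as in the traceback and bundles py4j-0.8.2.1 (adjust both paths to your version):

import os
import sys

spark_home = "/var/opt/spark"  # assumed Spark installation directory
os.environ.setdefault("SPARK_HOME", spark_home)

# Make pyspark and its bundled py4j importable from a plain CPython
# interpreter, e.g. the one PyCharm uses for run configurations.
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.insert(0, os.path.join(spark_home, "python", "lib", "py4j-0.8.2.1-src.zip"))

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("ESTest").setMaster("local[*]")
sc = SparkContext(conf=conf)
print(sc.parallelize(range(10)).sum())  # quick sanity check
sc.stop()

The same effect can be had by adding SPARK_HOME and PYTHONPATH to the PyCharm run configuration's environment variables instead of patching sys.path in code.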